diff --git a/src/bentoml/_internal/marshal/dispatcher.py b/src/bentoml/_internal/marshal/dispatcher.py index 3d8143b306e..ff26b005aa5 100644 --- a/src/bentoml/_internal/marshal/dispatcher.py +++ b/src/bentoml/_internal/marshal/dispatcher.py @@ -32,6 +32,12 @@ def is_locked(self): def release(self): self.sema += 1 +@attr.define +class Job: + enqueue_time: float, + data: t.Any, + future: asyncio.Future[t.Any], + dispatch_time: float = 0, class Optimizer: """ @@ -123,9 +129,7 @@ def __init__( self.tick_interval = 0.001 self._controller = None - self._queue: collections.deque[ - tuple[float, t.Any, asyncio.Future[t.Any]] - ] = collections.deque() # TODO(bojiang): maxlen + self._queue: collections.deque[Job] = collections.deque() # TODO(bojiang): maxlen self._sema = shared_sema if shared_sema else NonBlockSema(1) def shutdown(self): @@ -259,8 +263,8 @@ async def controller(self): dt = self.tick_interval decay = 0.95 # the decay rate of wait time now = time.time() - w0 = now - self._queue[0][0] - wn = now - self._queue[-1][0] + w0 = now - self._queue[0].enqueue_time + wn = now - self._queue[-1].enqueue_time a = self.optimizer.o_a b = self.optimizer.o_b @@ -268,11 +272,11 @@ async def controller(self): latency_0 = w0 + a * n + b if n > 1 and latency_0 >= self.max_latency_in_ms: - self._queue.popleft()[2].cancel() + self._queue.popleft().future.cancel() continue if self._sema.is_locked(): if n == 1 and w0 >= self.max_latency_in_ms: - self._queue.popleft()[2].cancel() + self._queue.popleft().future.cancel() continue await asyncio.sleep(self.tick_interval) continue @@ -288,7 +292,7 @@ async def controller(self): ): n = len(self._queue) now = time.time() - wn = now - self._queue[-1][0] + wn = now - self._queue[-1].enqueue_time latency_0 += dt # wait for additional requests to arrive @@ -314,7 +318,7 @@ async def inbound_call(self, data: t.Any): return await future async def outbound_call( - self, inputs_info: tuple[tuple[float, t.Any, asyncio.Future[t.Any]]] + self, inputs_info: tuple[Job, ...] ): _time_start = time.time() _done = False @@ -329,7 +333,7 @@ async def outbound_call( _done = True self.optimizer.log_outbound( n=len(inputs_info), - wait=_time_start - inputs_info[-1][0], + wait=_time_start - inputs_info[-1].enqueue_time, duration=time.time() - _time_start, ) except asyncio.CancelledError: