use slots class instead of a tuple
sauyon committed Mar 16, 2023
1 parent a5d6d88 commit 8f46ed8
Showing 1 changed file with 14 additions and 10 deletions.
src/bentoml/_internal/marshal/dispatcher.py — 14 additions & 10 deletions
@@ -32,6 +32,12 @@ def is_locked(self):
     def release(self):
         self.sema += 1
 
+@attr.define
+class Job:
+    enqueue_time: float
+    data: t.Any
+    future: asyncio.Future[t.Any]
+    dispatch_time: float = 0
 
 class Optimizer:
     """
@@ -123,9 +129,7 @@ def __init__(
         self.tick_interval = 0.001
 
         self._controller = None
-        self._queue: collections.deque[
-            tuple[float, t.Any, asyncio.Future[t.Any]]
-        ] = collections.deque()  # TODO(bojiang): maxlen
+        self._queue: collections.deque[Job] = collections.deque()  # TODO(bojiang): maxlen
         self._sema = shared_sema if shared_sema else NonBlockSema(1)
 
     def shutdown(self):
@@ -259,20 +263,20 @@ async def controller(self):
                 dt = self.tick_interval
                 decay = 0.95  # the decay rate of wait time
                 now = time.time()
-                w0 = now - self._queue[0][0]
-                wn = now - self._queue[-1][0]
+                w0 = now - self._queue[0].enqueue_time
+                wn = now - self._queue[-1].enqueue_time
                 a = self.optimizer.o_a
                 b = self.optimizer.o_b
 
                 # the estimated latency of the first request if we began processing now
                 latency_0 = w0 + a * n + b
 
                 if n > 1 and latency_0 >= self.max_latency_in_ms:
-                    self._queue.popleft()[2].cancel()
+                    self._queue.popleft().future.cancel()
                     continue
                 if self._sema.is_locked():
                     if n == 1 and w0 >= self.max_latency_in_ms:
-                        self._queue.popleft()[2].cancel()
+                        self._queue.popleft().future.cancel()
                         continue
                     await asyncio.sleep(self.tick_interval)
                     continue
@@ -288,7 +292,7 @@
                 ):
                     n = len(self._queue)
                     now = time.time()
-                    wn = now - self._queue[-1][0]
+                    wn = now - self._queue[-1].enqueue_time
                     latency_0 += dt
 
                     # wait for additional requests to arrive
@@ -314,7 +318,7 @@ async def inbound_call(self, data: t.Any):
         return await future
 
     async def outbound_call(
-        self, inputs_info: tuple[tuple[float, t.Any, asyncio.Future[t.Any]]]
+        self, inputs_info: tuple[Job, ...]
     ):
         _time_start = time.time()
         _done = False
@@ -329,7 +333,7 @@ async def outbound_call(
             _done = True
             self.optimizer.log_outbound(
                 n=len(inputs_info),
-                wait=_time_start - inputs_info[-1][0],
+                wait=_time_start - inputs_info[-1].enqueue_time,
                 duration=time.time() - _time_start,
             )
         except asyncio.CancelledError:
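For context on the pattern this commit applies: attr.define generates a slots-based class by default, so swapping the (enqueue_time, data, future) tuple for a Job class replaces positional lookups like job[0] and job[2] with named access (job.enqueue_time, job.future) without adding a per-instance __dict__. The sketch below is a minimal, standalone illustration of that idea, not the BentoML dispatcher itself; only the Job fields and the deque type come from the diff, while the demo coroutine and its payload are assumed for demonstration.

import asyncio
import collections
import time
import typing as t

import attr


@attr.define  # attrs generates a slots class by default: no per-instance __dict__
class Job:
    enqueue_time: float
    data: t.Any
    future: "asyncio.Future[t.Any]"
    dispatch_time: float = 0


async def demo() -> None:
    # Hypothetical stand-in for the dispatcher's request queue.
    queue: "collections.deque[Job]" = collections.deque()
    loop = asyncio.get_running_loop()

    # Enqueue one request, recording its arrival time and a future for its result.
    queue.append(Job(enqueue_time=time.time(), data="payload", future=loop.create_future()))

    job = queue.popleft()
    # Named attribute access replaces tuple indexing such as job[0] / job[2].
    print(f"waited {time.time() - job.enqueue_time:.6f}s for {job.data!r}")
    job.future.cancel()


asyncio.run(demo())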