From 533abdb7a51f755019b483e2f6d935bfbb46dc6c Mon Sep 17 00:00:00 2001
From: Sauyon Lee
Date: Wed, 15 Mar 2023 18:15:36 -0700
Subject: [PATCH] implement batching strategies

---
 .../_internal/configuration/containers.py     |  11 +-
 .../_internal/configuration/v1/__init__.py    |   2 +
 .../v1/default_configuration.yaml             |   7 ++
 src/bentoml/_internal/marshal/dispatcher.py   | 119 ++++++++++++++----
 src/bentoml/_internal/models/model.py         |   2 +
 src/bentoml/_internal/runner/runner.py        |  32 ++++-
 src/bentoml/_internal/server/runner_app.py    |   1 +
 7 files changed, 148 insertions(+), 26 deletions(-)

diff --git a/src/bentoml/_internal/configuration/containers.py b/src/bentoml/_internal/configuration/containers.py
index 2c93e57b9e3..c7f5708bca7 100644
--- a/src/bentoml/_internal/configuration/containers.py
+++ b/src/bentoml/_internal/configuration/containers.py
@@ -139,7 +139,14 @@ def __init__(
             ) from None

     def _finalize(self):
-        RUNNER_CFG_KEYS = ["batching", "resources", "logging", "metrics", "timeout"]
+        RUNNER_CFG_KEYS = [
+            "batching",
+            "resources",
+            "logging",
+            "metrics",
+            "timeout",
+        ]
+
         global_runner_cfg = {k: self.config["runners"][k] for k in RUNNER_CFG_KEYS}
         custom_runners_cfg = dict(
             filter(
diff --git a/src/bentoml/_internal/configuration/v1/__init__.py b/src/bentoml/_internal/configuration/v1/__init__.py
index 1509bf4e5cf..d718c2767cd 100644
--- a/src/bentoml/_internal/configuration/v1/__init__.py
+++ b/src/bentoml/_internal/configuration/v1/__init__.py
@@ -142,6 +142,8 @@
             s.Optional("enabled"): bool,
             s.Optional("max_batch_size"): s.And(int, ensure_larger_than_zero),
             s.Optional("max_latency_ms"): s.And(int, ensure_larger_than_zero),
+            s.Optional("strategy"): str,
+            s.Optional("strategy_options"): dict,
         },
         # NOTE: there is a distinction between being unset and None here; if set to 'None'
         # in configuration for a specific runner, it will override the global configuration.
diff --git a/src/bentoml/_internal/configuration/v1/default_configuration.yaml b/src/bentoml/_internal/configuration/v1/default_configuration.yaml
index 831d3de7371..7568e782537 100644
--- a/src/bentoml/_internal/configuration/v1/default_configuration.yaml
+++ b/src/bentoml/_internal/configuration/v1/default_configuration.yaml
@@ -84,6 +84,14 @@ runners:
   batching:
     enabled: true
     max_batch_size: 100
+    # which strategy to use to batch requests
+    # there are currently three available options:
+    # - target_latency: dispatch once the predicted batch latency reaches a fixed target
+    # - fixed_wait: always wait a fixed time after the first request arrives
+    # - intelligent_wait: keep waiting as long as waiting is expected to reduce average latency
+    strategy: intelligent_wait
+    strategy_options:
+      decay: 0.95
     max_latency_ms: 10000
   logging:
     access:
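
For reference, a sketch of how these configuration values are consumed, assuming the
strategy registry introduced in dispatcher.py below; the dict shape mirrors the
`runners.batching` section of the YAML above, and the code is illustrative only:

    from bentoml._internal.marshal.dispatcher import BATCHING_STRATEGY_REGISTRY

    # mirrors runners.batching in the default configuration above
    batching_cfg = {"strategy": "intelligent_wait", "strategy_options": {"decay": 0.95}}

    # look up the strategy class by name, then construct it from its options dict
    strategy_cls = BATCHING_STRATEGY_REGISTRY[batching_cfg["strategy"]]
    strategy = strategy_cls(batching_cfg["strategy_options"])  # IntelligentWaitStrategy, decay=0.95
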
""" N_KEPT_SAMPLE = 50 # amount of outbound info kept for inferring params @@ -98,14 +100,97 @@ def trigger_refresh(self): T_OUT = t.TypeVar("T_OUT") -class CorkDispatcher: - """ - A decorator that: - * wrap batch function - * implement CORK algorithm to cork & release calling of wrapped function - The wrapped function should be an async function. - """ +BATCHING_STRATEGY_REGISTRY = {} + + +class BatchingStrategy(abc.ABC): + strategy_id: str + + @abc.abstractmethod + def controller(queue: t.Sequence[Job], predict_execution_time: t.Callable[t.Sequence[Job]], dispatch: t.Callable[]): + pass + + def __init_subclass__(cls, strategy_id: str): + BATCHING_STRATEGY_REGISTRY[strategy_id] = cls + cls.strategy_id = strategy_id + + +class TargetLatencyStrategy(strategy_id="target_latency"): + latency: float = 1 + + def __init__(self, options: dict[t.Any, t.Any]): + for key in options: + if key == "latency": + self.latency = options[key] / 1000.0 + else: + logger.warning("Strategy 'target_latency' ignoring unknown configuration key '{key}'.") + + async def wait(queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float): + now = time.time() + w0 = now - queue[0].enqueue_time + latency_0 = w0 + optimizer.o_a * n + optimizer.o_b + + while latency_0 < self.latency: + n = len(queue) + now = time.time() + w0 = now - queue[0].enqueue_time + latency_0 = w0 + optimizer.o_a * n + optimizer.o_b + + await asyncio.sleep(tick_interval) + +class FixedWaitStrategy(strategy_id="fixed_wait"): + wait: float = 1 + + def __init__(self, options: dict[t.Any, t.Any]): + for key in options: + if key == "wait": + self.wait = options[key] / 1000.0 + else: + logger.warning("Strategy 'fixed_wait' ignoring unknown configuration key '{key}'") + + async def wait(queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float): + now = time.time() + w0 = now - queue[0].enqueue_time + + if w0 < self.wait: + await asyncio.sleep(self.wait - w0) + + +class IntelligentWaitStrategy(strategy_id="intelligent_wait"): + decay: float = 0.95 + + def __init__(self, options: dict[t.Any, t.Any]): + for key in options: + if key == "decay": + self.decay = options[key] + else: + logger.warning("Strategy 'intelligent_wait' ignoring unknown configuration value") + + async def wait(queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float): + n = len(queue) + now = time.time() + wn = now - queue[-1].enqueue_time + latency_0 = w0 + optimizer.o_a * n + optimizer.o_b + while ( + # if we don't already have enough requests, + n < max_batch_size + # we are not about to cancel the first request, + and latency_0 + dt <= self.max_latency * 0.95 + # and waiting will cause average latency to decrese + and n * (wn + dt + optimizer.o_a) <= optimizer.wait * decay + ): + n = len(queue) + now = time.time() + w0 = now - queue[0].enqueue_time + latency_0 = w0 + optimizer.o_a * n + optimizer.o_b + + # wait for additional requests to arrive + await asyncio.sleep(tick_interval) + + + +class Dispatcher: def __init__( self, max_latency_in_ms: int, @@ -282,22 +367,9 @@ async def controller(self): continue # we are now free to dispatch whenever we like - while ( - # if we don't already have enough requests, - n < self.max_batch_size - # we are not about to cancel the first request, - and latency_0 + dt <= self.max_latency_in_ms * 0.95 - # and waiting will cause average latency to decrese - and n * (wn + dt + a) <= self.optimizer.wait * 
diff --git a/src/bentoml/_internal/models/model.py b/src/bentoml/_internal/models/model.py
index d22b3cb5124..97af0c89248 100644
--- a/src/bentoml/_internal/models/model.py
+++ b/src/bentoml/_internal/models/model.py
@@ -326,6 +326,7 @@ def to_runner(
         name: str = "",
         max_batch_size: int | None = None,
         max_latency_ms: int | None = None,
+        batching_strategy: BatchingStrategy | None = None,
         method_configs: dict[str, dict[str, int]] | None = None,
     ) -> Runner:
         """
@@ -348,6 +349,7 @@ def to_runner(
             models=[self],
             max_batch_size=max_batch_size,
             max_latency_ms=max_latency_ms,
+            batching_strategy=batching_strategy,
             method_configs=method_configs,
         )
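
With the new parameter, a strategy instance can be passed straight to a runner. A
hypothetical usage sketch ("iris_clf:latest" is a placeholder tag; any model in the
local store would do):

    import bentoml
    from bentoml._internal.marshal.dispatcher import FixedWaitStrategy

    runner = bentoml.models.get("iris_clf:latest").to_runner(
        batching_strategy=FixedWaitStrategy({"wait": 5}),  # always wait 5 ms for more requests
    )
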
diff --git a/src/bentoml/_internal/runner/runner.py b/src/bentoml/_internal/runner/runner.py
index 7441cc20a66..67406de9674 100644
--- a/src/bentoml/_internal/runner/runner.py
+++ b/src/bentoml/_internal/runner/runner.py
@@ -4,6 +4,7 @@
 import logging
 from abc import ABC
 from abc import abstractmethod
+from pprint import pformat

 import attr
 from simple_di import inject
@@ -19,6 +20,9 @@
 from .runner_handle import RunnerHandle
 from .runner_handle import DummyRunnerHandle
 from ..configuration.containers import BentoMLContainer
+from ...exceptions import BentoMLConfigException
+from ..marshal.dispatcher import BatchingStrategy
+from ..marshal.dispatcher import BATCHING_STRATEGY_REGISTRY

 if t.TYPE_CHECKING:
     from ...triton import Runner as TritonRunner
@@ -47,6 +51,7 @@ class RunnerMethod(t.Generic[T, P, R]):
     config: RunnableMethodConfig
     max_batch_size: int
     max_latency_ms: int
+    batching_strategy: BatchingStrategy

     def run(self, *args: P.args, **kwargs: P.kwargs) -> R:
         return self.runner._runner_handle.run_method(self, *args, **kwargs)
@@ -159,6 +164,7 @@ def __init__(
         models: list[Model] | None = None,
         max_batch_size: int | None = None,
         max_latency_ms: int | None = None,
+        batching_strategy: BatchingStrategy | None = None,
         method_configs: dict[str, dict[str, int]] | None = None,
     ) -> None:
         """
@@ -177,8 +183,10 @@ def __init__(
             models: An optional list composed of ``bentoml.Model`` instances.
             max_batch_size: Max batch size config for dynamic batching. If not provided, use the default value from
                 configuration.
-            max_latency_ms: Max latency config for dynamic batching. If not provided, use the default value from
-                configuration.
+            max_latency_ms: Max latency config for dynamic batching. If not provided, uses the default value
+                from the configuration.
+            batching_strategy: Batching strategy to use for dynamic batching. If not provided, uses the default
+                value from the configuration.
             method_configs: A dictionary per method config for this given Runner signatures.

         Returns:
@@ -215,6 +222,7 @@ def __init__(

             method_max_batch_size = None
             method_max_latency_ms = None
+            method_batching_strategy = None
             if method_name in method_configs:
                 method_max_batch_size = method_configs[method_name].get(
                     "max_batch_size"
@@ -222,6 +230,23 @@ def __init__(
                 method_max_latency_ms = method_configs[method_name].get(
                     "max_latency_ms"
                 )
+                method_batching_strategy = method_configs[method_name].get(
+                    "batching_strategy"
+                )
+
+            if config["batching"]["strategy"] not in BATCHING_STRATEGY_REGISTRY:
+                raise BentoMLConfigException(
+                    f"Unknown batching strategy '{config['batching']['strategy']}'. Available strategies are: {', '.join(BATCHING_STRATEGY_REGISTRY.keys())}."
+                )
+
+            try:
+                default_batching_strategy = BATCHING_STRATEGY_REGISTRY[
+                    config["batching"]["strategy"]
+                ](config["batching"]["strategy_options"])
+            except Exception as e:
+                raise BentoMLConfigException(
+                    f"Initializing strategy '{config['batching']['strategy']}' with configured options ({pformat(config['batching']['strategy_options'])}) failed."
+                ) from e

             runner_method_map[method_name] = RunnerMethod(
                 runner=self,
@@ -237,6 +262,11 @@ def __init__(
                     max_latency_ms,
                     default=config["batching"]["max_latency_ms"],
                 ),
+                batching_strategy=first_not_none(
+                    method_batching_strategy,
+                    batching_strategy,
+                    default=default_batching_strategy,
+                ),
             )

         self.__attrs_init__(
diff --git a/src/bentoml/_internal/server/runner_app.py b/src/bentoml/_internal/server/runner_app.py
index 98c7c31c6a5..3d9e776aa23 100644
--- a/src/bentoml/_internal/server/runner_app.py
+++ b/src/bentoml/_internal/server/runner_app.py
@@ -58,6 +58,7 @@ def __init__(
             max_batch_size = method.max_batch_size if method.config.batchable else 1
             self.dispatchers[method.name] = CorkDispatcher(
                 max_latency_in_ms=method.max_latency_ms,
+                batching_strategy=method.batching_strategy,
                 max_batch_size=max_batch_size,
                 fallback=functools.partial(
                     ServiceUnavailable, message="process is overloaded"
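
For intuition, the intelligent_wait stopping rule compares the latency that one more
tick adds to already-queued requests against the decayed average wait time tracked by
the optimizer. A worked example with made-up numbers:

    # all times in seconds; values are illustrative only
    n, wn, tick, o_a = 4, 0.002, 0.001, 0.005  # queue depth, newest wait, tick, per-item cost
    avg_wait, decay = 0.040, 0.95              # optimizer.wait and the configured decay

    added = n * (wn + tick + o_a)              # 4 * 0.008 = 0.032 s spread over the queue
    budget = avg_wait * decay                  # 0.038 s
    print(added <= budget)                     # True -> keep corking the batch
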