Skip to content

Commit

Permalink
Support canary deployments
Browse files Browse the repository at this point in the history
  • Loading branch information
gibchikafa committed Feb 12, 2024
1 parent 3a5a7c5 commit 848a08a
Show file tree
Hide file tree
Showing 6 changed files with 582 additions and 354 deletions.
131 changes: 35 additions & 96 deletions python/hsml/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
from hsml.core import serving_api
from hsml.engine import serving_engine
from hsml.predictor_state import PredictorState
from hsml.resources import Resources
from hsml.inference_logger import InferenceLogger
from hsml.inference_batcher import InferenceBatcher
from hsml.transformer import Transformer

from hsml.client.exceptions import ModelServingException
from hsml.constants import DEPLOYABLE_COMPONENT, PREDICTOR_STATE
from hsml.predictor_specification import PredictorSpecification


class Deployment:
Expand Down Expand Up @@ -311,63 +309,6 @@ def predictor(self):
def predictor(self, predictor):
self._predictor = predictor

@property
def requested_instances(self):
"""Total number of requested instances in the deployment."""
return self._predictor.requested_instances

# Single predictor

@property
def model_name(self):
"""Name of the model deployed by the predictor"""
return self._predictor.model_name

@model_name.setter
def model_name(self, model_name: str):
self._predictor.model_name = model_name

@property
def model_path(self):
"""Model path deployed by the predictor."""
return self._predictor.model_path

@model_path.setter
def model_path(self, model_path: str):
self._predictor.model_path = model_path

@property
def model_version(self):
"""Model version deployed by the predictor."""
return self._predictor.model_version

@model_version.setter
def model_version(self, model_version: int):
self._predictor.model_version = model_version

@property
def artifact_version(self):
"""Artifact version deployed by the predictor."""
return self._predictor.artifact_version

@artifact_version.setter
def artifact_version(self, artifact_version: Union[int, str]):
self._predictor.artifact_version = artifact_version

@property
def artifact_path(self):
"""Path of the model artifact deployed by the predictor."""
return self._predictor.artifact_path

@property
def model_server(self):
"""Model server ran by the predictor."""
return self._predictor.model_server

@model_server.setter
def model_server(self, model_server: str):
self._predictor.model_server = model_server

@property
def serving_tool(self):
"""Serving tool used to run the model server."""
Expand All @@ -377,24 +318,6 @@ def serving_tool(self):
def serving_tool(self, serving_tool: str):
self._predictor.serving_tool = serving_tool

@property
def script_file(self):
"""Script file used by the predictor."""
return self._predictor.script_file

@script_file.setter
def script_file(self, script_file: str):
self._predictor.script_file = script_file

@property
def resources(self):
"""Resource configuration for the predictor."""
return self._predictor.resources

@resources.setter
def resources(self, resources: Resources):
self._predictor.resources = resources

@property
def inference_logger(self):
"""Configuration of the inference logger attached to this predictor."""
Expand All @@ -404,24 +327,6 @@ def inference_logger(self):
def inference_logger(self, inference_logger: InferenceLogger):
self._predictor.inference_logger = inference_logger

@property
def inference_batcher(self):
"""Configuration of the inference batcher attached to this predictor."""
return self._predictor.inference_batcher

@inference_batcher.setter
def inference_batcher(self, inference_batcher: InferenceBatcher):
self._predictor.inference_batcher = inference_batcher

@property
def transformer(self):
"""Transformer configured in the predictor."""
return self._predictor.transformer

@transformer.setter
def transformer(self, transformer: Transformer):
self._predictor.transformer = transformer

@property
def created_at(self):
"""Created at date of the predictor."""
Expand All @@ -432,6 +337,40 @@ def creator(self):
"""Creator of the predictor."""
return self._predictor.creator

@property
def specification(self):
    """Predictor specification of the main serving."""
    return self._predictor.specification

@specification.setter
def specification(self, spec: Union[PredictorSpecification, dict]):
    """Set the predictor specification of the main serving.

    Accepts either a `PredictorSpecification` or its dict representation;
    the value is stored on the underlying predictor.
    """
    self._predictor.specification = spec

@property
def candidate_specification(self):
    """Specification for the candidate serving (canary deployment), if configured."""
    return self._predictor.candidate_specification

@candidate_specification.setter
def candidate_specification(self, candidate_specification: Union[PredictorSpecification, dict]):
    # Stored on the predictor; a non-None value enables the canary code paths
    # (e.g. candidate traffic splitting and extra requested instances).
    self._predictor.candidate_specification = candidate_specification

@property
def candidate_traffic_percentage(self):
    """Percentage of inference traffic routed to the candidate predictor."""
    return self._predictor.candidate_traffic_percentage

@candidate_traffic_percentage.setter
def candidate_traffic_percentage(self, percentage: int):
    """Route the given percentage of inference traffic to the candidate predictor."""
    self._predictor.candidate_traffic_percentage = percentage

@property
def requested_instances(self):
    """Total number of requested instances in the deployment.

    Sums the main specification's requested instances with those of the
    candidate (canary) specification, when one is configured.
    """
    predictor = self._predictor
    total = predictor.specification.requested_instances
    candidate = predictor.candidate_specification
    if candidate is not None:
        total += candidate.requested_instances
    return total

def __repr__(self):
desc = (
f", description: {self._description!r}"
Expand Down
67 changes: 44 additions & 23 deletions python/hsml/engine/serving_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ def start(self, deployment_instance, await_status: int) -> bool:
(done, state) = self._check_status(
deployment_instance, PREDICTOR_STATE.STATUS_RUNNING
)

if not done:
min_instances = self._get_min_starting_instances(deployment_instance)
num_steps = (len(self.START_STEPS) - 1) + min_instances
if deployment_instance._predictor._state.condition is None:
num_steps = min_instances # backward compatibility
if hasattr(deployment_instance._predictor._state, 'condition'):
if deployment_instance._predictor._state.condition is None:
num_steps = min_instances # backward compatibility
pbar = tqdm(total=num_steps)
pbar.set_description("Creating deployment")

Expand Down Expand Up @@ -147,9 +147,10 @@ def stop(self, deployment_instance, await_status: int) -> bool:
if deployment_instance.requested_instances >= num_instances
else num_instances
)
if deployment_instance._predictor._state.condition is None:
# backward compatibility
num_steps = self._get_min_starting_instances(deployment_instance)
if hasattr(deployment_instance._predictor._state, 'condition'):
if deployment_instance._predictor._state.condition is None:
# backward compatibility
num_steps = self._get_min_starting_instances(deployment_instance)
pbar = tqdm(total=num_steps)
pbar.set_description("Preparing to stop deployment")

Expand Down Expand Up @@ -280,30 +281,31 @@ def _check_status(self, deployment_instance, desired_status):
return (False, state)

def _get_starting_progress(self, current_step, state, num_instances):
    """Compute ``(progress, description)`` for the start progress bar.

    The span as scraped interleaved superseded pre-refactor lines with their
    replacements (duplicate guard, duplicate step computation, old
    ``state.condition`` reads); this is the intended post-change logic using
    the condition selected by ``_get_condition``.
    """
    # Backward compatibility: legacy state objects expose `condition` set
    # to None; progress is then tracked purely by instance count.
    if hasattr(state, "condition") and state.condition is None:
        progress = num_instances - current_step
        if state.status == PREDICTOR_STATE.STATUS_RUNNING:
            return (progress, "Deployment is ready")
        return (progress, None if current_step == 0 else "Deployment is starting")

    # Pick the relevant condition: main serving or canary candidate.
    condition = self._get_condition(state)
    step = self.START_STEPS.index(condition.type)
    if (
        condition.type == PREDICTOR_STATE.CONDITION_TYPE_STARTED
        or condition.type == PREDICTOR_STATE.CONDITION_TYPE_READY
    ):
        # Started/ready phases advance one step per requested instance.
        step += num_instances
    progress = step - current_step
    desc = None
    if condition.type != PREDICTOR_STATE.CONDITION_TYPE_STOPPED:
        desc = (
            condition.reason
            if state.status != PREDICTOR_STATE.STATUS_FAILED
            else "Deployment failed to start"
        )
    return (progress, desc)

def _get_stopping_progress(self, total_steps, current_step, state, num_instances):
if state.condition is None: # backward compatibility
if hasattr(state, "condition") and state.condition is None: # backward compatibility
progress = (total_steps - num_instances) - current_step
if state.status == PREDICTOR_STATE.STATUS_STOPPED:
return (progress, "Deployment is stopped")
Expand All @@ -313,33 +315,48 @@ def _get_stopping_progress(self, total_steps, current_step, state, num_instances
)

step = 0
if state.condition.type == PREDICTOR_STATE.CONDITION_TYPE_SCHEDULED:
step = 1 if state.condition.status is None else 0
elif state.condition.type == PREDICTOR_STATE.CONDITION_TYPE_STOPPED:
condition = self._get_condition(state)
if condition.type == PREDICTOR_STATE.CONDITION_TYPE_SCHEDULED:
step = 1 if condition.status is None else 0
elif condition.type == PREDICTOR_STATE.CONDITION_TYPE_STOPPED:
num_instances = (total_steps - 2) - num_instances # num stopped instances
step = (
(2 + num_instances)
if (state.condition.status is None or state.condition.status)
if (condition.status is None or condition.status)
else 0
)
progress = step - current_step
desc = None
if (
state.condition.type != PREDICTOR_STATE.CONDITION_TYPE_READY
condition.type != PREDICTOR_STATE.CONDITION_TYPE_READY
and state.status != PREDICTOR_STATE.STATUS_FAILED
):
desc = (
"Deployment is stopped"
if state.status == PREDICTOR_STATE.STATUS_STOPPED
else state.condition.reason
else state.serving_status.condition.reason
)

return (progress, desc)

def _get_condition(self, state):
if state.candidate_status is None:
return state.serving_status.condition
if state.serving_status.condition.status is None or not state.serving_status.condition.status:
return state.serving_status.condition
if state.candidate_status.condition.status is None or not state.candidate_status.condition.status:
return state.candidate_status.condition
# ok to return the status of either
return state.serving_status.condition

def _get_min_starting_instances(self, deployment_instance):
min_start_instances = 1 # predictor
if deployment_instance.transformer is not None:
if deployment_instance.specification.transformer is not None:
min_start_instances += 1 # transformer
if deployment_instance.candidate_specification is not None:
min_start_instances += 1
if deployment_instance.candidate_specification.transformer is not None:
min_start_instances += 1
return (
deployment_instance.requested_instances
if deployment_instance.requested_instances >= min_start_instances
Expand All @@ -349,9 +366,13 @@ def _get_min_starting_instances(self, deployment_instance):
def _get_available_instances(self, state):
    """Count currently available instances across the main serving and,
    when a canary candidate is deployed, the candidate serving.

    The scraped span retained stale pre-refactor lines reading the old flat
    attributes (``state.available_predictor_instances`` /
    ``state.available_transformer_instances``), which this commit moves under
    ``state.serving_status``; they are removed here. The candidate transformer
    check is nested under the ``candidate_status`` None-guard so it cannot
    raise ``AttributeError`` when no candidate exists.
    """
    if state.status == PREDICTOR_STATE.STATUS_CREATING:
        # Nothing is available while the deployment is still being created.
        return 0
    # Main serving: predictor instances plus optional transformer instances.
    num_instances = state.serving_status.available_predictor_instances
    if state.serving_status.available_transformer_instances is not None:
        num_instances += state.serving_status.available_transformer_instances
    # Canary candidate serving, when present.
    if state.candidate_status is not None:
        num_instances += state.candidate_status.available_predictor_instances
        if state.candidate_status.available_transformer_instances is not None:
            num_instances += state.candidate_status.available_transformer_instances
    return num_instances

def _get_stopped_instances(self, available_instances, requested_instances):
Expand Down
Loading

0 comments on commit 848a08a

Please sign in to comment.