[HWORKS-927] Support canary deployments #217

Open · wants to merge 1 commit into base: main
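This PR adds canary deployment support: a deployment now exposes a primary specification plus an optional candidate specification, and `candidate_traffic_percentage` controls the share of requests routed to the candidate. A minimal usage sketch under assumptions — the model name, deployment name, and dict keys are illustrative; only the `candidate_specification` and `candidate_traffic_percentage` properties come from this diff:

import hsml

# Connect and fetch an existing deployment (standard hsml flow).
connection = hsml.connection()
model_serving = connection.get_model_serving()
deployment = model_serving.get_deployment("fraud_detector")

# Configure a canary: route 10% of traffic to a candidate predictor.
# The setter accepts a PredictorSpecification or a dict; the keys below
# are an assumption about the specification schema.
deployment.candidate_specification = {
    "model_name": "fraud_detector",
    "model_version": 2,
}
deployment.candidate_traffic_percentage = 10
deployment.save()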
131 changes: 35 additions & 96 deletions python/hsml/deployment.py
@@ -21,13 +21,11 @@
from hsml.core import serving_api
from hsml.engine import serving_engine
from hsml.predictor_state import PredictorState
from hsml.resources import Resources
from hsml.inference_logger import InferenceLogger
from hsml.inference_batcher import InferenceBatcher
from hsml.transformer import Transformer

from hsml.client.exceptions import ModelServingException
from hsml.constants import DEPLOYABLE_COMPONENT, PREDICTOR_STATE
from hsml.predictor_specification import PredictorSpecification


class Deployment:
@@ -311,63 +309,6 @@ def predictor(self):
def predictor(self, predictor):
self._predictor = predictor

@property
def requested_instances(self):
"""Total number of requested instances in the deployment."""
return self._predictor.requested_instances

# Single predictor

@property
def model_name(self):
"""Name of the model deployed by the predictor"""
return self._predictor.model_name

@model_name.setter
def model_name(self, model_name: str):
self._predictor.model_name = model_name

@property
def model_path(self):
"""Model path deployed by the predictor."""
return self._predictor.model_path

@model_path.setter
def model_path(self, model_path: str):
self._predictor.model_path = model_path

@property
def model_version(self):
"""Model version deployed by the predictor."""
return self._predictor.model_version

@model_version.setter
def model_version(self, model_version: int):
self._predictor.model_version = model_version

@property
def artifact_version(self):
"""Artifact version deployed by the predictor."""
return self._predictor.artifact_version

@artifact_version.setter
def artifact_version(self, artifact_version: Union[int, str]):
self._predictor.artifact_version = artifact_version

@property
def artifact_path(self):
"""Path of the model artifact deployed by the predictor."""
return self._predictor.artifact_path

@property
def model_server(self):
"""Model server ran by the predictor."""
return self._predictor.model_server

@model_server.setter
def model_server(self, model_server: str):
self._predictor.model_server = model_server

@property
def serving_tool(self):
"""Serving tool used to run the model server."""
@@ -377,24 +318,6 @@ def serving_tool(self):
def serving_tool(self, serving_tool: str):
self._predictor.serving_tool = serving_tool

@property
def script_file(self):
"""Script file used by the predictor."""
return self._predictor.script_file

@script_file.setter
def script_file(self, script_file: str):
self._predictor.script_file = script_file

@property
def resources(self):
"""Resource configuration for the predictor."""
return self._predictor.resources

@resources.setter
def resources(self, resources: Resources):
self._predictor.resources = resources

@property
def inference_logger(self):
"""Configuration of the inference logger attached to this predictor."""
@@ -404,24 +327,6 @@ def inference_logger(self):
def inference_logger(self, inference_logger: InferenceLogger):
self._predictor.inference_logger = inference_logger

@property
def inference_batcher(self):
"""Configuration of the inference batcher attached to this predictor."""
return self._predictor.inference_batcher

@inference_batcher.setter
def inference_batcher(self, inference_batcher: InferenceBatcher):
self._predictor.inference_batcher = inference_batcher

@property
def transformer(self):
"""Transformer configured in the predictor."""
return self._predictor.transformer

@transformer.setter
def transformer(self, transformer: Transformer):
self._predictor.transformer = transformer

@property
def created_at(self):
"""Created at date of the predictor."""
@@ -432,6 +337,40 @@ def creator(self):
"""Creator of the predictor."""
return self._predictor.creator

@property
def specification(self):
"""Specification for the main serving"""
return self._predictor.specification

@specification.setter
def specification(self, specification: Union[PredictorSpecification, dict]):
self._predictor.specification = specification

@property
def candidate_specification(self):
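"""Specification for the candidate (canary) predictor."""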
return self._predictor.candidate_specification

@candidate_specification.setter
def candidate_specification(self, candidate_specification: Union[PredictorSpecification, dict]):
self._predictor.candidate_specification = candidate_specification

@property
def candidate_traffic_percentage(self):
"""The traffic percentage for the candidate predictor"""
return self._predictor.candidate_traffic_percentage

@candidate_traffic_percentage.setter
def candidate_traffic_percentage(self, candidate_traffic_percentage: int):
self._predictor.candidate_traffic_percentage = candidate_traffic_percentage

@property
def requested_instances(self):
"""Total number of requested instances in the deployment."""
requested_instances = self._predictor.specification.requested_instances
if self._predictor.candidate_specification is not None:
requested_instances += self._predictor.candidate_specification.requested_instances
return requested_instances

def __repr__(self):
desc = (
f", description: {self._description!r}"
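With the per-property accessors replaced by specifications, `requested_instances` now aggregates across the primary and the candidate. A self-contained sketch of that accounting, with `SimpleNamespace` standing in for the real predictor and specification objects (an assumption; only the summing logic mirrors the diff):

from types import SimpleNamespace

# Stand-ins for PredictorSpecification objects (hypothetical counts).
primary = SimpleNamespace(requested_instances=3)    # e.g. 2 predictor + 1 transformer instances
candidate = SimpleNamespace(requested_instances=2)  # canary predictor + transformer

predictor = SimpleNamespace(specification=primary, candidate_specification=candidate)

# Mirrors Deployment.requested_instances above.
total = predictor.specification.requested_instances
if predictor.candidate_specification is not None:
    total += predictor.candidate_specification.requested_instances

assert total == 5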
67 changes: 44 additions & 23 deletions python/hsml/engine/serving_engine.py
@@ -90,12 +90,12 @@ def start(self, deployment_instance, await_status: int) -> bool:
(done, state) = self._check_status(
deployment_instance, PREDICTOR_STATE.STATUS_RUNNING
)

if not done:
min_instances = self._get_min_starting_instances(deployment_instance)
num_steps = (len(self.START_STEPS) - 1) + min_instances
if deployment_instance._predictor._state.condition is None:
num_steps = min_instances # backward compatibility
if hasattr(deployment_instance._predictor._state, 'condition'):
if deployment_instance._predictor._state.condition is None:
num_steps = min_instances # backward compatibility
pbar = tqdm(total=num_steps)
pbar.set_description("Creating deployment")

@@ -147,9 +147,10 @@ def stop(self, deployment_instance, await_status: int) -> bool:
if deployment_instance.requested_instances >= num_instances
else num_instances
)
if deployment_instance._predictor._state.condition is None:
# backward compatibility
num_steps = self._get_min_starting_instances(deployment_instance)
if hasattr(deployment_instance._predictor._state, 'condition'):
if deployment_instance._predictor._state.condition is None:
# backward compatibility
num_steps = self._get_min_starting_instances(deployment_instance)
pbar = tqdm(total=num_steps)
pbar.set_description("Preparing to stop deployment")

@@ -280,30 +281,31 @@ def _check_status(self, deployment_instance, desired_status):
return (False, state)

def _get_starting_progress(self, current_step, state, num_instances):
if state.condition is None: # backward compatibility
if hasattr(state, 'condition') and state.condition is None: # backward compatibility
progress = num_instances - current_step
if state.status == PREDICTOR_STATE.STATUS_RUNNING:
return (progress, "Deployment is ready")
return (progress, None if current_step == 0 else "Deployment is starting")

step = self.START_STEPS.index(state.condition.type)
condition = self._get_condition(state)
step = self.START_STEPS.index(condition.type)
if (
state.condition.type == PREDICTOR_STATE.CONDITION_TYPE_STARTED
or state.condition.type == PREDICTOR_STATE.CONDITION_TYPE_READY
condition.type == PREDICTOR_STATE.CONDITION_TYPE_STARTED
or condition.type == PREDICTOR_STATE.CONDITION_TYPE_READY
):
step += num_instances
progress = step - current_step
desc = None
if state.condition.type != PREDICTOR_STATE.CONDITION_TYPE_STOPPED:
if condition.type != PREDICTOR_STATE.CONDITION_TYPE_STOPPED:
desc = (
state.condition.reason
condition.reason
if state.status != PREDICTOR_STATE.STATUS_FAILED
else "Deployment failed to start"
)
return (progress, desc)

def _get_stopping_progress(self, total_steps, current_step, state, num_instances):
if state.condition is None: # backward compatibility
if hasattr(state, "condition") and state.condition is None: # backward compatibility
progress = (total_steps - num_instances) - current_step
if state.status == PREDICTOR_STATE.STATUS_STOPPED:
return (progress, "Deployment is stopped")
@@ -313,33 +315,48 @@ def _get_stopping_progress(self, total_steps, current_step, state, num_instances
)

step = 0
if state.condition.type == PREDICTOR_STATE.CONDITION_TYPE_SCHEDULED:
step = 1 if state.condition.status is None else 0
elif state.condition.type == PREDICTOR_STATE.CONDITION_TYPE_STOPPED:
condition = self._get_condition(state)
if condition.type == PREDICTOR_STATE.CONDITION_TYPE_SCHEDULED:
step = 1 if condition.status is None else 0
elif condition.type == PREDICTOR_STATE.CONDITION_TYPE_STOPPED:
num_instances = (total_steps - 2) - num_instances # num stopped instances
step = (
(2 + num_instances)
if (state.condition.status is None or state.condition.status)
if (condition.status is None or condition.status)
else 0
)
progress = step - current_step
desc = None
if (
state.condition.type != PREDICTOR_STATE.CONDITION_TYPE_READY
condition.type != PREDICTOR_STATE.CONDITION_TYPE_READY
and state.status != PREDICTOR_STATE.STATUS_FAILED
):
desc = (
"Deployment is stopped"
if state.status == PREDICTOR_STATE.STATUS_STOPPED
else state.condition.reason
else condition.reason
)

return (progress, desc)

def _get_condition(self, state):
if state.candidate_status is None:
return state.serving_status.condition
if state.serving_status.condition.status is None or not state.serving_status.condition.status:
return state.serving_status.condition
if state.candidate_status.condition.status is None or not state.candidate_status.condition.status:
return state.candidate_status.condition
# both conditions are satisfied; ok to return either
return state.serving_status.condition

def _get_min_starting_instances(self, deployment_instance):
min_start_instances = 1 # predictor
if deployment_instance.transformer is not None:
if deployment_instance.specification.transformer is not None:
min_start_instances += 1 # transformer
if deployment_instance.candidate_specification is not None:
min_start_instances += 1
if deployment_instance.candidate_specification.transformer is not None:
min_start_instances += 1
return (
deployment_instance.requested_instances
if deployment_instance.requested_instances >= min_start_instances
@@ -349,9 +366,13 @@ def _get_available_instances(self, state):
def _get_available_instances(self, state):
if state.status == PREDICTOR_STATE.STATUS_CREATING:
return 0
num_instances = state.available_predictor_instances
if state.available_transformer_instances is not None:
num_instances += state.available_transformer_instances
num_instances = state.serving_status.available_predictor_instances
if state.serving_status.available_transformer_instances is not None:
num_instances += state.serving_status.available_transformer_instances
if state.candidate_status is not None:
num_instances += state.candidate_status.available_predictor_instances
if state.candidate_status.available_transformer_instances is not None:
num_instances += state.candidate_status.available_transformer_instances
return num_instances

def _get_stopped_instances(self, available_instances, requested_instances):
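The new `_get_condition` helper decides which condition drives progress reporting: the main serving's condition takes precedence until it is satisfied, after which the candidate's condition is surfaced. A standalone sketch of that precedence rule, using `SimpleNamespace` in place of the real state objects (names are illustrative, and `x is None or not x` is collapsed to `not x`, which is equivalent in Python):

from types import SimpleNamespace

def pick_condition(state):
    # No candidate deployed: only the main serving's condition matters.
    if state.candidate_status is None:
        return state.serving_status.condition
    # Main condition pending (None) or unmet (False): report it first.
    if not state.serving_status.condition.status:
        return state.serving_status.condition
    # Main condition met; surface the candidate's condition if unmet.
    if not state.candidate_status.condition.status:
        return state.candidate_status.condition
    # Both met: either condition describes a ready deployment.
    return state.serving_status.condition

ready = SimpleNamespace(condition=SimpleNamespace(type="READY", status=True, reason=None))
starting = SimpleNamespace(condition=SimpleNamespace(type="STARTED", status=None, reason="Starting"))
state = SimpleNamespace(serving_status=ready, candidate_status=starting)
assert pick_condition(state) is starting.condition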