From 22aa805158713a60c317cb2a2d9ea7a6d5b83858 Mon Sep 17 00:00:00 2001 From: Brynn Yin <24237253+brynn-code@users.noreply.github.com> Date: Fri, 26 Jan 2024 18:34:15 +0800 Subject: [PATCH] [SDK][Internal] Support command node in orchstrator (#1855) # Description Please add an informative description that covers that changes made by the pull request and link all relevant issues. # All Promptflow Contribution checklist: - [ ] **The pull request does not introduce [breaking changes].** - [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.** - [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).** - [ ] **Create an issue and link to the pull request to get dedicated review from promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).** ## General Guidelines and Best Practices - [ ] Title of the pull request is clear and informative. - [ ] There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md). ### Testing Guidelines - [ ] Pull request includes test coverage for the included changes. --------- Signed-off-by: Brynn Yin --- src/promptflow/promptflow/_sdk/_constants.py | 6 +- src/promptflow/promptflow/_sdk/_errors.py | 6 + .../promptflow/_sdk/_submitter/__init__.py | 3 + .../_submitter/experiment_orchestrator.py | 297 +++++++++++++++--- .../_sdk/_submitter/run_submitter.py | 1 + .../promptflow/_sdk/_submitter/utils.py | 4 + .../promptflow/_sdk/entities/_experiment.py | 54 +++- .../promptflow/_sdk/entities/_run.py | 11 +- .../promptflow/_sdk/schemas/_experiment.py | 29 +- .../promptflow/_sdk/schemas/_run.py | 5 + .../tests/sdk_cli_test/e2etests/test_cli.py | 6 +- .../sdk_cli_test/e2etests/test_experiment.py | 45 +++ .../basic-script.exp.yaml | 51 +++ .../generate_data/generate_data.py | 24 ++ 14 files changed, 470 insertions(+), 72 deletions(-) create mode 100644 src/promptflow/tests/test_configs/experiments/basic-script-template/basic-script.exp.yaml create mode 100644 src/promptflow/tests/test_configs/experiments/basic-script-template/generate_data/generate_data.py diff --git a/src/promptflow/promptflow/_sdk/_constants.py b/src/promptflow/promptflow/_sdk/_constants.py index 4727acfd089..9fc1d3a9e2a 100644 --- a/src/promptflow/promptflow/_sdk/_constants.py +++ b/src/promptflow/promptflow/_sdk/_constants.py @@ -155,6 +155,7 @@ class RunTypes: BATCH = "batch" EVALUATION = "evaluation" PAIRWISE_EVALUATE = "pairwise_evaluate" + COMMAND = "command" class AzureRunTypes: @@ -241,6 +242,9 @@ class FlowRunProperties: NODE_VARIANT = "node_variant" RUN = "run" SYSTEM_METRICS = "system_metrics" + # Experiment command node fields only + COMMAND = "command" + OUTPUTS = "outputs" class CommonYamlFields: @@ -398,7 +402,7 @@ class DownloadedRun: class ExperimentNodeType(object): FLOW = "flow" - CODE = "code" + COMMAND = "command" class ExperimentStatus(object): diff --git a/src/promptflow/promptflow/_sdk/_errors.py b/src/promptflow/promptflow/_sdk/_errors.py index 24786a71e49..16f723ad379 100644 --- a/src/promptflow/promptflow/_sdk/_errors.py +++ b/src/promptflow/promptflow/_sdk/_errors.py @@ -167,3 +167,9 @@ class DownloadInternalError(SDKInternalError): """Exception raised if download internal error.""" pass + + +class ExperimentCommandRunError(SDKError): + """Exception raised if experiment validation failed.""" + + pass diff --git a/src/promptflow/promptflow/_sdk/_submitter/__init__.py b/src/promptflow/promptflow/_sdk/_submitter/__init__.py index e5cbc5abd64..4284d92cfde 100644 --- a/src/promptflow/promptflow/_sdk/_submitter/__init__.py +++ b/src/promptflow/promptflow/_sdk/_submitter/__init__.py @@ -1,3 +1,6 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- from .run_submitter import RunSubmitter from .test_submitter import TestSubmitter from .utils import ( diff --git a/src/promptflow/promptflow/_sdk/_submitter/experiment_orchestrator.py b/src/promptflow/promptflow/_sdk/_submitter/experiment_orchestrator.py index 16636d664f8..49801659002 100644 --- a/src/promptflow/promptflow/_sdk/_submitter/experiment_orchestrator.py +++ b/src/promptflow/promptflow/_sdk/_submitter/experiment_orchestrator.py @@ -1,18 +1,26 @@ # --------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- +import os +import subprocess from datetime import datetime from pathlib import Path +from typing import Dict from promptflow._sdk._configuration import Configuration -from promptflow._sdk._constants import ExperimentNodeType, ExperimentStatus -from promptflow._sdk._errors import ExperimentHasCycle, ExperimentValueError +from promptflow._sdk._constants import ExperimentNodeType, ExperimentStatus, FlowRunProperties, RunTypes +from promptflow._sdk._errors import ExperimentCommandRunError, ExperimentHasCycle, ExperimentValueError from promptflow._sdk._submitter import RunSubmitter +from promptflow._sdk._submitter.utils import SubmitterHelper from promptflow._sdk.entities import Run from promptflow._sdk.entities._experiment import Experiment from promptflow._sdk.operations import RunOperations from promptflow._sdk.operations._experiment_operations import ExperimentOperations +from promptflow._sdk.operations._local_storage_operations import LocalStorageOperations from promptflow._utils.logger_utils import LoggerFactory +from promptflow.contracts.run_info import Status +from promptflow.contracts.run_mode import RunMode +from promptflow.exceptions import UserErrorException logger = LoggerFactory.get_logger(name=__name__) @@ -24,6 +32,7 @@ def __init__(self, run_operations: RunOperations, experiment_operations: Experim self.run_operations = run_operations self.experiment_operations = experiment_operations self.run_submitter = ExperimentRunSubmitter(run_operations) + self.command_submitter = ExperimentCommandSubmitter(run_operations) def start(self, experiment: Experiment, **kwargs): """Start an experiment. @@ -53,6 +62,7 @@ def start(self, experiment: Experiment, **kwargs): experiment._append_node_run(node.name, run) self.experiment_operations.create_or_update(experiment) run_dict[node.name] = run + logger.info(f"Node {node.name} run {run.name} completed, outputs to {run._output_path}.") except Exception as e: logger.error(f"Running experiment {experiment.name} failed with error {e}.") finally: @@ -69,7 +79,13 @@ def _ensure_nodes_order(self, nodes): def _prepare_edges(node): node_names = set() for input_value in node.inputs.values(): - if input_value.startswith("${") and not input_value.startswith("${data."): + if not isinstance(input_value, str): + continue + if ( + input_value.startswith("${") + and not input_value.startswith("${data.") + and not input_value.startswith("${inputs.") + ): referenced_node_name = input_value.split(".")[0].replace("${", "") node_names.add(referenced_node_name) return node_names @@ -99,14 +115,15 @@ def _prepare_edges(node): def _run_node(self, node, experiment, run_dict) -> Run: if node.type == ExperimentNodeType.FLOW: return self._run_flow_node(node, experiment, run_dict) - elif node.type == ExperimentNodeType.CODE: - return self._run_script_node(node, experiment) + elif node.type == ExperimentNodeType.COMMAND: + return self._run_command_node(node, experiment, run_dict) raise ExperimentValueError(f"Unknown experiment node {node.name!r} type {node.type!r}") def _run_flow_node(self, node, experiment, run_dict): run_output_path = (Path(experiment._output_dir) / "runs" / node.name).resolve().absolute().as_posix() timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") run = ExperimentRun( + node_name=node.name, experiment=experiment, experiment_runs=run_dict, # Use node name as prefix for run name? @@ -123,14 +140,33 @@ def _run_flow_node(self, node, experiment, run_dict): logger.debug(f"Creating run {run.name}") return self.run_submitter.submit(run) - def _run_script_node(self, node, experiment): - pass + def _run_command_node(self, node, experiment, run_dict): + run_output_path = (Path(experiment._output_dir) / "runs" / node.name).resolve().absolute().as_posix() + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") + run = ExperimentRun( + type=RunTypes.COMMAND, + node_name=node.name, + experiment=experiment, + experiment_runs=run_dict, + name=f"{node.name}_attempt{timestamp}", + display_name=node.display_name or node.name, + column_mapping=node.inputs, + # Use command code path as flow path + flow=node.code, + outputs=node.outputs, + command=node.command, + environment_variables=node.environment_variables, + config=Configuration(overrides={Configuration.RUN_OUTPUT_PATH: run_output_path}), + ) + logger.debug(f"Creating run {run.name}") + return self.command_submitter.submit(run) class ExperimentRun(Run): """Experiment run, includes experiment running context, like data, inputs and runs.""" - def __init__(self, experiment, experiment_runs, **kwargs): + def __init__(self, experiment, node_name, experiment_runs: Dict[str, "ExperimentRun"], **kwargs): + self.node_name = node_name self.experiment = experiment self.experiment_data = {data.name: data for data in experiment.data} self.experiment_inputs = {input.name: input for input in experiment.inputs} @@ -140,21 +176,62 @@ def __init__(self, experiment, experiment_runs, **kwargs): def _resolve_column_mapping(self): """Resolve column mapping with experiment inputs to constant values.""" - logger.info(f"Start resolve node {self.display_name!r} column mapping.") + logger.info(f"Start resolve node {self.node_name!r} column mapping.") resolved_mapping = {} for name, value in self.column_mapping.items(): - if not value.startswith("${inputs."): + if not isinstance(value, str) or not value.startswith("${inputs."): resolved_mapping[name] = value continue input_name = value.split(".")[1].replace("}", "") if input_name not in self.experiment_inputs: raise ExperimentValueError( - f"Node {self.display_name!r} inputs {value!r} related experiment input {input_name!r} not found." + f"Node {self.node_name!r} inputs {value!r} related experiment input {input_name!r} not found." ) resolved_mapping[name] = self.experiment_inputs[input_name].default - logger.debug(f"Resolved node {self.display_name!r} column mapping {resolved_mapping}.") + logger.debug(f"Resolved node {self.node_name!r} column mapping {resolved_mapping}.") self.column_mapping = resolved_mapping + def _get_referenced_data_and_run(self) -> tuple: + """Get the node referenced data and runs. Format: {name: ExperimentData/ExperimentRun}""" + data, run = {}, {} + inputs_mapping = self.column_mapping + for value in inputs_mapping.values(): + if not isinstance(value, str): + continue + if value.startswith("${data."): + name = value.split(".")[1].replace("}", "") + if name not in self.experiment_data: + raise ExperimentValueError( + f"Node {self.display_name!r} inputs {value!r} related experiment data {name!r} not found." + ) + data[name] = self.experiment_data[name] + elif value.startswith("${"): + name = value.split(".")[0].replace("${", "") + if name not in self.experiment_runs: + raise ExperimentValueError( + f"Node {self.display_name!r} inputs {value!r} related experiment run {name!r} not found." + ) + run[name] = self.experiment_runs[name] + return data, run + + +class ExperimentRunSubmitterHelper: + @staticmethod + def resolve_binding_from_run(run_name, run, run_operations) -> dict: + """Return the valid binding dict based on a run.""" + binding_dict = { + # to align with cloud behavior, run.inputs should refer to original data + f"{run_name}.inputs": run_operations._get_data_path(run), + } + + # Update command node outputs + if run._outputs: + binding_dict.update({f"{run_name}.outputs.{name}": path for name, path in run._outputs.items()}) + else: + binding_dict.update({f"{run_name}.outputs": run_operations._get_outputs_path(run)}) + logger.debug(f"Resolved node {run_name} binding inputs {binding_dict}.") + return binding_dict + class ExperimentRunSubmitter(RunSubmitter): """Experiment run submitter, override some function from RunSubmitter as experiment run could be different.""" @@ -165,42 +242,174 @@ def _validate_inputs(cls, run: Run): return def _resolve_input_dirs(self, run: ExperimentRun): - logger.info("Start resolve node %s input dirs.", run.name) + logger.info("Start resolve node %s input dirs.", run.node_name) logger.debug(f"Experiment context: {run.experiment_data}, {run.experiment_runs}, inputs: {run.column_mapping}") # Get the node referenced data and run - data_name, run_name = None, None - inputs_mapping = run.column_mapping - for value in inputs_mapping.values(): - referenced_data, referenced_run = None, None - if value.startswith("${data."): - referenced_data = value.split(".")[1].replace("}", "") - elif value.startswith("${"): - referenced_run = value.split(".")[0].replace("${", "") - if referenced_data: - if data_name and data_name != referenced_data: - raise ExperimentValueError( - f"Experiment has multiple data inputs {data_name!r} and {referenced_data!r}" - ) - data_name = referenced_data - if referenced_run: - if run_name and run_name != referenced_run: - raise ExperimentValueError( - f"Experiment has multiple run inputs {run_name!r} and {referenced_run!r}" - ) - run_name = referenced_run - logger.debug(f"Resolve node {run.name} referenced data {data_name!r}, run {run_name!r}.") + referenced_data, referenced_run = run._get_referenced_data_and_run() + if len(referenced_data) > 1: + raise ExperimentValueError( + f"Experiment flow node {run.node_name!r} has multiple data inputs {referenced_data}, " + "only 1 is expected." + ) + if len(referenced_run) > 1: + raise ExperimentValueError( + f"Experiment flow node {run.node_name!r} has multiple run inputs {referenced_run}, " + "only 1 is expected." + ) + (data_name, data_obj) = next(iter(referenced_data.items())) if referenced_data else (None, None) + (run_name, run_obj) = next(iter(referenced_run.items())) if referenced_run else (None, None) + logger.debug(f"Resolve node {run.node_name} referenced data {data_name!r}, run {run_name!r}.") # Build inputs from experiment data and run result = {} - if data_name in run.experiment_data and run.experiment_data[data_name].path: - result.update({f"data.{data_name}": run.experiment_data[data_name].path}) - if run_name in run.experiment_runs: - result.update( + if data_obj: + result.update({f"data.{data_name}": data_obj.path}) + if run_obj: + result.update(ExperimentRunSubmitterHelper.resolve_binding_from_run(run_name, run_obj, self.run_operations)) + result = {k: str(Path(v).resolve()) for k, v in result.items() if v is not None} + logger.debug(f"Resolved node {run.node_name} input dirs {result}.") + return result + + +class ExperimentCommandSubmitter: + """Experiment command submitter, responsible for experiment command running.""" + + def __init__(self, run_operations: RunOperations): + self.run_operations = run_operations + + def submit(self, run: ExperimentRun, **kwargs): + """Submit an experiment command run. + + :param run: Experiment command to submit. + :type run: ~promptflow.entities.Run + """ + local_storage = LocalStorageOperations(run, run_mode=RunMode.SingleNode) + self._submit_command_run(run=run, local_storage=local_storage) + return self.run_operations.get(name=run.name) + + def _resolve_inputs(self, run: ExperimentRun): + """Resolve binding inputs to constant values.""" + # e.g. "input_path": "${data.my_data}" -> "${inputs.input_path}": "real_data_path" + logger.info("Start resolve node %s inputs.", run.node_name) + data, runs = run._get_referenced_data_and_run() + # prepare "${data.my_data}": real_data_path + binding_dict = {"${data.%s}" % name: val.path for name, val in data.items()} + # prepare "${run.outputs}": run_outputs_path, "${run.inputs}": run_inputs_path + for name, val in runs.items(): + binding_dict.update( { - f"{run_name}.outputs": self.run_operations._get_outputs_path(run.experiment_runs[run_name]), - # to align with cloud behavior, run.inputs should refer to original data - f"{run_name}.inputs": self.run_operations._get_data_path(run.experiment_runs[run_name]), + "${%s}" % k: v + for k, v in ExperimentRunSubmitterHelper.resolve_binding_from_run( + name, val, self.run_operations + ).items() } ) - result = {k: str(Path(v).resolve()) for k, v in result.items() if v is not None} - logger.debug(f"Resolved node {run.name} input dirs {result}.") - return result + logger.debug(f"Resolved node {run.node_name} binding inputs {binding_dict}.") + # resolve inputs + resolved_inputs = {} + for name, value in run.column_mapping.items(): + if not isinstance(value, str) or not value.startswith("${"): + resolved_inputs[name] = value + continue + # my_input: "${run.outputs}" -> my_input: run_outputs_path + if value in binding_dict: + resolved_inputs[name] = binding_dict[value] + continue + logger.warning( + f"Possibly invalid partial input value binding {value!r} found for node {run.node_name!r}. " + "Only full binding is supported for command node. For example: ${data.my_data}, ${main_node.outputs}." + ) + resolved_inputs[name] = value + logger.debug(f"Resolved node {run.node_name} inputs {resolved_inputs}.") + return resolved_inputs + + def _resolve_outputs(self, run: ExperimentRun): + """Resolve outputs to real path.""" + # e.g. "output_path": "${outputs.my_output}" -> "${outputs.output_path}": "real_output_path" + logger.info("Start resolve node %s outputs.", run.node_name) + # resolve outputs + resolved_outputs = {} + for name, value in run._outputs.items(): + # Set default output path if user doesn't set it + if not value: + # Create default output path if user doesn't set it + value = run._output_path / name + value.mkdir(parents=True, exist_ok=True) + value = value.resolve().absolute().as_posix() + # Update default to run + run._outputs[name] = value + # Note: We will do nothing if user config the value, as we don't know it's a file or folder + resolved_outputs[name] = value + logger.debug(f"Resolved node {run.node_name} outputs {resolved_outputs}.") + return resolved_outputs + + def _resolve_command(self, run: ExperimentRun, inputs: dict, outputs: dict): + """Resolve command to real command.""" + logger.info("Start resolve node %s command.", run.node_name) + # resolve command + resolved_command = run._command + # replace inputs + for name, value in inputs.items(): + resolved_command = resolved_command.replace(f"${{inputs.{name}}}", str(value)) + # replace outputs + for name, value in outputs.items(): + resolved_command = resolved_command.replace(f"${{outputs.{name}}}", str(value)) + logger.debug(f"Resolved node {run.node_name} command {resolved_command}.") + if "${" in resolved_command: + logger.warning( + f"Possibly unresolved command value binding found for node {run.node_name!r}. " + f"Resolved command: {resolved_command}. Please check your command again." + ) + return resolved_command + + def _submit_command_run(self, run: ExperimentRun, local_storage: LocalStorageOperations) -> dict: + # resolve environment variables + SubmitterHelper.resolve_environment_variables(environment_variables=run.environment_variables) + SubmitterHelper.init_env(environment_variables=run.environment_variables) + + # resolve inputs & outputs for command preparing + # e.g. input_path: ${data.my_data} -> ${inputs.input_path}: real_data_path + inputs = self._resolve_inputs(run) + outputs = self._resolve_outputs(run) + + # replace to command + command = self._resolve_command(run, inputs, outputs) + + # execute command + status = Status.Failed.value + # create run to db when fully prepared to run in executor, otherwise won't create it + run._dump() # pylint: disable=protected-access + try: + return_code = ExperimentCommandExecutor.run(command=command, cwd=run.flow, local_storage=local_storage) + if return_code != 0: + raise ExperimentCommandRunError( + f"Run {run.name} failed with return code {return_code}, " + f"please check out {run.properties[FlowRunProperties.OUTPUT_PATH]} for more details." + ) + status = Status.Completed.value + except Exception as e: + # when run failed in executor, store the exception in result and dump to file + logger.warning(f"Run {run.name} failed when executing in executor with exception {e}.") + # for user error, swallow stack trace and return failed run since user don't need the stack trace + if not isinstance(e, UserErrorException): + # for other errors, raise it to user to help debug root cause. + raise e + finally: + self.run_operations.update( + name=run.name, + status=status, + end_time=datetime.now(), + ) + + +class ExperimentCommandExecutor: + """Experiment command executor, responsible for experiment command running.""" + + @staticmethod + def run(command: str, cwd: str, local_storage: LocalStorageOperations): + """Start a subprocess to run the command""" + log_path = local_storage.logger.file_path + logger.info(f"Start running command {command}, log path: {log_path}.") + with open(log_path, "w") as log_file: + process = subprocess.Popen(command, stdout=log_file, stderr=log_file, shell=True, env=os.environ, cwd=cwd) + process.wait() + return process.returncode diff --git a/src/promptflow/promptflow/_sdk/_submitter/run_submitter.py b/src/promptflow/promptflow/_sdk/_submitter/run_submitter.py index 4db25f5d753..0feb35abcf4 100644 --- a/src/promptflow/promptflow/_sdk/_submitter/run_submitter.py +++ b/src/promptflow/promptflow/_sdk/_submitter/run_submitter.py @@ -79,6 +79,7 @@ def _validate_inputs(cls, run: Run): def _submit_bulk_run( self, flow: Union[ProtectedFlow, EagerFlow], run: Run, local_storage: LocalStorageOperations ) -> dict: + logger.info(f"Submitting run {run.name}, reach logs at {local_storage.logger.file_path}.") run_id = run.name if flow.language == FlowLanguage.CSharp: connections = [] diff --git a/src/promptflow/promptflow/_sdk/_submitter/utils.py b/src/promptflow/promptflow/_sdk/_submitter/utils.py index 3f9616ea8ca..1b62736f19a 100644 --- a/src/promptflow/promptflow/_sdk/_submitter/utils.py +++ b/src/promptflow/promptflow/_sdk/_submitter/utils.py @@ -41,8 +41,11 @@ from promptflow._sdk.entities._flow import Flow, ProtectedFlow from promptflow._utils.context_utils import _change_working_dir from promptflow._utils.flow_utils import dump_flow_dag, load_flow_dag +from promptflow._utils.logger_utils import get_cli_sdk_logger from promptflow.contracts.flow import Flow as ExecutableFlow +logger = get_cli_sdk_logger() + def overwrite_variant(flow_dag: dict, tuning_node: str = None, variant: str = None, drop_node_variants: bool = False): # need to overwrite default variant if tuning node and variant not specified. @@ -263,6 +266,7 @@ def resolve_environment_variables(cls, environment_variables: dict, client=None) if not environment_variables: return None connection_names = get_used_connection_names_from_dict(environment_variables) + logger.debug("Used connection names: %s", connection_names) connections = cls.resolve_connection_names(connection_names=connection_names, client=client) update_dict_value_with_connections(built_connections=connections, connection_dict=environment_variables) diff --git a/src/promptflow/promptflow/_sdk/entities/_experiment.py b/src/promptflow/promptflow/_sdk/entities/_experiment.py index 1f953e6bb93..3955771f8f5 100644 --- a/src/promptflow/promptflow/_sdk/entities/_experiment.py +++ b/src/promptflow/promptflow/_sdk/entities/_experiment.py @@ -28,12 +28,12 @@ from promptflow._sdk.entities._validation import MutableValidationResult, SchemaValidatableMixin from promptflow._sdk.entities._yaml_translatable import YAMLTranslatableMixin from promptflow._sdk.schemas._experiment import ( + CommandNodeSchema, ExperimentDataSchema, ExperimentInputSchema, ExperimentSchema, ExperimentTemplateSchema, FlowNodeSchema, - ScriptNodeSchema, ) from promptflow._utils.logger_utils import get_cli_sdk_logger from promptflow.contracts.tool import ValueType @@ -114,7 +114,6 @@ def __init__( self.environment_variables = environment_variables or {} self.connections = connections or {} self._properties = properties or {} - self._creation_context = kwargs.get("creation_context", None) # init here to make sure those fields initialized in all branches. self.path = path # default run name: flow directory name + timestamp @@ -141,23 +140,52 @@ def _save_snapshot(self, target): self.path = saved_flow_path.resolve().absolute().as_posix() -class ScriptNode(YAMLTranslatableMixin): - def __init__(self, source, inputs, name, display_name=None, runtime=None, environment_variables=None, **kwargs): - self.type = ExperimentNodeType.CODE - self.display_name = display_name +class CommandNode(YAMLTranslatableMixin): + def __init__( + self, + command, + name, + inputs=None, + outputs=None, + runtime=None, + environment_variables=None, + code=None, + display_name=None, + **kwargs, + ): + self.type = ExperimentNodeType.COMMAND self.name = name - self.source = source - self.inputs = inputs + self.display_name = display_name + self.code = code + self.command = command + self.inputs = inputs or {} + self.outputs = outputs or {} self.runtime = runtime self.environment_variables = environment_variables or {} @classmethod def _get_schema_cls(cls): - return ScriptNodeSchema + return CommandNodeSchema def _save_snapshot(self, target): - # Do nothing for script node for now - pass + """Save command source to experiment snapshot.""" + Path(target).mkdir(parents=True, exist_ok=True) + saved_path = Path(target) / self.name + if not self.code: + # Create an empty folder + saved_path.mkdir(parents=True, exist_ok=True) + self.code = saved_path.resolve().absolute().as_posix() + return + code = Path(self.code) + if not code.exists(): + raise ExperimentValueError(f"Command node code {code} does not exist.") + if code.is_dir(): + shutil.copytree(src=self.code, dst=saved_path) + else: + saved_path.mkdir(parents=True, exist_ok=True) + shutil.copy(src=self.code, dst=saved_path) + logger.debug(f"Command node source saved to {saved_path}.") + self.code = saved_path.resolve().absolute().as_posix() class ExperimentTemplate(YAMLTranslatableMixin, SchemaValidatableMixin): @@ -334,9 +362,9 @@ def _from_orm_object(cls, obj: ORMExperiment) -> "Experiment": nodes.append( FlowNode._load_from_dict(node_dict, context=context, additional_message="Failed to load node.") ) - elif node_dict["type"] == ExperimentNodeType.CODE: + elif node_dict["type"] == ExperimentNodeType.COMMAND: nodes.append( - ScriptNode._load_from_dict(node_dict, context=context, additional_message="Failed to load node.") + CommandNode._load_from_dict(node_dict, context=context, additional_message="Failed to load node.") ) else: raise Exception(f"Unknown node type {node_dict['type']}") diff --git a/src/promptflow/promptflow/_sdk/entities/_run.py b/src/promptflow/promptflow/_sdk/entities/_run.py index 9a3e5fbaff8..5f66c850b88 100644 --- a/src/promptflow/promptflow/_sdk/entities/_run.py +++ b/src/promptflow/promptflow/_sdk/entities/_run.py @@ -128,7 +128,7 @@ def __init__( **kwargs, ): # TODO: remove when RUN CRUD don't depend on this - self.type = RunTypes.BATCH + self.type = kwargs.get("type", RunTypes.BATCH) self.data = data self.column_mapping = column_mapping self.display_name = display_name @@ -181,6 +181,8 @@ def __init__( self._output_path = Path(source) self._runtime = kwargs.get("runtime", None) self._resources = kwargs.get("resources", None) + self._outputs = kwargs.get("outputs", None) + self._command = kwargs.get("command", None) @property def created_on(self) -> str: @@ -204,6 +206,10 @@ def properties(self) -> Dict[str, str]: result[FlowRunProperties.RUN] = run_name if self.variant: result[FlowRunProperties.NODE_VARIANT] = self.variant + if self._command: + result[FlowRunProperties.COMMAND] = self._command + if self._outputs: + result[FlowRunProperties.OUTPUTS] = self._outputs elif self._run_source == RunInfoSources.EXISTING_RUN: result = { FlowRunProperties.OUTPUT_PATH: Path(self.source).resolve().as_posix(), @@ -245,6 +251,9 @@ def _from_orm_object(cls, obj: ORMRun) -> "Run": properties={FlowRunProperties.SYSTEM_METRICS: properties_json.get(FlowRunProperties.SYSTEM_METRICS, {})}, # compatible with old runs, their run_source is empty, treat them as local run_source=obj.run_source or RunInfoSources.LOCAL, + # experiment command node only fields + command=properties_json.get(FlowRunProperties.COMMAND, None), + outputs=properties_json.get(FlowRunProperties.OUTPUTS, None), ) @classmethod diff --git a/src/promptflow/promptflow/_sdk/schemas/_experiment.py b/src/promptflow/promptflow/_sdk/schemas/_experiment.py index fcd5e319084..f7c87c0d509 100644 --- a/src/promptflow/promptflow/_sdk/schemas/_experiment.py +++ b/src/promptflow/promptflow/_sdk/schemas/_experiment.py @@ -15,16 +15,18 @@ from promptflow._sdk.schemas._run import RunSchema -class ScriptNodeSchema(metaclass=PatchedSchemaMeta): +class CommandNodeSchema(YamlFileSchema): # TODO: Not finalized now. Need to revisit. name = fields.Str(required=True) - type = StringTransformedEnum(allowed_values=ExperimentNodeType.CODE, required=True) - path = UnionField([LocalPathField(required=True), fields.Str(required=True)]) + display_name = fields.Str() + type = StringTransformedEnum(allowed_values=ExperimentNodeType.COMMAND, required=True) + code = LocalPathField(default=".") + command = fields.Str(required=True) inputs = fields.Dict(keys=fields.Str) + outputs = fields.Dict(keys=fields.Str, values=LocalPathField(allow_none=True)) + environment_variables = fields.Dict(keys=fields.Str, values=fields.Str) # runtime field, only available for cloud run runtime = fields.Str() # TODO: Revisit the required fields - display_name = fields.Str() - environment_variables = fields.Dict(keys=fields.Str, values=fields.Str) class FlowNodeSchema(RunSchema): @@ -58,14 +60,21 @@ class ExperimentTemplateSchema(YamlFileSchema): description = fields.Str() data = fields.List(NestedField(ExperimentDataSchema)) # Optional inputs = fields.List(NestedField(ExperimentInputSchema)) # Optional - nodes = fields.List(UnionField([NestedField(FlowNodeSchema), NestedField(ScriptNodeSchema)]), required=True) + nodes = fields.List( + UnionField( + [ + NestedField(CommandNodeSchema), + NestedField(FlowNodeSchema), + ] + ), + required=True, + ) @post_load def resolve_nodes(self, data, **kwargs): - from promptflow._sdk.entities._experiment import FlowNode, ScriptNode + from promptflow._sdk.entities._experiment import CommandNode, FlowNode nodes = data.get("nodes", []) - resolved_nodes = [] for node in nodes: if not isinstance(node, dict): @@ -73,9 +82,9 @@ def resolve_nodes(self, data, **kwargs): node_type = node.get("type", None) if node_type == ExperimentNodeType.FLOW: resolved_nodes.append(FlowNode._load_from_dict(data=node, context=self.context, additional_message="")) - elif node_type == ExperimentNodeType.CODE: + elif node_type == ExperimentNodeType.COMMAND: resolved_nodes.append( - ScriptNode._load_from_dict(data=node, context=self.context, additional_message="") + CommandNode._load_from_dict(data=node, context=self.context, additional_message="") ) else: raise ValueError(f"Unknown node type {node_type} for node {node}.") diff --git a/src/promptflow/promptflow/_sdk/schemas/_run.py b/src/promptflow/promptflow/_sdk/schemas/_run.py index 99674fd37d8..07406ad5d73 100644 --- a/src/promptflow/promptflow/_sdk/schemas/_run.py +++ b/src/promptflow/promptflow/_sdk/schemas/_run.py @@ -102,6 +102,11 @@ class RunSchema(YamlFileSchema): connections = fields.Dict(keys=fields.Str(), values=fields.Dict(keys=fields.Str())) # endregion: context + # region: command node + command = fields.Str(dump_only=True) + outputs = fields.Dict(key=fields.Str(), dump_only=True) + # endregion: command node + @post_load def resolve_dot_env_file(self, data, **kwargs): return _resolve_dot_env_file(data, **kwargs) diff --git a/src/promptflow/tests/sdk_cli_test/e2etests/test_cli.py b/src/promptflow/tests/sdk_cli_test/e2etests/test_cli.py index fbd823d9e39..4d638e10fdf 100644 --- a/src/promptflow/tests/sdk_cli_test/e2etests/test_cli.py +++ b/src/promptflow/tests/sdk_cli_test/e2etests/test_cli.py @@ -1939,7 +1939,7 @@ def test_experiment_start(self, monkeypatch, capfd, local_client): "experiment", "create", "--template", - f"{EXPERIMENT_DIR}/basic-no-script-template/basic.exp.yaml", + f"{EXPERIMENT_DIR}/basic-script-template/basic-script.exp.yaml", "--name", exp_name, ) @@ -1956,8 +1956,8 @@ def test_experiment_start(self, monkeypatch, capfd, local_client): out, _ = capfd.readouterr() assert ExperimentStatus.TERMINATED in out exp = local_client._experiments.get(name=exp_name) - assert len(exp.node_runs["main"]) > 0 - assert len(exp.node_runs["eval"]) > 0 + assert len(exp.node_runs) == 4 + assert all(len(exp.node_runs[node_name]) > 0 for node_name in exp.node_runs) metrics = local_client.runs.get_metrics(name=exp.node_runs["eval"][0]["name"]) assert "accuracy" in metrics diff --git a/src/promptflow/tests/sdk_cli_test/e2etests/test_experiment.py b/src/promptflow/tests/sdk_cli_test/e2etests/test_experiment.py index f302c820661..4f261902355 100644 --- a/src/promptflow/tests/sdk_cli_test/e2etests/test_experiment.py +++ b/src/promptflow/tests/sdk_cli_test/e2etests/test_experiment.py @@ -7,6 +7,7 @@ from promptflow._sdk._constants import ExperimentStatus, RunStatus from promptflow._sdk._load_functions import load_common from promptflow._sdk.entities._experiment import ( + CommandNode, Experiment, ExperimentData, ExperimentInput, @@ -51,6 +52,36 @@ def test_experiment_from_template(self): assert experiment_dict["nodes"][1].items() == expected["nodes"][1].items() assert experiment_dict.items() >= expected.items() + def test_experiment_from_template_with_script_node(self): + template_path = EXP_ROOT / "basic-script-template" / "basic-script.exp.yaml" + # Load template and create experiment + template = load_common(ExperimentTemplate, source=template_path) + experiment = Experiment.from_template(template) + # Assert command node load correctly + assert len(experiment.nodes) == 4 + expected = dict(yaml.load(open(template_path, "r", encoding="utf-8").read())) + experiment_dict = experiment._to_dict() + assert isinstance(experiment.nodes[0], CommandNode) + assert isinstance(experiment.nodes[1], FlowNode) + assert isinstance(experiment.nodes[2], FlowNode) + assert isinstance(experiment.nodes[3], CommandNode) + gen_data_snapshot_path = experiment._output_dir / "snapshots" / "gen_data" + echo_snapshot_path = experiment._output_dir / "snapshots" / "echo" + expected["nodes"][0]["code"] = gen_data_snapshot_path.absolute().as_posix() + expected["nodes"][3]["code"] = echo_snapshot_path.absolute().as_posix() + expected["nodes"][3]["environment_variables"] = {} + assert experiment_dict["nodes"][0].items() == expected["nodes"][0].items() + assert experiment_dict["nodes"][3].items() == expected["nodes"][3].items() + # Assert snapshots + assert gen_data_snapshot_path.exists() + file_count = len(list(gen_data_snapshot_path.rglob("*"))) + assert file_count == 1 + assert (gen_data_snapshot_path / "generate_data.py").exists() + # Assert no file exists in echo path + assert echo_snapshot_path.exists() + file_count = len(list(echo_snapshot_path.rglob("*"))) + assert file_count == 0 + def test_experiment_create_and_get(self): template_path = EXP_ROOT / "basic-no-script-template" / "basic.exp.yaml" # Load template and create experiment @@ -85,3 +116,17 @@ def test_experiment_start(self): assert eval_run.display_name == "eval" metrics = client.runs.get_metrics(name=eval_run.name) assert "accuracy" in metrics + + @pytest.mark.usefixtures("use_secrets_config_file", "recording_injection", "setup_local_connection") + def test_experiment_with_script_start(self): + template_path = EXP_ROOT / "basic-script-template" / "basic-script.exp.yaml" + # Load template and create experiment + template = load_common(ExperimentTemplate, source=template_path) + experiment = Experiment.from_template(template) + client = PFClient() + exp = client._experiments.create_or_update(experiment) + exp = client._experiments.start(exp.name) + assert exp.status == ExperimentStatus.TERMINATED + assert len(exp.node_runs) == 4 + for key, val in exp.node_runs.items(): + assert val[0]["status"] == RunStatus.COMPLETED, f"Node {key} run failed" diff --git a/src/promptflow/tests/test_configs/experiments/basic-script-template/basic-script.exp.yaml b/src/promptflow/tests/test_configs/experiments/basic-script-template/basic-script.exp.yaml new file mode 100644 index 00000000000..6a76782705d --- /dev/null +++ b/src/promptflow/tests/test_configs/experiments/basic-script-template/basic-script.exp.yaml @@ -0,0 +1,51 @@ +$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Experiment.schema.json + +description: Basic experiment without script node + +data: + - name: my_data + path: ../../flows/web_classification/data.jsonl + +inputs: + - name: count + type: int + default: 3 + +nodes: + - name: gen_data + type: command + command: python generate_data.py --input-path ${inputs.input_path} --count ${inputs.count} --output-path ${outputs.output_path} + code: ./generate_data + inputs: + input_path: ${data.my_data} + count: ${inputs.count} + outputs: + output_path: + environment_variables: + CONNECTION_KEY: ${azure_open_ai_connection.api_key} + + - name: main + type: flow + path: ../../flows/web_classification/flow.dag.yaml + inputs: + url: ${gen_data.outputs.output_path.url} + variant: ${summarize_text_content.variant_0} + environment_variables: {} + connections: {} + + - name: eval + type: flow + path: ../../flows/eval-classification-accuracy + inputs: + groundtruth: ${data.my_data.answer} # No node can be named with "data" + prediction: ${main.outputs.category} + environment_variables: {} + connections: {} + + - name: echo + type: command + command: echo ${inputs.input_path} > ${outputs.output_path}/output.txt + inputs: + input_path: ${main.outputs} + outputs: + output_path: diff --git a/src/promptflow/tests/test_configs/experiments/basic-script-template/generate_data/generate_data.py b/src/promptflow/tests/test_configs/experiments/basic-script-template/generate_data/generate_data.py new file mode 100644 index 00000000000..8021c648e29 --- /dev/null +++ b/src/promptflow/tests/test_configs/experiments/basic-script-template/generate_data/generate_data.py @@ -0,0 +1,24 @@ +import argparse +import os +from pathlib import Path + +parser = argparse.ArgumentParser() +parser.add_argument("--input-path", type=str, required=True) +parser.add_argument("--output-path", type=str, required=True) +parser.add_argument("--count", type=int, required=True) +args = parser.parse_args() + +env_var = os.environ.get("CONNECTION_KEY") +assert env_var is not None, "Environment variable CONNECTION_KEY not set!" +assert env_var != "${azure_open_ai_connection.api_key}", "Environment variable CONNECTION_KEY not resolved!" + +with open(args.input_path, "r", encoding="utf-8") as f: + input_lines = f.readlines() + +assert args.count == len(input_lines), \ + f"Data number {args.count} different from input lines {len(input_lines)} in file!" + +output_path = Path(args.output_path) +assert output_path.exists(), f"Output path {args.output_path!r} not exists!" +with open(output_path / "data.jsonl", "w", encoding="utf-8") as f: + f.writelines(input_lines)