Release/promptflow/1.2.0 #1460

Merged · 12 commits · Dec 14, 2023
3 changes: 2 additions & 1 deletion src/promptflow/CHANGELOG.md
@@ -1,9 +1,10 @@
# Release History

## 1.2.0 (upcoming)
## 1.2.0 (2023.12.14)

### Features Added
- [SDK/CLI] Support `pfazure run download` to download run data from Azure AI.
- [SDK/CLI] Support `pf run create` to create a local run record from downloaded run data.

### Bugs Fixed

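A hedged end-to-end sketch combining the two features (flag spellings follow the CLI conventions visible elsewhere in this PR; the default download location assumes the new constants resolve to ~/.promptflow/.runs):

# Download run data from Azure AI:
pfazure run download --name <run-name> --output <local-dir>
# Create a local run record from the downloaded folder:
pf run create --source <local-dir>/<run-name>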
7 changes: 5 additions & 2 deletions src/promptflow/promptflow/_cli/_params.py
@@ -4,7 +4,7 @@

import argparse

from promptflow._sdk._constants import CLIListOutputFormat, FlowType
from promptflow._sdk._constants import PROMPT_FLOW_DIR_NAME, PROMPT_FLOW_RUNS_DIR_NAME, CLIListOutputFormat, FlowType

# TODO: avoid azure dependency here
MAX_LIST_CLI_RESULTS = 50
@@ -160,7 +160,10 @@ def add_param_output(parser):
"-o",
"--output",
type=str,
help="The output directory to store the results. Default to be current working directory if not specified.",
help=(
f"The output directory to store the results. "
f"Default to be ~/{PROMPT_FLOW_DIR_NAME}/{PROMPT_FLOW_RUNS_DIR_NAME} if not specified."
),
)


26 changes: 20 additions & 6 deletions src/promptflow/promptflow/_cli/_pf/_run.py
@@ -33,6 +33,7 @@
from promptflow._sdk._pf_client import PFClient
from promptflow._sdk._run_functions import _create_run
from promptflow._sdk.entities import Run
from promptflow.exceptions import UserErrorException


def add_run_parser(subparsers):
@@ -119,13 +120,18 @@ def add_run_create(subparsers):
pf run create -f <yaml-filename>
# Create a run from flow directory and reference a run:
pf run create --flow <path-to-flow-directory> --data <path-to-data-file> --column-mapping groundtruth='${data.answer}' prediction='${run.outputs.category}' --run <run-name> --variant "${summarize_text_content.variant_0}" --stream # noqa: E501
# Create a run from an existing run record folder
pf run create --source <path-to-run-folder>
"""

# data for pf has different help doc than pfazure
def add_param_data(parser):
parser.add_argument("--data", type=str, help="Local path to the data file.")

add_run_create_common(subparsers, [add_param_data], epilog=epilog)
def add_param_source(parser):
parser.add_argument("--source", type=str, help="Local path to the existing run record folder.")

add_run_create_common(subparsers, [add_param_data, add_param_source], epilog=epilog)


def add_run_cancel(subparsers):
@@ -521,6 +527,7 @@ def _parse_kv_pair(kv_pairs: str) -> Dict[str, str]:
def create_run(create_func: Callable, args):
file = args.file
flow = args.flow
run_source = getattr(args, "source", None) # source is only available for pf args, not pfazure.
data = args.data
column_mapping = args.column_mapping
variant = args.variant
@@ -529,7 +536,7 @@ def create_run(create_func: Callable, args):
stream = args.stream
environment_variables = args.environment_variables
connections = args.connections
params_override = args.params_override
params_override = args.params_override or []

if environment_variables:
environment_variables = list_of_dict_to_dict(environment_variables)
@@ -538,7 +545,6 @@
if column_mapping:
column_mapping = list_of_dict_to_dict(column_mapping)

params_override = params_override or []
if file:
for param_key, param in {
"name": name,
@@ -555,9 +561,7 @@
params_override.append({param_key: param})

run = load_run(source=file, params_override=params_override)
elif flow is None:
raise ValueError("--flow is required when not using --file.")
else:
elif flow:
run_data = {
"name": name,
"flow": flow,
@@ -572,6 +576,16 @@
run_data = {k: v for k, v in run_data.items() if v is not None}

run = Run._load(data=run_data, params_override=params_override)
elif run_source:
display_name, description, tags = _parse_metadata_args(params_override)
processed_params = {
"display_name": display_name,
"description": description,
"tags": tags,
}
run = Run._load_from_source(source=run_source, params_override=processed_params)
else:
raise UserErrorException("To create a run, one of [file, flow, source] must be specified.")
run = create_func(run=run, stream=stream)
if stream:
print("\n") # change new line to show run info
18 changes: 14 additions & 4 deletions src/promptflow/promptflow/_cli/_utils.py
@@ -106,23 +106,23 @@ def get_credentials_for_cli():
# check OBO via environment variable, the referenced code can be found from below search:
# https://msdata.visualstudio.com/Vienna/_search?text=AZUREML_OBO_ENABLED&type=code&pageSize=25&filters=ProjectFilters%7BVienna%7D&action=contents
if os.getenv(IdentityEnvironmentVariable.OBO_ENABLED_FLAG):
logger.info("User identity is configured, use OBO credential.")
logger.debug("User identity is configured, use OBO credential.")
credential = AzureMLOnBehalfOfCredential()
else:
client_id_from_env = os.getenv(IdentityEnvironmentVariable.DEFAULT_IDENTITY_CLIENT_ID)
if client_id_from_env:
# use managed identity when client id is available from environment variable.
# reference code:
# https://learn.microsoft.com/en-us/azure/machine-learning/how-to-identity-based-service-authentication?tabs=cli#compute-cluster
logger.info("Use managed identity credential.")
logger.debug("Use managed identity credential.")
credential = ManagedIdentityCredential(client_id=client_id_from_env)
elif is_in_ci_pipeline():
# use managed identity when executing in CI pipeline.
logger.info("Use azure cli credential.")
logger.debug("Use azure cli credential.")
credential = AzureCliCredential()
else:
# use default Azure credential to handle other cases.
logger.info("Use default credential.")
logger.debug("Use default credential.")
credential = DefaultAzureCredential()

return credential
@@ -473,3 +473,13 @@ def _get_cli_activity_name(cli, args):
activity_name += f".{args.sub_action}"

return activity_name


def _try_delete_existing_run_record(run_name: str):
from promptflow._sdk._errors import RunNotFoundError
from promptflow._sdk._orm import RunInfo as ORMRun

try:
ORMRun.delete(run_name)
except RunNotFoundError:
pass
13 changes: 13 additions & 0 deletions src/promptflow/promptflow/_sdk/_constants.py
@@ -261,6 +261,7 @@ class RunInfoSources(str, Enum):
INDEX_SERVICE = "index_service"
RUN_HISTORY = "run_history"
MT_SERVICE = "mt_service"
EXISTING_RUN = "existing_run"


class ConfigValueType(str, Enum):
@@ -283,6 +284,11 @@ class ConnectionType(str, Enum):
CUSTOM = "Custom"


ALL_CONNECTION_TYPES = set(
map(lambda x: f"{x.value}Connection", filter(lambda x: x != ConnectionType._NOT_SET, ConnectionType))
)
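To make the new constant concrete: a self-contained sketch of how `ALL_CONNECTION_TYPES` is derived, using an abbreviated stand-in enum (the real `ConnectionType` has more members):

from enum import Enum

class ConnectionType(str, Enum):  # abbreviated stand-in; values mirror the pattern above
    _NOT_SET = "NotSet"
    AZURE_OPEN_AI = "AzureOpenAI"
    CUSTOM = "Custom"

ALL_CONNECTION_TYPES = set(
    map(lambda x: f"{x.value}Connection", filter(lambda x: x != ConnectionType._NOT_SET, ConnectionType))
)
print(ALL_CONNECTION_TYPES)  # {'AzureOpenAIConnection', 'CustomConnection'} (set order may vary)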


class ConnectionFields(str, Enum):
CONNECTION = "connection"
DEPLOYMENT_NAME = "deployment_name"
@@ -340,3 +346,10 @@ class AzureFlowSource:
LOCAL = "local"
PF_SERVICE = "pf_service"
INDEX = "index"


class DownloadedRun:
SNAPSHOT_FOLDER = LocalStorageFilenames.SNAPSHOT_FOLDER
METRICS_FILE_NAME = LocalStorageFilenames.METRICS
LOGS_FILE_NAME = LocalStorageFilenames.LOG
RUN_METADATA_FILE_NAME = "run_metadata.json"
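For orientation, the folder layout a downloaded run record is expected to have. Only `run_metadata.json` is pinned by this diff; the other names come from `LocalStorageFilenames`, and the literal values shown here are assumptions:

<run-name>/
    snapshot/            # DownloadedRun.SNAPSHOT_FOLDER: the flow code snapshot
    metrics.json         # DownloadedRun.METRICS_FILE_NAME
    logs.txt             # DownloadedRun.LOGS_FILE_NAME
    run_metadata.json    # DownloadedRun.RUN_METADATA_FILE_NAME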
14 changes: 13 additions & 1 deletion src/promptflow/promptflow/_sdk/_orm/run_info.py
@@ -48,10 +48,11 @@ class RunInfo(Base):
start_time = Column(TEXT) # ISO8601("YYYY-MM-DD HH:MM:SS.SSS"), string
end_time = Column(TEXT) # ISO8601("YYYY-MM-DD HH:MM:SS.SSS"), string
data = Column(TEXT) # local path of original run data, string
run_source = Column(TEXT) # run source, string

__table_args__ = (Index(RUN_INFO_CREATED_ON_INDEX_NAME, "created_on"),)
# schema version, increase the version number when you change the schema
__pf_schema_version__ = "2"
__pf_schema_version__ = "3"

@sqlite_retry
def dump(self) -> None:
@@ -158,3 +159,14 @@ def list(max_results: Optional[int], list_view_type: ListViewType) -> List["RunInfo"]:
return [run_info for run_info in basic_statement.limit(max_results)]
else:
return [run_info for run_info in basic_statement.all()]

@staticmethod
@sqlite_retry
def delete(name: str) -> None:
with mgmt_db_session() as session:
run_info = session.query(RunInfo).filter(RunInfo.name == name).first()
if run_info is not None:
session.delete(run_info)
session.commit()
else:
raise RunNotFoundError(f"Run name {name!r} cannot be found.")
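This `delete` backs the tolerant `_try_delete_existing_run_record` helper added in `_cli/_utils.py` above; a one-line usage sketch (the run name is made up):

from promptflow._cli._utils import _try_delete_existing_run_record

_try_delete_existing_run_record("my-run")  # swallows RunNotFoundError if no such record exists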
41 changes: 23 additions & 18 deletions src/promptflow/promptflow/_sdk/_submitter/test_submitter.py
@@ -13,7 +13,7 @@
from promptflow._internal import ConnectionManager
from promptflow._sdk._constants import LOGGER_NAME, PROMPT_FLOW_DIR_NAME
from promptflow._sdk._utils import dump_flow_result, parse_variant
from promptflow._sdk.entities._flow import Flow, FlowContext
from promptflow._sdk.entities._flow import FlowContext, ProtectedFlow
from promptflow._sdk.operations._local_storage_operations import LoggerOperations
from promptflow._utils.context_utils import _change_working_dir
from promptflow._utils.exception_utils import ErrorResponse
@@ -32,7 +32,7 @@


class TestSubmitter:
def __init__(self, flow: Flow, flow_context: FlowContext, client=None):
def __init__(self, flow: ProtectedFlow, flow_context: FlowContext, client=None):
self.flow = flow
self._origin_flow = flow
self._dataplane_flow = None
@@ -397,7 +397,7 @@ def _get_generator_outputs(outputs):


class TestSubmitterViaProxy(TestSubmitter):
def __init__(self, flow: Flow, flow_context: FlowContext, client=None):
def __init__(self, flow: ProtectedFlow, flow_context: FlowContext, client=None):
super().__init__(flow, flow_context, client)

def flow_test(
@@ -413,11 +413,13 @@ def flow_test(
from promptflow._constants import LINE_NUMBER_KEY

if not connections:
connections = SubmitterHelper.resolve_connection_names_from_tool_meta(
tools_meta=CSharpExecutorProxy.generate_tool_metadata(
connections = SubmitterHelper.resolve_used_connections(
flow=self.flow,
tools_meta=CSharpExecutorProxy.get_tool_metadata(
flow_file=self.flow.flow_dag_path,
working_dir=self.flow.code,
),
flow_dag=self.flow.dag,
client=self._client,
)
credential_list = ConnectionManager(connections).get_secret_list()

@@ -468,20 +470,23 @@ def flow_test(
def exec_with_inputs(self, inputs):
from promptflow._constants import LINE_NUMBER_KEY

try:
connections = SubmitterHelper.resolve_connection_names_from_tool_meta(
tools_meta=CSharpExecutorProxy.generate_tool_metadata(
working_dir=self.flow.code,
),
flow_dag=self.flow.dag,
)
storage = DefaultRunStorage(base_dir=self.flow.code, sub_dir=Path(".promptflow/intermediate"))
flow_executor = CSharpExecutorProxy.create(
connections = SubmitterHelper.resolve_used_connections(
flow=self.flow,
tools_meta=CSharpExecutorProxy.get_tool_metadata(
flow_file=self.flow.path,
working_dir=self.flow.code,
connections=connections,
storage=storage,
)
),
client=self._client,
)
storage = DefaultRunStorage(base_dir=self.flow.code, sub_dir=Path(".promptflow/intermediate"))
flow_executor = CSharpExecutorProxy.create(
flow_file=self.flow.path,
working_dir=self.flow.code,
connections=connections,
storage=storage,
)

try:
# validate inputs
flow_inputs, _ = self.resolve_data(inputs=inputs, dataplane_flow=self.dataplane_flow)
line_result = async_run_allowing_running_loop(flow_executor.exec_line_async, inputs, index=0)
59 changes: 42 additions & 17 deletions src/promptflow/promptflow/_sdk/_submitter/utils.py
@@ -5,14 +5,18 @@

import contextlib
import os
import re
import tempfile
from collections import defaultdict
from os import PathLike
from pathlib import Path

import pydash
from dotenv import load_dotenv
from pydash import objects

from promptflow._sdk._constants import (
ALL_CONNECTION_TYPES,
DEFAULT_VAR_ID,
INPUTS,
NODE,
@@ -32,7 +36,7 @@
get_used_connection_names_from_dict,
update_dict_value_with_connections,
)
from promptflow._sdk.entities._flow import Flow
from promptflow._sdk.entities._flow import Flow, ProtectedFlow
from promptflow._utils.context_utils import _change_working_dir
from promptflow._utils.flow_utils import dump_flow_dag, load_flow_dag
from promptflow.contracts.flow import Flow as ExecutableFlow
@@ -171,7 +175,7 @@ def variant_overwrite_context(
overwrite_connections(flow_dag, connections, working_dir=flow_dir_path)
overwrite_flow(flow_dag, overrides)
flow_path = dump_flow_dag(flow_dag, Path(temp_dir))
flow = Flow(code=flow_dir_path, path=flow_path, dag=flow_dag)
flow = ProtectedFlow(code=flow_dir_path, path=flow_path, dag=flow_dag)
yield flow


@@ -186,6 +190,7 @@ def init_env(cls, environment_variables):

@staticmethod
def resolve_connections(flow: Flow, client=None, connections_to_ignore=None) -> dict:
# TODO 2856400: use resolve_used_connections instead of this function to avoid using executable in control-plane
from .._pf_client import PFClient

client = client or PFClient()
@@ -198,21 +203,41 @@ def resolve_connections(flow: Flow, client=None, connections_to_ignore=None) ->
)

@staticmethod
def resolve_connection_names_from_tool_meta(tools_meta: dict, flow_dag: dict,):
connection_names = set({})
tool_names = set({})
if tools_meta:
packages = tools_meta.get("package", {})
for key, pkg in packages.items():
inputs = pkg.get("inputs", {})
if "connection" in inputs and inputs["connection"].get("type", []) == ["object"]:
tool_names.add(key)
nodes = flow_dag.get("nodes", [])
for node in nodes:
if node.get("source", {}).get("tool", "") in tool_names:
connection_names.add(node["inputs"]["connection"])
return list(connection_names)
return []
def resolve_used_connections(flow: ProtectedFlow, tools_meta: dict, client, connections_to_ignore=None) -> dict:
from .._pf_client import PFClient

client = client or PFClient()
connection_names = SubmitterHelper.get_used_connection_names(tools_meta=tools_meta, flow_dag=flow.dag)
connections_to_ignore = connections_to_ignore or []
result = {}
for n in connection_names:
if n not in connections_to_ignore:
conn = client.connections.get(name=n, with_secrets=True)
result[n] = conn._to_execution_connection_dict()
return result

@staticmethod
def get_used_connection_names(tools_meta: dict, flow_dag: dict):
# TODO: handle code tool meta for python
connection_inputs = defaultdict(set)
for package_id, package_meta in tools_meta.get("package", {}).items():
for tool_input_key, tool_input_meta in package_meta.get("inputs", {}).items():
if ALL_CONNECTION_TYPES.intersection(set(tool_input_meta.get("type"))):
connection_inputs[package_id].add(tool_input_key)

connection_names = set()
# TODO: we assume that all variants are resolved here
# TODO: only literal connection inputs are supported
# TODO: check whether we should put this logic in executor as seems it's not possible to avoid touching
# information for executable
for node in flow_dag.get("nodes", []):
package_id = pydash.get(node, "source.tool")
if package_id in connection_inputs:
for connection_input in connection_inputs[package_id]:
connection_name = pydash.get(node, f"inputs.{connection_input}")
if connection_name and not re.match(r"\${.*}", connection_name):
connection_names.add(connection_name)
return list(connection_names)

@classmethod
def resolve_environment_variables(cls, environment_variables: dict, client=None):
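To close, a toy invocation of the new `get_used_connection_names` showing which connection names it extracts (the package id and connection names are made up; the data shapes mirror what the code above reads):

from promptflow._sdk._submitter.utils import SubmitterHelper

tools_meta = {
    "package": {
        "my_pkg.tools.search": {
            "inputs": {"connection": {"type": ["CustomConnection"]}},
        },
    },
}
flow_dag = {
    "nodes": [
        # literal connection input -> collected
        {"source": {"tool": "my_pkg.tools.search"}, "inputs": {"connection": "my_conn"}},
        # "${...}" reference -> skipped by the re.match check
        {"source": {"tool": "my_pkg.tools.search"}, "inputs": {"connection": "${inputs.conn}"}},
    ],
}

print(SubmitterHelper.get_used_connection_names(tools_meta, flow_dag))  # ['my_conn']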