Skip to content

Commit

Permalink
[devkit] Fix potential empty portal url & wrong flow type of run (#3237)
Browse files Browse the repository at this point in the history
# Description

**Portal url related**

If `portal_url` is `None`, no need to update run; otherwise, a SQL error
will be raised, error message looks like: `"SQL: UPDATE run_info SET
WHERE run_info.name = ?"`

**Flow type**

`Run._flow_type` returns wrong flow type for flex flow run without YAML
in flow directory, as a workaround, create a new exclusive function to
get that for `pf run visualize`.

# All Promptflow Contribution checklist:
- [x] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [x] Title of the pull request is clear and informative.
- [x] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [x] Pull request includes test coverage for the included changes.
  • Loading branch information
zhengfeiwang authored May 13, 2024
1 parent 2b0fcbd commit b7eae79
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ def _submit_bulk_run(
# upload run to cloud if the trace destination is set to cloud
if self._config._is_cloud_trace_destination(path=run._get_flow_dir().resolve()):
portal_url = self._upload_run_to_cloud(run=run, config=self._config)
self.run_operations.update(name=run.name, portal_url=portal_url)
if portal_url is not None:
self.run_operations.update(name=run.name, portal_url=portal_url)

def _resolve_input_dirs(self, run: Run):
result = {"data": run.data if run.data else None}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
MAX_SHOW_DETAILS_RESULTS,
FlowRunProperties,
ListViewType,
LocalStorageFilenames,
RunInfoSources,
RunMode,
RunStatus,
Expand All @@ -35,7 +36,7 @@
from promptflow._sdk.entities import Run
from promptflow._sdk.operations._local_storage_operations import LocalStorageOperations
from promptflow._utils.logger_utils import get_cli_sdk_logger
from promptflow._utils.yaml_utils import load_yaml_string
from promptflow._utils.yaml_utils import load_yaml, load_yaml_string
from promptflow.contracts._run_management import RunDetail, RunMetadata, RunVisualization, VisualizationConfig
from promptflow.exceptions import UserErrorException

Expand Down Expand Up @@ -424,6 +425,25 @@ def _visualize_with_trace_ui(self, runs: List[Run], html_path: Optional[str] = N
html_string = generate_trace_ui_html_string(trace_ui_url)
dump_html(html_string, html_path=html_path, open_html=html_path is None)

def _get_run_flow_type(self, run: Run) -> str:
# BUG 3195705: observed `Run._flow_type` returns wrong flow type
# this function is a workaround to get the correct flow type for visualize run scenario
# so please use this function carefully
from promptflow._utils.flow_utils import is_prompty_flow, resolve_flow_path

# prompty: according to the file extension
if is_prompty_flow(run.flow):
return FlowType.PROMPTY
# DAG vs. flex: "entry" in flow.yaml
# resolve run snapshot, where must exist flow.dag.yaml or flow.flex.yaml
snapshot_path = run._output_path / LocalStorageFilenames.SNAPSHOT_FOLDER
flow_directory, yaml_file = resolve_flow_path(snapshot_path)
yaml_dict = load_yaml(flow_directory / yaml_file)
if isinstance(yaml_dict, dict) and "entry" in yaml_dict:
return FlowType.FLEX_FLOW
else:
return FlowType.DAG_FLOW

@monitor_operation(activity_name="pf.runs.visualize", activity_type=ActivityType.PUBLICAPI)
def visualize(self, runs: Union[str, Run, List[str], List[Run]], **kwargs) -> None:
"""Visualize run(s).
Expand All @@ -448,7 +468,8 @@ def visualize(self, runs: Union[str, Run, List[str], List[Run]], **kwargs) -> No
# for existing run source run, will raise type error when call `_flow_type`, so skip it
if run._run_source == RunInfoSources.EXISTING_RUN:
continue
if run._flow_type == FlowType.FLEX_FLOW or run._flow_type == FlowType.PROMPTY:
flow_type = self._get_run_flow_type(run)
if flow_type == FlowType.FLEX_FLOW or flow_type == FlowType.PROMPTY:
has_flex_or_prompty = True
break
if has_flex_or_prompty is True:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from marshmallow import ValidationError
from pytest_mock import MockerFixture

from promptflow._constants import PROMPTFLOW_CONNECTIONS
from promptflow._constants import PROMPTFLOW_CONNECTIONS, FlowType
from promptflow._sdk._constants import (
FLOW_DIRECTORY_MACRO_IN_CONFIG,
PROMPT_FLOW_DIR_NAME,
Expand Down Expand Up @@ -1996,6 +1996,16 @@ def test_visualize_different_runs(self, pf: PFClient) -> None:
pf.visualize(runs=[dag_flow_run, flex_flow_run, prompty_run])
assert static_vis_func.call_count == 1 and trace_ui_vis_func.call_count == 3

def test_flex_flow_run_flow_type(self, pf: PFClient) -> None:
run = pf.run(
flow="entry:my_flow",
code=f"{EAGER_FLOWS_DIR}/simple_without_yaml",
data=f"{DATAS_DIR}/simple_eager_flow_data.jsonl",
)
# BUG 3195705: uncomment below line after bug fixed, current value is "dag"
# assert run._flow_type == FlowType.FLEX_FLOW
assert pf.runs._get_run_flow_type(run) == FlowType.FLEX_FLOW


def assert_batch_run_result(run: Run, pf: PFClient, assert_func):
assert run.status == "Completed"
Expand Down

0 comments on commit b7eae79

Please sign in to comment.