Skip to content

Commit

Permalink
[Internal][Executor] Create the executor proxy in batch_engine.run an…
Browse files Browse the repository at this point in the history
…d ensure that the life cycle of it is in the run function (#1450)

# Description

Create the executor proxy in batch_engine.run and ensure that the life
cycle of it is in the run function

Main interface changes:

* <a
href="diffhunk://#diff-f3f345ba5efdfd8e3d9ad093ae9c2a0639925c41f9b90d5376db7dadf1943291R25-L30">`src/promptflow/tests/executor/e2etests/test_batch_timeout.py`</a>:
Use `MockPythonExecutorProxy` to run python flow to test timeout. <a
href="diffhunk://#diff-f3f345ba5efdfd8e3d9ad093ae9c2a0639925c41f9b90d5376db7dadf1943291R25-L30">[1]</a>
<a
href="diffhunk://#diff-f3f345ba5efdfd8e3d9ad093ae9c2a0639925c41f9b90d5376db7dadf1943291R73-L64">[2]</a>
<a
href="diffhunk://#diff-f3f345ba5efdfd8e3d9ad093ae9c2a0639925c41f9b90d5376db7dadf1943291R1-R14">[3]</a>
<a
href="diffhunk://#diff-f3f345ba5efdfd8e3d9ad093ae9c2a0639925c41f9b90d5376db7dadf1943291R103-R121">[4]</a>
* <a
href="diffhunk://#diff-ecf0905f2116abe08b5cf2931b856bf39aa8b38a30fadd9042538d076dbfde80R86-R93">`src/promptflow/promptflow/batch/_batch_engine.py`</a>:
Create the executor proxy in batch_engine.run and ensure that the life
cycle of it is in the run function. <a
href="diffhunk://#diff-ecf0905f2116abe08b5cf2931b856bf39aa8b38a30fadd9042538d076dbfde80R86-R93">[1]</a>
<a
href="diffhunk://#diff-ecf0905f2116abe08b5cf2931b856bf39aa8b38a30fadd9042538d076dbfde80R126-R135">[2]</a>
<a
href="diffhunk://#diff-ecf0905f2116abe08b5cf2931b856bf39aa8b38a30fadd9042538d076dbfde80R204">[3]</a>
<a
href="diffhunk://#diff-ecf0905f2116abe08b5cf2931b856bf39aa8b38a30fadd9042538d076dbfde80L136">[4]</a>
<a
href="diffhunk://#diff-ecf0905f2116abe08b5cf2931b856bf39aa8b38a30fadd9042538d076dbfde80R162">[5]</a>
* <a
href="diffhunk://#diff-44ac8eeb30a630bd24987e92f8bd5a180d57a3ad067db5d937561f21771e14c5L112-R112">`src/promptflow/promptflow/batch/_base_executor_proxy.py`</a>:
Changed the API path to lowercase for making HTTP POST requests in the
`BaseExecutorProxy` class. <a
href="diffhunk://#diff-44ac8eeb30a630bd24987e92f8bd5a180d57a3ad067db5d937561f21771e14c5L112-R112">[1]</a>
<a
href="diffhunk://#diff-44ac8eeb30a630bd24987e92f8bd5a180d57a3ad067db5d937561f21771e14c5L93-R93">[2]</a>

# All Promptflow Contribution checklist:
- [x] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [x] Title of the pull request is clear and informative.
- [x] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [x] Pull request includes test coverage for the included changes.
  • Loading branch information
PeiwenGaoMS authored Dec 18, 2023
1 parent a5b2ced commit 9af98e8
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 21 deletions.
4 changes: 2 additions & 2 deletions src/promptflow/promptflow/batch/_base_executor_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async def exec_line_async(
) -> LineResult:
start_time = datetime.utcnow()
# call execution api to get line results
url = self.api_endpoint + "/Execution"
url = self.api_endpoint + "/execution"
payload = {"run_id": run_id, "line_number": index, "inputs": inputs}
async with httpx.AsyncClient() as client:
response = await client.post(url, json=payload, timeout=LINE_TIMEOUT_SEC)
Expand All @@ -109,7 +109,7 @@ async def exec_aggregation_async(
) -> AggregationResult:
# call aggregation api to get aggregation result
async with httpx.AsyncClient() as client:
url = self.api_endpoint + "/Aggregation"
url = self.api_endpoint + "/aggregation"
payload = {"run_id": run_id, "batch_inputs": batch_inputs, "aggregation_inputs": aggregation_inputs}
response = await client.post(url, json=payload, timeout=LINE_TIMEOUT_SEC)
result = self._process_http_response(response)
Expand Down
35 changes: 22 additions & 13 deletions src/promptflow/promptflow/batch/_batch_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,14 @@ def __init__(
:param kwargs: The keyword arguments related to creating the executor proxy class
:type kwargs: Any
"""
self._flow_file = flow_file
self._working_dir = Flow._resolve_working_dir(flow_file, working_dir)
self._flow = Flow.from_yaml(flow_file, working_dir=self._working_dir)
FlowValidator.ensure_flow_valid_in_batch_mode(self._flow)

executor_proxy_cls = self.executor_proxy_classes[self._flow.program_language]
with _change_working_dir(self._working_dir):
self._executor_proxy: AbstractExecutorProxy = executor_proxy_cls.create(
flow_file, self._working_dir, connections=connections, storage=storage, **kwargs
)
self._connections = connections
self._storage = storage
self._kwargs = kwargs
# set it to True when the batch run is canceled
self._is_canceled = False

Expand Down Expand Up @@ -125,15 +123,24 @@ def run(

try:
self._start_time = datetime.utcnow()
# set batch input source from input mapping
OperationContext.get_instance().set_batch_input_source_from_inputs_mapping(inputs_mapping)
# resolve input data from input dirs and apply inputs mapping
batch_input_processor = BatchInputsProcessor(self._working_dir, self._flow.inputs, max_lines_count)
batch_inputs = batch_input_processor.process_batch_inputs(input_dirs, inputs_mapping)
# resolve output dir
output_dir = resolve_dir_to_absolute(self._working_dir, output_dir)
# run flow in batch mode
with _change_working_dir(self._working_dir):
# create executor proxy instance according to the flow program language
executor_proxy_cls = self.executor_proxy_classes[self._flow.program_language]
self._executor_proxy: AbstractExecutorProxy = executor_proxy_cls.create(
self._flow_file,
self._working_dir,
connections=self._connections,
storage=self._storage,
**self._kwargs,
)
# set batch input source from input mapping
OperationContext.get_instance().set_batch_input_source_from_inputs_mapping(inputs_mapping)
# resolve input data from input dirs and apply inputs mapping
batch_input_processor = BatchInputsProcessor(self._working_dir, self._flow.inputs, max_lines_count)
batch_inputs = batch_input_processor.process_batch_inputs(input_dirs, inputs_mapping)
# resolve output dir
output_dir = resolve_dir_to_absolute(self._working_dir, output_dir)
# run flow in batch mode
return async_run_allowing_running_loop(
self._exec_in_task, batch_inputs, run_id, output_dir, raise_on_line_failure
)
Expand All @@ -152,6 +159,7 @@ def run(
)
raise unexpected_error from e
finally:
# destroy the executor proxy and end the life cycle of the executor proxy
self._executor_proxy.destroy()

def cancel(self):
Expand Down Expand Up @@ -193,6 +201,7 @@ async def _exec(
output_dir: Path = None,
raise_on_line_failure: bool = False,
) -> BatchResult:
# ensure executor health before execution
await self._executor_proxy.ensure_executor_health()
# apply default value in early stage, so we can use it both in line and aggregation nodes execution.
batch_inputs = [
Expand Down
40 changes: 35 additions & 5 deletions src/promptflow/tests/executor/e2etests/test_batch_timeout.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import signal
from pathlib import Path
from tempfile import mkdtemp
from typing import Optional

import pytest

from promptflow.batch import BatchEngine
from promptflow._constants import LINE_TIMEOUT_SEC, FlowLanguage
from promptflow.batch import BatchEngine, PythonExecutorProxy
from promptflow.batch._result import BatchResult, LineError
from promptflow.contracts.run_info import Status
from promptflow.executor import FlowExecutor
from promptflow.executor._line_execution_process_pool import signal_handler
from promptflow.storage._run_storage import AbstractRunStorage

from ..utils import MemoryRunStorage, get_flow_folder, get_flow_inputs_file, get_yaml_file

Expand All @@ -16,18 +22,23 @@
@pytest.mark.usefixtures("use_secrets_config_file", "dev_connections")
@pytest.mark.e2etest
class TestBatchTimeout:
def setup_method(self):
BatchEngine.register_executor(FlowLanguage.Python, MockPythonExecutorProxy)

@pytest.mark.parametrize(
"flow_folder",
[
SAMPLE_FLOW,
],
)
def test_batch_with_timeout(self, flow_folder, dev_connections):
# set line timeout to 1 second for testing
batch_engine = BatchEngine(
get_yaml_file(flow_folder), get_flow_folder(flow_folder), connections=dev_connections
get_yaml_file(flow_folder),
get_flow_folder(flow_folder),
connections=dev_connections,
line_timeout_sec=1,
)
# set line timeout to 1 second for testing
batch_engine._executor_proxy._flow_executor._line_timeout_sec = 1
# prepare input file and output dir
input_dirs = {"data": get_flow_inputs_file(flow_folder, file_name="samples.json")}
output_dir = Path(mkdtemp())
Expand Down Expand Up @@ -59,9 +70,9 @@ def test_batch_with_one_line_timeout(self, flow_folder, dev_connections):
get_flow_folder(flow_folder),
connections=dev_connections,
storage=mem_run_storage,
line_timeout_sec=60,
)
# set line timeout to 1 second for testing
batch_engine._executor_proxy._flow_executor._line_timeout_sec = 60
# prepare input file and output dir
input_dirs = {"data": get_flow_inputs_file(flow_folder, file_name="samples.json")}
output_dir = Path(mkdtemp())
Expand Down Expand Up @@ -89,3 +100,22 @@ def test_batch_with_one_line_timeout(self, flow_folder, dev_connections):
# assert mem_run_storage persists run infos correctly
assert len(mem_run_storage._flow_runs) == 3, "Flow run is not persisted in memory storage."
assert len(mem_run_storage._node_runs) == 5, "Node run is not persisted in memory storage."


class MockPythonExecutorProxy(PythonExecutorProxy):
@classmethod
def create(
cls,
flow_file: Path,
working_dir: Optional[Path] = None,
*,
connections: Optional[dict] = None,
storage: Optional[AbstractRunStorage] = None,
**kwargs,
) -> "MockPythonExecutorProxy":
line_timeout_sec = kwargs.get("line_timeout_sec", LINE_TIMEOUT_SEC)
flow_executor = FlowExecutor.create(
flow_file, connections, working_dir, storage=storage, raise_ex=False, line_timeout_sec=line_timeout_sec
)
signal.signal(signal.SIGINT, signal_handler)
return cls(flow_executor)
2 changes: 1 addition & 1 deletion src/promptflow/tests/executor/mock_execution_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def run_executor_server(port, has_error=False):
app.router.add_get("/health", _handle_health)

handle_execution_with_customization = partial(_handle_execution, has_error=has_error)
app.router.add_post("/Execution", handle_execution_with_customization)
app.router.add_post("/execution", handle_execution_with_customization)

print(f"Starting server on port {port}")
web.run_app(app, host="localhost", port=port)
Expand Down

0 comments on commit 9af98e8

Please sign in to comment.