Skip to content

Commit

Permalink
Revert "[Internal][Executor] Create the executor proxy in batch_engin…
Browse files Browse the repository at this point in the history
…e.run and ensure that the life cycle of it is in the run function (#1450)"

This reverts commit 9af98e8.
  • Loading branch information
PeiwenGaoMS authored Dec 18, 2023
1 parent 9af98e8 commit d4717b5
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 60 deletions.
4 changes: 2 additions & 2 deletions src/promptflow/promptflow/batch/_base_executor_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async def exec_line_async(
) -> LineResult:
start_time = datetime.utcnow()
# call execution api to get line results
url = self.api_endpoint + "/execution"
url = self.api_endpoint + "/Execution"
payload = {"run_id": run_id, "line_number": index, "inputs": inputs}
async with httpx.AsyncClient() as client:
response = await client.post(url, json=payload, timeout=LINE_TIMEOUT_SEC)
Expand All @@ -109,7 +109,7 @@ async def exec_aggregation_async(
) -> AggregationResult:
# call aggregation api to get aggregation result
async with httpx.AsyncClient() as client:
url = self.api_endpoint + "/aggregation"
url = self.api_endpoint + "/Aggregation"
payload = {"run_id": run_id, "batch_inputs": batch_inputs, "aggregation_inputs": aggregation_inputs}
response = await client.post(url, json=payload, timeout=LINE_TIMEOUT_SEC)
result = self._process_http_response(response)
Expand Down
35 changes: 13 additions & 22 deletions src/promptflow/promptflow/batch/_batch_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,16 @@ def __init__(
:param kwargs: The keyword arguments related to creating the executor proxy class
:type kwargs: Any
"""
self._flow_file = flow_file
self._working_dir = Flow._resolve_working_dir(flow_file, working_dir)
self._flow = Flow.from_yaml(flow_file, working_dir=self._working_dir)
FlowValidator.ensure_flow_valid_in_batch_mode(self._flow)

self._connections = connections
executor_proxy_cls = self.executor_proxy_classes[self._flow.program_language]
with _change_working_dir(self._working_dir):
self._executor_proxy: AbstractExecutorProxy = executor_proxy_cls.create(
flow_file, self._working_dir, connections=connections, storage=storage, **kwargs
)
self._storage = storage
self._kwargs = kwargs
# set it to True when the batch run is canceled
self._is_canceled = False

Expand Down Expand Up @@ -123,24 +125,15 @@ def run(

try:
self._start_time = datetime.utcnow()
# set batch input source from input mapping
OperationContext.get_instance().set_batch_input_source_from_inputs_mapping(inputs_mapping)
# resolve input data from input dirs and apply inputs mapping
batch_input_processor = BatchInputsProcessor(self._working_dir, self._flow.inputs, max_lines_count)
batch_inputs = batch_input_processor.process_batch_inputs(input_dirs, inputs_mapping)
# resolve output dir
output_dir = resolve_dir_to_absolute(self._working_dir, output_dir)
# run flow in batch mode
with _change_working_dir(self._working_dir):
# create executor proxy instance according to the flow program language
executor_proxy_cls = self.executor_proxy_classes[self._flow.program_language]
self._executor_proxy: AbstractExecutorProxy = executor_proxy_cls.create(
self._flow_file,
self._working_dir,
connections=self._connections,
storage=self._storage,
**self._kwargs,
)
# set batch input source from input mapping
OperationContext.get_instance().set_batch_input_source_from_inputs_mapping(inputs_mapping)
# resolve input data from input dirs and apply inputs mapping
batch_input_processor = BatchInputsProcessor(self._working_dir, self._flow.inputs, max_lines_count)
batch_inputs = batch_input_processor.process_batch_inputs(input_dirs, inputs_mapping)
# resolve output dir
output_dir = resolve_dir_to_absolute(self._working_dir, output_dir)
# run flow in batch mode
return async_run_allowing_running_loop(
self._exec_in_task, batch_inputs, run_id, output_dir, raise_on_line_failure
)
Expand All @@ -159,7 +152,6 @@ def run(
)
raise unexpected_error from e
finally:
# destroy the executor proxy and end the life cycle of the executor proxy
self._executor_proxy.destroy()

def cancel(self):
Expand Down Expand Up @@ -201,7 +193,6 @@ async def _exec(
output_dir: Path = None,
raise_on_line_failure: bool = False,
) -> BatchResult:
# ensure executor health before execution
await self._executor_proxy.ensure_executor_health()
# apply default value in early stage, so we can use it both in line and aggregation nodes execution.
batch_inputs = [
Expand Down
40 changes: 5 additions & 35 deletions src/promptflow/tests/executor/e2etests/test_batch_timeout.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
import signal
from pathlib import Path
from tempfile import mkdtemp
from typing import Optional

import pytest

from promptflow._constants import LINE_TIMEOUT_SEC, FlowLanguage
from promptflow.batch import BatchEngine, PythonExecutorProxy
from promptflow.batch import BatchEngine
from promptflow.batch._result import BatchResult, LineError
from promptflow.contracts.run_info import Status
from promptflow.executor import FlowExecutor
from promptflow.executor._line_execution_process_pool import signal_handler
from promptflow.storage._run_storage import AbstractRunStorage

from ..utils import MemoryRunStorage, get_flow_folder, get_flow_inputs_file, get_yaml_file

Expand All @@ -22,23 +16,18 @@
@pytest.mark.usefixtures("use_secrets_config_file", "dev_connections")
@pytest.mark.e2etest
class TestBatchTimeout:
def setup_method(self):
BatchEngine.register_executor(FlowLanguage.Python, MockPythonExecutorProxy)

@pytest.mark.parametrize(
"flow_folder",
[
SAMPLE_FLOW,
],
)
def test_batch_with_timeout(self, flow_folder, dev_connections):
# set line timeout to 1 second for testing
batch_engine = BatchEngine(
get_yaml_file(flow_folder),
get_flow_folder(flow_folder),
connections=dev_connections,
line_timeout_sec=1,
get_yaml_file(flow_folder), get_flow_folder(flow_folder), connections=dev_connections
)
# set line timeout to 1 second for testing
batch_engine._executor_proxy._flow_executor._line_timeout_sec = 1
# prepare input file and output dir
input_dirs = {"data": get_flow_inputs_file(flow_folder, file_name="samples.json")}
output_dir = Path(mkdtemp())
Expand Down Expand Up @@ -70,9 +59,9 @@ def test_batch_with_one_line_timeout(self, flow_folder, dev_connections):
get_flow_folder(flow_folder),
connections=dev_connections,
storage=mem_run_storage,
line_timeout_sec=60,
)
# set line timeout to 1 second for testing
batch_engine._executor_proxy._flow_executor._line_timeout_sec = 60
# prepare input file and output dir
input_dirs = {"data": get_flow_inputs_file(flow_folder, file_name="samples.json")}
output_dir = Path(mkdtemp())
Expand Down Expand Up @@ -100,22 +89,3 @@ def test_batch_with_one_line_timeout(self, flow_folder, dev_connections):
# assert mem_run_storage persists run infos correctly
assert len(mem_run_storage._flow_runs) == 3, "Flow run is not persisted in memory storage."
assert len(mem_run_storage._node_runs) == 5, "Node run is not persisted in memory storage."


class MockPythonExecutorProxy(PythonExecutorProxy):
    """Test double for PythonExecutorProxy whose factory honors a configurable line timeout.

    Used by the batch-timeout e2e tests: ``create`` builds a non-raising
    FlowExecutor with ``line_timeout_sec`` taken from kwargs (falling back to
    LINE_TIMEOUT_SEC) and installs the pool's SIGINT handler.
    """

    @classmethod
    def create(
        cls,
        flow_file: Path,
        working_dir: Optional[Path] = None,
        *,
        connections: Optional[dict] = None,
        storage: Optional[AbstractRunStorage] = None,
        **kwargs,
    ) -> "MockPythonExecutorProxy":
        # Pull the per-line timeout from kwargs; default matches production behavior.
        timeout_sec = kwargs.get("line_timeout_sec", LINE_TIMEOUT_SEC)
        # raise_ex=False so a timed-out line surfaces as a failed LineResult
        # instead of aborting the whole batch run.
        executor = FlowExecutor.create(
            flow_file,
            connections,
            working_dir,
            storage=storage,
            raise_ex=False,
            line_timeout_sec=timeout_sec,
        )
        # Route Ctrl-C through the process pool's handler so child workers shut down cleanly.
        signal.signal(signal.SIGINT, signal_handler)
        return cls(executor)
2 changes: 1 addition & 1 deletion src/promptflow/tests/executor/mock_execution_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def run_executor_server(port, has_error=False):
app.router.add_get("/health", _handle_health)

handle_execution_with_customization = partial(_handle_execution, has_error=has_error)
app.router.add_post("/execution", handle_execution_with_customization)
app.router.add_post("/Execution", handle_execution_with_customization)

print(f"Starting server on port {port}")
web.run_app(app, host="localhost", port=port)
Expand Down

0 comments on commit d4717b5

Please sign in to comment.