Skip to content

Commit

Permalink
[Executor] Use os._exit instead of sys.exit so that the non-daemon th…
Browse files Browse the repository at this point in the history
…read can be… (#1643)

Use os._exit to terminate async executor process to avoid non-daemon
thread created by runing sync tool in async mode stops process from
exiting.

`sys.exit`: https://docs.python.org/3/library/sys.html#sys.exit
Raise a SystemExit exception, signaling an intention to exit the
interpreter.
Specifically, it does not exit non-daemon thread

`os._exit` https://docs.python.org/3/library/os.html#os._exit
Exit the process with status n, without calling cleanup handlers,
flushing stdio buffers, etc.
Specifically, it stops process without waiting for non-daemon thread.

# Description

Please add an informative description that covers that changes made by
the pull request and link all relevant issues.

# All Promptflow Contribution checklist:
- [X] **The pull request does not introduce [breaking changes].**
- [] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [X] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [X] Title of the pull request is clear and informative.
- [X] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
  • Loading branch information
guming-learning authored Jan 4, 2024
1 parent 2b56887 commit b6cdb70
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions src/promptflow/promptflow/executor/_async_nodes_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
# ---------------------------------------------------------

import asyncio
import inspect
import contextvars
import inspect
import os
import signal
import sys
import threading
import time
from asyncio import Task
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Tuple

from promptflow._core.flow_execution_context import FlowExecutionContext
Expand All @@ -20,7 +20,6 @@
from promptflow.contracts.flow import Node
from promptflow.executor._dag_manager import DAGManager
from promptflow.executor._errors import NoNodeExecutedError
from concurrent.futures import ThreadPoolExecutor


class AsyncNodesScheduler:
Expand Down Expand Up @@ -96,8 +95,7 @@ def _execute_nodes(
nodes_to_bypass = dag_manager.pop_bypassable_nodes()
# Create tasks for ready nodes
return {
self._create_node_task(node, dag_manager, context, executor): node
for node in dag_manager.pop_ready_nodes()
self._create_node_task(node, dag_manager, context, executor): node for node in dag_manager.pop_ready_nodes()
}

def _create_node_task(
Expand All @@ -120,21 +118,19 @@ def _create_node_task(
@staticmethod
async def _sync_function_to_async_task(
executor: ThreadPoolExecutor,
context: FlowExecutionContext, node,
context: FlowExecutionContext,
node,
f,
kwargs,
):
return await asyncio.get_running_loop().run_in_executor(
executor, context.invoke_tool, node, f, kwargs
)
return await asyncio.get_running_loop().run_in_executor(executor, context.invoke_tool, node, f, kwargs)


def signal_handler(sig, frame):
"""
Start a thread to monitor coroutines after receiving signal.
"""
flow_logger.info(f"Received signal {sig}({signal.Signals(sig).name}),"
" start coroutine monitor thread.")
flow_logger.info(f"Received signal {sig}({signal.Signals(sig).name}), start coroutine monitor thread.")
loop = asyncio.get_running_loop()
monitor = threading.Thread(target=monitor_coroutine_after_cancellation, args=(loop,))
monitor.start()
Expand Down Expand Up @@ -167,18 +163,34 @@ def monitor_coroutine_after_cancellation(loop: asyncio.AbstractEventLoop):
all_tasks_are_done = all(task.done() for task in asyncio.all_tasks(loop))
if all_tasks_are_done:
flow_logger.info("All coroutines are done. Exiting.")
sys.exit(0)
# We cannot ensure persist_flow_run is called before the process exits in the case that there is
# non-daemon thread running, sleep for 3 seconds as a best effort.
# If the caller wants to ensure flow status is cancelled in storage, it should check the flow status
# after timeout and set the flow status to Cancelled.
time.sleep(3)
# Use os._exit instead of sys.exit, so that the process can stop without
# waiting for the thread created by run_in_executor to finish.
# sys.exit: https://docs.python.org/3/library/sys.html#sys.exit
# Raise a SystemExit exception, signaling an intention to exit the interpreter.
# Specifically, it does not exit non-daemon thread
# os._exit https://docs.python.org/3/library/os.html#os._exit
# Exit the process with status n, without calling cleanup handlers, flushing stdio buffers, etc.
# Specifically, it stops process without waiting for non-daemon thread.
os._exit(0)

exceeded_wait_seconds = time.time() - thread_start_time > max_wait_seconds
time.sleep(1)

if exceeded_wait_seconds:
if not all_tasks_are_done:
flow_logger.info(f"Not all coroutines are done within {max_wait_seconds}s"
" after cancellation. Exiting the process despite of them."
" Please config the environment variable"
" PF_WAIT_SECONDS_AFTER_CANCELLATION if your tool needs"
" more time to clean up after cancellation.")
flow_logger.info(
f"Not all coroutines are done within {max_wait_seconds}s"
" after cancellation. Exiting the process despite of them."
" Please config the environment variable"
" PF_WAIT_SECONDS_AFTER_CANCELLATION if your tool needs"
" more time to clean up after cancellation."
)
remaining_tasks = [task for task in asyncio.all_tasks(loop) if not task.done()]
flow_logger.info(f"Remaining tasks: {[task.get_name() for task in remaining_tasks]}")
sys.exit(0)
time.sleep(3)
os._exit(0)

0 comments on commit b6cdb70

Please sign in to comment.