From d4b848272e5f60d041432334e01e487d7939de8b Mon Sep 17 00:00:00 2001 From: -LAN- Date: Tue, 31 Dec 2024 11:42:51 +0800 Subject: [PATCH] fix: apply gevent threading patch early and ensure unique workflow node execution IDs (#12196) Signed-off-by: -LAN- --- api/app.py | 22 +++++++++++-------- .../task_pipeline/workflow_cycle_manage.py | 6 ++--- api/libs/threadings_utils.py | 19 ---------------- api/libs/version_utils.py | 12 ---------- 4 files changed, 16 insertions(+), 43 deletions(-) delete mode 100644 api/libs/threadings_utils.py delete mode 100644 api/libs/version_utils.py diff --git a/api/app.py b/api/app.py index c6a08290804a65..e6649e59df70b5 100644 --- a/api/app.py +++ b/api/app.py @@ -1,12 +1,8 @@ -from libs import version_utils - -# preparation before creating app -version_utils.check_supported_python_version() +import os +import sys def is_db_command(): - import sys - if len(sys.argv) > 1 and sys.argv[0].endswith("flask") and sys.argv[1] == "db": return True return False @@ -18,10 +14,18 @@ def is_db_command(): app = create_migrations_app() else: - from app_factory import create_app - from libs import threadings_utils + if os.environ.get("FLASK_DEBUG", "False") != "True": + from gevent import monkey # type: ignore + + # gevent + monkey.patch_all() - threadings_utils.apply_gevent_threading_patch() + from grpc.experimental import gevent as grpc_gevent # type: ignore + + # grpc gevent + grpc_gevent.init_gevent() + + from app_factory import create_app app = create_app() celery = app.extensions["celery"] diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index 7215105a5652da..e21475271a03d7 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -274,7 +274,7 @@ def _handle_node_execution_start( self, *, session: Session, workflow_run: WorkflowRun, event: QueueNodeStartedEvent ) -> WorkflowNodeExecution: workflow_node_execution = WorkflowNodeExecution() - workflow_node_execution.id = event.node_execution_id + workflow_node_execution.id = str(uuid4()) workflow_node_execution.tenant_id = workflow_run.tenant_id workflow_node_execution.app_id = workflow_run.app_id workflow_node_execution.workflow_id = workflow_run.workflow_id @@ -391,7 +391,7 @@ def _handle_workflow_node_execution_retried( execution_metadata = json.dumps(merged_metadata) workflow_node_execution = WorkflowNodeExecution() - workflow_node_execution.id = event.node_execution_id + workflow_node_execution.id = str(uuid4()) workflow_node_execution.tenant_id = workflow_run.tenant_id workflow_node_execution.app_id = workflow_run.app_id workflow_node_execution.workflow_id = workflow_run.workflow_id @@ -824,7 +824,7 @@ def _get_workflow_run(self, *, session: Session, workflow_run_id: str) -> Workfl return workflow_run def _get_workflow_node_execution(self, session: Session, node_execution_id: str) -> WorkflowNodeExecution: - stmt = select(WorkflowNodeExecution).where(WorkflowNodeExecution.id == node_execution_id) + stmt = select(WorkflowNodeExecution).where(WorkflowNodeExecution.node_execution_id == node_execution_id) workflow_node_execution = session.scalar(stmt) if not workflow_node_execution: raise WorkflowNodeExecutionNotFoundError(node_execution_id) diff --git a/api/libs/threadings_utils.py b/api/libs/threadings_utils.py deleted file mode 100644 index e4d63fd3142ce2..00000000000000 --- a/api/libs/threadings_utils.py +++ /dev/null @@ -1,19 +0,0 @@ -from configs import dify_config - - -def apply_gevent_threading_patch(): - """ - Run threading patch by gevent - to make standard library threading compatible. - Patching should be done as early as possible in the lifecycle of the program. - :return: - """ - if not dify_config.DEBUG: - from gevent import monkey # type: ignore - from grpc.experimental import gevent as grpc_gevent # type: ignore - - # gevent - monkey.patch_all() - - # grpc gevent - grpc_gevent.init_gevent() diff --git a/api/libs/version_utils.py b/api/libs/version_utils.py deleted file mode 100644 index 10edf8a058abf7..00000000000000 --- a/api/libs/version_utils.py +++ /dev/null @@ -1,12 +0,0 @@ -import sys - - -def check_supported_python_version(): - python_version = sys.version_info - if not ((3, 11) <= python_version < (3, 13)): - print( - "Aborted to launch the service " - f" with unsupported Python version {python_version.major}.{python_version.minor}." - " Please ensure Python 3.11 or 3.12." - ) - raise SystemExit(1)