Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: apply gevent threading patch early and ensure unique workflow node execution IDs #12196

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions api/app.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
from libs import version_utils

# preparation before creating app
version_utils.check_supported_python_version()
import os
import sys


def is_db_command():
import sys

if len(sys.argv) > 1 and sys.argv[0].endswith("flask") and sys.argv[1] == "db":
return True
return False
Expand All @@ -18,10 +14,18 @@ def is_db_command():

app = create_migrations_app()
else:
from app_factory import create_app
from libs import threadings_utils
if os.environ.get("FLASK_DEBUG", "False") != "True":
from gevent import monkey # type: ignore

# gevent
monkey.patch_all()

threadings_utils.apply_gevent_threading_patch()
from grpc.experimental import gevent as grpc_gevent # type: ignore

# grpc gevent
grpc_gevent.init_gevent()

from app_factory import create_app

app = create_app()
celery = app.extensions["celery"]
Expand Down
6 changes: 3 additions & 3 deletions api/core/app/task_pipeline/workflow_cycle_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ def _handle_node_execution_start(
self, *, session: Session, workflow_run: WorkflowRun, event: QueueNodeStartedEvent
) -> WorkflowNodeExecution:
workflow_node_execution = WorkflowNodeExecution()
workflow_node_execution.id = event.node_execution_id
workflow_node_execution.id = str(uuid4())
workflow_node_execution.tenant_id = workflow_run.tenant_id
workflow_node_execution.app_id = workflow_run.app_id
workflow_node_execution.workflow_id = workflow_run.workflow_id
Expand Down Expand Up @@ -391,7 +391,7 @@ def _handle_workflow_node_execution_retried(
execution_metadata = json.dumps(merged_metadata)

workflow_node_execution = WorkflowNodeExecution()
workflow_node_execution.id = event.node_execution_id
workflow_node_execution.id = str(uuid4())
workflow_node_execution.tenant_id = workflow_run.tenant_id
workflow_node_execution.app_id = workflow_run.app_id
workflow_node_execution.workflow_id = workflow_run.workflow_id
Expand Down Expand Up @@ -824,7 +824,7 @@ def _get_workflow_run(self, *, session: Session, workflow_run_id: str) -> Workfl
return workflow_run

def _get_workflow_node_execution(self, session: Session, node_execution_id: str) -> WorkflowNodeExecution:
stmt = select(WorkflowNodeExecution).where(WorkflowNodeExecution.id == node_execution_id)
stmt = select(WorkflowNodeExecution).where(WorkflowNodeExecution.node_execution_id == node_execution_id)
workflow_node_execution = session.scalar(stmt)
if not workflow_node_execution:
raise WorkflowNodeExecutionNotFoundError(node_execution_id)
Expand Down
19 changes: 0 additions & 19 deletions api/libs/threadings_utils.py

This file was deleted.

12 changes: 0 additions & 12 deletions api/libs/version_utils.py

This file was deleted.

Loading