From ffc3c61d004ae8570cd77e2cbfb16c3d22ee24c7 Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Thu, 30 Oct 2025 14:54:14 +0800 Subject: [PATCH] merge workflow pasuing --- api/core/app/apps/workflow/app_generator.py | 18 +++++++++--------- api/core/app/apps/workflow/app_runner.py | 4 ++-- .../{engine_layers => layers}/suspend_layer.py | 0 .../timeslice_layer.py | 0 .../trigger_post_layer.py | 12 ++++++++---- api/tasks/async_workflow_tasks.py | 8 ++++---- 6 files changed, 23 insertions(+), 19 deletions(-) rename api/core/app/{engine_layers => layers}/suspend_layer.py (100%) rename api/core/app/{engine_layers => layers}/timeslice_layer.py (100%) rename api/core/app/{engine_layers => layers}/trigger_post_layer.py (87%) diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index 9f0f788a59..46c03c061d 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -56,7 +56,7 @@ class WorkflowAppGenerator(BaseAppGenerator): call_depth: int, triggered_from: Optional[WorkflowRunTriggeredFrom] = None, root_node_id: Optional[str] = None, - layers: Optional[Sequence[GraphEngineLayer]] = None, + graph_engine_layers: Sequence[GraphEngineLayer] = (), ) -> Generator[Mapping[str, Any] | str, None, None]: ... @overload @@ -72,7 +72,7 @@ class WorkflowAppGenerator(BaseAppGenerator): call_depth: int, triggered_from: Optional[WorkflowRunTriggeredFrom] = None, root_node_id: Optional[str] = None, - layers: Optional[Sequence[GraphEngineLayer]] = None, + graph_engine_layers: Sequence[GraphEngineLayer] = (), ) -> Mapping[str, Any]: ... @overload @@ -88,7 +88,7 @@ class WorkflowAppGenerator(BaseAppGenerator): call_depth: int, triggered_from: Optional[WorkflowRunTriggeredFrom] = None, root_node_id: Optional[str] = None, - layers: Optional[Sequence[GraphEngineLayer]] = None, + graph_engine_layers: Sequence[GraphEngineLayer] = (), ) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]: ... def generate( @@ -103,7 +103,7 @@ class WorkflowAppGenerator(BaseAppGenerator): call_depth: int = 0, triggered_from: Optional[WorkflowRunTriggeredFrom] = None, root_node_id: Optional[str] = None, - layers: Optional[Sequence[GraphEngineLayer]] = None, + graph_engine_layers: Sequence[GraphEngineLayer] = (), ) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]: files: Sequence[Mapping[str, Any]] = args.get("files") or [] @@ -202,7 +202,7 @@ class WorkflowAppGenerator(BaseAppGenerator): workflow_node_execution_repository=workflow_node_execution_repository, streaming=streaming, root_node_id=root_node_id, - layers=layers, + graph_engine_layers=graph_engine_layers, ) def resume(self, *, workflow_run_id: str) -> None: @@ -224,7 +224,7 @@ class WorkflowAppGenerator(BaseAppGenerator): streaming: bool = True, variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER, root_node_id: Optional[str] = None, - layers: Optional[Sequence[GraphEngineLayer]] = None, + graph_engine_layers: Sequence[GraphEngineLayer] = (), ) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]: """ Generate App response. @@ -263,7 +263,7 @@ class WorkflowAppGenerator(BaseAppGenerator): "root_node_id": root_node_id, "workflow_execution_repository": workflow_execution_repository, "workflow_node_execution_repository": workflow_node_execution_repository, - "layers": layers, + "graph_engine_layers": graph_engine_layers, }, ) @@ -458,7 +458,7 @@ class WorkflowAppGenerator(BaseAppGenerator): workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, root_node_id: Optional[str] = None, - layers: Optional[Sequence[GraphEngineLayer]] = None, + graph_engine_layers: Sequence[GraphEngineLayer] = (), ) -> None: """ Generate worker in a new thread. @@ -503,7 +503,7 @@ class WorkflowAppGenerator(BaseAppGenerator): workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, root_node_id=root_node_id, - layers=layers, + graph_engine_layers=graph_engine_layers, ) try: diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 47dc0cf662..707d88cb60 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -41,12 +41,13 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): root_node_id: Optional[str] = None, workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, - layers: Optional[Sequence[GraphEngineLayer]] = None, + graph_engine_layers: Sequence[GraphEngineLayer] = (), ): super().__init__( queue_manager=queue_manager, variable_loader=variable_loader, app_id=application_generate_entity.app_config.app_id, + graph_engine_layers=graph_engine_layers, ) self.application_generate_entity = application_generate_entity self._workflow = workflow @@ -54,7 +55,6 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): self._root_node_id = root_node_id self._workflow_execution_repository = workflow_execution_repository self._workflow_node_execution_repository = workflow_node_execution_repository - self._layers = layers or [] def run(self): """ diff --git a/api/core/app/engine_layers/suspend_layer.py b/api/core/app/layers/suspend_layer.py similarity index 100% rename from api/core/app/engine_layers/suspend_layer.py rename to api/core/app/layers/suspend_layer.py diff --git a/api/core/app/engine_layers/timeslice_layer.py b/api/core/app/layers/timeslice_layer.py similarity index 100% rename from api/core/app/engine_layers/timeslice_layer.py rename to api/core/app/layers/timeslice_layer.py diff --git a/api/core/app/engine_layers/trigger_post_layer.py b/api/core/app/layers/trigger_post_layer.py similarity index 87% rename from api/core/app/engine_layers/trigger_post_layer.py rename to api/core/app/layers/trigger_post_layer.py index 1309295b1a..fe1a46a945 100644 --- a/api/core/app/engine_layers/trigger_post_layer.py +++ b/api/core/app/layers/trigger_post_layer.py @@ -3,12 +3,11 @@ from datetime import UTC, datetime from typing import Any, ClassVar from pydantic import TypeAdapter -from sqlalchemy.orm import Session +from sqlalchemy.orm import Session, sessionmaker from core.workflow.graph_engine.layers.base import GraphEngineLayer from core.workflow.graph_events.base import GraphEngineEvent from core.workflow.graph_events.graph import GraphRunFailedEvent, GraphRunPausedEvent, GraphRunSucceededEvent -from models.engine import db from models.enums import WorkflowTriggerStatus from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository from tasks.workflow_cfs_scheduler.cfs_scheduler import AsyncWorkflowCFSPlanEntity @@ -32,10 +31,12 @@ class TriggerPostLayer(GraphEngineLayer): cfs_plan_scheduler_entity: AsyncWorkflowCFSPlanEntity, start_time: datetime, trigger_log_id: str, + session_maker: sessionmaker[Session], ): self.trigger_log_id = trigger_log_id self.start_time = start_time self.cfs_plan_scheduler_entity = cfs_plan_scheduler_entity + self.session_maker = session_maker def on_graph_start(self): pass @@ -45,7 +46,7 @@ class TriggerPostLayer(GraphEngineLayer): Update trigger log with success or failure. """ if isinstance(event, tuple(self._STATUS_MAP.keys())): - with Session(db.engine) as session: + with self.session_maker() as session: repo = SQLAlchemyWorkflowTriggerLogRepository(session) trigger_log = repo.get_by_id(self.trigger_log_id) if not trigger_log: @@ -62,7 +63,10 @@ class TriggerPostLayer(GraphEngineLayer): outputs = self.graph_runtime_state.outputs - workflow_run_id = outputs.get("workflow_run_id") + # BASICLY, workflow_execution_id is the same as workflow_run_id + workflow_run_id = self.graph_runtime_state.system_variable.workflow_execution_id + assert workflow_run_id, "Workflow run id is not set" + total_tokens = self.graph_runtime_state.total_tokens # Update trigger log with success diff --git a/api/tasks/async_workflow_tasks.py b/api/tasks/async_workflow_tasks.py index 07edd96bc0..a9907ac981 100644 --- a/api/tasks/async_workflow_tasks.py +++ b/api/tasks/async_workflow_tasks.py @@ -14,9 +14,9 @@ from sqlalchemy.orm import Session, sessionmaker from configs import dify_config from core.app.apps.workflow.app_generator import WorkflowAppGenerator -from core.app.engine_layers.timeslice_layer import TimeSliceLayer -from core.app.engine_layers.trigger_post_layer import TriggerPostLayer from core.app.entities.app_invoke_entities import InvokeFrom +from core.app.layers.timeslice_layer import TimeSliceLayer +from core.app.layers.trigger_post_layer import TriggerPostLayer from extensions.ext_database import db from models.account import Account from models.enums import CreatorUserRole, WorkflowTriggerStatus @@ -145,9 +145,9 @@ def _execute_workflow_common( call_depth=0, triggered_from=trigger_data.trigger_from, root_node_id=trigger_data.root_node_id, - layers=[ + graph_engine_layers=[ TimeSliceLayer(cfs_plan_scheduler), - TriggerPostLayer(cfs_plan_scheduler_entity, start_time, trigger_log.id), + TriggerPostLayer(cfs_plan_scheduler_entity, start_time, trigger_log.id, session_factory), ], )