From 3e8d49a3933f715103b6f34779f233bb4cdba4fc Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Wed, 24 Dec 2025 10:04:47 +0800 Subject: [PATCH] feat(api): omit WorkflowPersistenceLayer while single stepping container nodes --- .../app/apps/advanced_chat/app_generator.py | 2 + api/core/app/apps/advanced_chat/app_runner.py | 33 ++-- api/core/app/apps/pipeline/pipeline_runner.py | 28 ++-- api/core/app/apps/workflow/app_runner.py | 32 ++-- api/core/app/entities/app_invoke_entities.py | 6 + ...rkflow_app_runner_single_node_execution.py | 155 +++++++++++++++++- 6 files changed, 211 insertions(+), 45 deletions(-) diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index feb0d3358c..3305d10eca 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -256,6 +256,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): single_iteration_run=AdvancedChatAppGenerateEntity.SingleIterationRunEntity( node_id=node_id, inputs=args["inputs"] ), + workflow_run_id=str(uuid.uuid4()), ) contexts.plugin_tool_providers.set({}) contexts.plugin_tool_providers_lock.set(threading.Lock()) @@ -339,6 +340,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): invoke_from=InvokeFrom.DEBUGGER, extras={"auto_generate_conversation_name": False}, single_loop_run=AdvancedChatAppGenerateEntity.SingleLoopRunEntity(node_id=node_id, inputs=args["inputs"]), + workflow_run_id=str(uuid.uuid4()), ) contexts.plugin_tool_providers.set({}) contexts.plugin_tool_providers_lock.set(threading.Lock()) diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index aee14e22b3..4a06375f9c 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -26,7 +26,10 @@ from core.variables.variables import VariableUnion from core.workflow.enums import WorkflowType from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel from core.workflow.graph_engine.layers.base import GraphEngineLayer -from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer +from core.workflow.graph_engine.layers.persistence import ( + PersistenceWorkflowInfo, + WorkflowPersistenceLayer, +) from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.runtime import GraphRuntimeState, VariablePool @@ -187,20 +190,20 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): self._queue_manager.graph_runtime_state = graph_runtime_state - persistence_layer = WorkflowPersistenceLayer( - application_generate_entity=self.application_generate_entity, - workflow_info=PersistenceWorkflowInfo( - workflow_id=self._workflow.id, - workflow_type=WorkflowType(self._workflow.type), - version=self._workflow.version, - graph_data=self._workflow.graph_dict, - ), - workflow_execution_repository=self._workflow_execution_repository, - workflow_node_execution_repository=self._workflow_node_execution_repository, - trace_manager=self.application_generate_entity.trace_manager, - ) - - workflow_entry.graph_engine.layer(persistence_layer) + if not self.application_generate_entity.is_single_stepping_container_nodes(): + persistence_layer = WorkflowPersistenceLayer( + application_generate_entity=self.application_generate_entity, + workflow_info=PersistenceWorkflowInfo( + workflow_id=self._workflow.id, + workflow_type=WorkflowType(self._workflow.type), + version=self._workflow.version, + graph_data=self._workflow.graph_dict, + ), + workflow_execution_repository=self._workflow_execution_repository, + workflow_node_execution_repository=self._workflow_node_execution_repository, + trace_manager=self.application_generate_entity.trace_manager, + ) + workflow_entry.graph_engine.layer(persistence_layer) for layer in self._graph_engine_layers: workflow_entry.graph_engine.layer(layer) diff --git a/api/core/app/apps/pipeline/pipeline_runner.py b/api/core/app/apps/pipeline/pipeline_runner.py index c3f0931c99..7fefe879ae 100644 --- a/api/core/app/apps/pipeline/pipeline_runner.py +++ b/api/core/app/apps/pipeline/pipeline_runner.py @@ -172,21 +172,21 @@ class PipelineRunner(WorkflowBasedAppRunner): ) self._queue_manager.graph_runtime_state = graph_runtime_state + if not self.application_generate_entity.is_single_stepping_container_nodes(): + persistence_layer = WorkflowPersistenceLayer( + application_generate_entity=self.application_generate_entity, + workflow_info=PersistenceWorkflowInfo( + workflow_id=workflow.id, + workflow_type=WorkflowType(workflow.type), + version=workflow.version, + graph_data=workflow.graph_dict, + ), + workflow_execution_repository=self._workflow_execution_repository, + workflow_node_execution_repository=self._workflow_node_execution_repository, + trace_manager=self.application_generate_entity.trace_manager, + ) - persistence_layer = WorkflowPersistenceLayer( - application_generate_entity=self.application_generate_entity, - workflow_info=PersistenceWorkflowInfo( - workflow_id=workflow.id, - workflow_type=WorkflowType(workflow.type), - version=workflow.version, - graph_data=workflow.graph_dict, - ), - workflow_execution_repository=self._workflow_execution_repository, - workflow_node_execution_repository=self._workflow_node_execution_repository, - trace_manager=self.application_generate_entity.trace_manager, - ) - - workflow_entry.graph_engine.layer(persistence_layer) + workflow_entry.graph_engine.layer(persistence_layer) generator = workflow_entry.run() diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 4217056078..8ff7381f90 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -10,7 +10,10 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat from core.workflow.enums import WorkflowType from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel from core.workflow.graph_engine.layers.base import GraphEngineLayer -from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer +from core.workflow.graph_engine.layers.persistence import ( + PersistenceWorkflowInfo, + WorkflowPersistenceLayer, +) from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.runtime import GraphRuntimeState, VariablePool @@ -133,20 +136,21 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): command_channel=command_channel, ) - persistence_layer = WorkflowPersistenceLayer( - application_generate_entity=self.application_generate_entity, - workflow_info=PersistenceWorkflowInfo( - workflow_id=self._workflow.id, - workflow_type=WorkflowType(self._workflow.type), - version=self._workflow.version, - graph_data=self._workflow.graph_dict, - ), - workflow_execution_repository=self._workflow_execution_repository, - workflow_node_execution_repository=self._workflow_node_execution_repository, - trace_manager=self.application_generate_entity.trace_manager, - ) + if not self.application_generate_entity.is_single_stepping_container_nodes(): + persistence_layer = WorkflowPersistenceLayer( + application_generate_entity=self.application_generate_entity, + workflow_info=PersistenceWorkflowInfo( + workflow_id=self._workflow.id, + workflow_type=WorkflowType(self._workflow.type), + version=self._workflow.version, + graph_data=self._workflow.graph_dict, + ), + workflow_execution_repository=self._workflow_execution_repository, + workflow_node_execution_repository=self._workflow_node_execution_repository, + trace_manager=self.application_generate_entity.trace_manager, + ) - workflow_entry.graph_engine.layer(persistence_layer) + workflow_entry.graph_engine.layer(persistence_layer) for layer in self._graph_engine_layers: workflow_entry.graph_engine.layer(layer) diff --git a/api/core/app/entities/app_invoke_entities.py b/api/core/app/entities/app_invoke_entities.py index 0cb573cb86..6693bd995b 100644 --- a/api/core/app/entities/app_invoke_entities.py +++ b/api/core/app/entities/app_invoke_entities.py @@ -228,6 +228,9 @@ class AdvancedChatAppGenerateEntity(ConversationAppGenerateEntity): single_loop_run: SingleLoopRunEntity | None = None + def is_single_stepping_container_nodes(self) -> bool: + return self.single_iteration_run is not None or self.single_loop_run is not None + class WorkflowAppGenerateEntity(AppGenerateEntity): """ @@ -258,6 +261,9 @@ class WorkflowAppGenerateEntity(AppGenerateEntity): single_loop_run: SingleLoopRunEntity | None = None + def is_single_stepping_container_nodes(self) -> bool: + return self.single_iteration_run is not None or self.single_loop_run is not None + class RagPipelineGenerateEntity(WorkflowAppGenerateEntity): """ diff --git a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node_execution.py b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node_execution.py index de08a6fae2..f00f65e0cf 100644 --- a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node_execution.py +++ b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node_execution.py @@ -1,8 +1,10 @@ from types import SimpleNamespace from unittest.mock import MagicMock -from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner -from core.workflow.enums import NodeType, SystemVariableKey +import core.app.apps.workflow.app_runner as workflow_app_runner +from core.app.apps.workflow.app_runner import WorkflowAppRunner, WorkflowBasedAppRunner +from core.app.entities.app_invoke_entities import InvokeFrom +from core.workflow.enums import NodeType, SystemVariableKey, WorkflowType from core.workflow.system_variable import SystemVariable @@ -92,3 +94,152 @@ def test_prepare_single_loop_injects_system_variables_and_fake_workflow(): assert runtime_state.variable_pool.system_variables.workflow_execution_id == execution_id assert graph.root_node.id == f"{node_id}_single_step_start" assert f"{node_id}_single_step_end" in graph.nodes + + +class DummyCommandChannel: + def fetch_commands(self): + return [] + + def send_command(self, command): + return None + + +def _empty_graph_engine_run(self): + if False: # pragma: no cover + yield None + + +def _build_generate_entity(*, single_iteration_run=None, single_loop_run=None): + if isinstance(single_iteration_run, dict): + single_iteration_run = SimpleNamespace(**single_iteration_run) + if isinstance(single_loop_run, dict): + single_loop_run = SimpleNamespace(**single_loop_run) + + base = SimpleNamespace( + app_config=SimpleNamespace(app_id="app-id", workflow_id="workflow-id"), + workflow_execution_id="workflow-exec-id", + files=[], + user_id="user-id", + inputs={}, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=0, + task_id="task-id", + trace_manager=None, + single_iteration_run=single_iteration_run, + single_loop_run=single_loop_run, + ) + + def is_single_stepping_container_nodes(): + return base.single_iteration_run is not None or base.single_loop_run is not None + + base.is_single_stepping_container_nodes = is_single_stepping_container_nodes # type: ignore[attr-defined] + return base + + +def test_workflow_runner_attaches_persistence_for_full_run(monkeypatch): + from core.workflow.graph_engine.graph_engine import GraphEngine + + monkeypatch.setattr(GraphEngine, "run", _empty_graph_engine_run) + persistence_ctor = MagicMock(name="persistence_layer_ctor") + monkeypatch.setattr(workflow_app_runner, "WorkflowPersistenceLayer", persistence_ctor) + monkeypatch.setattr(workflow_app_runner, "RedisChannel", lambda *args, **kwargs: DummyCommandChannel()) + + queue_manager = MagicMock() + workflow = SimpleNamespace( + id="workflow-id", + tenant_id="tenant-id", + app_id="app-id", + type=WorkflowType.WORKFLOW, + version="1", + graph_dict={ + "nodes": [ + { + "id": "start", + "type": "custom", + "data": {"type": NodeType.START, "title": "Start", "version": "1", "variables": []}, + }, + { + "id": "end", + "type": "custom", + "data": {"type": NodeType.END, "title": "End", "version": "1", "outputs": []}, + }, + ], + "edges": [ + {"source": "start", "target": "end", "sourceHandle": "source", "targetHandle": "target"}, + ], + }, + environment_variables=[], + ) + generate_entity = _build_generate_entity() + generate_entity.inputs = {"input": "value"} + + runner = WorkflowAppRunner( + application_generate_entity=generate_entity, + queue_manager=queue_manager, + variable_loader=MagicMock(), + workflow=workflow, + system_user_id="system-user-id", + root_node_id=None, + workflow_execution_repository=MagicMock(), + workflow_node_execution_repository=MagicMock(), + graph_engine_layers=(), + ) + + runner.run() + + assert persistence_ctor.call_count == 1 + + +def test_workflow_runner_skips_persistence_for_single_step(monkeypatch): + from core.workflow.graph_engine.graph_engine import GraphEngine + + monkeypatch.setattr(GraphEngine, "run", _empty_graph_engine_run) + persistence_ctor = MagicMock(name="persistence_layer_ctor") + monkeypatch.setattr(workflow_app_runner, "WorkflowPersistenceLayer", persistence_ctor) + monkeypatch.setattr(workflow_app_runner, "RedisChannel", lambda *args, **kwargs: DummyCommandChannel()) + + queue_manager = MagicMock() + workflow = SimpleNamespace( + id="workflow-id", + tenant_id="tenant-id", + app_id="app-id", + type=WorkflowType.WORKFLOW, + version="1", + graph_dict={ + "nodes": [ + { + "id": "loop", + "type": "custom", + "data": { + "type": NodeType.LOOP, + "title": "Loop", + "version": "1", + "loop_count": 1, + "break_conditions": [], + "logical_operator": "and", + "loop_variables": [], + "outputs": {}, + }, + } + ], + "edges": [], + }, + environment_variables=[], + ) + generate_entity = _build_generate_entity(single_loop_run={"node_id": "loop", "inputs": {}}) + + runner = WorkflowAppRunner( + application_generate_entity=generate_entity, + queue_manager=queue_manager, + variable_loader=MagicMock(), + workflow=workflow, + system_user_id="system-user-id", + root_node_id=None, + workflow_execution_repository=MagicMock(), + workflow_node_execution_repository=MagicMock(), + graph_engine_layers=(), + ) + + runner.run() + + assert persistence_ctor.call_count == 0