From bd1f9bb7353ba68c1f9b5019488ca79094b7107b Mon Sep 17 00:00:00 2001 From: -LAN- Date: Tue, 16 Dec 2025 22:56:32 +0800 Subject: [PATCH 1/3] fix: cannot run single step on iteration node --- api/core/app/apps/advanced_chat/app_runner.py | 1 + api/core/app/apps/pipeline/pipeline_runner.py | 33 +++---- api/core/app/apps/workflow/app_runner.py | 1 + api/core/app/apps/workflow_app_runner.py | 71 ++++++++++++-- .../nodes/iteration/iteration_node.py | 31 ++++++ ...rkflow_app_runner_single_node_execution.py | 94 +++++++++++++++++++ .../iteration_node_empty_iteration_spec.py | 51 ++++++++++ .../nodes/_base/hooks/use-one-step-run.ts | 16 +++- 8 files changed, 269 insertions(+), 29 deletions(-) create mode 100644 api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node_execution.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/iteration/iteration_node_empty_iteration_spec.py diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index ee092e55c5..aee14e22b3 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -109,6 +109,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): workflow=self._workflow, single_iteration_run=self.application_generate_entity.single_iteration_run, single_loop_run=self.application_generate_entity.single_loop_run, + system_variables=system_inputs, ) else: inputs = self.application_generate_entity.inputs diff --git a/api/core/app/apps/pipeline/pipeline_runner.py b/api/core/app/apps/pipeline/pipeline_runner.py index 4be9e01fbf..c3f0931c99 100644 --- a/api/core/app/apps/pipeline/pipeline_runner.py +++ b/api/core/app/apps/pipeline/pipeline_runner.py @@ -92,6 +92,22 @@ class PipelineRunner(WorkflowBasedAppRunner): db.session.close() + files = self.application_generate_entity.files + system_inputs = SystemVariable( + files=files, + user_id=user_id, + app_id=app_config.app_id, + workflow_id=app_config.workflow_id, + workflow_execution_id=self.application_generate_entity.workflow_execution_id, + document_id=self.application_generate_entity.document_id, + original_document_id=self.application_generate_entity.original_document_id, + batch=self.application_generate_entity.batch, + dataset_id=self.application_generate_entity.dataset_id, + datasource_type=self.application_generate_entity.datasource_type, + datasource_info=self.application_generate_entity.datasource_info, + invoke_from=self.application_generate_entity.invoke_from.value, + ) + # if only single iteration run is requested if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run: # Handle single iteration or single loop run @@ -99,27 +115,12 @@ class PipelineRunner(WorkflowBasedAppRunner): workflow=workflow, single_iteration_run=self.application_generate_entity.single_iteration_run, single_loop_run=self.application_generate_entity.single_loop_run, + system_variables=system_inputs, ) else: inputs = self.application_generate_entity.inputs - files = self.application_generate_entity.files # Create a variable pool. - system_inputs = SystemVariable( - files=files, - user_id=user_id, - app_id=app_config.app_id, - workflow_id=app_config.workflow_id, - workflow_execution_id=self.application_generate_entity.workflow_execution_id, - document_id=self.application_generate_entity.document_id, - original_document_id=self.application_generate_entity.original_document_id, - batch=self.application_generate_entity.batch, - dataset_id=self.application_generate_entity.dataset_id, - datasource_type=self.application_generate_entity.datasource_type, - datasource_info=self.application_generate_entity.datasource_info, - invoke_from=self.application_generate_entity.invoke_from.value, - ) - rag_pipeline_variables = [] if workflow.rag_pipeline_variables: for v in workflow.rag_pipeline_variables: diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 894e6f397a..4217056078 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -80,6 +80,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): workflow=self._workflow, single_iteration_run=self.application_generate_entity.single_iteration_run, single_loop_run=self.application_generate_entity.single_loop_run, + system_variables=system_inputs, ) else: inputs = self.application_generate_entity.inputs diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 0e125b3538..5ec6896460 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -130,6 +130,8 @@ class WorkflowBasedAppRunner: workflow: Workflow, single_iteration_run: Any | None = None, single_loop_run: Any | None = None, + *, + system_variables: SystemVariable | None = None, ) -> tuple[Graph, VariablePool, GraphRuntimeState]: """ Prepare graph, variable pool, and runtime state for single node execution @@ -147,9 +149,10 @@ class WorkflowBasedAppRunner: ValueError: If neither single_iteration_run nor single_loop_run is specified """ # Create initial runtime state with variable pool containing environment variables + system_variables = system_variables or SystemVariable.empty() graph_runtime_state = GraphRuntimeState( variable_pool=VariablePool( - system_variables=SystemVariable.empty(), + system_variables=system_variables, user_inputs={}, environment_variables=workflow.environment_variables, ), @@ -220,7 +223,7 @@ class WorkflowBasedAppRunner: # filter nodes only in the specified node type (iteration or loop) main_node_config = next((n for n in graph_config.get("nodes", []) if n.get("id") == node_id), None) start_node_id = main_node_config.get("data", {}).get("start_node_id") if main_node_config else None - node_configs = [ + base_node_configs = [ node for node in graph_config.get("nodes", []) if node.get("id") == node_id @@ -228,26 +231,74 @@ class WorkflowBasedAppRunner: or (start_node_id and node.get("id") == start_node_id) ] - graph_config["nodes"] = node_configs + # Build a base graph config (without synthetic entry) to keep node-level context minimal. + base_graph_config = graph_config.copy() + base_graph_config["nodes"] = base_node_configs - node_ids = [node.get("id") for node in node_configs] + node_ids = [node.get("id") for node in base_node_configs if isinstance(node.get("id"), str)] # filter edges only in the specified node type - edge_configs = [ + base_edge_configs = [ edge for edge in graph_config.get("edges", []) if (edge.get("source") is None or edge.get("source") in node_ids) and (edge.get("target") is None or edge.get("target") in node_ids) ] - graph_config["edges"] = edge_configs + base_graph_config["edges"] = base_edge_configs + + # Inject a synthetic start node so Graph validation accepts the single-node graph + # (loop/iteration nodes are containers and cannot serve as graph roots). + synthetic_start_node_id = f"{node_id}_single_step_start" + synthetic_start_node = { + "id": synthetic_start_node_id, + "type": "custom", + "data": { + "type": NodeType.START, + "title": "Start", + "desc": "Synthetic start for single-step run", + "version": "1", + "variables": [], + }, + } + + synthetic_end_node_id = f"{node_id}_single_step_end" + synthetic_end_node = { + "id": synthetic_end_node_id, + "type": "custom", + "data": { + "type": NodeType.END, + "title": "End", + "desc": "Synthetic end for single-step run", + "version": "1", + "outputs": [], + }, + } + + graph_config_with_entry = base_graph_config.copy() + graph_config_with_entry["nodes"] = [*base_node_configs, synthetic_start_node, synthetic_end_node] + graph_config_with_entry["edges"] = [ + *base_edge_configs, + { + "source": synthetic_start_node_id, + "target": node_id, + "sourceHandle": "source", + "targetHandle": "target", + }, + { + "source": node_id, + "target": synthetic_end_node_id, + "sourceHandle": "source", + "targetHandle": "target", + }, + ] # Create required parameters for Graph.init graph_init_params = GraphInitParams( tenant_id=workflow.tenant_id, app_id=self._app_id, workflow_id=workflow.id, - graph_config=graph_config, + graph_config=base_graph_config, user_id="", user_from=UserFrom.ACCOUNT, invoke_from=InvokeFrom.SERVICE_API, @@ -260,14 +311,16 @@ class WorkflowBasedAppRunner: ) # init graph - graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=node_id) + graph = Graph.init( + graph_config=graph_config_with_entry, node_factory=node_factory, root_node_id=synthetic_start_node_id + ) if not graph: raise ValueError("graph not found in workflow") # fetch node config from node id target_node_config = None - for node in node_configs: + for node in base_node_configs: if node.get("id") == node_id: target_node_config = node break diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index e5d86414c1..6f537026f0 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -149,18 +149,49 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]): return isinstance(variable, NoneSegment) or len(variable.value) == 0 def _handle_empty_iteration(self, variable: ArraySegment | NoneSegment) -> Generator[NodeEventBase, None, None]: + started_at = naive_utc_now() + inputs = {"iterator_selector": []} + usage = LLMUsage.empty_usage() + + yield IterationStartedEvent( + start_at=started_at, + inputs=inputs, + metadata={"iteration_length": 0}, + ) + # Try our best to preserve the type information. if isinstance(variable, ArraySegment): output = variable.model_copy(update={"value": []}) else: output = ArrayAnySegment(value=[]) + yield IterationSucceededEvent( + start_at=started_at, + inputs=inputs, + outputs={"output": []}, + steps=0, + metadata={ + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens, + WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price, + WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency, + WorkflowNodeExecutionMetadataKey.ITERATION_DURATION_MAP: {}, + }, + ) + yield StreamCompletedEvent( node_run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, # TODO(QuantumGhost): is it possible to compute the type of `output` # from graph definition? outputs={"output": output}, + inputs=inputs, + metadata={ + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens, + WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price, + WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency, + WorkflowNodeExecutionMetadataKey.ITERATION_DURATION_MAP: {}, + }, + llm_usage=usage, ) ) diff --git a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node_execution.py b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node_execution.py new file mode 100644 index 0000000000..de08a6fae2 --- /dev/null +++ b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node_execution.py @@ -0,0 +1,94 @@ +from types import SimpleNamespace +from unittest.mock import MagicMock + +from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner +from core.workflow.enums import NodeType, SystemVariableKey +from core.workflow.system_variable import SystemVariable + + +def test_prepare_single_iteration_injects_system_variables_and_fake_workflow(): + node_id = "iteration_node" + execution_id = "workflow-exec-123" + + workflow = SimpleNamespace( + id="workflow-id", + tenant_id="tenant-id", + app_id="app-id", + environment_variables=[], + graph_dict={ + "nodes": [ + { + "id": node_id, + "type": "custom", + "data": { + "type": NodeType.ITERATION, + "title": "Iteration", + "version": "1", + "iterator_selector": ["start", "items"], + "output_selector": [node_id, "output"], + }, + } + ], + "edges": [], + }, + ) + + runner = WorkflowBasedAppRunner(queue_manager=MagicMock(), app_id="app-id") + + system_inputs = SystemVariable(app_id="app-id", workflow_id="workflow-id", workflow_execution_id=execution_id) + + graph, _, runtime_state = runner._prepare_single_node_execution( + workflow=workflow, + single_iteration_run=SimpleNamespace(node_id=node_id, inputs={"input_selector": [1, 2, 3]}), + system_variables=system_inputs, + ) + + assert runtime_state.variable_pool.system_variables.workflow_execution_id == execution_id + assert runtime_state.variable_pool.get_by_prefix("sys")[SystemVariableKey.WORKFLOW_EXECUTION_ID] == execution_id + assert graph.root_node.id == f"{node_id}_single_step_start" + assert f"{node_id}_single_step_end" in graph.nodes + + +def test_prepare_single_loop_injects_system_variables_and_fake_workflow(): + node_id = "loop_node" + execution_id = "workflow-exec-456" + + workflow = SimpleNamespace( + id="workflow-id", + tenant_id="tenant-id", + app_id="app-id", + environment_variables=[], + graph_dict={ + "nodes": [ + { + "id": node_id, + "type": "custom", + "data": { + "type": NodeType.LOOP, + "title": "Loop", + "version": "1", + "loop_count": 1, + "break_conditions": [], + "logical_operator": "and", + "loop_variables": [], + "outputs": {}, + }, + } + ], + "edges": [], + }, + ) + + runner = WorkflowBasedAppRunner(queue_manager=MagicMock(), app_id="app-id") + + system_inputs = SystemVariable(app_id="app-id", workflow_id="workflow-id", workflow_execution_id=execution_id) + + graph, _, runtime_state = runner._prepare_single_node_execution( + workflow=workflow, + single_loop_run=SimpleNamespace(node_id=node_id, inputs={}), + system_variables=system_inputs, + ) + + assert runtime_state.variable_pool.system_variables.workflow_execution_id == execution_id + assert graph.root_node.id == f"{node_id}_single_step_start" + assert f"{node_id}_single_step_end" in graph.nodes diff --git a/api/tests/unit_tests/core/workflow/nodes/iteration/iteration_node_empty_iteration_spec.py b/api/tests/unit_tests/core/workflow/nodes/iteration/iteration_node_empty_iteration_spec.py new file mode 100644 index 0000000000..af3a4b6511 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/iteration/iteration_node_empty_iteration_spec.py @@ -0,0 +1,51 @@ +from core.workflow.entities import GraphInitParams +from core.workflow.graph_events import ( + NodeRunIterationStartedEvent, + NodeRunIterationSucceededEvent, + NodeRunSucceededEvent, +) +from core.workflow.nodes.iteration.iteration_node import IterationNode +from core.workflow.runtime import GraphRuntimeState, VariablePool +from core.workflow.system_variable import SystemVariable + + +def test_iteration_node_emits_iteration_events_when_iterator_empty(): + init_params = GraphInitParams( + tenant_id="tenant", + app_id="app", + workflow_id="workflow", + graph_config={}, + user_id="user", + user_from="account", + invoke_from="debugger", + call_depth=0, + ) + runtime_state = GraphRuntimeState( + variable_pool=VariablePool(system_variables=SystemVariable.empty(), user_inputs={}), + start_at=0.0, + ) + runtime_state.variable_pool.add(("start", "items"), []) + + node = IterationNode( + id="iteration-node", + config={ + "id": "iteration-node", + "data": { + "title": "Iteration", + "iterator_selector": ["start", "items"], + "output_selector": ["iteration-node", "output"], + }, + }, + graph_init_params=init_params, + graph_runtime_state=runtime_state, + ) + + events = list(node.run()) + + assert any(isinstance(event, NodeRunIterationStartedEvent) for event in events) + + iteration_succeeded_event = next(event for event in events if isinstance(event, NodeRunIterationSucceededEvent)) + assert iteration_succeeded_event.steps == 0 + assert iteration_succeeded_event.outputs == {"output": []} + + assert any(isinstance(event, NodeRunSucceededEvent) for event in events) diff --git a/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts b/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts index dad62ae2a4..608d866204 100644 --- a/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts +++ b/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts @@ -792,8 +792,12 @@ const useOneStepRun = ({ }, }) const { data: iterationData } = params - _runResult.created_by = iterationData.created_by.name - setRunResult(_runResult) + const nextRunResult = { + ..._runResult, + created_by: (iterationData.created_by as any)?.name || '', + } + _runResult = nextRunResult + setRunResult(nextRunResult) }, onIterationStart: (params) => { const newIterationRunResult = produce(_iterationResult, (draft) => { @@ -895,8 +899,12 @@ const useOneStepRun = ({ }, }) const { data: loopData } = params - _runResult.created_by = loopData.created_by.name - setRunResult(_runResult) + const nextRunResult = { + ..._runResult, + created_by: (loopData.created_by as any)?.name || '', + } + _runResult = nextRunResult + setRunResult(nextRunResult) }, onLoopStart: (params) => { const newLoopRunResult = produce(_loopResult, (draft) => { From aff9853156e3863fe39a38de70512ca10b010d10 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Wed, 17 Dec 2025 12:58:12 +0800 Subject: [PATCH 2/3] Add type hint for empty iteration inputs --- api/core/workflow/nodes/iteration/iteration_node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index 6f537026f0..1d77819286 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -150,7 +150,7 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]): def _handle_empty_iteration(self, variable: ArraySegment | NoneSegment) -> Generator[NodeEventBase, None, None]: started_at = naive_utc_now() - inputs = {"iterator_selector": []} + inputs: dict[str, object] = {"iterator_selector": []} usage = LLMUsage.empty_usage() yield IterationStartedEvent( From 3e8d49a3933f715103b6f34779f233bb4cdba4fc Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Wed, 24 Dec 2025 10:04:47 +0800 Subject: [PATCH 3/3] feat(api): omit WorkflowPersistenceLayer while single stepping container nodes --- .../app/apps/advanced_chat/app_generator.py | 2 + api/core/app/apps/advanced_chat/app_runner.py | 33 ++-- api/core/app/apps/pipeline/pipeline_runner.py | 28 ++-- api/core/app/apps/workflow/app_runner.py | 32 ++-- api/core/app/entities/app_invoke_entities.py | 6 + ...rkflow_app_runner_single_node_execution.py | 155 +++++++++++++++++- 6 files changed, 211 insertions(+), 45 deletions(-) diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index feb0d3358c..3305d10eca 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -256,6 +256,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): single_iteration_run=AdvancedChatAppGenerateEntity.SingleIterationRunEntity( node_id=node_id, inputs=args["inputs"] ), + workflow_run_id=str(uuid.uuid4()), ) contexts.plugin_tool_providers.set({}) contexts.plugin_tool_providers_lock.set(threading.Lock()) @@ -339,6 +340,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): invoke_from=InvokeFrom.DEBUGGER, extras={"auto_generate_conversation_name": False}, single_loop_run=AdvancedChatAppGenerateEntity.SingleLoopRunEntity(node_id=node_id, inputs=args["inputs"]), + workflow_run_id=str(uuid.uuid4()), ) contexts.plugin_tool_providers.set({}) contexts.plugin_tool_providers_lock.set(threading.Lock()) diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index aee14e22b3..4a06375f9c 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -26,7 +26,10 @@ from core.variables.variables import VariableUnion from core.workflow.enums import WorkflowType from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel from core.workflow.graph_engine.layers.base import GraphEngineLayer -from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer +from core.workflow.graph_engine.layers.persistence import ( + PersistenceWorkflowInfo, + WorkflowPersistenceLayer, +) from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.runtime import GraphRuntimeState, VariablePool @@ -187,20 +190,20 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): self._queue_manager.graph_runtime_state = graph_runtime_state - persistence_layer = WorkflowPersistenceLayer( - application_generate_entity=self.application_generate_entity, - workflow_info=PersistenceWorkflowInfo( - workflow_id=self._workflow.id, - workflow_type=WorkflowType(self._workflow.type), - version=self._workflow.version, - graph_data=self._workflow.graph_dict, - ), - workflow_execution_repository=self._workflow_execution_repository, - workflow_node_execution_repository=self._workflow_node_execution_repository, - trace_manager=self.application_generate_entity.trace_manager, - ) - - workflow_entry.graph_engine.layer(persistence_layer) + if not self.application_generate_entity.is_single_stepping_container_nodes(): + persistence_layer = WorkflowPersistenceLayer( + application_generate_entity=self.application_generate_entity, + workflow_info=PersistenceWorkflowInfo( + workflow_id=self._workflow.id, + workflow_type=WorkflowType(self._workflow.type), + version=self._workflow.version, + graph_data=self._workflow.graph_dict, + ), + workflow_execution_repository=self._workflow_execution_repository, + workflow_node_execution_repository=self._workflow_node_execution_repository, + trace_manager=self.application_generate_entity.trace_manager, + ) + workflow_entry.graph_engine.layer(persistence_layer) for layer in self._graph_engine_layers: workflow_entry.graph_engine.layer(layer) diff --git a/api/core/app/apps/pipeline/pipeline_runner.py b/api/core/app/apps/pipeline/pipeline_runner.py index c3f0931c99..7fefe879ae 100644 --- a/api/core/app/apps/pipeline/pipeline_runner.py +++ b/api/core/app/apps/pipeline/pipeline_runner.py @@ -172,21 +172,21 @@ class PipelineRunner(WorkflowBasedAppRunner): ) self._queue_manager.graph_runtime_state = graph_runtime_state + if not self.application_generate_entity.is_single_stepping_container_nodes(): + persistence_layer = WorkflowPersistenceLayer( + application_generate_entity=self.application_generate_entity, + workflow_info=PersistenceWorkflowInfo( + workflow_id=workflow.id, + workflow_type=WorkflowType(workflow.type), + version=workflow.version, + graph_data=workflow.graph_dict, + ), + workflow_execution_repository=self._workflow_execution_repository, + workflow_node_execution_repository=self._workflow_node_execution_repository, + trace_manager=self.application_generate_entity.trace_manager, + ) - persistence_layer = WorkflowPersistenceLayer( - application_generate_entity=self.application_generate_entity, - workflow_info=PersistenceWorkflowInfo( - workflow_id=workflow.id, - workflow_type=WorkflowType(workflow.type), - version=workflow.version, - graph_data=workflow.graph_dict, - ), - workflow_execution_repository=self._workflow_execution_repository, - workflow_node_execution_repository=self._workflow_node_execution_repository, - trace_manager=self.application_generate_entity.trace_manager, - ) - - workflow_entry.graph_engine.layer(persistence_layer) + workflow_entry.graph_engine.layer(persistence_layer) generator = workflow_entry.run() diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 4217056078..8ff7381f90 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -10,7 +10,10 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat from core.workflow.enums import WorkflowType from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel from core.workflow.graph_engine.layers.base import GraphEngineLayer -from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer +from core.workflow.graph_engine.layers.persistence import ( + PersistenceWorkflowInfo, + WorkflowPersistenceLayer, +) from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.runtime import GraphRuntimeState, VariablePool @@ -133,20 +136,21 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): command_channel=command_channel, ) - persistence_layer = WorkflowPersistenceLayer( - application_generate_entity=self.application_generate_entity, - workflow_info=PersistenceWorkflowInfo( - workflow_id=self._workflow.id, - workflow_type=WorkflowType(self._workflow.type), - version=self._workflow.version, - graph_data=self._workflow.graph_dict, - ), - workflow_execution_repository=self._workflow_execution_repository, - workflow_node_execution_repository=self._workflow_node_execution_repository, - trace_manager=self.application_generate_entity.trace_manager, - ) + if not self.application_generate_entity.is_single_stepping_container_nodes(): + persistence_layer = WorkflowPersistenceLayer( + application_generate_entity=self.application_generate_entity, + workflow_info=PersistenceWorkflowInfo( + workflow_id=self._workflow.id, + workflow_type=WorkflowType(self._workflow.type), + version=self._workflow.version, + graph_data=self._workflow.graph_dict, + ), + workflow_execution_repository=self._workflow_execution_repository, + workflow_node_execution_repository=self._workflow_node_execution_repository, + trace_manager=self.application_generate_entity.trace_manager, + ) - workflow_entry.graph_engine.layer(persistence_layer) + workflow_entry.graph_engine.layer(persistence_layer) for layer in self._graph_engine_layers: workflow_entry.graph_engine.layer(layer) diff --git a/api/core/app/entities/app_invoke_entities.py b/api/core/app/entities/app_invoke_entities.py index 0cb573cb86..6693bd995b 100644 --- a/api/core/app/entities/app_invoke_entities.py +++ b/api/core/app/entities/app_invoke_entities.py @@ -228,6 +228,9 @@ class AdvancedChatAppGenerateEntity(ConversationAppGenerateEntity): single_loop_run: SingleLoopRunEntity | None = None + def is_single_stepping_container_nodes(self) -> bool: + return self.single_iteration_run is not None or self.single_loop_run is not None + class WorkflowAppGenerateEntity(AppGenerateEntity): """ @@ -258,6 +261,9 @@ class WorkflowAppGenerateEntity(AppGenerateEntity): single_loop_run: SingleLoopRunEntity | None = None + def is_single_stepping_container_nodes(self) -> bool: + return self.single_iteration_run is not None or self.single_loop_run is not None + class RagPipelineGenerateEntity(WorkflowAppGenerateEntity): """ diff --git a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node_execution.py b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node_execution.py index de08a6fae2..f00f65e0cf 100644 --- a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node_execution.py +++ b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node_execution.py @@ -1,8 +1,10 @@ from types import SimpleNamespace from unittest.mock import MagicMock -from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner -from core.workflow.enums import NodeType, SystemVariableKey +import core.app.apps.workflow.app_runner as workflow_app_runner +from core.app.apps.workflow.app_runner import WorkflowAppRunner, WorkflowBasedAppRunner +from core.app.entities.app_invoke_entities import InvokeFrom +from core.workflow.enums import NodeType, SystemVariableKey, WorkflowType from core.workflow.system_variable import SystemVariable @@ -92,3 +94,152 @@ def test_prepare_single_loop_injects_system_variables_and_fake_workflow(): assert runtime_state.variable_pool.system_variables.workflow_execution_id == execution_id assert graph.root_node.id == f"{node_id}_single_step_start" assert f"{node_id}_single_step_end" in graph.nodes + + +class DummyCommandChannel: + def fetch_commands(self): + return [] + + def send_command(self, command): + return None + + +def _empty_graph_engine_run(self): + if False: # pragma: no cover + yield None + + +def _build_generate_entity(*, single_iteration_run=None, single_loop_run=None): + if isinstance(single_iteration_run, dict): + single_iteration_run = SimpleNamespace(**single_iteration_run) + if isinstance(single_loop_run, dict): + single_loop_run = SimpleNamespace(**single_loop_run) + + base = SimpleNamespace( + app_config=SimpleNamespace(app_id="app-id", workflow_id="workflow-id"), + workflow_execution_id="workflow-exec-id", + files=[], + user_id="user-id", + inputs={}, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=0, + task_id="task-id", + trace_manager=None, + single_iteration_run=single_iteration_run, + single_loop_run=single_loop_run, + ) + + def is_single_stepping_container_nodes(): + return base.single_iteration_run is not None or base.single_loop_run is not None + + base.is_single_stepping_container_nodes = is_single_stepping_container_nodes # type: ignore[attr-defined] + return base + + +def test_workflow_runner_attaches_persistence_for_full_run(monkeypatch): + from core.workflow.graph_engine.graph_engine import GraphEngine + + monkeypatch.setattr(GraphEngine, "run", _empty_graph_engine_run) + persistence_ctor = MagicMock(name="persistence_layer_ctor") + monkeypatch.setattr(workflow_app_runner, "WorkflowPersistenceLayer", persistence_ctor) + monkeypatch.setattr(workflow_app_runner, "RedisChannel", lambda *args, **kwargs: DummyCommandChannel()) + + queue_manager = MagicMock() + workflow = SimpleNamespace( + id="workflow-id", + tenant_id="tenant-id", + app_id="app-id", + type=WorkflowType.WORKFLOW, + version="1", + graph_dict={ + "nodes": [ + { + "id": "start", + "type": "custom", + "data": {"type": NodeType.START, "title": "Start", "version": "1", "variables": []}, + }, + { + "id": "end", + "type": "custom", + "data": {"type": NodeType.END, "title": "End", "version": "1", "outputs": []}, + }, + ], + "edges": [ + {"source": "start", "target": "end", "sourceHandle": "source", "targetHandle": "target"}, + ], + }, + environment_variables=[], + ) + generate_entity = _build_generate_entity() + generate_entity.inputs = {"input": "value"} + + runner = WorkflowAppRunner( + application_generate_entity=generate_entity, + queue_manager=queue_manager, + variable_loader=MagicMock(), + workflow=workflow, + system_user_id="system-user-id", + root_node_id=None, + workflow_execution_repository=MagicMock(), + workflow_node_execution_repository=MagicMock(), + graph_engine_layers=(), + ) + + runner.run() + + assert persistence_ctor.call_count == 1 + + +def test_workflow_runner_skips_persistence_for_single_step(monkeypatch): + from core.workflow.graph_engine.graph_engine import GraphEngine + + monkeypatch.setattr(GraphEngine, "run", _empty_graph_engine_run) + persistence_ctor = MagicMock(name="persistence_layer_ctor") + monkeypatch.setattr(workflow_app_runner, "WorkflowPersistenceLayer", persistence_ctor) + monkeypatch.setattr(workflow_app_runner, "RedisChannel", lambda *args, **kwargs: DummyCommandChannel()) + + queue_manager = MagicMock() + workflow = SimpleNamespace( + id="workflow-id", + tenant_id="tenant-id", + app_id="app-id", + type=WorkflowType.WORKFLOW, + version="1", + graph_dict={ + "nodes": [ + { + "id": "loop", + "type": "custom", + "data": { + "type": NodeType.LOOP, + "title": "Loop", + "version": "1", + "loop_count": 1, + "break_conditions": [], + "logical_operator": "and", + "loop_variables": [], + "outputs": {}, + }, + } + ], + "edges": [], + }, + environment_variables=[], + ) + generate_entity = _build_generate_entity(single_loop_run={"node_id": "loop", "inputs": {}}) + + runner = WorkflowAppRunner( + application_generate_entity=generate_entity, + queue_manager=queue_manager, + variable_loader=MagicMock(), + workflow=workflow, + system_user_id="system-user-id", + root_node_id=None, + workflow_execution_repository=MagicMock(), + workflow_node_execution_repository=MagicMock(), + graph_engine_layers=(), + ) + + runner.run() + + assert persistence_ctor.call_count == 0