From 24441dff07450e588ebb5e35a0a6af27008f9317 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Wed, 8 Apr 2026 18:02:07 +0800 Subject: [PATCH] fix(workflow): repair explicit runtime state CI regressions --- api/core/workflow/runtime_state.py | 22 +++++++------ .../workflow/test_workflow_entry_helpers.py | 31 +++++++++++++------ .../test_workflow_entry_redis_channel.py | 11 +++++-- .../services/test_workflow_service.py | 2 +- 4 files changed, 44 insertions(+), 22 deletions(-) diff --git a/api/core/workflow/runtime_state.py b/api/core/workflow/runtime_state.py index fa984d672a..3d14f9744a 100644 --- a/api/core/workflow/runtime_state.py +++ b/api/core/workflow/runtime_state.py @@ -9,6 +9,7 @@ does not depend on that implicit behavior. from __future__ import annotations from contextlib import AbstractContextManager +from typing import Any, cast from graphon.graph import Graph from graphon.graph_engine.domain.graph_execution import GraphExecution @@ -64,13 +65,14 @@ def ensure_graph_runtime_state_initialized( ) -> GraphRuntimeState: """Materialize non-graph collaborators when loading legacy or sparse state.""" workflow_id = _require_workflow_id(workflow_id) + state = cast(Any, graph_runtime_state) - if graph_runtime_state._ready_queue is None: - graph_runtime_state._ready_queue = InMemoryReadyQueue() + if state._ready_queue is None: + state._ready_queue = InMemoryReadyQueue() - graph_execution = graph_runtime_state._graph_execution + graph_execution = state._graph_execution if graph_execution is None: - graph_runtime_state._graph_execution = GraphExecution( + state._graph_execution = GraphExecution( workflow_id=workflow_id, ) elif not graph_execution.workflow_id: @@ -93,19 +95,21 @@ def bind_graph_runtime_state_to_graph( graph_runtime_state, workflow_id=workflow_id, ) + state = cast(Any, graph_runtime_state) + graph_protocol = cast(Any, graph) - attached_graph = graph_runtime_state._graph + attached_graph = state._graph if attached_graph is not None and attached_graph is not graph: raise ValueError("GraphRuntimeState already attached to a different graph instance") - if graph_runtime_state._response_coordinator is None: + if state._response_coordinator is None: response_coordinator = ResponseStreamCoordinator( variable_pool=graph_runtime_state.variable_pool, - graph=graph, + graph=graph_protocol, ) - graph_runtime_state._response_coordinator = response_coordinator + state._response_coordinator = response_coordinator - graph_runtime_state.attach_graph(graph) + graph_runtime_state.attach_graph(graph_protocol) return graph_runtime_state diff --git a/api/tests/unit_tests/core/workflow/test_workflow_entry_helpers.py b/api/tests/unit_tests/core/workflow/test_workflow_entry_helpers.py index 6dcaed1143..4d7644530a 100644 --- a/api/tests/unit_tests/core/workflow/test_workflow_entry_helpers.py +++ b/api/tests/unit_tests/core/workflow/test_workflow_entry_helpers.py @@ -107,11 +107,12 @@ class TestWorkflowChildEngineBuilder: patch.object(workflow_entry.time, "perf_counter", return_value=123.0), patch.object( workflow_entry, - "GraphRuntimeState", + "create_graph_runtime_state", return_value=child_graph_runtime_state, - ) as graph_runtime_state_cls, + ) as create_graph_runtime_state, patch.object(workflow_entry, "DifyNodeFactory", return_value=sentinel.factory) as dify_node_factory, patch.object(workflow_entry.Graph, "init", return_value=child_graph) as graph_init, + patch.object(workflow_entry, "bind_graph_runtime_state_to_graph") as bind_runtime_state, patch.object(workflow_entry, "GraphEngine", return_value=child_engine) as graph_engine_cls, patch.object(workflow_entry, "GraphEngineConfig", return_value=sentinel.graph_engine_config), patch.object(workflow_entry, "InMemoryChannel", return_value=sentinel.command_channel), @@ -126,9 +127,10 @@ class TestWorkflowChildEngineBuilder: ) assert result is child_engine - graph_runtime_state_cls.assert_called_once_with( + create_graph_runtime_state.assert_called_once_with( variable_pool=sentinel.child_variable_pool, start_at=123.0, + workflow_id="workflow-id", execution_context=sentinel.execution_context, ) dify_node_factory.assert_called_once_with( @@ -140,6 +142,11 @@ class TestWorkflowChildEngineBuilder: node_factory=sentinel.factory, root_node_id="root", ) + bind_runtime_state.assert_called_once_with( + child_graph_runtime_state, + child_graph, + workflow_id="workflow-id", + ) graph_engine_cls.assert_called_once_with( workflow_id="workflow-id", graph=child_graph, @@ -247,6 +254,7 @@ class TestWorkflowEntryInit: patch.object(workflow_entry.dify_config, "ENABLE_OTEL", False), patch.object(workflow_entry, "is_instrument_flag_enabled", return_value=True), patch.object(workflow_entry, "capture_current_context", return_value=sentinel.execution_context), + patch.object(workflow_entry, "bind_graph_runtime_state_to_graph"), patch.object(workflow_entry, "GraphEngine", return_value=graph_engine) as graph_engine_cls, patch.object(workflow_entry, "GraphEngineConfig", return_value=sentinel.graph_engine_config), patch.object(workflow_entry, "InMemoryChannel", return_value=sentinel.command_channel), @@ -352,7 +360,7 @@ class TestWorkflowEntrySingleStepRun: patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context), patch.object( workflow_entry, - "GraphRuntimeState", + "create_graph_runtime_state", return_value=SimpleNamespace(variable_pool=variable_pool), ), patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}), @@ -413,7 +421,7 @@ class TestWorkflowEntrySingleStepRun: with ( patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context), - patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state), + patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state), patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}), patch.object(workflow_entry.time, "perf_counter", return_value=123.0), patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeNode), @@ -482,7 +490,7 @@ class TestWorkflowEntrySingleStepRun: with ( patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context), - patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state), + patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state), patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}), patch.object(workflow_entry.time, "perf_counter", return_value=123.0), patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeDatasourceNode), @@ -542,7 +550,7 @@ class TestWorkflowEntrySingleStepRun: with ( patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context), - patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state), + patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state), patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}), patch.object(workflow_entry.time, "perf_counter", return_value=123.0), patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeNode), @@ -653,7 +661,9 @@ class TestWorkflowEntryHelpers: patch.object( workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context ) as graph_init_context_cls, - patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state), + patch.object( + workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state + ) as create_graph_runtime_state, patch.object( workflow_entry, "build_dify_run_context", return_value={"_dify": "context"} ) as build_dify_run_context, @@ -693,13 +703,14 @@ class TestWorkflowEntryHelpers: invoke_from=InvokeFrom.DEBUGGER, ) graph_init_context_cls.assert_called_once_with( - workflow_id="", + workflow_id="free-node:node-id", graph_config=workflow_entry.WorkflowEntry._create_single_node_graph( "node-id", {"type": BuiltinNodeTypes.PARAMETER_EXTRACTOR, "title": "Node"} ), run_context={"_dify": "context"}, call_depth=0, ) + create_graph_runtime_state.assert_called_once() dify_node_factory_cls.assert_called_once_with( graph_init_context=sentinel.graph_init_context, graph_runtime_state=sentinel.graph_runtime_state, @@ -739,7 +750,7 @@ class TestWorkflowEntryHelpers: patch.object(workflow_entry, "VariablePool", return_value=sentinel.variable_pool), patch.object(workflow_entry, "add_variables_to_pool"), patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context), - patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state), + patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state), patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}), patch.object(workflow_entry.time, "perf_counter", return_value=123.0), patch.object( diff --git a/api/tests/unit_tests/core/workflow/test_workflow_entry_redis_channel.py b/api/tests/unit_tests/core/workflow/test_workflow_entry_redis_channel.py index 4b2f98aeff..47140f9ccf 100644 --- a/api/tests/unit_tests/core/workflow/test_workflow_entry_redis_channel.py +++ b/api/tests/unit_tests/core/workflow/test_workflow_entry_redis_channel.py @@ -26,7 +26,10 @@ class TestWorkflowEntryRedisChannel: redis_channel = RedisChannel(mock_redis_client, "test:channel:key") # Patch GraphEngine to verify it receives the Redis channel - with patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine: + with ( + patch("core.workflow.workflow_entry.bind_graph_runtime_state_to_graph"), + patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine, + ): mock_graph_engine = MockGraphEngine.return_value # Create WorkflowEntry with Redis channel workflow_entry = WorkflowEntry( tenant_id="test-tenant", @@ -60,6 +63,7 @@ class TestWorkflowEntryRedisChannel: # Patch GraphEngine and InMemoryChannel with ( + patch("core.workflow.workflow_entry.bind_graph_runtime_state_to_graph"), patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine, patch("core.workflow.workflow_entry.InMemoryChannel", autospec=True) as MockInMemoryChannel, ): @@ -107,7 +111,10 @@ class TestWorkflowEntryRedisChannel: mock_event2 = MagicMock() # Patch GraphEngine - with patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine: + with ( + patch("core.workflow.workflow_entry.bind_graph_runtime_state_to_graph"), + patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine, + ): mock_graph_engine = MagicMock() mock_graph_engine.run.return_value = iter([mock_event1, mock_event2]) MockGraphEngine.return_value = mock_graph_engine diff --git a/api/tests/unit_tests/services/test_workflow_service.py b/api/tests/unit_tests/services/test_workflow_service.py index 406b4fb9d0..2ec92be8fa 100644 --- a/api/tests/unit_tests/services/test_workflow_service.py +++ b/api/tests/unit_tests/services/test_workflow_service.py @@ -2752,8 +2752,8 @@ class TestWorkflowServiceFreeNodeExecution: with ( patch("services.workflow_service.DifyGraphInitContext") as mock_graph_init_context_cls, - patch("services.workflow_service.GraphRuntimeState"), patch("services.workflow_service.build_dify_run_context") as mock_build_dify_run_context, + patch("services.workflow_service.create_graph_runtime_state"), patch("services.workflow_service.DifyHumanInputNodeRuntime") as mock_runtime_cls, patch("services.workflow_service.HumanInputNode") as mock_node_cls, ):