fix(workflow): repair explicit runtime state CI regressions

This commit is contained in:
-LAN- 2026-04-08 18:02:07 +08:00
parent eed09cd698
commit 24441dff07
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF
4 changed files with 44 additions and 22 deletions

View File

@ -9,6 +9,7 @@ does not depend on that implicit behavior.
from __future__ import annotations
from contextlib import AbstractContextManager
from typing import Any, cast
from graphon.graph import Graph
from graphon.graph_engine.domain.graph_execution import GraphExecution
@ -64,13 +65,14 @@ def ensure_graph_runtime_state_initialized(
) -> GraphRuntimeState:
"""Materialize non-graph collaborators when loading legacy or sparse state."""
workflow_id = _require_workflow_id(workflow_id)
state = cast(Any, graph_runtime_state)
if graph_runtime_state._ready_queue is None:
graph_runtime_state._ready_queue = InMemoryReadyQueue()
if state._ready_queue is None:
state._ready_queue = InMemoryReadyQueue()
graph_execution = graph_runtime_state._graph_execution
graph_execution = state._graph_execution
if graph_execution is None:
graph_runtime_state._graph_execution = GraphExecution(
state._graph_execution = GraphExecution(
workflow_id=workflow_id,
)
elif not graph_execution.workflow_id:
@ -93,19 +95,21 @@ def bind_graph_runtime_state_to_graph(
graph_runtime_state,
workflow_id=workflow_id,
)
state = cast(Any, graph_runtime_state)
graph_protocol = cast(Any, graph)
attached_graph = graph_runtime_state._graph
attached_graph = state._graph
if attached_graph is not None and attached_graph is not graph:
raise ValueError("GraphRuntimeState already attached to a different graph instance")
if graph_runtime_state._response_coordinator is None:
if state._response_coordinator is None:
response_coordinator = ResponseStreamCoordinator(
variable_pool=graph_runtime_state.variable_pool,
graph=graph,
graph=graph_protocol,
)
graph_runtime_state._response_coordinator = response_coordinator
state._response_coordinator = response_coordinator
graph_runtime_state.attach_graph(graph)
graph_runtime_state.attach_graph(graph_protocol)
return graph_runtime_state

View File

@ -107,11 +107,12 @@ class TestWorkflowChildEngineBuilder:
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(
workflow_entry,
"GraphRuntimeState",
"create_graph_runtime_state",
return_value=child_graph_runtime_state,
) as graph_runtime_state_cls,
) as create_graph_runtime_state,
patch.object(workflow_entry, "DifyNodeFactory", return_value=sentinel.factory) as dify_node_factory,
patch.object(workflow_entry.Graph, "init", return_value=child_graph) as graph_init,
patch.object(workflow_entry, "bind_graph_runtime_state_to_graph") as bind_runtime_state,
patch.object(workflow_entry, "GraphEngine", return_value=child_engine) as graph_engine_cls,
patch.object(workflow_entry, "GraphEngineConfig", return_value=sentinel.graph_engine_config),
patch.object(workflow_entry, "InMemoryChannel", return_value=sentinel.command_channel),
@ -126,9 +127,10 @@ class TestWorkflowChildEngineBuilder:
)
assert result is child_engine
graph_runtime_state_cls.assert_called_once_with(
create_graph_runtime_state.assert_called_once_with(
variable_pool=sentinel.child_variable_pool,
start_at=123.0,
workflow_id="workflow-id",
execution_context=sentinel.execution_context,
)
dify_node_factory.assert_called_once_with(
@ -140,6 +142,11 @@ class TestWorkflowChildEngineBuilder:
node_factory=sentinel.factory,
root_node_id="root",
)
bind_runtime_state.assert_called_once_with(
child_graph_runtime_state,
child_graph,
workflow_id="workflow-id",
)
graph_engine_cls.assert_called_once_with(
workflow_id="workflow-id",
graph=child_graph,
@ -247,6 +254,7 @@ class TestWorkflowEntryInit:
patch.object(workflow_entry.dify_config, "ENABLE_OTEL", False),
patch.object(workflow_entry, "is_instrument_flag_enabled", return_value=True),
patch.object(workflow_entry, "capture_current_context", return_value=sentinel.execution_context),
patch.object(workflow_entry, "bind_graph_runtime_state_to_graph"),
patch.object(workflow_entry, "GraphEngine", return_value=graph_engine) as graph_engine_cls,
patch.object(workflow_entry, "GraphEngineConfig", return_value=sentinel.graph_engine_config),
patch.object(workflow_entry, "InMemoryChannel", return_value=sentinel.command_channel),
@ -352,7 +360,7 @@ class TestWorkflowEntrySingleStepRun:
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(
workflow_entry,
"GraphRuntimeState",
"create_graph_runtime_state",
return_value=SimpleNamespace(variable_pool=variable_pool),
),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
@ -413,7 +421,7 @@ class TestWorkflowEntrySingleStepRun:
with (
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeNode),
@ -482,7 +490,7 @@ class TestWorkflowEntrySingleStepRun:
with (
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeDatasourceNode),
@ -542,7 +550,7 @@ class TestWorkflowEntrySingleStepRun:
with (
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeNode),
@ -653,7 +661,9 @@ class TestWorkflowEntryHelpers:
patch.object(
workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context
) as graph_init_context_cls,
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(
workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state
) as create_graph_runtime_state,
patch.object(
workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}
) as build_dify_run_context,
@ -693,13 +703,14 @@ class TestWorkflowEntryHelpers:
invoke_from=InvokeFrom.DEBUGGER,
)
graph_init_context_cls.assert_called_once_with(
workflow_id="",
workflow_id="free-node:node-id",
graph_config=workflow_entry.WorkflowEntry._create_single_node_graph(
"node-id", {"type": BuiltinNodeTypes.PARAMETER_EXTRACTOR, "title": "Node"}
),
run_context={"_dify": "context"},
call_depth=0,
)
create_graph_runtime_state.assert_called_once()
dify_node_factory_cls.assert_called_once_with(
graph_init_context=sentinel.graph_init_context,
graph_runtime_state=sentinel.graph_runtime_state,
@ -739,7 +750,7 @@ class TestWorkflowEntryHelpers:
patch.object(workflow_entry, "VariablePool", return_value=sentinel.variable_pool),
patch.object(workflow_entry, "add_variables_to_pool"),
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(

View File

@ -26,7 +26,10 @@ class TestWorkflowEntryRedisChannel:
redis_channel = RedisChannel(mock_redis_client, "test:channel:key")
# Patch GraphEngine to verify it receives the Redis channel
with patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine:
with (
patch("core.workflow.workflow_entry.bind_graph_runtime_state_to_graph"),
patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine,
):
mock_graph_engine = MockGraphEngine.return_value # Create WorkflowEntry with Redis channel
workflow_entry = WorkflowEntry(
tenant_id="test-tenant",
@ -60,6 +63,7 @@ class TestWorkflowEntryRedisChannel:
# Patch GraphEngine and InMemoryChannel
with (
patch("core.workflow.workflow_entry.bind_graph_runtime_state_to_graph"),
patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine,
patch("core.workflow.workflow_entry.InMemoryChannel", autospec=True) as MockInMemoryChannel,
):
@ -107,7 +111,10 @@ class TestWorkflowEntryRedisChannel:
mock_event2 = MagicMock()
# Patch GraphEngine
with patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine:
with (
patch("core.workflow.workflow_entry.bind_graph_runtime_state_to_graph"),
patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine,
):
mock_graph_engine = MagicMock()
mock_graph_engine.run.return_value = iter([mock_event1, mock_event2])
MockGraphEngine.return_value = mock_graph_engine

View File

@ -2752,8 +2752,8 @@ class TestWorkflowServiceFreeNodeExecution:
with (
patch("services.workflow_service.DifyGraphInitContext") as mock_graph_init_context_cls,
patch("services.workflow_service.GraphRuntimeState"),
patch("services.workflow_service.build_dify_run_context") as mock_build_dify_run_context,
patch("services.workflow_service.create_graph_runtime_state"),
patch("services.workflow_service.DifyHumanInputNodeRuntime") as mock_runtime_cls,
patch("services.workflow_service.HumanInputNode") as mock_node_cls,
):