feat(api): omit WorkflowPersistenceLayer while single stepping container nodes

This commit is contained in:
QuantumGhost 2025-12-24 10:04:47 +08:00
parent aff9853156
commit 3e8d49a393
6 changed files with 211 additions and 45 deletions

View File

@ -256,6 +256,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
single_iteration_run=AdvancedChatAppGenerateEntity.SingleIterationRunEntity(
node_id=node_id, inputs=args["inputs"]
),
workflow_run_id=str(uuid.uuid4()),
)
contexts.plugin_tool_providers.set({})
contexts.plugin_tool_providers_lock.set(threading.Lock())
@ -339,6 +340,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
invoke_from=InvokeFrom.DEBUGGER,
extras={"auto_generate_conversation_name": False},
single_loop_run=AdvancedChatAppGenerateEntity.SingleLoopRunEntity(node_id=node_id, inputs=args["inputs"]),
workflow_run_id=str(uuid.uuid4()),
)
contexts.plugin_tool_providers.set({})
contexts.plugin_tool_providers_lock.set(threading.Lock())

View File

@ -26,7 +26,10 @@ from core.variables.variables import VariableUnion
from core.workflow.enums import WorkflowType
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.workflow.graph_engine.layers.persistence import (
PersistenceWorkflowInfo,
WorkflowPersistenceLayer,
)
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.runtime import GraphRuntimeState, VariablePool
@ -187,20 +190,20 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
self._queue_manager.graph_runtime_state = graph_runtime_state
persistence_layer = WorkflowPersistenceLayer(
application_generate_entity=self.application_generate_entity,
workflow_info=PersistenceWorkflowInfo(
workflow_id=self._workflow.id,
workflow_type=WorkflowType(self._workflow.type),
version=self._workflow.version,
graph_data=self._workflow.graph_dict,
),
workflow_execution_repository=self._workflow_execution_repository,
workflow_node_execution_repository=self._workflow_node_execution_repository,
trace_manager=self.application_generate_entity.trace_manager,
)
workflow_entry.graph_engine.layer(persistence_layer)
if not self.application_generate_entity.is_single_stepping_container_nodes():
persistence_layer = WorkflowPersistenceLayer(
application_generate_entity=self.application_generate_entity,
workflow_info=PersistenceWorkflowInfo(
workflow_id=self._workflow.id,
workflow_type=WorkflowType(self._workflow.type),
version=self._workflow.version,
graph_data=self._workflow.graph_dict,
),
workflow_execution_repository=self._workflow_execution_repository,
workflow_node_execution_repository=self._workflow_node_execution_repository,
trace_manager=self.application_generate_entity.trace_manager,
)
workflow_entry.graph_engine.layer(persistence_layer)
for layer in self._graph_engine_layers:
workflow_entry.graph_engine.layer(layer)

View File

@ -172,21 +172,21 @@ class PipelineRunner(WorkflowBasedAppRunner):
)
self._queue_manager.graph_runtime_state = graph_runtime_state
if not self.application_generate_entity.is_single_stepping_container_nodes():
persistence_layer = WorkflowPersistenceLayer(
application_generate_entity=self.application_generate_entity,
workflow_info=PersistenceWorkflowInfo(
workflow_id=workflow.id,
workflow_type=WorkflowType(workflow.type),
version=workflow.version,
graph_data=workflow.graph_dict,
),
workflow_execution_repository=self._workflow_execution_repository,
workflow_node_execution_repository=self._workflow_node_execution_repository,
trace_manager=self.application_generate_entity.trace_manager,
)
persistence_layer = WorkflowPersistenceLayer(
application_generate_entity=self.application_generate_entity,
workflow_info=PersistenceWorkflowInfo(
workflow_id=workflow.id,
workflow_type=WorkflowType(workflow.type),
version=workflow.version,
graph_data=workflow.graph_dict,
),
workflow_execution_repository=self._workflow_execution_repository,
workflow_node_execution_repository=self._workflow_node_execution_repository,
trace_manager=self.application_generate_entity.trace_manager,
)
workflow_entry.graph_engine.layer(persistence_layer)
workflow_entry.graph_engine.layer(persistence_layer)
generator = workflow_entry.run()

View File

@ -10,7 +10,10 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat
from core.workflow.enums import WorkflowType
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.workflow.graph_engine.layers.persistence import (
PersistenceWorkflowInfo,
WorkflowPersistenceLayer,
)
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.runtime import GraphRuntimeState, VariablePool
@ -133,20 +136,21 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
command_channel=command_channel,
)
persistence_layer = WorkflowPersistenceLayer(
application_generate_entity=self.application_generate_entity,
workflow_info=PersistenceWorkflowInfo(
workflow_id=self._workflow.id,
workflow_type=WorkflowType(self._workflow.type),
version=self._workflow.version,
graph_data=self._workflow.graph_dict,
),
workflow_execution_repository=self._workflow_execution_repository,
workflow_node_execution_repository=self._workflow_node_execution_repository,
trace_manager=self.application_generate_entity.trace_manager,
)
if not self.application_generate_entity.is_single_stepping_container_nodes():
persistence_layer = WorkflowPersistenceLayer(
application_generate_entity=self.application_generate_entity,
workflow_info=PersistenceWorkflowInfo(
workflow_id=self._workflow.id,
workflow_type=WorkflowType(self._workflow.type),
version=self._workflow.version,
graph_data=self._workflow.graph_dict,
),
workflow_execution_repository=self._workflow_execution_repository,
workflow_node_execution_repository=self._workflow_node_execution_repository,
trace_manager=self.application_generate_entity.trace_manager,
)
workflow_entry.graph_engine.layer(persistence_layer)
workflow_entry.graph_engine.layer(persistence_layer)
for layer in self._graph_engine_layers:
workflow_entry.graph_engine.layer(layer)

View File

@ -228,6 +228,9 @@ class AdvancedChatAppGenerateEntity(ConversationAppGenerateEntity):
single_loop_run: SingleLoopRunEntity | None = None
def is_single_stepping_container_nodes(self) -> bool:
return self.single_iteration_run is not None or self.single_loop_run is not None
class WorkflowAppGenerateEntity(AppGenerateEntity):
"""
@ -258,6 +261,9 @@ class WorkflowAppGenerateEntity(AppGenerateEntity):
single_loop_run: SingleLoopRunEntity | None = None
def is_single_stepping_container_nodes(self) -> bool:
return self.single_iteration_run is not None or self.single_loop_run is not None
class RagPipelineGenerateEntity(WorkflowAppGenerateEntity):
"""

View File

@ -1,8 +1,10 @@
from types import SimpleNamespace
from unittest.mock import MagicMock
from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner
from core.workflow.enums import NodeType, SystemVariableKey
import core.app.apps.workflow.app_runner as workflow_app_runner
from core.app.apps.workflow.app_runner import WorkflowAppRunner, WorkflowBasedAppRunner
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.enums import NodeType, SystemVariableKey, WorkflowType
from core.workflow.system_variable import SystemVariable
@ -92,3 +94,152 @@ def test_prepare_single_loop_injects_system_variables_and_fake_workflow():
assert runtime_state.variable_pool.system_variables.workflow_execution_id == execution_id
assert graph.root_node.id == f"{node_id}_single_step_start"
assert f"{node_id}_single_step_end" in graph.nodes
class DummyCommandChannel:
def fetch_commands(self):
return []
def send_command(self, command):
return None
def _empty_graph_engine_run(self):
if False: # pragma: no cover
yield None
def _build_generate_entity(*, single_iteration_run=None, single_loop_run=None):
if isinstance(single_iteration_run, dict):
single_iteration_run = SimpleNamespace(**single_iteration_run)
if isinstance(single_loop_run, dict):
single_loop_run = SimpleNamespace(**single_loop_run)
base = SimpleNamespace(
app_config=SimpleNamespace(app_id="app-id", workflow_id="workflow-id"),
workflow_execution_id="workflow-exec-id",
files=[],
user_id="user-id",
inputs={},
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
task_id="task-id",
trace_manager=None,
single_iteration_run=single_iteration_run,
single_loop_run=single_loop_run,
)
def is_single_stepping_container_nodes():
return base.single_iteration_run is not None or base.single_loop_run is not None
base.is_single_stepping_container_nodes = is_single_stepping_container_nodes # type: ignore[attr-defined]
return base
def test_workflow_runner_attaches_persistence_for_full_run(monkeypatch):
from core.workflow.graph_engine.graph_engine import GraphEngine
monkeypatch.setattr(GraphEngine, "run", _empty_graph_engine_run)
persistence_ctor = MagicMock(name="persistence_layer_ctor")
monkeypatch.setattr(workflow_app_runner, "WorkflowPersistenceLayer", persistence_ctor)
monkeypatch.setattr(workflow_app_runner, "RedisChannel", lambda *args, **kwargs: DummyCommandChannel())
queue_manager = MagicMock()
workflow = SimpleNamespace(
id="workflow-id",
tenant_id="tenant-id",
app_id="app-id",
type=WorkflowType.WORKFLOW,
version="1",
graph_dict={
"nodes": [
{
"id": "start",
"type": "custom",
"data": {"type": NodeType.START, "title": "Start", "version": "1", "variables": []},
},
{
"id": "end",
"type": "custom",
"data": {"type": NodeType.END, "title": "End", "version": "1", "outputs": []},
},
],
"edges": [
{"source": "start", "target": "end", "sourceHandle": "source", "targetHandle": "target"},
],
},
environment_variables=[],
)
generate_entity = _build_generate_entity()
generate_entity.inputs = {"input": "value"}
runner = WorkflowAppRunner(
application_generate_entity=generate_entity,
queue_manager=queue_manager,
variable_loader=MagicMock(),
workflow=workflow,
system_user_id="system-user-id",
root_node_id=None,
workflow_execution_repository=MagicMock(),
workflow_node_execution_repository=MagicMock(),
graph_engine_layers=(),
)
runner.run()
assert persistence_ctor.call_count == 1
def test_workflow_runner_skips_persistence_for_single_step(monkeypatch):
from core.workflow.graph_engine.graph_engine import GraphEngine
monkeypatch.setattr(GraphEngine, "run", _empty_graph_engine_run)
persistence_ctor = MagicMock(name="persistence_layer_ctor")
monkeypatch.setattr(workflow_app_runner, "WorkflowPersistenceLayer", persistence_ctor)
monkeypatch.setattr(workflow_app_runner, "RedisChannel", lambda *args, **kwargs: DummyCommandChannel())
queue_manager = MagicMock()
workflow = SimpleNamespace(
id="workflow-id",
tenant_id="tenant-id",
app_id="app-id",
type=WorkflowType.WORKFLOW,
version="1",
graph_dict={
"nodes": [
{
"id": "loop",
"type": "custom",
"data": {
"type": NodeType.LOOP,
"title": "Loop",
"version": "1",
"loop_count": 1,
"break_conditions": [],
"logical_operator": "and",
"loop_variables": [],
"outputs": {},
},
}
],
"edges": [],
},
environment_variables=[],
)
generate_entity = _build_generate_entity(single_loop_run={"node_id": "loop", "inputs": {}})
runner = WorkflowAppRunner(
application_generate_entity=generate_entity,
queue_manager=queue_manager,
variable_loader=MagicMock(),
workflow=workflow,
system_user_id="system-user-id",
root_node_id=None,
workflow_execution_repository=MagicMock(),
workflow_node_execution_repository=MagicMock(),
graph_engine_layers=(),
)
runner.run()
assert persistence_ctor.call_count == 0