diff --git a/api/core/workflow/graph_engine/command_channels/redis_channel.py b/api/core/workflow/graph_engine/command_channels/redis_channel.py index ad0aa9402c..056e17bf5d 100644 --- a/api/core/workflow/graph_engine/command_channels/redis_channel.py +++ b/api/core/workflow/graph_engine/command_channels/redis_channel.py @@ -97,8 +97,12 @@ class RedisChannel: Returns: Deserialized command or None if invalid """ + command_type_value = data.get("command_type") + if not isinstance(command_type_value, str): + return None + try: - command_type = CommandType(data.get("command_type")) + command_type = CommandType(command_type_value) if command_type == CommandType.ABORT: return AbortCommand(**data) diff --git a/api/core/workflow/graph_engine/domain/__init__.py b/api/core/workflow/graph_engine/domain/__init__.py index cf6d3e6aa3..9e9afe4c21 100644 --- a/api/core/workflow/graph_engine/domain/__init__.py +++ b/api/core/workflow/graph_engine/domain/__init__.py @@ -5,12 +5,10 @@ This package contains the core domain entities, value objects, and aggregates that represent the business concepts of workflow graph execution. """ -from .execution_context import ExecutionContext from .graph_execution import GraphExecution from .node_execution import NodeExecution __all__ = [ - "ExecutionContext", "GraphExecution", "NodeExecution", ] diff --git a/api/core/workflow/graph_engine/domain/execution_context.py b/api/core/workflow/graph_engine/domain/execution_context.py deleted file mode 100644 index 9bcff0fea7..0000000000 --- a/api/core/workflow/graph_engine/domain/execution_context.py +++ /dev/null @@ -1,31 +0,0 @@ -""" -ExecutionContext value object containing immutable execution parameters. -""" - -from dataclasses import dataclass - -from core.app.entities.app_invoke_entities import InvokeFrom -from models.enums import UserFrom - - -@dataclass(frozen=True) -class ExecutionContext: - """ - Immutable value object containing the context for a graph execution. - - This encapsulates all the contextual information needed to execute a workflow, - keeping it separate from the mutable execution state. - """ - - tenant_id: str - app_id: str - workflow_id: str - user_id: str - user_from: UserFrom - invoke_from: InvokeFrom - call_depth: int - - def __post_init__(self) -> None: - """Validate execution context parameters.""" - if self.call_depth < 0: - raise ValueError("Call depth must be non-negative") diff --git a/api/core/workflow/graph_engine/entities/commands.py b/api/core/workflow/graph_engine/entities/commands.py index 7e25fc0866..123ef3d449 100644 --- a/api/core/workflow/graph_engine/entities/commands.py +++ b/api/core/workflow/graph_engine/entities/commands.py @@ -5,13 +5,13 @@ This module defines command types that can be sent to a running GraphEngine instance to control its execution flow. """ -from enum import Enum +from enum import StrEnum from typing import Any from pydantic import BaseModel, Field -class CommandType(str, Enum): +class CommandType(StrEnum): """Types of commands that can be sent to GraphEngine.""" ABORT = "abort" diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 434ad4fc5e..b0daf694ce 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -8,12 +8,11 @@ Domain-Driven Design principles for improved maintainability and testability. import contextvars import logging import queue -from collections.abc import Generator, Mapping +from collections.abc import Generator from typing import final from flask import Flask, current_app -from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities import GraphRuntimeState from core.workflow.enums import NodeExecutionType from core.workflow.graph import Graph @@ -27,10 +26,9 @@ from core.workflow.graph_events import ( GraphRunStartedEvent, GraphRunSucceededEvent, ) -from models.enums import UserFrom from .command_processing import AbortCommandHandler, CommandProcessor -from .domain import ExecutionContext, GraphExecution +from .domain import GraphExecution from .entities.commands import AbortCommand from .error_handler import ErrorHandler from .event_management import EventHandler, EventManager @@ -57,15 +55,8 @@ class GraphEngine: def __init__( self, - tenant_id: str, - app_id: str, workflow_id: str, - user_id: str, - user_from: UserFrom, - invoke_from: InvokeFrom, - call_depth: int, graph: Graph, - graph_config: Mapping[str, object], graph_runtime_state: GraphRuntimeState, command_channel: CommandChannel, min_workers: int | None = None, @@ -75,25 +66,12 @@ class GraphEngine: ) -> None: """Initialize the graph engine with all subsystems and dependencies.""" - # === Domain Models === - # Execution context encapsulates workflow execution metadata - self._execution_context = ExecutionContext( - tenant_id=tenant_id, - app_id=app_id, - workflow_id=workflow_id, - user_id=user_id, - user_from=user_from, - invoke_from=invoke_from, - call_depth=call_depth, - ) - # Graph execution tracks the overall execution state self._graph_execution = GraphExecution(workflow_id=workflow_id) # === Core Dependencies === # Graph structure and configuration self._graph = graph - self._graph_config = graph_config self._graph_runtime_state = graph_runtime_state self._command_channel = command_channel diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index 524cd2c40b..5340a5b6ce 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -551,15 +551,8 @@ class IterationNode(Node): # Create a new GraphEngine for this iteration graph_engine = GraphEngine( - tenant_id=self.tenant_id, - app_id=self.app_id, workflow_id=self.workflow_id, - user_id=self.user_id, - user_from=self.user_from, - invoke_from=self.invoke_from, - call_depth=self.workflow_call_depth, graph=iteration_graph, - graph_config=self.graph_config, graph_runtime_state=graph_runtime_state_copy, command_channel=InMemoryChannel(), # Use InMemoryChannel for sub-graphs ) diff --git a/api/core/workflow/nodes/loop/loop_node.py b/api/core/workflow/nodes/loop/loop_node.py index 2217bc205e..25b0c4f4fe 100644 --- a/api/core/workflow/nodes/loop/loop_node.py +++ b/api/core/workflow/nodes/loop/loop_node.py @@ -443,15 +443,8 @@ class LoopNode(Node): # Create a new GraphEngine for this iteration graph_engine = GraphEngine( - tenant_id=self.tenant_id, - app_id=self.app_id, workflow_id=self.workflow_id, - user_id=self.user_id, - user_from=self.user_from, - invoke_from=self.invoke_from, - call_depth=self.workflow_call_depth, graph=loop_graph, - graph_config=self.graph_config, graph_runtime_state=graph_runtime_state_copy, command_channel=InMemoryChannel(), # Use InMemoryChannel for sub-graphs ) diff --git a/api/core/workflow/workflow_entry.py b/api/core/workflow/workflow_entry.py index 901c830b17..f26f3a8008 100644 --- a/api/core/workflow/workflow_entry.py +++ b/api/core/workflow/workflow_entry.py @@ -73,15 +73,8 @@ class WorkflowEntry: self.command_channel = command_channel self.graph_engine = GraphEngine( - tenant_id=tenant_id, - app_id=app_id, workflow_id=workflow_id, - user_id=user_id, - user_from=user_from, - invoke_from=invoke_from, - call_depth=call_depth, graph=graph, - graph_config=graph_config, graph_runtime_state=graph_runtime_state, command_channel=command_channel, ) diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_command_system.py b/api/tests/unit_tests/core/workflow/graph_engine/test_command_system.py index 58073ba5c3..9fec855a93 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_command_system.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_command_system.py @@ -3,14 +3,12 @@ import time from unittest.mock import MagicMock -from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities import GraphRuntimeState, VariablePool from core.workflow.graph import Graph from core.workflow.graph_engine import GraphEngine from core.workflow.graph_engine.command_channels import InMemoryChannel from core.workflow.graph_engine.entities.commands import AbortCommand from core.workflow.graph_events import GraphRunAbortedEvent, GraphRunStartedEvent -from models.enums import UserFrom def test_abort_command(): @@ -42,15 +40,8 @@ def test_abort_command(): # Create GraphEngine with same shared runtime state engine = GraphEngine( - tenant_id="test", - app_id="test", workflow_id="test_workflow", - user_id="test", - user_from=UserFrom.ACCOUNT, - invoke_from=InvokeFrom.WEB_APP, - call_depth=0, graph=mock_graph, - graph_config={}, graph_runtime_state=shared_runtime_state, # Use shared instance command_channel=command_channel, ) diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_conditional_streaming_vs_template_workflow.py b/api/tests/unit_tests/core/workflow/graph_engine/test_conditional_streaming_vs_template_workflow.py index 2b2e4fe022..70a772fc4c 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_conditional_streaming_vs_template_workflow.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_conditional_streaming_vs_template_workflow.py @@ -6,7 +6,6 @@ This test validates that: - When blocking != 1: NodeRunStreamChunkEvent present (direct LLM to End output) """ -from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.enums import NodeType from core.workflow.graph_engine import GraphEngine from core.workflow.graph_engine.command_channels import InMemoryChannel @@ -16,7 +15,6 @@ from core.workflow.graph_events import ( NodeRunStreamChunkEvent, NodeRunSucceededEvent, ) -from models.enums import UserFrom from .test_table_runner import TableTestRunner @@ -40,20 +38,10 @@ def test_streaming_output_with_blocking_equals_one(): use_mock_factory=True, ) - workflow_config = fixture_data.get("workflow", {}) - graph_config = workflow_config.get("graph", {}) - # Create and run the engine engine = GraphEngine( - tenant_id="test_tenant", - app_id="test_app", workflow_id="test_workflow", - user_id="test_user", - user_from=UserFrom.ACCOUNT, - invoke_from=InvokeFrom.DEBUGGER, - call_depth=0, graph=graph, - graph_config=graph_config, graph_runtime_state=graph_runtime_state, command_channel=InMemoryChannel(), ) @@ -145,20 +133,10 @@ def test_streaming_output_with_blocking_not_equals_one(): use_mock_factory=True, ) - workflow_config = fixture_data.get("workflow", {}) - graph_config = workflow_config.get("graph", {}) - # Create and run the engine engine = GraphEngine( - tenant_id="test_tenant", - app_id="test_app", workflow_id="test_workflow", - user_id="test_user", - user_from=UserFrom.ACCOUNT, - invoke_from=InvokeFrom.DEBUGGER, - call_depth=0, graph=graph, - graph_config=graph_config, graph_runtime_state=graph_runtime_state, command_channel=InMemoryChannel(), ) diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py index 4aa33bde26..6a723999de 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py @@ -10,11 +10,9 @@ import time from hypothesis import HealthCheck, given, settings from hypothesis import strategies as st -from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.graph_engine import GraphEngine from core.workflow.graph_engine.command_channels import InMemoryChannel from core.workflow.graph_events import GraphRunStartedEvent, GraphRunSucceededEvent -from models.enums import UserFrom # Import the test framework from the new module from .test_table_runner import TableTestRunner, WorkflowRunner, WorkflowTestCase @@ -460,15 +458,8 @@ def test_layer_system_basic(): # Create engine with layer engine = GraphEngine( - tenant_id="test_tenant", - app_id="test_app", workflow_id="test_workflow", - user_id="test_user", - user_from=UserFrom.ACCOUNT, - invoke_from=InvokeFrom.WEB_APP, - call_depth=0, graph=graph, - graph_config=fixture_data.get("workflow", {}).get("graph", {}), graph_runtime_state=graph_runtime_state, command_channel=InMemoryChannel(), ) @@ -523,15 +514,8 @@ def test_layer_chaining(): # Create engine engine = GraphEngine( - tenant_id="test_tenant", - app_id="test_app", workflow_id="test_workflow", - user_id="test_user", - user_from=UserFrom.ACCOUNT, - invoke_from=InvokeFrom.WEB_APP, - call_depth=0, graph=graph, - graph_config=fixture_data.get("workflow", {}).get("graph", {}), graph_runtime_state=graph_runtime_state, command_channel=InMemoryChannel(), ) @@ -577,15 +561,8 @@ def test_layer_error_handling(): # Create engine with faulty layer engine = GraphEngine( - tenant_id="test_tenant", - app_id="test_app", workflow_id="test_workflow", - user_id="test_user", - user_from=UserFrom.ACCOUNT, - invoke_from=InvokeFrom.WEB_APP, - call_depth=0, graph=graph, - graph_config=fixture_data.get("workflow", {}).get("graph", {}), graph_runtime_state=graph_runtime_state, command_channel=InMemoryChannel(), ) diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.py b/api/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.py index e944c6f83e..e5ae32bbff 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.py @@ -615,15 +615,8 @@ class MockIterationNode(MockNodeMixin, IterationNode): # Create a new GraphEngine for this iteration graph_engine = GraphEngine( - tenant_id=self.tenant_id, - app_id=self.app_id, workflow_id=self.workflow_id, - user_id=self.user_id, - user_from=self.user_from, - invoke_from=self.invoke_from, - call_depth=self.workflow_call_depth, graph=iteration_graph, - graph_config=self.graph_config, graph_runtime_state=graph_runtime_state_copy, command_channel=InMemoryChannel(), # Use InMemoryChannel for sub-graphs ) @@ -683,15 +676,8 @@ class MockLoopNode(MockNodeMixin, LoopNode): # Create a new GraphEngine for this iteration graph_engine = GraphEngine( - tenant_id=self.tenant_id, - app_id=self.app_id, workflow_id=self.workflow_id, - user_id=self.user_id, - user_from=self.user_from, - invoke_from=self.invoke_from, - call_depth=self.workflow_call_depth, graph=loop_graph, - graph_config=self.graph_config, graph_runtime_state=graph_runtime_state_copy, command_channel=InMemoryChannel(), # Use InMemoryChannel for sub-graphs ) diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_streaming_workflow.py b/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_streaming_workflow.py index 04f0aa7f2e..d1f1f53b78 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_streaming_workflow.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_streaming_workflow.py @@ -118,15 +118,8 @@ def test_parallel_streaming_workflow(): # Create the graph engine engine = GraphEngine( - tenant_id="test_tenant", - app_id="test_app", workflow_id="test_workflow", - user_id="test_user", - user_from=UserFrom.ACCOUNT, - invoke_from=InvokeFrom.WEB_APP, - call_depth=0, graph=graph, - graph_config=graph_config, graph_runtime_state=graph_runtime_state, command_channel=InMemoryChannel(), ) diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_table_runner.py b/api/tests/unit_tests/core/workflow/graph_engine/test_table_runner.py index 01a8521550..0f3a142b1a 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_table_runner.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_table_runner.py @@ -19,7 +19,6 @@ from functools import lru_cache from pathlib import Path from typing import Any -from core.app.entities.app_invoke_entities import InvokeFrom from core.tools.utils.yaml_utils import _load_yaml_file from core.variables import ( ArrayNumberVariable, @@ -42,7 +41,6 @@ from core.workflow.graph_events import ( ) from core.workflow.nodes.node_factory import DifyNodeFactory from core.workflow.system_variable import SystemVariable -from models.enums import UserFrom from .test_mock_config import MockConfig from .test_mock_factory import MockNodeFactory @@ -373,20 +371,10 @@ class TableTestRunner: mock_config=test_case.mock_config, ) - workflow_config = fixture_data.get("workflow", {}) - graph_config = workflow_config.get("graph", {}) - # Create and run the engine with configured worker settings engine = GraphEngine( - tenant_id="test_tenant", - app_id="test_app", workflow_id="test_workflow", - user_id="test_user", - user_from=UserFrom.ACCOUNT, - invoke_from=InvokeFrom.DEBUGGER, # Use DEBUGGER to avoid conversation_id requirement - call_depth=0, graph=graph, - graph_config=graph_config, graph_runtime_state=graph_runtime_state, command_channel=InMemoryChannel(), min_workers=self.graph_engine_min_workers, diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_tool_in_chatflow.py b/api/tests/unit_tests/core/workflow/graph_engine/test_tool_in_chatflow.py index e227518a8e..34682ff8f9 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_tool_in_chatflow.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_tool_in_chatflow.py @@ -1,11 +1,9 @@ -from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.graph_engine import GraphEngine from core.workflow.graph_engine.command_channels import InMemoryChannel from core.workflow.graph_events import ( GraphRunSucceededEvent, NodeRunStreamChunkEvent, ) -from models.enums import UserFrom from .test_table_runner import TableTestRunner @@ -23,20 +21,10 @@ def test_tool_in_chatflow(): use_mock_factory=True, ) - workflow_config = fixture_data.get("workflow", {}) - graph_config = workflow_config.get("graph", {}) - # Create and run the engine engine = GraphEngine( - tenant_id="test_tenant", - app_id="test_app", workflow_id="test_workflow", - user_id="test_user", - user_from=UserFrom.ACCOUNT, - invoke_from=InvokeFrom.DEBUGGER, - call_depth=0, graph=graph, - graph_config=graph_config, graph_runtime_state=graph_runtime_state, command_channel=InMemoryChannel(), )