refactor(graph_engine): remove unused parameters from Engine

This commit is contained in:
-LAN- 2025-09-16 14:11:42 +08:00
parent bd13cf05eb
commit b5684f1992
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF
15 changed files with 9 additions and 180 deletions

View File

@@ -97,8 +97,12 @@ class RedisChannel:
Returns:
Deserialized command or None if invalid
"""
command_type_value = data.get("command_type")
if not isinstance(command_type_value, str):
return None
try:
command_type = CommandType(data.get("command_type"))
command_type = CommandType(command_type_value)
if command_type == CommandType.ABORT:
return AbortCommand(**data)

View File

@@ -5,12 +5,10 @@ This package contains the core domain entities, value objects, and aggregates
that represent the business concepts of workflow graph execution.
"""
from .execution_context import ExecutionContext
from .graph_execution import GraphExecution
from .node_execution import NodeExecution
__all__ = [
"ExecutionContext",
"GraphExecution",
"NodeExecution",
]

View File

@@ -1,31 +0,0 @@
"""
ExecutionContext value object containing immutable execution parameters.
"""
from dataclasses import dataclass
from core.app.entities.app_invoke_entities import InvokeFrom
from models.enums import UserFrom
@dataclass(frozen=True)
class ExecutionContext:
    """
    Immutable value object containing the context for a graph execution.
    This encapsulates all the contextual information needed to execute a workflow,
    keeping it separate from the mutable execution state.
    """
    # Owning tenant and application identifiers.
    tenant_id: str
    app_id: str
    # Workflow definition being executed.
    workflow_id: str
    # Identity of the invoking user and where the invocation originated.
    user_id: str
    user_from: UserFrom
    invoke_from: InvokeFrom
    # Nesting depth for sub-workflow invocations; validated non-negative below.
    call_depth: int
    def __post_init__(self) -> None:
        """Validate execution context parameters."""
        # frozen dataclass: this runs exactly once, at construction time.
        if self.call_depth < 0:
            raise ValueError("Call depth must be non-negative")

View File

@@ -5,13 +5,13 @@ This module defines command types that can be sent to a running GraphEngine
instance to control its execution flow.
"""
from enum import Enum
from enum import StrEnum
from typing import Any
from pydantic import BaseModel, Field
class CommandType(str, Enum):
class CommandType(StrEnum):
"""Types of commands that can be sent to GraphEngine."""
ABORT = "abort"

View File

@@ -8,12 +8,11 @@ Domain-Driven Design principles for improved maintainability and testability.
import contextvars
import logging
import queue
from collections.abc import Generator, Mapping
from collections.abc import Generator
from typing import final
from flask import Flask, current_app
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities import GraphRuntimeState
from core.workflow.enums import NodeExecutionType
from core.workflow.graph import Graph
@@ -27,10 +26,9 @@ from core.workflow.graph_events import (
GraphRunStartedEvent,
GraphRunSucceededEvent,
)
from models.enums import UserFrom
from .command_processing import AbortCommandHandler, CommandProcessor
from .domain import ExecutionContext, GraphExecution
from .domain import GraphExecution
from .entities.commands import AbortCommand
from .error_handler import ErrorHandler
from .event_management import EventHandler, EventManager
@@ -57,15 +55,8 @@ class GraphEngine:
def __init__(
self,
tenant_id: str,
app_id: str,
workflow_id: str,
user_id: str,
user_from: UserFrom,
invoke_from: InvokeFrom,
call_depth: int,
graph: Graph,
graph_config: Mapping[str, object],
graph_runtime_state: GraphRuntimeState,
command_channel: CommandChannel,
min_workers: int | None = None,
@@ -75,25 +66,12 @@
) -> None:
"""Initialize the graph engine with all subsystems and dependencies."""
# === Domain Models ===
# Execution context encapsulates workflow execution metadata
self._execution_context = ExecutionContext(
tenant_id=tenant_id,
app_id=app_id,
workflow_id=workflow_id,
user_id=user_id,
user_from=user_from,
invoke_from=invoke_from,
call_depth=call_depth,
)
# Graph execution tracks the overall execution state
self._graph_execution = GraphExecution(workflow_id=workflow_id)
# === Core Dependencies ===
# Graph structure and configuration
self._graph = graph
self._graph_config = graph_config
self._graph_runtime_state = graph_runtime_state
self._command_channel = command_channel

View File

@@ -551,15 +551,8 @@ class IterationNode(Node):
# Create a new GraphEngine for this iteration
graph_engine = GraphEngine(
tenant_id=self.tenant_id,
app_id=self.app_id,
workflow_id=self.workflow_id,
user_id=self.user_id,
user_from=self.user_from,
invoke_from=self.invoke_from,
call_depth=self.workflow_call_depth,
graph=iteration_graph,
graph_config=self.graph_config,
graph_runtime_state=graph_runtime_state_copy,
command_channel=InMemoryChannel(), # Use InMemoryChannel for sub-graphs
)

View File

@@ -443,15 +443,8 @@ class LoopNode(Node):
# Create a new GraphEngine for this iteration
graph_engine = GraphEngine(
tenant_id=self.tenant_id,
app_id=self.app_id,
workflow_id=self.workflow_id,
user_id=self.user_id,
user_from=self.user_from,
invoke_from=self.invoke_from,
call_depth=self.workflow_call_depth,
graph=loop_graph,
graph_config=self.graph_config,
graph_runtime_state=graph_runtime_state_copy,
command_channel=InMemoryChannel(), # Use InMemoryChannel for sub-graphs
)

View File

@@ -73,15 +73,8 @@ class WorkflowEntry:
self.command_channel = command_channel
self.graph_engine = GraphEngine(
tenant_id=tenant_id,
app_id=app_id,
workflow_id=workflow_id,
user_id=user_id,
user_from=user_from,
invoke_from=invoke_from,
call_depth=call_depth,
graph=graph,
graph_config=graph_config,
graph_runtime_state=graph_runtime_state,
command_channel=command_channel,
)

View File

@@ -3,14 +3,12 @@
import time
from unittest.mock import MagicMock
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities import GraphRuntimeState, VariablePool
from core.workflow.graph import Graph
from core.workflow.graph_engine import GraphEngine
from core.workflow.graph_engine.command_channels import InMemoryChannel
from core.workflow.graph_engine.entities.commands import AbortCommand
from core.workflow.graph_events import GraphRunAbortedEvent, GraphRunStartedEvent
from models.enums import UserFrom
def test_abort_command():
@@ -42,15 +40,8 @@ def test_abort_command():
# Create GraphEngine with same shared runtime state
engine = GraphEngine(
tenant_id="test",
app_id="test",
workflow_id="test_workflow",
user_id="test",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.WEB_APP,
call_depth=0,
graph=mock_graph,
graph_config={},
graph_runtime_state=shared_runtime_state, # Use shared instance
command_channel=command_channel,
)

View File

@@ -6,7 +6,6 @@ This test validates that:
- When blocking != 1: NodeRunStreamChunkEvent present (direct LLM to End output)
"""
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.enums import NodeType
from core.workflow.graph_engine import GraphEngine
from core.workflow.graph_engine.command_channels import InMemoryChannel
@@ -16,7 +15,6 @@ from core.workflow.graph_events import (
NodeRunStreamChunkEvent,
NodeRunSucceededEvent,
)
from models.enums import UserFrom
from .test_table_runner import TableTestRunner
@@ -40,20 +38,10 @@ def test_streaming_output_with_blocking_equals_one():
use_mock_factory=True,
)
workflow_config = fixture_data.get("workflow", {})
graph_config = workflow_config.get("graph", {})
# Create and run the engine
engine = GraphEngine(
tenant_id="test_tenant",
app_id="test_app",
workflow_id="test_workflow",
user_id="test_user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
graph=graph,
graph_config=graph_config,
graph_runtime_state=graph_runtime_state,
command_channel=InMemoryChannel(),
)
@@ -145,20 +133,10 @@ def test_streaming_output_with_blocking_not_equals_one():
use_mock_factory=True,
)
workflow_config = fixture_data.get("workflow", {})
graph_config = workflow_config.get("graph", {})
# Create and run the engine
engine = GraphEngine(
tenant_id="test_tenant",
app_id="test_app",
workflow_id="test_workflow",
user_id="test_user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
graph=graph,
graph_config=graph_config,
graph_runtime_state=graph_runtime_state,
command_channel=InMemoryChannel(),
)

View File

@@ -10,11 +10,9 @@ import time
from hypothesis import HealthCheck, given, settings
from hypothesis import strategies as st
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.graph_engine import GraphEngine
from core.workflow.graph_engine.command_channels import InMemoryChannel
from core.workflow.graph_events import GraphRunStartedEvent, GraphRunSucceededEvent
from models.enums import UserFrom
# Import the test framework from the new module
from .test_table_runner import TableTestRunner, WorkflowRunner, WorkflowTestCase
@@ -460,15 +458,8 @@ def test_layer_system_basic():
# Create engine with layer
engine = GraphEngine(
tenant_id="test_tenant",
app_id="test_app",
workflow_id="test_workflow",
user_id="test_user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.WEB_APP,
call_depth=0,
graph=graph,
graph_config=fixture_data.get("workflow", {}).get("graph", {}),
graph_runtime_state=graph_runtime_state,
command_channel=InMemoryChannel(),
)
@@ -523,15 +514,8 @@ def test_layer_chaining():
# Create engine
engine = GraphEngine(
tenant_id="test_tenant",
app_id="test_app",
workflow_id="test_workflow",
user_id="test_user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.WEB_APP,
call_depth=0,
graph=graph,
graph_config=fixture_data.get("workflow", {}).get("graph", {}),
graph_runtime_state=graph_runtime_state,
command_channel=InMemoryChannel(),
)
@@ -577,15 +561,8 @@ def test_layer_error_handling():
# Create engine with faulty layer
engine = GraphEngine(
tenant_id="test_tenant",
app_id="test_app",
workflow_id="test_workflow",
user_id="test_user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.WEB_APP,
call_depth=0,
graph=graph,
graph_config=fixture_data.get("workflow", {}).get("graph", {}),
graph_runtime_state=graph_runtime_state,
command_channel=InMemoryChannel(),
)

View File

@@ -615,15 +615,8 @@ class MockIterationNode(MockNodeMixin, IterationNode):
# Create a new GraphEngine for this iteration
graph_engine = GraphEngine(
tenant_id=self.tenant_id,
app_id=self.app_id,
workflow_id=self.workflow_id,
user_id=self.user_id,
user_from=self.user_from,
invoke_from=self.invoke_from,
call_depth=self.workflow_call_depth,
graph=iteration_graph,
graph_config=self.graph_config,
graph_runtime_state=graph_runtime_state_copy,
command_channel=InMemoryChannel(), # Use InMemoryChannel for sub-graphs
)
@@ -683,15 +676,8 @@ class MockLoopNode(MockNodeMixin, LoopNode):
# Create a new GraphEngine for this iteration
graph_engine = GraphEngine(
tenant_id=self.tenant_id,
app_id=self.app_id,
workflow_id=self.workflow_id,
user_id=self.user_id,
user_from=self.user_from,
invoke_from=self.invoke_from,
call_depth=self.workflow_call_depth,
graph=loop_graph,
graph_config=self.graph_config,
graph_runtime_state=graph_runtime_state_copy,
command_channel=InMemoryChannel(), # Use InMemoryChannel for sub-graphs
)

View File

@@ -118,15 +118,8 @@ def test_parallel_streaming_workflow():
# Create the graph engine
engine = GraphEngine(
tenant_id="test_tenant",
app_id="test_app",
workflow_id="test_workflow",
user_id="test_user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.WEB_APP,
call_depth=0,
graph=graph,
graph_config=graph_config,
graph_runtime_state=graph_runtime_state,
command_channel=InMemoryChannel(),
)

View File

@@ -19,7 +19,6 @@ from functools import lru_cache
from pathlib import Path
from typing import Any
from core.app.entities.app_invoke_entities import InvokeFrom
from core.tools.utils.yaml_utils import _load_yaml_file
from core.variables import (
ArrayNumberVariable,
@@ -42,7 +41,6 @@ from core.workflow.graph_events import (
)
from core.workflow.nodes.node_factory import DifyNodeFactory
from core.workflow.system_variable import SystemVariable
from models.enums import UserFrom
from .test_mock_config import MockConfig
from .test_mock_factory import MockNodeFactory
@@ -373,20 +371,10 @@ class TableTestRunner:
mock_config=test_case.mock_config,
)
workflow_config = fixture_data.get("workflow", {})
graph_config = workflow_config.get("graph", {})
# Create and run the engine with configured worker settings
engine = GraphEngine(
tenant_id="test_tenant",
app_id="test_app",
workflow_id="test_workflow",
user_id="test_user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER, # Use DEBUGGER to avoid conversation_id requirement
call_depth=0,
graph=graph,
graph_config=graph_config,
graph_runtime_state=graph_runtime_state,
command_channel=InMemoryChannel(),
min_workers=self.graph_engine_min_workers,

View File

@@ -1,11 +1,9 @@
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.graph_engine import GraphEngine
from core.workflow.graph_engine.command_channels import InMemoryChannel
from core.workflow.graph_events import (
GraphRunSucceededEvent,
NodeRunStreamChunkEvent,
)
from models.enums import UserFrom
from .test_table_runner import TableTestRunner
@@ -23,20 +21,10 @@ def test_tool_in_chatflow():
use_mock_factory=True,
)
workflow_config = fixture_data.get("workflow", {})
graph_config = workflow_config.get("graph", {})
# Create and run the engine
engine = GraphEngine(
tenant_id="test_tenant",
app_id="test_app",
workflow_id="test_workflow",
user_id="test_user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
graph=graph,
graph_config=graph_config,
graph_runtime_state=graph_runtime_state,
command_channel=InMemoryChannel(),
)