diff --git a/api/core/workflow/graph_engine/event_management/event_handlers.py b/api/core/workflow/graph_engine/event_management/event_handlers.py index 3ab69776a4..10e7d421af 100644 --- a/api/core/workflow/graph_engine/event_management/event_handlers.py +++ b/api/core/workflow/graph_engine/event_management/event_handlers.py @@ -32,8 +32,8 @@ from ..response_coordinator import ResponseStreamCoordinator if TYPE_CHECKING: from ..error_handling import ErrorHandler + from ..graph_state_manager import GraphStateManager from ..graph_traversal import EdgeProcessor - from ..state_management import UnifiedStateManager from .event_manager import EventManager logger = logging.getLogger(__name__) @@ -56,7 +56,7 @@ class EventHandler: response_coordinator: ResponseStreamCoordinator, event_collector: "EventManager", edge_processor: "EdgeProcessor", - state_manager: "UnifiedStateManager", + state_manager: "GraphStateManager", error_handler: "ErrorHandler", ) -> None: """ diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index b6563058bc..019a1aaade 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -33,12 +33,12 @@ from .domain import ExecutionContext, GraphExecution from .entities.commands import AbortCommand from .error_handling import ErrorHandler from .event_management import EventHandler, EventManager +from .graph_state_manager import GraphStateManager from .graph_traversal import EdgeProcessor, SkipPropagator from .layers.base import GraphEngineLayer from .orchestration import Dispatcher, ExecutionCoordinator from .protocols.command_channel import CommandChannel from .response_coordinator import ResponseStreamCoordinator -from .state_management import UnifiedStateManager from .worker_management import WorkerPool logger = logging.getLogger(__name__) @@ -110,7 +110,7 @@ class GraphEngine: # === State Management === # Unified state manager handles all node state transitions and queue operations - self._state_manager = UnifiedStateManager(self._graph, self._ready_queue) + self._state_manager = GraphStateManager(self._graph, self._ready_queue) # === Response Coordination === # Coordinates response streaming from response nodes diff --git a/api/core/workflow/graph_engine/state_management/unified_state_manager.py b/api/core/workflow/graph_engine/graph_state_manager.py similarity index 91% rename from api/core/workflow/graph_engine/state_management/unified_state_manager.py rename to api/core/workflow/graph_engine/graph_state_manager.py index 258b84c341..efc3992ac9 100644 --- a/api/core/workflow/graph_engine/state_management/unified_state_manager.py +++ b/api/core/workflow/graph_engine/graph_state_manager.py @@ -1,8 +1,5 @@ """ -Unified state manager that combines node, edge, and execution tracking. - -This is a proposed simplification that merges NodeStateManager, EdgeStateManager, -and ExecutionTracker into a single cohesive class. +Graph state manager that combines node, edge, and execution tracking. """ import queue @@ -23,24 +20,10 @@ class EdgeStateAnalysis(TypedDict): @final -class UnifiedStateManager: - """ - Unified manager for all graph state operations. - - This class combines the responsibilities of: - - NodeStateManager: Node state transitions and ready queue - - EdgeStateManager: Edge state transitions and analysis - - ExecutionTracker: Tracking executing nodes - - Benefits: - - Single lock for all state operations (reduced contention) - - Cohesive state management interface - - Simplified dependency injection - """ - +class GraphStateManager: def __init__(self, graph: Graph, ready_queue: queue.Queue[str]) -> None: """ - Initialize the unified state manager. + Initialize the state manager. Args: graph: The workflow graph diff --git a/api/core/workflow/graph_engine/graph_traversal/edge_processor.py b/api/core/workflow/graph_engine/graph_traversal/edge_processor.py index c5634ed984..9bd0f86fbf 100644 --- a/api/core/workflow/graph_engine/graph_traversal/edge_processor.py +++ b/api/core/workflow/graph_engine/graph_traversal/edge_processor.py @@ -9,8 +9,8 @@ from core.workflow.enums import NodeExecutionType from core.workflow.graph import Edge, Graph from core.workflow.graph_events import NodeRunStreamChunkEvent +from ..graph_state_manager import GraphStateManager from ..response_coordinator import ResponseStreamCoordinator -from ..state_management import UnifiedStateManager if TYPE_CHECKING: from .skip_propagator import SkipPropagator @@ -29,7 +29,7 @@ class EdgeProcessor: def __init__( self, graph: Graph, - state_manager: UnifiedStateManager, + state_manager: GraphStateManager, response_coordinator: ResponseStreamCoordinator, skip_propagator: "SkipPropagator", ) -> None: diff --git a/api/core/workflow/graph_engine/graph_traversal/skip_propagator.py b/api/core/workflow/graph_engine/graph_traversal/skip_propagator.py index 51ab3c6739..78f8ecdcdf 100644 --- a/api/core/workflow/graph_engine/graph_traversal/skip_propagator.py +++ b/api/core/workflow/graph_engine/graph_traversal/skip_propagator.py @@ -7,7 +7,7 @@ from typing import final from core.workflow.graph import Edge, Graph -from ..state_management import UnifiedStateManager +from ..graph_state_manager import GraphStateManager @final @@ -22,7 +22,7 @@ class SkipPropagator: def __init__( self, graph: Graph, - state_manager: UnifiedStateManager, + state_manager: GraphStateManager, ) -> None: """ Initialize the skip propagator. diff --git a/api/core/workflow/graph_engine/orchestration/execution_coordinator.py b/api/core/workflow/graph_engine/orchestration/execution_coordinator.py index 234a3607c3..b35e8bb6d8 100644 --- a/api/core/workflow/graph_engine/orchestration/execution_coordinator.py +++ b/api/core/workflow/graph_engine/orchestration/execution_coordinator.py @@ -7,7 +7,7 @@ from typing import TYPE_CHECKING, final from ..command_processing import CommandProcessor from ..domain import GraphExecution from ..event_management import EventManager -from ..state_management import UnifiedStateManager +from ..graph_state_manager import GraphStateManager from ..worker_management import WorkerPool if TYPE_CHECKING: @@ -26,7 +26,7 @@ class ExecutionCoordinator: def __init__( self, graph_execution: GraphExecution, - state_manager: UnifiedStateManager, + state_manager: GraphStateManager, event_handler: "EventHandler", event_collector: EventManager, command_processor: CommandProcessor, diff --git a/api/core/workflow/graph_engine/state_management/__init__.py b/api/core/workflow/graph_engine/state_management/__init__.py deleted file mode 100644 index 9a632a3b9f..0000000000 --- a/api/core/workflow/graph_engine/state_management/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -""" -State management subsystem for graph engine. - -This package manages node states, edge states, and execution tracking -during workflow graph execution. -""" - -from .unified_state_manager import UnifiedStateManager - -__all__ = [ - "UnifiedStateManager", -]