mirror of https://github.com/langgenius/dify.git
refactor(graph_engine): Move GraphStateManager to single file package.
Signed-off-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
parent
d52621fce3
commit
f17c71e08a
|
|
@ -32,8 +32,8 @@ from ..response_coordinator import ResponseStreamCoordinator
|
|||
|
||||
if TYPE_CHECKING:
|
||||
from ..error_handling import ErrorHandler
|
||||
from ..graph_state_manager import GraphStateManager
|
||||
from ..graph_traversal import EdgeProcessor
|
||||
from ..state_management import UnifiedStateManager
|
||||
from .event_manager import EventManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -56,7 +56,7 @@ class EventHandler:
|
|||
response_coordinator: ResponseStreamCoordinator,
|
||||
event_collector: "EventManager",
|
||||
edge_processor: "EdgeProcessor",
|
||||
state_manager: "UnifiedStateManager",
|
||||
state_manager: "GraphStateManager",
|
||||
error_handler: "ErrorHandler",
|
||||
) -> None:
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -33,12 +33,12 @@ from .domain import ExecutionContext, GraphExecution
|
|||
from .entities.commands import AbortCommand
|
||||
from .error_handling import ErrorHandler
|
||||
from .event_management import EventHandler, EventManager
|
||||
from .graph_state_manager import GraphStateManager
|
||||
from .graph_traversal import EdgeProcessor, SkipPropagator
|
||||
from .layers.base import GraphEngineLayer
|
||||
from .orchestration import Dispatcher, ExecutionCoordinator
|
||||
from .protocols.command_channel import CommandChannel
|
||||
from .response_coordinator import ResponseStreamCoordinator
|
||||
from .state_management import UnifiedStateManager
|
||||
from .worker_management import WorkerPool
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -110,7 +110,7 @@ class GraphEngine:
|
|||
|
||||
# === State Management ===
|
||||
# Unified state manager handles all node state transitions and queue operations
|
||||
self._state_manager = UnifiedStateManager(self._graph, self._ready_queue)
|
||||
self._state_manager = GraphStateManager(self._graph, self._ready_queue)
|
||||
|
||||
# === Response Coordination ===
|
||||
# Coordinates response streaming from response nodes
|
||||
|
|
|
|||
|
|
@ -1,8 +1,5 @@
|
|||
"""
|
||||
Unified state manager that combines node, edge, and execution tracking.
|
||||
|
||||
This is a proposed simplification that merges NodeStateManager, EdgeStateManager,
|
||||
and ExecutionTracker into a single cohesive class.
|
||||
Graph state manager that combines node, edge, and execution tracking.
|
||||
"""
|
||||
|
||||
import queue
|
||||
|
|
@ -23,24 +20,10 @@ class EdgeStateAnalysis(TypedDict):
|
|||
|
||||
|
||||
@final
|
||||
class UnifiedStateManager:
|
||||
"""
|
||||
Unified manager for all graph state operations.
|
||||
|
||||
This class combines the responsibilities of:
|
||||
- NodeStateManager: Node state transitions and ready queue
|
||||
- EdgeStateManager: Edge state transitions and analysis
|
||||
- ExecutionTracker: Tracking executing nodes
|
||||
|
||||
Benefits:
|
||||
- Single lock for all state operations (reduced contention)
|
||||
- Cohesive state management interface
|
||||
- Simplified dependency injection
|
||||
"""
|
||||
|
||||
class GraphStateManager:
|
||||
def __init__(self, graph: Graph, ready_queue: queue.Queue[str]) -> None:
|
||||
"""
|
||||
Initialize the unified state manager.
|
||||
Initialize the state manager.
|
||||
|
||||
Args:
|
||||
graph: The workflow graph
|
||||
|
|
@ -9,8 +9,8 @@ from core.workflow.enums import NodeExecutionType
|
|||
from core.workflow.graph import Edge, Graph
|
||||
from core.workflow.graph_events import NodeRunStreamChunkEvent
|
||||
|
||||
from ..graph_state_manager import GraphStateManager
|
||||
from ..response_coordinator import ResponseStreamCoordinator
|
||||
from ..state_management import UnifiedStateManager
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .skip_propagator import SkipPropagator
|
||||
|
|
@ -29,7 +29,7 @@ class EdgeProcessor:
|
|||
def __init__(
|
||||
self,
|
||||
graph: Graph,
|
||||
state_manager: UnifiedStateManager,
|
||||
state_manager: GraphStateManager,
|
||||
response_coordinator: ResponseStreamCoordinator,
|
||||
skip_propagator: "SkipPropagator",
|
||||
) -> None:
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ from typing import final
|
|||
|
||||
from core.workflow.graph import Edge, Graph
|
||||
|
||||
from ..state_management import UnifiedStateManager
|
||||
from ..graph_state_manager import GraphStateManager
|
||||
|
||||
|
||||
@final
|
||||
|
|
@ -22,7 +22,7 @@ class SkipPropagator:
|
|||
def __init__(
|
||||
self,
|
||||
graph: Graph,
|
||||
state_manager: UnifiedStateManager,
|
||||
state_manager: GraphStateManager,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize the skip propagator.
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ from typing import TYPE_CHECKING, final
|
|||
from ..command_processing import CommandProcessor
|
||||
from ..domain import GraphExecution
|
||||
from ..event_management import EventManager
|
||||
from ..state_management import UnifiedStateManager
|
||||
from ..graph_state_manager import GraphStateManager
|
||||
from ..worker_management import WorkerPool
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
|
@ -26,7 +26,7 @@ class ExecutionCoordinator:
|
|||
def __init__(
|
||||
self,
|
||||
graph_execution: GraphExecution,
|
||||
state_manager: UnifiedStateManager,
|
||||
state_manager: GraphStateManager,
|
||||
event_handler: "EventHandler",
|
||||
event_collector: EventManager,
|
||||
command_processor: CommandProcessor,
|
||||
|
|
|
|||
|
|
@ -1,12 +0,0 @@
|
|||
"""
|
||||
State management subsystem for graph engine.
|
||||
|
||||
This package manages node states, edge states, and execution tracking
|
||||
during workflow graph execution.
|
||||
"""
|
||||
|
||||
from .unified_state_manager import UnifiedStateManager
|
||||
|
||||
__all__ = [
|
||||
"UnifiedStateManager",
|
||||
]
|
||||
Loading…
Reference in New Issue