refactor(graph_engine): Merge event_collector and event_emitter into event_manager

Signed-off-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
-LAN- 2025-09-01 13:15:58 +08:00
parent bb5d52539c
commit 8433cf4437
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF
7 changed files with 67 additions and 114 deletions

View File

@ -5,12 +5,10 @@ This package handles event routing, collection, and emission for
workflow graph execution events. workflow graph execution events.
""" """
from .event_collector import EventCollector from .event_handlers import EventHandler
from .event_emitter import EventEmitter from .event_manager import EventManager
from .event_handlers import EventHandlerRegistry
__all__ = [ __all__ = [
"EventCollector", "EventHandler",
"EventEmitter", "EventManager",
"EventHandlerRegistry",
] ]

View File

@ -1,58 +0,0 @@
"""
Event emitter for yielding events to external consumers.
"""
import threading
import time
from collections.abc import Generator
from typing import final
from core.workflow.graph_events import GraphEngineEvent
from .event_collector import EventCollector
@final
class EventEmitter:
"""
Emits collected events as a generator for external consumption.
This provides a generator interface for yielding events as they're
collected, with proper synchronization for multi-threaded access.
"""
def __init__(self, event_collector: EventCollector) -> None:
"""
Initialize the event emitter.
Args:
event_collector: The collector to emit events from
"""
self._event_collector = event_collector
self._execution_complete = threading.Event()
def mark_complete(self) -> None:
"""Mark execution as complete to stop the generator."""
self._execution_complete.set()
def emit_events(self) -> Generator[GraphEngineEvent, None, None]:
"""
Generator that yields events as they're collected.
Yields:
GraphEngineEvent instances as they're processed
"""
yielded_count = 0
while not self._execution_complete.is_set() or yielded_count < self._event_collector.event_count():
# Get new events since last yield
new_events = self._event_collector.get_new_events(yielded_count)
# Yield any new events
for event in new_events:
yield event
yielded_count += 1
# Small sleep to avoid busy waiting
if not self._execution_complete.is_set() and not new_events:
time.sleep(0.001)

View File

@ -33,13 +33,13 @@ if TYPE_CHECKING:
from ..error_handling import ErrorHandler from ..error_handling import ErrorHandler
from ..graph_traversal import EdgeProcessor from ..graph_traversal import EdgeProcessor
from ..state_management import UnifiedStateManager from ..state_management import UnifiedStateManager
from .event_collector import EventCollector from .event_manager import EventManager
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@final @final
class EventHandlerRegistry: class EventHandler:
""" """
Registry of event handlers for different event types. Registry of event handlers for different event types.
@ -53,7 +53,7 @@ class EventHandlerRegistry:
graph_runtime_state: GraphRuntimeState, graph_runtime_state: GraphRuntimeState,
graph_execution: GraphExecution, graph_execution: GraphExecution,
response_coordinator: ResponseStreamCoordinator, response_coordinator: ResponseStreamCoordinator,
event_collector: "EventCollector", event_collector: "EventManager",
edge_processor: "EdgeProcessor", edge_processor: "EdgeProcessor",
state_manager: "UnifiedStateManager", state_manager: "UnifiedStateManager",
error_handler: "ErrorHandler", error_handler: "ErrorHandler",
@ -66,7 +66,7 @@ class EventHandlerRegistry:
graph_runtime_state: Runtime state with variable pool graph_runtime_state: Runtime state with variable pool
graph_execution: Graph execution aggregate graph_execution: Graph execution aggregate
response_coordinator: Response stream coordinator response_coordinator: Response stream coordinator
event_collector: Event collector for collecting events event_collector: Event manager for collecting events
edge_processor: Edge processor for edge traversal edge_processor: Edge processor for edge traversal
state_manager: Unified state manager state_manager: Unified state manager
error_handler: Error handler error_handler: Error handler

View File

@ -1,8 +1,10 @@
""" """
Event collector for buffering and managing events. Unified event manager for collecting and emitting events.
""" """
import threading import threading
import time
from collections.abc import Generator
from typing import final from typing import final
from core.workflow.graph_events import GraphEngineEvent from core.workflow.graph_events import GraphEngineEvent
@ -89,19 +91,21 @@ class WriteLockContext:
@final @final
class EventCollector: class EventManager:
""" """
Collects and buffers events for later retrieval. Unified event manager that collects, buffers, and emits events.
This provides thread-safe event collection with support for This class combines event collection with event emission, providing
notifying layers about events as they're collected. thread-safe event management with support for notifying layers and
streaming events to external consumers.
""" """
def __init__(self) -> None: def __init__(self) -> None:
"""Initialize the event collector.""" """Initialize the event manager."""
self._events: list[GraphEngineEvent] = [] self._events: list[GraphEngineEvent] = []
self._lock = ReadWriteLock() self._lock = ReadWriteLock()
self._layers: list[Layer] = [] self._layers: list[Layer] = []
self._execution_complete = threading.Event()
def set_layers(self, layers: list[Layer]) -> None: def set_layers(self, layers: list[Layer]) -> None:
""" """
@ -123,17 +127,7 @@ class EventCollector:
self._events.append(event) self._events.append(event)
self._notify_layers(event) self._notify_layers(event)
def get_events(self) -> list[GraphEngineEvent]: def _get_new_events(self, start_index: int) -> list[GraphEngineEvent]:
"""
Get all collected events.
Returns:
List of collected events
"""
with self._lock.read_lock():
return list(self._events)
def get_new_events(self, start_index: int) -> list[GraphEngineEvent]:
""" """
Get new events starting from a specific index. Get new events starting from a specific index.
@ -146,7 +140,7 @@ class EventCollector:
with self._lock.read_lock(): with self._lock.read_lock():
return list(self._events[start_index:]) return list(self._events[start_index:])
def event_count(self) -> int: def _event_count(self) -> int:
""" """
Get the current count of collected events. Get the current count of collected events.
@ -156,10 +150,31 @@ class EventCollector:
with self._lock.read_lock(): with self._lock.read_lock():
return len(self._events) return len(self._events)
def clear(self) -> None: def mark_complete(self) -> None:
"""Clear all collected events.""" """Mark execution as complete to stop the event emission generator."""
with self._lock.write_lock(): self._execution_complete.set()
self._events.clear()
def emit_events(self) -> Generator[GraphEngineEvent, None, None]:
"""
Generator that yields events as they're collected.
Yields:
GraphEngineEvent instances as they're processed
"""
yielded_count = 0
while not self._execution_complete.is_set() or yielded_count < self._event_count():
# Get new events since last yield
new_events = self._get_new_events(yielded_count)
# Yield any new events
for event in new_events:
yield event
yielded_count += 1
# Small sleep to avoid busy waiting
if not self._execution_complete.is_set() and not new_events:
time.sleep(0.001)
def _notify_layers(self, event: GraphEngineEvent) -> None: def _notify_layers(self, event: GraphEngineEvent) -> None:
""" """

View File

@ -31,7 +31,7 @@ from .command_processing import AbortCommandHandler, CommandProcessor
from .domain import ExecutionContext, GraphExecution from .domain import ExecutionContext, GraphExecution
from .entities.commands import AbortCommand from .entities.commands import AbortCommand
from .error_handling import ErrorHandler from .error_handling import ErrorHandler
from .event_management import EventCollector, EventEmitter, EventHandlerRegistry from .event_management import EventHandler, EventManager
from .graph_traversal import EdgeProcessor, SkipPropagator from .graph_traversal import EdgeProcessor, SkipPropagator
from .layers.base import Layer from .layers.base import Layer
from .orchestration import Dispatcher, ExecutionCoordinator from .orchestration import Dispatcher, ExecutionCoordinator
@ -122,10 +122,8 @@ class GraphEngine:
) )
# === Event Management === # === Event Management ===
# Event collector aggregates events from all subsystems # Event manager handles both collection and emission of events
self._event_collector = EventCollector() self._event_manager = EventManager()
# Event emitter streams collected events to consumers
self._event_emitter = EventEmitter(self._event_collector)
# === Error Handling === # === Error Handling ===
# Centralized error handler for graph execution errors # Centralized error handler for graph execution errors
@ -149,12 +147,12 @@ class GraphEngine:
# === Event Handler Registry === # === Event Handler Registry ===
# Central registry for handling all node execution events # Central registry for handling all node execution events
self._event_handler_registry = EventHandlerRegistry( self._event_handler_registry = EventHandler(
graph=self._graph, graph=self._graph,
graph_runtime_state=self._graph_runtime_state, graph_runtime_state=self._graph_runtime_state,
graph_execution=self._graph_execution, graph_execution=self._graph_execution,
response_coordinator=self._response_coordinator, response_coordinator=self._response_coordinator,
event_collector=self._event_collector, event_collector=self._event_manager,
edge_processor=self._edge_processor, edge_processor=self._edge_processor,
state_manager=self._state_manager, state_manager=self._state_manager,
error_handler=self._error_handler, error_handler=self._error_handler,
@ -206,7 +204,7 @@ class GraphEngine:
graph_execution=self._graph_execution, graph_execution=self._graph_execution,
state_manager=self._state_manager, state_manager=self._state_manager,
event_handler=self._event_handler_registry, event_handler=self._event_handler_registry,
event_collector=self._event_collector, event_collector=self._event_manager,
command_processor=self._command_processor, command_processor=self._command_processor,
worker_pool=self._worker_pool, worker_pool=self._worker_pool,
) )
@ -215,10 +213,10 @@ class GraphEngine:
self._dispatcher = Dispatcher( self._dispatcher = Dispatcher(
event_queue=self._event_queue, event_queue=self._event_queue,
event_handler=self._event_handler_registry, event_handler=self._event_handler_registry,
event_collector=self._event_collector, event_collector=self._event_manager,
execution_coordinator=self._execution_coordinator, execution_coordinator=self._execution_coordinator,
max_execution_time=self._execution_context.max_execution_time, max_execution_time=self._execution_context.max_execution_time,
event_emitter=self._event_emitter, event_emitter=self._event_manager,
) )
# === Extensibility === # === Extensibility ===
@ -261,7 +259,7 @@ class GraphEngine:
self._start_execution() self._start_execution()
# Yield events as they occur # Yield events as they occur
yield from self._event_emitter.emit_events() yield from self._event_manager.emit_events()
# Handle completion # Handle completion
if self._graph_execution.aborted: if self._graph_execution.aborted:
@ -289,7 +287,7 @@ class GraphEngine:
def _initialize_layers(self) -> None: def _initialize_layers(self) -> None:
"""Initialize layers with context.""" """Initialize layers with context."""
self._event_collector.set_layers(self._layers) self._event_manager.set_layers(self._layers)
for layer in self._layers: for layer in self._layers:
try: try:
layer.initialize(self._graph_runtime_state, self._command_channel) layer.initialize(self._graph_runtime_state, self._command_channel)

View File

@ -10,11 +10,11 @@ from typing import TYPE_CHECKING, final
from core.workflow.graph_events.base import GraphNodeEventBase from core.workflow.graph_events.base import GraphNodeEventBase
from ..event_management import EventCollector, EventEmitter from ..event_management import EventManager
from .execution_coordinator import ExecutionCoordinator from .execution_coordinator import ExecutionCoordinator
if TYPE_CHECKING: if TYPE_CHECKING:
from ..event_management import EventHandlerRegistry from ..event_management import EventHandler
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -31,11 +31,11 @@ class Dispatcher:
def __init__( def __init__(
self, self,
event_queue: queue.Queue[GraphNodeEventBase], event_queue: queue.Queue[GraphNodeEventBase],
event_handler: "EventHandlerRegistry", event_handler: "EventHandler",
event_collector: EventCollector, event_collector: EventManager,
execution_coordinator: ExecutionCoordinator, execution_coordinator: ExecutionCoordinator,
max_execution_time: int, max_execution_time: int,
event_emitter: EventEmitter | None = None, event_emitter: EventManager | None = None,
) -> None: ) -> None:
""" """
Initialize the dispatcher. Initialize the dispatcher.
@ -43,10 +43,10 @@ class Dispatcher:
Args: Args:
event_queue: Queue of events from workers event_queue: Queue of events from workers
event_handler: Event handler registry for processing events event_handler: Event handler registry for processing events
event_collector: Event collector for collecting unhandled events event_collector: Event manager for collecting unhandled events
execution_coordinator: Coordinator for execution flow execution_coordinator: Coordinator for execution flow
max_execution_time: Maximum execution time in seconds max_execution_time: Maximum execution time in seconds
event_emitter: Optional event emitter to signal completion event_emitter: Optional event manager to signal completion
""" """
self._event_queue = event_queue self._event_queue = event_queue
self._event_handler = event_handler self._event_handler = event_handler

View File

@ -6,12 +6,12 @@ from typing import TYPE_CHECKING, final
from ..command_processing import CommandProcessor from ..command_processing import CommandProcessor
from ..domain import GraphExecution from ..domain import GraphExecution
from ..event_management import EventCollector from ..event_management import EventManager
from ..state_management import UnifiedStateManager from ..state_management import UnifiedStateManager
from ..worker_management import SimpleWorkerPool from ..worker_management import SimpleWorkerPool
if TYPE_CHECKING: if TYPE_CHECKING:
from ..event_management import EventHandlerRegistry from ..event_management import EventHandler
@final @final
@ -27,8 +27,8 @@ class ExecutionCoordinator:
self, self,
graph_execution: GraphExecution, graph_execution: GraphExecution,
state_manager: UnifiedStateManager, state_manager: UnifiedStateManager,
event_handler: "EventHandlerRegistry", event_handler: "EventHandler",
event_collector: EventCollector, event_collector: EventManager,
command_processor: CommandProcessor, command_processor: CommandProcessor,
worker_pool: SimpleWorkerPool, worker_pool: SimpleWorkerPool,
) -> None: ) -> None:
@ -39,7 +39,7 @@ class ExecutionCoordinator:
graph_execution: Graph execution aggregate graph_execution: Graph execution aggregate
state_manager: Unified state manager state_manager: Unified state manager
event_handler: Event handler registry for processing events event_handler: Event handler registry for processing events
event_collector: Event collector for collecting events event_collector: Event manager for collecting events
command_processor: Processor for commands command_processor: Processor for commands
worker_pool: Pool of workers worker_pool: Pool of workers
""" """