diff --git a/api/core/workflow/graph_engine/event_management/__init__.py b/api/core/workflow/graph_engine/event_management/__init__.py index 90c37aa195..f6c3c0f753 100644 --- a/api/core/workflow/graph_engine/event_management/__init__.py +++ b/api/core/workflow/graph_engine/event_management/__init__.py @@ -5,12 +5,10 @@ This package handles event routing, collection, and emission for workflow graph execution events. """ -from .event_collector import EventCollector -from .event_emitter import EventEmitter -from .event_handlers import EventHandlerRegistry +from .event_handlers import EventHandler +from .event_manager import EventManager __all__ = [ - "EventCollector", - "EventEmitter", - "EventHandlerRegistry", + "EventHandler", + "EventManager", ] diff --git a/api/core/workflow/graph_engine/event_management/event_emitter.py b/api/core/workflow/graph_engine/event_management/event_emitter.py deleted file mode 100644 index 660ab2d1ce..0000000000 --- a/api/core/workflow/graph_engine/event_management/event_emitter.py +++ /dev/null @@ -1,58 +0,0 @@ -""" -Event emitter for yielding events to external consumers. -""" - -import threading -import time -from collections.abc import Generator -from typing import final - -from core.workflow.graph_events import GraphEngineEvent - -from .event_collector import EventCollector - - -@final -class EventEmitter: - """ - Emits collected events as a generator for external consumption. - - This provides a generator interface for yielding events as they're - collected, with proper synchronization for multi-threaded access. - """ - - def __init__(self, event_collector: EventCollector) -> None: - """ - Initialize the event emitter. - - Args: - event_collector: The collector to emit events from - """ - self._event_collector = event_collector - self._execution_complete = threading.Event() - - def mark_complete(self) -> None: - """Mark execution as complete to stop the generator.""" - self._execution_complete.set() - - def emit_events(self) -> Generator[GraphEngineEvent, None, None]: - """ - Generator that yields events as they're collected. - - Yields: - GraphEngineEvent instances as they're processed - """ - yielded_count = 0 - - while not self._execution_complete.is_set() or yielded_count < self._event_collector.event_count(): - # Get new events since last yield - new_events = self._event_collector.get_new_events(yielded_count) - - # Yield any new events - for event in new_events: - yield event - yielded_count += 1 - - # Small sleep to avoid busy waiting - if not self._execution_complete.is_set() and not new_events: - time.sleep(0.001) diff --git a/api/core/workflow/graph_engine/event_management/event_handlers.py b/api/core/workflow/graph_engine/event_management/event_handlers.py index c3c38ee3eb..50b415d026 100644 --- a/api/core/workflow/graph_engine/event_management/event_handlers.py +++ b/api/core/workflow/graph_engine/event_management/event_handlers.py @@ -33,13 +33,13 @@ if TYPE_CHECKING: from ..error_handling import ErrorHandler from ..graph_traversal import EdgeProcessor from ..state_management import UnifiedStateManager - from .event_collector import EventCollector + from .event_manager import EventManager logger = logging.getLogger(__name__) @final -class EventHandlerRegistry: +class EventHandler: """ Registry of event handlers for different event types. @@ -53,7 +53,7 @@ class EventHandlerRegistry: graph_runtime_state: GraphRuntimeState, graph_execution: GraphExecution, response_coordinator: ResponseStreamCoordinator, - event_collector: "EventCollector", + event_collector: "EventManager", edge_processor: "EdgeProcessor", state_manager: "UnifiedStateManager", error_handler: "ErrorHandler", @@ -66,7 +66,7 @@ class EventHandlerRegistry: graph_runtime_state: Runtime state with variable pool graph_execution: Graph execution aggregate response_coordinator: Response stream coordinator - event_collector: Event collector for collecting events + event_collector: Event manager for collecting events edge_processor: Edge processor for edge traversal state_manager: Unified state manager error_handler: Error handler diff --git a/api/core/workflow/graph_engine/event_management/event_collector.py b/api/core/workflow/graph_engine/event_management/event_manager.py similarity index 73% rename from api/core/workflow/graph_engine/event_management/event_collector.py rename to api/core/workflow/graph_engine/event_management/event_manager.py index 683a23c928..d34f4e032b 100644 --- a/api/core/workflow/graph_engine/event_management/event_collector.py +++ b/api/core/workflow/graph_engine/event_management/event_manager.py @@ -1,8 +1,10 @@ """ -Event collector for buffering and managing events. +Unified event manager for collecting and emitting events. """ import threading +import time +from collections.abc import Generator from typing import final from core.workflow.graph_events import GraphEngineEvent @@ -89,19 +91,21 @@ class WriteLockContext: @final -class EventCollector: +class EventManager: """ - Collects and buffers events for later retrieval. + Unified event manager that collects, buffers, and emits events. - This provides thread-safe event collection with support for - notifying layers about events as they're collected. + This class combines event collection with event emission, providing + thread-safe event management with support for notifying layers and + streaming events to external consumers. """ def __init__(self) -> None: - """Initialize the event collector.""" + """Initialize the event manager.""" self._events: list[GraphEngineEvent] = [] self._lock = ReadWriteLock() self._layers: list[Layer] = [] + self._execution_complete = threading.Event() def set_layers(self, layers: list[Layer]) -> None: """ @@ -123,17 +127,7 @@ class EventCollector: self._events.append(event) self._notify_layers(event) - def get_events(self) -> list[GraphEngineEvent]: - """ - Get all collected events. - - Returns: - List of collected events - """ - with self._lock.read_lock(): - return list(self._events) - - def get_new_events(self, start_index: int) -> list[GraphEngineEvent]: + def _get_new_events(self, start_index: int) -> list[GraphEngineEvent]: """ Get new events starting from a specific index. @@ -146,7 +140,7 @@ class EventCollector: with self._lock.read_lock(): return list(self._events[start_index:]) - def event_count(self) -> int: + def _event_count(self) -> int: """ Get the current count of collected events. @@ -156,10 +150,31 @@ class EventCollector: with self._lock.read_lock(): return len(self._events) - def clear(self) -> None: - """Clear all collected events.""" - with self._lock.write_lock(): - self._events.clear() + def mark_complete(self) -> None: + """Mark execution as complete to stop the event emission generator.""" + self._execution_complete.set() + + def emit_events(self) -> Generator[GraphEngineEvent, None, None]: + """ + Generator that yields events as they're collected. + + Yields: + GraphEngineEvent instances as they're processed + """ + yielded_count = 0 + + while not self._execution_complete.is_set() or yielded_count < self._event_count(): + # Get new events since last yield + new_events = self._get_new_events(yielded_count) + + # Yield any new events + for event in new_events: + yield event + yielded_count += 1 + + # Small sleep to avoid busy waiting + if not self._execution_complete.is_set() and not new_events: + time.sleep(0.001) def _notify_layers(self, event: GraphEngineEvent) -> None: """ diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index be6fd4f63f..833cee0ffe 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -31,7 +31,7 @@ from .command_processing import AbortCommandHandler, CommandProcessor from .domain import ExecutionContext, GraphExecution from .entities.commands import AbortCommand from .error_handling import ErrorHandler -from .event_management import EventCollector, EventEmitter, EventHandlerRegistry +from .event_management import EventHandler, EventManager from .graph_traversal import EdgeProcessor, SkipPropagator from .layers.base import Layer from .orchestration import Dispatcher, ExecutionCoordinator @@ -122,10 +122,8 @@ class GraphEngine: ) # === Event Management === - # Event collector aggregates events from all subsystems - self._event_collector = EventCollector() - # Event emitter streams collected events to consumers - self._event_emitter = EventEmitter(self._event_collector) + # Event manager handles both collection and emission of events + self._event_manager = EventManager() # === Error Handling === # Centralized error handler for graph execution errors @@ -149,12 +147,12 @@ class GraphEngine: # === Event Handler Registry === # Central registry for handling all node execution events - self._event_handler_registry = EventHandlerRegistry( + self._event_handler_registry = EventHandler( graph=self._graph, graph_runtime_state=self._graph_runtime_state, graph_execution=self._graph_execution, response_coordinator=self._response_coordinator, - event_collector=self._event_collector, + event_collector=self._event_manager, edge_processor=self._edge_processor, state_manager=self._state_manager, error_handler=self._error_handler, @@ -206,7 +204,7 @@ class GraphEngine: graph_execution=self._graph_execution, state_manager=self._state_manager, event_handler=self._event_handler_registry, - event_collector=self._event_collector, + event_collector=self._event_manager, command_processor=self._command_processor, worker_pool=self._worker_pool, ) @@ -215,10 +213,10 @@ class GraphEngine: self._dispatcher = Dispatcher( event_queue=self._event_queue, event_handler=self._event_handler_registry, - event_collector=self._event_collector, + event_collector=self._event_manager, execution_coordinator=self._execution_coordinator, max_execution_time=self._execution_context.max_execution_time, - event_emitter=self._event_emitter, + event_emitter=self._event_manager, ) # === Extensibility === @@ -261,7 +259,7 @@ class GraphEngine: self._start_execution() # Yield events as they occur - yield from self._event_emitter.emit_events() + yield from self._event_manager.emit_events() # Handle completion if self._graph_execution.aborted: @@ -289,7 +287,7 @@ class GraphEngine: def _initialize_layers(self) -> None: """Initialize layers with context.""" - self._event_collector.set_layers(self._layers) + self._event_manager.set_layers(self._layers) for layer in self._layers: try: layer.initialize(self._graph_runtime_state, self._command_channel) diff --git a/api/core/workflow/graph_engine/orchestration/dispatcher.py b/api/core/workflow/graph_engine/orchestration/dispatcher.py index 5ae1c3bbbe..80f744c941 100644 --- a/api/core/workflow/graph_engine/orchestration/dispatcher.py +++ b/api/core/workflow/graph_engine/orchestration/dispatcher.py @@ -10,11 +10,11 @@ from typing import TYPE_CHECKING, final from core.workflow.graph_events.base import GraphNodeEventBase -from ..event_management import EventCollector, EventEmitter +from ..event_management import EventManager from .execution_coordinator import ExecutionCoordinator if TYPE_CHECKING: - from ..event_management import EventHandlerRegistry + from ..event_management import EventHandler logger = logging.getLogger(__name__) @@ -31,11 +31,11 @@ class Dispatcher: def __init__( self, event_queue: queue.Queue[GraphNodeEventBase], - event_handler: "EventHandlerRegistry", - event_collector: EventCollector, + event_handler: "EventHandler", + event_collector: EventManager, execution_coordinator: ExecutionCoordinator, max_execution_time: int, - event_emitter: EventEmitter | None = None, + event_emitter: EventManager | None = None, ) -> None: """ Initialize the dispatcher. @@ -43,10 +43,10 @@ class Dispatcher: Args: event_queue: Queue of events from workers event_handler: Event handler registry for processing events - event_collector: Event collector for collecting unhandled events + event_collector: Event manager for collecting unhandled events execution_coordinator: Coordinator for execution flow max_execution_time: Maximum execution time in seconds - event_emitter: Optional event emitter to signal completion + event_emitter: Optional event manager to signal completion """ self._event_queue = event_queue self._event_handler = event_handler diff --git a/api/core/workflow/graph_engine/orchestration/execution_coordinator.py b/api/core/workflow/graph_engine/orchestration/execution_coordinator.py index 63e512f7b3..3dd443ddb3 100644 --- a/api/core/workflow/graph_engine/orchestration/execution_coordinator.py +++ b/api/core/workflow/graph_engine/orchestration/execution_coordinator.py @@ -6,12 +6,12 @@ from typing import TYPE_CHECKING, final from ..command_processing import CommandProcessor from ..domain import GraphExecution -from ..event_management import EventCollector +from ..event_management import EventManager from ..state_management import UnifiedStateManager from ..worker_management import SimpleWorkerPool if TYPE_CHECKING: - from ..event_management import EventHandlerRegistry + from ..event_management import EventHandler @final @@ -27,8 +27,8 @@ class ExecutionCoordinator: self, graph_execution: GraphExecution, state_manager: UnifiedStateManager, - event_handler: "EventHandlerRegistry", - event_collector: EventCollector, + event_handler: "EventHandler", + event_collector: EventManager, command_processor: CommandProcessor, worker_pool: SimpleWorkerPool, ) -> None: @@ -39,7 +39,7 @@ class ExecutionCoordinator: graph_execution: Graph execution aggregate state_manager: Unified state manager event_handler: Event handler registry for processing events - event_collector: Event collector for collecting events + event_collector: Event manager for collecting events command_processor: Processor for commands worker_pool: Pool of workers """