From 65617f000d292fbc959a272e4c50cbe0ed3e35c8 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Thu, 28 Aug 2025 03:26:42 +0800 Subject: [PATCH] feat(event_collector): Update to use ReadWriteLock --- .../event_management/event_collector.py | 87 +++++++++++++++++-- 1 file changed, 81 insertions(+), 6 deletions(-) diff --git a/api/core/workflow/graph_engine/event_management/event_collector.py b/api/core/workflow/graph_engine/event_management/event_collector.py index 3d266fc012..20c1419324 100644 --- a/api/core/workflow/graph_engine/event_management/event_collector.py +++ b/api/core/workflow/graph_engine/event_management/event_collector.py @@ -9,6 +9,81 @@ from core.workflow.graph_events import GraphEngineEvent from ..layers.base import Layer +class ReadWriteLock: + """ + A read-write lock implementation that allows multiple concurrent readers + but only one writer at a time. + """ + + def __init__(self) -> None: + self._read_ready = threading.Condition(threading.RLock()) + self._readers = 0 + + def acquire_read(self) -> None: + """Acquire a read lock.""" + self._read_ready.acquire() + try: + self._readers += 1 + finally: + self._read_ready.release() + + def release_read(self) -> None: + """Release a read lock.""" + self._read_ready.acquire() + try: + self._readers -= 1 + if self._readers == 0: + self._read_ready.notify_all() + finally: + self._read_ready.release() + + def acquire_write(self) -> None: + """Acquire a write lock.""" + self._read_ready.acquire() + while self._readers > 0: + self._read_ready.wait() + + def release_write(self) -> None: + """Release a write lock.""" + self._read_ready.release() + + def read_lock(self) -> "ReadLockContext": + """Return a context manager for read locking.""" + return ReadLockContext(self) + + def write_lock(self) -> "WriteLockContext": + """Return a context manager for write locking.""" + return WriteLockContext(self) + + +class ReadLockContext: + """Context manager for read locks.""" + + def __init__(self, lock: ReadWriteLock) -> None: + self._lock = lock + + def __enter__(self) -> "ReadLockContext": + self._lock.acquire_read() + return self + + def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None: + self._lock.release_read() + + +class WriteLockContext: + """Context manager for write locks.""" + + def __init__(self, lock: ReadWriteLock) -> None: + self._lock = lock + + def __enter__(self) -> "WriteLockContext": + self._lock.acquire_write() + return self + + def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None: + self._lock.release_write() + + class EventCollector: """ Collects and buffers events for later retrieval. @@ -20,7 +95,7 @@ class EventCollector: def __init__(self) -> None: """Initialize the event collector.""" self._events: list[GraphEngineEvent] = [] - self._lock = threading.Lock() + self._lock = ReadWriteLock() self._layers: list[Layer] = [] def set_layers(self, layers: list[Layer]) -> None: @@ -39,7 +114,7 @@ class EventCollector: Args: event: The event to collect """ - with self._lock: + with self._lock.write_lock(): self._events.append(event) self._notify_layers(event) @@ -50,7 +125,7 @@ class EventCollector: Returns: List of collected events """ - with self._lock: + with self._lock.read_lock(): return list(self._events) def get_new_events(self, start_index: int) -> list[GraphEngineEvent]: @@ -63,7 +138,7 @@ class EventCollector: Returns: List of new events """ - with self._lock: + with self._lock.read_lock(): return list(self._events[start_index:]) def event_count(self) -> int: @@ -73,12 +148,12 @@ class EventCollector: Returns: Number of collected events """ - with self._lock: + with self._lock.read_lock(): return len(self._events) def clear(self) -> None: """Clear all collected events.""" - with self._lock: + with self._lock.write_lock(): self._events.clear() def _notify_layers(self, event: GraphEngineEvent) -> None: