feat(event_collector): Update to use ReadWriteLock

This commit is contained in:
-LAN- 2025-08-28 03:26:42 +08:00
parent 635eff2e25
commit 65617f000d
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF
1 changed files with 81 additions and 6 deletions

View File

@ -9,6 +9,81 @@ from core.workflow.graph_events import GraphEngineEvent
from ..layers.base import Layer
class ReadWriteLock:
"""
A read-write lock implementation that allows multiple concurrent readers
but only one writer at a time.
"""
def __init__(self) -> None:
self._read_ready = threading.Condition(threading.RLock())
self._readers = 0
def acquire_read(self) -> None:
"""Acquire a read lock."""
self._read_ready.acquire()
try:
self._readers += 1
finally:
self._read_ready.release()
def release_read(self) -> None:
"""Release a read lock."""
self._read_ready.acquire()
try:
self._readers -= 1
if self._readers == 0:
self._read_ready.notify_all()
finally:
self._read_ready.release()
def acquire_write(self) -> None:
"""Acquire a write lock."""
self._read_ready.acquire()
while self._readers > 0:
self._read_ready.wait()
def release_write(self) -> None:
"""Release a write lock."""
self._read_ready.release()
def read_lock(self) -> "ReadLockContext":
"""Return a context manager for read locking."""
return ReadLockContext(self)
def write_lock(self) -> "WriteLockContext":
"""Return a context manager for write locking."""
return WriteLockContext(self)
class ReadLockContext:
"""Context manager for read locks."""
def __init__(self, lock: ReadWriteLock) -> None:
self._lock = lock
def __enter__(self) -> "ReadLockContext":
self._lock.acquire_read()
return self
def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None:
self._lock.release_read()
class WriteLockContext:
"""Context manager for write locks."""
def __init__(self, lock: ReadWriteLock) -> None:
self._lock = lock
def __enter__(self) -> "WriteLockContext":
self._lock.acquire_write()
return self
def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None:
self._lock.release_write()
class EventCollector:
"""
Collects and buffers events for later retrieval.
@ -20,7 +95,7 @@ class EventCollector:
def __init__(self) -> None:
"""Initialize the event collector."""
self._events: list[GraphEngineEvent] = []
self._lock = threading.Lock()
self._lock = ReadWriteLock()
self._layers: list[Layer] = []
def set_layers(self, layers: list[Layer]) -> None:
@ -39,7 +114,7 @@ class EventCollector:
Args:
event: The event to collect
"""
with self._lock:
with self._lock.write_lock():
self._events.append(event)
self._notify_layers(event)
@ -50,7 +125,7 @@ class EventCollector:
Returns:
List of collected events
"""
with self._lock:
with self._lock.read_lock():
return list(self._events)
def get_new_events(self, start_index: int) -> list[GraphEngineEvent]:
@ -63,7 +138,7 @@ class EventCollector:
Returns:
List of new events
"""
with self._lock:
with self._lock.read_lock():
return list(self._events[start_index:])
def event_count(self) -> int:
@ -73,12 +148,12 @@ class EventCollector:
Returns:
Number of collected events
"""
with self._lock:
with self._lock.read_lock():
return len(self._events)
def clear(self) -> None:
"""Clear all collected events."""
with self._lock:
with self._lock.write_lock():
self._events.clear()
def _notify_layers(self, event: GraphEngineEvent) -> None: