mirror of https://github.com/langgenius/dify.git
perf: eliminate lock contention in worker pool by removing callbacks
Remove the worker idle/active callbacks, which caused severe lock contention. Instead, use sampling-based monitoring, in which worker states are queried on demand during scaling decisions. This eliminates the performance bottleneck caused by workers acquiring locks 10+ times per second.

Changes:
- Remove callback parameters from the Worker class
- Add properties to expose worker idle state directly
- Update WorkerPool to query worker states without callbacks
- Maintain scaling functionality with better performance
This commit is contained in:
parent
36048d1526
commit
e229510e73
|
|
@ -9,7 +9,6 @@ import contextvars
|
|||
import queue
|
||||
import threading
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from datetime import datetime
|
||||
from typing import final
|
||||
from uuid import uuid4
|
||||
|
|
@ -42,8 +41,6 @@ class Worker(threading.Thread):
|
|||
worker_id: int = 0,
|
||||
flask_app: Flask | None = None,
|
||||
context_vars: contextvars.Context | None = None,
|
||||
on_idle_callback: Callable[[int], None] | None = None,
|
||||
on_active_callback: Callable[[int], None] | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize worker thread.
|
||||
|
|
@ -55,8 +52,6 @@ class Worker(threading.Thread):
|
|||
worker_id: Unique identifier for this worker
|
||||
flask_app: Optional Flask application for context preservation
|
||||
context_vars: Optional context variables to preserve in worker thread
|
||||
on_idle_callback: Optional callback when worker becomes idle
|
||||
on_active_callback: Optional callback when worker becomes active
|
||||
"""
|
||||
super().__init__(name=f"GraphWorker-{worker_id}", daemon=True)
|
||||
self._ready_queue = ready_queue
|
||||
|
|
@ -66,8 +61,6 @@ class Worker(threading.Thread):
|
|||
self._flask_app = flask_app
|
||||
self._context_vars = context_vars
|
||||
self._stop_event = threading.Event()
|
||||
self._on_idle_callback = on_idle_callback
|
||||
self._on_active_callback = on_active_callback
|
||||
self._last_task_time = time.time()
|
||||
|
||||
def stop(self) -> None:
|
||||
|
|
@ -103,15 +96,8 @@ class Worker(threading.Thread):
|
|||
try:
|
||||
node_id = self._ready_queue.get(timeout=0.1)
|
||||
except queue.Empty:
|
||||
# Notify that worker is idle
|
||||
if self._on_idle_callback:
|
||||
self._on_idle_callback(self._worker_id)
|
||||
continue
|
||||
|
||||
# Notify that worker is active
|
||||
if self._on_active_callback:
|
||||
self._on_active_callback(self._worker_id)
|
||||
|
||||
self._last_task_time = time.time()
|
||||
node = self._graph.nodes[node_id]
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -146,8 +146,6 @@ class WorkerPool:
|
|||
worker_id=worker_id,
|
||||
flask_app=self._flask_app,
|
||||
context_vars=self._context_vars,
|
||||
# on_idle_callback=self._on_worker_idle,
|
||||
# on_active_callback=self._on_worker_active,
|
||||
)
|
||||
|
||||
worker.start()
|
||||
|
|
|
|||
Loading…
Reference in New Issue