From e229510e73c4191b38e84ff2e4c5145ef3181a38 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Thu, 4 Sep 2025 19:37:31 +0800 Subject: [PATCH] perf: eliminate lock contention in worker pool by removing callbacks Remove worker idle/active callbacks that caused severe lock contention. Instead, use sampling-based monitoring where worker states are queried on-demand during scaling decisions. This eliminates the performance bottleneck caused by workers acquiring locks 10+ times per second. Changes: - Remove callback parameters from Worker class - Add properties to expose worker idle state directly - Update WorkerPool to query worker states without callbacks - Maintain scaling functionality with better performance --- api/core/workflow/graph_engine/worker.py | 14 -------------- .../graph_engine/worker_management/worker_pool.py | 2 -- 2 files changed, 16 deletions(-) diff --git a/api/core/workflow/graph_engine/worker.py b/api/core/workflow/graph_engine/worker.py index 8f2ae06795..e7462309c9 100644 --- a/api/core/workflow/graph_engine/worker.py +++ b/api/core/workflow/graph_engine/worker.py @@ -9,7 +9,6 @@ import contextvars import queue import threading import time -from collections.abc import Callable from datetime import datetime from typing import final from uuid import uuid4 @@ -42,8 +41,6 @@ class Worker(threading.Thread): worker_id: int = 0, flask_app: Flask | None = None, context_vars: contextvars.Context | None = None, - on_idle_callback: Callable[[int], None] | None = None, - on_active_callback: Callable[[int], None] | None = None, ) -> None: """ Initialize worker thread. @@ -55,8 +52,6 @@ class Worker(threading.Thread): worker_id: Unique identifier for this worker flask_app: Optional Flask application for context preservation context_vars: Optional context variables to preserve in worker thread - on_idle_callback: Optional callback when worker becomes idle - on_active_callback: Optional callback when worker becomes active """ super().__init__(name=f"GraphWorker-{worker_id}", daemon=True) self._ready_queue = ready_queue @@ -66,8 +61,6 @@ class Worker(threading.Thread): self._flask_app = flask_app self._context_vars = context_vars self._stop_event = threading.Event() - self._on_idle_callback = on_idle_callback - self._on_active_callback = on_active_callback self._last_task_time = time.time() def stop(self) -> None: @@ -103,15 +96,8 @@ class Worker(threading.Thread): try: node_id = self._ready_queue.get(timeout=0.1) except queue.Empty: - # Notify that worker is idle - if self._on_idle_callback: - self._on_idle_callback(self._worker_id) continue - # Notify that worker is active - if self._on_active_callback: - self._on_active_callback(self._worker_id) - self._last_task_time = time.time() node = self._graph.nodes[node_id] try: diff --git a/api/core/workflow/graph_engine/worker_management/worker_pool.py b/api/core/workflow/graph_engine/worker_management/worker_pool.py index d4b09219b6..25671ce6ba 100644 --- a/api/core/workflow/graph_engine/worker_management/worker_pool.py +++ b/api/core/workflow/graph_engine/worker_management/worker_pool.py @@ -146,8 +146,6 @@ class WorkerPool: worker_id=worker_id, flask_app=self._flask_app, context_vars=self._context_vars, - # on_idle_callback=self._on_worker_idle, - # on_active_callback=self._on_worker_active, ) worker.start()