diff --git a/api/core/workflow/graph_engine/worker.py b/api/core/workflow/graph_engine/worker.py index df2fbf486e..8f2ae06795 100644 --- a/api/core/workflow/graph_engine/worker.py +++ b/api/core/workflow/graph_engine/worker.py @@ -74,6 +74,22 @@ class Worker(threading.Thread): """Signal the worker to stop processing.""" self._stop_event.set() + @property + def is_idle(self) -> bool: + """Check if the worker is currently idle.""" + # Worker is idle if it hasn't processed a task recently (within 0.2 seconds) + return (time.time() - self._last_task_time) > 0.2 + + @property + def idle_duration(self) -> float: + """Get the duration in seconds since the worker last processed a task.""" + return time.time() - self._last_task_time + + @property + def worker_id(self) -> int: + """Get the worker's ID.""" + return self._worker_id + @override def run(self) -> None: """ diff --git a/api/core/workflow/graph_engine/worker_management/worker_pool.py b/api/core/workflow/graph_engine/worker_management/worker_pool.py index cfc0f5ab65..d4b09219b6 100644 --- a/api/core/workflow/graph_engine/worker_management/worker_pool.py +++ b/api/core/workflow/graph_engine/worker_management/worker_pool.py @@ -8,7 +8,6 @@ DynamicScaler, and WorkerFactory into a single class. import logging import queue import threading -import time from typing import TYPE_CHECKING, final from configs import dify_config @@ -78,9 +77,7 @@ class WorkerPool: self._lock = threading.RLock() self._running = False - # Track worker idle times for scale-down - self._worker_idle_times: dict[int, float] = {} - self._worker_active_states: dict[int, bool] = {} + # No longer tracking worker states with callbacks to avoid lock contention def start(self, initial_count: int | None = None) -> None: """ @@ -136,8 +133,6 @@ class WorkerPool: worker.join(timeout=10.0) self._workers.clear() - self._worker_active_states.clear() - self._worker_idle_times.clear() def _create_worker(self) -> None: """Create and start a new worker.""" @@ -158,31 +153,6 @@ class WorkerPool: worker.start() self._workers.append(worker) - # Initialize tracking - self._worker_active_states[worker_id] = True - self._worker_idle_times[worker_id] = 0.0 - - def _on_worker_idle(self, worker_id: int) -> None: - """Handle worker becoming idle.""" - with self._lock: - if worker_id not in self._worker_active_states: - return - - # Mark as idle and record time if transitioning from active - if self._worker_active_states.get(worker_id, False): - self._worker_active_states[worker_id] = False - self._worker_idle_times[worker_id] = time.time() - - def _on_worker_active(self, worker_id: int) -> None: - """Handle worker becoming active.""" - with self._lock: - if worker_id not in self._worker_active_states: - return - - # Mark as active and clear idle time - self._worker_active_states[worker_id] = True - self._worker_idle_times[worker_id] = 0.0 - def _remove_worker(self, worker: Worker, worker_id: int) -> None: """Remove a specific worker from the pool.""" # Stop the worker @@ -192,14 +162,10 @@ class WorkerPool: if worker.is_alive(): worker.join(timeout=2.0) - # Remove from list and tracking + # Remove from list if worker in self._workers: self._workers.remove(worker) - # Clean up tracking - self._worker_active_states.pop(worker_id, None) - self._worker_idle_times.pop(worker_id, None) - def _try_scale_up(self, queue_depth: int, current_count: int) -> bool: """ Try to scale up workers if needed. @@ -252,23 +218,18 @@ class WorkerPool: if not has_excess_capacity: return False - # Find and remove idle workers - current_time = time.time() + # Find and remove idle workers that have been idle long enough workers_to_remove = [] for worker in self._workers: - worker_id = worker._worker_id - # Check if worker is idle and has exceeded idle time threshold - if not self._worker_active_states.get(worker_id, True) and self._worker_idle_times.get(worker_id, 0) > 0: - idle_duration = current_time - self._worker_idle_times[worker_id] - if idle_duration >= self._scale_down_idle_time: - # Don't remove if it would leave us unable to handle the queue - remaining_workers = current_count - len(workers_to_remove) - 1 - if remaining_workers >= self._min_workers and remaining_workers >= max(1, queue_depth // 2): - workers_to_remove.append((worker, worker_id)) - # Only remove one worker per check to avoid aggressive scaling - break + if worker.is_idle and worker.idle_duration >= self._scale_down_idle_time: + # Don't remove if it would leave us unable to handle the queue + remaining_workers = current_count - len(workers_to_remove) - 1 + if remaining_workers >= self._min_workers and remaining_workers >= max(1, queue_depth // 2): + workers_to_remove.append((worker, worker.worker_id)) + # Only remove one worker per check to avoid aggressive scaling + break # Remove idle workers if any found if workers_to_remove: @@ -300,9 +261,9 @@ class WorkerPool: current_count = len(self._workers) queue_depth = self._ready_queue.qsize() - # Count active vs idle workers - active_count = sum(1 for state in self._worker_active_states.values() if state) - idle_count = current_count - active_count + # Count active vs idle workers by querying their state directly + idle_count = sum(1 for worker in self._workers if worker.is_idle) + active_count = current_count - idle_count # Try to scale up if queue is backing up self._try_scale_up(queue_depth, current_count)