feat(graph_engine): allow to scale down without lock

Signed-off-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
-LAN- 2025-09-04 19:32:07 +08:00
parent aff7ca12b8
commit 36048d1526
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF
2 changed files with 29 additions and 52 deletions

View File

@ -74,6 +74,22 @@ class Worker(threading.Thread):
"""Signal the worker to stop processing."""
self._stop_event.set()
@property
def is_idle(self) -> bool:
"""Check if the worker is currently idle."""
# Worker is idle if it hasn't processed a task recently (within 0.2 seconds)
return (time.time() - self._last_task_time) > 0.2
@property
def idle_duration(self) -> float:
"""Get the duration in seconds since the worker last processed a task."""
return time.time() - self._last_task_time
@property
def worker_id(self) -> int:
"""Get the worker's ID."""
return self._worker_id
@override
def run(self) -> None:
"""

View File

@ -8,7 +8,6 @@ DynamicScaler, and WorkerFactory into a single class.
import logging
import queue
import threading
import time
from typing import TYPE_CHECKING, final
from configs import dify_config
@ -78,9 +77,7 @@ class WorkerPool:
self._lock = threading.RLock()
self._running = False
# Track worker idle times for scale-down
self._worker_idle_times: dict[int, float] = {}
self._worker_active_states: dict[int, bool] = {}
# No longer tracking worker states with callbacks to avoid lock contention
def start(self, initial_count: int | None = None) -> None:
"""
@ -136,8 +133,6 @@ class WorkerPool:
worker.join(timeout=10.0)
self._workers.clear()
self._worker_active_states.clear()
self._worker_idle_times.clear()
def _create_worker(self) -> None:
"""Create and start a new worker."""
@ -158,31 +153,6 @@ class WorkerPool:
worker.start()
self._workers.append(worker)
# Initialize tracking
self._worker_active_states[worker_id] = True
self._worker_idle_times[worker_id] = 0.0
def _on_worker_idle(self, worker_id: int) -> None:
"""Handle worker becoming idle."""
with self._lock:
if worker_id not in self._worker_active_states:
return
# Mark as idle and record time if transitioning from active
if self._worker_active_states.get(worker_id, False):
self._worker_active_states[worker_id] = False
self._worker_idle_times[worker_id] = time.time()
def _on_worker_active(self, worker_id: int) -> None:
"""Handle worker becoming active."""
with self._lock:
if worker_id not in self._worker_active_states:
return
# Mark as active and clear idle time
self._worker_active_states[worker_id] = True
self._worker_idle_times[worker_id] = 0.0
def _remove_worker(self, worker: Worker, worker_id: int) -> None:
"""Remove a specific worker from the pool."""
# Stop the worker
@ -192,14 +162,10 @@ class WorkerPool:
if worker.is_alive():
worker.join(timeout=2.0)
# Remove from list and tracking
# Remove from list
if worker in self._workers:
self._workers.remove(worker)
# Clean up tracking
self._worker_active_states.pop(worker_id, None)
self._worker_idle_times.pop(worker_id, None)
def _try_scale_up(self, queue_depth: int, current_count: int) -> bool:
"""
Try to scale up workers if needed.
@ -252,23 +218,18 @@ class WorkerPool:
if not has_excess_capacity:
return False
# Find and remove idle workers
current_time = time.time()
# Find and remove idle workers that have been idle long enough
workers_to_remove = []
for worker in self._workers:
worker_id = worker._worker_id
# Check if worker is idle and has exceeded idle time threshold
if not self._worker_active_states.get(worker_id, True) and self._worker_idle_times.get(worker_id, 0) > 0:
idle_duration = current_time - self._worker_idle_times[worker_id]
if idle_duration >= self._scale_down_idle_time:
# Don't remove if it would leave us unable to handle the queue
remaining_workers = current_count - len(workers_to_remove) - 1
if remaining_workers >= self._min_workers and remaining_workers >= max(1, queue_depth // 2):
workers_to_remove.append((worker, worker_id))
# Only remove one worker per check to avoid aggressive scaling
break
if worker.is_idle and worker.idle_duration >= self._scale_down_idle_time:
# Don't remove if it would leave us unable to handle the queue
remaining_workers = current_count - len(workers_to_remove) - 1
if remaining_workers >= self._min_workers and remaining_workers >= max(1, queue_depth // 2):
workers_to_remove.append((worker, worker.worker_id))
# Only remove one worker per check to avoid aggressive scaling
break
# Remove idle workers if any found
if workers_to_remove:
@ -300,9 +261,9 @@ class WorkerPool:
current_count = len(self._workers)
queue_depth = self._ready_queue.qsize()
# Count active vs idle workers
active_count = sum(1 for state in self._worker_active_states.values() if state)
idle_count = current_count - active_count
# Count active vs idle workers by querying their state directly
idle_count = sum(1 for worker in self._workers if worker.is_idle)
active_count = current_count - idle_count
# Try to scale up if queue is backing up
self._try_scale_up(queue_depth, current_count)