From ec6cafd7aacc494d12eace411c201ea06c0d9e2f Mon Sep 17 00:00:00 2001 From: Ponder Date: Mon, 13 Oct 2025 17:41:16 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20Cache=20AppQueueManager.is=5Fstopped()?= =?UTF-8?q?=20to=20reduce=20unnecessary=20Redis=20=E2=80=A6=20(#26778)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/core/app/apps/base_app_queue_manager.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/api/core/app/apps/base_app_queue_manager.py b/api/core/app/apps/base_app_queue_manager.py index 4b246a53d3..074555e31b 100644 --- a/api/core/app/apps/base_app_queue_manager.py +++ b/api/core/app/apps/base_app_queue_manager.py @@ -1,10 +1,12 @@ import logging import queue +import threading import time from abc import abstractmethod from enum import IntEnum, auto from typing import Any +from cachetools import TTLCache, cachedmethod from redis.exceptions import RedisError from sqlalchemy.orm import DeclarativeMeta @@ -45,6 +47,8 @@ class AppQueueManager: q: queue.Queue[WorkflowQueueMessage | MessageQueueMessage | None] = queue.Queue() self._q = q + self._stopped_cache: TTLCache[tuple, bool] = TTLCache(maxsize=1, ttl=1) + self._cache_lock = threading.Lock() def listen(self): """ @@ -157,6 +161,7 @@ class AppQueueManager: stopped_cache_key = cls._generate_stopped_cache_key(task_id) redis_client.setex(stopped_cache_key, 600, 1) + @cachedmethod(lambda self: self._stopped_cache, lock=lambda self: self._cache_lock) def _is_stopped(self) -> bool: """ Check if task is stopped