From 591c463e4b1ef16b2ec230a02f35b6fc9c40826e Mon Sep 17 00:00:00 2001 From: Blackoutta <37723456+Blackoutta@users.noreply.github.com> Date: Tue, 30 Sep 2025 10:41:42 +0800 Subject: [PATCH] improve: Explicitly delete task Redis key on completion in AppQueueManager (#26406) Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- api/core/app/apps/base_app_queue_manager.py | 23 +++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/api/core/app/apps/base_app_queue_manager.py b/api/core/app/apps/base_app_queue_manager.py index fdba952eeb..4b246a53d3 100644 --- a/api/core/app/apps/base_app_queue_manager.py +++ b/api/core/app/apps/base_app_queue_manager.py @@ -1,9 +1,11 @@ +import logging import queue import time from abc import abstractmethod from enum import IntEnum, auto from typing import Any +from redis.exceptions import RedisError from sqlalchemy.orm import DeclarativeMeta from configs import dify_config @@ -18,6 +20,8 @@ from core.app.entities.queue_entities import ( ) from extensions.ext_redis import redis_client +logger = logging.getLogger(__name__) + class PublishFrom(IntEnum): APPLICATION_MANAGER = auto() @@ -35,9 +39,8 @@ class AppQueueManager: self.invoke_from = invoke_from # Public accessor for invoke_from user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user" - redis_client.setex( - AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}" - ) + self._task_belong_cache_key = AppQueueManager._generate_task_belong_cache_key(self._task_id) + redis_client.setex(self._task_belong_cache_key, 1800, f"{user_prefix}-{self._user_id}") q: queue.Queue[WorkflowQueueMessage | MessageQueueMessage | None] = queue.Queue() @@ -79,9 +82,21 @@ class AppQueueManager: Stop listen to queue :return: """ + self._clear_task_belong_cache() self._q.put(None) - def publish_error(self, e, pub_from: PublishFrom): + def _clear_task_belong_cache(self) -> None: + """ + Remove the task belong cache key once listening is finished. + """ + try: + redis_client.delete(self._task_belong_cache_key) + except RedisError: + logger.exception( + "Failed to clear task belong cache for task %s (key: %s)", self._task_id, self._task_belong_cache_key + ) + + def publish_error(self, e, pub_from: PublishFrom) -> None: """ Publish error :param e: error