diff --git a/api/core/app/apps/base_app_queue_manager.py b/api/core/app/apps/base_app_queue_manager.py index fdba952eeb..4b246a53d3 100644 --- a/api/core/app/apps/base_app_queue_manager.py +++ b/api/core/app/apps/base_app_queue_manager.py @@ -1,9 +1,11 @@ +import logging import queue import time from abc import abstractmethod from enum import IntEnum, auto from typing import Any +from redis.exceptions import RedisError from sqlalchemy.orm import DeclarativeMeta from configs import dify_config @@ -18,6 +20,8 @@ from core.app.entities.queue_entities import ( ) from extensions.ext_redis import redis_client +logger = logging.getLogger(__name__) + class PublishFrom(IntEnum): APPLICATION_MANAGER = auto() @@ -35,9 +39,8 @@ class AppQueueManager: self.invoke_from = invoke_from # Public accessor for invoke_from user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user" - redis_client.setex( - AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}" - ) + self._task_belong_cache_key = AppQueueManager._generate_task_belong_cache_key(self._task_id) + redis_client.setex(self._task_belong_cache_key, 1800, f"{user_prefix}-{self._user_id}") q: queue.Queue[WorkflowQueueMessage | MessageQueueMessage | None] = queue.Queue() @@ -79,9 +82,21 @@ class AppQueueManager: Stop listen to queue :return: """ + self._clear_task_belong_cache() self._q.put(None) - def publish_error(self, e, pub_from: PublishFrom): + def _clear_task_belong_cache(self) -> None: + """ + Remove the task belong cache key once listening is finished. + """ + try: + redis_client.delete(self._task_belong_cache_key) + except RedisError: + logger.exception( + "Failed to clear task belong cache for task %s (key: %s)", self._task_id, self._task_belong_cache_key + ) + + def publish_error(self, e, pub_from: PublishFrom) -> None: """ Publish error :param e: error