diff --git a/api/core/app/apps/message_based_app_queue_manager.py b/api/core/app/apps/message_based_app_queue_manager.py index 0b97809bf3a..3c7102971f1 100644 --- a/api/core/app/apps/message_based_app_queue_manager.py +++ b/api/core/app/apps/message_based_app_queue_manager.py @@ -11,6 +11,7 @@ from core.app.entities.queue_entities import ( QueueMessageEndEvent, QueueStopEvent, ) +from models.model import AppMode class MessageBasedAppQueueManager(AppQueueManager): @@ -47,4 +48,6 @@ class MessageBasedAppQueueManager(AppQueueManager): self.stop_listen() if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped(): + if self._app_mode == AppMode.ADVANCED_CHAT.value: + return raise GenerateTaskStoppedError() diff --git a/api/core/app/apps/workflow/app_queue_manager.py b/api/core/app/apps/workflow/app_queue_manager.py index fcdd1465d4f..7824d33b875 100644 --- a/api/core/app/apps/workflow/app_queue_manager.py +++ b/api/core/app/apps/workflow/app_queue_manager.py @@ -1,7 +1,6 @@ from typing import override from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom -from core.app.apps.exc import GenerateTaskStoppedError from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.queue_entities import ( AppQueueEvent, @@ -43,6 +42,3 @@ class WorkflowAppQueueManager(AppQueueManager): | QueueWorkflowPartialSuccessEvent, ): self.stop_listen() - - if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped(): - raise GenerateTaskStoppedError() diff --git a/api/tests/unit_tests/core/app/apps/test_legacy_stop_graphengine_lifecycle.py b/api/tests/unit_tests/core/app/apps/test_legacy_stop_graphengine_lifecycle.py new file mode 100644 index 00000000000..a9aafd5949d --- /dev/null +++ b/api/tests/unit_tests/core/app/apps/test_legacy_stop_graphengine_lifecycle.py @@ -0,0 +1,59 @@ +from unittest.mock import patch + +import pytest + +from core.app.apps.base_app_queue_manager import PublishFrom +from core.app.apps.exc import GenerateTaskStoppedError +from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager +from core.app.apps.workflow.app_queue_manager import WorkflowAppQueueManager +from core.app.entities.app_invoke_entities import InvokeFrom +from core.app.entities.queue_entities import QueueTextChunkEvent +from models.model import AppMode + + +def _message_queue_manager(app_mode: str) -> MessageBasedAppQueueManager: + with patch("core.app.apps.base_app_queue_manager.redis_client") as mock_redis: + mock_redis.setex.return_value = True + return MessageBasedAppQueueManager( + task_id="task-1", + user_id="user-1", + invoke_from=InvokeFrom.DEBUGGER, + conversation_id="conversation-1", + app_mode=app_mode, + message_id="message-1", + ) + + +def _workflow_queue_manager(app_mode: str) -> WorkflowAppQueueManager: + with patch("core.app.apps.base_app_queue_manager.redis_client") as mock_redis: + mock_redis.setex.return_value = True + return WorkflowAppQueueManager( + task_id="task-1", + user_id="user-1", + invoke_from=InvokeFrom.DEBUGGER, + app_mode=app_mode, + ) + + +def test_message_queue_does_not_raise_legacy_stop_for_advanced_chat() -> None: + manager = _message_queue_manager(AppMode.ADVANCED_CHAT.value) + + with patch.object(manager, "_is_stopped", return_value=True): + manager.publish(QueueTextChunkEvent(text="chunk"), PublishFrom.APPLICATION_MANAGER) + + +def test_workflow_queue_does_not_read_legacy_stop_flag() -> None: + manager = _workflow_queue_manager(AppMode.WORKFLOW.value) + + with patch.object(manager, "_is_stopped", return_value=True) as is_stopped: + manager.publish(QueueTextChunkEvent(text="chunk"), PublishFrom.APPLICATION_MANAGER) + + is_stopped.assert_not_called() + + +def test_message_queue_keeps_legacy_stop_for_non_graphengine_chat() -> None: + manager = _message_queue_manager(AppMode.CHAT.value) + + with patch.object(manager, "_is_stopped", return_value=True): + with pytest.raises(GenerateTaskStoppedError): + manager.publish(QueueTextChunkEvent(text="chunk"), PublishFrom.APPLICATION_MANAGER) diff --git a/api/tests/unit_tests/core/app/apps/workflow/test_app_queue_manager.py b/api/tests/unit_tests/core/app/apps/workflow/test_app_queue_manager.py index 6133be98676..e3b86530098 100644 --- a/api/tests/unit_tests/core/app/apps/workflow/test_app_queue_manager.py +++ b/api/tests/unit_tests/core/app/apps/workflow/test_app_queue_manager.py @@ -1,9 +1,8 @@ from __future__ import annotations -import pytest +from unittest.mock import patch from core.app.apps.base_app_queue_manager import PublishFrom -from core.app.apps.exc import GenerateTaskStoppedError from core.app.apps.workflow.app_queue_manager import WorkflowAppQueueManager from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.queue_entities import QueueMessageEndEvent, QueuePingEvent @@ -17,11 +16,16 @@ class TestWorkflowAppQueueManager: invoke_from=InvokeFrom.DEBUGGER, app_mode="workflow", ) - manager._is_stopped = lambda: True - with pytest.raises(GenerateTaskStoppedError): + with ( + patch.object(manager, "_is_stopped", return_value=True) as is_stopped, + patch.object(manager, "stop_listen") as stop_listen, + ): manager._publish(QueueMessageEndEvent(llm_result=None), PublishFrom.APPLICATION_MANAGER) + stop_listen.assert_called_once() + is_stopped.assert_not_called() + def test_publish_non_stop_event_does_not_raise(self): manager = WorkflowAppQueueManager( task_id="task",