fix: prevent legacy stop from interrupting GraphEngine runs (#37129)

Co-authored-by: Crazywoola <100913391+crazywoola@users.noreply.github.com>
This commit is contained in:
Blackoutta 2026-06-22 16:25:15 +08:00 committed by GitHub
parent 25b90229bc
commit 7c20ffe6c4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 70 additions and 8 deletions

View File

@ -11,6 +11,7 @@ from core.app.entities.queue_entities import (
QueueMessageEndEvent,
QueueStopEvent,
)
from models.model import AppMode
class MessageBasedAppQueueManager(AppQueueManager):
@ -47,4 +48,6 @@ class MessageBasedAppQueueManager(AppQueueManager):
self.stop_listen()
if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped():
if self._app_mode == AppMode.ADVANCED_CHAT.value:
return
raise GenerateTaskStoppedError()

View File

@ -1,7 +1,6 @@
from typing import override
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.exc import GenerateTaskStoppedError
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import (
AppQueueEvent,
@ -43,6 +42,3 @@ class WorkflowAppQueueManager(AppQueueManager):
| QueueWorkflowPartialSuccessEvent,
):
self.stop_listen()
if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped():
raise GenerateTaskStoppedError()

View File

@ -0,0 +1,59 @@
from unittest.mock import patch
import pytest
from core.app.apps.base_app_queue_manager import PublishFrom
from core.app.apps.exc import GenerateTaskStoppedError
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.apps.workflow.app_queue_manager import WorkflowAppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import QueueTextChunkEvent
from models.model import AppMode
def _message_queue_manager(app_mode: str) -> MessageBasedAppQueueManager:
with patch("core.app.apps.base_app_queue_manager.redis_client") as mock_redis:
mock_redis.setex.return_value = True
return MessageBasedAppQueueManager(
task_id="task-1",
user_id="user-1",
invoke_from=InvokeFrom.DEBUGGER,
conversation_id="conversation-1",
app_mode=app_mode,
message_id="message-1",
)
def _workflow_queue_manager(app_mode: str) -> WorkflowAppQueueManager:
with patch("core.app.apps.base_app_queue_manager.redis_client") as mock_redis:
mock_redis.setex.return_value = True
return WorkflowAppQueueManager(
task_id="task-1",
user_id="user-1",
invoke_from=InvokeFrom.DEBUGGER,
app_mode=app_mode,
)
def test_message_queue_does_not_raise_legacy_stop_for_advanced_chat() -> None:
manager = _message_queue_manager(AppMode.ADVANCED_CHAT.value)
with patch.object(manager, "_is_stopped", return_value=True):
manager.publish(QueueTextChunkEvent(text="chunk"), PublishFrom.APPLICATION_MANAGER)
def test_workflow_queue_does_not_read_legacy_stop_flag() -> None:
manager = _workflow_queue_manager(AppMode.WORKFLOW.value)
with patch.object(manager, "_is_stopped", return_value=True) as is_stopped:
manager.publish(QueueTextChunkEvent(text="chunk"), PublishFrom.APPLICATION_MANAGER)
is_stopped.assert_not_called()
def test_message_queue_keeps_legacy_stop_for_non_graphengine_chat() -> None:
manager = _message_queue_manager(AppMode.CHAT.value)
with patch.object(manager, "_is_stopped", return_value=True):
with pytest.raises(GenerateTaskStoppedError):
manager.publish(QueueTextChunkEvent(text="chunk"), PublishFrom.APPLICATION_MANAGER)

View File

@ -1,9 +1,8 @@
from __future__ import annotations
import pytest
from unittest.mock import patch
from core.app.apps.base_app_queue_manager import PublishFrom
from core.app.apps.exc import GenerateTaskStoppedError
from core.app.apps.workflow.app_queue_manager import WorkflowAppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import QueueMessageEndEvent, QueuePingEvent
@ -17,11 +16,16 @@ class TestWorkflowAppQueueManager:
invoke_from=InvokeFrom.DEBUGGER,
app_mode="workflow",
)
manager._is_stopped = lambda: True
with pytest.raises(GenerateTaskStoppedError):
with (
patch.object(manager, "_is_stopped", return_value=True) as is_stopped,
patch.object(manager, "stop_listen") as stop_listen,
):
manager._publish(QueueMessageEndEvent(llm_result=None), PublishFrom.APPLICATION_MANAGER)
stop_listen.assert_called_once()
is_stopped.assert_not_called()
def test_publish_non_stop_event_does_not_raise(self):
manager = WorkflowAppQueueManager(
task_id="task",