From 934a20e745f5e2c0fe81b9109adaa205f87daa7e Mon Sep 17 00:00:00 2001 From: Blackoutta <37723456+Blackoutta@users.noreply.github.com> Date: Wed, 13 May 2026 11:23:32 +0800 Subject: [PATCH] fix: restore tracing after HITL workflow resume (#36064) Co-authored-by: -LAN- --- .../app/apps/advanced_chat/app_generator.py | 13 ++ api/core/app/apps/workflow/app_generator.py | 13 ++ .../apps/advanced_chat/test_app_generator.py | 138 +++++++++++++++++- .../core/app/apps/test_pause_resume.py | 12 +- .../app/apps/test_workflow_app_generator.py | 2 +- .../apps/workflow/test_app_generator_extra.py | 111 ++++++++++++++ 6 files changed, 278 insertions(+), 11 deletions(-) diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index b79d5514b4..cc7bc64439 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -253,7 +253,20 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): ): """ Resume a paused advanced chat execution. + + ``trace_manager`` is transient and excluded from generate-entity serialization, + so resumed executions rebuild it here before persistence layers receive the entity. """ + if application_generate_entity.trace_manager is None: + application_generate_entity = application_generate_entity.model_copy( + update={ + "trace_manager": TraceQueueManager( + app_id=app_model.id, + user_id=user.id if isinstance(user, Account) else user.session_id, + ) + } + ) + return self._generate( workflow=workflow, user=user, diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index 43546d57f5..78255c0512 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -253,7 +253,20 @@ class WorkflowAppGenerator(BaseAppGenerator): ) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], None, None]: """ Resume a paused workflow execution using the persisted runtime state. + + ``trace_manager`` is transient and excluded from generate-entity serialization, + so resumed executions rebuild it here before persistence layers receive the entity. """ + if application_generate_entity.trace_manager is None: + application_generate_entity = application_generate_entity.model_copy( + update={ + "trace_manager": TraceQueueManager( + app_id=app_model.id, + user_id=user.id if isinstance(user, Account) else user.session_id, + ) + } + ) + return self._generate( app_model=app_model, workflow=workflow, diff --git a/api/tests/unit_tests/core/app/apps/advanced_chat/test_app_generator.py b/api/tests/unit_tests/core/app/apps/advanced_chat/test_app_generator.py index bc3b06cd1b..5df064030b 100644 --- a/api/tests/unit_tests/core/app/apps/advanced_chat/test_app_generator.py +++ b/api/tests/unit_tests/core/app/apps/advanced_chat/test_app_generator.py @@ -197,6 +197,7 @@ class TestAdvancedChatAppGeneratorInternals: def test_resume_delegates_to_generate(self, monkeypatch: pytest.MonkeyPatch): generator = AdvancedChatAppGenerator() + existing_trace_manager = SimpleNamespace(app_id="existing-app", user_id="existing-user") application_generate_entity = AdvancedChatAppGenerateEntity.model_construct( task_id="task", app_config=self._build_app_config(), @@ -207,22 +208,25 @@ class TestAdvancedChatAppGeneratorInternals: stream=True, invoke_from=InvokeFrom.WEB_APP, extras={}, - trace_manager=None, + trace_manager=existing_trace_manager, workflow_run_id="run-id", ) - captured: dict[str, object] = {} + captured_entity: AdvancedChatAppGenerateEntity | None = None + captured_graph_runtime_state: object | None = None def _fake_generate(**kwargs): - captured.update(kwargs) - return {"resumed": True} + nonlocal captured_entity, captured_graph_runtime_state + captured_entity = kwargs["application_generate_entity"] + captured_graph_runtime_state = kwargs["graph_runtime_state"] + return SimpleNamespace(resumed=True) monkeypatch.setattr(generator, "_generate", _fake_generate) result = generator.resume( - app_model=SimpleNamespace(), + app_model=SimpleNamespace(id="app-id"), workflow=SimpleNamespace(), - user=SimpleNamespace(), + user=SimpleNamespace(id="end-user-id", session_id="session-id"), conversation=SimpleNamespace(id="conversation-id"), message=SimpleNamespace(id="message-id"), application_generate_entity=application_generate_entity, @@ -232,8 +236,10 @@ class TestAdvancedChatAppGeneratorInternals: pause_state_config=None, ) - assert result == {"resumed": True} - assert captured["graph_runtime_state"] is not None + assert result.resumed is True + assert captured_entity is not None + assert captured_entity.trace_manager is existing_trace_manager + assert captured_graph_runtime_state is not None def test_single_iteration_generate_builds_debug_task(self, monkeypatch: pytest.MonkeyPatch): generator = AdvancedChatAppGenerator() @@ -1243,3 +1249,119 @@ class TestAdvancedChatAppGeneratorInternals: ) assert captured["application_generate_entity"].parent_message_id == UUID_NIL + + +class TestAdvancedChatAppGeneratorResume: + @staticmethod + def _build_app_config() -> WorkflowUIBasedAppConfig: + return WorkflowUIBasedAppConfig( + tenant_id="tenant", + app_id="app", + app_mode=AppMode.ADVANCED_CHAT, + additional_features=AppAdditionalFeatures(), + variables=[], + workflow_id="workflow-id", + ) + + def test_resume_restores_trace_manager_when_missing(self, monkeypatch: pytest.MonkeyPatch): + generator = AdvancedChatAppGenerator() + application_generate_entity = AdvancedChatAppGenerateEntity.model_construct( + task_id="task", + app_config=self._build_app_config(), + file_upload_config=None, + conversation_id="conversation-id", + inputs={}, + query="hello", + files=[], + parent_message_id="parent-message-id", + user_id="user", + stream=False, + invoke_from=InvokeFrom.WEB_APP, + extras={}, + trace_manager=None, + workflow_run_id="run-id", + ) + DummyTraceQueueManager = type( + "_DummyTraceQueueManager", + (TraceQueueManager,), + { + "__init__": lambda self, app_id=None, user_id=None: ( + setattr(self, "app_id", app_id) or setattr(self, "user_id", user_id) + ) + }, + ) + monkeypatch.setattr( + "core.app.apps.advanced_chat.app_generator.TraceQueueManager", + DummyTraceQueueManager, + ) + captured_entity: AdvancedChatAppGenerateEntity | None = None + + def _fake_generate(**kwargs): + nonlocal captured_entity + captured_entity = kwargs["application_generate_entity"] + return SimpleNamespace(ok=True) + + monkeypatch.setattr(generator, "_generate", _fake_generate) + + result = generator.resume( + app_model=SimpleNamespace(id="app-id"), + workflow=SimpleNamespace(), + user=SimpleNamespace(id="end-user-id", session_id="session-id"), + conversation=SimpleNamespace(id="conversation-id"), + message=SimpleNamespace(id="message-id"), + application_generate_entity=application_generate_entity, + workflow_execution_repository=SimpleNamespace(), + workflow_node_execution_repository=SimpleNamespace(), + graph_runtime_state=SimpleNamespace(), + ) + + assert result.ok is True + assert captured_entity is not None + trace_manager = captured_entity.trace_manager + assert isinstance(trace_manager, DummyTraceQueueManager) + assert trace_manager.app_id == "app-id" + assert trace_manager.user_id == "session-id" + + def test_resume_preserves_existing_trace_manager(self, monkeypatch: pytest.MonkeyPatch): + generator = AdvancedChatAppGenerator() + existing_trace_manager = SimpleNamespace(app_id="existing-app", user_id="existing-user") + application_generate_entity = AdvancedChatAppGenerateEntity.model_construct( + task_id="task", + app_config=self._build_app_config(), + file_upload_config=None, + conversation_id="conversation-id", + inputs={}, + query="hello", + files=[], + parent_message_id="parent-message-id", + user_id="user", + stream=False, + invoke_from=InvokeFrom.WEB_APP, + extras={}, + trace_manager=existing_trace_manager, + workflow_run_id="run-id", + ) + captured_entity: AdvancedChatAppGenerateEntity | None = None + + def _fake_generate(**kwargs): + nonlocal captured_entity + captured_entity = kwargs["application_generate_entity"] + return SimpleNamespace(ok=True) + + monkeypatch.setattr(generator, "_generate", _fake_generate) + + result = generator.resume( + app_model=SimpleNamespace(id="app-id"), + workflow=SimpleNamespace(), + user=SimpleNamespace(id="end-user-id", session_id="session-id"), + conversation=SimpleNamespace(id="conversation-id"), + message=SimpleNamespace(id="message-id"), + application_generate_entity=application_generate_entity, + workflow_execution_repository=SimpleNamespace(), + workflow_node_execution_repository=SimpleNamespace(), + graph_runtime_state=SimpleNamespace(), + ) + + assert result.ok is True + assert captured_entity is not None + assert captured_entity.trace_manager is existing_trace_manager diff --git a/api/tests/unit_tests/core/app/apps/test_pause_resume.py b/api/tests/unit_tests/core/app/apps/test_pause_resume.py index 1acebfee17..5f13c6aff6 100644 --- a/api/tests/unit_tests/core/app/apps/test_pause_resume.py +++ b/api/tests/unit_tests/core/app/apps/test_pause_resume.py @@ -228,7 +228,11 @@ def test_workflow_app_pause_resume_matches_baseline(mocker: MockerFixture): app_model=SimpleNamespace(mode="workflow"), workflow=SimpleNamespace(), user=SimpleNamespace(), - application_generate_entity=SimpleNamespace(stream=False, invoke_from=InvokeFrom.SERVICE_API), + application_generate_entity=SimpleNamespace( + stream=False, + invoke_from=InvokeFrom.SERVICE_API, + trace_manager=SimpleNamespace(), + ), graph_runtime_state=resumed_state, workflow_execution_repository=SimpleNamespace(), workflow_node_execution_repository=SimpleNamespace(), @@ -270,7 +274,11 @@ def test_advanced_chat_pause_resume_matches_baseline(mocker: MockerFixture): user=SimpleNamespace(), conversation=SimpleNamespace(id="conv"), message=SimpleNamespace(id="msg"), - application_generate_entity=SimpleNamespace(stream=False, invoke_from=InvokeFrom.SERVICE_API), + application_generate_entity=SimpleNamespace( + stream=False, + invoke_from=InvokeFrom.SERVICE_API, + trace_manager=SimpleNamespace(), + ), workflow_execution_repository=SimpleNamespace(), workflow_node_execution_repository=SimpleNamespace(), graph_runtime_state=resumed_state, diff --git a/api/tests/unit_tests/core/app/apps/test_workflow_app_generator.py b/api/tests/unit_tests/core/app/apps/test_workflow_app_generator.py index 2e4e469eb5..44c34a0142 100644 --- a/api/tests/unit_tests/core/app/apps/test_workflow_app_generator.py +++ b/api/tests/unit_tests/core/app/apps/test_workflow_app_generator.py @@ -99,7 +99,7 @@ def test_resume_delegates_to_generate(mocker: MockerFixture): generator = WorkflowAppGenerator() mock_generate = mocker.patch.object(generator, "_generate", return_value="ok") - application_generate_entity = SimpleNamespace(stream=False, invoke_from="debugger") + application_generate_entity = SimpleNamespace(stream=False, invoke_from="debugger", trace_manager=MagicMock()) runtime_state = MagicMock(name="runtime-state") pause_config = MagicMock(name="pause-config") diff --git a/api/tests/unit_tests/core/app/apps/workflow/test_app_generator_extra.py b/api/tests/unit_tests/core/app/apps/workflow/test_app_generator_extra.py index 320189143e..941a47b572 100644 --- a/api/tests/unit_tests/core/app/apps/workflow/test_app_generator_extra.py +++ b/api/tests/unit_tests/core/app/apps/workflow/test_app_generator_extra.py @@ -186,3 +186,114 @@ class TestWorkflowAppGeneratorGenerate: ) assert result == {"ok": True} + + +class TestWorkflowAppGeneratorResume: + def test_resume_restores_trace_manager_when_missing(self, monkeypatch: pytest.MonkeyPatch): + generator = WorkflowAppGenerator() + app_config = WorkflowUIBasedAppConfig( + tenant_id="tenant", + app_id="app", + app_mode=AppMode.WORKFLOW, + additional_features=AppAdditionalFeatures(), + variables=[], + workflow_id="workflow-id", + ) + application_generate_entity = WorkflowAppGenerateEntity.model_construct( + task_id="task", + app_config=app_config, + inputs={}, + files=[], + user_id="user", + stream=False, + invoke_from=InvokeFrom.WEB_APP, + extras={}, + trace_manager=None, + workflow_execution_id="run-id", + call_depth=0, + ) + DummyTraceQueueManager = type( + "_DummyTraceQueueManager", + (TraceQueueManager,), + { + "__init__": lambda self, app_id=None, user_id=None: ( + setattr(self, "app_id", app_id) or setattr(self, "user_id", user_id) + ) + }, + ) + monkeypatch.setattr( + "core.app.apps.workflow.app_generator.TraceQueueManager", + DummyTraceQueueManager, + ) + captured_entity: WorkflowAppGenerateEntity | None = None + + def _fake_generate(**kwargs): + nonlocal captured_entity + captured_entity = kwargs["application_generate_entity"] + return SimpleNamespace(ok=True) + + monkeypatch.setattr(generator, "_generate", _fake_generate) + + result = generator.resume( + app_model=SimpleNamespace(id="app-id"), + workflow=SimpleNamespace(), + user=SimpleNamespace(id="end-user-id", session_id="session-id"), + application_generate_entity=application_generate_entity, + graph_runtime_state=SimpleNamespace(), + workflow_execution_repository=SimpleNamespace(), + workflow_node_execution_repository=SimpleNamespace(), + ) + + assert result.ok is True + assert captured_entity is not None + trace_manager = captured_entity.trace_manager + assert isinstance(trace_manager, DummyTraceQueueManager) + assert trace_manager.app_id == "app-id" + assert trace_manager.user_id == "session-id" + + def test_resume_preserves_existing_trace_manager(self, monkeypatch: pytest.MonkeyPatch): + generator = WorkflowAppGenerator() + app_config = WorkflowUIBasedAppConfig( + tenant_id="tenant", + app_id="app", + app_mode=AppMode.WORKFLOW, + additional_features=AppAdditionalFeatures(), + variables=[], + workflow_id="workflow-id", + ) + existing_trace_manager = SimpleNamespace(app_id="existing-app", user_id="existing-user") + application_generate_entity = WorkflowAppGenerateEntity.model_construct( + task_id="task", + app_config=app_config, + inputs={}, + files=[], + user_id="user", + stream=False, + invoke_from=InvokeFrom.WEB_APP, + extras={}, + trace_manager=existing_trace_manager, + workflow_execution_id="run-id", + call_depth=0, + ) + captured_entity: WorkflowAppGenerateEntity | None = None + + def _fake_generate(**kwargs): + nonlocal captured_entity + captured_entity = kwargs["application_generate_entity"] + return SimpleNamespace(ok=True) + + monkeypatch.setattr(generator, "_generate", _fake_generate) + + result = generator.resume( + app_model=SimpleNamespace(id="app-id"), + workflow=SimpleNamespace(), + user=SimpleNamespace(id="end-user-id", session_id="session-id"), + application_generate_entity=application_generate_entity, + graph_runtime_state=SimpleNamespace(), + workflow_execution_repository=SimpleNamespace(), + workflow_node_execution_repository=SimpleNamespace(), + ) + + assert result.ok is True + assert captured_entity is not None + assert captured_entity.trace_manager is existing_trace_manager