fix: restore tracing after HITL workflow resume (#36064)

Co-authored-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
Blackoutta 2026-05-13 11:23:32 +08:00 committed by GitHub
parent 7e56a244a8
commit 934a20e745
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 278 additions and 11 deletions

View File

@ -253,7 +253,20 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
):
"""
Resume a paused advanced chat execution.
``trace_manager`` is transient and excluded from generate-entity serialization,
so resumed executions rebuild it here before persistence layers receive the entity.
"""
if application_generate_entity.trace_manager is None:
application_generate_entity = application_generate_entity.model_copy(
update={
"trace_manager": TraceQueueManager(
app_id=app_model.id,
user_id=user.id if isinstance(user, Account) else user.session_id,
)
}
)
return self._generate(
workflow=workflow,
user=user,

View File

@ -253,7 +253,20 @@ class WorkflowAppGenerator(BaseAppGenerator):
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], None, None]:
"""
Resume a paused workflow execution using the persisted runtime state.
``trace_manager`` is transient and excluded from generate-entity serialization,
so resumed executions rebuild it here before persistence layers receive the entity.
"""
if application_generate_entity.trace_manager is None:
application_generate_entity = application_generate_entity.model_copy(
update={
"trace_manager": TraceQueueManager(
app_id=app_model.id,
user_id=user.id if isinstance(user, Account) else user.session_id,
)
}
)
return self._generate(
app_model=app_model,
workflow=workflow,

View File

@ -197,6 +197,7 @@ class TestAdvancedChatAppGeneratorInternals:
def test_resume_delegates_to_generate(self, monkeypatch: pytest.MonkeyPatch):
generator = AdvancedChatAppGenerator()
existing_trace_manager = SimpleNamespace(app_id="existing-app", user_id="existing-user")
application_generate_entity = AdvancedChatAppGenerateEntity.model_construct(
task_id="task",
app_config=self._build_app_config(),
@ -207,22 +208,25 @@ class TestAdvancedChatAppGeneratorInternals:
stream=True,
invoke_from=InvokeFrom.WEB_APP,
extras={},
trace_manager=None,
trace_manager=existing_trace_manager,
workflow_run_id="run-id",
)
captured: dict[str, object] = {}
captured_entity: AdvancedChatAppGenerateEntity | None = None
captured_graph_runtime_state: object | None = None
def _fake_generate(**kwargs):
captured.update(kwargs)
return {"resumed": True}
nonlocal captured_entity, captured_graph_runtime_state
captured_entity = kwargs["application_generate_entity"]
captured_graph_runtime_state = kwargs["graph_runtime_state"]
return SimpleNamespace(resumed=True)
monkeypatch.setattr(generator, "_generate", _fake_generate)
result = generator.resume(
app_model=SimpleNamespace(),
app_model=SimpleNamespace(id="app-id"),
workflow=SimpleNamespace(),
user=SimpleNamespace(),
user=SimpleNamespace(id="end-user-id", session_id="session-id"),
conversation=SimpleNamespace(id="conversation-id"),
message=SimpleNamespace(id="message-id"),
application_generate_entity=application_generate_entity,
@ -232,8 +236,10 @@ class TestAdvancedChatAppGeneratorInternals:
pause_state_config=None,
)
assert result == {"resumed": True}
assert captured["graph_runtime_state"] is not None
assert result.resumed is True
assert captured_entity is not None
assert captured_entity.trace_manager is existing_trace_manager
assert captured_graph_runtime_state is not None
def test_single_iteration_generate_builds_debug_task(self, monkeypatch: pytest.MonkeyPatch):
generator = AdvancedChatAppGenerator()
@ -1243,3 +1249,119 @@ class TestAdvancedChatAppGeneratorInternals:
)
assert captured["application_generate_entity"].parent_message_id == UUID_NIL
class TestAdvancedChatAppGeneratorResume:
@staticmethod
def _build_app_config() -> WorkflowUIBasedAppConfig:
return WorkflowUIBasedAppConfig(
tenant_id="tenant",
app_id="app",
app_mode=AppMode.ADVANCED_CHAT,
additional_features=AppAdditionalFeatures(),
variables=[],
workflow_id="workflow-id",
)
def test_resume_restores_trace_manager_when_missing(self, monkeypatch: pytest.MonkeyPatch):
generator = AdvancedChatAppGenerator()
application_generate_entity = AdvancedChatAppGenerateEntity.model_construct(
task_id="task",
app_config=self._build_app_config(),
file_upload_config=None,
conversation_id="conversation-id",
inputs={},
query="hello",
files=[],
parent_message_id="parent-message-id",
user_id="user",
stream=False,
invoke_from=InvokeFrom.WEB_APP,
extras={},
trace_manager=None,
workflow_run_id="run-id",
)
DummyTraceQueueManager = type(
"_DummyTraceQueueManager",
(TraceQueueManager,),
{
"__init__": lambda self, app_id=None, user_id=None: (
setattr(self, "app_id", app_id) or setattr(self, "user_id", user_id)
)
},
)
monkeypatch.setattr(
"core.app.apps.advanced_chat.app_generator.TraceQueueManager",
DummyTraceQueueManager,
)
captured_entity: AdvancedChatAppGenerateEntity | None = None
def _fake_generate(**kwargs):
nonlocal captured_entity
captured_entity = kwargs["application_generate_entity"]
return SimpleNamespace(ok=True)
monkeypatch.setattr(generator, "_generate", _fake_generate)
result = generator.resume(
app_model=SimpleNamespace(id="app-id"),
workflow=SimpleNamespace(),
user=SimpleNamespace(id="end-user-id", session_id="session-id"),
conversation=SimpleNamespace(id="conversation-id"),
message=SimpleNamespace(id="message-id"),
application_generate_entity=application_generate_entity,
workflow_execution_repository=SimpleNamespace(),
workflow_node_execution_repository=SimpleNamespace(),
graph_runtime_state=SimpleNamespace(),
)
assert result.ok is True
assert captured_entity is not None
trace_manager = captured_entity.trace_manager
assert isinstance(trace_manager, DummyTraceQueueManager)
assert trace_manager.app_id == "app-id"
assert trace_manager.user_id == "session-id"
def test_resume_preserves_existing_trace_manager(self, monkeypatch: pytest.MonkeyPatch):
generator = AdvancedChatAppGenerator()
existing_trace_manager = SimpleNamespace(app_id="existing-app", user_id="existing-user")
application_generate_entity = AdvancedChatAppGenerateEntity.model_construct(
task_id="task",
app_config=self._build_app_config(),
file_upload_config=None,
conversation_id="conversation-id",
inputs={},
query="hello",
files=[],
parent_message_id="parent-message-id",
user_id="user",
stream=False,
invoke_from=InvokeFrom.WEB_APP,
extras={},
trace_manager=existing_trace_manager,
workflow_run_id="run-id",
)
captured_entity: AdvancedChatAppGenerateEntity | None = None
def _fake_generate(**kwargs):
nonlocal captured_entity
captured_entity = kwargs["application_generate_entity"]
return SimpleNamespace(ok=True)
monkeypatch.setattr(generator, "_generate", _fake_generate)
result = generator.resume(
app_model=SimpleNamespace(id="app-id"),
workflow=SimpleNamespace(),
user=SimpleNamespace(id="end-user-id", session_id="session-id"),
conversation=SimpleNamespace(id="conversation-id"),
message=SimpleNamespace(id="message-id"),
application_generate_entity=application_generate_entity,
workflow_execution_repository=SimpleNamespace(),
workflow_node_execution_repository=SimpleNamespace(),
graph_runtime_state=SimpleNamespace(),
)
assert result.ok is True
assert captured_entity is not None
assert captured_entity.trace_manager is existing_trace_manager

View File

@ -228,7 +228,11 @@ def test_workflow_app_pause_resume_matches_baseline(mocker: MockerFixture):
app_model=SimpleNamespace(mode="workflow"),
workflow=SimpleNamespace(),
user=SimpleNamespace(),
application_generate_entity=SimpleNamespace(stream=False, invoke_from=InvokeFrom.SERVICE_API),
application_generate_entity=SimpleNamespace(
stream=False,
invoke_from=InvokeFrom.SERVICE_API,
trace_manager=SimpleNamespace(),
),
graph_runtime_state=resumed_state,
workflow_execution_repository=SimpleNamespace(),
workflow_node_execution_repository=SimpleNamespace(),
@ -270,7 +274,11 @@ def test_advanced_chat_pause_resume_matches_baseline(mocker: MockerFixture):
user=SimpleNamespace(),
conversation=SimpleNamespace(id="conv"),
message=SimpleNamespace(id="msg"),
application_generate_entity=SimpleNamespace(stream=False, invoke_from=InvokeFrom.SERVICE_API),
application_generate_entity=SimpleNamespace(
stream=False,
invoke_from=InvokeFrom.SERVICE_API,
trace_manager=SimpleNamespace(),
),
workflow_execution_repository=SimpleNamespace(),
workflow_node_execution_repository=SimpleNamespace(),
graph_runtime_state=resumed_state,

View File

@ -99,7 +99,7 @@ def test_resume_delegates_to_generate(mocker: MockerFixture):
generator = WorkflowAppGenerator()
mock_generate = mocker.patch.object(generator, "_generate", return_value="ok")
application_generate_entity = SimpleNamespace(stream=False, invoke_from="debugger")
application_generate_entity = SimpleNamespace(stream=False, invoke_from="debugger", trace_manager=MagicMock())
runtime_state = MagicMock(name="runtime-state")
pause_config = MagicMock(name="pause-config")

View File

@ -186,3 +186,114 @@ class TestWorkflowAppGeneratorGenerate:
)
assert result == {"ok": True}
class TestWorkflowAppGeneratorResume:
def test_resume_restores_trace_manager_when_missing(self, monkeypatch: pytest.MonkeyPatch):
generator = WorkflowAppGenerator()
app_config = WorkflowUIBasedAppConfig(
tenant_id="tenant",
app_id="app",
app_mode=AppMode.WORKFLOW,
additional_features=AppAdditionalFeatures(),
variables=[],
workflow_id="workflow-id",
)
application_generate_entity = WorkflowAppGenerateEntity.model_construct(
task_id="task",
app_config=app_config,
inputs={},
files=[],
user_id="user",
stream=False,
invoke_from=InvokeFrom.WEB_APP,
extras={},
trace_manager=None,
workflow_execution_id="run-id",
call_depth=0,
)
DummyTraceQueueManager = type(
"_DummyTraceQueueManager",
(TraceQueueManager,),
{
"__init__": lambda self, app_id=None, user_id=None: (
setattr(self, "app_id", app_id) or setattr(self, "user_id", user_id)
)
},
)
monkeypatch.setattr(
"core.app.apps.workflow.app_generator.TraceQueueManager",
DummyTraceQueueManager,
)
captured_entity: WorkflowAppGenerateEntity | None = None
def _fake_generate(**kwargs):
nonlocal captured_entity
captured_entity = kwargs["application_generate_entity"]
return SimpleNamespace(ok=True)
monkeypatch.setattr(generator, "_generate", _fake_generate)
result = generator.resume(
app_model=SimpleNamespace(id="app-id"),
workflow=SimpleNamespace(),
user=SimpleNamespace(id="end-user-id", session_id="session-id"),
application_generate_entity=application_generate_entity,
graph_runtime_state=SimpleNamespace(),
workflow_execution_repository=SimpleNamespace(),
workflow_node_execution_repository=SimpleNamespace(),
)
assert result.ok is True
assert captured_entity is not None
trace_manager = captured_entity.trace_manager
assert isinstance(trace_manager, DummyTraceQueueManager)
assert trace_manager.app_id == "app-id"
assert trace_manager.user_id == "session-id"
def test_resume_preserves_existing_trace_manager(self, monkeypatch: pytest.MonkeyPatch):
generator = WorkflowAppGenerator()
app_config = WorkflowUIBasedAppConfig(
tenant_id="tenant",
app_id="app",
app_mode=AppMode.WORKFLOW,
additional_features=AppAdditionalFeatures(),
variables=[],
workflow_id="workflow-id",
)
existing_trace_manager = SimpleNamespace(app_id="existing-app", user_id="existing-user")
application_generate_entity = WorkflowAppGenerateEntity.model_construct(
task_id="task",
app_config=app_config,
inputs={},
files=[],
user_id="user",
stream=False,
invoke_from=InvokeFrom.WEB_APP,
extras={},
trace_manager=existing_trace_manager,
workflow_execution_id="run-id",
call_depth=0,
)
captured_entity: WorkflowAppGenerateEntity | None = None
def _fake_generate(**kwargs):
nonlocal captured_entity
captured_entity = kwargs["application_generate_entity"]
return SimpleNamespace(ok=True)
monkeypatch.setattr(generator, "_generate", _fake_generate)
result = generator.resume(
app_model=SimpleNamespace(id="app-id"),
workflow=SimpleNamespace(),
user=SimpleNamespace(id="end-user-id", session_id="session-id"),
application_generate_entity=application_generate_entity,
graph_runtime_state=SimpleNamespace(),
workflow_execution_repository=SimpleNamespace(),
workflow_node_execution_repository=SimpleNamespace(),
)
assert result.ok is True
assert captured_entity is not None
assert captured_entity.trace_manager is existing_trace_manager