From d3977cea772d1de1b5034e43427d99e5e8c80480 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=90=E7=B2=92=20Yanli?= Date: Thu, 11 Jun 2026 14:18:43 +0900 Subject: [PATCH] fix(api): handle agent deferred tool events (#37319) --- api/clients/agent_backend/__init__.py | 4 +- api/clients/agent_backend/event_adapter.py | 57 ++++++++---- api/clients/agent_backend/fake_client.py | 23 +++-- .../workflow/nodes/agent_v2/agent_node.py | 15 ++- .../workflow/nodes/agent_v2/output_adapter.py | 14 +-- .../agent_backend/test_event_adapter.py | 91 +++++++++++++++++-- .../clients/agent_backend/test_fake_client.py | 12 +-- .../nodes/agent_v2/test_output_adapter.py | 19 ---- 8 files changed, 154 insertions(+), 81 deletions(-) diff --git a/api/clients/agent_backend/__init__.py b/api/clients/agent_backend/__init__.py index bbee164d5cb..f8d15f85676 100644 --- a/api/clients/agent_backend/__init__.py +++ b/api/clients/agent_backend/__init__.py @@ -16,12 +16,12 @@ from clients.agent_backend.errors import ( AgentBackendValidationError, ) from clients.agent_backend.event_adapter import ( + AgentBackendDeferredToolCallInternalEvent, AgentBackendInternalEvent, AgentBackendInternalEventType, AgentBackendRunCancelledInternalEvent, AgentBackendRunEventAdapter, AgentBackendRunFailedInternalEvent, - AgentBackendRunPausedInternalEvent, AgentBackendRunStartedInternalEvent, AgentBackendRunSucceededInternalEvent, AgentBackendStreamInternalEvent, @@ -51,6 +51,7 @@ __all__ = [ "WORKFLOW_NODE_JOB_PROMPT_LAYER_ID", "WORKFLOW_USER_PROMPT_LAYER_ID", "AgentBackendAgentAppRunInput", + "AgentBackendDeferredToolCallInternalEvent", "AgentBackendError", "AgentBackendHTTPError", "AgentBackendInternalEvent", @@ -63,7 +64,6 @@ __all__ = [ "AgentBackendRunEventAdapter", "AgentBackendRunFailedError", "AgentBackendRunFailedInternalEvent", - "AgentBackendRunPausedInternalEvent", "AgentBackendRunRequestBuilder", "AgentBackendRunStartedInternalEvent", "AgentBackendRunSucceededInternalEvent", diff --git a/api/clients/agent_backend/event_adapter.py b/api/clients/agent_backend/event_adapter.py index 02b30e6c6b3..e983aa336d7 100644 --- a/api/clients/agent_backend/event_adapter.py +++ b/api/clients/agent_backend/event_adapter.py @@ -2,7 +2,9 @@ The adapter does not define a new cross-service event contract. It consumes ``dify_agent.protocol.RunEvent`` and produces small API-internal models that the -future workflow Agent Node can map to Graphon/AppQueue events in phase 3. +workflow Agent Node maps to Graphon/AppQueue events. Deferred external tool calls +remain Dify Agent ``run_succeeded`` payloads on the wire; API code turns them +into an internal event so workflow pause/session handling stays local to API. """ from __future__ import annotations @@ -12,11 +14,11 @@ from typing import Annotated, Literal, cast from agenton.compositor import CompositorSessionSnapshot from dify_agent.protocol import ( + DeferredToolCallPayload, PydanticAIStreamRunEvent, RunCancelledEvent, RunEvent, RunFailedEvent, - RunPausedEvent, RunStartedEvent, RunSucceededEvent, ) @@ -30,7 +32,7 @@ class AgentBackendInternalEventType(StrEnum): RUN_STARTED = "run_started" STREAM_EVENT = "stream_event" - RUN_PAUSED = "run_paused" + DEFERRED_TOOL_CALL = "deferred_tool_call" RUN_SUCCEEDED = "run_succeeded" RUN_FAILED = "run_failed" RUN_CANCELLED = "run_cancelled" @@ -67,13 +69,13 @@ class AgentBackendRunSucceededInternalEvent(AgentBackendInternalEventBase): session_snapshot: CompositorSessionSnapshot -class AgentBackendRunPausedInternalEvent(AgentBackendInternalEventBase): - """API-internal resumable pause event for human handoff and Babysit flows.""" +class AgentBackendDeferredToolCallInternalEvent(AgentBackendInternalEventBase): + """API-internal representation of a Dify Agent deferred external tool call.""" - type: Literal[AgentBackendInternalEventType.RUN_PAUSED] = AgentBackendInternalEventType.RUN_PAUSED - reason: str + type: Literal[AgentBackendInternalEventType.DEFERRED_TOOL_CALL] = AgentBackendInternalEventType.DEFERRED_TOOL_CALL + deferred_tool_call: DeferredToolCallPayload message: str | None = None - session_snapshot: CompositorSessionSnapshot | None = None + session_snapshot: CompositorSessionSnapshot class AgentBackendRunFailedInternalEvent(AgentBackendInternalEventBase): @@ -95,7 +97,7 @@ class AgentBackendRunCancelledInternalEvent(AgentBackendInternalEventBase): type AgentBackendInternalEvent = Annotated[ AgentBackendRunStartedInternalEvent | AgentBackendStreamInternalEvent - | AgentBackendRunPausedInternalEvent + | AgentBackendDeferredToolCallInternalEvent | AgentBackendRunSucceededInternalEvent | AgentBackendRunFailedInternalEvent | AgentBackendRunCancelledInternalEvent, @@ -128,6 +130,18 @@ class AgentBackendRunEventAdapter: ) ] case RunSucceededEvent(): + if "deferred_tool_call" in event.data.model_fields_set: + if event.data.deferred_tool_call is None: + raise TypeError("run_succeeded deferred_tool_call branch is missing payload") + return [ + AgentBackendDeferredToolCallInternalEvent( + run_id=event.run_id, + source_event_id=event.id, + deferred_tool_call=event.data.deferred_tool_call, + message=_deferred_tool_call_message(event.data.deferred_tool_call), + session_snapshot=event.data.session_snapshot, + ) + ] return [ AgentBackendRunSucceededInternalEvent( run_id=event.run_id, @@ -136,16 +150,6 @@ class AgentBackendRunEventAdapter: session_snapshot=event.data.session_snapshot, ) ] - case RunPausedEvent(): - return [ - AgentBackendRunPausedInternalEvent( - run_id=event.run_id, - source_event_id=event.id, - reason=event.data.reason, - message=event.data.message, - session_snapshot=event.data.session_snapshot, - ) - ] case RunFailedEvent(): return [ AgentBackendRunFailedInternalEvent( @@ -165,3 +169,18 @@ class AgentBackendRunEventAdapter: ) ] raise TypeError(f"unsupported agent backend run event: {type(event).__name__}") + + +def _deferred_tool_call_message(payload: DeferredToolCallPayload) -> str: + """Return a concise workflow pause message from deferred-tool arguments.""" + args = payload.args + if isinstance(args, dict): + question = args.get("question") + if isinstance(question, str) and question.strip(): + return question + + title = args.get("title") + if isinstance(title, str) and title.strip(): + return title + + return f"Agent backend requested external input via deferred tool '{payload.tool_name}'." diff --git a/api/clients/agent_backend/fake_client.py b/api/clients/agent_backend/fake_client.py index a768777039d..11de90c94b7 100644 --- a/api/clients/agent_backend/fake_client.py +++ b/api/clients/agent_backend/fake_client.py @@ -17,11 +17,10 @@ from dify_agent.protocol import ( CancelRunResponse, CreateRunRequest, CreateRunResponse, + DeferredToolCallPayload, RunEvent, RunFailedEvent, RunFailedEventData, - RunPausedEvent, - RunPausedEventData, RunStartedEvent, RunStatusResponse, RunSucceededEvent, @@ -32,7 +31,11 @@ _FIXED_TIME = datetime(2026, 1, 1, tzinfo=UTC) class FakeAgentBackendScenario(StrEnum): - """Deterministic fake scenarios for API-side integration tests.""" + """Deterministic fake scenarios for API-side integration tests. + + ``PAUSED`` represents the API workflow effect. On the Dify Agent wire + protocol it is a succeeded run carrying a deferred external tool call. + """ SUCCESS = "success" FAILED = "failed" @@ -95,7 +98,7 @@ class FakeAgentBackendRunClient: case FakeAgentBackendScenario.PAUSED: return RunStatusResponse( run_id=run_id, - status="paused", + status="succeeded", created_at=_FIXED_TIME, updated_at=_FIXED_TIME, ) @@ -128,13 +131,17 @@ class FakeAgentBackendRunClient: case FakeAgentBackendScenario.PAUSED: return ( RunStartedEvent(id="1-0", run_id=run_id, created_at=_FIXED_TIME), - RunPausedEvent( + RunSucceededEvent( id="2-0", run_id=run_id, created_at=_FIXED_TIME, - data=RunPausedEventData( - reason="human_input_required", - message="Agent requested human input.", + data=RunSucceededEventData( + deferred_tool_call=DeferredToolCallPayload( + tool_call_id="fake-ask-human-1", + tool_name="ask_human", + args={"question": "Agent requested human input."}, + metadata={"layer_type": "dify.ask_human", "schema_version": 1}, + ), session_snapshot=CompositorSessionSnapshot(layers=[]), ), ), diff --git a/api/core/workflow/nodes/agent_v2/agent_node.py b/api/core/workflow/nodes/agent_v2/agent_node.py index bdcbb776267..f15f431dc80 100644 --- a/api/core/workflow/nodes/agent_v2/agent_node.py +++ b/api/core/workflow/nodes/agent_v2/agent_node.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any, override from agenton.compositor import CompositorSessionSnapshot from clients.agent_backend import ( + AgentBackendDeferredToolCallInternalEvent, AgentBackendError, AgentBackendHTTPError, AgentBackendInternalEventType, @@ -14,7 +15,6 @@ from clients.agent_backend import ( AgentBackendRunClient, AgentBackendRunEventAdapter, AgentBackendRunFailedInternalEvent, - AgentBackendRunPausedInternalEvent, AgentBackendRunSucceededInternalEvent, AgentBackendStreamError, AgentBackendStreamInternalEvent, @@ -62,7 +62,7 @@ _TerminalAgentBackendEvent = ( AgentBackendRunSucceededInternalEvent | AgentBackendRunFailedInternalEvent | AgentBackendRunCancelledInternalEvent - | AgentBackendRunPausedInternalEvent + | AgentBackendDeferredToolCallInternalEvent ) @@ -250,7 +250,7 @@ class DifyAgentNode(Node[DifyAgentNodeData]): ) return - if isinstance(terminal_event, AgentBackendRunPausedInternalEvent): + if isinstance(terminal_event, AgentBackendDeferredToolCallInternalEvent): self._save_session_snapshot( session_scope=session_scope, backend_run_id=terminal_event.run_id, @@ -394,16 +394,15 @@ class DifyAgentNode(Node[DifyAgentNodeData]): **dict(metadata.get("agent_backend") or {}), "stream_event_count": stream_event_count, } - # Narrow to the 4 known terminal event types so the caller - # can hand the result to ``build_failure_result`` (which is - # typed against the union). Anything else is a protocol- - # level surprise we surface as a stream error. + # Narrow to the known terminal event types before returning + # to the caller. Deferred-tool events are terminal on the + # Dify Agent wire, then converted into workflow pause locally. if isinstance( internal_event, AgentBackendRunSucceededInternalEvent | AgentBackendRunFailedInternalEvent | AgentBackendRunCancelledInternalEvent - | AgentBackendRunPausedInternalEvent, + | AgentBackendDeferredToolCallInternalEvent, ): return internal_event, None return None, self._failure_event( diff --git a/api/core/workflow/nodes/agent_v2/output_adapter.py b/api/core/workflow/nodes/agent_v2/output_adapter.py index b679c409bac..0fae0105348 100644 --- a/api/core/workflow/nodes/agent_v2/output_adapter.py +++ b/api/core/workflow/nodes/agent_v2/output_adapter.py @@ -4,11 +4,11 @@ from collections.abc import Mapping, Sequence from typing import Any, Protocol from clients.agent_backend import ( + AgentBackendDeferredToolCallInternalEvent, AgentBackendInternalEvent, AgentBackendInternalEventType, AgentBackendRunCancelledInternalEvent, AgentBackendRunFailedInternalEvent, - AgentBackendRunPausedInternalEvent, AgentBackendRunSucceededInternalEvent, ) from core.app.file_access import DatabaseFileAccessController @@ -85,11 +85,7 @@ class WorkflowAgentOutputAdapter: def build_failure_result( self, *, - event: ( - AgentBackendRunFailedInternalEvent - | AgentBackendRunCancelledInternalEvent - | AgentBackendRunPausedInternalEvent - ), + event: (AgentBackendRunFailedInternalEvent | AgentBackendRunCancelledInternalEvent), inputs: dict[str, Any], process_data: dict[str, Any], metadata: dict[str, Any], @@ -108,10 +104,6 @@ class WorkflowAgentOutputAdapter: error = event.message or "Agent backend run was cancelled." error_type = "agent_backend_run_cancelled" terminal_status = "cancelled" - case AgentBackendRunPausedInternalEvent(): - error = event.message or "Agent backend run paused, but workflow Agent Node pause is not supported yet." - error_type = "agent_backend_paused_unsupported" - terminal_status = "paused" metadata = self._with_terminal_metadata(metadata, event, terminal_status) usage = self._usage_from_metadata(metadata) @@ -339,7 +331,7 @@ class WorkflowAgentOutputAdapter: } ) session_snapshot = None - if isinstance(event, AgentBackendRunSucceededInternalEvent | AgentBackendRunPausedInternalEvent): + if isinstance(event, AgentBackendRunSucceededInternalEvent | AgentBackendDeferredToolCallInternalEvent): session_snapshot = event.session_snapshot if session_snapshot is not None: agent_backend["session_snapshot"] = { diff --git a/api/tests/unit_tests/clients/agent_backend/test_event_adapter.py b/api/tests/unit_tests/clients/agent_backend/test_event_adapter.py index 79f7d14d31e..ab88ba19640 100644 --- a/api/tests/unit_tests/clients/agent_backend/test_event_adapter.py +++ b/api/tests/unit_tests/clients/agent_backend/test_event_adapter.py @@ -1,12 +1,12 @@ +import pytest from agenton.compositor import CompositorSessionSnapshot from dify_agent.protocol import ( + DeferredToolCallPayload, PydanticAIStreamRunEvent, RunCancelledEvent, RunCancelledEventData, RunFailedEvent, RunFailedEventData, - RunPausedEvent, - RunPausedEventData, RunStartedEvent, RunSucceededEvent, RunSucceededEventData, @@ -14,11 +14,11 @@ from dify_agent.protocol import ( from pydantic_ai.messages import FinalResultEvent from clients.agent_backend import ( + AgentBackendDeferredToolCallInternalEvent, AgentBackendInternalEventType, AgentBackendRunCancelledInternalEvent, AgentBackendRunEventAdapter, AgentBackendRunFailedInternalEvent, - AgentBackendRunPausedInternalEvent, AgentBackendRunStartedInternalEvent, AgentBackendRunSucceededInternalEvent, AgentBackendStreamInternalEvent, @@ -92,27 +92,102 @@ def test_event_adapter_maps_run_failed_to_failed_result(): ] -def test_event_adapter_maps_run_paused_to_resumable_pause(): +def test_event_adapter_maps_deferred_tool_call_success_to_internal_event(): snapshot = CompositorSessionSnapshot(layers=[]) + deferred_tool_call = DeferredToolCallPayload( + tool_call_id="tool-call-1", + tool_name="ask_human", + args={"question": "Need review"}, + metadata={"layer_type": "dify.ask_human", "schema_version": 1}, + ) adapted = AgentBackendRunEventAdapter().adapt( - RunPausedEvent( + RunSucceededEvent( id="5-0", run_id="run-1", - data=RunPausedEventData(reason="human_handoff", message="Need review", session_snapshot=snapshot), + data=RunSucceededEventData(deferred_tool_call=deferred_tool_call, session_snapshot=snapshot), ) ) assert adapted == [ - AgentBackendRunPausedInternalEvent( + AgentBackendDeferredToolCallInternalEvent( run_id="run-1", source_event_id="5-0", - reason="human_handoff", + deferred_tool_call=deferred_tool_call, message="Need review", session_snapshot=snapshot, ) ] +def test_event_adapter_rejects_deferred_tool_call_success_without_payload(): + snapshot = CompositorSessionSnapshot(layers=[]) + + with pytest.raises(TypeError, match="deferred_tool_call branch is missing payload"): + _ = AgentBackendRunEventAdapter().adapt( + RunSucceededEvent( + id="5-1", + run_id="run-1", + data=RunSucceededEventData(deferred_tool_call=None, session_snapshot=snapshot), + ) + ) + + +def test_event_adapter_uses_deferred_tool_call_title_as_pause_message_fallback(): + snapshot = CompositorSessionSnapshot(layers=[]) + deferred_tool_call = DeferredToolCallPayload( + tool_call_id="tool-call-1", + tool_name="ask_human", + args={"title": "Review required"}, + metadata={}, + ) + + adapted = AgentBackendRunEventAdapter().adapt( + RunSucceededEvent( + id="5-2", + run_id="run-1", + data=RunSucceededEventData(deferred_tool_call=deferred_tool_call, session_snapshot=snapshot), + ) + ) + + assert adapted == [ + AgentBackendDeferredToolCallInternalEvent( + run_id="run-1", + source_event_id="5-2", + deferred_tool_call=deferred_tool_call, + message="Review required", + session_snapshot=snapshot, + ) + ] + + +def test_event_adapter_uses_generic_deferred_tool_call_pause_message_when_args_have_no_label(): + snapshot = CompositorSessionSnapshot(layers=[]) + deferred_tool_call = DeferredToolCallPayload( + tool_call_id="tool-call-1", + tool_name="ask_human", + args={"question": " ", "title": " "}, + metadata={}, + ) + + adapted = AgentBackendRunEventAdapter().adapt( + RunSucceededEvent( + id="5-3", + run_id="run-1", + data=RunSucceededEventData(deferred_tool_call=deferred_tool_call, session_snapshot=snapshot), + ) + ) + + assert adapted == [ + AgentBackendDeferredToolCallInternalEvent( + run_id="run-1", + source_event_id="5-3", + deferred_tool_call=deferred_tool_call, + message="Agent backend requested external input via deferred tool 'ask_human'.", + session_snapshot=snapshot, + ) + ] + + def test_event_adapter_maps_run_cancelled_to_terminal_cancelled(): adapted = AgentBackendRunEventAdapter().adapt( RunCancelledEvent( diff --git a/api/tests/unit_tests/clients/agent_backend/test_fake_client.py b/api/tests/unit_tests/clients/agent_backend/test_fake_client.py index 9b3e206031e..5862117f622 100644 --- a/api/tests/unit_tests/clients/agent_backend/test_fake_client.py +++ b/api/tests/unit_tests/clients/agent_backend/test_fake_client.py @@ -70,18 +70,18 @@ def test_fake_client_cancel_run_returns_cancelled_status(): assert cancelled.status == "cancelled" -def test_fake_client_paused_scenario_returns_paused_status_and_event(): - """The paused scenario exists for HITL-style flows; both ``wait_run`` and - the event stream must report the pause so consumers can branch on it.""" +def test_fake_client_paused_scenario_returns_deferred_tool_call_success_event(): + """The API pause scenario follows the Dify Agent deferred-tool wire shape.""" client = FakeAgentBackendRunClient(scenario=FakeAgentBackendScenario.PAUSED) status = client.wait_run("fake-run-1") events = list(client.stream_events("fake-run-1")) - assert status.status == "paused" + assert status.status == "succeeded" assert status.error is None - assert events[-1].type == "run_paused" - assert events[-1].data.reason == "human_input_required" + assert events[-1].type == "run_succeeded" + assert events[-1].data.deferred_tool_call is not None + assert events[-1].data.deferred_tool_call.tool_name == "ask_human" def test_fake_client_success_wait_run_returns_succeeded_status(): diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_adapter.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_adapter.py index 49e24cc6770..8d50021f8fe 100644 --- a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_adapter.py +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_adapter.py @@ -6,7 +6,6 @@ from agenton.compositor import CompositorSessionSnapshot from clients.agent_backend import ( AgentBackendRunCancelledInternalEvent, AgentBackendRunFailedInternalEvent, - AgentBackendRunPausedInternalEvent, AgentBackendRunSucceededInternalEvent, ) from core.workflow.file_reference import build_file_reference @@ -144,24 +143,6 @@ def test_success_output_adapter_preserves_dict_output(): } -def test_failure_output_adapter_maps_paused_to_unsupported_failure(): - result = WorkflowAgentOutputAdapter().build_failure_result( - event=AgentBackendRunPausedInternalEvent( - run_id="run-1", - source_event_id="2-0", - reason="human", - message=None, - session_snapshot=None, - ), - inputs={}, - process_data={}, - metadata={}, - ) - - assert result.status == WorkflowNodeExecutionStatus.FAILED - assert result.error_type == "agent_backend_paused_unsupported" - - def test_failure_output_adapter_preserves_backend_failed_reason(): result = WorkflowAgentOutputAdapter().build_failure_result( event=AgentBackendRunFailedInternalEvent(