fix(api): handle agent deferred tool events (#37319)

This commit is contained in:
盐粒 Yanli 2026-06-11 14:18:43 +09:00 committed by GitHub
parent 49c97a3f61
commit d3977cea77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 154 additions and 81 deletions

View File

@ -16,12 +16,12 @@ from clients.agent_backend.errors import (
AgentBackendValidationError,
)
from clients.agent_backend.event_adapter import (
AgentBackendDeferredToolCallInternalEvent,
AgentBackendInternalEvent,
AgentBackendInternalEventType,
AgentBackendRunCancelledInternalEvent,
AgentBackendRunEventAdapter,
AgentBackendRunFailedInternalEvent,
AgentBackendRunPausedInternalEvent,
AgentBackendRunStartedInternalEvent,
AgentBackendRunSucceededInternalEvent,
AgentBackendStreamInternalEvent,
@ -51,6 +51,7 @@ __all__ = [
"WORKFLOW_NODE_JOB_PROMPT_LAYER_ID",
"WORKFLOW_USER_PROMPT_LAYER_ID",
"AgentBackendAgentAppRunInput",
"AgentBackendDeferredToolCallInternalEvent",
"AgentBackendError",
"AgentBackendHTTPError",
"AgentBackendInternalEvent",
@ -63,7 +64,6 @@ __all__ = [
"AgentBackendRunEventAdapter",
"AgentBackendRunFailedError",
"AgentBackendRunFailedInternalEvent",
"AgentBackendRunPausedInternalEvent",
"AgentBackendRunRequestBuilder",
"AgentBackendRunStartedInternalEvent",
"AgentBackendRunSucceededInternalEvent",

View File

@ -2,7 +2,9 @@
The adapter does not define a new cross-service event contract. It consumes
``dify_agent.protocol.RunEvent`` and produces small API-internal models that the
future workflow Agent Node can map to Graphon/AppQueue events in phase 3.
workflow Agent Node maps to Graphon/AppQueue events. Deferred external tool calls
remain Dify Agent ``run_succeeded`` payloads on the wire; API code turns them
into an internal event so workflow pause/session handling stays local to API.
"""
from __future__ import annotations
@ -12,11 +14,11 @@ from typing import Annotated, Literal, cast
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.protocol import (
DeferredToolCallPayload,
PydanticAIStreamRunEvent,
RunCancelledEvent,
RunEvent,
RunFailedEvent,
RunPausedEvent,
RunStartedEvent,
RunSucceededEvent,
)
@ -30,7 +32,7 @@ class AgentBackendInternalEventType(StrEnum):
RUN_STARTED = "run_started"
STREAM_EVENT = "stream_event"
RUN_PAUSED = "run_paused"
DEFERRED_TOOL_CALL = "deferred_tool_call"
RUN_SUCCEEDED = "run_succeeded"
RUN_FAILED = "run_failed"
RUN_CANCELLED = "run_cancelled"
@ -67,13 +69,13 @@ class AgentBackendRunSucceededInternalEvent(AgentBackendInternalEventBase):
session_snapshot: CompositorSessionSnapshot
class AgentBackendRunPausedInternalEvent(AgentBackendInternalEventBase):
"""API-internal resumable pause event for human handoff and Babysit flows."""
class AgentBackendDeferredToolCallInternalEvent(AgentBackendInternalEventBase):
"""API-internal representation of a Dify Agent deferred external tool call."""
type: Literal[AgentBackendInternalEventType.RUN_PAUSED] = AgentBackendInternalEventType.RUN_PAUSED
reason: str
type: Literal[AgentBackendInternalEventType.DEFERRED_TOOL_CALL] = AgentBackendInternalEventType.DEFERRED_TOOL_CALL
deferred_tool_call: DeferredToolCallPayload
message: str | None = None
session_snapshot: CompositorSessionSnapshot | None = None
session_snapshot: CompositorSessionSnapshot
class AgentBackendRunFailedInternalEvent(AgentBackendInternalEventBase):
@ -95,7 +97,7 @@ class AgentBackendRunCancelledInternalEvent(AgentBackendInternalEventBase):
type AgentBackendInternalEvent = Annotated[
AgentBackendRunStartedInternalEvent
| AgentBackendStreamInternalEvent
| AgentBackendRunPausedInternalEvent
| AgentBackendDeferredToolCallInternalEvent
| AgentBackendRunSucceededInternalEvent
| AgentBackendRunFailedInternalEvent
| AgentBackendRunCancelledInternalEvent,
@ -128,6 +130,18 @@ class AgentBackendRunEventAdapter:
)
]
case RunSucceededEvent():
if "deferred_tool_call" in event.data.model_fields_set:
if event.data.deferred_tool_call is None:
raise TypeError("run_succeeded deferred_tool_call branch is missing payload")
return [
AgentBackendDeferredToolCallInternalEvent(
run_id=event.run_id,
source_event_id=event.id,
deferred_tool_call=event.data.deferred_tool_call,
message=_deferred_tool_call_message(event.data.deferred_tool_call),
session_snapshot=event.data.session_snapshot,
)
]
return [
AgentBackendRunSucceededInternalEvent(
run_id=event.run_id,
@ -136,16 +150,6 @@ class AgentBackendRunEventAdapter:
session_snapshot=event.data.session_snapshot,
)
]
case RunPausedEvent():
return [
AgentBackendRunPausedInternalEvent(
run_id=event.run_id,
source_event_id=event.id,
reason=event.data.reason,
message=event.data.message,
session_snapshot=event.data.session_snapshot,
)
]
case RunFailedEvent():
return [
AgentBackendRunFailedInternalEvent(
@ -165,3 +169,18 @@ class AgentBackendRunEventAdapter:
)
]
raise TypeError(f"unsupported agent backend run event: {type(event).__name__}")
def _deferred_tool_call_message(payload: DeferredToolCallPayload) -> str:
"""Return a concise workflow pause message from deferred-tool arguments."""
args = payload.args
if isinstance(args, dict):
question = args.get("question")
if isinstance(question, str) and question.strip():
return question
title = args.get("title")
if isinstance(title, str) and title.strip():
return title
return f"Agent backend requested external input via deferred tool '{payload.tool_name}'."

View File

@ -17,11 +17,10 @@ from dify_agent.protocol import (
CancelRunResponse,
CreateRunRequest,
CreateRunResponse,
DeferredToolCallPayload,
RunEvent,
RunFailedEvent,
RunFailedEventData,
RunPausedEvent,
RunPausedEventData,
RunStartedEvent,
RunStatusResponse,
RunSucceededEvent,
@ -32,7 +31,11 @@ _FIXED_TIME = datetime(2026, 1, 1, tzinfo=UTC)
class FakeAgentBackendScenario(StrEnum):
"""Deterministic fake scenarios for API-side integration tests."""
"""Deterministic fake scenarios for API-side integration tests.
``PAUSED`` represents the API workflow effect. On the Dify Agent wire
protocol it is a succeeded run carrying a deferred external tool call.
"""
SUCCESS = "success"
FAILED = "failed"
@ -95,7 +98,7 @@ class FakeAgentBackendRunClient:
case FakeAgentBackendScenario.PAUSED:
return RunStatusResponse(
run_id=run_id,
status="paused",
status="succeeded",
created_at=_FIXED_TIME,
updated_at=_FIXED_TIME,
)
@ -128,13 +131,17 @@ class FakeAgentBackendRunClient:
case FakeAgentBackendScenario.PAUSED:
return (
RunStartedEvent(id="1-0", run_id=run_id, created_at=_FIXED_TIME),
RunPausedEvent(
RunSucceededEvent(
id="2-0",
run_id=run_id,
created_at=_FIXED_TIME,
data=RunPausedEventData(
reason="human_input_required",
message="Agent requested human input.",
data=RunSucceededEventData(
deferred_tool_call=DeferredToolCallPayload(
tool_call_id="fake-ask-human-1",
tool_name="ask_human",
args={"question": "Agent requested human input."},
metadata={"layer_type": "dify.ask_human", "schema_version": 1},
),
session_snapshot=CompositorSessionSnapshot(layers=[]),
),
),

View File

@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any, override
from agenton.compositor import CompositorSessionSnapshot
from clients.agent_backend import (
AgentBackendDeferredToolCallInternalEvent,
AgentBackendError,
AgentBackendHTTPError,
AgentBackendInternalEventType,
@ -14,7 +15,6 @@ from clients.agent_backend import (
AgentBackendRunClient,
AgentBackendRunEventAdapter,
AgentBackendRunFailedInternalEvent,
AgentBackendRunPausedInternalEvent,
AgentBackendRunSucceededInternalEvent,
AgentBackendStreamError,
AgentBackendStreamInternalEvent,
@ -62,7 +62,7 @@ _TerminalAgentBackendEvent = (
AgentBackendRunSucceededInternalEvent
| AgentBackendRunFailedInternalEvent
| AgentBackendRunCancelledInternalEvent
| AgentBackendRunPausedInternalEvent
| AgentBackendDeferredToolCallInternalEvent
)
@ -250,7 +250,7 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
)
return
if isinstance(terminal_event, AgentBackendRunPausedInternalEvent):
if isinstance(terminal_event, AgentBackendDeferredToolCallInternalEvent):
self._save_session_snapshot(
session_scope=session_scope,
backend_run_id=terminal_event.run_id,
@ -394,16 +394,15 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
**dict(metadata.get("agent_backend") or {}),
"stream_event_count": stream_event_count,
}
# Narrow to the 4 known terminal event types so the caller
# can hand the result to ``build_failure_result`` (which is
# typed against the union). Anything else is a protocol-
# level surprise we surface as a stream error.
# Narrow to the known terminal event types before returning
# to the caller. Deferred-tool events are terminal on the
# Dify Agent wire, then converted into workflow pause locally.
if isinstance(
internal_event,
AgentBackendRunSucceededInternalEvent
| AgentBackendRunFailedInternalEvent
| AgentBackendRunCancelledInternalEvent
| AgentBackendRunPausedInternalEvent,
| AgentBackendDeferredToolCallInternalEvent,
):
return internal_event, None
return None, self._failure_event(

View File

@ -4,11 +4,11 @@ from collections.abc import Mapping, Sequence
from typing import Any, Protocol
from clients.agent_backend import (
AgentBackendDeferredToolCallInternalEvent,
AgentBackendInternalEvent,
AgentBackendInternalEventType,
AgentBackendRunCancelledInternalEvent,
AgentBackendRunFailedInternalEvent,
AgentBackendRunPausedInternalEvent,
AgentBackendRunSucceededInternalEvent,
)
from core.app.file_access import DatabaseFileAccessController
@ -85,11 +85,7 @@ class WorkflowAgentOutputAdapter:
def build_failure_result(
self,
*,
event: (
AgentBackendRunFailedInternalEvent
| AgentBackendRunCancelledInternalEvent
| AgentBackendRunPausedInternalEvent
),
event: (AgentBackendRunFailedInternalEvent | AgentBackendRunCancelledInternalEvent),
inputs: dict[str, Any],
process_data: dict[str, Any],
metadata: dict[str, Any],
@ -108,10 +104,6 @@ class WorkflowAgentOutputAdapter:
error = event.message or "Agent backend run was cancelled."
error_type = "agent_backend_run_cancelled"
terminal_status = "cancelled"
case AgentBackendRunPausedInternalEvent():
error = event.message or "Agent backend run paused, but workflow Agent Node pause is not supported yet."
error_type = "agent_backend_paused_unsupported"
terminal_status = "paused"
metadata = self._with_terminal_metadata(metadata, event, terminal_status)
usage = self._usage_from_metadata(metadata)
@ -339,7 +331,7 @@ class WorkflowAgentOutputAdapter:
}
)
session_snapshot = None
if isinstance(event, AgentBackendRunSucceededInternalEvent | AgentBackendRunPausedInternalEvent):
if isinstance(event, AgentBackendRunSucceededInternalEvent | AgentBackendDeferredToolCallInternalEvent):
session_snapshot = event.session_snapshot
if session_snapshot is not None:
agent_backend["session_snapshot"] = {

View File

@ -1,12 +1,12 @@
import pytest
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.protocol import (
DeferredToolCallPayload,
PydanticAIStreamRunEvent,
RunCancelledEvent,
RunCancelledEventData,
RunFailedEvent,
RunFailedEventData,
RunPausedEvent,
RunPausedEventData,
RunStartedEvent,
RunSucceededEvent,
RunSucceededEventData,
@ -14,11 +14,11 @@ from dify_agent.protocol import (
from pydantic_ai.messages import FinalResultEvent
from clients.agent_backend import (
AgentBackendDeferredToolCallInternalEvent,
AgentBackendInternalEventType,
AgentBackendRunCancelledInternalEvent,
AgentBackendRunEventAdapter,
AgentBackendRunFailedInternalEvent,
AgentBackendRunPausedInternalEvent,
AgentBackendRunStartedInternalEvent,
AgentBackendRunSucceededInternalEvent,
AgentBackendStreamInternalEvent,
@ -92,27 +92,102 @@ def test_event_adapter_maps_run_failed_to_failed_result():
]
def test_event_adapter_maps_run_paused_to_resumable_pause():
def test_event_adapter_maps_deferred_tool_call_success_to_internal_event():
snapshot = CompositorSessionSnapshot(layers=[])
deferred_tool_call = DeferredToolCallPayload(
tool_call_id="tool-call-1",
tool_name="ask_human",
args={"question": "Need review"},
metadata={"layer_type": "dify.ask_human", "schema_version": 1},
)
adapted = AgentBackendRunEventAdapter().adapt(
RunPausedEvent(
RunSucceededEvent(
id="5-0",
run_id="run-1",
data=RunPausedEventData(reason="human_handoff", message="Need review", session_snapshot=snapshot),
data=RunSucceededEventData(deferred_tool_call=deferred_tool_call, session_snapshot=snapshot),
)
)
assert adapted == [
AgentBackendRunPausedInternalEvent(
AgentBackendDeferredToolCallInternalEvent(
run_id="run-1",
source_event_id="5-0",
reason="human_handoff",
deferred_tool_call=deferred_tool_call,
message="Need review",
session_snapshot=snapshot,
)
]
def test_event_adapter_rejects_deferred_tool_call_success_without_payload():
snapshot = CompositorSessionSnapshot(layers=[])
with pytest.raises(TypeError, match="deferred_tool_call branch is missing payload"):
_ = AgentBackendRunEventAdapter().adapt(
RunSucceededEvent(
id="5-1",
run_id="run-1",
data=RunSucceededEventData(deferred_tool_call=None, session_snapshot=snapshot),
)
)
def test_event_adapter_uses_deferred_tool_call_title_as_pause_message_fallback():
snapshot = CompositorSessionSnapshot(layers=[])
deferred_tool_call = DeferredToolCallPayload(
tool_call_id="tool-call-1",
tool_name="ask_human",
args={"title": "Review required"},
metadata={},
)
adapted = AgentBackendRunEventAdapter().adapt(
RunSucceededEvent(
id="5-2",
run_id="run-1",
data=RunSucceededEventData(deferred_tool_call=deferred_tool_call, session_snapshot=snapshot),
)
)
assert adapted == [
AgentBackendDeferredToolCallInternalEvent(
run_id="run-1",
source_event_id="5-2",
deferred_tool_call=deferred_tool_call,
message="Review required",
session_snapshot=snapshot,
)
]
def test_event_adapter_uses_generic_deferred_tool_call_pause_message_when_args_have_no_label():
snapshot = CompositorSessionSnapshot(layers=[])
deferred_tool_call = DeferredToolCallPayload(
tool_call_id="tool-call-1",
tool_name="ask_human",
args={"question": " ", "title": " "},
metadata={},
)
adapted = AgentBackendRunEventAdapter().adapt(
RunSucceededEvent(
id="5-3",
run_id="run-1",
data=RunSucceededEventData(deferred_tool_call=deferred_tool_call, session_snapshot=snapshot),
)
)
assert adapted == [
AgentBackendDeferredToolCallInternalEvent(
run_id="run-1",
source_event_id="5-3",
deferred_tool_call=deferred_tool_call,
message="Agent backend requested external input via deferred tool 'ask_human'.",
session_snapshot=snapshot,
)
]
def test_event_adapter_maps_run_cancelled_to_terminal_cancelled():
adapted = AgentBackendRunEventAdapter().adapt(
RunCancelledEvent(

View File

@ -70,18 +70,18 @@ def test_fake_client_cancel_run_returns_cancelled_status():
assert cancelled.status == "cancelled"
def test_fake_client_paused_scenario_returns_paused_status_and_event():
"""The paused scenario exists for HITL-style flows; both ``wait_run`` and
the event stream must report the pause so consumers can branch on it."""
def test_fake_client_paused_scenario_returns_deferred_tool_call_success_event():
"""The API pause scenario follows the Dify Agent deferred-tool wire shape."""
client = FakeAgentBackendRunClient(scenario=FakeAgentBackendScenario.PAUSED)
status = client.wait_run("fake-run-1")
events = list(client.stream_events("fake-run-1"))
assert status.status == "paused"
assert status.status == "succeeded"
assert status.error is None
assert events[-1].type == "run_paused"
assert events[-1].data.reason == "human_input_required"
assert events[-1].type == "run_succeeded"
assert events[-1].data.deferred_tool_call is not None
assert events[-1].data.deferred_tool_call.tool_name == "ask_human"
def test_fake_client_success_wait_run_returns_succeeded_status():

View File

@ -6,7 +6,6 @@ from agenton.compositor import CompositorSessionSnapshot
from clients.agent_backend import (
AgentBackendRunCancelledInternalEvent,
AgentBackendRunFailedInternalEvent,
AgentBackendRunPausedInternalEvent,
AgentBackendRunSucceededInternalEvent,
)
from core.workflow.file_reference import build_file_reference
@ -144,24 +143,6 @@ def test_success_output_adapter_preserves_dict_output():
}
def test_failure_output_adapter_maps_paused_to_unsupported_failure():
result = WorkflowAgentOutputAdapter().build_failure_result(
event=AgentBackendRunPausedInternalEvent(
run_id="run-1",
source_event_id="2-0",
reason="human",
message=None,
session_snapshot=None,
),
inputs={},
process_data={},
metadata={},
)
assert result.status == WorkflowNodeExecutionStatus.FAILED
assert result.error_type == "agent_backend_paused_unsupported"
def test_failure_output_adapter_preserves_backend_failed_reason():
result = WorkflowAgentOutputAdapter().build_failure_result(
event=AgentBackendRunFailedInternalEvent(