feat(dify-agent): sync ask-human updates (#37286)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
盐粒 Yanli 2026-06-11 11:44:26 +09:00 committed by GitHub
parent 84490179b0
commit 117a25b32a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 2022 additions and 100 deletions

View File

@ -45,14 +45,15 @@ current `agent run` has ended; the outer `workflow run` is what should be paused
The caller should handle this flow as follows:
1. Read the current `agent run` result and detect the HITL (human-in-the-loop)
requirement.
1. Read the current `agent run` result and detect `deferred_tool_call` on the
terminal `run_succeeded` event.
2. Enter workflow HITL handling and pause graphon.
3. Wait for the human input to be completed.
4. When resuming the workflow, insert the human tool response into the same Agent
session's history layer.
5. Start a second `agent run` on the same Agent node and reuse the same history
session.
4. When resuming the workflow, start a second `agent run` on the same Agent node
with the previous `session_snapshot`, matching composition, and
`deferred_tool_results` keyed by the original tool call id.
5. Keep the history layer active so Dify Agent can match the result to the
pending tool call stored in the previous run's message history.
In other words, a human tool does not mean “pause this agent run until it is
resumed.” It means “this agent run ended with a result that requires human

View File

@ -136,8 +136,10 @@ Successful runs emit `run_started`, zero or more `pydantic_ai_event`, and
`run_succeeded`. Failed runs end with `run_failed`. Event envelopes retain `id`,
`run_id`, `type`, `data`, and `created_at`; `data` is typed per event type,
including Pydantic AI's `AgentStreamEvent` payload for `pydantic_ai_event` and a
terminal `run_succeeded.data` object containing JSON-safe `output` plus a
`CompositorSessionSnapshot` for resumption.
terminal `run_succeeded.data` object containing a `CompositorSessionSnapshot` for
resumption. A successful run has exactly one active result branch: JSON-safe
`output` for final answers, or `deferred_tool_call` when a layer such as
`dify.ask_human` ends the current agent run with an external deferred tool call.
## Examples

View File

@ -0,0 +1,271 @@
# Ask human layer
The ask human layer exposes one model-visible tool that lets an agent end the
current run with a structured request for human input. This page is for Dify
Agent clients that build `CreateRunRequest` payloads and then interpret terminal
run events.
The layer type id is `dify.ask_human`. It does not deliver forms, choose
recipients, enforce authorization, or wait inside the agent run. It only gives
the model a safe way to ask for human input and returns that request as a
deferred tool call.
## Layer contract
| Property | Value |
| --- | --- |
| Type id | `dify.ask_human` |
| Common layer name | `ask_human` |
| Config DTO | `DifyAskHumanLayerConfig` |
| Model-visible tool | `ask_human` by default, configurable with `tool_name` |
| Tool kind | pydantic-ai `external` deferred tool |
| Terminal event | `run_succeeded` |
| Terminal payload branch | `run_succeeded.data.deferred_tool_call` |
The agent run does not enter a paused status. When the model calls the ask-human
tool, the current run succeeds with a `deferred_tool_call` instead of normal
`output`. The client is responsible for turning that deferred call into its own
human-facing workflow, collecting a result, and starting another run with
`deferred_tool_results`.
## Basic usage
Add the ask human layer to the same composition as the prompt, history, LLM, and
optional structured-output layers:
```python {test="skip" lint="skip"}
from agenton_collections.layers.plain import PromptLayerConfig
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
from dify_agent.layers.ask_human import DIFY_ASK_HUMAN_LAYER_TYPE_ID, DifyAskHumanLayerConfig
from dify_agent.layers.dify_plugin import DifyPluginLLMLayerConfig
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID
from dify_agent.protocol.schemas import CreateRunRequest, RunComposition, RunLayerSpec
request = CreateRunRequest(
composition=RunComposition(
layers=[
RunLayerSpec(
name="prompt",
type="plain.prompt",
config=PromptLayerConfig(
prefix="You can ask a human only when the missing decision is required to continue.",
user="Review the deployment plan and proceed only after getting the required approval.",
),
),
RunLayerSpec(
name=DIFY_AGENT_HISTORY_LAYER_ID,
type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
),
RunLayerSpec(
name="ask_human",
type=DIFY_ASK_HUMAN_LAYER_TYPE_ID,
config=DifyAskHumanLayerConfig(
max_fields=4,
max_actions=2,
allowed_field_types=["paragraph", "select"],
allow_file_fields=False,
),
),
RunLayerSpec(
name=DIFY_AGENT_MODEL_LAYER_ID,
type="dify.plugin.llm",
config=DifyPluginLLMLayerConfig(
plugin_id="langgenius/openai",
model_provider="openai",
model="gpt-5.2",
credentials={"openai_api_key": "<redacted>"},
),
),
]
)
)
```
Include a [history layer](../history-layer/index.md) whenever you expect to
resume after a human answer. The pending tool call is stored in pydantic-ai
message history, so the resumed run needs both the returned `session_snapshot`
and the same logical composition with the history layer still present.
## Config fields
`DifyAskHumanLayerConfig` controls the model-facing tool identity and guardrails.
It intentionally does not contain delivery settings.
| Field | Type | Default | Meaning |
| --- | --- | --- | --- |
| `enabled` | `bool` | `True` | When false, the layer exposes neither the tool nor the prompt guidance. |
| `tool_name` | `str` | `"ask_human"` | Model-visible tool name. Must be a valid identifier. |
| `tool_description` | `str \| None` | default description | Optional model-visible tool description. |
| `max_fields` | `int` | `8` | Maximum number of fields the model may request. Use `0` for action-only requests. |
| `max_actions` | `int` | `4` | Maximum number of human actions the model may request. |
| `allowed_field_types` | `list["paragraph" \| "select" \| "file" \| "file-list"]` | `["paragraph", "select"]` | Field types accepted by runtime validation. |
| `allow_file_fields` | `bool` | `False` | File field types are rejected unless this is true and the type is listed in `allowed_field_types`. |
| `max_markdown_chars` | `int` | `8000` | Maximum length for the optional `markdown` body. |
| `max_question_chars` | `int` | `1000` | Maximum length for the required `question`. |
| `max_field_label_chars` | `int` | `120` | Maximum label length for each field. |
| `max_action_label_chars` | `int` | `80` | Maximum label length for each action. |
Configured limits are also capped by server hard limits. If a config exceeds a
hard cap, request validation fails before the run can execute.
The layer converts these limits into a prompt hint automatically. Clients do not
need to write a separate system prompt listing the limits, although they may add
business-specific guidance such as when human input is appropriate.
## What the model can request
When enabled, the layer exposes an external deferred tool whose argument shape is
`AskHumanToolArgs`:
| Field | Type | Meaning |
| --- | --- | --- |
| `title` | `str \| None` | Optional short title for the human request. |
| `question` | `str` | Required question/instruction for the human. |
| `markdown` | `str \| None` | Optional longer Markdown body. Treat it as untrusted user-visible content. |
| `fields` | `list[AskHumanField]` | Optional structured fields for the human to fill. |
| `actions` | `list[AskHumanAction]` | Optional action buttons. If omitted, Dify Agent normalizes to a single primary `Submit` action. |
| `urgency` | `"normal" \| "high"` | Hint for downstream systems; it is not a delivery policy. |
Supported field variants:
- `paragraph`: free-text input.
- `select`: single-choice input with unique option values.
- `file`: single-file input, only when file fields are allowed.
- `file-list`: multi-file input, only when file fields are allowed.
Tool arguments are validated again after the model calls the tool. Invalid calls
produce a model retry before a terminal success is emitted.
## Handling a deferred human request
Stream or poll run events as usual. A successful final answer has
`event.data.output`. A successful human request has `event.data.deferred_tool_call`.
Exactly one branch is set.
```python {test="skip" lint="skip"}
deferred_call = None
snapshot = None
async for event in client.stream_events(run_id):
if event.type != "run_succeeded":
continue
snapshot = event.data.session_snapshot
if event.data.deferred_tool_call is not None:
deferred_call = event.data.deferred_tool_call
else:
final_output = event.data.output
break
if deferred_call is not None:
# Render your own human-facing form, enqueue notification, pause an outer
# workflow, or store the request for later. Dify Agent does not do that part.
print(deferred_call.tool_call_id, deferred_call.args)
```
A typical deferred payload looks like this:
```json
{
"tool_call_id": "call_01H...",
"tool_name": "ask_human",
"args": {
"title": "Deployment approval",
"question": "Can we deploy version 2026.06.10 to production now?",
"fields": [
{
"type": "paragraph",
"name": "comment",
"label": "Approval comment",
"required": false
}
],
"actions": [
{"id": "approve", "label": "Approve", "style": "primary"},
{"id": "reject", "label": "Reject", "style": "destructive"}
],
"urgency": "normal"
},
"metadata": {
"layer_type": "dify.ask_human",
"tool_name": "ask_human",
"schema_version": 1
}
}
```
The `args` object is model-generated content. Validate and sanitize it before
rendering it to end users.
## Resume with a human result
After your client collects a human answer, create a new run with:
- the previous `session_snapshot`;
- a matching composition that still includes the history and ask-human layers;
- `deferred_tool_results.calls[tool_call_id]` containing the human result.
```python {test="skip" lint="skip"}
from dify_agent.layers.ask_human import AskHumanToolResult
from dify_agent.protocol import DeferredToolResultsPayload
human_result = AskHumanToolResult(
status="submitted",
action={"id": "approve", "label": "Approve"},
values={"comment": "Approved for the planned window."},
message="The human approved the deployment.",
)
resume_request = CreateRunRequest(
composition=composition_with_same_layer_names_and_order,
session_snapshot=snapshot,
deferred_tool_results=DeferredToolResultsPayload(
calls={deferred_call.tool_call_id: human_result.model_dump(mode="json")},
),
)
```
Dify Agent passes the supplied result back to pydantic-ai as the return value of
the original external tool call, then the model continues. The resumed run may
produce a final `output`, or it may produce another `deferred_tool_call` if the
agent needs another human turn.
Timeouts and unavailable humans should also be sent as tool results instead of
being treated as agent-run failures:
```json
{
"status": "timeout",
"action": {"id": "__timeout", "label": "Timeout"},
"values": {},
"message": "The human did not respond before the workflow timeout."
}
```
## Client responsibilities
The ask human layer deliberately leaves product decisions to the caller. Clients
must decide how to:
- persist the deferred call and correlate it with a human-facing task;
- render and sanitize the requested fields/actions;
- choose recipients, channels, and timeout policy;
- authorize who may answer;
- transform the human submission into `AskHumanToolResult`;
- resume with the returned `session_snapshot` and matching composition.
Do not put recipient emails, workspace member ids, public URLs, auth tokens, or
timeout policy in the tool arguments. The model-facing request is untrusted and
should not control delivery or authorization.
## Troubleshooting
| Symptom | What to check |
| --- | --- |
| Run fails with `Deferred tool results require a 'history' layer` | Add the `history` layer and resume with the prior snapshot. |
| Run fails with `pending tool call can be resumed` | Keep the history layer active for the initial deferred run. |
| Run fails with `exactly one deferred call` | The MVP supports one ask-human call per run. Ask the model to ask one question at a time. |
| Run fails with `tool name must be ...` | Use the configured `tool_name`; do not rename it only in downstream form code. |
| File fields are rejected | Set `allow_file_fields=True` and include `file` or `file-list` in `allowed_field_types`. |
| `run_succeeded.data.output` is absent | Check `run_succeeded.data.deferred_tool_call`; this is a human-request success, not a failed run. |

View File

@ -21,6 +21,7 @@ nav:
- Prompt Layer: dify-agent/user-manual/prompt-layer/index.md
- Execution Context Layer: dify-agent/user-manual/execution-context-layer/index.md
- Shell Layer: dify-agent/user-manual/shell-layer/index.md
- Ask Human Layer: dify-agent/user-manual/ask-human-layer/index.md
- Plugin LLM Layer: dify-agent/user-manual/plugin-llm-layer/index.md
- Plugin Tool Layer: dify-agent/user-manual/plugin-tool-layer/index.md
- History Layer: dify-agent/user-manual/history-layer/index.md

View File

@ -0,0 +1,48 @@
"""Client-safe exports for Dify ask-human layer DTOs and schema types.
The runtime layer implementation lives in ``layer.py`` and imports server-side
execution helpers. Keep this package root import-safe for client code that only
needs to build run requests or understand deferred payload shapes.
"""
from dify_agent.layers.ask_human.configs import (
DEFAULT_ASK_HUMAN_TOOL_DESCRIPTION,
DIFY_ASK_HUMAN_LAYER_TYPE_ID,
DifyAskHumanLayerConfig,
)
from dify_agent.layers.ask_human.schema import (
AskHumanAction,
AskHumanActionStyle,
AskHumanField,
AskHumanFieldType,
AskHumanFileField,
AskHumanFileListField,
AskHumanParagraphField,
AskHumanResultStatus,
AskHumanSelectField,
AskHumanSelectOption,
AskHumanSelectedAction,
AskHumanToolArgs,
AskHumanToolResult,
AskHumanUrgency,
)
__all__ = [
"AskHumanAction",
"AskHumanActionStyle",
"AskHumanField",
"AskHumanFieldType",
"AskHumanFileField",
"AskHumanFileListField",
"AskHumanParagraphField",
"AskHumanResultStatus",
"AskHumanSelectField",
"AskHumanSelectOption",
"AskHumanSelectedAction",
"AskHumanToolArgs",
"AskHumanToolResult",
"AskHumanUrgency",
"DEFAULT_ASK_HUMAN_TOOL_DESCRIPTION",
"DIFY_ASK_HUMAN_LAYER_TYPE_ID",
"DifyAskHumanLayerConfig",
]

View File

@ -0,0 +1,136 @@
"""Client-safe DTOs for the Dify ask-human layer.
The public config controls only stable model-facing tool identity and guardrails.
Delivery, recipient selection, timeout policy, and other operational behavior are
intentionally out of scope for this layer and must stay outside the model-facing
tool contract. Setting ``enabled=False`` disables both ask-human tool exposure
and the prompt guidance that tells the model about these limits. Caller-provided
limits are additionally capped by small server hard limits so one composition
cannot widen the public deferred-tool surface arbitrarily. File field variants
are part of the schema vocabulary for forward compatibility, but they remain
invalid unless ``allow_file_fields=True`` and the allowed field-type list also
permits them.
"""
from __future__ import annotations
import re
from typing import ClassVar, Final
from pydantic import ConfigDict, Field, field_validator, model_validator
from agenton.layers import LayerConfig
from dify_agent.layers.ask_human.schema import AskHumanFieldType
DIFY_ASK_HUMAN_LAYER_TYPE_ID: Final[str] = "dify.ask_human"
DEFAULT_ASK_HUMAN_TOOL_DESCRIPTION: Final[str] = (
"Ask a human for missing information or a decision that is required to continue. "
"Use this only when the answer cannot be inferred from the conversation, available tools, or current context. "
"Provide concise instructions, structured fields, and clear actions for the human."
)
_TOOL_NAME_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
_HARD_MAX_FIELDS = 16
_HARD_MAX_ACTIONS = 8
_HARD_MAX_MARKDOWN_CHARS = 20_000
_HARD_MAX_QUESTION_CHARS = 4_000
_HARD_MAX_FIELD_LABEL_CHARS = 200
_HARD_MAX_ACTION_LABEL_CHARS = 120
_FILE_FIELD_TYPES: Final[frozenset[AskHumanFieldType]] = frozenset({"file", "file-list"})
class DifyAskHumanLayerConfig(LayerConfig):
"""Public config for the optional ask-human deferred tool layer.
This DTO describes the exact model-facing guardrail surface that the runtime
will both validate and surface back to the model through prompt guidance.
``enabled=False`` means callers keep the layer in composition data without
exposing either the tool or its instructions for that run. Numeric limits are
caller-configurable only within the server's hard caps, and file field types
are rejected unless callers opt in with ``allow_file_fields=True``.
"""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
enabled: bool = True
tool_name: str = "ask_human"
tool_description: str | None = None
max_fields: int = Field(default=8, ge=0)
max_actions: int = Field(default=4, ge=1)
allowed_field_types: list[AskHumanFieldType] = Field(default_factory=lambda: ["paragraph", "select"])
allow_file_fields: bool = False
max_markdown_chars: int = Field(default=8_000, ge=0)
max_question_chars: int = Field(default=1_000, ge=1)
max_field_label_chars: int = Field(default=120, ge=1)
max_action_label_chars: int = Field(default=80, ge=1)
@property
def effective_tool_description(self) -> str:
"""Return the configured description or the proposal default text."""
return self.tool_description or DEFAULT_ASK_HUMAN_TOOL_DESCRIPTION
@field_validator("tool_name")
@classmethod
def _validate_tool_name(cls, value: str) -> str:
if not _TOOL_NAME_PATTERN.fullmatch(value):
raise ValueError("tool_name must be a valid tool identifier")
return value
@field_validator("tool_description")
@classmethod
def _normalize_tool_description(cls, value: str | None) -> str | None:
if value is None:
return None
stripped = value.strip()
return stripped or None
@field_validator("allowed_field_types")
@classmethod
def _validate_allowed_field_types(cls, value: list[AskHumanFieldType]) -> list[AskHumanFieldType]:
if len(set(value)) != len(value):
raise ValueError("allowed_field_types must not contain duplicates")
return value
@field_validator(
"max_fields",
"max_actions",
"max_markdown_chars",
"max_question_chars",
"max_field_label_chars",
"max_action_label_chars",
mode="after",
)
@classmethod
def _validate_hard_limits(cls, value: int, info: object) -> int:
field_name = getattr(info, "field_name", "value")
hard_limits = {
"max_fields": _HARD_MAX_FIELDS,
"max_actions": _HARD_MAX_ACTIONS,
"max_markdown_chars": _HARD_MAX_MARKDOWN_CHARS,
"max_question_chars": _HARD_MAX_QUESTION_CHARS,
"max_field_label_chars": _HARD_MAX_FIELD_LABEL_CHARS,
"max_action_label_chars": _HARD_MAX_ACTION_LABEL_CHARS,
}
hard_limit = hard_limits[field_name]
if value > hard_limit:
raise ValueError(f"{field_name} must be <= {hard_limit}")
return value
@model_validator(mode="after")
def _validate_file_field_policy(self) -> DifyAskHumanLayerConfig:
if not self.allow_file_fields:
forbidden = [field_type for field_type in self.allowed_field_types if field_type in _FILE_FIELD_TYPES]
if forbidden:
joined = ", ".join(forbidden)
raise ValueError(
f"allowed_field_types cannot include file field types when allow_file_fields is false: {joined}"
)
return self
__all__ = [
"DEFAULT_ASK_HUMAN_TOOL_DESCRIPTION",
"DIFY_ASK_HUMAN_LAYER_TYPE_ID",
"DifyAskHumanLayerConfig",
]

View File

@ -0,0 +1,275 @@
"""Runtime ask-human layer built on pydantic-ai external deferred tools.
The layer contributes one optional external tool plus one prompt hint. The tool
never executes Python during the initial run; instead the model emits an
external deferred tool call that Dify Agent returns through ``run_succeeded`` as
``deferred_tool_call``. Guardrails are enforced in two places:
* prompt/tool-definition guidance nudges the model toward valid requests, and
* runtime validation normalizes default actions and rejects out-of-policy calls.
The layer stays product-neutral: downstream systems decide delivery, recipients,
timeouts, and authorization for the human request.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, ClassVar, cast
from pydantic import JsonValue, ValidationError
from pydantic_ai import Tool
from pydantic_ai.exceptions import ModelRetry
from pydantic_ai.tools import DeferredToolRequests, RunContext, ToolDefinition
from typing_extensions import Self, override
from agenton.layers import EmptyRuntimeState, NoLayerDeps, PydanticAILayer, PydanticAIPrompt, PydanticAITool
from dify_agent.layers.ask_human.configs import DIFY_ASK_HUMAN_LAYER_TYPE_ID, DifyAskHumanLayerConfig
from dify_agent.layers.ask_human.schema import (
AskHumanAction,
AskHumanField,
AskHumanToolArgs,
)
from dify_agent.protocol.schemas import DeferredToolCallPayload, RunComposition
_ASK_HUMAN_DEFERRED_SCHEMA_VERSION = 1
_DEFAULT_SUBMIT_ACTION = AskHumanAction(id="submit", label="Submit", style="primary")
@dataclass(slots=True)
class DifyAskHumanLayer(PydanticAILayer[NoLayerDeps, object, DifyAskHumanLayerConfig, EmptyRuntimeState]):
"""State-free pydantic-ai layer that exposes the ask-human deferred tool."""
type_id: ClassVar[str | None] = DIFY_ASK_HUMAN_LAYER_TYPE_ID
config: DifyAskHumanLayerConfig
@classmethod
@override
def from_config(cls, config: DifyAskHumanLayerConfig) -> Self:
"""Create the layer from validated public config."""
return cls(config=DifyAskHumanLayerConfig.model_validate(config))
@property
@override
def prefix_prompts(self) -> list[PydanticAIPrompt[object]]:
if not self.config.enabled:
return []
return [self.build_prompt_hint]
@property
@override
def tools(self) -> list[PydanticAITool[object]]:
if not self.config.enabled:
return []
return [
Tool(
self._never_executed_tool,
takes_ctx=True,
name=self.config.tool_name,
description=self.config.effective_tool_description,
prepare=self._prepare_tool_definition,
args_validator=self._validate_tool_args,
sequential=True,
)
]
def build_prompt_hint(self) -> str:
"""Return the model-facing instruction text for ask-human guardrails."""
allowed_field_types = ", ".join(self.config.allowed_field_types) if self.config.allowed_field_types else "none"
file_field_status = "enabled" if self.config.allow_file_fields else "disabled"
if self.config.max_fields == 0:
field_count_hint = "Do not add any fields."
else:
field_count_hint = f"Use at most {self.config.max_fields} field(s)."
return (
f"You may call the external tool '{self.config.tool_name}' only when human input is required to continue. "
"Do not ask a human for information that can be inferred from the conversation, current context, or other tools.\n\n"
f"Ask-human guardrails:\n"
f"- Allowed field types: {allowed_field_types}.\n"
f"- File upload fields are {file_field_status}.\n"
f"- {field_count_hint}\n"
f"- Use at most {self.config.max_actions} action(s).\n"
f"- Keep 'question' under {self.config.max_question_chars} characters.\n"
f"- Keep 'markdown' under {self.config.max_markdown_chars} characters.\n"
f"- Keep each field label under {self.config.max_field_label_chars} characters.\n"
f"- Keep each action label under {self.config.max_action_label_chars} characters.\n"
"- If you omit actions, the system will add one primary action: Submit.\n"
"Prefer concise, structured requests that stay comfortably within these limits."
)
def build_deferred_tool_call_payload(self, requests: DeferredToolRequests) -> DeferredToolCallPayload:
"""Validate and normalize the single supported deferred ask-human call."""
if requests.approvals:
raise ValueError("ask_human does not support approval requests; use external deferred calls only")
call_count = len(requests.calls)
if call_count != 1:
raise ValueError(f"ask_human supports exactly one deferred call per run in this version; got {call_count}.")
call = requests.calls[0]
if call.tool_name != self.config.tool_name:
raise ValueError(f"ask_human deferred tool name must be '{self.config.tool_name}', got '{call.tool_name}'.")
args = self._validate_and_normalize_tool_args(
title=None,
question="",
markdown=None,
fields=[],
actions=[],
urgency="normal",
raw_args=call.args,
)
return DeferredToolCallPayload(
tool_call_id=call.tool_call_id,
tool_name=call.tool_name,
args=cast(JsonValue, args.model_dump(mode="json")),
metadata={
"layer_type": self.type_id,
"tool_name": self.config.tool_name,
"schema_version": _ASK_HUMAN_DEFERRED_SCHEMA_VERSION,
},
)
def _prepare_tool_definition(self, _ctx: RunContext[object], tool_def: ToolDefinition) -> ToolDefinition:
"""Convert the ask-human tool into a pydantic-ai external deferred tool."""
del tool_def
return ToolDefinition(
name=self.config.tool_name,
description=self.config.effective_tool_description,
parameters_json_schema=cast(dict[str, Any], AskHumanToolArgs.model_json_schema()),
strict=False,
sequential=True,
kind="external",
)
async def _never_executed_tool(
self,
_ctx: RunContext[object],
*,
title: str | None = None,
question: str,
markdown: str | None = None,
fields: list[AskHumanField] | None = None,
actions: list[AskHumanAction] | None = None,
urgency: str = "normal",
) -> str:
del title, question, markdown, fields, actions, urgency
raise RuntimeError("ask_human is an external deferred tool and should not execute during the initial run")
def _validate_tool_args(
self,
_ctx: RunContext[object],
*,
title: str | None = None,
question: str,
markdown: str | None = None,
fields: list[AskHumanField] | None = None,
actions: list[AskHumanAction] | None = None,
urgency: str = "normal",
) -> None:
try:
_ = self._validate_and_normalize_tool_args(
title=title,
question=question,
markdown=markdown,
fields=fields or [],
actions=actions or [],
urgency=urgency,
)
except (ValidationError, ValueError) as exc:
raise ModelRetry(str(exc)) from exc
def _validate_and_normalize_tool_args(
self,
*,
title: str | None,
question: str,
markdown: str | None,
fields: list[AskHumanField],
actions: list[AskHumanAction],
urgency: str,
raw_args: str | dict[str, Any] | None = None,
) -> AskHumanToolArgs:
if raw_args is not None:
args = _validate_tool_args_payload(raw_args)
else:
args = AskHumanToolArgs(
title=title,
question=question,
markdown=markdown,
fields=fields,
actions=actions,
urgency=cast(Any, urgency),
)
if len(args.fields) > self.config.max_fields:
raise ValueError(f"ask_human fields must contain at most {self.config.max_fields} item(s)")
normalized_actions = list(args.actions)
if not normalized_actions:
normalized_actions = [_DEFAULT_SUBMIT_ACTION.model_copy()]
if len(normalized_actions) > self.config.max_actions:
raise ValueError(f"ask_human actions must contain at most {self.config.max_actions} item(s)")
if len(args.question) > self.config.max_question_chars:
raise ValueError(f"ask_human question must be <= {self.config.max_question_chars} characters")
if args.markdown is not None and len(args.markdown) > self.config.max_markdown_chars:
raise ValueError(f"ask_human markdown must be <= {self.config.max_markdown_chars} characters")
allowed_field_types = set(self.config.allowed_field_types)
for field in args.fields:
if field.type not in allowed_field_types:
raise ValueError(f"ask_human field type '{field.type}' is not allowed by this layer config")
if len(field.label) > self.config.max_field_label_chars:
raise ValueError(
f"ask_human field label '{field.label}' must be <= {self.config.max_field_label_chars} characters"
)
if not self.config.allow_file_fields and field.type in {"file", "file-list"}:
raise ValueError("ask_human file fields are disabled by this layer config")
for action in normalized_actions:
if len(action.label) > self.config.max_action_label_chars:
raise ValueError(
f"ask_human action label '{action.label}' must be <= {self.config.max_action_label_chars} characters"
)
return args.model_copy(update={"actions": normalized_actions})
def validate_ask_human_layer_composition(composition: RunComposition) -> None:
"""Reject unsupported public ask-human layer graph shapes."""
ask_human_layers = [layer.name for layer in composition.layers if layer.type == DIFY_ASK_HUMAN_LAYER_TYPE_ID]
if len(ask_human_layers) > 1:
names = ", ".join(ask_human_layers)
raise ValueError(f"Only one '{DIFY_ASK_HUMAN_LAYER_TYPE_ID}' layer is supported. Found layers: {names}.")
def get_ask_human_layer(run: Any) -> DifyAskHumanLayer | None:
"""Return the active ask-human layer when one is present and enabled."""
matched: list[DifyAskHumanLayer] = []
for slot in run.slots.values():
layer = slot.layer
if isinstance(layer, DifyAskHumanLayer):
matched.append(layer)
if not matched:
return None
if len(matched) > 1:
raise ValueError(f"Only one '{DIFY_ASK_HUMAN_LAYER_TYPE_ID}' layer is supported per run.")
layer = matched[0]
return layer if layer.config.enabled else None
def _validate_tool_args_payload(raw_args: str | dict[str, Any]) -> AskHumanToolArgs:
if isinstance(raw_args, str):
return AskHumanToolArgs.model_validate_json(raw_args or "{}")
return AskHumanToolArgs.model_validate(raw_args or {})
__all__ = [
"DifyAskHumanLayer",
"get_ask_human_layer",
"validate_ask_human_layer_composition",
]

View File

@ -0,0 +1,234 @@
"""Product-neutral schemas for the Dify ask-human deferred tool contract.
These models describe the model-facing tool arguments and the later human result
payload expected by resumed runs. Config-specific guardrails such as maximum
counts, allowed field types, or per-install length limits are enforced by
``dify_agent.layers.ask_human.layer`` so this module stays import-safe for
client code that only needs the stable wire/schema shapes.
"""
from __future__ import annotations
import re
from typing import Annotated, ClassVar, Literal
from pydantic import BaseModel, ConfigDict, Field, JsonValue, ValidationInfo, field_validator, model_validator
_IDENTIFIER_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
type AskHumanFieldType = Literal["paragraph", "select", "file", "file-list"]
type AskHumanActionStyle = Literal["default", "primary", "destructive"]
type AskHumanUrgency = Literal["normal", "high"]
type AskHumanResultStatus = Literal["submitted", "timeout", "cancelled", "unavailable"]
def is_valid_identifier(value: str) -> bool:
"""Return whether ``value`` matches the stable ask-human identifier rules."""
return bool(_IDENTIFIER_PATTERN.fullmatch(value))
def _require_non_blank(value: str, *, label: str) -> str:
if not value.strip():
raise ValueError(f"{label} must not be blank")
return value
class AskHumanSelectOption(BaseModel):
"""One selectable option for an ask-human select field."""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
value: str = Field(min_length=1)
label: str = Field(min_length=1)
@field_validator("value", "label")
@classmethod
def _validate_non_blank(cls, value: str, info: ValidationInfo) -> str:
return _require_non_blank(value, label=f"select option {info.field_name}")
class AskHumanFieldBase(BaseModel):
"""Shared field properties for ask-human form fields."""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
name: str = Field(min_length=1)
label: str = Field(min_length=1)
required: bool = False
@field_validator("name")
@classmethod
def _validate_name(cls, value: str) -> str:
if not is_valid_identifier(value):
raise ValueError("field name must be a valid identifier")
return value
@field_validator("label")
@classmethod
def _validate_label(cls, value: str) -> str:
return _require_non_blank(value, label="field label")
class AskHumanParagraphField(AskHumanFieldBase):
"""Free-text paragraph field."""
type: Literal["paragraph"] = "paragraph"
placeholder: str | None = None
default: str | None = None
class AskHumanSelectField(AskHumanFieldBase):
"""Single-choice select field."""
type: Literal["select"] = "select"
options: list[AskHumanSelectOption] = Field(default_factory=list)
default: str | None = None
@model_validator(mode="after")
def _validate_options(self) -> AskHumanSelectField:
if not self.options:
raise ValueError("select fields must define at least one option")
seen_values: set[str] = set()
for option in self.options:
if option.value in seen_values:
raise ValueError(f"select field '{self.name}' contains duplicate option value '{option.value}'")
seen_values.add(option.value)
if self.default is not None and self.default not in seen_values:
raise ValueError(f"select field '{self.name}' default must match one of its option values")
return self
class AskHumanFileField(AskHumanFieldBase):
"""Single-file upload field."""
type: Literal["file"] = "file"
class AskHumanFileListField(AskHumanFieldBase):
"""Multi-file upload field."""
type: Literal["file-list"] = "file-list"
max_files: int | None = Field(default=None, ge=1)
type AskHumanField = Annotated[
AskHumanParagraphField | AskHumanSelectField | AskHumanFileField | AskHumanFileListField,
Field(discriminator="type"),
]
class AskHumanAction(BaseModel):
"""One human-visible action rendered with an ask-human request."""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
id: str = Field(min_length=1)
label: str = Field(min_length=1)
style: AskHumanActionStyle = "default"
@field_validator("id")
@classmethod
def _validate_id(cls, value: str) -> str:
if not is_valid_identifier(value):
raise ValueError("action id must be a valid identifier")
return value
@field_validator("label")
@classmethod
def _validate_label(cls, value: str) -> str:
return _require_non_blank(value, label="action label")
class AskHumanSelectedAction(BaseModel):
"""Action metadata returned with a human-submitted result."""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
id: str = Field(min_length=1)
label: str = Field(min_length=1)
@field_validator("id")
@classmethod
def _validate_id(cls, value: str) -> str:
if not is_valid_identifier(value):
raise ValueError("selected action id must be a valid identifier")
return value
@field_validator("label")
@classmethod
def _validate_label(cls, value: str) -> str:
return _require_non_blank(value, label="selected action label")
class AskHumanToolArgs(BaseModel):
"""Arguments accepted by the ask-human external deferred tool."""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
title: str | None = None
question: str = Field(min_length=1)
markdown: str | None = None
fields: list[AskHumanField] = Field(default_factory=list)
actions: list[AskHumanAction] = Field(default_factory=list)
urgency: AskHumanUrgency = "normal"
@field_validator("question")
@classmethod
def _validate_question(cls, value: str) -> str:
return _require_non_blank(value, label="question")
@field_validator("title")
@classmethod
def _validate_title(cls, value: str | None) -> str | None:
if value is None:
return None
return _require_non_blank(value, label="title")
@model_validator(mode="after")
def _validate_unique_ids(self) -> AskHumanToolArgs:
field_names: set[str] = set()
for field in self.fields:
if field.name in field_names:
raise ValueError(f"field name '{field.name}' must be unique")
field_names.add(field.name)
action_ids: set[str] = set()
for action in self.actions:
if action.id in action_ids:
raise ValueError(f"action id '{action.id}' must be unique")
action_ids.add(action.id)
return self
class AskHumanToolResult(BaseModel):
"""Expected value shape for a later deferred ask-human tool result."""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
status: AskHumanResultStatus
action: AskHumanSelectedAction | None = None
values: dict[str, JsonValue] = Field(default_factory=dict)
message: str | None = None
rendered_content: str | None = None
__all__ = [
"AskHumanAction",
"AskHumanActionStyle",
"AskHumanField",
"AskHumanFieldType",
"AskHumanFileField",
"AskHumanFileListField",
"AskHumanParagraphField",
"AskHumanResultStatus",
"AskHumanSelectField",
"AskHumanSelectOption",
"AskHumanSelectedAction",
"AskHumanToolArgs",
"AskHumanToolResult",
"AskHumanUrgency",
"is_valid_identifier",
]

View File

@ -14,6 +14,8 @@ from .schemas import (
CancelRunResponse,
CreateRunRequest,
CreateRunResponse,
DeferredToolCallPayload,
DeferredToolResultsPayload,
EmptyRunEventData,
LayerExitSignals,
PydanticAIStreamRunEvent,
@ -25,8 +27,6 @@ from .schemas import (
RunEventsResponse,
RunFailedEvent,
RunFailedEventData,
RunPausedEvent,
RunPausedEventData,
RunPurpose,
RunLayerSpec,
RunStartedEvent,
@ -44,6 +44,8 @@ __all__ = [
"CancelRunResponse",
"CreateRunRequest",
"CreateRunResponse",
"DeferredToolCallPayload",
"DeferredToolResultsPayload",
"DIFY_AGENT_HISTORY_LAYER_ID",
"DIFY_AGENT_MODEL_LAYER_ID",
"DIFY_AGENT_OUTPUT_LAYER_ID",
@ -59,8 +61,6 @@ __all__ = [
"RunEventsResponse",
"RunFailedEvent",
"RunFailedEventData",
"RunPausedEvent",
"RunPausedEventData",
"RunPurpose",
"RunLayerSpec",
"RunStartedEvent",

View File

@ -5,9 +5,9 @@ producers, storage adapters, and Python client. Create-run requests expose a
Dify-friendly ``composition.layers[].config`` shape so callers can describe one
layer in one place; the server normalizes that public DTO into Agenton's
state-only ``CompositorConfig`` plus node-name keyed per-run configs before
calling ``Compositor.enter(configs=...)``. Session snapshots and ``on_exit`` stay
top-level because they are per-run resume state and exit policy, not graph node
definition.
calling ``Compositor.enter(configs=...)``. Session snapshots, deferred tool
results, and ``on_exit`` stay top-level because they are per-run resume state or
exit policy, not graph node definition.
The server still constructs layers only from explicit provider type ids, keeping
HTTP input data-only and preventing unsafe import-path construction. Run events
@ -22,21 +22,25 @@ by ``DIFY_AGENT_MODEL_LAYER_ID``, the optional history layer named by
by ``DIFY_AGENT_OUTPUT_LAYER_ID``. Request-level ``on_exit`` signals decide
whether each active layer is suspended or deleted when the run exits, with
suspend as the default so successful terminal events can include resumable
snapshots. Successful runs publish the final JSON-safe agent output and the
resumable Agenton session snapshot together on the terminal ``run_succeeded``
event so consumers can treat terminal events as complete run summaries. Session
snapshots carry only layer lifecycle/runtime state in compositor order; they do
not persist output-layer config. Resumed structured-output runs therefore must
resubmit the same ``output`` layer in ``composition.layers[]`` so snapshot layer
name/order still matches the composition and the runtime can rebuild the same
structured output contract.
snapshots. Successful runs always publish the resumable Agenton session snapshot
on the terminal ``run_succeeded`` event together with exactly one of the final
JSON-safe ``output`` or a deferred external ``deferred_tool_call`` payload. That
lets consumers treat terminal success events as complete run summaries without a
separate pause protocol. Session snapshots carry only layer lifecycle/runtime
state in compositor order; they do not persist output-layer config. Resumed
structured-output runs therefore must resubmit the same ``output`` layer in
``composition.layers[]`` so snapshot layer name/order still matches the
composition and the runtime can rebuild the same structured output contract.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Annotated, ClassVar, Final, Literal, TypeAlias
from pydantic import BaseModel, ConfigDict, Field, JsonValue, TypeAdapter
from pydantic import BaseModel, ConfigDict, Field, JsonValue, TypeAdapter, model_serializer, model_validator
from pydantic_ai.messages import AgentStreamEvent
from pydantic_ai.tools import DeferredToolResults
from agenton.compositor import CompositorConfig, CompositorSessionSnapshot, LayerConfigInput, LayerNodeConfig
from agenton.layers import ExitIntent
@ -45,12 +49,11 @@ from agenton.layers import ExitIntent
DIFY_AGENT_MODEL_LAYER_ID: Final[str] = "llm"
DIFY_AGENT_HISTORY_LAYER_ID: Final[str] = "history"
DIFY_AGENT_OUTPUT_LAYER_ID: Final[str] = "output"
RunStatus = Literal["running", "paused", "succeeded", "failed", "cancelled"]
RunStatus = Literal["running", "succeeded", "failed", "cancelled"]
RunPurpose = Literal["workflow_node", "single_step", "agent_app", "babysit", "fasten_preview"]
RunEventType = Literal[
"run_started",
"pydantic_ai_event",
"run_paused",
"run_succeeded",
"run_failed",
"run_cancelled",
@ -121,7 +124,15 @@ class CreateRunRequest(BaseModel):
keep snapshot compatibility and rebuild the output schema. Dify tenant,
user, and run-correlation identifiers must be submitted through a
``dify.execution_context`` entry in ``composition.layers[]``; there is no
parallel top-level ``execution_context`` request field.
parallel top-level ``execution_context`` request field. External deferred
tool continuation input belongs in the top-level ``deferred_tool_results``
field rather than inside composition. Resume requests are therefore expected
to pair a prior ``session_snapshot`` with the same logical composition so
Agenton can rebuild the same layers and message history. For ask-human
continuation specifically, the matching pending tool call must still exist
in prior history state; callers should keep the history layer active across
runs so deferred tool results can be matched against the original model
response instead of starting a fresh user-prompt turn.
"""
composition: RunComposition
@ -129,6 +140,7 @@ class CreateRunRequest(BaseModel):
idempotency_key: str | None = None
metadata: dict[str, JsonValue] = Field(default_factory=dict)
session_snapshot: CompositorSessionSnapshot | None = None
deferred_tool_results: DeferredToolResultsPayload | None = None
on_exit: LayerExitSignals = Field(default_factory=LayerExitSignals)
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
@ -213,14 +225,59 @@ class EmptyRunEventData(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class RunSucceededEventData(BaseModel):
"""Terminal success payload for final output and resumable session state."""
class DeferredToolResultsPayload(BaseModel):
"""Public JSON-safe DTO for deferred external tool results supplied on resume."""
output: JsonValue
calls: dict[str, JsonValue] = Field(default_factory=dict)
metadata: dict[str, dict[str, JsonValue]] = Field(default_factory=dict)
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
def to_pydantic_ai(self) -> DeferredToolResults:
"""Convert the public DTO into pydantic-ai's resume input dataclass."""
return DeferredToolResults(
calls=dict(self.calls),
metadata={key: dict(value) for key, value in self.metadata.items()},
)
class DeferredToolCallPayload(BaseModel):
"""Terminal success payload for one deferred external tool request."""
tool_call_id: str
tool_name: str
args: JsonValue
metadata: dict[str, JsonValue] = Field(default_factory=dict)
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class RunSucceededEventData(BaseModel):
"""Terminal success payload for final output or deferred tool continuation."""
output: JsonValue | None = None
deferred_tool_call: DeferredToolCallPayload | None = None
session_snapshot: CompositorSessionSnapshot
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
@model_validator(mode="after")
def _validate_result_shape(self) -> RunSucceededEventData:
has_output = "output" in self.model_fields_set
has_deferred_tool_call = "deferred_tool_call" in self.model_fields_set
if has_output == has_deferred_tool_call:
raise ValueError("Exactly one of output or deferred_tool_call must be set")
return self
@model_serializer(mode="plain")
def _serialize_active_result(self) -> dict[str, object]:
data: dict[str, object] = {"session_snapshot": self.session_snapshot}
if "output" in self.model_fields_set:
data["output"] = self.output
if "deferred_tool_call" in self.model_fields_set:
data["deferred_tool_call"] = self.deferred_tool_call
return data
class RunFailedEventData(BaseModel):
"""Terminal failure payload shown to polling and SSE consumers."""
@ -231,16 +288,6 @@ class RunFailedEventData(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class RunPausedEventData(BaseModel):
"""Pause payload used for human handoff or other resumable waits."""
reason: str
message: str | None = None
session_snapshot: CompositorSessionSnapshot | None = None
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class RunCancelledEventData(BaseModel):
"""Terminal cancellation payload for explicit user/operator cancellation."""
@ -288,13 +335,6 @@ class RunFailedEvent(BaseRunEvent):
data: RunFailedEventData
class RunPausedEvent(BaseRunEvent):
"""Resumable pause event emitted when a run waits for outside input."""
type: Literal["run_paused"] = "run_paused"
data: RunPausedEventData
class RunCancelledEvent(BaseRunEvent):
"""Terminal cancellation event emitted after an explicit cancel request."""
@ -303,12 +343,7 @@ class RunCancelledEvent(BaseRunEvent):
RunEvent: TypeAlias = Annotated[
RunStartedEvent
| PydanticAIStreamRunEvent
| RunPausedEvent
| RunSucceededEvent
| RunFailedEvent
| RunCancelledEvent,
RunStartedEvent | PydanticAIStreamRunEvent | RunSucceededEvent | RunFailedEvent | RunCancelledEvent,
Field(discriminator="type"),
]
RUN_EVENT_ADAPTER: TypeAdapter[RunEvent] = TypeAdapter(RunEvent)
@ -330,6 +365,8 @@ __all__ = [
"CancelRunResponse",
"CreateRunRequest",
"CreateRunResponse",
"DeferredToolCallPayload",
"DeferredToolResultsPayload",
"DIFY_AGENT_HISTORY_LAYER_ID",
"DIFY_AGENT_MODEL_LAYER_ID",
"DIFY_AGENT_OUTPUT_LAYER_ID",
@ -345,8 +382,6 @@ __all__ = [
"RunEventsResponse",
"RunFailedEvent",
"RunFailedEventData",
"RunPausedEvent",
"RunPausedEventData",
"RunPurpose",
"RunStartedEvent",
"RunStatus",

View File

@ -2,8 +2,9 @@
Only explicitly allowed provider type ids are constructible here. The default
provider set contains prompt layers, the optional pydantic-ai history layer, the
state-free Dify structured output layer, the Dify execution-context layer, the
stateful Dify shell layer, and the Dify plugin business-layer family:
state-free Dify structured output layer, the optional Dify ask-human layer, the
Dify execution-context layer, the stateful Dify shell layer, and the Dify
plugin business-layer family:
- ``dify.execution_context`` for shared tenant/user/run daemon context,
- ``dify.shell`` for shellctl-backed shell job control,
@ -33,6 +34,7 @@ from agenton_collections.layers.plain.basic import PromptLayer
from agenton_collections.transformers.pydantic_ai import PYDANTIC_AI_TRANSFORMERS
from dify_agent.agent_stub.server.shell_agent_stub_env import ShellAgentStubTokenFactory
from dify_agent.agent_stub.server.tokens.agent_stub import AgentStubTokenCodec
from dify_agent.layers.ask_human.layer import DifyAskHumanLayer
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
from dify_agent.layers.dify_plugin.tools_layer import DifyPluginToolsLayer
from dify_agent.layers.execution_context.configs import DifyExecutionContextLayerConfig
@ -80,6 +82,7 @@ def create_default_layer_providers(
LayerProvider.from_layer_type(PromptLayer),
LayerProvider.from_layer_type(PydanticAIHistoryLayer),
LayerProvider.from_layer_type(DifyOutputLayer),
LayerProvider.from_layer_type(DifyAskHumanLayer),
LayerProvider.from_factory(
layer_type=DifyExecutionContextLayer,
create=lambda config: DifyExecutionContextLayer.from_config_with_settings(

View File

@ -3,19 +3,21 @@
The runner only needs append-only event writes and status transitions, so tests
can use ``InMemoryRunEventSink`` without Redis. Production storage implements the
same protocol with Redis streams in ``dify_agent.storage.redis_run_store``. The
terminal success helper writes the final JSON-safe output and session snapshot in
one event so event consumers can stop at ``run_succeeded`` without correlating
separate payload events.
terminal success helper writes either the final JSON-safe output or one deferred
tool request together with the resumable session snapshot in a single event so
consumers can stop at ``run_succeeded`` without correlating separate payload
events.
"""
from collections import defaultdict
from typing import Protocol
from typing import Protocol, cast
from pydantic import JsonValue
from pydantic_ai.messages import AgentStreamEvent
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.protocol.schemas import (
DeferredToolCallPayload,
EmptyRunEventData,
PydanticAIStreamRunEvent,
RunEvent,
@ -29,6 +31,9 @@ from dify_agent.protocol.schemas import (
)
_UNSET = object()
class RunEventSink(Protocol):
"""Boundary used by runtime code to publish observable run progress."""
@ -95,15 +100,31 @@ async def emit_run_succeeded(
sink: RunEventSink,
*,
run_id: str,
output: JsonValue,
output: JsonValue | None | object = _UNSET,
deferred_tool_call: DeferredToolCallPayload | object = _UNSET,
session_snapshot: CompositorSessionSnapshot,
) -> str:
"""Emit the terminal success event with output and resumable state."""
"""Emit the terminal success event with output or deferred continuation.
Callers must activate exactly one result branch. ``_UNSET`` is used instead
of ``None`` to preserve the distinction between an omitted inactive branch
and an active ``output`` branch whose JSON value is explicitly ``null``.
Without that sentinel, ``output=None`` would be indistinguishable from
output field absent, which would break nullable-success payloads.
"""
data: dict[str, JsonValue | DeferredToolCallPayload | CompositorSessionSnapshot | None] = {
"session_snapshot": session_snapshot,
}
if output is not _UNSET:
data["output"] = cast(JsonValue | None, output)
if deferred_tool_call is not _UNSET:
data["deferred_tool_call"] = cast(DeferredToolCallPayload, deferred_tool_call)
return await emit_run_event(
sink,
event=RunSucceededEvent(
run_id=run_id,
data=RunSucceededEventData(output=output, session_snapshot=session_snapshot),
data=RunSucceededEventData.model_validate(data),
created_at=utc_now(),
),
)

View File

@ -3,36 +3,47 @@
The runner is storage-agnostic: it normalizes the public Dify composition into
Agenton's graph/config split, enters a fresh ``CompositorRun`` (or resumes one
from a snapshot), renders the current Dify system prompts into temporary
``message_history``, runs pydantic-ai with ``run.user_prompts`` as the current
user input, emits stream events, applies request-level ``on_exit`` signals, and
then publishes a terminal success or failure event. The Pydantic AI model is
resolved from the active Agenton layer named by ``DIFY_AGENT_MODEL_LAYER_ID``.
An optional history layer contributes stored message history only through
session state; successful runs append only ``result.new_messages()`` back into
that layer so current system prompts are not persisted. An optional structured
output layer named by ``DIFY_AGENT_OUTPUT_LAYER_ID`` is read after entry and
resolved into an output contract whose type both exposes the output schema to
the model and performs runtime JSON Schema validation through custom Pydantic
hooks. Invalid structured outputs therefore trigger Pydantic AI's normal
output-validation retry behavior before Dify Agent emits ``run_succeeded``.
Layers still never own the FastAPI lifespan-owned plugin daemon HTTP client.
Successful terminal events contain both the JSON-safe final output and session
snapshot; there are no separate output or snapshot events to correlate.
``message_history``, runs pydantic-ai with either the current ``run.user_prompts``
or deferred external tool results, emits stream events, applies request-level
``on_exit`` signals, and then publishes a terminal success or failure event. The
Pydantic AI model is resolved from the active Agenton layer named by
``DIFY_AGENT_MODEL_LAYER_ID``. An optional history layer contributes stored
message history only through session state; successful runs append only
``result.new_messages()`` back into that layer so current system prompts are not
persisted. An optional structured output layer named by
``DIFY_AGENT_OUTPUT_LAYER_ID`` is read after entry and resolved into an output
contract whose type both exposes the output schema to the model and performs
runtime JSON Schema validation through custom Pydantic hooks. When the ask-human
layer is active, the runtime also allows ``DeferredToolRequests`` output and
publishes that deferred request through the normal ``run_succeeded`` event as
``deferred_tool_call`` instead of a final ``output``. Invalid structured outputs
or invalid deferred-tool behavior still trigger normal retries/failures before
Dify Agent emits success. Layers still never own the FastAPI lifespan-owned
plugin daemon HTTP client.
"""
from collections.abc import AsyncIterable
from collections import Counter
from typing import Any, cast
from dataclasses import dataclass
from typing import Any, Literal, cast
import httpx
from pydantic import JsonValue, TypeAdapter
from pydantic_ai.messages import AgentStreamEvent
from pydantic_ai.output import OutputSpec
from pydantic_ai.tools import DeferredToolRequests, DeferredToolResults
from agenton.compositor import CompositorSessionSnapshot, LayerProviderInput
from agenton.layers.types import PydanticAITool
from dify_agent.layers.ask_human.layer import get_ask_human_layer, validate_ask_human_layer_composition
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
from dify_agent.layers.dify_plugin.tools_layer import DifyPluginToolsLayer
from dify_agent.protocol.schemas import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, normalize_composition
from dify_agent.protocol.schemas import (
CreateRunRequest,
DIFY_AGENT_MODEL_LAYER_ID,
DeferredToolCallPayload,
normalize_composition,
)
from dify_agent.runtime.agent_factory import create_agent, normalize_user_input
from dify_agent.runtime.agenton_validation import is_agenton_enter_validation_runtime_error
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_providers
@ -61,6 +72,16 @@ class AgentRunValidationError(ValueError):
"""Raised when a run request is valid JSON but cannot execute."""
@dataclass(slots=True)
class RunSuccessOutcome:
"""Normalized successful runner output before event emission."""
result_kind: Literal["output", "deferred_tool_call"]
output: JsonValue | None
deferred_tool_call: DeferredToolCallPayload | None
session_snapshot: CompositorSessionSnapshot
class AgentRunRunner:
"""Executes one run and writes only public run events to its sink."""
@ -92,7 +113,7 @@ class AgentRunRunner:
_ = await emit_run_started(self.sink, run_id=self.run_id)
try:
output, session_snapshot = await self._run_agent()
outcome = await self._run_agent()
except Exception as exc:
message = str(exc) or type(exc).__name__
_ = await emit_run_failed(self.sink, run_id=self.run_id, error=message)
@ -102,12 +123,16 @@ class AgentRunRunner:
_ = await emit_run_succeeded(
self.sink,
run_id=self.run_id,
output=output,
session_snapshot=session_snapshot,
**(
{"output": outcome.output}
if outcome.result_kind == "output"
else {"deferred_tool_call": outcome.deferred_tool_call}
),
session_snapshot=outcome.session_snapshot,
)
await self.sink.update_status(self.run_id, "succeeded")
async def _run_agent(self) -> tuple[JsonValue, CompositorSessionSnapshot]:
async def _run_agent(self) -> RunSuccessOutcome:
"""Run pydantic-ai inside an entered Agenton run.
Known request-shaped Agenton enter-time failures are normalized to
@ -128,6 +153,7 @@ class AgentRunRunner:
try:
validate_output_layer_composition(self.request.composition)
validate_history_layer_composition(self.request.composition)
validate_ask_human_layer_composition(self.request.composition)
graph_config, layer_configs = normalize_composition(self.request.composition)
compositor = build_pydantic_ai_compositor(graph_config, providers=self.layer_providers)
validate_layer_exit_signals(compositor, self.request.on_exit)
@ -135,12 +161,16 @@ class AgentRunRunner:
raise AgentRunValidationError(str(exc)) from exc
entered_run = False
output: JsonValue | None = None
deferred_tool_call: DeferredToolCallPayload | None = None
result_kind: Literal["output", "deferred_tool_call"] | None = None
try:
async with compositor.enter(configs=layer_configs, session_snapshot=self.request.session_snapshot) as run:
entered_run = True
apply_layer_exit_signals(run, self.request.on_exit)
user_prompts = run.user_prompts
if not has_non_blank_user_prompt(user_prompts):
deferred_tool_results = _resolve_deferred_tool_results(self.request)
if deferred_tool_results is None and not has_non_blank_user_prompt(user_prompts):
raise AgentRunValidationError(EMPTY_USER_PROMPTS_ERROR)
async def handle_events(_ctx: object, events: AsyncIterable[AgentStreamEvent]) -> None:
@ -154,24 +184,44 @@ class AgentRunRunner:
system_prompts=run.prompts,
stored_history=history_layer.message_history if history_layer is not None else (),
)
ask_human_layer = get_ask_human_layer(run)
llm_layer = run.get_layer(DIFY_AGENT_MODEL_LAYER_ID, DifyPluginLLMLayer)
model = llm_layer.get_model(http_client=self.plugin_daemon_http_client)
tools = await _resolve_run_tools(run, http_client=self.plugin_daemon_http_client)
except (KeyError, TypeError, RuntimeError, ValueError) as exc:
raise AgentRunValidationError(str(exc)) from exc
if deferred_tool_results is not None and history_layer is None:
raise AgentRunValidationError(
"Deferred tool results require a 'history' layer with prior message history."
)
agent = create_agent(
model,
tools=tools,
output_type=output_contract.output_type,
output_type=_resolve_agent_output_type(output_contract.output_type, ask_human_layer is not None),
)
result = await agent.run(
normalize_user_input(user_prompts),
None if deferred_tool_results is not None else normalize_user_input(user_prompts),
message_history=message_history,
deferred_tool_results=deferred_tool_results,
event_stream_handler=handle_events,
)
output = _serialize_agent_output(result.output)
append_successful_run_history(history_layer, result.new_messages())
if isinstance(result.output, DeferredToolRequests):
if ask_human_layer is None:
raise AgentRunValidationError(
"Deferred tool requests were returned, but no active ask_human layer is available for validation."
)
if history_layer is None:
raise AgentRunValidationError(
"ask_human deferred tool requests require a 'history' layer so the pending tool call can be resumed."
)
deferred_tool_call = ask_human_layer.build_deferred_tool_call_payload(result.output)
result_kind = "deferred_tool_call"
else:
output = _serialize_agent_output(result.output)
result_kind = "output"
except RuntimeError as exc:
if not entered_run and is_agenton_enter_validation_runtime_error(exc):
raise AgentRunValidationError(str(exc)) from exc
@ -183,8 +233,15 @@ class AgentRunRunner:
if run.session_snapshot is None:
raise RuntimeError("Agenton run did not produce a session snapshot after exit.")
if result_kind is None:
raise RuntimeError("Agent run did not resolve either a final output or a deferred tool call.")
return output, run.session_snapshot
return RunSuccessOutcome(
result_kind=result_kind,
output=output,
deferred_tool_call=deferred_tool_call,
session_snapshot=run.session_snapshot,
)
def _serialize_agent_output(output: object) -> JsonValue:
@ -192,6 +249,20 @@ def _serialize_agent_output(output: object) -> JsonValue:
return cast(JsonValue, _AGENT_OUTPUT_ADAPTER.dump_python(output, mode="json"))
def _resolve_agent_output_type(output_type: OutputSpec[object], allow_deferred_tools: bool) -> OutputSpec[object]:
"""Return the run output type, optionally augmented with deferred-tool support."""
if not allow_deferred_tools:
return output_type
return cast(OutputSpec[object], [output_type, DeferredToolRequests])
def _resolve_deferred_tool_results(request: CreateRunRequest) -> DeferredToolResults | None:
"""Convert public deferred tool results into the pydantic-ai resume input."""
if request.deferred_tool_results is None:
return None
return request.deferred_tool_results.to_pydantic_ai()
async def _resolve_run_tools(
run: Any,
*,

View File

@ -0,0 +1,73 @@
import pytest
from pydantic import ValidationError
import dify_agent.layers.ask_human as ask_human_exports
from dify_agent.layers.ask_human import DIFY_ASK_HUMAN_LAYER_TYPE_ID, DifyAskHumanLayerConfig
from dify_agent.layers.ask_human.schema import AskHumanToolArgs
def test_ask_human_package_exports_client_safe_symbols_only() -> None:
assert ask_human_exports.DIFY_ASK_HUMAN_LAYER_TYPE_ID == "dify.ask_human"
assert ask_human_exports.__all__ == [
"AskHumanAction",
"AskHumanActionStyle",
"AskHumanField",
"AskHumanFieldType",
"AskHumanFileField",
"AskHumanFileListField",
"AskHumanParagraphField",
"AskHumanResultStatus",
"AskHumanSelectField",
"AskHumanSelectOption",
"AskHumanSelectedAction",
"AskHumanToolArgs",
"AskHumanToolResult",
"AskHumanUrgency",
"DEFAULT_ASK_HUMAN_TOOL_DESCRIPTION",
"DIFY_ASK_HUMAN_LAYER_TYPE_ID",
"DifyAskHumanLayerConfig",
]
assert not hasattr(ask_human_exports, "DifyAskHumanLayer")
def test_ask_human_layer_config_defaults_and_effective_description() -> None:
config = DifyAskHumanLayerConfig()
assert DIFY_ASK_HUMAN_LAYER_TYPE_ID == "dify.ask_human"
assert config.model_dump(mode="json") == {
"enabled": True,
"tool_name": "ask_human",
"tool_description": None,
"max_fields": 8,
"max_actions": 4,
"allowed_field_types": ["paragraph", "select"],
"allow_file_fields": False,
"max_markdown_chars": 8000,
"max_question_chars": 1000,
"max_field_label_chars": 120,
"max_action_label_chars": 80,
}
assert "Ask a human for missing information" in config.effective_tool_description
def test_ask_human_layer_config_rejects_invalid_tool_name() -> None:
with pytest.raises(ValidationError, match="tool_name must be a valid tool identifier"):
_ = DifyAskHumanLayerConfig(tool_name="ask-human")
def test_ask_human_layer_config_rejects_file_field_types_when_disabled() -> None:
with pytest.raises(ValidationError, match="cannot include file field types"):
_ = DifyAskHumanLayerConfig(allowed_field_types=["paragraph", "file"])
def test_ask_human_tool_args_reject_duplicate_field_names() -> None:
with pytest.raises(ValidationError, match="field name 'comment' must be unique"):
_ = AskHumanToolArgs.model_validate(
{
"question": "Need a reply",
"fields": [
{"type": "paragraph", "name": "comment", "label": "Comment"},
{"type": "paragraph", "name": "comment", "label": "Another comment"},
],
}
)

View File

@ -0,0 +1,152 @@
import asyncio
import inspect
from collections.abc import Awaitable
from typing import Any, cast
import pytest
from pydantic_ai.messages import ToolCallPart
from pydantic_ai.tools import DeferredToolRequests, ToolDefinition
from dify_agent.layers.ask_human import DifyAskHumanLayerConfig
from dify_agent.layers.ask_human.schema import AskHumanToolArgs
from dify_agent.layers.ask_human.layer import DifyAskHumanLayer
async def _await_tool_definition(value: Awaitable[ToolDefinition | None]) -> ToolDefinition | None:
return await value
def test_ask_human_layer_exposes_one_external_tool_and_prompt_hint() -> None:
config = DifyAskHumanLayerConfig(
tool_name="human_gate",
tool_description="Collect a human decision.",
max_fields=2,
max_actions=3,
allowed_field_types=["paragraph"],
allow_file_fields=False,
max_question_chars=240,
max_markdown_chars=512,
max_field_label_chars=32,
max_action_label_chars=16,
)
layer = DifyAskHumanLayer.from_config(config)
prompt_hint = layer.build_prompt_hint()
tool = layer.tools[0]
prepare = tool.prepare
assert prepare is not None
prepared_or_awaitable = prepare(
cast(Any, None),
ToolDefinition(
name=tool.name, description=tool.description, parameters_json_schema=tool.function_schema.json_schema
),
)
prepared = (
asyncio.run(_await_tool_definition(cast(Awaitable[ToolDefinition | None], prepared_or_awaitable)))
if inspect.isawaitable(prepared_or_awaitable)
else prepared_or_awaitable
)
assert len(layer.prefix_prompts) == 1
assert len(layer.tools) == 1
assert "Allowed field types: paragraph." in prompt_hint
assert "File upload fields are disabled." in prompt_hint
assert "Use at most 2 field(s)." in prompt_hint
assert "Use at most 3 action(s)." in prompt_hint
assert "Keep 'question' under 240 characters." in prompt_hint
assert "Keep 'markdown' under 512 characters." in prompt_hint
assert "Keep each field label under 32 characters." in prompt_hint
assert "Keep each action label under 16 characters." in prompt_hint
assert prepared is not None
assert prepared.name == "human_gate"
assert prepared.description == "Collect a human decision."
assert prepared.kind == "external"
assert prepared.parameters_json_schema == AskHumanToolArgs.model_json_schema()
def test_ask_human_layer_normalizes_default_action_in_deferred_payload() -> None:
layer = DifyAskHumanLayer.from_config(DifyAskHumanLayerConfig())
payload = layer.build_deferred_tool_call_payload(
DeferredToolRequests(
calls=[
ToolCallPart(
tool_name="ask_human",
args={
"question": "Need a human answer",
"fields": [{"type": "paragraph", "name": "comment", "label": "Comment"}],
},
tool_call_id="call-1",
)
]
)
)
assert payload.tool_call_id == "call-1"
assert payload.tool_name == "ask_human"
assert payload.args == {
"title": None,
"question": "Need a human answer",
"markdown": None,
"fields": [
{
"type": "paragraph",
"name": "comment",
"label": "Comment",
"required": False,
"placeholder": None,
"default": None,
}
],
"actions": [{"id": "submit", "label": "Submit", "style": "primary"}],
"urgency": "normal",
}
assert payload.metadata == {
"layer_type": "dify.ask_human",
"tool_name": "ask_human",
"schema_version": 1,
}
def test_ask_human_layer_rejects_disallowed_field_types_in_deferred_payload() -> None:
layer = DifyAskHumanLayer.from_config(DifyAskHumanLayerConfig(allowed_field_types=["paragraph"]))
with pytest.raises(ValueError, match="field type 'select' is not allowed"):
_ = layer.build_deferred_tool_call_payload(
DeferredToolRequests(
calls=[
ToolCallPart(
tool_name="ask_human",
args={
"question": "Need a choice",
"fields": [
{
"type": "select",
"name": "decision",
"label": "Decision",
"options": [{"value": "yes", "label": "Yes"}],
}
],
},
tool_call_id="call-2",
)
]
)
)
def test_ask_human_layer_rejects_tool_name_mismatch_in_deferred_payload() -> None:
layer = DifyAskHumanLayer.from_config(DifyAskHumanLayerConfig(tool_name="human_gate"))
with pytest.raises(ValueError, match="deferred tool name must be 'human_gate', got 'ask_human'"):
_ = layer.build_deferred_tool_call_payload(
DeferredToolRequests(
calls=[
ToolCallPart(
tool_name="ask_human",
args={"question": "Need a human answer"},
tool_call_id="call-3",
)
]
)
)

View File

@ -6,6 +6,7 @@ from agenton.compositor import CompositorSessionSnapshot
from agenton.layers import ExitIntent
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
import dify_agent.protocol as protocol_exports
from dify_agent.layers.ask_human import DifyAskHumanLayerConfig
from dify_agent.layers.execution_context import DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID, DifyExecutionContextLayerConfig
from dify_agent.layers.dify_plugin import DIFY_PLUGIN_LLM_LAYER_TYPE_ID, DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
@ -13,6 +14,7 @@ from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LA
from dify_agent.protocol.schemas import (
RUN_EVENT_ADAPTER,
CreateRunRequest,
DeferredToolCallPayload,
LayerExitSignals,
PydanticAIStreamRunEvent,
RunCancelledEvent,
@ -21,8 +23,6 @@ from dify_agent.protocol.schemas import (
RunFailedEvent,
RunFailedEventData,
RunLayerSpec,
RunPausedEvent,
RunPausedEventData,
RunStartedEvent,
RunSucceededEvent,
RunSucceededEventData,
@ -49,15 +49,19 @@ def test_run_event_adapter_round_trips_typed_variants() -> None:
session_snapshot=CompositorSessionSnapshot(layers=[]),
),
),
RunFailedEvent(run_id="run-1", data=RunFailedEventData(error="boom", reason="shutdown")),
RunPausedEvent(
run_id="run-1",
data=RunPausedEventData(
reason="human_handoff",
message="Need review",
RunSucceededEvent(
run_id="run-2",
data=RunSucceededEventData(
deferred_tool_call=DeferredToolCallPayload(
tool_call_id="tool-call-1",
tool_name="ask_human",
args={"question": "Need approval"},
metadata={"layer_type": "dify.ask_human"},
),
session_snapshot=CompositorSessionSnapshot(layers=[]),
),
),
RunFailedEvent(run_id="run-1", data=RunFailedEventData(error="boom", reason="shutdown")),
RunCancelledEvent(run_id="run-1", data=RunCancelledEventData(reason="user_cancelled")),
]
@ -204,6 +208,35 @@ def test_create_run_request_accepts_dto_first_public_composition_and_normalizes_
}
def test_create_run_request_accepts_deferred_tool_results_payload() -> None:
request = CreateRunRequest.model_validate(
{
"composition": {
"layers": [
{"name": "prompt", "type": PLAIN_PROMPT_LAYER_TYPE_ID, "config": {"user": "hello"}},
{"name": "ask_human", "type": "dify.ask_human", "config": DifyAskHumanLayerConfig().model_dump()},
]
},
"deferred_tool_results": {
"calls": {
"tool-call-1": {
"status": "submitted",
"action": {"id": "submit", "label": "Submit"},
"values": {"comment": "Looks good."},
}
}
},
}
)
assert request.deferred_tool_results is not None
assert request.deferred_tool_results.calls["tool-call-1"] == {
"status": "submitted",
"action": {"id": "submit", "label": "Submit"},
"values": {"comment": "Looks good."},
}
def test_create_run_request_accepts_plugin_tools_layer_with_prepared_parameters_and_schema() -> None:
request = CreateRunRequest.model_validate(
{
@ -327,6 +360,49 @@ def test_on_exit_default_to_suspend_and_are_public() -> None:
assert request.on_exit.layers == {}
def test_run_succeeded_event_data_requires_exactly_one_result_variant() -> None:
snapshot = CompositorSessionSnapshot(layers=[])
with pytest.raises(ValidationError, match="Exactly one of output or deferred_tool_call must be set"):
_ = RunSucceededEventData(session_snapshot=snapshot)
with pytest.raises(ValidationError, match="Exactly one of output or deferred_tool_call must be set"):
_ = RunSucceededEventData(
output="done",
deferred_tool_call=DeferredToolCallPayload(
tool_call_id="tool-call-1",
tool_name="ask_human",
args={"question": "Need approval"},
),
session_snapshot=snapshot,
)
def test_run_succeeded_event_data_allows_explicit_json_null_output() -> None:
snapshot = CompositorSessionSnapshot(layers=[])
data = RunSucceededEventData(output=None, session_snapshot=snapshot)
assert data.output is None
assert data.deferred_tool_call is None
def test_run_succeeded_event_round_trips_explicit_json_null_output() -> None:
event = RunSucceededEvent(
run_id="run-null-output",
data=RunSucceededEventData(output=None, session_snapshot=CompositorSessionSnapshot(layers=[])),
)
payload = RUN_EVENT_ADAPTER.dump_json(event)
decoded = RUN_EVENT_ADAPTER.validate_json(payload)
assert isinstance(decoded, RunSucceededEvent)
assert decoded.data.output is None
assert decoded.data.deferred_tool_call is None
assert b'"output":null' in payload
assert b'"deferred_tool_call"' not in payload
def test_on_exit_accept_layer_overrides() -> None:
request = CreateRunRequest.model_validate(
{

View File

@ -1,5 +1,5 @@
import asyncio
from collections.abc import Mapping
from collections.abc import Iterable, Mapping
from typing import Any, ClassVar, cast
import httpx
@ -8,6 +8,7 @@ from pydantic import JsonValue
from pydantic_ai import Tool
from pydantic_ai.exceptions import UnexpectedModelBehavior
from pydantic_ai.messages import (
ToolReturnPart,
ModelMessage,
ModelRequest,
ModelResponse,
@ -18,12 +19,14 @@ from pydantic_ai.messages import (
)
from pydantic_ai.models import ModelRequestParameters
from pydantic_ai.models.test import TestModel
from pydantic_ai.tools import DeferredToolRequests, DeferredToolResults
from pydantic_ai.settings import ModelSettings
from agenton.compositor import CompositorSessionSnapshot, LayerProvider, LayerSessionSnapshot
from agenton.layers import ExitIntent, LifecycleState
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, PydanticAIHistoryRuntimeState
from agenton_collections.layers.plain import PromptLayerConfig, ToolsLayer
from dify_agent.layers.ask_human import DIFY_ASK_HUMAN_LAYER_TYPE_ID, DifyAskHumanLayerConfig
from dify_agent.layers.execution_context import DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID, DifyExecutionContextLayerConfig
from dify_agent.layers.shell import DIFY_SHELL_LAYER_TYPE_ID, DifyShellLayerConfig
from dify_agent.layers.shell.layer import DifyShellLayer
@ -42,6 +45,7 @@ from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerC
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID
from dify_agent.protocol.schemas import (
CreateRunRequest,
DeferredToolResultsPayload,
LayerExitSignals,
RunComposition,
RunLayerSpec,
@ -54,7 +58,7 @@ from shell_session_manager.shellctl.shared import DeleteJobResponse, JobResult,
class StaticToolsTestLayer(ToolsLayer):
type_id: ClassVar[str] = "test.static.tools"
type_id: ClassVar[str | None] = "test.static.tools"
class FakeRunnerShellctlClient:
@ -115,6 +119,8 @@ def _request(
user: str | list[str] = "hello",
*,
include_history: bool = False,
include_ask_human: bool = False,
ask_human_config: DifyAskHumanLayerConfig | None = None,
llm_layer_name: str = DIFY_AGENT_MODEL_LAYER_ID,
execution_context_layer_name: str = "execution_context",
on_exit: LayerExitSignals | None = None,
@ -131,6 +137,17 @@ def _request(
if include_history
else []
),
*(
[
RunLayerSpec(
name="ask_human",
type=DIFY_ASK_HUMAN_LAYER_TYPE_ID,
config=ask_human_config or DifyAskHumanLayerConfig(),
)
]
if include_ask_human
else []
),
RunLayerSpec(
name=execution_context_layer_name,
type=DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID,
@ -270,6 +287,7 @@ class RecordingTestModel(TestModel):
def _history_session_snapshot(
messages: list[ModelMessage],
*,
include_ask_human: bool = False,
include_output: bool = False,
) -> CompositorSessionSnapshot:
layers = [
@ -279,6 +297,11 @@ def _history_session_snapshot(
lifecycle_state=LifecycleState.SUSPENDED,
runtime_state=PydanticAIHistoryRuntimeState(messages=messages).model_dump(mode="json"),
),
*(
[LayerSessionSnapshot(name="ask_human", lifecycle_state=LifecycleState.SUSPENDED, runtime_state={})]
if include_ask_human
else []
),
LayerSessionSnapshot(name="execution_context", lifecycle_state=LifecycleState.SUSPENDED, runtime_state={}),
LayerSessionSnapshot(
name=DIFY_AGENT_MODEL_LAYER_ID, lifecycle_state=LifecycleState.SUSPENDED, runtime_state={}
@ -302,6 +325,18 @@ def _flatten_message_parts(messages: list[ModelMessage]) -> list[object]:
return [part for message in messages for part in message.parts]
class FakeAgentRunResult:
output: object
_new_messages: list[ModelMessage]
def __init__(self, output: object, new_messages: list[ModelMessage]) -> None:
self.output = output
self._new_messages = new_messages
def new_messages(self) -> list[ModelMessage]:
return list(self._new_messages)
def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPatch) -> None:
seen_clients: list[httpx.AsyncClient] = []
@ -350,6 +385,489 @@ def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPa
assert sink.statuses["run-1"] == "succeeded"
def test_runner_preserves_explicit_json_null_output(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
assert http_client.is_closed is False
return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType]
class FakeAgent:
async def run(self, *_args: object, **_kwargs: object) -> FakeAgentRunResult:
return FakeAgentRunResult(None, [])
monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
monkeypatch.setattr("dify_agent.runtime.runner.create_agent", lambda *_args, **_kwargs: FakeAgent())
request = _request()
sink = InMemoryRunEventSink()
async def scenario() -> None:
async with httpx.AsyncClient() as client:
await AgentRunRunner(
sink=sink,
request=request,
run_id="run-null-output",
plugin_daemon_http_client=client,
).run()
asyncio.run(scenario())
terminal = sink.events["run-null-output"][-1]
assert isinstance(terminal, RunSucceededEvent)
assert terminal.data.output is None
assert terminal.data.deferred_tool_call is None
assert sink.statuses["run-null-output"] == "succeeded"
def test_runner_emits_deferred_tool_call_and_persists_pending_history(monkeypatch: pytest.MonkeyPatch) -> None:
captured_output_types: list[object] = []
captured_user_prompts: list[object] = []
pending_tool_call = ToolCallPart(
tool_name="ask_human",
args={
"question": "Which deployment window should we use?",
"fields": [{"type": "paragraph", "name": "window", "label": "Deployment window"}],
},
tool_call_id="tool-call-1",
)
def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
assert http_client.is_closed is False
return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType]
class FakeAgent:
async def run(self, user_prompt: object, **kwargs: object) -> FakeAgentRunResult:
captured_user_prompts.append(user_prompt)
assert kwargs["deferred_tool_results"] is None
return FakeAgentRunResult(
DeferredToolRequests(calls=[pending_tool_call]),
[
ModelRequest(parts=[UserPromptPart(content="current user")]),
ModelResponse(parts=[pending_tool_call]),
],
)
def fake_create_agent(model: object, *, tools: list[Tool[object]], output_type: object) -> FakeAgent:
del model, tools
captured_output_types.append(output_type)
return FakeAgent()
monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
monkeypatch.setattr("dify_agent.runtime.runner.create_agent", fake_create_agent)
request = _request("current user", include_history=True, include_ask_human=True)
sink = InMemoryRunEventSink()
async def scenario() -> None:
async with httpx.AsyncClient() as client:
await AgentRunRunner(
sink=sink,
request=request,
run_id="run-ask-human",
plugin_daemon_http_client=client,
).run()
asyncio.run(scenario())
terminal = sink.events["run-ask-human"][-1]
assert isinstance(terminal, RunSucceededEvent)
assert captured_user_prompts == ["current user"]
assert any(item is DeferredToolRequests for item in cast(Iterable[object], captured_output_types[0]))
assert terminal.data.output is None
assert terminal.data.deferred_tool_call is not None
assert terminal.data.deferred_tool_call.tool_call_id == "tool-call-1"
assert terminal.data.deferred_tool_call.tool_name == "ask_human"
assert terminal.data.deferred_tool_call.args == {
"title": None,
"question": "Which deployment window should we use?",
"markdown": None,
"fields": [
{
"type": "paragraph",
"name": "window",
"label": "Deployment window",
"required": False,
"placeholder": None,
"default": None,
}
],
"actions": [{"id": "submit", "label": "Submit", "style": "primary"}],
"urgency": "normal",
}
saved_history = _history_messages_from_snapshot(terminal.data.session_snapshot)
assert isinstance(saved_history[-1], ModelResponse)
assert saved_history[-1].parts == [pending_tool_call]
def test_runner_resumes_with_deferred_tool_results_and_no_user_prompt(monkeypatch: pytest.MonkeyPatch) -> None:
seen_user_prompts: list[object] = []
seen_deferred_results: list[object] = []
pending_tool_call = ToolCallPart(
tool_name="ask_human",
args={"question": "Need approval"},
tool_call_id="tool-call-1",
)
def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
assert http_client.is_closed is False
return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType]
class FakeAgent:
async def run(self, user_prompt: object, **kwargs: object) -> FakeAgentRunResult:
seen_user_prompts.append(user_prompt)
seen_deferred_results.append(kwargs.get("deferred_tool_results"))
if kwargs.get("deferred_tool_results") is None:
return FakeAgentRunResult(
DeferredToolRequests(calls=[pending_tool_call]),
[
ModelRequest(parts=[UserPromptPart(content="current user")]),
ModelResponse(parts=[pending_tool_call]),
],
)
deferred_tool_results = cast(DeferredToolResults, kwargs["deferred_tool_results"])
assert deferred_tool_results is not None
submitted_result = cast(dict[str, object], deferred_tool_results.calls["tool-call-1"])
assert submitted_result["status"] == "submitted"
return FakeAgentRunResult(
"done after human",
[
ModelRequest(
parts=[
ToolReturnPart(
tool_name="ask_human",
content={"status": "submitted", "values": {"comment": "Ship it"}},
tool_call_id="tool-call-1",
)
]
),
ModelResponse(parts=[TextPart(content="done after human")]),
],
)
def fake_create_agent(model: object, *, tools: list[Tool[object]], output_type: object) -> FakeAgent:
del model, tools, output_type
return FakeAgent()
monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
monkeypatch.setattr("dify_agent.runtime.runner.create_agent", fake_create_agent)
request = _request("current user", include_history=True, include_ask_human=True)
sink = InMemoryRunEventSink()
async def scenario() -> None:
async with httpx.AsyncClient() as client:
await AgentRunRunner(
sink=sink,
request=request,
run_id="run-ask-human-initial",
plugin_daemon_http_client=client,
).run()
initial_terminal = sink.events["run-ask-human-initial"][-1]
assert isinstance(initial_terminal, RunSucceededEvent)
resumed_request = request.model_copy(deep=True)
resumed_request.session_snapshot = initial_terminal.data.session_snapshot
resumed_request.deferred_tool_results = DeferredToolResultsPayload.model_validate(
{
"calls": {
"tool-call-1": {
"status": "submitted",
"action": {"id": "submit", "label": "Submit"},
"values": {"comment": "Ship it"},
}
}
}
)
await AgentRunRunner(
sink=sink,
request=resumed_request,
run_id="run-ask-human-resume",
plugin_daemon_http_client=client,
).run()
asyncio.run(scenario())
resumed_terminal = sink.events["run-ask-human-resume"][-1]
assert isinstance(resumed_terminal, RunSucceededEvent)
assert resumed_terminal.data.output == "done after human"
assert resumed_terminal.data.deferred_tool_call is None
assert seen_user_prompts == ["current user", None]
assert seen_deferred_results[0] is None
assert seen_deferred_results[1] is not None
def test_runner_can_emit_second_deferred_tool_call_after_resume(monkeypatch: pytest.MonkeyPatch) -> None:
seen_user_prompts: list[object] = []
first_pending_tool_call = ToolCallPart(
tool_name="ask_human",
args={"question": "Need deployment owner"},
tool_call_id="tool-call-1",
)
second_pending_tool_call = ToolCallPart(
tool_name="ask_human",
args={"question": "Need final go-live confirmation"},
tool_call_id="tool-call-2",
)
def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
assert http_client.is_closed is False
return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType]
class FakeAgent:
async def run(self, user_prompt: object, **kwargs: object) -> FakeAgentRunResult:
seen_user_prompts.append(user_prompt)
deferred_tool_results = kwargs.get("deferred_tool_results")
if deferred_tool_results is None:
return FakeAgentRunResult(
DeferredToolRequests(calls=[first_pending_tool_call]),
[
ModelRequest(parts=[UserPromptPart(content="current user")]),
ModelResponse(parts=[first_pending_tool_call]),
],
)
return FakeAgentRunResult(
DeferredToolRequests(calls=[second_pending_tool_call]),
[
ModelRequest(
parts=[
ToolReturnPart(
tool_name="ask_human",
content={"status": "submitted", "values": {"owner": "ops"}},
tool_call_id="tool-call-1",
)
]
),
ModelResponse(parts=[second_pending_tool_call]),
],
)
def fake_create_agent(model: object, *, tools: list[Tool[object]], output_type: object) -> FakeAgent:
del model, tools, output_type
return FakeAgent()
monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
monkeypatch.setattr("dify_agent.runtime.runner.create_agent", fake_create_agent)
request = _request("current user", include_history=True, include_ask_human=True)
sink = InMemoryRunEventSink()
async def scenario() -> None:
async with httpx.AsyncClient() as client:
await AgentRunRunner(
sink=sink,
request=request,
run_id="run-ask-human-turn-1",
plugin_daemon_http_client=client,
).run()
first_terminal = sink.events["run-ask-human-turn-1"][-1]
assert isinstance(first_terminal, RunSucceededEvent)
resumed_request = request.model_copy(deep=True)
resumed_request.session_snapshot = first_terminal.data.session_snapshot
resumed_request.deferred_tool_results = DeferredToolResultsPayload.model_validate(
{
"calls": {
"tool-call-1": {
"status": "submitted",
"action": {"id": "submit", "label": "Submit"},
"values": {"owner": "ops"},
}
}
}
)
await AgentRunRunner(
sink=sink,
request=resumed_request,
run_id="run-ask-human-turn-2",
plugin_daemon_http_client=client,
).run()
asyncio.run(scenario())
second_terminal = sink.events["run-ask-human-turn-2"][-1]
assert isinstance(second_terminal, RunSucceededEvent)
assert second_terminal.data.output is None
assert second_terminal.data.deferred_tool_call is not None
assert second_terminal.data.deferred_tool_call.tool_call_id == "tool-call-2"
assert seen_user_prompts == ["current user", None]
saved_history = _history_messages_from_snapshot(second_terminal.data.session_snapshot)
assert isinstance(saved_history[1], ModelResponse)
assert saved_history[1].parts == [first_pending_tool_call]
assert isinstance(saved_history[2], ModelRequest)
assert len(saved_history[2].parts) == 1
assert isinstance(saved_history[2].parts[0], ToolReturnPart)
assert saved_history[2].parts[0].tool_name == "ask_human"
assert saved_history[2].parts[0].tool_call_id == "tool-call-1"
assert saved_history[2].parts[0].content == {"status": "submitted", "values": {"owner": "ops"}}
assert isinstance(saved_history[3], ModelResponse)
assert saved_history[3].parts == [second_pending_tool_call]
def test_runner_rejects_deferred_tool_call_without_history_layer(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
assert http_client.is_closed is False
return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType]
class FakeAgent:
async def run(self, *_args: object, **_kwargs: object) -> FakeAgentRunResult:
return FakeAgentRunResult(
DeferredToolRequests(
calls=[
ToolCallPart(tool_name="ask_human", args={"question": "Need owner"}, tool_call_id="tool-call-1")
]
),
[],
)
monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
monkeypatch.setattr("dify_agent.runtime.runner.create_agent", lambda *args, **kwargs: FakeAgent())
request = _request("current user", include_history=False, include_ask_human=True)
sink = InMemoryRunEventSink()
async def scenario() -> None:
async with httpx.AsyncClient() as client:
with pytest.raises(
AgentRunValidationError,
match="ask_human deferred tool requests require a 'history' layer so the pending tool call can be resumed",
):
await AgentRunRunner(
sink=sink,
request=request,
run_id="run-ask-human-no-history",
plugin_daemon_http_client=client,
).run()
asyncio.run(scenario())
assert [event.type for event in sink.events["run-ask-human-no-history"]] == ["run_started", "run_failed"]
def test_runner_rejects_resume_with_deferred_tool_results_without_history_layer(
monkeypatch: pytest.MonkeyPatch,
) -> None:
agent_run_called = False
def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
assert http_client.is_closed is False
return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType]
class FakeAgent:
async def run(self, *_args: object, **_kwargs: object) -> FakeAgentRunResult:
nonlocal agent_run_called
agent_run_called = True
return FakeAgentRunResult("unexpected", [])
monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
monkeypatch.setattr("dify_agent.runtime.runner.create_agent", lambda *args, **kwargs: FakeAgent())
request = _request("current user", include_history=False, include_ask_human=True)
request.deferred_tool_results = DeferredToolResultsPayload.model_validate(
{
"calls": {
"tool-call-1": {
"status": "submitted",
"action": {"id": "submit", "label": "Submit"},
"values": {"owner": "ops"},
}
}
}
)
sink = InMemoryRunEventSink()
async def scenario() -> None:
async with httpx.AsyncClient() as client:
with pytest.raises(
AgentRunValidationError,
match="Deferred tool results require a 'history' layer with prior message history",
):
await AgentRunRunner(
sink=sink,
request=request,
run_id="run-ask-human-resume-no-history",
plugin_daemon_http_client=client,
).run()
asyncio.run(scenario())
assert agent_run_called is False
assert [event.type for event in sink.events["run-ask-human-resume-no-history"]] == ["run_started", "run_failed"]
def test_runner_rejects_multiple_deferred_tool_calls(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
assert http_client.is_closed is False
return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType]
class FakeAgent:
async def run(self, *_args: object, **_kwargs: object) -> FakeAgentRunResult:
return FakeAgentRunResult(
DeferredToolRequests(
calls=[
ToolCallPart(tool_name="ask_human", args={"question": "One"}, tool_call_id="tool-call-1"),
ToolCallPart(tool_name="ask_human", args={"question": "Two"}, tool_call_id="tool-call-2"),
]
),
[],
)
monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
monkeypatch.setattr("dify_agent.runtime.runner.create_agent", lambda *args, **kwargs: FakeAgent())
request = _request("current user", include_history=True, include_ask_human=True)
sink = InMemoryRunEventSink()
async def scenario() -> None:
async with httpx.AsyncClient() as client:
with pytest.raises(ValueError, match="supports exactly one deferred call per run"):
await AgentRunRunner(
sink=sink,
request=request,
run_id="run-ask-human-multi",
plugin_daemon_http_client=client,
).run()
asyncio.run(scenario())
assert [event.type for event in sink.events["run-ask-human-multi"]] == ["run_started", "run_failed"]
def test_runner_rejects_deferred_approval_requests(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
assert http_client.is_closed is False
return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType]
class FakeAgent:
async def run(self, *_args: object, **_kwargs: object) -> FakeAgentRunResult:
return FakeAgentRunResult(
DeferredToolRequests(
approvals=[
ToolCallPart(
tool_name="ask_human", args={"question": "Need approval"}, tool_call_id="tool-call-1"
)
]
),
[],
)
monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
monkeypatch.setattr("dify_agent.runtime.runner.create_agent", lambda *args, **kwargs: FakeAgent())
request = _request("current user", include_history=True, include_ask_human=True)
sink = InMemoryRunEventSink()
async def scenario() -> None:
async with httpx.AsyncClient() as client:
with pytest.raises(ValueError, match="does not support approval requests"):
await AgentRunRunner(
sink=sink,
request=request,
run_id="run-ask-human-approval",
plugin_daemon_http_client=client,
).run()
asyncio.run(scenario())
assert [event.type for event in sink.events["run-ask-human-approval"]] == ["run_started", "run_failed"]
def test_runner_passes_dynamic_dify_plugin_tools_to_agent(monkeypatch: pytest.MonkeyPatch) -> None:
seen_tools: list[Tool[object]] = []

View File

@ -74,6 +74,7 @@ def test_client_public_exports_work_with_default_dependencies_only(tmp_path: Pat
shell_module = importlib.import_module("dify_agent.layers.shell")
execution_context_module = importlib.import_module("dify_agent.layers.execution_context")
plugin_module = importlib.import_module("dify_agent.layers.dify_plugin")
ask_human_module = importlib.import_module("dify_agent.layers.ask_human")
output_module = importlib.import_module("dify_agent.layers.output")
assert agenton_layers.ExitIntent is not None
@ -92,6 +93,7 @@ def test_client_public_exports_work_with_default_dependencies_only(tmp_path: Pat
assert shell_module.DifyShellLayerConfig is not None
assert execution_context_module.DifyExecutionContextLayerConfig is not None
assert plugin_module.DifyPluginLLMLayerConfig is not None
assert ask_human_module.DifyAskHumanLayerConfig is not None
assert output_module.DifyOutputLayerConfig is not None
grpc_error = importlib.import_module("dify_agent.agent_stub.client._errors").AgentStubMissingGRPCDependencyError

View File

@ -80,6 +80,7 @@ def test_protocol_and_dify_plugin_exports_do_not_import_server_only_modules() ->
"anthropic",
"dify_agent.adapters.llm",
"dify_agent.layers.execution_context.layer",
"dify_agent.layers.ask_human.layer",
"dify_agent.layers.dify_plugin.llm_layer",
"dify_agent.layers.dify_plugin.tools_layer",
"dify_agent.layers.output.output_layer",
@ -98,6 +99,7 @@ def test_protocol_and_dify_plugin_exports_do_not_import_server_only_modules() ->
imports=[
"dify_agent.protocol",
"dify_agent.layers.execution_context",
"dify_agent.layers.ask_human",
"dify_agent.layers.dify_plugin",
"dify_agent.layers.output",
"dify_agent.layers.shell",
@ -105,6 +107,7 @@ def test_protocol_and_dify_plugin_exports_do_not_import_server_only_modules() ->
assertions=[
"assert hasattr(dify_agent_protocol, 'PydanticAIStreamRunEvent')",
"assert dify_agent_layers_execution_context.__all__ == ['DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID', 'DifyExecutionContextAgentMode', 'DifyExecutionContextInvokeFrom', 'DifyExecutionContextLayerConfig', 'DifyExecutionContextUserFrom']",
"assert dify_agent_layers_ask_human.__all__ == ['AskHumanAction', 'AskHumanActionStyle', 'AskHumanField', 'AskHumanFieldType', 'AskHumanFileField', 'AskHumanFileListField', 'AskHumanParagraphField', 'AskHumanResultStatus', 'AskHumanSelectField', 'AskHumanSelectOption', 'AskHumanSelectedAction', 'AskHumanToolArgs', 'AskHumanToolResult', 'AskHumanUrgency', 'DEFAULT_ASK_HUMAN_TOOL_DESCRIPTION', 'DIFY_ASK_HUMAN_LAYER_TYPE_ID', 'DifyAskHumanLayerConfig']",
"assert dify_agent_layers_dify_plugin.__all__ == ['DIFY_PLUGIN_LLM_LAYER_TYPE_ID', 'DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID', 'DifyPluginCredentialValue', 'DifyPluginLLMLayerConfig', 'DifyPluginToolCredentialType', 'DifyPluginToolConfig', 'DifyPluginToolOption', 'DifyPluginToolParameter', 'DifyPluginToolParameterForm', 'DifyPluginToolParameterType', 'DifyPluginToolsLayerConfig', 'DifyPluginToolValue']",
"assert dify_agent_layers_output.__all__ == ['DIFY_OUTPUT_LAYER_TYPE_ID', 'DifyOutputLayerConfig']",
"assert dify_agent_layers_shell.__all__ == ['DIFY_SHELL_LAYER_TYPE_ID', 'DifyShellCliToolConfig', 'DifyShellEnvVarConfig', 'DifyShellLayerConfig', 'DifyShellSandboxConfig', 'DifyShellSecretRefConfig']",