diff --git a/dify-agent/docs/dify-agent/concepts/run-lifecycle/index.md b/dify-agent/docs/dify-agent/concepts/run-lifecycle/index.md index dce524e5b37..bbc9bd2fc31 100644 --- a/dify-agent/docs/dify-agent/concepts/run-lifecycle/index.md +++ b/dify-agent/docs/dify-agent/concepts/run-lifecycle/index.md @@ -45,14 +45,15 @@ current `agent run` has ended; the outer `workflow run` is what should be paused The caller should handle this flow as follows: -1. Read the current `agent run` result and detect the HITL (human-in-the-loop) - requirement. +1. Read the current `agent run` result and detect `deferred_tool_call` on the + terminal `run_succeeded` event. 2. Enter workflow HITL handling and pause graphon. 3. Wait for the human input to be completed. -4. When resuming the workflow, insert the human tool response into the same Agent - session's history layer. -5. Start a second `agent run` on the same Agent node and reuse the same history - session. +4. When resuming the workflow, start a second `agent run` on the same Agent node + with the previous `session_snapshot`, matching composition, and + `deferred_tool_results` keyed by the original tool call id. +5. Keep the history layer active so Dify Agent can match the result to the + pending tool call stored in the previous run's message history. In other words, a human tool does not mean “pause this agent run until it is resumed.” It means “this agent run ended with a result that requires human diff --git a/dify-agent/docs/dify-agent/guide/index.md b/dify-agent/docs/dify-agent/guide/index.md index c3662478dbc..27ec96ab349 100644 --- a/dify-agent/docs/dify-agent/guide/index.md +++ b/dify-agent/docs/dify-agent/guide/index.md @@ -136,8 +136,10 @@ Successful runs emit `run_started`, zero or more `pydantic_ai_event`, and `run_succeeded`. Failed runs end with `run_failed`. Event envelopes retain `id`, `run_id`, `type`, `data`, and `created_at`; `data` is typed per event type, including Pydantic AI's `AgentStreamEvent` payload for `pydantic_ai_event` and a -terminal `run_succeeded.data` object containing JSON-safe `output` plus a -`CompositorSessionSnapshot` for resumption. +terminal `run_succeeded.data` object containing a `CompositorSessionSnapshot` for +resumption. A successful run has exactly one active result branch: JSON-safe +`output` for final answers, or `deferred_tool_call` when a layer such as +`dify.ask_human` ends the current agent run with an external deferred tool call. ## Examples diff --git a/dify-agent/docs/dify-agent/user-manual/ask-human-layer/index.md b/dify-agent/docs/dify-agent/user-manual/ask-human-layer/index.md new file mode 100644 index 00000000000..404819398bc --- /dev/null +++ b/dify-agent/docs/dify-agent/user-manual/ask-human-layer/index.md @@ -0,0 +1,271 @@ +# Ask human layer + +The ask human layer exposes one model-visible tool that lets an agent end the +current run with a structured request for human input. This page is for Dify +Agent clients that build `CreateRunRequest` payloads and then interpret terminal +run events. + +The layer type id is `dify.ask_human`. It does not deliver forms, choose +recipients, enforce authorization, or wait inside the agent run. It only gives +the model a safe way to ask for human input and returns that request as a +deferred tool call. + +## Layer contract + +| Property | Value | +| --- | --- | +| Type id | `dify.ask_human` | +| Common layer name | `ask_human` | +| Config DTO | `DifyAskHumanLayerConfig` | +| Model-visible tool | `ask_human` by default, configurable with `tool_name` | +| Tool kind | pydantic-ai `external` deferred tool | +| Terminal event | `run_succeeded` | +| Terminal payload branch | `run_succeeded.data.deferred_tool_call` | + +The agent run does not enter a paused status. When the model calls the ask-human +tool, the current run succeeds with a `deferred_tool_call` instead of normal +`output`. The client is responsible for turning that deferred call into its own +human-facing workflow, collecting a result, and starting another run with +`deferred_tool_results`. + +## Basic usage + +Add the ask human layer to the same composition as the prompt, history, LLM, and +optional structured-output layers: + +```python {test="skip" lint="skip"} +from agenton_collections.layers.plain import PromptLayerConfig +from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID +from dify_agent.layers.ask_human import DIFY_ASK_HUMAN_LAYER_TYPE_ID, DifyAskHumanLayerConfig +from dify_agent.layers.dify_plugin import DifyPluginLLMLayerConfig +from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID +from dify_agent.protocol.schemas import CreateRunRequest, RunComposition, RunLayerSpec + + +request = CreateRunRequest( + composition=RunComposition( + layers=[ + RunLayerSpec( + name="prompt", + type="plain.prompt", + config=PromptLayerConfig( + prefix="You can ask a human only when the missing decision is required to continue.", + user="Review the deployment plan and proceed only after getting the required approval.", + ), + ), + RunLayerSpec( + name=DIFY_AGENT_HISTORY_LAYER_ID, + type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, + ), + RunLayerSpec( + name="ask_human", + type=DIFY_ASK_HUMAN_LAYER_TYPE_ID, + config=DifyAskHumanLayerConfig( + max_fields=4, + max_actions=2, + allowed_field_types=["paragraph", "select"], + allow_file_fields=False, + ), + ), + RunLayerSpec( + name=DIFY_AGENT_MODEL_LAYER_ID, + type="dify.plugin.llm", + config=DifyPluginLLMLayerConfig( + plugin_id="langgenius/openai", + model_provider="openai", + model="gpt-5.2", + credentials={"openai_api_key": ""}, + ), + ), + ] + ) +) +``` + +Include a [history layer](../history-layer/index.md) whenever you expect to +resume after a human answer. The pending tool call is stored in pydantic-ai +message history, so the resumed run needs both the returned `session_snapshot` +and the same logical composition with the history layer still present. + +## Config fields + +`DifyAskHumanLayerConfig` controls the model-facing tool identity and guardrails. +It intentionally does not contain delivery settings. + +| Field | Type | Default | Meaning | +| --- | --- | --- | --- | +| `enabled` | `bool` | `True` | When false, the layer exposes neither the tool nor the prompt guidance. | +| `tool_name` | `str` | `"ask_human"` | Model-visible tool name. Must be a valid identifier. | +| `tool_description` | `str \| None` | default description | Optional model-visible tool description. | +| `max_fields` | `int` | `8` | Maximum number of fields the model may request. Use `0` for action-only requests. | +| `max_actions` | `int` | `4` | Maximum number of human actions the model may request. | +| `allowed_field_types` | `list["paragraph" \| "select" \| "file" \| "file-list"]` | `["paragraph", "select"]` | Field types accepted by runtime validation. | +| `allow_file_fields` | `bool` | `False` | File field types are rejected unless this is true and the type is listed in `allowed_field_types`. | +| `max_markdown_chars` | `int` | `8000` | Maximum length for the optional `markdown` body. | +| `max_question_chars` | `int` | `1000` | Maximum length for the required `question`. | +| `max_field_label_chars` | `int` | `120` | Maximum label length for each field. | +| `max_action_label_chars` | `int` | `80` | Maximum label length for each action. | + +Configured limits are also capped by server hard limits. If a config exceeds a +hard cap, request validation fails before the run can execute. + +The layer converts these limits into a prompt hint automatically. Clients do not +need to write a separate system prompt listing the limits, although they may add +business-specific guidance such as when human input is appropriate. + +## What the model can request + +When enabled, the layer exposes an external deferred tool whose argument shape is +`AskHumanToolArgs`: + +| Field | Type | Meaning | +| --- | --- | --- | +| `title` | `str \| None` | Optional short title for the human request. | +| `question` | `str` | Required question/instruction for the human. | +| `markdown` | `str \| None` | Optional longer Markdown body. Treat it as untrusted user-visible content. | +| `fields` | `list[AskHumanField]` | Optional structured fields for the human to fill. | +| `actions` | `list[AskHumanAction]` | Optional action buttons. If omitted, Dify Agent normalizes to a single primary `Submit` action. | +| `urgency` | `"normal" \| "high"` | Hint for downstream systems; it is not a delivery policy. | + +Supported field variants: + +- `paragraph`: free-text input. +- `select`: single-choice input with unique option values. +- `file`: single-file input, only when file fields are allowed. +- `file-list`: multi-file input, only when file fields are allowed. + +Tool arguments are validated again after the model calls the tool. Invalid calls +produce a model retry before a terminal success is emitted. + +## Handling a deferred human request + +Stream or poll run events as usual. A successful final answer has +`event.data.output`. A successful human request has `event.data.deferred_tool_call`. +Exactly one branch is set. + +```python {test="skip" lint="skip"} +deferred_call = None +snapshot = None + +async for event in client.stream_events(run_id): + if event.type != "run_succeeded": + continue + snapshot = event.data.session_snapshot + if event.data.deferred_tool_call is not None: + deferred_call = event.data.deferred_tool_call + else: + final_output = event.data.output + break + +if deferred_call is not None: + # Render your own human-facing form, enqueue notification, pause an outer + # workflow, or store the request for later. Dify Agent does not do that part. + print(deferred_call.tool_call_id, deferred_call.args) +``` + +A typical deferred payload looks like this: + +```json +{ + "tool_call_id": "call_01H...", + "tool_name": "ask_human", + "args": { + "title": "Deployment approval", + "question": "Can we deploy version 2026.06.10 to production now?", + "fields": [ + { + "type": "paragraph", + "name": "comment", + "label": "Approval comment", + "required": false + } + ], + "actions": [ + {"id": "approve", "label": "Approve", "style": "primary"}, + {"id": "reject", "label": "Reject", "style": "destructive"} + ], + "urgency": "normal" + }, + "metadata": { + "layer_type": "dify.ask_human", + "tool_name": "ask_human", + "schema_version": 1 + } +} +``` + +The `args` object is model-generated content. Validate and sanitize it before +rendering it to end users. + +## Resume with a human result + +After your client collects a human answer, create a new run with: + +- the previous `session_snapshot`; +- a matching composition that still includes the history and ask-human layers; +- `deferred_tool_results.calls[tool_call_id]` containing the human result. + +```python {test="skip" lint="skip"} +from dify_agent.layers.ask_human import AskHumanToolResult +from dify_agent.protocol import DeferredToolResultsPayload + + +human_result = AskHumanToolResult( + status="submitted", + action={"id": "approve", "label": "Approve"}, + values={"comment": "Approved for the planned window."}, + message="The human approved the deployment.", +) + +resume_request = CreateRunRequest( + composition=composition_with_same_layer_names_and_order, + session_snapshot=snapshot, + deferred_tool_results=DeferredToolResultsPayload( + calls={deferred_call.tool_call_id: human_result.model_dump(mode="json")}, + ), +) +``` + +Dify Agent passes the supplied result back to pydantic-ai as the return value of +the original external tool call, then the model continues. The resumed run may +produce a final `output`, or it may produce another `deferred_tool_call` if the +agent needs another human turn. + +Timeouts and unavailable humans should also be sent as tool results instead of +being treated as agent-run failures: + +```json +{ + "status": "timeout", + "action": {"id": "__timeout", "label": "Timeout"}, + "values": {}, + "message": "The human did not respond before the workflow timeout." +} +``` + +## Client responsibilities + +The ask human layer deliberately leaves product decisions to the caller. Clients +must decide how to: + +- persist the deferred call and correlate it with a human-facing task; +- render and sanitize the requested fields/actions; +- choose recipients, channels, and timeout policy; +- authorize who may answer; +- transform the human submission into `AskHumanToolResult`; +- resume with the returned `session_snapshot` and matching composition. + +Do not put recipient emails, workspace member ids, public URLs, auth tokens, or +timeout policy in the tool arguments. The model-facing request is untrusted and +should not control delivery or authorization. + +## Troubleshooting + +| Symptom | What to check | +| --- | --- | +| Run fails with `Deferred tool results require a 'history' layer` | Add the `history` layer and resume with the prior snapshot. | +| Run fails with `pending tool call can be resumed` | Keep the history layer active for the initial deferred run. | +| Run fails with `exactly one deferred call` | The MVP supports one ask-human call per run. Ask the model to ask one question at a time. | +| Run fails with `tool name must be ...` | Use the configured `tool_name`; do not rename it only in downstream form code. | +| File fields are rejected | Set `allow_file_fields=True` and include `file` or `file-list` in `allowed_field_types`. | +| `run_succeeded.data.output` is absent | Check `run_succeeded.data.deferred_tool_call`; this is a human-request success, not a failed run. | diff --git a/dify-agent/mkdocs.yml b/dify-agent/mkdocs.yml index 579cffe536a..3993b3618e5 100644 --- a/dify-agent/mkdocs.yml +++ b/dify-agent/mkdocs.yml @@ -21,6 +21,7 @@ nav: - Prompt Layer: dify-agent/user-manual/prompt-layer/index.md - Execution Context Layer: dify-agent/user-manual/execution-context-layer/index.md - Shell Layer: dify-agent/user-manual/shell-layer/index.md + - Ask Human Layer: dify-agent/user-manual/ask-human-layer/index.md - Plugin LLM Layer: dify-agent/user-manual/plugin-llm-layer/index.md - Plugin Tool Layer: dify-agent/user-manual/plugin-tool-layer/index.md - History Layer: dify-agent/user-manual/history-layer/index.md diff --git a/dify-agent/src/dify_agent/layers/ask_human/__init__.py b/dify-agent/src/dify_agent/layers/ask_human/__init__.py new file mode 100644 index 00000000000..0138d7daf62 --- /dev/null +++ b/dify-agent/src/dify_agent/layers/ask_human/__init__.py @@ -0,0 +1,48 @@ +"""Client-safe exports for Dify ask-human layer DTOs and schema types. + +The runtime layer implementation lives in ``layer.py`` and imports server-side + execution helpers. Keep this package root import-safe for client code that only + needs to build run requests or understand deferred payload shapes. +""" + +from dify_agent.layers.ask_human.configs import ( + DEFAULT_ASK_HUMAN_TOOL_DESCRIPTION, + DIFY_ASK_HUMAN_LAYER_TYPE_ID, + DifyAskHumanLayerConfig, +) +from dify_agent.layers.ask_human.schema import ( + AskHumanAction, + AskHumanActionStyle, + AskHumanField, + AskHumanFieldType, + AskHumanFileField, + AskHumanFileListField, + AskHumanParagraphField, + AskHumanResultStatus, + AskHumanSelectField, + AskHumanSelectOption, + AskHumanSelectedAction, + AskHumanToolArgs, + AskHumanToolResult, + AskHumanUrgency, +) + +__all__ = [ + "AskHumanAction", + "AskHumanActionStyle", + "AskHumanField", + "AskHumanFieldType", + "AskHumanFileField", + "AskHumanFileListField", + "AskHumanParagraphField", + "AskHumanResultStatus", + "AskHumanSelectField", + "AskHumanSelectOption", + "AskHumanSelectedAction", + "AskHumanToolArgs", + "AskHumanToolResult", + "AskHumanUrgency", + "DEFAULT_ASK_HUMAN_TOOL_DESCRIPTION", + "DIFY_ASK_HUMAN_LAYER_TYPE_ID", + "DifyAskHumanLayerConfig", +] diff --git a/dify-agent/src/dify_agent/layers/ask_human/configs.py b/dify-agent/src/dify_agent/layers/ask_human/configs.py new file mode 100644 index 00000000000..432d0aad56c --- /dev/null +++ b/dify-agent/src/dify_agent/layers/ask_human/configs.py @@ -0,0 +1,136 @@ +"""Client-safe DTOs for the Dify ask-human layer. + +The public config controls only stable model-facing tool identity and guardrails. +Delivery, recipient selection, timeout policy, and other operational behavior are +intentionally out of scope for this layer and must stay outside the model-facing +tool contract. Setting ``enabled=False`` disables both ask-human tool exposure +and the prompt guidance that tells the model about these limits. Caller-provided +limits are additionally capped by small server hard limits so one composition +cannot widen the public deferred-tool surface arbitrarily. File field variants +are part of the schema vocabulary for forward compatibility, but they remain +invalid unless ``allow_file_fields=True`` and the allowed field-type list also +permits them. +""" + +from __future__ import annotations + +import re +from typing import ClassVar, Final + +from pydantic import ConfigDict, Field, field_validator, model_validator + +from agenton.layers import LayerConfig +from dify_agent.layers.ask_human.schema import AskHumanFieldType + + +DIFY_ASK_HUMAN_LAYER_TYPE_ID: Final[str] = "dify.ask_human" +DEFAULT_ASK_HUMAN_TOOL_DESCRIPTION: Final[str] = ( + "Ask a human for missing information or a decision that is required to continue. " + "Use this only when the answer cannot be inferred from the conversation, available tools, or current context. " + "Provide concise instructions, structured fields, and clear actions for the human." +) + +_TOOL_NAME_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") +_HARD_MAX_FIELDS = 16 +_HARD_MAX_ACTIONS = 8 +_HARD_MAX_MARKDOWN_CHARS = 20_000 +_HARD_MAX_QUESTION_CHARS = 4_000 +_HARD_MAX_FIELD_LABEL_CHARS = 200 +_HARD_MAX_ACTION_LABEL_CHARS = 120 +_FILE_FIELD_TYPES: Final[frozenset[AskHumanFieldType]] = frozenset({"file", "file-list"}) + + +class DifyAskHumanLayerConfig(LayerConfig): + """Public config for the optional ask-human deferred tool layer. + + This DTO describes the exact model-facing guardrail surface that the runtime + will both validate and surface back to the model through prompt guidance. + ``enabled=False`` means callers keep the layer in composition data without + exposing either the tool or its instructions for that run. Numeric limits are + caller-configurable only within the server's hard caps, and file field types + are rejected unless callers opt in with ``allow_file_fields=True``. + """ + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + enabled: bool = True + tool_name: str = "ask_human" + tool_description: str | None = None + max_fields: int = Field(default=8, ge=0) + max_actions: int = Field(default=4, ge=1) + allowed_field_types: list[AskHumanFieldType] = Field(default_factory=lambda: ["paragraph", "select"]) + allow_file_fields: bool = False + max_markdown_chars: int = Field(default=8_000, ge=0) + max_question_chars: int = Field(default=1_000, ge=1) + max_field_label_chars: int = Field(default=120, ge=1) + max_action_label_chars: int = Field(default=80, ge=1) + + @property + def effective_tool_description(self) -> str: + """Return the configured description or the proposal default text.""" + return self.tool_description or DEFAULT_ASK_HUMAN_TOOL_DESCRIPTION + + @field_validator("tool_name") + @classmethod + def _validate_tool_name(cls, value: str) -> str: + if not _TOOL_NAME_PATTERN.fullmatch(value): + raise ValueError("tool_name must be a valid tool identifier") + return value + + @field_validator("tool_description") + @classmethod + def _normalize_tool_description(cls, value: str | None) -> str | None: + if value is None: + return None + stripped = value.strip() + return stripped or None + + @field_validator("allowed_field_types") + @classmethod + def _validate_allowed_field_types(cls, value: list[AskHumanFieldType]) -> list[AskHumanFieldType]: + if len(set(value)) != len(value): + raise ValueError("allowed_field_types must not contain duplicates") + return value + + @field_validator( + "max_fields", + "max_actions", + "max_markdown_chars", + "max_question_chars", + "max_field_label_chars", + "max_action_label_chars", + mode="after", + ) + @classmethod + def _validate_hard_limits(cls, value: int, info: object) -> int: + field_name = getattr(info, "field_name", "value") + hard_limits = { + "max_fields": _HARD_MAX_FIELDS, + "max_actions": _HARD_MAX_ACTIONS, + "max_markdown_chars": _HARD_MAX_MARKDOWN_CHARS, + "max_question_chars": _HARD_MAX_QUESTION_CHARS, + "max_field_label_chars": _HARD_MAX_FIELD_LABEL_CHARS, + "max_action_label_chars": _HARD_MAX_ACTION_LABEL_CHARS, + } + hard_limit = hard_limits[field_name] + if value > hard_limit: + raise ValueError(f"{field_name} must be <= {hard_limit}") + return value + + @model_validator(mode="after") + def _validate_file_field_policy(self) -> DifyAskHumanLayerConfig: + if not self.allow_file_fields: + forbidden = [field_type for field_type in self.allowed_field_types if field_type in _FILE_FIELD_TYPES] + if forbidden: + joined = ", ".join(forbidden) + raise ValueError( + f"allowed_field_types cannot include file field types when allow_file_fields is false: {joined}" + ) + return self + + +__all__ = [ + "DEFAULT_ASK_HUMAN_TOOL_DESCRIPTION", + "DIFY_ASK_HUMAN_LAYER_TYPE_ID", + "DifyAskHumanLayerConfig", +] diff --git a/dify-agent/src/dify_agent/layers/ask_human/layer.py b/dify-agent/src/dify_agent/layers/ask_human/layer.py new file mode 100644 index 00000000000..a94de9af72e --- /dev/null +++ b/dify-agent/src/dify_agent/layers/ask_human/layer.py @@ -0,0 +1,275 @@ +"""Runtime ask-human layer built on pydantic-ai external deferred tools. + +The layer contributes one optional external tool plus one prompt hint. The tool +never executes Python during the initial run; instead the model emits an +external deferred tool call that Dify Agent returns through ``run_succeeded`` as +``deferred_tool_call``. Guardrails are enforced in two places: + +* prompt/tool-definition guidance nudges the model toward valid requests, and +* runtime validation normalizes default actions and rejects out-of-policy calls. + +The layer stays product-neutral: downstream systems decide delivery, recipients, +timeouts, and authorization for the human request. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, ClassVar, cast + +from pydantic import JsonValue, ValidationError +from pydantic_ai import Tool +from pydantic_ai.exceptions import ModelRetry +from pydantic_ai.tools import DeferredToolRequests, RunContext, ToolDefinition +from typing_extensions import Self, override + +from agenton.layers import EmptyRuntimeState, NoLayerDeps, PydanticAILayer, PydanticAIPrompt, PydanticAITool +from dify_agent.layers.ask_human.configs import DIFY_ASK_HUMAN_LAYER_TYPE_ID, DifyAskHumanLayerConfig +from dify_agent.layers.ask_human.schema import ( + AskHumanAction, + AskHumanField, + AskHumanToolArgs, +) +from dify_agent.protocol.schemas import DeferredToolCallPayload, RunComposition + + +_ASK_HUMAN_DEFERRED_SCHEMA_VERSION = 1 +_DEFAULT_SUBMIT_ACTION = AskHumanAction(id="submit", label="Submit", style="primary") + + +@dataclass(slots=True) +class DifyAskHumanLayer(PydanticAILayer[NoLayerDeps, object, DifyAskHumanLayerConfig, EmptyRuntimeState]): + """State-free pydantic-ai layer that exposes the ask-human deferred tool.""" + + type_id: ClassVar[str | None] = DIFY_ASK_HUMAN_LAYER_TYPE_ID + + config: DifyAskHumanLayerConfig + + @classmethod + @override + def from_config(cls, config: DifyAskHumanLayerConfig) -> Self: + """Create the layer from validated public config.""" + return cls(config=DifyAskHumanLayerConfig.model_validate(config)) + + @property + @override + def prefix_prompts(self) -> list[PydanticAIPrompt[object]]: + if not self.config.enabled: + return [] + return [self.build_prompt_hint] + + @property + @override + def tools(self) -> list[PydanticAITool[object]]: + if not self.config.enabled: + return [] + return [ + Tool( + self._never_executed_tool, + takes_ctx=True, + name=self.config.tool_name, + description=self.config.effective_tool_description, + prepare=self._prepare_tool_definition, + args_validator=self._validate_tool_args, + sequential=True, + ) + ] + + def build_prompt_hint(self) -> str: + """Return the model-facing instruction text for ask-human guardrails.""" + allowed_field_types = ", ".join(self.config.allowed_field_types) if self.config.allowed_field_types else "none" + file_field_status = "enabled" if self.config.allow_file_fields else "disabled" + if self.config.max_fields == 0: + field_count_hint = "Do not add any fields." + else: + field_count_hint = f"Use at most {self.config.max_fields} field(s)." + return ( + f"You may call the external tool '{self.config.tool_name}' only when human input is required to continue. " + "Do not ask a human for information that can be inferred from the conversation, current context, or other tools.\n\n" + f"Ask-human guardrails:\n" + f"- Allowed field types: {allowed_field_types}.\n" + f"- File upload fields are {file_field_status}.\n" + f"- {field_count_hint}\n" + f"- Use at most {self.config.max_actions} action(s).\n" + f"- Keep 'question' under {self.config.max_question_chars} characters.\n" + f"- Keep 'markdown' under {self.config.max_markdown_chars} characters.\n" + f"- Keep each field label under {self.config.max_field_label_chars} characters.\n" + f"- Keep each action label under {self.config.max_action_label_chars} characters.\n" + "- If you omit actions, the system will add one primary action: Submit.\n" + "Prefer concise, structured requests that stay comfortably within these limits." + ) + + def build_deferred_tool_call_payload(self, requests: DeferredToolRequests) -> DeferredToolCallPayload: + """Validate and normalize the single supported deferred ask-human call.""" + if requests.approvals: + raise ValueError("ask_human does not support approval requests; use external deferred calls only") + + call_count = len(requests.calls) + if call_count != 1: + raise ValueError(f"ask_human supports exactly one deferred call per run in this version; got {call_count}.") + + call = requests.calls[0] + if call.tool_name != self.config.tool_name: + raise ValueError(f"ask_human deferred tool name must be '{self.config.tool_name}', got '{call.tool_name}'.") + + args = self._validate_and_normalize_tool_args( + title=None, + question="", + markdown=None, + fields=[], + actions=[], + urgency="normal", + raw_args=call.args, + ) + return DeferredToolCallPayload( + tool_call_id=call.tool_call_id, + tool_name=call.tool_name, + args=cast(JsonValue, args.model_dump(mode="json")), + metadata={ + "layer_type": self.type_id, + "tool_name": self.config.tool_name, + "schema_version": _ASK_HUMAN_DEFERRED_SCHEMA_VERSION, + }, + ) + + def _prepare_tool_definition(self, _ctx: RunContext[object], tool_def: ToolDefinition) -> ToolDefinition: + """Convert the ask-human tool into a pydantic-ai external deferred tool.""" + del tool_def + return ToolDefinition( + name=self.config.tool_name, + description=self.config.effective_tool_description, + parameters_json_schema=cast(dict[str, Any], AskHumanToolArgs.model_json_schema()), + strict=False, + sequential=True, + kind="external", + ) + + async def _never_executed_tool( + self, + _ctx: RunContext[object], + *, + title: str | None = None, + question: str, + markdown: str | None = None, + fields: list[AskHumanField] | None = None, + actions: list[AskHumanAction] | None = None, + urgency: str = "normal", + ) -> str: + del title, question, markdown, fields, actions, urgency + raise RuntimeError("ask_human is an external deferred tool and should not execute during the initial run") + + def _validate_tool_args( + self, + _ctx: RunContext[object], + *, + title: str | None = None, + question: str, + markdown: str | None = None, + fields: list[AskHumanField] | None = None, + actions: list[AskHumanAction] | None = None, + urgency: str = "normal", + ) -> None: + try: + _ = self._validate_and_normalize_tool_args( + title=title, + question=question, + markdown=markdown, + fields=fields or [], + actions=actions or [], + urgency=urgency, + ) + except (ValidationError, ValueError) as exc: + raise ModelRetry(str(exc)) from exc + + def _validate_and_normalize_tool_args( + self, + *, + title: str | None, + question: str, + markdown: str | None, + fields: list[AskHumanField], + actions: list[AskHumanAction], + urgency: str, + raw_args: str | dict[str, Any] | None = None, + ) -> AskHumanToolArgs: + if raw_args is not None: + args = _validate_tool_args_payload(raw_args) + else: + args = AskHumanToolArgs( + title=title, + question=question, + markdown=markdown, + fields=fields, + actions=actions, + urgency=cast(Any, urgency), + ) + + if len(args.fields) > self.config.max_fields: + raise ValueError(f"ask_human fields must contain at most {self.config.max_fields} item(s)") + + normalized_actions = list(args.actions) + if not normalized_actions: + normalized_actions = [_DEFAULT_SUBMIT_ACTION.model_copy()] + if len(normalized_actions) > self.config.max_actions: + raise ValueError(f"ask_human actions must contain at most {self.config.max_actions} item(s)") + + if len(args.question) > self.config.max_question_chars: + raise ValueError(f"ask_human question must be <= {self.config.max_question_chars} characters") + if args.markdown is not None and len(args.markdown) > self.config.max_markdown_chars: + raise ValueError(f"ask_human markdown must be <= {self.config.max_markdown_chars} characters") + + allowed_field_types = set(self.config.allowed_field_types) + for field in args.fields: + if field.type not in allowed_field_types: + raise ValueError(f"ask_human field type '{field.type}' is not allowed by this layer config") + if len(field.label) > self.config.max_field_label_chars: + raise ValueError( + f"ask_human field label '{field.label}' must be <= {self.config.max_field_label_chars} characters" + ) + if not self.config.allow_file_fields and field.type in {"file", "file-list"}: + raise ValueError("ask_human file fields are disabled by this layer config") + + for action in normalized_actions: + if len(action.label) > self.config.max_action_label_chars: + raise ValueError( + f"ask_human action label '{action.label}' must be <= {self.config.max_action_label_chars} characters" + ) + + return args.model_copy(update={"actions": normalized_actions}) + + +def validate_ask_human_layer_composition(composition: RunComposition) -> None: + """Reject unsupported public ask-human layer graph shapes.""" + ask_human_layers = [layer.name for layer in composition.layers if layer.type == DIFY_ASK_HUMAN_LAYER_TYPE_ID] + if len(ask_human_layers) > 1: + names = ", ".join(ask_human_layers) + raise ValueError(f"Only one '{DIFY_ASK_HUMAN_LAYER_TYPE_ID}' layer is supported. Found layers: {names}.") + + +def get_ask_human_layer(run: Any) -> DifyAskHumanLayer | None: + """Return the active ask-human layer when one is present and enabled.""" + matched: list[DifyAskHumanLayer] = [] + for slot in run.slots.values(): + layer = slot.layer + if isinstance(layer, DifyAskHumanLayer): + matched.append(layer) + if not matched: + return None + if len(matched) > 1: + raise ValueError(f"Only one '{DIFY_ASK_HUMAN_LAYER_TYPE_ID}' layer is supported per run.") + + layer = matched[0] + return layer if layer.config.enabled else None + + +def _validate_tool_args_payload(raw_args: str | dict[str, Any]) -> AskHumanToolArgs: + if isinstance(raw_args, str): + return AskHumanToolArgs.model_validate_json(raw_args or "{}") + return AskHumanToolArgs.model_validate(raw_args or {}) + + +__all__ = [ + "DifyAskHumanLayer", + "get_ask_human_layer", + "validate_ask_human_layer_composition", +] diff --git a/dify-agent/src/dify_agent/layers/ask_human/schema.py b/dify-agent/src/dify_agent/layers/ask_human/schema.py new file mode 100644 index 00000000000..59fb9d3fc70 --- /dev/null +++ b/dify-agent/src/dify_agent/layers/ask_human/schema.py @@ -0,0 +1,234 @@ +"""Product-neutral schemas for the Dify ask-human deferred tool contract. + +These models describe the model-facing tool arguments and the later human result +payload expected by resumed runs. Config-specific guardrails such as maximum +counts, allowed field types, or per-install length limits are enforced by +``dify_agent.layers.ask_human.layer`` so this module stays import-safe for +client code that only needs the stable wire/schema shapes. +""" + +from __future__ import annotations + +import re +from typing import Annotated, ClassVar, Literal + +from pydantic import BaseModel, ConfigDict, Field, JsonValue, ValidationInfo, field_validator, model_validator + + +_IDENTIFIER_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") + +type AskHumanFieldType = Literal["paragraph", "select", "file", "file-list"] +type AskHumanActionStyle = Literal["default", "primary", "destructive"] +type AskHumanUrgency = Literal["normal", "high"] +type AskHumanResultStatus = Literal["submitted", "timeout", "cancelled", "unavailable"] + + +def is_valid_identifier(value: str) -> bool: + """Return whether ``value`` matches the stable ask-human identifier rules.""" + return bool(_IDENTIFIER_PATTERN.fullmatch(value)) + + +def _require_non_blank(value: str, *, label: str) -> str: + if not value.strip(): + raise ValueError(f"{label} must not be blank") + return value + + +class AskHumanSelectOption(BaseModel): + """One selectable option for an ask-human select field.""" + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + value: str = Field(min_length=1) + label: str = Field(min_length=1) + + @field_validator("value", "label") + @classmethod + def _validate_non_blank(cls, value: str, info: ValidationInfo) -> str: + return _require_non_blank(value, label=f"select option {info.field_name}") + + +class AskHumanFieldBase(BaseModel): + """Shared field properties for ask-human form fields.""" + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + name: str = Field(min_length=1) + label: str = Field(min_length=1) + required: bool = False + + @field_validator("name") + @classmethod + def _validate_name(cls, value: str) -> str: + if not is_valid_identifier(value): + raise ValueError("field name must be a valid identifier") + return value + + @field_validator("label") + @classmethod + def _validate_label(cls, value: str) -> str: + return _require_non_blank(value, label="field label") + + +class AskHumanParagraphField(AskHumanFieldBase): + """Free-text paragraph field.""" + + type: Literal["paragraph"] = "paragraph" + placeholder: str | None = None + default: str | None = None + + +class AskHumanSelectField(AskHumanFieldBase): + """Single-choice select field.""" + + type: Literal["select"] = "select" + options: list[AskHumanSelectOption] = Field(default_factory=list) + default: str | None = None + + @model_validator(mode="after") + def _validate_options(self) -> AskHumanSelectField: + if not self.options: + raise ValueError("select fields must define at least one option") + + seen_values: set[str] = set() + for option in self.options: + if option.value in seen_values: + raise ValueError(f"select field '{self.name}' contains duplicate option value '{option.value}'") + seen_values.add(option.value) + + if self.default is not None and self.default not in seen_values: + raise ValueError(f"select field '{self.name}' default must match one of its option values") + return self + + +class AskHumanFileField(AskHumanFieldBase): + """Single-file upload field.""" + + type: Literal["file"] = "file" + + +class AskHumanFileListField(AskHumanFieldBase): + """Multi-file upload field.""" + + type: Literal["file-list"] = "file-list" + max_files: int | None = Field(default=None, ge=1) + + +type AskHumanField = Annotated[ + AskHumanParagraphField | AskHumanSelectField | AskHumanFileField | AskHumanFileListField, + Field(discriminator="type"), +] + + +class AskHumanAction(BaseModel): + """One human-visible action rendered with an ask-human request.""" + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + id: str = Field(min_length=1) + label: str = Field(min_length=1) + style: AskHumanActionStyle = "default" + + @field_validator("id") + @classmethod + def _validate_id(cls, value: str) -> str: + if not is_valid_identifier(value): + raise ValueError("action id must be a valid identifier") + return value + + @field_validator("label") + @classmethod + def _validate_label(cls, value: str) -> str: + return _require_non_blank(value, label="action label") + + +class AskHumanSelectedAction(BaseModel): + """Action metadata returned with a human-submitted result.""" + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + id: str = Field(min_length=1) + label: str = Field(min_length=1) + + @field_validator("id") + @classmethod + def _validate_id(cls, value: str) -> str: + if not is_valid_identifier(value): + raise ValueError("selected action id must be a valid identifier") + return value + + @field_validator("label") + @classmethod + def _validate_label(cls, value: str) -> str: + return _require_non_blank(value, label="selected action label") + + +class AskHumanToolArgs(BaseModel): + """Arguments accepted by the ask-human external deferred tool.""" + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + title: str | None = None + question: str = Field(min_length=1) + markdown: str | None = None + fields: list[AskHumanField] = Field(default_factory=list) + actions: list[AskHumanAction] = Field(default_factory=list) + urgency: AskHumanUrgency = "normal" + + @field_validator("question") + @classmethod + def _validate_question(cls, value: str) -> str: + return _require_non_blank(value, label="question") + + @field_validator("title") + @classmethod + def _validate_title(cls, value: str | None) -> str | None: + if value is None: + return None + return _require_non_blank(value, label="title") + + @model_validator(mode="after") + def _validate_unique_ids(self) -> AskHumanToolArgs: + field_names: set[str] = set() + for field in self.fields: + if field.name in field_names: + raise ValueError(f"field name '{field.name}' must be unique") + field_names.add(field.name) + + action_ids: set[str] = set() + for action in self.actions: + if action.id in action_ids: + raise ValueError(f"action id '{action.id}' must be unique") + action_ids.add(action.id) + return self + + +class AskHumanToolResult(BaseModel): + """Expected value shape for a later deferred ask-human tool result.""" + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + status: AskHumanResultStatus + action: AskHumanSelectedAction | None = None + values: dict[str, JsonValue] = Field(default_factory=dict) + message: str | None = None + rendered_content: str | None = None + + +__all__ = [ + "AskHumanAction", + "AskHumanActionStyle", + "AskHumanField", + "AskHumanFieldType", + "AskHumanFileField", + "AskHumanFileListField", + "AskHumanParagraphField", + "AskHumanResultStatus", + "AskHumanSelectField", + "AskHumanSelectOption", + "AskHumanSelectedAction", + "AskHumanToolArgs", + "AskHumanToolResult", + "AskHumanUrgency", + "is_valid_identifier", +] diff --git a/dify-agent/src/dify_agent/protocol/__init__.py b/dify-agent/src/dify_agent/protocol/__init__.py index c31800e3bf9..e179d96793b 100644 --- a/dify-agent/src/dify_agent/protocol/__init__.py +++ b/dify-agent/src/dify_agent/protocol/__init__.py @@ -14,6 +14,8 @@ from .schemas import ( CancelRunResponse, CreateRunRequest, CreateRunResponse, + DeferredToolCallPayload, + DeferredToolResultsPayload, EmptyRunEventData, LayerExitSignals, PydanticAIStreamRunEvent, @@ -25,8 +27,6 @@ from .schemas import ( RunEventsResponse, RunFailedEvent, RunFailedEventData, - RunPausedEvent, - RunPausedEventData, RunPurpose, RunLayerSpec, RunStartedEvent, @@ -44,6 +44,8 @@ __all__ = [ "CancelRunResponse", "CreateRunRequest", "CreateRunResponse", + "DeferredToolCallPayload", + "DeferredToolResultsPayload", "DIFY_AGENT_HISTORY_LAYER_ID", "DIFY_AGENT_MODEL_LAYER_ID", "DIFY_AGENT_OUTPUT_LAYER_ID", @@ -59,8 +61,6 @@ __all__ = [ "RunEventsResponse", "RunFailedEvent", "RunFailedEventData", - "RunPausedEvent", - "RunPausedEventData", "RunPurpose", "RunLayerSpec", "RunStartedEvent", diff --git a/dify-agent/src/dify_agent/protocol/schemas.py b/dify-agent/src/dify_agent/protocol/schemas.py index 9a989976c71..77501942666 100644 --- a/dify-agent/src/dify_agent/protocol/schemas.py +++ b/dify-agent/src/dify_agent/protocol/schemas.py @@ -5,9 +5,9 @@ producers, storage adapters, and Python client. Create-run requests expose a Dify-friendly ``composition.layers[].config`` shape so callers can describe one layer in one place; the server normalizes that public DTO into Agenton's state-only ``CompositorConfig`` plus node-name keyed per-run configs before -calling ``Compositor.enter(configs=...)``. Session snapshots and ``on_exit`` stay -top-level because they are per-run resume state and exit policy, not graph node -definition. +calling ``Compositor.enter(configs=...)``. Session snapshots, deferred tool +results, and ``on_exit`` stay top-level because they are per-run resume state or +exit policy, not graph node definition. The server still constructs layers only from explicit provider type ids, keeping HTTP input data-only and preventing unsafe import-path construction. Run events @@ -22,21 +22,25 @@ by ``DIFY_AGENT_MODEL_LAYER_ID``, the optional history layer named by by ``DIFY_AGENT_OUTPUT_LAYER_ID``. Request-level ``on_exit`` signals decide whether each active layer is suspended or deleted when the run exits, with suspend as the default so successful terminal events can include resumable -snapshots. Successful runs publish the final JSON-safe agent output and the -resumable Agenton session snapshot together on the terminal ``run_succeeded`` -event so consumers can treat terminal events as complete run summaries. Session -snapshots carry only layer lifecycle/runtime state in compositor order; they do -not persist output-layer config. Resumed structured-output runs therefore must -resubmit the same ``output`` layer in ``composition.layers[]`` so snapshot layer -name/order still matches the composition and the runtime can rebuild the same -structured output contract. +snapshots. Successful runs always publish the resumable Agenton session snapshot +on the terminal ``run_succeeded`` event together with exactly one of the final +JSON-safe ``output`` or a deferred external ``deferred_tool_call`` payload. That +lets consumers treat terminal success events as complete run summaries without a +separate pause protocol. Session snapshots carry only layer lifecycle/runtime +state in compositor order; they do not persist output-layer config. Resumed +structured-output runs therefore must resubmit the same ``output`` layer in +``composition.layers[]`` so snapshot layer name/order still matches the +composition and the runtime can rebuild the same structured output contract. """ +from __future__ import annotations + from datetime import datetime, timezone from typing import Annotated, ClassVar, Final, Literal, TypeAlias -from pydantic import BaseModel, ConfigDict, Field, JsonValue, TypeAdapter +from pydantic import BaseModel, ConfigDict, Field, JsonValue, TypeAdapter, model_serializer, model_validator from pydantic_ai.messages import AgentStreamEvent +from pydantic_ai.tools import DeferredToolResults from agenton.compositor import CompositorConfig, CompositorSessionSnapshot, LayerConfigInput, LayerNodeConfig from agenton.layers import ExitIntent @@ -45,12 +49,11 @@ from agenton.layers import ExitIntent DIFY_AGENT_MODEL_LAYER_ID: Final[str] = "llm" DIFY_AGENT_HISTORY_LAYER_ID: Final[str] = "history" DIFY_AGENT_OUTPUT_LAYER_ID: Final[str] = "output" -RunStatus = Literal["running", "paused", "succeeded", "failed", "cancelled"] +RunStatus = Literal["running", "succeeded", "failed", "cancelled"] RunPurpose = Literal["workflow_node", "single_step", "agent_app", "babysit", "fasten_preview"] RunEventType = Literal[ "run_started", "pydantic_ai_event", - "run_paused", "run_succeeded", "run_failed", "run_cancelled", @@ -121,7 +124,15 @@ class CreateRunRequest(BaseModel): keep snapshot compatibility and rebuild the output schema. Dify tenant, user, and run-correlation identifiers must be submitted through a ``dify.execution_context`` entry in ``composition.layers[]``; there is no - parallel top-level ``execution_context`` request field. + parallel top-level ``execution_context`` request field. External deferred + tool continuation input belongs in the top-level ``deferred_tool_results`` + field rather than inside composition. Resume requests are therefore expected + to pair a prior ``session_snapshot`` with the same logical composition so + Agenton can rebuild the same layers and message history. For ask-human + continuation specifically, the matching pending tool call must still exist + in prior history state; callers should keep the history layer active across + runs so deferred tool results can be matched against the original model + response instead of starting a fresh user-prompt turn. """ composition: RunComposition @@ -129,6 +140,7 @@ class CreateRunRequest(BaseModel): idempotency_key: str | None = None metadata: dict[str, JsonValue] = Field(default_factory=dict) session_snapshot: CompositorSessionSnapshot | None = None + deferred_tool_results: DeferredToolResultsPayload | None = None on_exit: LayerExitSignals = Field(default_factory=LayerExitSignals) model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") @@ -213,14 +225,59 @@ class EmptyRunEventData(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") -class RunSucceededEventData(BaseModel): - """Terminal success payload for final output and resumable session state.""" +class DeferredToolResultsPayload(BaseModel): + """Public JSON-safe DTO for deferred external tool results supplied on resume.""" - output: JsonValue + calls: dict[str, JsonValue] = Field(default_factory=dict) + metadata: dict[str, dict[str, JsonValue]] = Field(default_factory=dict) + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + def to_pydantic_ai(self) -> DeferredToolResults: + """Convert the public DTO into pydantic-ai's resume input dataclass.""" + return DeferredToolResults( + calls=dict(self.calls), + metadata={key: dict(value) for key, value in self.metadata.items()}, + ) + + +class DeferredToolCallPayload(BaseModel): + """Terminal success payload for one deferred external tool request.""" + + tool_call_id: str + tool_name: str + args: JsonValue + metadata: dict[str, JsonValue] = Field(default_factory=dict) + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + +class RunSucceededEventData(BaseModel): + """Terminal success payload for final output or deferred tool continuation.""" + + output: JsonValue | None = None + deferred_tool_call: DeferredToolCallPayload | None = None session_snapshot: CompositorSessionSnapshot model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + @model_validator(mode="after") + def _validate_result_shape(self) -> RunSucceededEventData: + has_output = "output" in self.model_fields_set + has_deferred_tool_call = "deferred_tool_call" in self.model_fields_set + if has_output == has_deferred_tool_call: + raise ValueError("Exactly one of output or deferred_tool_call must be set") + return self + + @model_serializer(mode="plain") + def _serialize_active_result(self) -> dict[str, object]: + data: dict[str, object] = {"session_snapshot": self.session_snapshot} + if "output" in self.model_fields_set: + data["output"] = self.output + if "deferred_tool_call" in self.model_fields_set: + data["deferred_tool_call"] = self.deferred_tool_call + return data + class RunFailedEventData(BaseModel): """Terminal failure payload shown to polling and SSE consumers.""" @@ -231,16 +288,6 @@ class RunFailedEventData(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") -class RunPausedEventData(BaseModel): - """Pause payload used for human handoff or other resumable waits.""" - - reason: str - message: str | None = None - session_snapshot: CompositorSessionSnapshot | None = None - - model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") - - class RunCancelledEventData(BaseModel): """Terminal cancellation payload for explicit user/operator cancellation.""" @@ -288,13 +335,6 @@ class RunFailedEvent(BaseRunEvent): data: RunFailedEventData -class RunPausedEvent(BaseRunEvent): - """Resumable pause event emitted when a run waits for outside input.""" - - type: Literal["run_paused"] = "run_paused" - data: RunPausedEventData - - class RunCancelledEvent(BaseRunEvent): """Terminal cancellation event emitted after an explicit cancel request.""" @@ -303,12 +343,7 @@ class RunCancelledEvent(BaseRunEvent): RunEvent: TypeAlias = Annotated[ - RunStartedEvent - | PydanticAIStreamRunEvent - | RunPausedEvent - | RunSucceededEvent - | RunFailedEvent - | RunCancelledEvent, + RunStartedEvent | PydanticAIStreamRunEvent | RunSucceededEvent | RunFailedEvent | RunCancelledEvent, Field(discriminator="type"), ] RUN_EVENT_ADAPTER: TypeAdapter[RunEvent] = TypeAdapter(RunEvent) @@ -330,6 +365,8 @@ __all__ = [ "CancelRunResponse", "CreateRunRequest", "CreateRunResponse", + "DeferredToolCallPayload", + "DeferredToolResultsPayload", "DIFY_AGENT_HISTORY_LAYER_ID", "DIFY_AGENT_MODEL_LAYER_ID", "DIFY_AGENT_OUTPUT_LAYER_ID", @@ -345,8 +382,6 @@ __all__ = [ "RunEventsResponse", "RunFailedEvent", "RunFailedEventData", - "RunPausedEvent", - "RunPausedEventData", "RunPurpose", "RunStartedEvent", "RunStatus", diff --git a/dify-agent/src/dify_agent/runtime/compositor_factory.py b/dify-agent/src/dify_agent/runtime/compositor_factory.py index 8454513af22..959a1329ac6 100644 --- a/dify-agent/src/dify_agent/runtime/compositor_factory.py +++ b/dify-agent/src/dify_agent/runtime/compositor_factory.py @@ -2,8 +2,9 @@ Only explicitly allowed provider type ids are constructible here. The default provider set contains prompt layers, the optional pydantic-ai history layer, the -state-free Dify structured output layer, the Dify execution-context layer, the -stateful Dify shell layer, and the Dify plugin business-layer family: +state-free Dify structured output layer, the optional Dify ask-human layer, the +Dify execution-context layer, the stateful Dify shell layer, and the Dify +plugin business-layer family: - ``dify.execution_context`` for shared tenant/user/run daemon context, - ``dify.shell`` for shellctl-backed shell job control, @@ -33,6 +34,7 @@ from agenton_collections.layers.plain.basic import PromptLayer from agenton_collections.transformers.pydantic_ai import PYDANTIC_AI_TRANSFORMERS from dify_agent.agent_stub.server.shell_agent_stub_env import ShellAgentStubTokenFactory from dify_agent.agent_stub.server.tokens.agent_stub import AgentStubTokenCodec +from dify_agent.layers.ask_human.layer import DifyAskHumanLayer from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer from dify_agent.layers.dify_plugin.tools_layer import DifyPluginToolsLayer from dify_agent.layers.execution_context.configs import DifyExecutionContextLayerConfig @@ -80,6 +82,7 @@ def create_default_layer_providers( LayerProvider.from_layer_type(PromptLayer), LayerProvider.from_layer_type(PydanticAIHistoryLayer), LayerProvider.from_layer_type(DifyOutputLayer), + LayerProvider.from_layer_type(DifyAskHumanLayer), LayerProvider.from_factory( layer_type=DifyExecutionContextLayer, create=lambda config: DifyExecutionContextLayer.from_config_with_settings( diff --git a/dify-agent/src/dify_agent/runtime/event_sink.py b/dify-agent/src/dify_agent/runtime/event_sink.py index 6567189c699..80cbf76cbd9 100644 --- a/dify-agent/src/dify_agent/runtime/event_sink.py +++ b/dify-agent/src/dify_agent/runtime/event_sink.py @@ -3,19 +3,21 @@ The runner only needs append-only event writes and status transitions, so tests can use ``InMemoryRunEventSink`` without Redis. Production storage implements the same protocol with Redis streams in ``dify_agent.storage.redis_run_store``. The -terminal success helper writes the final JSON-safe output and session snapshot in -one event so event consumers can stop at ``run_succeeded`` without correlating -separate payload events. +terminal success helper writes either the final JSON-safe output or one deferred +tool request together with the resumable session snapshot in a single event so +consumers can stop at ``run_succeeded`` without correlating separate payload +events. """ from collections import defaultdict -from typing import Protocol +from typing import Protocol, cast from pydantic import JsonValue from pydantic_ai.messages import AgentStreamEvent from agenton.compositor import CompositorSessionSnapshot from dify_agent.protocol.schemas import ( + DeferredToolCallPayload, EmptyRunEventData, PydanticAIStreamRunEvent, RunEvent, @@ -29,6 +31,9 @@ from dify_agent.protocol.schemas import ( ) +_UNSET = object() + + class RunEventSink(Protocol): """Boundary used by runtime code to publish observable run progress.""" @@ -95,15 +100,31 @@ async def emit_run_succeeded( sink: RunEventSink, *, run_id: str, - output: JsonValue, + output: JsonValue | None | object = _UNSET, + deferred_tool_call: DeferredToolCallPayload | object = _UNSET, session_snapshot: CompositorSessionSnapshot, ) -> str: - """Emit the terminal success event with output and resumable state.""" + """Emit the terminal success event with output or deferred continuation. + + Callers must activate exactly one result branch. ``_UNSET`` is used instead + of ``None`` to preserve the distinction between an omitted inactive branch + and an active ``output`` branch whose JSON value is explicitly ``null``. + Without that sentinel, ``output=None`` would be indistinguishable from + “output field absent”, which would break nullable-success payloads. + """ + data: dict[str, JsonValue | DeferredToolCallPayload | CompositorSessionSnapshot | None] = { + "session_snapshot": session_snapshot, + } + if output is not _UNSET: + data["output"] = cast(JsonValue | None, output) + if deferred_tool_call is not _UNSET: + data["deferred_tool_call"] = cast(DeferredToolCallPayload, deferred_tool_call) + return await emit_run_event( sink, event=RunSucceededEvent( run_id=run_id, - data=RunSucceededEventData(output=output, session_snapshot=session_snapshot), + data=RunSucceededEventData.model_validate(data), created_at=utc_now(), ), ) diff --git a/dify-agent/src/dify_agent/runtime/runner.py b/dify-agent/src/dify_agent/runtime/runner.py index 9458b5e7e33..8a6d7b9bd91 100644 --- a/dify-agent/src/dify_agent/runtime/runner.py +++ b/dify-agent/src/dify_agent/runtime/runner.py @@ -3,36 +3,47 @@ The runner is storage-agnostic: it normalizes the public Dify composition into Agenton's graph/config split, enters a fresh ``CompositorRun`` (or resumes one from a snapshot), renders the current Dify system prompts into temporary -``message_history``, runs pydantic-ai with ``run.user_prompts`` as the current -user input, emits stream events, applies request-level ``on_exit`` signals, and -then publishes a terminal success or failure event. The Pydantic AI model is -resolved from the active Agenton layer named by ``DIFY_AGENT_MODEL_LAYER_ID``. -An optional history layer contributes stored message history only through -session state; successful runs append only ``result.new_messages()`` back into -that layer so current system prompts are not persisted. An optional structured -output layer named by ``DIFY_AGENT_OUTPUT_LAYER_ID`` is read after entry and -resolved into an output contract whose type both exposes the output schema to -the model and performs runtime JSON Schema validation through custom Pydantic -hooks. Invalid structured outputs therefore trigger Pydantic AI's normal -output-validation retry behavior before Dify Agent emits ``run_succeeded``. -Layers still never own the FastAPI lifespan-owned plugin daemon HTTP client. -Successful terminal events contain both the JSON-safe final output and session -snapshot; there are no separate output or snapshot events to correlate. +``message_history``, runs pydantic-ai with either the current ``run.user_prompts`` +or deferred external tool results, emits stream events, applies request-level +``on_exit`` signals, and then publishes a terminal success or failure event. The +Pydantic AI model is resolved from the active Agenton layer named by +``DIFY_AGENT_MODEL_LAYER_ID``. An optional history layer contributes stored +message history only through session state; successful runs append only +``result.new_messages()`` back into that layer so current system prompts are not +persisted. An optional structured output layer named by +``DIFY_AGENT_OUTPUT_LAYER_ID`` is read after entry and resolved into an output +contract whose type both exposes the output schema to the model and performs +runtime JSON Schema validation through custom Pydantic hooks. When the ask-human +layer is active, the runtime also allows ``DeferredToolRequests`` output and +publishes that deferred request through the normal ``run_succeeded`` event as +``deferred_tool_call`` instead of a final ``output``. Invalid structured outputs +or invalid deferred-tool behavior still trigger normal retries/failures before +Dify Agent emits success. Layers still never own the FastAPI lifespan-owned +plugin daemon HTTP client. """ from collections.abc import AsyncIterable from collections import Counter -from typing import Any, cast +from dataclasses import dataclass +from typing import Any, Literal, cast import httpx from pydantic import JsonValue, TypeAdapter from pydantic_ai.messages import AgentStreamEvent +from pydantic_ai.output import OutputSpec +from pydantic_ai.tools import DeferredToolRequests, DeferredToolResults from agenton.compositor import CompositorSessionSnapshot, LayerProviderInput from agenton.layers.types import PydanticAITool +from dify_agent.layers.ask_human.layer import get_ask_human_layer, validate_ask_human_layer_composition from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer from dify_agent.layers.dify_plugin.tools_layer import DifyPluginToolsLayer -from dify_agent.protocol.schemas import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, normalize_composition +from dify_agent.protocol.schemas import ( + CreateRunRequest, + DIFY_AGENT_MODEL_LAYER_ID, + DeferredToolCallPayload, + normalize_composition, +) from dify_agent.runtime.agent_factory import create_agent, normalize_user_input from dify_agent.runtime.agenton_validation import is_agenton_enter_validation_runtime_error from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_providers @@ -61,6 +72,16 @@ class AgentRunValidationError(ValueError): """Raised when a run request is valid JSON but cannot execute.""" +@dataclass(slots=True) +class RunSuccessOutcome: + """Normalized successful runner output before event emission.""" + + result_kind: Literal["output", "deferred_tool_call"] + output: JsonValue | None + deferred_tool_call: DeferredToolCallPayload | None + session_snapshot: CompositorSessionSnapshot + + class AgentRunRunner: """Executes one run and writes only public run events to its sink.""" @@ -92,7 +113,7 @@ class AgentRunRunner: _ = await emit_run_started(self.sink, run_id=self.run_id) try: - output, session_snapshot = await self._run_agent() + outcome = await self._run_agent() except Exception as exc: message = str(exc) or type(exc).__name__ _ = await emit_run_failed(self.sink, run_id=self.run_id, error=message) @@ -102,12 +123,16 @@ class AgentRunRunner: _ = await emit_run_succeeded( self.sink, run_id=self.run_id, - output=output, - session_snapshot=session_snapshot, + **( + {"output": outcome.output} + if outcome.result_kind == "output" + else {"deferred_tool_call": outcome.deferred_tool_call} + ), + session_snapshot=outcome.session_snapshot, ) await self.sink.update_status(self.run_id, "succeeded") - async def _run_agent(self) -> tuple[JsonValue, CompositorSessionSnapshot]: + async def _run_agent(self) -> RunSuccessOutcome: """Run pydantic-ai inside an entered Agenton run. Known request-shaped Agenton enter-time failures are normalized to @@ -128,6 +153,7 @@ class AgentRunRunner: try: validate_output_layer_composition(self.request.composition) validate_history_layer_composition(self.request.composition) + validate_ask_human_layer_composition(self.request.composition) graph_config, layer_configs = normalize_composition(self.request.composition) compositor = build_pydantic_ai_compositor(graph_config, providers=self.layer_providers) validate_layer_exit_signals(compositor, self.request.on_exit) @@ -135,12 +161,16 @@ class AgentRunRunner: raise AgentRunValidationError(str(exc)) from exc entered_run = False + output: JsonValue | None = None + deferred_tool_call: DeferredToolCallPayload | None = None + result_kind: Literal["output", "deferred_tool_call"] | None = None try: async with compositor.enter(configs=layer_configs, session_snapshot=self.request.session_snapshot) as run: entered_run = True apply_layer_exit_signals(run, self.request.on_exit) user_prompts = run.user_prompts - if not has_non_blank_user_prompt(user_prompts): + deferred_tool_results = _resolve_deferred_tool_results(self.request) + if deferred_tool_results is None and not has_non_blank_user_prompt(user_prompts): raise AgentRunValidationError(EMPTY_USER_PROMPTS_ERROR) async def handle_events(_ctx: object, events: AsyncIterable[AgentStreamEvent]) -> None: @@ -154,24 +184,44 @@ class AgentRunRunner: system_prompts=run.prompts, stored_history=history_layer.message_history if history_layer is not None else (), ) + ask_human_layer = get_ask_human_layer(run) llm_layer = run.get_layer(DIFY_AGENT_MODEL_LAYER_ID, DifyPluginLLMLayer) model = llm_layer.get_model(http_client=self.plugin_daemon_http_client) tools = await _resolve_run_tools(run, http_client=self.plugin_daemon_http_client) except (KeyError, TypeError, RuntimeError, ValueError) as exc: raise AgentRunValidationError(str(exc)) from exc + if deferred_tool_results is not None and history_layer is None: + raise AgentRunValidationError( + "Deferred tool results require a 'history' layer with prior message history." + ) + agent = create_agent( model, tools=tools, - output_type=output_contract.output_type, + output_type=_resolve_agent_output_type(output_contract.output_type, ask_human_layer is not None), ) result = await agent.run( - normalize_user_input(user_prompts), + None if deferred_tool_results is not None else normalize_user_input(user_prompts), message_history=message_history, + deferred_tool_results=deferred_tool_results, event_stream_handler=handle_events, ) - output = _serialize_agent_output(result.output) append_successful_run_history(history_layer, result.new_messages()) + if isinstance(result.output, DeferredToolRequests): + if ask_human_layer is None: + raise AgentRunValidationError( + "Deferred tool requests were returned, but no active ask_human layer is available for validation." + ) + if history_layer is None: + raise AgentRunValidationError( + "ask_human deferred tool requests require a 'history' layer so the pending tool call can be resumed." + ) + deferred_tool_call = ask_human_layer.build_deferred_tool_call_payload(result.output) + result_kind = "deferred_tool_call" + else: + output = _serialize_agent_output(result.output) + result_kind = "output" except RuntimeError as exc: if not entered_run and is_agenton_enter_validation_runtime_error(exc): raise AgentRunValidationError(str(exc)) from exc @@ -183,8 +233,15 @@ class AgentRunRunner: if run.session_snapshot is None: raise RuntimeError("Agenton run did not produce a session snapshot after exit.") + if result_kind is None: + raise RuntimeError("Agent run did not resolve either a final output or a deferred tool call.") - return output, run.session_snapshot + return RunSuccessOutcome( + result_kind=result_kind, + output=output, + deferred_tool_call=deferred_tool_call, + session_snapshot=run.session_snapshot, + ) def _serialize_agent_output(output: object) -> JsonValue: @@ -192,6 +249,20 @@ def _serialize_agent_output(output: object) -> JsonValue: return cast(JsonValue, _AGENT_OUTPUT_ADAPTER.dump_python(output, mode="json")) +def _resolve_agent_output_type(output_type: OutputSpec[object], allow_deferred_tools: bool) -> OutputSpec[object]: + """Return the run output type, optionally augmented with deferred-tool support.""" + if not allow_deferred_tools: + return output_type + return cast(OutputSpec[object], [output_type, DeferredToolRequests]) + + +def _resolve_deferred_tool_results(request: CreateRunRequest) -> DeferredToolResults | None: + """Convert public deferred tool results into the pydantic-ai resume input.""" + if request.deferred_tool_results is None: + return None + return request.deferred_tool_results.to_pydantic_ai() + + async def _resolve_run_tools( run: Any, *, diff --git a/dify-agent/tests/local/dify_agent/layers/ask_human/test_configs.py b/dify-agent/tests/local/dify_agent/layers/ask_human/test_configs.py new file mode 100644 index 00000000000..f9b42220e03 --- /dev/null +++ b/dify-agent/tests/local/dify_agent/layers/ask_human/test_configs.py @@ -0,0 +1,73 @@ +import pytest +from pydantic import ValidationError + +import dify_agent.layers.ask_human as ask_human_exports +from dify_agent.layers.ask_human import DIFY_ASK_HUMAN_LAYER_TYPE_ID, DifyAskHumanLayerConfig +from dify_agent.layers.ask_human.schema import AskHumanToolArgs + + +def test_ask_human_package_exports_client_safe_symbols_only() -> None: + assert ask_human_exports.DIFY_ASK_HUMAN_LAYER_TYPE_ID == "dify.ask_human" + assert ask_human_exports.__all__ == [ + "AskHumanAction", + "AskHumanActionStyle", + "AskHumanField", + "AskHumanFieldType", + "AskHumanFileField", + "AskHumanFileListField", + "AskHumanParagraphField", + "AskHumanResultStatus", + "AskHumanSelectField", + "AskHumanSelectOption", + "AskHumanSelectedAction", + "AskHumanToolArgs", + "AskHumanToolResult", + "AskHumanUrgency", + "DEFAULT_ASK_HUMAN_TOOL_DESCRIPTION", + "DIFY_ASK_HUMAN_LAYER_TYPE_ID", + "DifyAskHumanLayerConfig", + ] + assert not hasattr(ask_human_exports, "DifyAskHumanLayer") + + +def test_ask_human_layer_config_defaults_and_effective_description() -> None: + config = DifyAskHumanLayerConfig() + + assert DIFY_ASK_HUMAN_LAYER_TYPE_ID == "dify.ask_human" + assert config.model_dump(mode="json") == { + "enabled": True, + "tool_name": "ask_human", + "tool_description": None, + "max_fields": 8, + "max_actions": 4, + "allowed_field_types": ["paragraph", "select"], + "allow_file_fields": False, + "max_markdown_chars": 8000, + "max_question_chars": 1000, + "max_field_label_chars": 120, + "max_action_label_chars": 80, + } + assert "Ask a human for missing information" in config.effective_tool_description + + +def test_ask_human_layer_config_rejects_invalid_tool_name() -> None: + with pytest.raises(ValidationError, match="tool_name must be a valid tool identifier"): + _ = DifyAskHumanLayerConfig(tool_name="ask-human") + + +def test_ask_human_layer_config_rejects_file_field_types_when_disabled() -> None: + with pytest.raises(ValidationError, match="cannot include file field types"): + _ = DifyAskHumanLayerConfig(allowed_field_types=["paragraph", "file"]) + + +def test_ask_human_tool_args_reject_duplicate_field_names() -> None: + with pytest.raises(ValidationError, match="field name 'comment' must be unique"): + _ = AskHumanToolArgs.model_validate( + { + "question": "Need a reply", + "fields": [ + {"type": "paragraph", "name": "comment", "label": "Comment"}, + {"type": "paragraph", "name": "comment", "label": "Another comment"}, + ], + } + ) diff --git a/dify-agent/tests/local/dify_agent/layers/ask_human/test_layer.py b/dify-agent/tests/local/dify_agent/layers/ask_human/test_layer.py new file mode 100644 index 00000000000..903a3164a01 --- /dev/null +++ b/dify-agent/tests/local/dify_agent/layers/ask_human/test_layer.py @@ -0,0 +1,152 @@ +import asyncio +import inspect +from collections.abc import Awaitable +from typing import Any, cast + +import pytest +from pydantic_ai.messages import ToolCallPart +from pydantic_ai.tools import DeferredToolRequests, ToolDefinition + +from dify_agent.layers.ask_human import DifyAskHumanLayerConfig +from dify_agent.layers.ask_human.schema import AskHumanToolArgs +from dify_agent.layers.ask_human.layer import DifyAskHumanLayer + + +async def _await_tool_definition(value: Awaitable[ToolDefinition | None]) -> ToolDefinition | None: + return await value + + +def test_ask_human_layer_exposes_one_external_tool_and_prompt_hint() -> None: + config = DifyAskHumanLayerConfig( + tool_name="human_gate", + tool_description="Collect a human decision.", + max_fields=2, + max_actions=3, + allowed_field_types=["paragraph"], + allow_file_fields=False, + max_question_chars=240, + max_markdown_chars=512, + max_field_label_chars=32, + max_action_label_chars=16, + ) + layer = DifyAskHumanLayer.from_config(config) + + prompt_hint = layer.build_prompt_hint() + tool = layer.tools[0] + prepare = tool.prepare + assert prepare is not None + prepared_or_awaitable = prepare( + cast(Any, None), + ToolDefinition( + name=tool.name, description=tool.description, parameters_json_schema=tool.function_schema.json_schema + ), + ) + prepared = ( + asyncio.run(_await_tool_definition(cast(Awaitable[ToolDefinition | None], prepared_or_awaitable))) + if inspect.isawaitable(prepared_or_awaitable) + else prepared_or_awaitable + ) + + assert len(layer.prefix_prompts) == 1 + assert len(layer.tools) == 1 + assert "Allowed field types: paragraph." in prompt_hint + assert "File upload fields are disabled." in prompt_hint + assert "Use at most 2 field(s)." in prompt_hint + assert "Use at most 3 action(s)." in prompt_hint + assert "Keep 'question' under 240 characters." in prompt_hint + assert "Keep 'markdown' under 512 characters." in prompt_hint + assert "Keep each field label under 32 characters." in prompt_hint + assert "Keep each action label under 16 characters." in prompt_hint + assert prepared is not None + assert prepared.name == "human_gate" + assert prepared.description == "Collect a human decision." + assert prepared.kind == "external" + assert prepared.parameters_json_schema == AskHumanToolArgs.model_json_schema() + + +def test_ask_human_layer_normalizes_default_action_in_deferred_payload() -> None: + layer = DifyAskHumanLayer.from_config(DifyAskHumanLayerConfig()) + + payload = layer.build_deferred_tool_call_payload( + DeferredToolRequests( + calls=[ + ToolCallPart( + tool_name="ask_human", + args={ + "question": "Need a human answer", + "fields": [{"type": "paragraph", "name": "comment", "label": "Comment"}], + }, + tool_call_id="call-1", + ) + ] + ) + ) + + assert payload.tool_call_id == "call-1" + assert payload.tool_name == "ask_human" + assert payload.args == { + "title": None, + "question": "Need a human answer", + "markdown": None, + "fields": [ + { + "type": "paragraph", + "name": "comment", + "label": "Comment", + "required": False, + "placeholder": None, + "default": None, + } + ], + "actions": [{"id": "submit", "label": "Submit", "style": "primary"}], + "urgency": "normal", + } + assert payload.metadata == { + "layer_type": "dify.ask_human", + "tool_name": "ask_human", + "schema_version": 1, + } + + +def test_ask_human_layer_rejects_disallowed_field_types_in_deferred_payload() -> None: + layer = DifyAskHumanLayer.from_config(DifyAskHumanLayerConfig(allowed_field_types=["paragraph"])) + + with pytest.raises(ValueError, match="field type 'select' is not allowed"): + _ = layer.build_deferred_tool_call_payload( + DeferredToolRequests( + calls=[ + ToolCallPart( + tool_name="ask_human", + args={ + "question": "Need a choice", + "fields": [ + { + "type": "select", + "name": "decision", + "label": "Decision", + "options": [{"value": "yes", "label": "Yes"}], + } + ], + }, + tool_call_id="call-2", + ) + ] + ) + ) + + +def test_ask_human_layer_rejects_tool_name_mismatch_in_deferred_payload() -> None: + layer = DifyAskHumanLayer.from_config(DifyAskHumanLayerConfig(tool_name="human_gate")) + + with pytest.raises(ValueError, match="deferred tool name must be 'human_gate', got 'ask_human'"): + _ = layer.build_deferred_tool_call_payload( + DeferredToolRequests( + calls=[ + ToolCallPart( + tool_name="ask_human", + args={"question": "Need a human answer"}, + tool_call_id="call-3", + ) + ] + ) + ) diff --git a/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py b/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py index 76606b9132c..b58e0818229 100644 --- a/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py +++ b/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py @@ -6,6 +6,7 @@ from agenton.compositor import CompositorSessionSnapshot from agenton.layers import ExitIntent from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig import dify_agent.protocol as protocol_exports +from dify_agent.layers.ask_human import DifyAskHumanLayerConfig from dify_agent.layers.execution_context import DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID, DifyExecutionContextLayerConfig from dify_agent.layers.dify_plugin import DIFY_PLUGIN_LLM_LAYER_TYPE_ID, DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig @@ -13,6 +14,7 @@ from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LA from dify_agent.protocol.schemas import ( RUN_EVENT_ADAPTER, CreateRunRequest, + DeferredToolCallPayload, LayerExitSignals, PydanticAIStreamRunEvent, RunCancelledEvent, @@ -21,8 +23,6 @@ from dify_agent.protocol.schemas import ( RunFailedEvent, RunFailedEventData, RunLayerSpec, - RunPausedEvent, - RunPausedEventData, RunStartedEvent, RunSucceededEvent, RunSucceededEventData, @@ -49,15 +49,19 @@ def test_run_event_adapter_round_trips_typed_variants() -> None: session_snapshot=CompositorSessionSnapshot(layers=[]), ), ), - RunFailedEvent(run_id="run-1", data=RunFailedEventData(error="boom", reason="shutdown")), - RunPausedEvent( - run_id="run-1", - data=RunPausedEventData( - reason="human_handoff", - message="Need review", + RunSucceededEvent( + run_id="run-2", + data=RunSucceededEventData( + deferred_tool_call=DeferredToolCallPayload( + tool_call_id="tool-call-1", + tool_name="ask_human", + args={"question": "Need approval"}, + metadata={"layer_type": "dify.ask_human"}, + ), session_snapshot=CompositorSessionSnapshot(layers=[]), ), ), + RunFailedEvent(run_id="run-1", data=RunFailedEventData(error="boom", reason="shutdown")), RunCancelledEvent(run_id="run-1", data=RunCancelledEventData(reason="user_cancelled")), ] @@ -204,6 +208,35 @@ def test_create_run_request_accepts_dto_first_public_composition_and_normalizes_ } +def test_create_run_request_accepts_deferred_tool_results_payload() -> None: + request = CreateRunRequest.model_validate( + { + "composition": { + "layers": [ + {"name": "prompt", "type": PLAIN_PROMPT_LAYER_TYPE_ID, "config": {"user": "hello"}}, + {"name": "ask_human", "type": "dify.ask_human", "config": DifyAskHumanLayerConfig().model_dump()}, + ] + }, + "deferred_tool_results": { + "calls": { + "tool-call-1": { + "status": "submitted", + "action": {"id": "submit", "label": "Submit"}, + "values": {"comment": "Looks good."}, + } + } + }, + } + ) + + assert request.deferred_tool_results is not None + assert request.deferred_tool_results.calls["tool-call-1"] == { + "status": "submitted", + "action": {"id": "submit", "label": "Submit"}, + "values": {"comment": "Looks good."}, + } + + def test_create_run_request_accepts_plugin_tools_layer_with_prepared_parameters_and_schema() -> None: request = CreateRunRequest.model_validate( { @@ -327,6 +360,49 @@ def test_on_exit_default_to_suspend_and_are_public() -> None: assert request.on_exit.layers == {} +def test_run_succeeded_event_data_requires_exactly_one_result_variant() -> None: + snapshot = CompositorSessionSnapshot(layers=[]) + + with pytest.raises(ValidationError, match="Exactly one of output or deferred_tool_call must be set"): + _ = RunSucceededEventData(session_snapshot=snapshot) + + with pytest.raises(ValidationError, match="Exactly one of output or deferred_tool_call must be set"): + _ = RunSucceededEventData( + output="done", + deferred_tool_call=DeferredToolCallPayload( + tool_call_id="tool-call-1", + tool_name="ask_human", + args={"question": "Need approval"}, + ), + session_snapshot=snapshot, + ) + + +def test_run_succeeded_event_data_allows_explicit_json_null_output() -> None: + snapshot = CompositorSessionSnapshot(layers=[]) + + data = RunSucceededEventData(output=None, session_snapshot=snapshot) + + assert data.output is None + assert data.deferred_tool_call is None + + +def test_run_succeeded_event_round_trips_explicit_json_null_output() -> None: + event = RunSucceededEvent( + run_id="run-null-output", + data=RunSucceededEventData(output=None, session_snapshot=CompositorSessionSnapshot(layers=[])), + ) + + payload = RUN_EVENT_ADAPTER.dump_json(event) + decoded = RUN_EVENT_ADAPTER.validate_json(payload) + + assert isinstance(decoded, RunSucceededEvent) + assert decoded.data.output is None + assert decoded.data.deferred_tool_call is None + assert b'"output":null' in payload + assert b'"deferred_tool_call"' not in payload + + def test_on_exit_accept_layer_overrides() -> None: request = CreateRunRequest.model_validate( { diff --git a/dify-agent/tests/local/dify_agent/runtime/test_runner.py b/dify-agent/tests/local/dify_agent/runtime/test_runner.py index 93f18446d55..c910b7c3dd9 100644 --- a/dify-agent/tests/local/dify_agent/runtime/test_runner.py +++ b/dify-agent/tests/local/dify_agent/runtime/test_runner.py @@ -1,5 +1,5 @@ import asyncio -from collections.abc import Mapping +from collections.abc import Iterable, Mapping from typing import Any, ClassVar, cast import httpx @@ -8,6 +8,7 @@ from pydantic import JsonValue from pydantic_ai import Tool from pydantic_ai.exceptions import UnexpectedModelBehavior from pydantic_ai.messages import ( + ToolReturnPart, ModelMessage, ModelRequest, ModelResponse, @@ -18,12 +19,14 @@ from pydantic_ai.messages import ( ) from pydantic_ai.models import ModelRequestParameters from pydantic_ai.models.test import TestModel +from pydantic_ai.tools import DeferredToolRequests, DeferredToolResults from pydantic_ai.settings import ModelSettings from agenton.compositor import CompositorSessionSnapshot, LayerProvider, LayerSessionSnapshot from agenton.layers import ExitIntent, LifecycleState from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, PydanticAIHistoryRuntimeState from agenton_collections.layers.plain import PromptLayerConfig, ToolsLayer +from dify_agent.layers.ask_human import DIFY_ASK_HUMAN_LAYER_TYPE_ID, DifyAskHumanLayerConfig from dify_agent.layers.execution_context import DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID, DifyExecutionContextLayerConfig from dify_agent.layers.shell import DIFY_SHELL_LAYER_TYPE_ID, DifyShellLayerConfig from dify_agent.layers.shell.layer import DifyShellLayer @@ -42,6 +45,7 @@ from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerC from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID from dify_agent.protocol.schemas import ( CreateRunRequest, + DeferredToolResultsPayload, LayerExitSignals, RunComposition, RunLayerSpec, @@ -54,7 +58,7 @@ from shell_session_manager.shellctl.shared import DeleteJobResponse, JobResult, class StaticToolsTestLayer(ToolsLayer): - type_id: ClassVar[str] = "test.static.tools" + type_id: ClassVar[str | None] = "test.static.tools" class FakeRunnerShellctlClient: @@ -115,6 +119,8 @@ def _request( user: str | list[str] = "hello", *, include_history: bool = False, + include_ask_human: bool = False, + ask_human_config: DifyAskHumanLayerConfig | None = None, llm_layer_name: str = DIFY_AGENT_MODEL_LAYER_ID, execution_context_layer_name: str = "execution_context", on_exit: LayerExitSignals | None = None, @@ -131,6 +137,17 @@ def _request( if include_history else [] ), + *( + [ + RunLayerSpec( + name="ask_human", + type=DIFY_ASK_HUMAN_LAYER_TYPE_ID, + config=ask_human_config or DifyAskHumanLayerConfig(), + ) + ] + if include_ask_human + else [] + ), RunLayerSpec( name=execution_context_layer_name, type=DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID, @@ -270,6 +287,7 @@ class RecordingTestModel(TestModel): def _history_session_snapshot( messages: list[ModelMessage], *, + include_ask_human: bool = False, include_output: bool = False, ) -> CompositorSessionSnapshot: layers = [ @@ -279,6 +297,11 @@ def _history_session_snapshot( lifecycle_state=LifecycleState.SUSPENDED, runtime_state=PydanticAIHistoryRuntimeState(messages=messages).model_dump(mode="json"), ), + *( + [LayerSessionSnapshot(name="ask_human", lifecycle_state=LifecycleState.SUSPENDED, runtime_state={})] + if include_ask_human + else [] + ), LayerSessionSnapshot(name="execution_context", lifecycle_state=LifecycleState.SUSPENDED, runtime_state={}), LayerSessionSnapshot( name=DIFY_AGENT_MODEL_LAYER_ID, lifecycle_state=LifecycleState.SUSPENDED, runtime_state={} @@ -302,6 +325,18 @@ def _flatten_message_parts(messages: list[ModelMessage]) -> list[object]: return [part for message in messages for part in message.parts] +class FakeAgentRunResult: + output: object + _new_messages: list[ModelMessage] + + def __init__(self, output: object, new_messages: list[ModelMessage]) -> None: + self.output = output + self._new_messages = new_messages + + def new_messages(self) -> list[ModelMessage]: + return list(self._new_messages) + + def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPatch) -> None: seen_clients: list[httpx.AsyncClient] = [] @@ -350,6 +385,489 @@ def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPa assert sink.statuses["run-1"] == "succeeded" +def test_runner_preserves_explicit_json_null_output(monkeypatch: pytest.MonkeyPatch) -> None: + def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient): + assert http_client.is_closed is False + return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType] + + class FakeAgent: + async def run(self, *_args: object, **_kwargs: object) -> FakeAgentRunResult: + return FakeAgentRunResult(None, []) + + monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model) + monkeypatch.setattr("dify_agent.runtime.runner.create_agent", lambda *_args, **_kwargs: FakeAgent()) + request = _request() + sink = InMemoryRunEventSink() + + async def scenario() -> None: + async with httpx.AsyncClient() as client: + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-null-output", + plugin_daemon_http_client=client, + ).run() + + asyncio.run(scenario()) + + terminal = sink.events["run-null-output"][-1] + assert isinstance(terminal, RunSucceededEvent) + assert terminal.data.output is None + assert terminal.data.deferred_tool_call is None + assert sink.statuses["run-null-output"] == "succeeded" + + +def test_runner_emits_deferred_tool_call_and_persists_pending_history(monkeypatch: pytest.MonkeyPatch) -> None: + captured_output_types: list[object] = [] + captured_user_prompts: list[object] = [] + pending_tool_call = ToolCallPart( + tool_name="ask_human", + args={ + "question": "Which deployment window should we use?", + "fields": [{"type": "paragraph", "name": "window", "label": "Deployment window"}], + }, + tool_call_id="tool-call-1", + ) + + def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient): + assert http_client.is_closed is False + return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType] + + class FakeAgent: + async def run(self, user_prompt: object, **kwargs: object) -> FakeAgentRunResult: + captured_user_prompts.append(user_prompt) + assert kwargs["deferred_tool_results"] is None + return FakeAgentRunResult( + DeferredToolRequests(calls=[pending_tool_call]), + [ + ModelRequest(parts=[UserPromptPart(content="current user")]), + ModelResponse(parts=[pending_tool_call]), + ], + ) + + def fake_create_agent(model: object, *, tools: list[Tool[object]], output_type: object) -> FakeAgent: + del model, tools + captured_output_types.append(output_type) + return FakeAgent() + + monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model) + monkeypatch.setattr("dify_agent.runtime.runner.create_agent", fake_create_agent) + request = _request("current user", include_history=True, include_ask_human=True) + sink = InMemoryRunEventSink() + + async def scenario() -> None: + async with httpx.AsyncClient() as client: + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-ask-human", + plugin_daemon_http_client=client, + ).run() + + asyncio.run(scenario()) + + terminal = sink.events["run-ask-human"][-1] + assert isinstance(terminal, RunSucceededEvent) + assert captured_user_prompts == ["current user"] + assert any(item is DeferredToolRequests for item in cast(Iterable[object], captured_output_types[0])) + assert terminal.data.output is None + assert terminal.data.deferred_tool_call is not None + assert terminal.data.deferred_tool_call.tool_call_id == "tool-call-1" + assert terminal.data.deferred_tool_call.tool_name == "ask_human" + assert terminal.data.deferred_tool_call.args == { + "title": None, + "question": "Which deployment window should we use?", + "markdown": None, + "fields": [ + { + "type": "paragraph", + "name": "window", + "label": "Deployment window", + "required": False, + "placeholder": None, + "default": None, + } + ], + "actions": [{"id": "submit", "label": "Submit", "style": "primary"}], + "urgency": "normal", + } + saved_history = _history_messages_from_snapshot(terminal.data.session_snapshot) + assert isinstance(saved_history[-1], ModelResponse) + assert saved_history[-1].parts == [pending_tool_call] + + +def test_runner_resumes_with_deferred_tool_results_and_no_user_prompt(monkeypatch: pytest.MonkeyPatch) -> None: + seen_user_prompts: list[object] = [] + seen_deferred_results: list[object] = [] + pending_tool_call = ToolCallPart( + tool_name="ask_human", + args={"question": "Need approval"}, + tool_call_id="tool-call-1", + ) + + def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient): + assert http_client.is_closed is False + return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType] + + class FakeAgent: + async def run(self, user_prompt: object, **kwargs: object) -> FakeAgentRunResult: + seen_user_prompts.append(user_prompt) + seen_deferred_results.append(kwargs.get("deferred_tool_results")) + if kwargs.get("deferred_tool_results") is None: + return FakeAgentRunResult( + DeferredToolRequests(calls=[pending_tool_call]), + [ + ModelRequest(parts=[UserPromptPart(content="current user")]), + ModelResponse(parts=[pending_tool_call]), + ], + ) + + deferred_tool_results = cast(DeferredToolResults, kwargs["deferred_tool_results"]) + assert deferred_tool_results is not None + submitted_result = cast(dict[str, object], deferred_tool_results.calls["tool-call-1"]) + assert submitted_result["status"] == "submitted" + return FakeAgentRunResult( + "done after human", + [ + ModelRequest( + parts=[ + ToolReturnPart( + tool_name="ask_human", + content={"status": "submitted", "values": {"comment": "Ship it"}}, + tool_call_id="tool-call-1", + ) + ] + ), + ModelResponse(parts=[TextPart(content="done after human")]), + ], + ) + + def fake_create_agent(model: object, *, tools: list[Tool[object]], output_type: object) -> FakeAgent: + del model, tools, output_type + return FakeAgent() + + monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model) + monkeypatch.setattr("dify_agent.runtime.runner.create_agent", fake_create_agent) + request = _request("current user", include_history=True, include_ask_human=True) + sink = InMemoryRunEventSink() + + async def scenario() -> None: + async with httpx.AsyncClient() as client: + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-ask-human-initial", + plugin_daemon_http_client=client, + ).run() + + initial_terminal = sink.events["run-ask-human-initial"][-1] + assert isinstance(initial_terminal, RunSucceededEvent) + + resumed_request = request.model_copy(deep=True) + resumed_request.session_snapshot = initial_terminal.data.session_snapshot + resumed_request.deferred_tool_results = DeferredToolResultsPayload.model_validate( + { + "calls": { + "tool-call-1": { + "status": "submitted", + "action": {"id": "submit", "label": "Submit"}, + "values": {"comment": "Ship it"}, + } + } + } + ) + + await AgentRunRunner( + sink=sink, + request=resumed_request, + run_id="run-ask-human-resume", + plugin_daemon_http_client=client, + ).run() + + asyncio.run(scenario()) + + resumed_terminal = sink.events["run-ask-human-resume"][-1] + assert isinstance(resumed_terminal, RunSucceededEvent) + assert resumed_terminal.data.output == "done after human" + assert resumed_terminal.data.deferred_tool_call is None + assert seen_user_prompts == ["current user", None] + assert seen_deferred_results[0] is None + assert seen_deferred_results[1] is not None + + +def test_runner_can_emit_second_deferred_tool_call_after_resume(monkeypatch: pytest.MonkeyPatch) -> None: + seen_user_prompts: list[object] = [] + first_pending_tool_call = ToolCallPart( + tool_name="ask_human", + args={"question": "Need deployment owner"}, + tool_call_id="tool-call-1", + ) + second_pending_tool_call = ToolCallPart( + tool_name="ask_human", + args={"question": "Need final go-live confirmation"}, + tool_call_id="tool-call-2", + ) + + def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient): + assert http_client.is_closed is False + return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType] + + class FakeAgent: + async def run(self, user_prompt: object, **kwargs: object) -> FakeAgentRunResult: + seen_user_prompts.append(user_prompt) + deferred_tool_results = kwargs.get("deferred_tool_results") + if deferred_tool_results is None: + return FakeAgentRunResult( + DeferredToolRequests(calls=[first_pending_tool_call]), + [ + ModelRequest(parts=[UserPromptPart(content="current user")]), + ModelResponse(parts=[first_pending_tool_call]), + ], + ) + + return FakeAgentRunResult( + DeferredToolRequests(calls=[second_pending_tool_call]), + [ + ModelRequest( + parts=[ + ToolReturnPart( + tool_name="ask_human", + content={"status": "submitted", "values": {"owner": "ops"}}, + tool_call_id="tool-call-1", + ) + ] + ), + ModelResponse(parts=[second_pending_tool_call]), + ], + ) + + def fake_create_agent(model: object, *, tools: list[Tool[object]], output_type: object) -> FakeAgent: + del model, tools, output_type + return FakeAgent() + + monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model) + monkeypatch.setattr("dify_agent.runtime.runner.create_agent", fake_create_agent) + request = _request("current user", include_history=True, include_ask_human=True) + sink = InMemoryRunEventSink() + + async def scenario() -> None: + async with httpx.AsyncClient() as client: + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-ask-human-turn-1", + plugin_daemon_http_client=client, + ).run() + + first_terminal = sink.events["run-ask-human-turn-1"][-1] + assert isinstance(first_terminal, RunSucceededEvent) + + resumed_request = request.model_copy(deep=True) + resumed_request.session_snapshot = first_terminal.data.session_snapshot + resumed_request.deferred_tool_results = DeferredToolResultsPayload.model_validate( + { + "calls": { + "tool-call-1": { + "status": "submitted", + "action": {"id": "submit", "label": "Submit"}, + "values": {"owner": "ops"}, + } + } + } + ) + + await AgentRunRunner( + sink=sink, + request=resumed_request, + run_id="run-ask-human-turn-2", + plugin_daemon_http_client=client, + ).run() + + asyncio.run(scenario()) + + second_terminal = sink.events["run-ask-human-turn-2"][-1] + assert isinstance(second_terminal, RunSucceededEvent) + assert second_terminal.data.output is None + assert second_terminal.data.deferred_tool_call is not None + assert second_terminal.data.deferred_tool_call.tool_call_id == "tool-call-2" + assert seen_user_prompts == ["current user", None] + saved_history = _history_messages_from_snapshot(second_terminal.data.session_snapshot) + assert isinstance(saved_history[1], ModelResponse) + assert saved_history[1].parts == [first_pending_tool_call] + assert isinstance(saved_history[2], ModelRequest) + assert len(saved_history[2].parts) == 1 + assert isinstance(saved_history[2].parts[0], ToolReturnPart) + assert saved_history[2].parts[0].tool_name == "ask_human" + assert saved_history[2].parts[0].tool_call_id == "tool-call-1" + assert saved_history[2].parts[0].content == {"status": "submitted", "values": {"owner": "ops"}} + assert isinstance(saved_history[3], ModelResponse) + assert saved_history[3].parts == [second_pending_tool_call] + + +def test_runner_rejects_deferred_tool_call_without_history_layer(monkeypatch: pytest.MonkeyPatch) -> None: + def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient): + assert http_client.is_closed is False + return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType] + + class FakeAgent: + async def run(self, *_args: object, **_kwargs: object) -> FakeAgentRunResult: + return FakeAgentRunResult( + DeferredToolRequests( + calls=[ + ToolCallPart(tool_name="ask_human", args={"question": "Need owner"}, tool_call_id="tool-call-1") + ] + ), + [], + ) + + monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model) + monkeypatch.setattr("dify_agent.runtime.runner.create_agent", lambda *args, **kwargs: FakeAgent()) + request = _request("current user", include_history=False, include_ask_human=True) + sink = InMemoryRunEventSink() + + async def scenario() -> None: + async with httpx.AsyncClient() as client: + with pytest.raises( + AgentRunValidationError, + match="ask_human deferred tool requests require a 'history' layer so the pending tool call can be resumed", + ): + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-ask-human-no-history", + plugin_daemon_http_client=client, + ).run() + + asyncio.run(scenario()) + + assert [event.type for event in sink.events["run-ask-human-no-history"]] == ["run_started", "run_failed"] + + +def test_runner_rejects_resume_with_deferred_tool_results_without_history_layer( + monkeypatch: pytest.MonkeyPatch, +) -> None: + agent_run_called = False + + def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient): + assert http_client.is_closed is False + return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType] + + class FakeAgent: + async def run(self, *_args: object, **_kwargs: object) -> FakeAgentRunResult: + nonlocal agent_run_called + agent_run_called = True + return FakeAgentRunResult("unexpected", []) + + monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model) + monkeypatch.setattr("dify_agent.runtime.runner.create_agent", lambda *args, **kwargs: FakeAgent()) + request = _request("current user", include_history=False, include_ask_human=True) + request.deferred_tool_results = DeferredToolResultsPayload.model_validate( + { + "calls": { + "tool-call-1": { + "status": "submitted", + "action": {"id": "submit", "label": "Submit"}, + "values": {"owner": "ops"}, + } + } + } + ) + sink = InMemoryRunEventSink() + + async def scenario() -> None: + async with httpx.AsyncClient() as client: + with pytest.raises( + AgentRunValidationError, + match="Deferred tool results require a 'history' layer with prior message history", + ): + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-ask-human-resume-no-history", + plugin_daemon_http_client=client, + ).run() + + asyncio.run(scenario()) + + assert agent_run_called is False + assert [event.type for event in sink.events["run-ask-human-resume-no-history"]] == ["run_started", "run_failed"] + + +def test_runner_rejects_multiple_deferred_tool_calls(monkeypatch: pytest.MonkeyPatch) -> None: + def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient): + assert http_client.is_closed is False + return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType] + + class FakeAgent: + async def run(self, *_args: object, **_kwargs: object) -> FakeAgentRunResult: + return FakeAgentRunResult( + DeferredToolRequests( + calls=[ + ToolCallPart(tool_name="ask_human", args={"question": "One"}, tool_call_id="tool-call-1"), + ToolCallPart(tool_name="ask_human", args={"question": "Two"}, tool_call_id="tool-call-2"), + ] + ), + [], + ) + + monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model) + monkeypatch.setattr("dify_agent.runtime.runner.create_agent", lambda *args, **kwargs: FakeAgent()) + request = _request("current user", include_history=True, include_ask_human=True) + sink = InMemoryRunEventSink() + + async def scenario() -> None: + async with httpx.AsyncClient() as client: + with pytest.raises(ValueError, match="supports exactly one deferred call per run"): + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-ask-human-multi", + plugin_daemon_http_client=client, + ).run() + + asyncio.run(scenario()) + + assert [event.type for event in sink.events["run-ask-human-multi"]] == ["run_started", "run_failed"] + + +def test_runner_rejects_deferred_approval_requests(monkeypatch: pytest.MonkeyPatch) -> None: + def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient): + assert http_client.is_closed is False + return TestModel(custom_output_text="unused") # pyright: ignore[reportReturnType] + + class FakeAgent: + async def run(self, *_args: object, **_kwargs: object) -> FakeAgentRunResult: + return FakeAgentRunResult( + DeferredToolRequests( + approvals=[ + ToolCallPart( + tool_name="ask_human", args={"question": "Need approval"}, tool_call_id="tool-call-1" + ) + ] + ), + [], + ) + + monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model) + monkeypatch.setattr("dify_agent.runtime.runner.create_agent", lambda *args, **kwargs: FakeAgent()) + request = _request("current user", include_history=True, include_ask_human=True) + sink = InMemoryRunEventSink() + + async def scenario() -> None: + async with httpx.AsyncClient() as client: + with pytest.raises(ValueError, match="does not support approval requests"): + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-ask-human-approval", + plugin_daemon_http_client=client, + ).run() + + asyncio.run(scenario()) + + assert [event.type for event in sink.events["run-ask-human-approval"]] == ["run_started", "run_failed"] + + def test_runner_passes_dynamic_dify_plugin_tools_to_agent(monkeypatch: pytest.MonkeyPatch) -> None: seen_tools: list[Tool[object]] = [] diff --git a/dify-agent/tests/local/dify_agent/test_client_safe_exports.py b/dify-agent/tests/local/dify_agent/test_client_safe_exports.py index 7bcf5515935..4c64e6209a6 100644 --- a/dify-agent/tests/local/dify_agent/test_client_safe_exports.py +++ b/dify-agent/tests/local/dify_agent/test_client_safe_exports.py @@ -74,6 +74,7 @@ def test_client_public_exports_work_with_default_dependencies_only(tmp_path: Pat shell_module = importlib.import_module("dify_agent.layers.shell") execution_context_module = importlib.import_module("dify_agent.layers.execution_context") plugin_module = importlib.import_module("dify_agent.layers.dify_plugin") + ask_human_module = importlib.import_module("dify_agent.layers.ask_human") output_module = importlib.import_module("dify_agent.layers.output") assert agenton_layers.ExitIntent is not None @@ -92,6 +93,7 @@ def test_client_public_exports_work_with_default_dependencies_only(tmp_path: Pat assert shell_module.DifyShellLayerConfig is not None assert execution_context_module.DifyExecutionContextLayerConfig is not None assert plugin_module.DifyPluginLLMLayerConfig is not None + assert ask_human_module.DifyAskHumanLayerConfig is not None assert output_module.DifyOutputLayerConfig is not None grpc_error = importlib.import_module("dify_agent.agent_stub.client._errors").AgentStubMissingGRPCDependencyError diff --git a/dify-agent/tests/local/dify_agent/test_import_boundaries.py b/dify-agent/tests/local/dify_agent/test_import_boundaries.py index 19f1c6c7316..b1e0207873f 100644 --- a/dify-agent/tests/local/dify_agent/test_import_boundaries.py +++ b/dify-agent/tests/local/dify_agent/test_import_boundaries.py @@ -80,6 +80,7 @@ def test_protocol_and_dify_plugin_exports_do_not_import_server_only_modules() -> "anthropic", "dify_agent.adapters.llm", "dify_agent.layers.execution_context.layer", + "dify_agent.layers.ask_human.layer", "dify_agent.layers.dify_plugin.llm_layer", "dify_agent.layers.dify_plugin.tools_layer", "dify_agent.layers.output.output_layer", @@ -98,6 +99,7 @@ def test_protocol_and_dify_plugin_exports_do_not_import_server_only_modules() -> imports=[ "dify_agent.protocol", "dify_agent.layers.execution_context", + "dify_agent.layers.ask_human", "dify_agent.layers.dify_plugin", "dify_agent.layers.output", "dify_agent.layers.shell", @@ -105,6 +107,7 @@ def test_protocol_and_dify_plugin_exports_do_not_import_server_only_modules() -> assertions=[ "assert hasattr(dify_agent_protocol, 'PydanticAIStreamRunEvent')", "assert dify_agent_layers_execution_context.__all__ == ['DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID', 'DifyExecutionContextAgentMode', 'DifyExecutionContextInvokeFrom', 'DifyExecutionContextLayerConfig', 'DifyExecutionContextUserFrom']", + "assert dify_agent_layers_ask_human.__all__ == ['AskHumanAction', 'AskHumanActionStyle', 'AskHumanField', 'AskHumanFieldType', 'AskHumanFileField', 'AskHumanFileListField', 'AskHumanParagraphField', 'AskHumanResultStatus', 'AskHumanSelectField', 'AskHumanSelectOption', 'AskHumanSelectedAction', 'AskHumanToolArgs', 'AskHumanToolResult', 'AskHumanUrgency', 'DEFAULT_ASK_HUMAN_TOOL_DESCRIPTION', 'DIFY_ASK_HUMAN_LAYER_TYPE_ID', 'DifyAskHumanLayerConfig']", "assert dify_agent_layers_dify_plugin.__all__ == ['DIFY_PLUGIN_LLM_LAYER_TYPE_ID', 'DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID', 'DifyPluginCredentialValue', 'DifyPluginLLMLayerConfig', 'DifyPluginToolCredentialType', 'DifyPluginToolConfig', 'DifyPluginToolOption', 'DifyPluginToolParameter', 'DifyPluginToolParameterForm', 'DifyPluginToolParameterType', 'DifyPluginToolsLayerConfig', 'DifyPluginToolValue']", "assert dify_agent_layers_output.__all__ == ['DIFY_OUTPUT_LAYER_TYPE_ID', 'DifyOutputLayerConfig']", "assert dify_agent_layers_shell.__all__ == ['DIFY_SHELL_LAYER_TYPE_ID', 'DifyShellCliToolConfig', 'DifyShellEnvVarConfig', 'DifyShellLayerConfig', 'DifyShellSandboxConfig', 'DifyShellSecretRefConfig']",