feat(api): Agent ask_human HITL (phase-1) — workflow node + Agent v2 chat — ENG-635 (#37437)

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
zyssyz123 2026-06-16 11:43:40 +08:00 committed by GitHub
parent f06278951a
commit 8d05185e39
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 2164 additions and 29 deletions

View File

@ -19,6 +19,7 @@ from agenton.compositor.schemas import LayerSessionSnapshot
from agenton.layers import ExitIntent
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
from dify_agent.layers.ask_human import DIFY_ASK_HUMAN_LAYER_TYPE_ID, DifyAskHumanLayerConfig
from dify_agent.layers.dify_plugin import (
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
@ -38,6 +39,7 @@ from dify_agent.protocol import (
DIFY_AGENT_MODEL_LAYER_ID,
DIFY_AGENT_OUTPUT_LAYER_ID,
CreateRunRequest,
DeferredToolResultsPayload,
LayerExitSignals,
RunComposition,
RunLayerSpec,
@ -53,6 +55,7 @@ AGENT_APP_USER_PROMPT_LAYER_ID = "agent_app_user_prompt"
DIFY_EXECUTION_CONTEXT_LAYER_ID = "execution_context"
DIFY_DRIVE_LAYER_ID = "drive"
DIFY_PLUGIN_TOOLS_LAYER_ID = "tools"
DIFY_ASK_HUMAN_LAYER_ID = "ask_human"
DIFY_SHELL_LAYER_ID = "shell"
@ -139,11 +142,18 @@ class AgentBackendWorkflowNodeRunInput(BaseModel):
# Drive Skills & Files declaration (dify.drive) — an index the agent pulls
# through the back proxy, never inline content; see AGENT_DRIVE_MANIFEST_ENABLED.
drive_config: DifyDriveLayerConfig | None = None
# Human-in-the-loop ask_human deferred tool (dify.ask_human). Present only when
# the Agent Soul configures human involvement; a deferred call ends the run and
# the workflow pauses via the existing HITL form mechanism (ENG-635).
ask_human_config: DifyAskHumanLayerConfig | None = None
# Inject the sandboxed shell layer (dify.shell). Requires the agent backend
# to be wired with a shellctl entrypoint; see configs AGENT_SHELL_ENABLED.
include_shell: bool = False
shell_config: DifyShellLayerConfig | None = None
session_snapshot: CompositorSessionSnapshot | None = None
# Human tool results fed back into a continuation run after a HITL submission
# (ENG-638). Keyed by the original deferred tool_call_id.
deferred_tool_results: DeferredToolResultsPayload | None = None
include_history: bool = True
suspend_on_exit: bool = True
metadata: dict[str, JsonValue] = Field(default_factory=dict)
@ -178,11 +188,17 @@ class AgentBackendAgentAppRunInput(BaseModel):
# Drive Skills & Files declaration (dify.drive) — an index the agent pulls
# through the back proxy, never inline content; see AGENT_DRIVE_MANIFEST_ENABLED.
drive_config: DifyDriveLayerConfig | None = None
# Human-in-the-loop ask_human deferred tool (dify.ask_human). Present only when
# the Agent Soul configures human involvement (ENG-635).
ask_human_config: DifyAskHumanLayerConfig | None = None
# Inject the sandboxed shell layer (dify.shell). Requires the agent backend
# to be wired with a shellctl entrypoint; see configs AGENT_SHELL_ENABLED.
include_shell: bool = False
shell_config: DifyShellLayerConfig | None = None
session_snapshot: CompositorSessionSnapshot | None = None
# Human tool results fed back into a continuation run after a HITL submission
# (ENG-638). Keyed by the original deferred tool_call_id.
deferred_tool_results: DeferredToolResultsPayload | None = None
include_history: bool = True
suspend_on_exit: bool = True
metadata: dict[str, JsonValue] = Field(default_factory=dict)
@ -284,6 +300,19 @@ class AgentBackendRunRequestBuilder:
)
)
if run_input.ask_human_config is not None:
# Human-in-the-loop ask_human deferred tool (dify.ask_human). A call ends
# the run with a deferred_tool_call; the caller pauses (workflow HITL) and
# later resumes with deferred_tool_results. Needs the history layer above.
layers.append(
RunLayerSpec(
name=DIFY_ASK_HUMAN_LAYER_ID,
type=DIFY_ASK_HUMAN_LAYER_TYPE_ID,
metadata=run_input.metadata,
config=run_input.ask_human_config,
)
)
if run_input.include_shell:
# Sandboxed bash workspace (dify.shell). Depends on execution_context so
# the agent server can mint per-command Agent Stub env (back proxy);
@ -318,6 +347,7 @@ class AgentBackendRunRequestBuilder:
idempotency_key=run_input.idempotency_key,
metadata=run_input.metadata,
session_snapshot=run_input.session_snapshot,
deferred_tool_results=run_input.deferred_tool_results,
on_exit=LayerExitSignals(
default=ExitIntent.SUSPEND if run_input.suspend_on_exit else ExitIntent.DELETE,
),
@ -453,6 +483,19 @@ class AgentBackendRunRequestBuilder:
)
)
if run_input.ask_human_config is not None:
# Human-in-the-loop ask_human deferred tool (dify.ask_human). A call ends
# the run with a deferred_tool_call; the caller pauses (workflow HITL) and
# later resumes with deferred_tool_results. Needs the history layer above.
layers.append(
RunLayerSpec(
name=DIFY_ASK_HUMAN_LAYER_ID,
type=DIFY_ASK_HUMAN_LAYER_TYPE_ID,
metadata=run_input.metadata,
config=run_input.ask_human_config,
)
)
if run_input.include_shell:
# Sandboxed bash workspace (dify.shell). Depends on execution_context so
# the agent server can mint per-command Agent Stub env (back proxy);
@ -487,6 +530,7 @@ class AgentBackendRunRequestBuilder:
idempotency_key=run_input.idempotency_key,
metadata=run_input.metadata,
session_snapshot=run_input.session_snapshot,
deferred_tool_results=run_input.deferred_tool_results,
on_exit=LayerExitSignals(
default=ExitIntent.SUSPEND if run_input.suspend_on_exit else ExitIntent.DELETE,
),

View File

@ -158,6 +158,108 @@ class AgentAppGenerator(MessageBasedAppGenerator):
)
return AgentAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
def resume_after_form_submission(
self,
*,
app_model: App,
user: Account | EndUser,
conversation_id: str,
invoke_from: InvokeFrom,
) -> None:
"""Resume an Agent App conversation after a submitted ask_human HITL form.
ENG-635: triggered by a background task (not an HTTP request). Runs one
blocking turn with no user query; the runner threads the human's reply
into the agent run as deferred_tool_results and the assistant answer is
persisted to the conversation. Live streaming to a reconnected client is
out of scope here the message is persisted and can be re-fetched.
"""
agent, snapshot, agent_soul = self._resolve_agent(app_model)
conversation = ConversationService.get_conversation(
app_model=app_model, conversation_id=conversation_id, user=user
)
app_config = AgentAppConfigManager.get_app_config(
app_model=app_model,
agent_soul=agent_soul,
app_model_config=app_model.app_model_config,
conversation=conversation,
)
model_conf = ModelConfigConverter.convert(app_config)
trace_manager = TraceQueueManager(app_model.id, user.id if isinstance(user, Account) else user.session_id)
# ENG-638: the agent backend requires the resume composition's layer
# names to match the suspended snapshot, which includes the per-turn
# user-prompt layer. So re-send the original user message (the paused
# turn's query); the continuation is driven by deferred_tool_results and
# the restored snapshot, not by re-processing this prompt. A blank prompt
# would drop the user-prompt layer and fail the snapshot match.
paused_message = db.session.scalar(
select(Message)
.where(Message.conversation_id == conversation.id, Message.query != "")
.order_by(Message.created_at.desc())
.limit(1)
)
resume_query = paused_message.query if paused_message and paused_message.query else "(resumed)"
application_generate_entity = AgentAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
model_conf=model_conf,
conversation_id=conversation.id,
# A resume carries no new user inputs; the human's answer is the
# submitted form, threaded in by the runner as deferred_tool_results.
# The query re-sends the paused turn's message (see above).
inputs={},
query=resume_query,
files=[],
parent_message_id=UUID_NIL,
user_id=user.id,
stream=False,
invoke_from=invoke_from,
extras={"auto_generate_conversation_name": False},
call_depth=0,
trace_manager=trace_manager,
agent_id=agent.id,
agent_config_snapshot_id=snapshot.id,
)
conversation, message = self._init_generate_records(application_generate_entity, conversation)
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
context = contextvars.copy_context()
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"context": context,
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"conversation_id": conversation.id,
"message_id": message.id,
"user_from": UserFrom.ACCOUNT if isinstance(user, Account) else UserFrom.END_USER,
},
)
worker_thread.start()
# Blocking: drive the chat task pipeline to persist the assistant answer.
self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=False,
)
def _generate_worker(
self,
*,

View File

@ -14,9 +14,12 @@ import json
import logging
from typing import Any
from dify_agent.layers.ask_human import AskHumanToolArgs
from dify_agent.protocol import DeferredToolResultsPayload
from pydantic import JsonValue
from clients.agent_backend import (
AgentBackendDeferredToolCallInternalEvent,
AgentBackendError,
AgentBackendInternalEventType,
AgentBackendRunClient,
@ -27,13 +30,21 @@ from clients.agent_backend import (
)
from core.app.apps.agent_app.runtime_request_builder import (
AgentAppRuntimeBuildContext,
AgentAppRuntimeRequest,
AgentAppRuntimeRequestBuilder,
)
from core.app.apps.agent_app.session_store import AgentAppRuntimeSessionStore, AgentAppSessionScope
from core.app.apps.agent_app.session_store import (
AgentAppRuntimeSessionStore,
AgentAppSessionScope,
StoredAgentAppSession,
)
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.exc import GenerateTaskStoppedError
from core.app.entities.app_invoke_entities import DifyRunContext
from core.app.entities.queue_entities import QueueLLMChunkEvent, QueueMessageEndEvent
from core.repositories.human_input_repository import HumanInputFormRepository, HumanInputFormRepositoryImpl
from core.workflow.nodes.agent_v2.ask_human_hitl import AskHumanFormBuildError, create_ask_human_form
from core.workflow.nodes.agent_v2.ask_human_resume import build_deferred_tool_results, resolve_ask_human_form
from graphon.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage
from graphon.model_runtime.entities.message_entities import AssistantPromptMessage
from models.agent_config_entities import AgentSoulConfig
@ -104,7 +115,13 @@ class AgentAppRunner:
agent_id=agent_id,
agent_config_snapshot_id=agent_config_snapshot_id,
)
session_snapshot = self._session_store.load_active_snapshot(scope)
# ENG-638: if a prior turn paused on ask_human and the form is now answered,
# resume by threading the human's reply into this run as deferred_tool_results.
stored = self._session_store.load_active_session(scope)
session_snapshot = stored.session_snapshot if stored is not None else None
deferred_tool_results = self._resolve_pending_ask_human(
stored=stored, dify_context=dify_context, message_id=message_id
)
runtime = self._request_builder.build(
AgentAppRuntimeBuildContext(
@ -116,12 +133,29 @@ class AgentAppRunner:
user_query=query,
idempotency_key=message_id,
session_snapshot=session_snapshot,
deferred_tool_results=deferred_tool_results,
)
)
create_response = self._agent_backend_client.create_run(runtime.request)
terminal = self._consume_stream(create_response.run_id, queue_manager=queue_manager)
if isinstance(terminal, AgentBackendDeferredToolCallInternalEvent):
# ENG-635: the agent asked a human. End this turn with the question and
# a conversation-owned HITL form; a form submission resumes the run.
self._pause_for_ask_human(
terminal=terminal,
scope=scope,
dify_context=dify_context,
agent_soul=agent_soul,
conversation_id=conversation_id,
message_id=message_id,
model_name=model_name,
runtime=runtime,
queue_manager=queue_manager,
)
return
if not isinstance(terminal, AgentBackendRunSucceededInternalEvent):
error = getattr(terminal, "error", None) or "Agent backend run did not complete successfully."
raise AgentBackendError(str(error))
@ -135,6 +169,93 @@ class AgentAppRunner:
runtime_layer_specs=extract_runtime_layer_specs(runtime.request.composition),
)
def _pause_for_ask_human(
self,
*,
terminal: AgentBackendDeferredToolCallInternalEvent,
scope: AgentAppSessionScope,
dify_context: DifyRunContext,
agent_soul: AgentSoulConfig,
conversation_id: str,
message_id: str,
model_name: str,
runtime: AgentAppRuntimeRequest,
queue_manager: AppQueueManager,
) -> None:
"""End the chat turn on a dify.ask_human call: create a conversation-owned
HITL form, persist the pause correlation, and surface the question."""
try:
created = create_ask_human_form(
deferred_tool_call=terminal.deferred_tool_call,
# Chat forms have no workflow node; key by the turn's message id.
node_id=message_id,
default_node_title="Agent",
contacts=agent_soul.human.contacts,
repository=self._build_form_repository(dify_context),
conversation_id=conversation_id,
)
except AskHumanFormBuildError as error:
raise AgentBackendError(f"Failed to build ask_human form for Agent App chat: {error}") from error
# Persist the snapshot + correlation so a form submission can start the
# second run with the human's answer (ENG-637/638 columns, conversation owner).
self._save_session(
scope=scope,
backend_run_id=terminal.run_id,
snapshot=terminal.session_snapshot,
runtime_layer_specs=extract_runtime_layer_specs(runtime.request.composition),
pending_form_id=created.form_id,
pending_tool_call_id=terminal.deferred_tool_call.tool_call_id,
)
# The structured form is delivered via the HITL surface(s); the chat turn
# ends by echoing the agent's question so the conversation reflects the ask.
self._publish_answer(
queue_manager=queue_manager,
model_name=model_name,
answer=self._ask_human_message(created.args),
)
def _resolve_pending_ask_human(
self,
*,
stored: StoredAgentAppSession | None,
dify_context: DifyRunContext,
message_id: str,
) -> DeferredToolResultsPayload | None:
"""Build deferred_tool_results when a pending ask_human form is answered."""
if stored is None or stored.pending_form_id is None or stored.pending_tool_call_id is None:
return None
outcome = resolve_ask_human_form(
form_id=stored.pending_form_id,
tenant_id=dify_context.tenant_id,
node_id=message_id,
)
if outcome is None or outcome.deferred_result is None:
# Form missing or still waiting — run a normal turn, no resume.
return None
return build_deferred_tool_results(
tool_call_id=stored.pending_tool_call_id,
result=outcome.deferred_result,
)
def _build_form_repository(self, dify_context: DifyRunContext) -> HumanInputFormRepository:
invoke_source = dify_context.invoke_from.value
return HumanInputFormRepositoryImpl(
tenant_id=dify_context.tenant_id,
app_id=dify_context.app_id,
workflow_execution_id=None,
invoke_source=invoke_source,
submission_actor_id=dify_context.user_id if invoke_source in {"debugger", "explore"} else None,
)
@staticmethod
def _ask_human_message(args: AskHumanToolArgs) -> str:
parts = [args.question]
if args.markdown:
parts.append(args.markdown)
return "\n\n".join(parts)
def _consume_stream(self, run_id: str, *, queue_manager: AppQueueManager):
terminal = None
for public_event in self._agent_backend_client.stream_events(run_id):
@ -178,6 +299,8 @@ class AgentAppRunner:
backend_run_id: str,
snapshot: Any,
runtime_layer_specs: Any,
pending_form_id: str | None = None,
pending_tool_call_id: str | None = None,
) -> None:
try:
self._session_store.save_active_snapshot(
@ -185,6 +308,8 @@ class AgentAppRunner:
backend_run_id=backend_run_id,
snapshot=snapshot,
runtime_layer_specs=runtime_layer_specs,
pending_form_id=pending_form_id,
pending_tool_call_id=pending_tool_call_id,
)
except Exception:
logger.warning(

View File

@ -18,7 +18,7 @@ from dify_agent.layers.execution_context import (
DifyExecutionContextLayerConfig,
DifyExecutionContextUserFrom,
)
from dify_agent.protocol import CreateRunRequest
from dify_agent.protocol import CreateRunRequest, DeferredToolResultsPayload
from clients.agent_backend import (
AgentBackendAgentAppRunInput,
@ -34,6 +34,7 @@ from core.workflow.nodes.agent_v2.plugin_tools_builder import (
)
from core.workflow.nodes.agent_v2.runtime_request_builder import (
append_runtime_warnings,
build_ask_human_layer_config,
build_drive_layer_config,
build_shell_layer_config,
)
@ -64,6 +65,8 @@ class AgentAppRuntimeBuildContext:
user_query: str
idempotency_key: str
session_snapshot: CompositorSessionSnapshot | None = None
# ENG-638: set when resuming a chat turn after a submitted ask_human form.
deferred_tool_results: DeferredToolResultsPayload | None = None
@dataclass(frozen=True, slots=True)
@ -154,9 +157,11 @@ class AgentAppRuntimeRequestBuilder:
user_prompt=context.user_query,
tools=tools_layer,
drive_config=drive_config,
ask_human_config=build_ask_human_layer_config(agent_soul),
include_shell=dify_config.AGENT_SHELL_ENABLED,
shell_config=build_shell_layer_config(agent_soul),
session_snapshot=context.session_snapshot,
deferred_tool_results=context.deferred_tool_results,
idempotency_key=context.idempotency_key,
metadata=metadata,
)

View File

@ -56,6 +56,10 @@ class StoredAgentAppSession:
session_snapshot: CompositorSessionSnapshot
backend_run_id: str | None
runtime_layer_specs: list[RuntimeLayerSpec] = field(default_factory=list)
# ENG-635: set while the conversation turn is paused on a dify.ask_human
# deferred call, awaiting a HITL form submission.
pending_form_id: str | None = None
pending_tool_call_id: str | None = None
class AgentAppRuntimeSessionStore:
@ -75,6 +79,8 @@ class AgentAppRuntimeSessionStore:
session_snapshot=CompositorSessionSnapshot.model_validate_json(row.session_snapshot),
backend_run_id=row.backend_run_id,
runtime_layer_specs=_deserialize_runtime_layer_specs(row.composition_layer_specs),
pending_form_id=row.pending_form_id,
pending_tool_call_id=row.pending_tool_call_id,
)
def load_active_session_for_conversation(
@ -125,6 +131,8 @@ class AgentAppRuntimeSessionStore:
backend_run_id: str,
snapshot: CompositorSessionSnapshot | None,
runtime_layer_specs: list[RuntimeLayerSpec],
pending_form_id: str | None = None,
pending_tool_call_id: str | None = None,
) -> None:
if snapshot is None:
return
@ -144,6 +152,8 @@ class AgentAppRuntimeSessionStore:
session_snapshot=snapshot_json,
composition_layer_specs=runtime_layer_specs_json,
status=AgentRuntimeSessionStatus.ACTIVE,
pending_form_id=pending_form_id,
pending_tool_call_id=pending_tool_call_id,
)
session.add(row)
else:
@ -152,6 +162,9 @@ class AgentAppRuntimeSessionStore:
row.composition_layer_specs = runtime_layer_specs_json
row.status = AgentRuntimeSessionStatus.ACTIVE
row.cleaned_at = None
# Set (or clear, when omitted) the ask_human pause correlation.
row.pending_form_id = pending_form_id
row.pending_tool_call_id = pending_tool_call_id
session.flush()
other_rows = session.scalars(
select(AgentRuntimeSession).where(

View File

@ -63,6 +63,10 @@ class FormCreateParams:
display_in_ui: bool
resolved_default_values: Mapping[str, Any]
form_kind: HumanInputFormKind = HumanInputFormKind.RUNTIME
# ENG-635: the conversation this form belongs to. Set together with
# workflow_execution_id for chatflow runs; set alone (workflow_execution_id None)
# for Agent v2 chat ask_human forms, which have no workflow run.
conversation_id: str | None = None
class HumanInputFormRecipientEntity(Protocol):
@ -217,6 +221,9 @@ class HumanInputFormRecord:
recipient_id: str | None
recipient_type: RecipientType | None
access_token: str | None
# ENG-635: Agent v2 chat owner (NULL for workflow-owned forms). Trailing +
# defaulted so existing record constructions stay source-compatible.
conversation_id: str | None = None
@property
def submitted(self) -> bool:
@ -232,6 +239,7 @@ class HumanInputFormRecord:
return cls(
form_id=form_model.id,
workflow_run_id=form_model.workflow_run_id,
conversation_id=form_model.conversation_id,
node_id=form_model.node_id,
tenant_id=form_model.tenant_id,
app_id=form_model.app_id,
@ -433,8 +441,15 @@ class HumanInputFormRepositoryImpl:
if not app_id:
raise ValueError("app_id is required to create a human input form")
workflow_execution_id = params.workflow_execution_id or self._workflow_execution_id
if params.form_kind == HumanInputFormKind.RUNTIME and workflow_execution_id is None:
raise ValueError("workflow_execution_id is required for runtime human input forms")
# A RUNTIME form must be owned by at least one of: a workflow run (workflow /
# Human-Input / agent node) or a conversation turn (ENG-635: Agent v2 chat
# ask_human; chatflow runs set both — workflow_run_id and conversation_id).
if (
params.form_kind == HumanInputFormKind.RUNTIME
and workflow_execution_id is None
and params.conversation_id is None
):
raise ValueError("a runtime human input form requires a workflow_execution_id or conversation_id")
with session_factory.create_session() as session, session.begin():
# Generate unique form ID
@ -456,6 +471,7 @@ class HumanInputFormRepositoryImpl:
tenant_id=self._tenant_id,
app_id=app_id,
workflow_run_id=workflow_execution_id,
conversation_id=params.conversation_id,
form_kind=params.form_kind,
node_id=params.node_id,
form_definition=form_definition.model_dump_json(),

View File

@ -334,6 +334,7 @@ class DifyNodeFactory(NodeFactory):
self.graph_runtime_state.variable_pool,
SystemVariableKey.WORKFLOW_EXECUTION_ID,
),
conversation_id_getter=self._conversation_id,
)
self._tool_runtime = DifyToolNodeRuntime(self._dify_context)
self._http_request_file_manager = file_manager

View File

@ -710,10 +710,12 @@ class DifyHumanInputNodeRuntime(HumanInputNodeRuntimeProtocol):
run_context: Mapping[str, Any] | DifyRunContext,
*,
workflow_execution_id_getter: Callable[[], str | None] | None = None,
conversation_id_getter: Callable[[], str | None] | None = None,
form_repository: HumanInputFormRepository | None = None,
) -> None:
self._run_context = resolve_dify_run_context(run_context)
self._workflow_execution_id_getter = workflow_execution_id_getter
self._conversation_id_getter = conversation_id_getter
self._form_repository = form_repository
self._file_reference_factory = DifyFileReferenceFactory(self._run_context)
@ -762,6 +764,7 @@ class DifyHumanInputNodeRuntime(HumanInputNodeRuntimeProtocol):
return DifyHumanInputNodeRuntime(
self._run_context,
workflow_execution_id_getter=self._workflow_execution_id_getter,
conversation_id_getter=self._conversation_id_getter,
form_repository=form_repository,
)
@ -799,6 +802,9 @@ class DifyHumanInputNodeRuntime(HumanInputNodeRuntimeProtocol):
repo = self.build_form_repository()
params = FormCreateParams(
workflow_execution_id=self._workflow_execution_id_getter() if self._workflow_execution_id_getter else None,
# A chatflow (advanced-chat) run carries a conversation; tag the form with
# it too so it is queryable per conversation. None for a pure workflow run.
conversation_id=self._conversation_id_getter() if self._conversation_id_getter else None,
node_id=node_id,
form_config=node_data,
rendered_content=rendered_content,

View File

@ -24,13 +24,16 @@ from clients.agent_backend import (
extract_runtime_layer_specs,
)
from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, DifyRunContext
from core.repositories.human_input_repository import HumanInputFormRepository, HumanInputFormRepositoryImpl
from core.workflow.system_variables import SystemVariableKey, get_system_text
from graphon.entities.pause_reason import SchedulingPause
from graphon.entities.pause_reason import HumanInputRequired, SchedulingPause
from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from graphon.node_events import NodeEventBase, NodeRunResult, PauseRequestedEvent, StreamCompletedEvent
from graphon.nodes.base.node import Node
from models.agent_config_entities import WorkflowNodeJobConfig
from models.agent_config_entities import AgentSoulConfig, WorkflowNodeJobConfig
from .ask_human_hitl import AskHumanFormBuildError, build_ask_human_pause_reason
from .ask_human_resume import build_deferred_tool_results, resolve_ask_human_form
from .binding_resolver import WorkflowAgentBindingError, WorkflowAgentBindingResolver
from .entities import DifyAgentNodeData
from .output_adapter import WorkflowAgentOutputAdapter
@ -117,6 +120,12 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
self.graph_runtime_state.variable_pool,
SystemVariableKey.WORKFLOW_EXECUTION_ID,
)
# Set on chatflow (advanced-chat) runs; None for a pure workflow run. Lets an
# ask_human form be tagged with its conversation in addition to workflow_run_id.
conversation_id = get_system_text(
self.graph_runtime_state.variable_pool,
SystemVariableKey.CONVERSATION_ID,
)
inputs: dict[str, Any] = {}
process_data: dict[str, Any] = {}
metadata: dict[str, Any] = {
@ -168,6 +177,33 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
)
outputs_by_name = {o.name: o for o in effective_outputs}
# ──── ENG-638: resume after a submitted/timed-out ask_human form ────
# graphon re-executes this _run when the outer workflow resumes. If a
# pending ask_human form is now terminal, thread the human's answer into
# the second Agent run as deferred_tool_results; if it is somehow still
# waiting, re-emit the same pause defensively.
deferred_tool_results = None
if self._session_store is not None:
stored_session = self._session_store.load_active_session(session_scope)
if stored_session is not None and stored_session.pending_form_id is not None:
resume_outcome = resolve_ask_human_form(
form_id=stored_session.pending_form_id,
tenant_id=dify_ctx.tenant_id,
node_id=self._node_id,
)
if resume_outcome is not None and resume_outcome.repause is not None:
yield PauseRequestedEvent(reason=resume_outcome.repause)
return
if (
resume_outcome is not None
and resume_outcome.deferred_result is not None
and stored_session.pending_tool_call_id is not None
):
deferred_tool_results = build_deferred_tool_results(
tool_call_id=stored_session.pending_tool_call_id,
result=resume_outcome.deferred_result,
)
# ──── Retry loop (Stage 4 §7) ────
attempt = 0
while True:
@ -188,6 +224,7 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
snapshot=bundle.snapshot,
attempt=attempt,
session_snapshot=session_snapshot,
deferred_tool_results=deferred_tool_results,
)
)
except WorkflowAgentRuntimeRequestBuildError as error:
@ -251,19 +288,56 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
return
if isinstance(terminal_event, AgentBackendDeferredToolCallInternalEvent):
# ENG-636: a dify.ask_human deferred call pauses the *outer*
# workflow through the existing HITL form path. Any other deferred
# tool (none today) falls back to a generic scheduling pause. The
# form is built *before* the snapshot is saved so its id can be
# persisted as the pause correlation (ENG-637).
try:
pause_reason: HumanInputRequired | SchedulingPause | None = build_ask_human_pause_reason(
deferred_tool_call=terminal_event.deferred_tool_call,
node_id=self._node_id,
default_node_title=bundle.agent.name or self._node_id,
workflow_run_id=workflow_run_id,
conversation_id=conversation_id,
contacts=AgentSoulConfig.model_validate(bundle.snapshot.config_snapshot_dict).human.contacts,
repository=self._build_human_input_form_repository(
dify_ctx=dify_ctx, workflow_run_id=workflow_run_id
),
)
except AskHumanFormBuildError as error:
yield self._failure_event(
inputs=inputs,
process_data=process_data,
metadata=metadata,
error=str(error),
error_type="agent_ask_human_form_build_error",
)
return
# ENG-637: persist the awaiting-form id + deferred tool_call id
# next to the snapshot so the resumed node can rebuild
# deferred_tool_results from the submitted form.
pending_form_id: str | None = None
pending_tool_call_id: str | None = None
if isinstance(pause_reason, HumanInputRequired):
pending_form_id = pause_reason.form_id
pending_tool_call_id = terminal_event.deferred_tool_call.tool_call_id
else:
pause_reason = SchedulingPause(
message=terminal_event.message
or "Agent backend run requested workflow pause for external input."
)
self._save_session_snapshot(
session_scope=session_scope,
backend_run_id=terminal_event.run_id,
snapshot=terminal_event.session_snapshot,
runtime_layer_specs=extract_runtime_layer_specs(runtime_request.request.composition),
metadata=metadata,
pending_form_id=pending_form_id,
pending_tool_call_id=pending_tool_call_id,
)
yield PauseRequestedEvent(
reason=SchedulingPause(
message=terminal_event.message
or "Agent backend run requested workflow pause for external input."
)
)
yield PauseRequestedEvent(reason=pause_reason)
return
# Non-success terminal (failed / cancelled) skips per-output
@ -448,6 +522,27 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
],
}
def _build_human_input_form_repository(
self,
*,
dify_ctx: DifyRunContext,
workflow_run_id: str | None,
) -> HumanInputFormRepository:
"""Construct the existing HITL form repository for ask_human form creation.
Mirrors the Human Input node's repository wiring (``node_runtime``) so the
ask_human form shares the same delivery/debug/console behavior: a
submission actor is only attributed for debugger/explore surfaces.
"""
invoke_source = dify_ctx.invoke_from.value
return HumanInputFormRepositoryImpl(
tenant_id=dify_ctx.tenant_id,
app_id=dify_ctx.app_id,
workflow_execution_id=workflow_run_id,
invoke_source=invoke_source,
submission_actor_id=dify_ctx.user_id if invoke_source in {"debugger", "explore"} else None,
)
def _save_session_snapshot(
self,
*,
@ -456,6 +551,8 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
snapshot: CompositorSessionSnapshot | None,
runtime_layer_specs: list[RuntimeLayerSpec],
metadata: dict[str, Any],
pending_form_id: str | None = None,
pending_tool_call_id: str | None = None,
) -> None:
if self._session_store is None:
return
@ -465,6 +562,8 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
backend_run_id=backend_run_id,
snapshot=snapshot,
runtime_layer_specs=runtime_layer_specs,
pending_form_id=pending_form_id,
pending_tool_call_id=pending_tool_call_id,
)
agent_backend = dict(metadata.get("agent_backend") or {})
agent_backend["session_snapshot_persisted"] = snapshot is not None

View File

@ -0,0 +1,354 @@
"""Map a ``dify.ask_human`` deferred tool call onto the existing HITL form path.
ENG-636. When an Agent backend run ends with a deferred ``dify.ask_human`` tool
call, the workflow Agent node pauses the *outer* workflow through the very same
``HumanInputRequired`` form mechanism the Human Input node uses reusing the
form repository, delivery channels, and submission endpoints unchanged. This
module is a pure translation layer: model-facing ask_human tool args become
graphon form entities (``HumanInputNodeData`` / ``FormInputConfig`` /
``UserActionConfig``) plus Dify delivery configs. It adds no new HITL behavior.
The agent-side ``dify.ask_human`` contract is richer than the workflow form
schema in two places, handled here without widening the form vocabulary:
* ask_human fields carry a human label; graphon ``FormInputConfig`` does not.
Labels are rendered into ``form_content`` next to a ``{{#$output.<name>#}}``
marker exactly how the Human Input node positions labelled inputs today.
* ask_human action ids are valid identifiers of any length; graphon
``UserActionConfig.id`` caps ids at 20 chars. Over-long ids are clamped
deterministically (stable + collision-resistant) so the form always builds;
the human-visible action *label* is always preserved verbatim.
"""
from __future__ import annotations
import hashlib
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from typing import Any, assert_never
from dify_agent.layers.ask_human import (
AskHumanAction,
AskHumanField,
AskHumanFileField,
AskHumanFileListField,
AskHumanParagraphField,
AskHumanSelectField,
AskHumanToolArgs,
)
from dify_agent.protocol import DeferredToolCallPayload
from pydantic import ValidationError
from core.repositories.human_input_repository import FormCreateParams, HumanInputFormRepository
from core.workflow.human_input_adapter import (
DeliveryChannelConfig,
EmailDeliveryConfig,
EmailDeliveryMethod,
EmailRecipients,
ExternalRecipient,
InteractiveSurfaceDeliveryMethod,
)
from graphon.entities.pause_reason import HumanInputRequired
from graphon.nodes.human_input.entities import (
FileInputConfig,
FileListInputConfig,
FormInputConfig,
HumanInputNodeData,
ParagraphInputConfig,
SelectInputConfig,
StringListSource,
StringSource,
UserActionConfig,
)
from graphon.nodes.human_input.enums import ButtonStyle, TimeoutUnit, ValueSourceType
from models.agent_config_entities import AgentHumanContactConfig
# Default ask_human tool name (see ``DifyAskHumanLayerConfig.tool_name``). The
# Agent node only knows how to translate this one deferred tool into a form.
DEFAULT_ASK_HUMAN_TOOL_NAME = "ask_human"
# Graphon ``UserActionConfig.id`` hard-caps identifiers at 20 chars.
_MAX_ACTION_ID_LEN = 20
# No timeout lives in the ask_human contract; reuse the Human Input node default.
_DEFAULT_TIMEOUT_HOURS = 36
_EMAIL_SUBJECT_MAX_LEN = 120
# ``ButtonStyle`` has no "destructive"; map the ask_human destructive intent onto
# the closest existing form style rather than extending the HITL vocabulary.
_ACTION_STYLE_TO_BUTTON: dict[str, ButtonStyle] = {
"default": ButtonStyle.DEFAULT,
"primary": ButtonStyle.PRIMARY,
"destructive": ButtonStyle.ACCENT,
}
# ask_human permits zero actions (a plain acknowledgement); the form surface
# needs at least one submit affordance.
_DEFAULT_SUBMIT_ACTION = UserActionConfig(id="submit", title="Submit", button_style=ButtonStyle.PRIMARY)
class AskHumanFormBuildError(ValueError):
"""Raised when ask_human tool args cannot be mapped to a HITL form."""
def parse_ask_human_args(raw_args: object) -> AskHumanToolArgs:
"""Validate the raw deferred-tool ``args`` payload into ``AskHumanToolArgs``."""
if isinstance(raw_args, AskHumanToolArgs):
return raw_args
if isinstance(raw_args, Mapping):
try:
return AskHumanToolArgs.model_validate(dict(raw_args))
except ValidationError as error:
raise AskHumanFormBuildError(f"invalid ask_human args: {error}") from error
raise AskHumanFormBuildError(f"ask_human args must be a mapping, got {type(raw_args).__name__}")
def _clamp_action_id(action_id: str) -> str:
"""Return a stable graphon-safe (<= 20 char) action id.
ask_human ids are already valid identifiers; ids within the limit pass
through untouched so the resume round-trip returns the model's exact id.
Over-long ids are truncated with a short content hash to stay deterministic
and collision-resistant while remaining a valid identifier.
"""
if len(action_id) <= _MAX_ACTION_ID_LEN:
return action_id
digest = hashlib.blake2b(action_id.encode("utf-8"), digest_size=3).hexdigest()
prefix = action_id[: _MAX_ACTION_ID_LEN - len(digest) - 1]
return f"{prefix}_{digest}"
def _to_form_input(field: AskHumanField) -> FormInputConfig:
match field:
case AskHumanParagraphField():
default = (
StringSource(type=ValueSourceType.CONSTANT, value=field.default) if field.default is not None else None
)
return ParagraphInputConfig(output_variable_name=field.name, default=default)
case AskHumanSelectField():
return SelectInputConfig(
output_variable_name=field.name,
option_source=StringListSource(
type=ValueSourceType.CONSTANT,
value=[option.value for option in field.options],
),
)
case AskHumanFileField():
return FileInputConfig(output_variable_name=field.name)
case AskHumanFileListField():
return FileListInputConfig(output_variable_name=field.name, number_limits=field.max_files or 0)
case _: # pragma: no cover - exhaustive over the discriminated union
assert_never(field)
def _to_user_actions(actions: Sequence[AskHumanAction]) -> list[UserActionConfig]:
if not actions:
return [_DEFAULT_SUBMIT_ACTION]
return [
UserActionConfig(
id=_clamp_action_id(action.id),
title=action.label,
button_style=_ACTION_STYLE_TO_BUTTON.get(action.style, ButtonStyle.DEFAULT),
)
for action in actions
]
def _render_form_content(args: AskHumanToolArgs) -> str:
"""Compose the markdown body, positioning each field's label + input marker.
Graphon ``FormInputConfig`` carries no label, so the field label is written
into the content next to the ``{{#$output.<name>#}}`` marker that the form
surface replaces with the live input identical to the Human Input node.
"""
blocks: list[str] = []
if args.title:
blocks.append(f"## {args.title}")
blocks.append(args.question)
if args.markdown:
blocks.append(args.markdown)
for field in args.fields:
label = f"{field.label} *" if field.required else field.label
blocks.append(f"**{label}**\n\n{{{{#$output.{field.name}#}}}}")
return "\n\n".join(blocks)
def _resolved_default_values(args: AskHumanToolArgs) -> dict[str, Any]:
"""Pre-fill map the form surface reads, keyed by output variable name.
The graphon select input has no default field, so a select default can only
be conveyed here; paragraph defaults are included for a uniform pre-fill.
"""
defaults: dict[str, Any] = {}
for field in args.fields:
if isinstance(field, AskHumanParagraphField | AskHumanSelectField) and field.default is not None:
defaults[field.name] = field.default
return defaults
def ask_human_args_to_node_data(args: AskHumanToolArgs, *, node_title: str) -> HumanInputNodeData:
"""Translate validated ask_human args into a synthetic Human Input node config."""
return HumanInputNodeData(
title=node_title,
form_content=_render_form_content(args),
inputs=[_to_form_input(field) for field in args.fields],
user_actions=_to_user_actions(args.actions),
timeout=_DEFAULT_TIMEOUT_HOURS,
timeout_unit=TimeoutUnit.HOUR,
)
def build_delivery_methods(
contacts: Sequence[AgentHumanContactConfig],
*,
args: AskHumanToolArgs,
) -> list[DeliveryChannelConfig]:
"""Build form delivery channels: always the interactive surface, plus email to
the configured human contacts (the recipients chosen in Agent Soul)."""
methods: list[DeliveryChannelConfig] = [InteractiveSurfaceDeliveryMethod()]
seen: set[str] = set()
emails: list[str] = []
for contact in contacts:
email = (contact.email or "").strip()
if email and email not in seen:
seen.add(email)
emails.append(email)
if emails:
subject = (args.title or args.question).strip()[:_EMAIL_SUBJECT_MAX_LEN]
if args.urgency == "high":
subject = f"[Action needed] {subject}"
body = f"{args.question}\n\nOpen the request: {EmailDeliveryConfig.URL_PLACEHOLDER}"
methods.append(
EmailDeliveryMethod(
config=EmailDeliveryConfig(
recipients=EmailRecipients(items=[ExternalRecipient(email=email) for email in emails]),
subject=subject,
body=body,
)
)
)
return methods
@dataclass(frozen=True, slots=True)
class AskHumanFormCreated:
"""A created ask_human HITL form, owner-agnostic (workflow run or conversation)."""
form_id: str
args: AskHumanToolArgs
node_data: HumanInputNodeData
node_title: str
resolved_default_values: dict[str, Any]
def create_ask_human_form(
*,
deferred_tool_call: DeferredToolCallPayload,
node_id: str,
default_node_title: str,
contacts: Sequence[AgentHumanContactConfig],
repository: HumanInputFormRepository,
workflow_run_id: str | None = None,
conversation_id: str | None = None,
display_in_ui: bool = True,
) -> AskHumanFormCreated:
"""Create a HITL form from an ask_human deferred call (caller verified tool_name).
The form is owned by exactly one of ``workflow_run_id`` (workflow Agent node)
or ``conversation_id`` (Agent v2 chat). Raises ``AskHumanFormBuildError`` on
invalid args, a missing owner, or a repository failure.
"""
if not workflow_run_id and not conversation_id:
raise AskHumanFormBuildError("an ask_human HITL form requires a workflow_run_id or conversation_id")
args = parse_ask_human_args(deferred_tool_call.args)
node_title = args.title or default_node_title
node_data = ask_human_args_to_node_data(args, node_title=node_title)
resolved_default_values = _resolved_default_values(args)
try:
form = repository.create_form(
FormCreateParams(
workflow_execution_id=workflow_run_id,
conversation_id=conversation_id,
node_id=node_id,
form_config=node_data,
# No workflow-variable placeholders to resolve — the content is
# fully model-authored, so rendered == template.
rendered_content=node_data.form_content,
delivery_methods=build_delivery_methods(contacts, args=args),
display_in_ui=display_in_ui,
resolved_default_values=resolved_default_values,
)
)
except ValueError as error:
raise AskHumanFormBuildError(f"failed to create ask_human HITL form: {error}") from error
return AskHumanFormCreated(
form_id=form.id,
args=args,
node_data=node_data,
node_title=node_title,
resolved_default_values=resolved_default_values,
)
def build_ask_human_pause_reason(
*,
deferred_tool_call: DeferredToolCallPayload,
node_id: str,
default_node_title: str,
workflow_run_id: str | None,
contacts: Sequence[AgentHumanContactConfig],
repository: HumanInputFormRepository,
conversation_id: str | None = None,
expected_tool_name: str = DEFAULT_ASK_HUMAN_TOOL_NAME,
display_in_ui: bool = True,
) -> HumanInputRequired | None:
"""Create a workflow HITL form for an ask_human call and return its pause reason.
Returns ``None`` when the deferred call is *not* the ask_human tool, letting
the caller fall back to a generic scheduling pause. Raises
``AskHumanFormBuildError`` when the call is ask_human but its args or the form
cannot be built the caller should surface that as a node failure rather
than a silent, unresumable pause.
"""
if deferred_tool_call.tool_name != expected_tool_name:
return None
if not workflow_run_id:
raise AskHumanFormBuildError("workflow_run_id is required to create an ask_human HITL form")
created = create_ask_human_form(
deferred_tool_call=deferred_tool_call,
node_id=node_id,
default_node_title=default_node_title,
contacts=contacts,
repository=repository,
workflow_run_id=workflow_run_id,
# A chatflow agent node also belongs to a conversation; tag the form so it is
# queryable per conversation. None for a pure workflow run (workflow_run_id only).
conversation_id=conversation_id,
display_in_ui=display_in_ui,
)
return HumanInputRequired(
form_id=created.form_id,
form_content=created.node_data.form_content,
inputs=list(created.node_data.inputs),
actions=list(created.node_data.user_actions),
node_id=node_id,
node_title=created.node_title,
resolved_default_values=created.resolved_default_values,
)
__all__ = [
"DEFAULT_ASK_HUMAN_TOOL_NAME",
"AskHumanFormBuildError",
"AskHumanFormCreated",
"ask_human_args_to_node_data",
"build_ask_human_pause_reason",
"build_delivery_methods",
"create_ask_human_form",
"parse_ask_human_args",
]

View File

@ -0,0 +1,159 @@
"""Resume an Agent run after a dify.ask_human HITL form reaches a terminal state.
ENG-638. When the outer workflow resumes (the human submitted the form, or it
timed out), graphon re-executes the Agent node's ``_run``. This module reads the
correlated HITL form (by ``pending_form_id``) and maps it back into the
agent-side ``dify.ask_human`` contract so the node can start a *second* Agent run
that carries the human's answer:
* submitted -> AskHumanToolResult(status="submitted", action, values)
* timeout / expired -> AskHumanToolResult(status="timeout")
* still waiting (defensive: the host resumed us early) -> re-emit the same
HumanInputRequired pause rebuilt from the stored form definition.
It only *reads* existing HITL form state it never mutates the form or the HITL
submission flow. The DB read (``resolve_ask_human_form``) is kept thin so the
mapping (``map_form_to_outcome``) stays pure and unit-testable.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from dify_agent.layers.ask_human import (
AskHumanResultStatus,
AskHumanSelectedAction,
AskHumanToolResult,
)
from dify_agent.protocol import DeferredToolResultsPayload
from pydantic import JsonValue
from sqlalchemy import select
from core.db.session_factory import session_factory
from graphon.entities.pause_reason import HumanInputRequired
from graphon.nodes.human_input.entities import FormDefinition
from graphon.nodes.human_input.enums import HumanInputFormStatus
from models.human_input import HumanInputForm
# A WAITING form has not been answered yet; the other terminal states map onto
# the agent-facing result status. EXPIRED (global timeout) and TIMEOUT
# (node-level) both surface to the model as "timeout" so it can react.
_FORM_STATUS_TO_RESULT: dict[HumanInputFormStatus, AskHumanResultStatus] = {
HumanInputFormStatus.SUBMITTED: "submitted",
HumanInputFormStatus.TIMEOUT: "timeout",
HumanInputFormStatus.EXPIRED: "timeout",
}
@dataclass(frozen=True, slots=True)
class AskHumanResumeOutcome:
"""Result of inspecting a correlated ask_human form on workflow resume.
Exactly one of ``deferred_result`` / ``repause`` is set:
* ``deferred_result`` the form reached a terminal state; start the second run.
* ``repause`` the form is still waiting; re-emit this pause defensively.
"""
deferred_result: AskHumanToolResult | None = None
repause: HumanInputRequired | None = None
def resolve_ask_human_form(*, form_id: str, tenant_id: str, node_id: str) -> AskHumanResumeOutcome | None:
"""Load the correlated form and map it to a resume outcome.
Returns ``None`` when the form no longer exists (correlation lost) the
caller should fall back to a normal (non-resume) Agent run.
"""
with session_factory.create_session() as session:
form = session.scalar(
select(HumanInputForm).where(
HumanInputForm.id == form_id,
HumanInputForm.tenant_id == tenant_id,
)
)
if form is None:
return None
return map_form_to_outcome(
status=form.status,
selected_action_id=form.selected_action_id,
submitted_data=form.submitted_data,
rendered_content=form.rendered_content,
form_definition=form.form_definition,
form_id=form.id,
node_id=node_id,
)
def map_form_to_outcome(
*,
status: HumanInputFormStatus,
selected_action_id: str | None,
submitted_data: str | None,
rendered_content: str,
form_definition: str,
form_id: str,
node_id: str,
) -> AskHumanResumeOutcome:
"""Map a terminal (or still-waiting) HITL form to a resume outcome. Pure."""
definition = FormDefinition.model_validate_json(form_definition)
if status == HumanInputFormStatus.WAITING:
return AskHumanResumeOutcome(repause=_rebuild_pause(definition=definition, form_id=form_id, node_id=node_id))
result_status = _FORM_STATUS_TO_RESULT.get(status, "unavailable")
if result_status != "submitted":
return AskHumanResumeOutcome(deferred_result=AskHumanToolResult(status=result_status))
return AskHumanResumeOutcome(
deferred_result=AskHumanToolResult(
status="submitted",
action=_selected_action(selected_action_id=selected_action_id, definition=definition),
values=_submitted_values(submitted_data),
rendered_content=rendered_content,
)
)
def build_deferred_tool_results(*, tool_call_id: str, result: AskHumanToolResult) -> DeferredToolResultsPayload:
"""Wrap an ask_human result as the deferred-tool-results payload for resume."""
return DeferredToolResultsPayload(calls={tool_call_id: result.model_dump(mode="json")})
def _submitted_values(submitted_data: str | None) -> dict[str, JsonValue]:
if not submitted_data:
return {}
parsed = json.loads(submitted_data)
if not isinstance(parsed, dict):
return {}
return {str(key): value for key, value in parsed.items()}
def _selected_action(*, selected_action_id: str | None, definition: FormDefinition) -> AskHumanSelectedAction | None:
if selected_action_id is None:
return None
# The form's user_action title is the verbatim ask_human action label set at
# form-build time; fall back to the id only if the action is somehow missing.
label = next(
(action.title for action in definition.user_actions if action.id == selected_action_id),
selected_action_id,
)
return AskHumanSelectedAction(id=selected_action_id, label=label)
def _rebuild_pause(*, definition: FormDefinition, form_id: str, node_id: str) -> HumanInputRequired:
return HumanInputRequired(
form_id=form_id,
form_content=definition.rendered_content or definition.form_content,
inputs=list(definition.inputs),
actions=list(definition.user_actions),
node_id=node_id,
node_title=definition.node_title or node_id,
resolved_default_values=dict(definition.default_values),
)
__all__ = [
"AskHumanResumeOutcome",
"build_deferred_tool_results",
"map_form_to_outcome",
"resolve_ask_human_form",
]

View File

@ -18,13 +18,15 @@ SUPPORTED_AGENT_BACKEND_FEATURES = frozenset(
# ENG-623: exposed at runtime as the dify.drive declaration layer
# (an index the agent pulls through the back proxy).
"skills_files",
# ENG-635: human involvement is exposed at runtime as the dify.ask_human
# deferred tool; a call pauses via the existing HITL form mechanism.
"human",
}
)
RESERVED_AGENT_BACKEND_FEATURES = frozenset(
{
"knowledge",
"human",
"memory",
}
)
@ -85,6 +87,7 @@ def build_runtime_feature_manifest(
reserved_status["tools.cli_tools"] = "supported_by_shell_bootstrap"
reserved_status["env"] = "supported_by_shell_bootstrap"
reserved_status["sandbox"] = "forwarded_to_shell_layer_config"
reserved_status["human"] = "supported_by_ask_human_hitl" if agent_soul.human.contacts else "not_configured"
return {
"supported": sorted(SUPPORTED_AGENT_BACKEND_FEATURES),

View File

@ -5,6 +5,7 @@ from dataclasses import dataclass
from typing import Any, Literal, Protocol, assert_never, cast
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.layers.ask_human import DifyAskHumanLayerConfig
from dify_agent.layers.drive import (
DifyDriveFileConfig,
DifyDriveLayerConfig,
@ -22,7 +23,7 @@ from dify_agent.layers.shell import (
DifyShellSandboxConfig,
DifyShellSecretRefConfig,
)
from dify_agent.protocol import CreateRunRequest
from dify_agent.protocol import CreateRunRequest, DeferredToolResultsPayload
from pydantic import BaseModel
from clients.agent_backend import (
@ -103,6 +104,9 @@ class WorkflowAgentRuntimeBuildContext:
# idempotency key so the backend treats each retry as a fresh request.
attempt: int = 0
session_snapshot: CompositorSessionSnapshot | None = None
# ENG-638: set when resuming after a submitted ask_human HITL form; threads
# the human's answer back into the second Agent run keyed by tool_call_id.
deferred_tool_results: DeferredToolResultsPayload | None = None
@dataclass(frozen=True, slots=True)
@ -217,9 +221,11 @@ class WorkflowAgentRuntimeRequestBuilder:
output=self._build_output_config(node_job.declared_outputs),
tools=tools_layer,
drive_config=drive_config,
ask_human_config=build_ask_human_layer_config(agent_soul),
include_shell=dify_config.AGENT_SHELL_ENABLED,
shell_config=build_shell_layer_config(agent_soul),
session_snapshot=context.session_snapshot,
deferred_tool_results=context.deferred_tool_results,
idempotency_key=self._idempotency_key(context),
metadata=metadata,
)
@ -496,6 +502,20 @@ def build_shell_layer_config(agent_soul: AgentSoulConfig) -> DifyShellLayerConfi
)
def build_ask_human_layer_config(agent_soul: AgentSoulConfig) -> DifyAskHumanLayerConfig | None:
"""Enable the dify.ask_human deferred tool when the soul configures human involvement.
HITL is opt-in: only when at least one human contact is configured does the
model get the ``ask_human`` tool (recipients for the resulting form come from
those contacts, ENG-635). Returns ``None`` to leave the tool off entirely.
The tool/field guardrails use the layer defaults; ``human.tools`` semantics are
out of scope this round.
"""
if not agent_soul.human.contacts:
return None
return DifyAskHumanLayerConfig()
def append_runtime_warnings(metadata: dict[str, Any], warnings: list[dict[str, str]]) -> None:
"""Merge build-time warnings into the metadata runtime-support manifest."""
if not warnings:

View File

@ -47,12 +47,20 @@ class StoredWorkflowAgentSession:
session_snapshot: CompositorSessionSnapshot
backend_run_id: str | None
runtime_layer_specs: list[RuntimeLayerSpec] = field(default_factory=list)
# ENG-637: set while the session is paused on a dify.ask_human deferred call.
pending_form_id: str | None = None
pending_tool_call_id: str | None = None
class WorkflowAgentRuntimeSessionStore:
"""Stores Agent backend session snapshots for workflow Agent node re-entry."""
def load_active_snapshot(self, scope: WorkflowAgentSessionScope) -> CompositorSessionSnapshot | None:
stored = self.load_active_session(scope)
return stored.session_snapshot if stored is not None else None
def load_active_session(self, scope: WorkflowAgentSessionScope) -> StoredWorkflowAgentSession | None:
"""Load the active session row including any pending ask_human correlation."""
if scope.workflow_run_id is None:
return None
@ -69,7 +77,14 @@ class WorkflowAgentRuntimeSessionStore:
)
if row is None:
return None
return CompositorSessionSnapshot.model_validate_json(row.session_snapshot)
return StoredWorkflowAgentSession(
scope=scope,
session_snapshot=CompositorSessionSnapshot.model_validate_json(row.session_snapshot),
backend_run_id=row.backend_run_id,
runtime_layer_specs=_deserialize_specs(row.composition_layer_specs),
pending_form_id=row.pending_form_id,
pending_tool_call_id=row.pending_tool_call_id,
)
def list_active_sessions(self, *, workflow_run_id: str) -> list[StoredWorkflowAgentSession]:
with session_factory.create_session() as session:
@ -109,6 +124,8 @@ class WorkflowAgentRuntimeSessionStore:
backend_run_id: str,
snapshot: CompositorSessionSnapshot | None,
runtime_layer_specs: list[RuntimeLayerSpec],
pending_form_id: str | None = None,
pending_tool_call_id: str | None = None,
) -> None:
if scope.workflow_run_id is None or snapshot is None:
return
@ -141,6 +158,8 @@ class WorkflowAgentRuntimeSessionStore:
session_snapshot=snapshot_json,
composition_layer_specs=specs_json,
status=WorkflowAgentRuntimeSessionStatus.ACTIVE,
pending_form_id=pending_form_id,
pending_tool_call_id=pending_tool_call_id,
)
session.add(row)
else:
@ -151,6 +170,9 @@ class WorkflowAgentRuntimeSessionStore:
row.composition_layer_specs = specs_json
row.status = WorkflowAgentRuntimeSessionStatus.ACTIVE
row.cleaned_at = None
# Set (or clear, when omitted) the ask_human pause correlation.
row.pending_form_id = pending_form_id
row.pending_tool_call_id = pending_tool_call_id
session.commit()
def mark_cleaned(self, *, scope: WorkflowAgentSessionScope, backend_run_id: str | None = None) -> None:

View File

@ -153,6 +153,7 @@ def init_app(app: DifyApp) -> Celery:
"tasks.trigger_processing_tasks", # async trigger processing
"tasks.generate_summary_index_task", # summary index generation
"tasks.regenerate_summary_index_task", # summary index regeneration
"tasks.app_generate.resume_agent_app_task", # ENG-635: Agent v2 chat ask_human resume
]
day = dify_config.CELERY_BEAT_SCHEDULER_TIME

View File

@ -0,0 +1,32 @@
"""add ask_human pause correlation to agent_runtime_sessions
Revision ID: c167a72a00eb
Revises: c4d5e6f7a8b9
Create Date: 2026-06-15 10:52:15.736666
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'c167a72a00eb'
down_revision = 'c4d5e6f7a8b9'
branch_labels = None
depends_on = None
def upgrade():
# ENG-637: correlate a paused dify.ask_human session to its awaiting HITL
# form and the deferred tool_call id, so the resumed Agent node can rebuild
# deferred_tool_results from the submitted form. Both columns are nullable
# and NULL whenever the session is not paused on human input.
with op.batch_alter_table('agent_runtime_sessions', schema=None) as batch_op:
batch_op.add_column(sa.Column('pending_form_id', models.types.StringUUID(), nullable=True))
batch_op.add_column(sa.Column('pending_tool_call_id', sa.String(length=255), nullable=True))
def downgrade():
with op.batch_alter_table('agent_runtime_sessions', schema=None) as batch_op:
batch_op.drop_column('pending_tool_call_id')
batch_op.drop_column('pending_form_id')

View File

@ -0,0 +1,29 @@
"""add conversation_id to human_input_forms for agent v2 chat hitl
Revision ID: d2f1a4b8c3e0
Revises: c167a72a00eb
Create Date: 2026-06-15 11:10:00.000000
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'd2f1a4b8c3e0'
down_revision = 'c167a72a00eb'
branch_labels = None
depends_on = None
def upgrade():
# ENG-635: Agent v2 chat ask_human forms are owned by a conversation turn
# instead of a workflow run (the new Agent App has no workflow_run_id).
# Nullable; existing workflow-owned forms keep conversation_id NULL.
with op.batch_alter_table('human_input_forms', schema=None) as batch_op:
batch_op.add_column(sa.Column('conversation_id', models.types.StringUUID(), nullable=True))
def downgrade():
with op.batch_alter_table('human_input_forms', schema=None) as batch_op:
batch_op.drop_column('conversation_id')

View File

@ -398,6 +398,12 @@ class AgentRuntimeSession(DefaultFieldsMixin, Base):
default=AgentRuntimeSessionStatus.ACTIVE,
)
cleaned_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
# ENG-637: when a run pauses for a dify.ask_human deferred call, these link
# the session to the awaiting HITL form and the deferred tool_call_id, so a
# resumed node can map the submitted form back into deferred_tool_results.
# Both NULL whenever the session is not paused on human input.
pending_form_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
pending_tool_call_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
# Back-compat alias for the shipped workflow lifecycle code (PR #36724).

View File

@ -40,6 +40,13 @@ class HumanInputForm(DefaultFieldsMixin, Base):
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_run_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
# ENG-635: a RUNTIME form is tagged with its owning workflow run and/or its
# conversation. Workflow / Human-Input / agent-node forms always set
# workflow_run_id, and ALSO set conversation_id when the run has a conversation
# (chatflow / advanced-chat). Agent v2 chat ask_human forms set only
# conversation_id (the new Agent App has no workflow_run_id). At least one is set;
# resume routing prefers workflow_run_id when both are present.
conversation_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
form_kind: Mapped[HumanInputFormKind] = mapped_column(
EnumText(HumanInputFormKind),
nullable=False,

View File

@ -223,9 +223,12 @@ class HumanInputService:
if result.form_kind != HumanInputFormKind.RUNTIME:
return
if result.workflow_run_id is None:
return
self.enqueue_resume(result.workflow_run_id)
# A RUNTIME form is owned by a workflow run (workflow Agent node) or a
# conversation (ENG-635: Agent v2 chat). Route the resume accordingly.
if result.workflow_run_id is not None:
self.enqueue_resume(result.workflow_run_id)
elif result.conversation_id is not None:
self.enqueue_agent_app_resume(conversation_id=result.conversation_id, form_id=result.form_id)
def ensure_form_active(self, form: Form) -> None:
if form.submitted:
@ -286,6 +289,22 @@ class HumanInputService:
logger.warning("App mode %s does not support resume for workflow run %s", app.mode, workflow_run_id)
def enqueue_agent_app_resume(self, *, conversation_id: str, form_id: str) -> None:
"""ENG-635: resume an Agent v2 chat after its ask_human form is submitted.
Enqueues a background turn for the conversation; the Agent App runner
continues the agent run, threading the human's reply into the request as
``deferred_tool_results``.
"""
from tasks.app_generate.resume_agent_app_task import resume_agent_app_execution
try:
resume_agent_app_execution.apply_async(
kwargs={"conversation_id": conversation_id, "form_id": form_id},
)
except Exception: # pragma: no cover
logger.exception("Failed to enqueue Agent App resume for conversation %s form %s", conversation_id, form_id)
def _load_variable_pool_for_form(self, form: Form) -> ReadOnlyVariablePool | None:
workflow_run_id = form.workflow_run_id
if workflow_run_id is None:

View File

@ -0,0 +1,70 @@
"""Background resume of an Agent v2 chat after a submitted ask_human HITL form.
ENG-635. When a human submits a conversation-owned ask_human form (delivered via
email/webapp), ``HumanInputService`` enqueues this task. It reconstructs the
conversation context and runs one blocking Agent App turn; the runner detects the
answered form and continues the agent run with the human's reply
(``deferred_tool_results``), persisting the assistant answer to the conversation.
"""
from __future__ import annotations
import logging
from celery import shared_task
from core.app.apps.agent_app.app_generator import AgentAppGenerator
from core.app.entities.app_invoke_entities import InvokeFrom
from extensions.ext_database import db
from models.account import Account
from models.human_input import HumanInputForm
from models.model import App, Conversation, EndUser
from tasks.app_generate.workflow_execute_task import WORKFLOW_BASED_APP_EXECUTION_QUEUE
logger = logging.getLogger(__name__)
@shared_task(queue=WORKFLOW_BASED_APP_EXECUTION_QUEUE, name="resume_agent_app_execution")
def resume_agent_app_execution(*, conversation_id: str, form_id: str) -> None:
form = db.session.get(HumanInputForm, form_id)
if form is None or form.conversation_id != conversation_id:
logger.warning("Agent App resume: form %s missing or conversation mismatch", form_id)
return
app_model = db.session.get(App, form.app_id)
if app_model is None:
logger.warning("Agent App resume: app %s not found for form %s", form.app_id, form_id)
return
conversation = db.session.get(Conversation, conversation_id)
if conversation is None:
logger.warning("Agent App resume: conversation %s not found", conversation_id)
return
user = _resolve_conversation_user(app_model=app_model, conversation=conversation)
if user is None:
logger.warning("Agent App resume: no user resolvable for conversation %s", conversation_id)
return
try:
AgentAppGenerator().resume_after_form_submission(
app_model=app_model,
user=user,
conversation_id=conversation_id,
invoke_from=InvokeFrom.WEB_APP,
)
except Exception:
logger.exception("Agent App resume failed for conversation %s form %s", conversation_id, form_id)
finally:
db.session.close()
def _resolve_conversation_user(*, app_model: App, conversation: Conversation) -> Account | EndUser | None:
if conversation.from_account_id:
account = db.session.get(Account, conversation.from_account_id)
if account is not None:
account.set_tenant_id(app_model.tenant_id)
return account
if conversation.from_end_user_id:
return db.session.get(EndUser, conversation.from_end_user_id)
return None

View File

@ -327,3 +327,49 @@ def test_agent_app_request_builder_adds_shell_layer_when_include_shell():
assert layers[DIFY_SHELL_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
shell_config = cast(DifyShellLayerConfig, layers[DIFY_SHELL_LAYER_ID].config)
assert shell_config.env[0].name == "APP_ENV"
# ── ENG-635 / ENG-638: ask_human layer injection + deferred_tool_results ─────
def test_ask_human_layer_injected_when_configured():
from dify_agent.layers.ask_human import DIFY_ASK_HUMAN_LAYER_TYPE_ID, DifyAskHumanLayerConfig
from clients.agent_backend.request_builder import DIFY_ASK_HUMAN_LAYER_ID
run_input = _run_input().model_copy(update={"ask_human_config": DifyAskHumanLayerConfig()})
request = AgentBackendRunRequestBuilder().build_for_workflow_node(run_input)
layers = {layer.name: layer for layer in request.composition.layers}
assert DIFY_ASK_HUMAN_LAYER_ID in layers
assert layers[DIFY_ASK_HUMAN_LAYER_ID].type == DIFY_ASK_HUMAN_LAYER_TYPE_ID
# the deferred tool needs the history layer to resume, so history must precede it
names = [layer.name for layer in request.composition.layers]
assert names.index(DIFY_AGENT_HISTORY_LAYER_ID) < names.index(DIFY_ASK_HUMAN_LAYER_ID)
def test_no_ask_human_layer_when_unconfigured():
from clients.agent_backend.request_builder import DIFY_ASK_HUMAN_LAYER_ID
request = AgentBackendRunRequestBuilder().build_for_workflow_node(_run_input())
assert all(layer.name != DIFY_ASK_HUMAN_LAYER_ID for layer in request.composition.layers)
def test_deferred_tool_results_threaded_into_request():
from dify_agent.protocol import DeferredToolResultsPayload
payload = DeferredToolResultsPayload(
calls={
"tool-call-1": {
"status": "submitted",
"action": {"id": "submit", "label": "Submit"},
"values": {"x": "y"},
}
}
)
run_input = _run_input().model_copy(update={"deferred_tool_results": payload})
request = AgentBackendRunRequestBuilder().build_for_workflow_node(run_input)
assert request.deferred_tool_results is not None
assert "tool-call-1" in request.deferred_tool_results.calls

View File

@ -238,3 +238,56 @@ class TestGenerateWorker:
queue_manager = mocker.MagicMock()
self._call(generator, mocker, queue_manager)
assert queue_manager.publish_error.called
class TestResumeAfterFormSubmission:
"""ENG-638: a resume turn re-sends the paused turn's original query so the
composition's user-prompt layer matches the suspended snapshot (never blank)."""
def _wire(self, generator, mocker: MockerFixture):
generator._resolve_agent = mocker.MagicMock(
return_value=(mocker.MagicMock(id="agent1"), mocker.MagicMock(id="snap1"), mocker.MagicMock())
)
generator._init_generate_records = mocker.MagicMock(
return_value=(mocker.MagicMock(id="conv", mode="agent"), mocker.MagicMock(id="msg"))
)
generator._handle_response = mocker.MagicMock(return_value=None)
mocker.patch(f"{MODULE}.ConversationService.get_conversation", return_value=mocker.MagicMock(id="conv"))
mocker.patch(f"{MODULE}.AgentAppConfigManager.get_app_config", return_value=mocker.MagicMock(variables=[]))
mocker.patch(f"{MODULE}.ModelConfigConverter.convert", return_value=mocker.MagicMock())
mocker.patch(f"{MODULE}.TraceQueueManager", return_value=mocker.MagicMock())
mocker.patch(f"{MODULE}.MessageBasedAppQueueManager", return_value=mocker.MagicMock())
mocker.patch(f"{MODULE}.threading.Thread", return_value=mocker.MagicMock())
return mocker.patch(
f"{MODULE}.AgentAppGenerateEntity", return_value=mocker.MagicMock(task_id="t", user_id="user")
)
def test_resume_resends_paused_turn_query(self, generator, mocker: MockerFixture):
entity = self._wire(generator, mocker)
db_mock = mocker.patch(f"{MODULE}.db")
db_mock.session.scalar.return_value = mocker.MagicMock(query="original question")
generator.resume_after_form_submission(
app_model=mocker.MagicMock(id="app1", tenant_id="tenant", mode="agent"),
user=DummyAccount("user"),
conversation_id="conv",
invoke_from=InvokeFrom.WEB_APP,
)
# The paused turn's query is re-sent verbatim — never blank.
assert entity.call_args.kwargs["query"] == "original question"
def test_resume_falls_back_to_placeholder_when_no_paused_message(self, generator, mocker: MockerFixture):
entity = self._wire(generator, mocker)
db_mock = mocker.patch(f"{MODULE}.db")
db_mock.session.scalar.return_value = None
generator.resume_after_form_submission(
app_model=mocker.MagicMock(id="app1", tenant_id="tenant", mode="agent"),
user=DummyAccount("user"),
conversation_id="conv",
invoke_from=InvokeFrom.WEB_APP,
)
# No prior user message -> a non-blank placeholder, still never blank.
assert entity.call_args.kwargs["query"] == "(resumed)"

View File

@ -6,9 +6,11 @@ from __future__ import annotations
from types import SimpleNamespace
from typing import Any, override
from unittest.mock import MagicMock
import pytest
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.layers.ask_human import AskHumanToolResult
from dify_agent.protocol import CancelRunRequest, CancelRunResponse, RuntimeLayerSpec
from clients.agent_backend import (
@ -19,10 +21,11 @@ from clients.agent_backend import (
)
from core.app.apps.agent_app.app_runner import AgentAppRunner
from core.app.apps.agent_app.runtime_request_builder import AgentAppRuntimeRequestBuilder
from core.app.apps.agent_app.session_store import AgentAppSessionScope
from core.app.apps.agent_app.session_store import AgentAppSessionScope, StoredAgentAppSession
from core.app.apps.exc import GenerateTaskStoppedError
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom
from core.app.entities.queue_entities import QueueLLMChunkEvent, QueueMessageEndEvent
from core.workflow.nodes.agent_v2.ask_human_resume import AskHumanResumeOutcome
from models.agent_config_entities import AgentSoulConfig
@ -65,17 +68,47 @@ class _RecordingFakeAgentBackendRunClient(FakeAgentBackendRunClient):
class _FakeSessionStore:
def __init__(self, loaded: CompositorSessionSnapshot | None = None) -> None:
def __init__(
self,
loaded: CompositorSessionSnapshot | None = None,
loaded_session: StoredAgentAppSession | None = None,
) -> None:
self.loaded = loaded
self._loaded_session = loaded_session
self.saved: list[
tuple[AgentAppSessionScope, str, CompositorSessionSnapshot | None, list[RuntimeLayerSpec]]
tuple[
AgentAppSessionScope,
str,
CompositorSessionSnapshot | None,
list[RuntimeLayerSpec],
str | None,
str | None,
]
] = []
def load_active_snapshot(self, scope: AgentAppSessionScope) -> CompositorSessionSnapshot | None:
return self.loaded
def save_active_snapshot(self, *, scope, backend_run_id, snapshot, runtime_layer_specs) -> None:
self.saved.append((scope, backend_run_id, snapshot, list(runtime_layer_specs)))
def load_active_session(self, scope: AgentAppSessionScope) -> StoredAgentAppSession | None:
if self._loaded_session is not None:
return self._loaded_session
if self.loaded is None:
return None
return StoredAgentAppSession(scope=scope, session_snapshot=self.loaded, backend_run_id=None)
def save_active_snapshot(
self,
*,
scope,
backend_run_id,
snapshot,
runtime_layer_specs,
pending_form_id=None,
pending_tool_call_id=None,
) -> None:
self.saved.append(
(scope, backend_run_id, snapshot, list(runtime_layer_specs), pending_form_id, pending_tool_call_id)
)
def _soul() -> AgentSoulConfig:
@ -144,11 +177,14 @@ def test_successful_turn_publishes_chunk_and_message_end_and_saves_session():
assert end_events[0].llm_result.model == "gpt-4o-mini"
# The conversation session snapshot is persisted for multi-turn continuity.
assert store.saved
saved_scope, saved_run_id, saved_snapshot, saved_specs = store.saved[0]
saved_scope, saved_run_id, saved_snapshot, saved_specs, pending_form_id, pending_tool_call_id = store.saved[0]
assert saved_scope.conversation_id == "conv-1"
assert saved_scope.agent_config_snapshot_id == "snap-1"
assert saved_run_id == "fake-run-1"
assert saved_snapshot is not None
# A successful turn carries no ask_human pause correlation.
assert pending_form_id is None
assert pending_tool_call_id is None
assert [spec.name for spec in saved_specs] == [
"agent_soul_prompt",
"agent_app_user_prompt",
@ -197,3 +233,68 @@ def test_extract_answer_handles_plain_string_and_dict():
assert AgentAppRunner._extract_answer("plain text") == "plain text"
assert AgentAppRunner._extract_answer({"text": "hi"}) == "hi"
assert AgentAppRunner._extract_answer({"a": 1}) == '{"a": 1}'
def test_ask_human_pauses_turn_creates_form_and_persists_correlation():
# ENG-635/637: the PAUSED scenario emits a dify.ask_human deferred call, so
# the chat turn ends by creating a conversation-owned HITL form + saving the
# pause correlation, instead of crashing. Stub the form repo (DB-free).
client = FakeAgentBackendRunClient(scenario=FakeAgentBackendScenario.PAUSED)
store = _FakeSessionStore()
qm = _FakeQueueManager()
runner = _runner(client, store)
fake_repo = MagicMock()
fake_repo.create_form.return_value = MagicMock(id="form-1")
runner._build_form_repository = lambda dify_context: fake_repo # type: ignore[assignment]
_run(runner, qm)
# The conversation-owned form was created and the agent's question surfaced.
fake_repo.create_form.assert_called_once()
created_params = fake_repo.create_form.call_args.args[0]
assert created_params.conversation_id == "conv-1"
assert created_params.workflow_execution_id is None
assert [e for e in qm.events if isinstance(e, QueueMessageEndEvent)]
# The pause correlation is persisted so a form submission can resume the run.
assert store.saved
assert store.saved[0][4] == "form-1"
assert store.saved[0][5] == "fake-ask-human-1"
def test_submitted_form_resumes_turn_with_deferred_tool_results(monkeypatch):
# ENG-638: a turn that runs while a pending form is answered threads the
# human's reply into the request as deferred_tool_results.
snapshot = CompositorSessionSnapshot(layers=[])
stored = StoredAgentAppSession(
scope=AgentAppSessionScope(
tenant_id="tenant-1",
app_id="app-1",
conversation_id="conv-1",
agent_id="agent-1",
agent_config_snapshot_id="snap-1",
),
session_snapshot=snapshot,
backend_run_id="run-0",
pending_form_id="form-1",
pending_tool_call_id="call-1",
)
store = _FakeSessionStore(loaded_session=stored)
submitted = AskHumanResumeOutcome(deferred_result=AskHumanToolResult(status="submitted", values={"ok": True}))
monkeypatch.setattr(
"core.app.apps.agent_app.app_runner.resolve_ask_human_form",
lambda **_kwargs: submitted,
)
client = FakeAgentBackendRunClient() # SUCCESS -> the resumed run completes
qm = _FakeQueueManager()
_run(_runner(client, store), qm)
assert client.request is not None
assert client.request.deferred_tool_results is not None
assert set(client.request.deferred_tool_results.calls) == {"call-1"}
# ENG-638: the resume composition must keep the user-prompt layer so it
# matches the suspended snapshot's layer names (the agent backend rejects a
# mismatch). A resume therefore re-sends a non-blank query, never blank.
layer_names = [layer.name for layer in client.request.composition.layers]
assert "agent_app_user_prompt" in layer_names

View File

@ -288,6 +288,7 @@ class _DummyForm:
form_definition: str
rendered_content: str
expiration_time: datetime
conversation_id: str | None = None
form_kind: HumanInputFormKind = HumanInputFormKind.RUNTIME
created_at: datetime = dataclasses.field(default_factory=naive_utc_now)
selected_action_id: str | None = None

View File

@ -73,6 +73,7 @@ class _DummyForm:
form_definition: str
rendered_content: str
expiration_time: datetime
conversation_id: str | None = None
form_kind: HumanInputFormKind = HumanInputFormKind.RUNTIME
created_at: datetime = dataclasses.field(default_factory=naive_utc_now)
selected_action_id: str | None = None

View File

@ -1,8 +1,9 @@
from types import SimpleNamespace
from typing import cast
from unittest.mock import patch
from unittest.mock import MagicMock, patch
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.layers.ask_human import AskHumanToolResult
from dify_agent.protocol import RunStartedEvent, RunSucceededEvent, RunSucceededEventData
from clients.agent_backend import (
@ -15,15 +16,22 @@ from clients.agent_backend import (
from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, DifyRunContext, InvokeFrom, UserFrom
from core.workflow.file_reference import build_file_reference
from core.workflow.nodes.agent_v2 import DifyAgentNode
from core.workflow.nodes.agent_v2.ask_human_resume import AskHumanResumeOutcome
from core.workflow.nodes.agent_v2.binding_resolver import WorkflowAgentBindingBundle, WorkflowAgentBindingResolver
from core.workflow.nodes.agent_v2.entities import DifyAgentNodeData
from core.workflow.nodes.agent_v2.output_adapter import WorkflowAgentOutputAdapter
from core.workflow.nodes.agent_v2.runtime_request_builder import WorkflowAgentRuntimeRequestBuilder
from core.workflow.nodes.agent_v2.session_store import WorkflowAgentRuntimeSessionStore, WorkflowAgentSessionScope
from core.workflow.nodes.agent_v2.session_store import (
StoredWorkflowAgentSession,
WorkflowAgentRuntimeSessionStore,
WorkflowAgentSessionScope,
)
from graphon.entities import GraphInitParams
from graphon.entities.pause_reason import HumanInputRequired
from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from graphon.file import File, FileTransferMethod, FileType
from graphon.node_events import PauseRequestedEvent, StreamCompletedEvent
from graphon.nodes.human_input.entities import UserActionConfig
from graphon.runtime import GraphRuntimeState
from graphon.variables.segments import ArrayFileSegment, FileSegment, StringSegment
from models.agent import Agent, AgentConfigSnapshot, WorkflowAgentNodeBinding
@ -113,12 +121,16 @@ class FakeBindingResolver(WorkflowAgentBindingResolver):
class FakeSessionStore:
def __init__(self, snapshot: CompositorSessionSnapshot | None = None) -> None:
self.loaded_snapshot = snapshot
# ENG-638: set to simulate resume after a submitted/timed-out form.
self.loaded_session: StoredWorkflowAgentSession | None = None
self.saved: list[
tuple[
WorkflowAgentSessionScope,
str,
CompositorSessionSnapshot | None,
list[RuntimeLayerSpec],
str | None,
str | None,
]
] = []
self.cleaned: list[tuple[WorkflowAgentSessionScope, str | None]] = []
@ -126,6 +138,9 @@ class FakeSessionStore:
def load_active_snapshot(self, scope: WorkflowAgentSessionScope) -> CompositorSessionSnapshot | None:
return self.loaded_snapshot
def load_active_session(self, scope: WorkflowAgentSessionScope) -> StoredWorkflowAgentSession | None:
return self.loaded_session
def save_active_snapshot(
self,
*,
@ -133,8 +148,12 @@ class FakeSessionStore:
backend_run_id: str,
snapshot: CompositorSessionSnapshot | None,
runtime_layer_specs: list[RuntimeLayerSpec],
pending_form_id: str | None = None,
pending_tool_call_id: str | None = None,
) -> None:
self.saved.append((scope, backend_run_id, snapshot, list(runtime_layer_specs)))
self.saved.append(
(scope, backend_run_id, snapshot, list(runtime_layer_specs), pending_form_id, pending_tool_call_id)
)
def mark_cleaned(
self,
@ -378,10 +397,13 @@ def test_agent_node_saves_success_snapshot_and_reuses_existing_snapshot():
assert len(events) == 1
assert store.saved
scope, backend_run_id, saved_snapshot, saved_specs = store.saved[0]
scope, backend_run_id, saved_snapshot, saved_specs, pending_form_id, pending_tool_call_id = store.saved[0]
assert scope.workflow_run_id == "workflow-run-1"
assert backend_run_id == "fake-run-1"
assert saved_snapshot is not None
# A successful terminal carries no ask_human pause correlation.
assert pending_form_id is None
assert pending_tool_call_id is None
assert client.request is not None
assert client.request.session_snapshot is existing_snapshot
# Persist enough composition shape to replay a cleanup run; plugin layers
@ -462,13 +484,100 @@ def test_agent_node_paused_run_requests_workflow_pause_and_persists_snapshot():
store = FakeSessionStore()
node = _node(scenario=FakeAgentBackendScenario.PAUSED, session_store=store)
# ENG-636: the PAUSED scenario emits a dify.ask_human deferred call, so the
# node now builds a HITL form and pauses with HumanInputRequired. Stub the
# form repository so the unit test stays DB-free.
fake_repo = MagicMock()
fake_repo.create_form.return_value = MagicMock(id="form-1")
node._build_human_input_form_repository = lambda *, dify_ctx, workflow_run_id: fake_repo # type: ignore[assignment]
events = list(node._run())
assert len(events) == 1
assert isinstance(events[0], PauseRequestedEvent)
assert isinstance(events[0].reason, HumanInputRequired)
assert events[0].reason.form_id == "form-1"
assert events[0].reason.node_id == "agent-node"
fake_repo.create_form.assert_called_once()
assert store.saved
assert store.saved[0][1] == "fake-run-1"
assert store.saved[0][3], "paused agent run should still persist replayable layer specs"
# ENG-637: the awaiting form + deferred tool_call correlation is persisted.
assert store.saved[0][4] == "form-1"
assert store.saved[0][5] == "fake-ask-human-1"
def _pending_session(snapshot: CompositorSessionSnapshot) -> StoredWorkflowAgentSession:
return StoredWorkflowAgentSession(
scope=WorkflowAgentSessionScope(
tenant_id="tenant-1",
app_id="app-1",
workflow_id="workflow-1",
workflow_run_id="workflow-run-1",
node_id="agent-node",
node_execution_id="exec-1",
binding_id="binding-1",
agent_id="agent-1",
agent_config_snapshot_id="snapshot-1",
),
session_snapshot=snapshot,
backend_run_id="run-0",
pending_form_id="form-1",
pending_tool_call_id="call-1",
)
def test_agent_node_resumes_with_deferred_tool_results_after_submitted_form(monkeypatch):
# ENG-638: a submitted form re-enters _run; the human's answer is threaded
# into the second Agent run as deferred_tool_results.
snapshot = CompositorSessionSnapshot(layers=[])
store = FakeSessionStore(snapshot=snapshot)
store.loaded_session = _pending_session(snapshot)
def _fake_resolve(*, form_id: str, tenant_id: str, node_id: str) -> AskHumanResumeOutcome:
assert form_id == "form-1"
return AskHumanResumeOutcome(deferred_result=AskHumanToolResult(status="submitted", values={"note": "ok"}))
monkeypatch.setattr("core.workflow.nodes.agent_v2.agent_node.resolve_ask_human_form", _fake_resolve)
client = FakeAgentBackendRunClient() # SUCCESS scenario -> second run completes
node = _node(agent_backend_client=client, session_store=store)
events = list(node._run())
assert client.request is not None
assert client.request.deferred_tool_results is not None
assert set(client.request.deferred_tool_results.calls) == {"call-1"}
assert any(isinstance(event, StreamCompletedEvent) for event in events)
def test_agent_node_repauses_when_resumed_form_still_waiting(monkeypatch):
snapshot = CompositorSessionSnapshot(layers=[])
store = FakeSessionStore(snapshot=snapshot)
store.loaded_session = _pending_session(snapshot)
repause = HumanInputRequired(
form_id="form-1",
form_content="Approve?",
inputs=[],
actions=[UserActionConfig(id="ok", title="OK")],
node_id="agent-node",
node_title="Budget review",
)
monkeypatch.setattr(
"core.workflow.nodes.agent_v2.agent_node.resolve_ask_human_form",
lambda **_kwargs: AskHumanResumeOutcome(repause=repause),
)
client = FakeAgentBackendRunClient()
node = _node(agent_backend_client=client, session_store=store)
events = list(node._run())
assert len(events) == 1
assert isinstance(events[0], PauseRequestedEvent)
assert isinstance(events[0].reason, HumanInputRequired)
assert client.request is None # no second Agent run was created
def test_agent_node_records_stream_usage_metadata():

View File

@ -0,0 +1,338 @@
"""Unit tests for the ask_human -> HITL form translation layer (ENG-636)."""
from __future__ import annotations
from typing import Any
from unittest.mock import MagicMock
import pytest
from dify_agent.layers.ask_human import AskHumanToolArgs
from dify_agent.protocol import DeferredToolCallPayload
from core.repositories.human_input_repository import FormCreateParams, HumanInputFormRepository
from core.workflow.human_input_adapter import (
EmailDeliveryMethod,
ExternalRecipient,
InteractiveSurfaceDeliveryMethod,
)
from core.workflow.nodes.agent_v2.ask_human_hitl import (
AskHumanFormBuildError,
ask_human_args_to_node_data,
build_ask_human_pause_reason,
build_delivery_methods,
parse_ask_human_args,
)
from graphon.nodes.human_input.entities import (
FileInputConfig,
FileListInputConfig,
ParagraphInputConfig,
SelectInputConfig,
)
from graphon.nodes.human_input.enums import ButtonStyle, TimeoutUnit
from models.agent_config_entities import AgentHumanContactConfig
def _args(**overrides: Any) -> AskHumanToolArgs:
payload: dict[str, Any] = {"question": "Approve the budget?"}
payload.update(overrides)
return AskHumanToolArgs.model_validate(payload)
def _deferred_call(args: dict[str, Any], *, tool_name: str = "ask_human") -> DeferredToolCallPayload:
return DeferredToolCallPayload(tool_call_id="call-1", tool_name=tool_name, args=args)
def _fake_repository(form_id: str = "form-123") -> MagicMock:
repo = MagicMock(spec=HumanInputFormRepository)
repo.create_form.return_value = MagicMock(id=form_id)
return repo
# ─────────────────────────── parse_ask_human_args ───────────────────────────
def test_parse_ask_human_args_from_mapping() -> None:
parsed = parse_ask_human_args({"question": "Need a decision"})
assert isinstance(parsed, AskHumanToolArgs)
assert parsed.question == "Need a decision"
def test_parse_ask_human_args_passthrough() -> None:
original = _args()
assert parse_ask_human_args(original) is original
def test_parse_ask_human_args_invalid_payload_raises() -> None:
with pytest.raises(AskHumanFormBuildError):
parse_ask_human_args({"question": ""}) # blank question is rejected
def test_parse_ask_human_args_non_mapping_raises() -> None:
with pytest.raises(AskHumanFormBuildError):
parse_ask_human_args("not a mapping")
# ───────────────────────── ask_human_args_to_node_data ──────────────────────
def test_node_data_maps_every_field_type() -> None:
args = _args(
fields=[
{"type": "paragraph", "name": "reason", "label": "Reason", "default": "n/a"},
{
"type": "select",
"name": "tier",
"label": "Tier",
"options": [{"value": "t1", "label": "Tier 1"}, {"value": "t2", "label": "Tier 2"}],
"default": "t2",
},
{"type": "file", "name": "doc", "label": "Document"},
{"type": "file-list", "name": "evidence", "label": "Evidence", "max_files": 3},
],
)
node_data = ask_human_args_to_node_data(args, node_title="Budget review")
assert [type(i) for i in node_data.inputs] == [
ParagraphInputConfig,
SelectInputConfig,
FileInputConfig,
FileListInputConfig,
]
paragraph, select, _file, file_list = node_data.inputs
assert isinstance(paragraph, ParagraphInputConfig)
assert isinstance(select, SelectInputConfig)
assert isinstance(file_list, FileListInputConfig)
assert paragraph.output_variable_name == "reason"
assert paragraph.default is not None
assert paragraph.default.value == "n/a"
assert select.option_source.value == ["t1", "t2"]
assert file_list.number_limits == 3
assert node_data.timeout == 36
assert node_data.timeout_unit == TimeoutUnit.HOUR
assert node_data.title == "Budget review"
def test_node_data_form_content_embeds_title_question_and_field_markers() -> None:
args = _args(
title="Decision needed",
markdown="Some **context** here.",
fields=[{"type": "paragraph", "name": "reason", "label": "Reason", "required": True}],
)
content = ask_human_args_to_node_data(args, node_title="t").form_content
assert "## Decision needed" in content
assert "Approve the budget?" in content
assert "Some **context** here." in content
# The label carries a required marker and positions the input via $output.
assert "**Reason ***" in content
assert "{{#$output.reason#}}" in content
def test_node_data_maps_action_styles_and_titles() -> None:
args = _args(
actions=[
{"id": "approve", "label": "Approve", "style": "primary"},
{"id": "reject", "label": "Reject", "style": "destructive"},
{"id": "later", "label": "Decide later"},
],
)
actions = ask_human_args_to_node_data(args, node_title="t").user_actions
assert [(a.id, a.title, a.button_style) for a in actions] == [
("approve", "Approve", ButtonStyle.PRIMARY),
("reject", "Reject", ButtonStyle.ACCENT), # destructive -> accent
("later", "Decide later", ButtonStyle.DEFAULT),
]
def test_node_data_synthesizes_submit_action_when_none_given() -> None:
actions = ask_human_args_to_node_data(_args(), node_title="t").user_actions
assert len(actions) == 1
assert actions[0].id == "submit"
assert actions[0].button_style == ButtonStyle.PRIMARY
def test_node_data_clamps_overlong_action_id_deterministically() -> None:
long_id = "approve_the_quarterly_budget_request" # > 20 chars, valid identifier
args = _args(actions=[{"id": long_id, "label": "Approve"}])
first = ask_human_args_to_node_data(args, node_title="t").user_actions[0]
second = ask_human_args_to_node_data(args, node_title="t").user_actions[0]
assert len(first.id) <= 20
assert first.id.isidentifier()
assert first.id == second.id # stable across builds
assert first.title == "Approve" # label preserved verbatim
# ───────────────────────────── build_delivery_methods ──────────────────────
def test_delivery_always_includes_interactive_surface() -> None:
methods = build_delivery_methods([], args=_args())
assert len(methods) == 1
assert isinstance(methods[0], InteractiveSurfaceDeliveryMethod)
def test_delivery_adds_email_for_contacts_and_dedupes() -> None:
contacts = [
AgentHumanContactConfig(email="a@x.com"),
AgentHumanContactConfig(email="a@x.com"), # duplicate
AgentHumanContactConfig(email=None), # no email
AgentHumanContactConfig(email="b@x.com"),
]
methods = build_delivery_methods(contacts, args=_args())
email_methods = [m for m in methods if isinstance(m, EmailDeliveryMethod)]
assert len(email_methods) == 1
recipients = email_methods[0].config.recipients.items
assert [r.email for r in recipients if isinstance(r, ExternalRecipient)] == ["a@x.com", "b@x.com"]
def test_delivery_high_urgency_prefixes_subject() -> None:
methods = build_delivery_methods(
[AgentHumanContactConfig(email="a@x.com")],
args=_args(title="Sign off", urgency="high"),
)
email_method = next(m for m in methods if isinstance(m, EmailDeliveryMethod))
assert email_method.config.subject.startswith("[Action needed] ")
# ─────────────────────────── build_ask_human_pause_reason ───────────────────
def test_pause_reason_none_for_non_ask_human_tool() -> None:
result = build_ask_human_pause_reason(
deferred_tool_call=_deferred_call({"question": "q"}, tool_name="final_output"),
node_id="node-1",
default_node_title="Agent",
workflow_run_id="wf-1",
contacts=[],
repository=_fake_repository(),
)
assert result is None
def test_pause_reason_requires_workflow_run_id() -> None:
with pytest.raises(AskHumanFormBuildError):
build_ask_human_pause_reason(
deferred_tool_call=_deferred_call({"question": "q"}),
node_id="node-1",
default_node_title="Agent",
workflow_run_id="",
contacts=[],
repository=_fake_repository(),
)
def test_pause_reason_builds_form_and_returns_human_input_required() -> None:
repo = _fake_repository(form_id="form-xyz")
contacts = [AgentHumanContactConfig(email="a@x.com")]
result = build_ask_human_pause_reason(
deferred_tool_call=_deferred_call(
{
"title": "Approve?",
"question": "Please approve",
"fields": [{"type": "paragraph", "name": "note", "label": "Note"}],
"actions": [{"id": "ok", "label": "OK"}],
}
),
node_id="node-1",
default_node_title="Agent fallback",
workflow_run_id="wf-1",
contacts=contacts,
repository=repo,
)
assert result is not None
assert result.form_id == "form-xyz"
assert result.node_id == "node-1"
assert result.node_title == "Approve?" # args.title wins over default
assert [i.output_variable_name for i in result.inputs] == ["note"]
assert [a.id for a in result.actions] == ["ok"]
params: FormCreateParams = repo.create_form.call_args.args[0]
assert params.workflow_execution_id == "wf-1"
assert params.node_id == "node-1"
# No conversation_id passed -> pure workflow run owns the form by workflow_run_id only.
assert params.conversation_id is None
assert any(isinstance(m, EmailDeliveryMethod) for m in params.delivery_methods)
def test_pause_reason_forwards_conversation_id_for_chatflow() -> None:
# ENG-635 (review): an agent node running in a chatflow tags its ask_human form
# with the conversation in addition to the workflow run.
repo = _fake_repository(form_id="form-xyz")
build_ask_human_pause_reason(
deferred_tool_call=_deferred_call({"question": "Please approve"}),
node_id="node-1",
default_node_title="Agent",
workflow_run_id="wf-1",
conversation_id="conv-1",
contacts=[],
repository=repo,
)
params: FormCreateParams = repo.create_form.call_args.args[0]
assert params.workflow_execution_id == "wf-1"
assert params.conversation_id == "conv-1"
def test_pause_reason_falls_back_to_default_node_title() -> None:
result = build_ask_human_pause_reason(
deferred_tool_call=_deferred_call({"question": "q with no title"}),
node_id="node-1",
default_node_title="Agent fallback",
workflow_run_id="wf-1",
contacts=[],
repository=_fake_repository(),
)
assert result is not None
assert result.node_title == "Agent fallback"
def test_pause_reason_select_default_flows_into_resolved_defaults() -> None:
repo = _fake_repository()
result = build_ask_human_pause_reason(
deferred_tool_call=_deferred_call(
{
"question": "pick",
"fields": [
{
"type": "select",
"name": "tier",
"label": "Tier",
"options": [{"value": "t1", "label": "Tier 1"}],
"default": "t1",
}
],
}
),
node_id="node-1",
default_node_title="Agent",
workflow_run_id="wf-1",
contacts=[],
repository=repo,
)
assert result is not None
assert result.resolved_default_values == {"tier": "t1"}
def test_pause_reason_wraps_repository_value_error() -> None:
repo = MagicMock(spec=HumanInputFormRepository)
repo.create_form.side_effect = ValueError("db boom")
with pytest.raises(AskHumanFormBuildError):
build_ask_human_pause_reason(
deferred_tool_call=_deferred_call({"question": "q"}),
node_id="node-1",
default_node_title="Agent",
workflow_run_id="wf-1",
contacts=[],
repository=repo,
)

View File

@ -0,0 +1,121 @@
"""Unit tests for mapping a submitted/timed-out HITL form back to ask_human (ENG-638)."""
from __future__ import annotations
from datetime import datetime
from dify_agent.layers.ask_human import AskHumanToolResult
from core.workflow.nodes.agent_v2.ask_human_resume import (
build_deferred_tool_results,
map_form_to_outcome,
)
from graphon.entities.pause_reason import HumanInputRequired
from graphon.nodes.human_input.entities import FormDefinition, ParagraphInputConfig, UserActionConfig
from graphon.nodes.human_input.enums import HumanInputFormStatus
def _form_definition_json() -> str:
return FormDefinition(
form_content="Approve? {{#$output.note#}}",
inputs=[ParagraphInputConfig(output_variable_name="note")],
user_actions=[UserActionConfig(id="approve", title="Approve"), UserActionConfig(id="reject", title="Reject")],
rendered_content="Approve? <input>",
expiration_time=datetime(2026, 1, 1),
default_values={"note": "default"},
node_title="Budget review",
).model_dump_json()
def test_map_submitted_form_to_result() -> None:
outcome = map_form_to_outcome(
status=HumanInputFormStatus.SUBMITTED,
selected_action_id="approve",
submitted_data='{"note": "looks good"}',
rendered_content="Approve? <input>",
form_definition=_form_definition_json(),
form_id="form-1",
node_id="node-1",
)
assert outcome.repause is None
result = outcome.deferred_result
assert result is not None
assert result.status == "submitted"
assert result.action is not None
assert result.action.id == "approve"
assert result.action.label == "Approve" # verbatim label recovered from the form
assert result.values == {"note": "looks good"}
assert result.rendered_content == "Approve? <input>"
def test_map_timeout_form_to_timeout_result() -> None:
outcome = map_form_to_outcome(
status=HumanInputFormStatus.TIMEOUT,
selected_action_id=None,
submitted_data=None,
rendered_content="x",
form_definition=_form_definition_json(),
form_id="form-1",
node_id="node-1",
)
assert outcome.deferred_result is not None
assert outcome.deferred_result.status == "timeout"
assert outcome.deferred_result.action is None
def test_map_expired_form_to_timeout_result() -> None:
outcome = map_form_to_outcome(
status=HumanInputFormStatus.EXPIRED,
selected_action_id=None,
submitted_data=None,
rendered_content="x",
form_definition=_form_definition_json(),
form_id="form-1",
node_id="node-1",
)
assert outcome.deferred_result is not None
assert outcome.deferred_result.status == "timeout"
def test_map_waiting_form_rebuilds_pause() -> None:
outcome = map_form_to_outcome(
status=HumanInputFormStatus.WAITING,
selected_action_id=None,
submitted_data=None,
rendered_content="x",
form_definition=_form_definition_json(),
form_id="form-1",
node_id="node-1",
)
assert outcome.deferred_result is None
pause = outcome.repause
assert isinstance(pause, HumanInputRequired)
assert pause.form_id == "form-1"
assert pause.node_id == "node-1"
assert pause.node_title == "Budget review"
assert [a.id for a in pause.actions] == ["approve", "reject"]
assert [i.output_variable_name for i in pause.inputs] == ["note"]
def test_map_submitted_without_action_id() -> None:
outcome = map_form_to_outcome(
status=HumanInputFormStatus.SUBMITTED,
selected_action_id=None,
submitted_data="{}",
rendered_content="x",
form_definition=_form_definition_json(),
form_id="form-1",
node_id="node-1",
)
assert outcome.deferred_result is not None
assert outcome.deferred_result.action is None
assert outcome.deferred_result.values == {}
def test_build_deferred_tool_results_keys_by_tool_call_id() -> None:
result = AskHumanToolResult(status="timeout")
payload = build_deferred_tool_results(tool_call_id="call-42", result=result)
assert set(payload.calls) == {"call-42"}
assert payload.calls["call-42"] == result.model_dump(mode="json")

View File

@ -796,3 +796,36 @@ def test_build_drive_layer_config_all_refs_dangling_yields_no_config():
config, warnings = build_drive_layer_config(soul, agent_id="agent-1")
assert config is None
assert [w["code"] for w in warnings] == ["skill_ref_dangling"]
# ── ENG-635: ask_human layer gating + feature manifest ───────────────────────
def test_build_ask_human_layer_config_gated_on_human_contacts():
from dify_agent.layers.ask_human import DifyAskHumanLayerConfig
from core.workflow.nodes.agent_v2.runtime_request_builder import build_ask_human_layer_config
# no human involvement configured -> tool stays off
assert build_ask_human_layer_config(AgentSoulConfig()) is None
soul = AgentSoulConfig.model_validate(
{"human": {"contacts": [{"id": "c-1", "name": "David", "email": "d@acme.com", "channel": "email"}]}}
)
config = build_ask_human_layer_config(soul)
assert isinstance(config, DifyAskHumanLayerConfig)
assert config.enabled is True
def test_feature_manifest_marks_human_supported_when_configured():
from core.workflow.nodes.agent_v2.runtime_feature_manifest import build_runtime_feature_manifest
soul = AgentSoulConfig.model_validate(
{"human": {"contacts": [{"id": "c-1", "name": "David", "email": "d@acme.com", "channel": "email"}]}}
)
manifest = build_runtime_feature_manifest(soul)
assert "human" in manifest["supported"]
assert "human" not in manifest["reserved"]
assert manifest["reserved_status"]["human"] == "supported_by_ask_human_hitl"
# configured human no longer produces a "not executed" warning
assert all("human" not in w["section"] for w in manifest["unsupported_runtime_warnings"])

View File

@ -625,12 +625,43 @@ def test_dify_human_input_runtime_create_form_filters_debugger_delivery_methods(
params = repository.create_form.call_args.args[0]
assert params.node_id == "human-input-node"
assert params.workflow_execution_id == "workflow-execution-id"
# No conversation_id_getter wired -> a pure workflow run leaves it None.
assert params.conversation_id is None
assert params.display_in_ui is True
assert len(params.delivery_methods) == 1
assert params.delivery_methods[0].type == DeliveryMethodType.EMAIL
assert params.delivery_methods[0].config.recipients.items[0].reference_id == "user-id"
def test_dify_human_input_runtime_create_form_tags_conversation_id_for_chatflow() -> None:
# ENG-635 (review): a chatflow (advanced-chat) run carries a conversation, so its
# Human Input form is tagged with BOTH its workflow run and its conversation —
# making the form queryable per conversation without changing resume routing.
repository = MagicMock()
repository.create_form.return_value = sentinel.form
node_data = HumanInputNodeData(
title="Human Input",
delivery_methods=[WebAppDeliveryMethod(enabled=True, config=_WebAppDeliveryConfig())],
)
runtime = DifyHumanInputNodeRuntime(
_build_run_context(),
workflow_execution_id_getter=lambda: "workflow-execution-id",
conversation_id_getter=lambda: "conversation-id",
form_repository=repository,
)
runtime.create_form(
node_id="human-input-node",
node_data=node_data,
rendered_content="<p>Rendered</p>",
resolved_default_values={},
)
params = repository.create_form.call_args.args[0]
assert params.workflow_execution_id == "workflow-execution-id"
assert params.conversation_id == "conversation-id"
def test_dify_human_input_runtime_preserves_webapp_delivery_for_web_invocations() -> None:
repository = MagicMock()
repository.create_form.return_value = sentinel.form

View File

@ -281,6 +281,36 @@ def test_submit_form_by_token_calls_repository_and_enqueue(
enqueue_spy.assert_called_once_with(sample_form_record.workflow_run_id)
def test_submit_form_by_token_enqueues_agent_app_resume_for_conversation_form(
sample_form_record, mock_session_factory, mocker: MockerFixture
):
# ENG-635: a conversation-owned (Agent v2 chat) form routes to the chat
# resume, not the workflow resume.
session_factory, _ = mock_session_factory
repo = MagicMock(spec=HumanInputFormSubmissionRepository)
conversation_record = dataclasses.replace(
sample_form_record,
workflow_run_id=None,
conversation_id="conv-1",
)
repo.get_by_token.return_value = conversation_record
repo.mark_submitted.return_value = conversation_record
service = HumanInputService(session_factory, form_repository=repo)
workflow_enqueue_spy = mocker.patch.object(service, "enqueue_resume")
chat_enqueue_spy = mocker.patch.object(service, "enqueue_agent_app_resume")
service.submit_form_by_token(
recipient_type=RecipientType.STANDALONE_WEB_APP,
form_token="token",
selected_action_id="submit",
form_data={"field": "value"},
submission_end_user_id="end-user-id",
)
chat_enqueue_spy.assert_called_once_with(conversation_id="conv-1", form_id=conversation_record.form_id)
workflow_enqueue_spy.assert_not_called()
def test_submit_form_by_token_skips_enqueue_for_delivery_test(
sample_form_record, mock_session_factory, mocker: MockerFixture
):

View File

@ -0,0 +1,138 @@
"""Unit tests for the ``resume_agent_app_execution`` celery task (ENG-635).
Every DB access (``db.session.get``) and the generator are patched at the module
level, so the task's branch logic is exercised without a database or live stack.
"""
from __future__ import annotations
from unittest.mock import MagicMock
from pytest_mock import MockerFixture
from models.account import Account
from models.human_input import HumanInputForm
from models.model import App, Conversation, EndUser
from tasks.app_generate import resume_agent_app_task as mod
MODULE = "tasks.app_generate.resume_agent_app_task"
def _form(conversation_id: str = "conv-1", app_id: str = "app-1") -> MagicMock:
return MagicMock(conversation_id=conversation_id, app_id=app_id)
def _wire_db(
mocker: MockerFixture,
*,
form=None,
app=None,
conversation=None,
account=None,
end_user=None,
) -> MagicMock:
"""Patch the module ``db`` so ``db.session.get(Model, id)`` dispatches by model."""
table = {
HumanInputForm: form,
App: app,
Conversation: conversation,
Account: account,
EndUser: end_user,
}
db = mocker.patch(f"{MODULE}.db")
db.session.get.side_effect = lambda model, _id: table.get(model)
return db
def test_resume_happy_path_account_user_sets_tenant_and_runs(mocker: MockerFixture):
conversation = MagicMock(from_account_id="acct-1", from_end_user_id=None)
account = MagicMock()
app = MagicMock(tenant_id="tenant-1")
_wire_db(mocker, form=_form(), app=app, conversation=conversation, account=account)
gen = mocker.patch(f"{MODULE}.AgentAppGenerator")
mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1")
account.set_tenant_id.assert_called_once_with("tenant-1")
gen.return_value.resume_after_form_submission.assert_called_once()
kwargs = gen.return_value.resume_after_form_submission.call_args.kwargs
assert kwargs["conversation_id"] == "conv-1"
assert kwargs["user"] is account
assert kwargs["app_model"] is app
def test_resume_end_user_path(mocker: MockerFixture):
conversation = MagicMock(from_account_id=None, from_end_user_id="eu-1")
end_user = MagicMock()
_wire_db(mocker, form=_form(), app=MagicMock(tenant_id="t"), conversation=conversation, end_user=end_user)
gen = mocker.patch(f"{MODULE}.AgentAppGenerator")
mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1")
assert gen.return_value.resume_after_form_submission.call_args.kwargs["user"] is end_user
def test_resume_returns_when_form_missing(mocker: MockerFixture):
_wire_db(mocker, form=None)
gen = mocker.patch(f"{MODULE}.AgentAppGenerator")
mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1")
gen.assert_not_called()
def test_resume_returns_on_conversation_mismatch(mocker: MockerFixture):
_wire_db(mocker, form=_form(conversation_id="other-conv"))
gen = mocker.patch(f"{MODULE}.AgentAppGenerator")
mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1")
gen.assert_not_called()
def test_resume_returns_when_app_missing(mocker: MockerFixture):
_wire_db(mocker, form=_form(), app=None)
gen = mocker.patch(f"{MODULE}.AgentAppGenerator")
mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1")
gen.assert_not_called()
def test_resume_returns_when_conversation_missing(mocker: MockerFixture):
_wire_db(mocker, form=_form(), app=MagicMock(), conversation=None)
gen = mocker.patch(f"{MODULE}.AgentAppGenerator")
mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1")
gen.assert_not_called()
def test_resume_returns_when_no_user_resolvable(mocker: MockerFixture):
conversation = MagicMock(from_account_id=None, from_end_user_id=None)
_wire_db(mocker, form=_form(), app=MagicMock(), conversation=conversation)
gen = mocker.patch(f"{MODULE}.AgentAppGenerator")
mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1")
gen.assert_not_called()
def test_resume_returns_when_account_id_set_but_account_gone(mocker: MockerFixture):
conversation = MagicMock(from_account_id="acct-x", from_end_user_id=None)
_wire_db(mocker, form=_form(), app=MagicMock(), conversation=conversation, account=None)
gen = mocker.patch(f"{MODULE}.AgentAppGenerator")
mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1")
gen.assert_not_called()
def test_resume_swallows_generator_exception(mocker: MockerFixture):
conversation = MagicMock(from_account_id="acct-1", from_end_user_id=None)
_wire_db(mocker, form=_form(), app=MagicMock(tenant_id="t"), conversation=conversation, account=MagicMock())
gen = mocker.patch(f"{MODULE}.AgentAppGenerator")
gen.return_value.resume_after_form_submission.side_effect = RuntimeError("boom")
# The task must not propagate the failure (it is logged and the session closed).
mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1")