From 8d05185e3904ec9a60e7de8cda0a561cc5d6113f Mon Sep 17 00:00:00 2001 From: zyssyz123 <916125788@qq.com> Date: Tue, 16 Jun 2026 11:43:40 +0800 Subject: [PATCH] =?UTF-8?q?feat(api):=20Agent=20ask=5Fhuman=20HITL=20(phas?= =?UTF-8?q?e-1)=20=E2=80=94=20workflow=20node=20+=20Agent=20v2=20chat=20?= =?UTF-8?q?=E2=80=94=20ENG-635=20(#37437)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Claude Fable 5 Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- api/clients/agent_backend/request_builder.py | 44 +++ api/core/app/apps/agent_app/app_generator.py | 102 +++++ api/core/app/apps/agent_app/app_runner.py | 129 ++++++- .../apps/agent_app/runtime_request_builder.py | 7 +- api/core/app/apps/agent_app/session_store.py | 13 + .../repositories/human_input_repository.py | 20 +- api/core/workflow/node_factory.py | 1 + api/core/workflow/node_runtime.py | 6 + .../workflow/nodes/agent_v2/agent_node.py | 115 +++++- .../workflow/nodes/agent_v2/ask_human_hitl.py | 354 ++++++++++++++++++ .../nodes/agent_v2/ask_human_resume.py | 159 ++++++++ .../agent_v2/runtime_feature_manifest.py | 5 +- .../nodes/agent_v2/runtime_request_builder.py | 22 +- .../workflow/nodes/agent_v2/session_store.py | 24 +- api/extensions/ext_celery.py | 1 + ...0eb_add_ask_human_pause_correlation_to_.py | 32 ++ ...dd_conversation_id_to_human_input_forms.py | 29 ++ api/models/agent.py | 6 + api/models/human_input.py | 7 + api/services/human_input_service.py | 25 +- .../app_generate/resume_agent_app_task.py | 70 ++++ .../agent_backend/test_request_builder.py | 46 +++ .../app/apps/agent_app/test_app_generator.py | 53 +++ .../app/apps/agent_app/test_app_runner.py | 113 +++++- .../test_human_input_form_repository_impl.py | 1 + .../test_human_input_repository.py | 1 + .../nodes/agent_v2/test_agent_node.py | 117 +++++- .../nodes/agent_v2/test_ask_human_hitl.py | 338 +++++++++++++++++ .../nodes/agent_v2/test_ask_human_resume.py | 121 ++++++ .../agent_v2/test_runtime_request_builder.py | 33 ++ .../core/workflow/test_node_runtime.py | 31 ++ .../services/test_human_input_service.py | 30 ++ .../tasks/test_resume_agent_app_task.py | 138 +++++++ 33 files changed, 2164 insertions(+), 29 deletions(-) create mode 100644 api/core/workflow/nodes/agent_v2/ask_human_hitl.py create mode 100644 api/core/workflow/nodes/agent_v2/ask_human_resume.py create mode 100644 api/migrations/versions/2026_06_15_1052-c167a72a00eb_add_ask_human_pause_correlation_to_.py create mode 100644 api/migrations/versions/2026_06_15_1110-d2f1a4b8c3e0_add_conversation_id_to_human_input_forms.py create mode 100644 api/tasks/app_generate/resume_agent_app_task.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/agent_v2/test_ask_human_hitl.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/agent_v2/test_ask_human_resume.py create mode 100644 api/tests/unit_tests/tasks/test_resume_agent_app_task.py diff --git a/api/clients/agent_backend/request_builder.py b/api/clients/agent_backend/request_builder.py index 0915c127eb..55944929dd 100644 --- a/api/clients/agent_backend/request_builder.py +++ b/api/clients/agent_backend/request_builder.py @@ -19,6 +19,7 @@ from agenton.compositor.schemas import LayerSessionSnapshot from agenton.layers import ExitIntent from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID +from dify_agent.layers.ask_human import DIFY_ASK_HUMAN_LAYER_TYPE_ID, DifyAskHumanLayerConfig from dify_agent.layers.dify_plugin import ( DIFY_PLUGIN_LLM_LAYER_TYPE_ID, DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID, @@ -38,6 +39,7 @@ from dify_agent.protocol import ( DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID, CreateRunRequest, + DeferredToolResultsPayload, LayerExitSignals, RunComposition, RunLayerSpec, @@ -53,6 +55,7 @@ AGENT_APP_USER_PROMPT_LAYER_ID = "agent_app_user_prompt" DIFY_EXECUTION_CONTEXT_LAYER_ID = "execution_context" DIFY_DRIVE_LAYER_ID = "drive" DIFY_PLUGIN_TOOLS_LAYER_ID = "tools" +DIFY_ASK_HUMAN_LAYER_ID = "ask_human" DIFY_SHELL_LAYER_ID = "shell" @@ -139,11 +142,18 @@ class AgentBackendWorkflowNodeRunInput(BaseModel): # Drive Skills & Files declaration (dify.drive) — an index the agent pulls # through the back proxy, never inline content; see AGENT_DRIVE_MANIFEST_ENABLED. drive_config: DifyDriveLayerConfig | None = None + # Human-in-the-loop ask_human deferred tool (dify.ask_human). Present only when + # the Agent Soul configures human involvement; a deferred call ends the run and + # the workflow pauses via the existing HITL form mechanism (ENG-635). + ask_human_config: DifyAskHumanLayerConfig | None = None # Inject the sandboxed shell layer (dify.shell). Requires the agent backend # to be wired with a shellctl entrypoint; see configs AGENT_SHELL_ENABLED. include_shell: bool = False shell_config: DifyShellLayerConfig | None = None session_snapshot: CompositorSessionSnapshot | None = None + # Human tool results fed back into a continuation run after a HITL submission + # (ENG-638). Keyed by the original deferred tool_call_id. + deferred_tool_results: DeferredToolResultsPayload | None = None include_history: bool = True suspend_on_exit: bool = True metadata: dict[str, JsonValue] = Field(default_factory=dict) @@ -178,11 +188,17 @@ class AgentBackendAgentAppRunInput(BaseModel): # Drive Skills & Files declaration (dify.drive) — an index the agent pulls # through the back proxy, never inline content; see AGENT_DRIVE_MANIFEST_ENABLED. drive_config: DifyDriveLayerConfig | None = None + # Human-in-the-loop ask_human deferred tool (dify.ask_human). Present only when + # the Agent Soul configures human involvement (ENG-635). + ask_human_config: DifyAskHumanLayerConfig | None = None # Inject the sandboxed shell layer (dify.shell). Requires the agent backend # to be wired with a shellctl entrypoint; see configs AGENT_SHELL_ENABLED. include_shell: bool = False shell_config: DifyShellLayerConfig | None = None session_snapshot: CompositorSessionSnapshot | None = None + # Human tool results fed back into a continuation run after a HITL submission + # (ENG-638). Keyed by the original deferred tool_call_id. + deferred_tool_results: DeferredToolResultsPayload | None = None include_history: bool = True suspend_on_exit: bool = True metadata: dict[str, JsonValue] = Field(default_factory=dict) @@ -284,6 +300,19 @@ class AgentBackendRunRequestBuilder: ) ) + if run_input.ask_human_config is not None: + # Human-in-the-loop ask_human deferred tool (dify.ask_human). A call ends + # the run with a deferred_tool_call; the caller pauses (workflow HITL) and + # later resumes with deferred_tool_results. Needs the history layer above. + layers.append( + RunLayerSpec( + name=DIFY_ASK_HUMAN_LAYER_ID, + type=DIFY_ASK_HUMAN_LAYER_TYPE_ID, + metadata=run_input.metadata, + config=run_input.ask_human_config, + ) + ) + if run_input.include_shell: # Sandboxed bash workspace (dify.shell). Depends on execution_context so # the agent server can mint per-command Agent Stub env (back proxy); @@ -318,6 +347,7 @@ class AgentBackendRunRequestBuilder: idempotency_key=run_input.idempotency_key, metadata=run_input.metadata, session_snapshot=run_input.session_snapshot, + deferred_tool_results=run_input.deferred_tool_results, on_exit=LayerExitSignals( default=ExitIntent.SUSPEND if run_input.suspend_on_exit else ExitIntent.DELETE, ), @@ -453,6 +483,19 @@ class AgentBackendRunRequestBuilder: ) ) + if run_input.ask_human_config is not None: + # Human-in-the-loop ask_human deferred tool (dify.ask_human). A call ends + # the run with a deferred_tool_call; the caller pauses (workflow HITL) and + # later resumes with deferred_tool_results. Needs the history layer above. + layers.append( + RunLayerSpec( + name=DIFY_ASK_HUMAN_LAYER_ID, + type=DIFY_ASK_HUMAN_LAYER_TYPE_ID, + metadata=run_input.metadata, + config=run_input.ask_human_config, + ) + ) + if run_input.include_shell: # Sandboxed bash workspace (dify.shell). Depends on execution_context so # the agent server can mint per-command Agent Stub env (back proxy); @@ -487,6 +530,7 @@ class AgentBackendRunRequestBuilder: idempotency_key=run_input.idempotency_key, metadata=run_input.metadata, session_snapshot=run_input.session_snapshot, + deferred_tool_results=run_input.deferred_tool_results, on_exit=LayerExitSignals( default=ExitIntent.SUSPEND if run_input.suspend_on_exit else ExitIntent.DELETE, ), diff --git a/api/core/app/apps/agent_app/app_generator.py b/api/core/app/apps/agent_app/app_generator.py index 467afb891d..13eaae47eb 100644 --- a/api/core/app/apps/agent_app/app_generator.py +++ b/api/core/app/apps/agent_app/app_generator.py @@ -158,6 +158,108 @@ class AgentAppGenerator(MessageBasedAppGenerator): ) return AgentAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from) + def resume_after_form_submission( + self, + *, + app_model: App, + user: Account | EndUser, + conversation_id: str, + invoke_from: InvokeFrom, + ) -> None: + """Resume an Agent App conversation after a submitted ask_human HITL form. + + ENG-635: triggered by a background task (not an HTTP request). Runs one + blocking turn with no user query; the runner threads the human's reply + into the agent run as deferred_tool_results and the assistant answer is + persisted to the conversation. Live streaming to a reconnected client is + out of scope here — the message is persisted and can be re-fetched. + """ + agent, snapshot, agent_soul = self._resolve_agent(app_model) + conversation = ConversationService.get_conversation( + app_model=app_model, conversation_id=conversation_id, user=user + ) + + app_config = AgentAppConfigManager.get_app_config( + app_model=app_model, + agent_soul=agent_soul, + app_model_config=app_model.app_model_config, + conversation=conversation, + ) + model_conf = ModelConfigConverter.convert(app_config) + trace_manager = TraceQueueManager(app_model.id, user.id if isinstance(user, Account) else user.session_id) + + # ENG-638: the agent backend requires the resume composition's layer + # names to match the suspended snapshot, which includes the per-turn + # user-prompt layer. So re-send the original user message (the paused + # turn's query); the continuation is driven by deferred_tool_results and + # the restored snapshot, not by re-processing this prompt. A blank prompt + # would drop the user-prompt layer and fail the snapshot match. + paused_message = db.session.scalar( + select(Message) + .where(Message.conversation_id == conversation.id, Message.query != "") + .order_by(Message.created_at.desc()) + .limit(1) + ) + resume_query = paused_message.query if paused_message and paused_message.query else "(resumed)" + + application_generate_entity = AgentAppGenerateEntity( + task_id=str(uuid.uuid4()), + app_config=app_config, + model_conf=model_conf, + conversation_id=conversation.id, + # A resume carries no new user inputs; the human's answer is the + # submitted form, threaded in by the runner as deferred_tool_results. + # The query re-sends the paused turn's message (see above). + inputs={}, + query=resume_query, + files=[], + parent_message_id=UUID_NIL, + user_id=user.id, + stream=False, + invoke_from=invoke_from, + extras={"auto_generate_conversation_name": False}, + call_depth=0, + trace_manager=trace_manager, + agent_id=agent.id, + agent_config_snapshot_id=snapshot.id, + ) + + conversation, message = self._init_generate_records(application_generate_entity, conversation) + + queue_manager = MessageBasedAppQueueManager( + task_id=application_generate_entity.task_id, + user_id=application_generate_entity.user_id, + invoke_from=application_generate_entity.invoke_from, + conversation_id=conversation.id, + app_mode=conversation.mode, + message_id=message.id, + ) + + context = contextvars.copy_context() + worker_thread = threading.Thread( + target=self._generate_worker, + kwargs={ + "flask_app": current_app._get_current_object(), # type: ignore + "context": context, + "application_generate_entity": application_generate_entity, + "queue_manager": queue_manager, + "conversation_id": conversation.id, + "message_id": message.id, + "user_from": UserFrom.ACCOUNT if isinstance(user, Account) else UserFrom.END_USER, + }, + ) + worker_thread.start() + + # Blocking: drive the chat task pipeline to persist the assistant answer. + self._handle_response( + application_generate_entity=application_generate_entity, + queue_manager=queue_manager, + conversation=conversation, + message=message, + user=user, + stream=False, + ) + def _generate_worker( self, *, diff --git a/api/core/app/apps/agent_app/app_runner.py b/api/core/app/apps/agent_app/app_runner.py index 7b767e5088..d4d2a754bf 100644 --- a/api/core/app/apps/agent_app/app_runner.py +++ b/api/core/app/apps/agent_app/app_runner.py @@ -14,9 +14,12 @@ import json import logging from typing import Any +from dify_agent.layers.ask_human import AskHumanToolArgs +from dify_agent.protocol import DeferredToolResultsPayload from pydantic import JsonValue from clients.agent_backend import ( + AgentBackendDeferredToolCallInternalEvent, AgentBackendError, AgentBackendInternalEventType, AgentBackendRunClient, @@ -27,13 +30,21 @@ from clients.agent_backend import ( ) from core.app.apps.agent_app.runtime_request_builder import ( AgentAppRuntimeBuildContext, + AgentAppRuntimeRequest, AgentAppRuntimeRequestBuilder, ) -from core.app.apps.agent_app.session_store import AgentAppRuntimeSessionStore, AgentAppSessionScope +from core.app.apps.agent_app.session_store import ( + AgentAppRuntimeSessionStore, + AgentAppSessionScope, + StoredAgentAppSession, +) from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.exc import GenerateTaskStoppedError from core.app.entities.app_invoke_entities import DifyRunContext from core.app.entities.queue_entities import QueueLLMChunkEvent, QueueMessageEndEvent +from core.repositories.human_input_repository import HumanInputFormRepository, HumanInputFormRepositoryImpl +from core.workflow.nodes.agent_v2.ask_human_hitl import AskHumanFormBuildError, create_ask_human_form +from core.workflow.nodes.agent_v2.ask_human_resume import build_deferred_tool_results, resolve_ask_human_form from graphon.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage from graphon.model_runtime.entities.message_entities import AssistantPromptMessage from models.agent_config_entities import AgentSoulConfig @@ -104,7 +115,13 @@ class AgentAppRunner: agent_id=agent_id, agent_config_snapshot_id=agent_config_snapshot_id, ) - session_snapshot = self._session_store.load_active_snapshot(scope) + # ENG-638: if a prior turn paused on ask_human and the form is now answered, + # resume by threading the human's reply into this run as deferred_tool_results. + stored = self._session_store.load_active_session(scope) + session_snapshot = stored.session_snapshot if stored is not None else None + deferred_tool_results = self._resolve_pending_ask_human( + stored=stored, dify_context=dify_context, message_id=message_id + ) runtime = self._request_builder.build( AgentAppRuntimeBuildContext( @@ -116,12 +133,29 @@ class AgentAppRunner: user_query=query, idempotency_key=message_id, session_snapshot=session_snapshot, + deferred_tool_results=deferred_tool_results, ) ) create_response = self._agent_backend_client.create_run(runtime.request) terminal = self._consume_stream(create_response.run_id, queue_manager=queue_manager) + if isinstance(terminal, AgentBackendDeferredToolCallInternalEvent): + # ENG-635: the agent asked a human. End this turn with the question and + # a conversation-owned HITL form; a form submission resumes the run. + self._pause_for_ask_human( + terminal=terminal, + scope=scope, + dify_context=dify_context, + agent_soul=agent_soul, + conversation_id=conversation_id, + message_id=message_id, + model_name=model_name, + runtime=runtime, + queue_manager=queue_manager, + ) + return + if not isinstance(terminal, AgentBackendRunSucceededInternalEvent): error = getattr(terminal, "error", None) or "Agent backend run did not complete successfully." raise AgentBackendError(str(error)) @@ -135,6 +169,93 @@ class AgentAppRunner: runtime_layer_specs=extract_runtime_layer_specs(runtime.request.composition), ) + def _pause_for_ask_human( + self, + *, + terminal: AgentBackendDeferredToolCallInternalEvent, + scope: AgentAppSessionScope, + dify_context: DifyRunContext, + agent_soul: AgentSoulConfig, + conversation_id: str, + message_id: str, + model_name: str, + runtime: AgentAppRuntimeRequest, + queue_manager: AppQueueManager, + ) -> None: + """End the chat turn on a dify.ask_human call: create a conversation-owned + HITL form, persist the pause correlation, and surface the question.""" + try: + created = create_ask_human_form( + deferred_tool_call=terminal.deferred_tool_call, + # Chat forms have no workflow node; key by the turn's message id. + node_id=message_id, + default_node_title="Agent", + contacts=agent_soul.human.contacts, + repository=self._build_form_repository(dify_context), + conversation_id=conversation_id, + ) + except AskHumanFormBuildError as error: + raise AgentBackendError(f"Failed to build ask_human form for Agent App chat: {error}") from error + + # Persist the snapshot + correlation so a form submission can start the + # second run with the human's answer (ENG-637/638 columns, conversation owner). + self._save_session( + scope=scope, + backend_run_id=terminal.run_id, + snapshot=terminal.session_snapshot, + runtime_layer_specs=extract_runtime_layer_specs(runtime.request.composition), + pending_form_id=created.form_id, + pending_tool_call_id=terminal.deferred_tool_call.tool_call_id, + ) + + # The structured form is delivered via the HITL surface(s); the chat turn + # ends by echoing the agent's question so the conversation reflects the ask. + self._publish_answer( + queue_manager=queue_manager, + model_name=model_name, + answer=self._ask_human_message(created.args), + ) + + def _resolve_pending_ask_human( + self, + *, + stored: StoredAgentAppSession | None, + dify_context: DifyRunContext, + message_id: str, + ) -> DeferredToolResultsPayload | None: + """Build deferred_tool_results when a pending ask_human form is answered.""" + if stored is None or stored.pending_form_id is None or stored.pending_tool_call_id is None: + return None + outcome = resolve_ask_human_form( + form_id=stored.pending_form_id, + tenant_id=dify_context.tenant_id, + node_id=message_id, + ) + if outcome is None or outcome.deferred_result is None: + # Form missing or still waiting — run a normal turn, no resume. + return None + return build_deferred_tool_results( + tool_call_id=stored.pending_tool_call_id, + result=outcome.deferred_result, + ) + + def _build_form_repository(self, dify_context: DifyRunContext) -> HumanInputFormRepository: + invoke_source = dify_context.invoke_from.value + return HumanInputFormRepositoryImpl( + tenant_id=dify_context.tenant_id, + app_id=dify_context.app_id, + workflow_execution_id=None, + invoke_source=invoke_source, + submission_actor_id=dify_context.user_id if invoke_source in {"debugger", "explore"} else None, + ) + + @staticmethod + def _ask_human_message(args: AskHumanToolArgs) -> str: + parts = [args.question] + if args.markdown: + parts.append(args.markdown) + return "\n\n".join(parts) + def _consume_stream(self, run_id: str, *, queue_manager: AppQueueManager): terminal = None for public_event in self._agent_backend_client.stream_events(run_id): @@ -178,6 +299,8 @@ class AgentAppRunner: backend_run_id: str, snapshot: Any, runtime_layer_specs: Any, + pending_form_id: str | None = None, + pending_tool_call_id: str | None = None, ) -> None: try: self._session_store.save_active_snapshot( @@ -185,6 +308,8 @@ class AgentAppRunner: backend_run_id=backend_run_id, snapshot=snapshot, runtime_layer_specs=runtime_layer_specs, + pending_form_id=pending_form_id, + pending_tool_call_id=pending_tool_call_id, ) except Exception: logger.warning( diff --git a/api/core/app/apps/agent_app/runtime_request_builder.py b/api/core/app/apps/agent_app/runtime_request_builder.py index 73d7fdedb8..71cc0385f9 100644 --- a/api/core/app/apps/agent_app/runtime_request_builder.py +++ b/api/core/app/apps/agent_app/runtime_request_builder.py @@ -18,7 +18,7 @@ from dify_agent.layers.execution_context import ( DifyExecutionContextLayerConfig, DifyExecutionContextUserFrom, ) -from dify_agent.protocol import CreateRunRequest +from dify_agent.protocol import CreateRunRequest, DeferredToolResultsPayload from clients.agent_backend import ( AgentBackendAgentAppRunInput, @@ -34,6 +34,7 @@ from core.workflow.nodes.agent_v2.plugin_tools_builder import ( ) from core.workflow.nodes.agent_v2.runtime_request_builder import ( append_runtime_warnings, + build_ask_human_layer_config, build_drive_layer_config, build_shell_layer_config, ) @@ -64,6 +65,8 @@ class AgentAppRuntimeBuildContext: user_query: str idempotency_key: str session_snapshot: CompositorSessionSnapshot | None = None + # ENG-638: set when resuming a chat turn after a submitted ask_human form. + deferred_tool_results: DeferredToolResultsPayload | None = None @dataclass(frozen=True, slots=True) @@ -154,9 +157,11 @@ class AgentAppRuntimeRequestBuilder: user_prompt=context.user_query, tools=tools_layer, drive_config=drive_config, + ask_human_config=build_ask_human_layer_config(agent_soul), include_shell=dify_config.AGENT_SHELL_ENABLED, shell_config=build_shell_layer_config(agent_soul), session_snapshot=context.session_snapshot, + deferred_tool_results=context.deferred_tool_results, idempotency_key=context.idempotency_key, metadata=metadata, ) diff --git a/api/core/app/apps/agent_app/session_store.py b/api/core/app/apps/agent_app/session_store.py index 22af2f2068..8c68e218d1 100644 --- a/api/core/app/apps/agent_app/session_store.py +++ b/api/core/app/apps/agent_app/session_store.py @@ -56,6 +56,10 @@ class StoredAgentAppSession: session_snapshot: CompositorSessionSnapshot backend_run_id: str | None runtime_layer_specs: list[RuntimeLayerSpec] = field(default_factory=list) + # ENG-635: set while the conversation turn is paused on a dify.ask_human + # deferred call, awaiting a HITL form submission. + pending_form_id: str | None = None + pending_tool_call_id: str | None = None class AgentAppRuntimeSessionStore: @@ -75,6 +79,8 @@ class AgentAppRuntimeSessionStore: session_snapshot=CompositorSessionSnapshot.model_validate_json(row.session_snapshot), backend_run_id=row.backend_run_id, runtime_layer_specs=_deserialize_runtime_layer_specs(row.composition_layer_specs), + pending_form_id=row.pending_form_id, + pending_tool_call_id=row.pending_tool_call_id, ) def load_active_session_for_conversation( @@ -125,6 +131,8 @@ class AgentAppRuntimeSessionStore: backend_run_id: str, snapshot: CompositorSessionSnapshot | None, runtime_layer_specs: list[RuntimeLayerSpec], + pending_form_id: str | None = None, + pending_tool_call_id: str | None = None, ) -> None: if snapshot is None: return @@ -144,6 +152,8 @@ class AgentAppRuntimeSessionStore: session_snapshot=snapshot_json, composition_layer_specs=runtime_layer_specs_json, status=AgentRuntimeSessionStatus.ACTIVE, + pending_form_id=pending_form_id, + pending_tool_call_id=pending_tool_call_id, ) session.add(row) else: @@ -152,6 +162,9 @@ class AgentAppRuntimeSessionStore: row.composition_layer_specs = runtime_layer_specs_json row.status = AgentRuntimeSessionStatus.ACTIVE row.cleaned_at = None + # Set (or clear, when omitted) the ask_human pause correlation. + row.pending_form_id = pending_form_id + row.pending_tool_call_id = pending_tool_call_id session.flush() other_rows = session.scalars( select(AgentRuntimeSession).where( diff --git a/api/core/repositories/human_input_repository.py b/api/core/repositories/human_input_repository.py index 4d1a3ef006..599cc643ea 100644 --- a/api/core/repositories/human_input_repository.py +++ b/api/core/repositories/human_input_repository.py @@ -63,6 +63,10 @@ class FormCreateParams: display_in_ui: bool resolved_default_values: Mapping[str, Any] form_kind: HumanInputFormKind = HumanInputFormKind.RUNTIME + # ENG-635: the conversation this form belongs to. Set together with + # workflow_execution_id for chatflow runs; set alone (workflow_execution_id None) + # for Agent v2 chat ask_human forms, which have no workflow run. + conversation_id: str | None = None class HumanInputFormRecipientEntity(Protocol): @@ -217,6 +221,9 @@ class HumanInputFormRecord: recipient_id: str | None recipient_type: RecipientType | None access_token: str | None + # ENG-635: Agent v2 chat owner (NULL for workflow-owned forms). Trailing + + # defaulted so existing record constructions stay source-compatible. + conversation_id: str | None = None @property def submitted(self) -> bool: @@ -232,6 +239,7 @@ class HumanInputFormRecord: return cls( form_id=form_model.id, workflow_run_id=form_model.workflow_run_id, + conversation_id=form_model.conversation_id, node_id=form_model.node_id, tenant_id=form_model.tenant_id, app_id=form_model.app_id, @@ -433,8 +441,15 @@ class HumanInputFormRepositoryImpl: if not app_id: raise ValueError("app_id is required to create a human input form") workflow_execution_id = params.workflow_execution_id or self._workflow_execution_id - if params.form_kind == HumanInputFormKind.RUNTIME and workflow_execution_id is None: - raise ValueError("workflow_execution_id is required for runtime human input forms") + # A RUNTIME form must be owned by at least one of: a workflow run (workflow / + # Human-Input / agent node) or a conversation turn (ENG-635: Agent v2 chat + # ask_human; chatflow runs set both — workflow_run_id and conversation_id). + if ( + params.form_kind == HumanInputFormKind.RUNTIME + and workflow_execution_id is None + and params.conversation_id is None + ): + raise ValueError("a runtime human input form requires a workflow_execution_id or conversation_id") with session_factory.create_session() as session, session.begin(): # Generate unique form ID @@ -456,6 +471,7 @@ class HumanInputFormRepositoryImpl: tenant_id=self._tenant_id, app_id=app_id, workflow_run_id=workflow_execution_id, + conversation_id=params.conversation_id, form_kind=params.form_kind, node_id=params.node_id, form_definition=form_definition.model_dump_json(), diff --git a/api/core/workflow/node_factory.py b/api/core/workflow/node_factory.py index bf85fc1918..54c6c55949 100644 --- a/api/core/workflow/node_factory.py +++ b/api/core/workflow/node_factory.py @@ -334,6 +334,7 @@ class DifyNodeFactory(NodeFactory): self.graph_runtime_state.variable_pool, SystemVariableKey.WORKFLOW_EXECUTION_ID, ), + conversation_id_getter=self._conversation_id, ) self._tool_runtime = DifyToolNodeRuntime(self._dify_context) self._http_request_file_manager = file_manager diff --git a/api/core/workflow/node_runtime.py b/api/core/workflow/node_runtime.py index 8c7d5c157c..4eced02cd1 100644 --- a/api/core/workflow/node_runtime.py +++ b/api/core/workflow/node_runtime.py @@ -710,10 +710,12 @@ class DifyHumanInputNodeRuntime(HumanInputNodeRuntimeProtocol): run_context: Mapping[str, Any] | DifyRunContext, *, workflow_execution_id_getter: Callable[[], str | None] | None = None, + conversation_id_getter: Callable[[], str | None] | None = None, form_repository: HumanInputFormRepository | None = None, ) -> None: self._run_context = resolve_dify_run_context(run_context) self._workflow_execution_id_getter = workflow_execution_id_getter + self._conversation_id_getter = conversation_id_getter self._form_repository = form_repository self._file_reference_factory = DifyFileReferenceFactory(self._run_context) @@ -762,6 +764,7 @@ class DifyHumanInputNodeRuntime(HumanInputNodeRuntimeProtocol): return DifyHumanInputNodeRuntime( self._run_context, workflow_execution_id_getter=self._workflow_execution_id_getter, + conversation_id_getter=self._conversation_id_getter, form_repository=form_repository, ) @@ -799,6 +802,9 @@ class DifyHumanInputNodeRuntime(HumanInputNodeRuntimeProtocol): repo = self.build_form_repository() params = FormCreateParams( workflow_execution_id=self._workflow_execution_id_getter() if self._workflow_execution_id_getter else None, + # A chatflow (advanced-chat) run carries a conversation; tag the form with + # it too so it is queryable per conversation. None for a pure workflow run. + conversation_id=self._conversation_id_getter() if self._conversation_id_getter else None, node_id=node_id, form_config=node_data, rendered_content=rendered_content, diff --git a/api/core/workflow/nodes/agent_v2/agent_node.py b/api/core/workflow/nodes/agent_v2/agent_node.py index 1e22a35fc9..8adb27240c 100644 --- a/api/core/workflow/nodes/agent_v2/agent_node.py +++ b/api/core/workflow/nodes/agent_v2/agent_node.py @@ -24,13 +24,16 @@ from clients.agent_backend import ( extract_runtime_layer_specs, ) from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, DifyRunContext +from core.repositories.human_input_repository import HumanInputFormRepository, HumanInputFormRepositoryImpl from core.workflow.system_variables import SystemVariableKey, get_system_text -from graphon.entities.pause_reason import SchedulingPause +from graphon.entities.pause_reason import HumanInputRequired, SchedulingPause from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from graphon.node_events import NodeEventBase, NodeRunResult, PauseRequestedEvent, StreamCompletedEvent from graphon.nodes.base.node import Node -from models.agent_config_entities import WorkflowNodeJobConfig +from models.agent_config_entities import AgentSoulConfig, WorkflowNodeJobConfig +from .ask_human_hitl import AskHumanFormBuildError, build_ask_human_pause_reason +from .ask_human_resume import build_deferred_tool_results, resolve_ask_human_form from .binding_resolver import WorkflowAgentBindingError, WorkflowAgentBindingResolver from .entities import DifyAgentNodeData from .output_adapter import WorkflowAgentOutputAdapter @@ -117,6 +120,12 @@ class DifyAgentNode(Node[DifyAgentNodeData]): self.graph_runtime_state.variable_pool, SystemVariableKey.WORKFLOW_EXECUTION_ID, ) + # Set on chatflow (advanced-chat) runs; None for a pure workflow run. Lets an + # ask_human form be tagged with its conversation in addition to workflow_run_id. + conversation_id = get_system_text( + self.graph_runtime_state.variable_pool, + SystemVariableKey.CONVERSATION_ID, + ) inputs: dict[str, Any] = {} process_data: dict[str, Any] = {} metadata: dict[str, Any] = { @@ -168,6 +177,33 @@ class DifyAgentNode(Node[DifyAgentNodeData]): ) outputs_by_name = {o.name: o for o in effective_outputs} + # ──── ENG-638: resume after a submitted/timed-out ask_human form ──── + # graphon re-executes this _run when the outer workflow resumes. If a + # pending ask_human form is now terminal, thread the human's answer into + # the second Agent run as deferred_tool_results; if it is somehow still + # waiting, re-emit the same pause defensively. + deferred_tool_results = None + if self._session_store is not None: + stored_session = self._session_store.load_active_session(session_scope) + if stored_session is not None and stored_session.pending_form_id is not None: + resume_outcome = resolve_ask_human_form( + form_id=stored_session.pending_form_id, + tenant_id=dify_ctx.tenant_id, + node_id=self._node_id, + ) + if resume_outcome is not None and resume_outcome.repause is not None: + yield PauseRequestedEvent(reason=resume_outcome.repause) + return + if ( + resume_outcome is not None + and resume_outcome.deferred_result is not None + and stored_session.pending_tool_call_id is not None + ): + deferred_tool_results = build_deferred_tool_results( + tool_call_id=stored_session.pending_tool_call_id, + result=resume_outcome.deferred_result, + ) + # ──── Retry loop (Stage 4 §7) ──── attempt = 0 while True: @@ -188,6 +224,7 @@ class DifyAgentNode(Node[DifyAgentNodeData]): snapshot=bundle.snapshot, attempt=attempt, session_snapshot=session_snapshot, + deferred_tool_results=deferred_tool_results, ) ) except WorkflowAgentRuntimeRequestBuildError as error: @@ -251,19 +288,56 @@ class DifyAgentNode(Node[DifyAgentNodeData]): return if isinstance(terminal_event, AgentBackendDeferredToolCallInternalEvent): + # ENG-636: a dify.ask_human deferred call pauses the *outer* + # workflow through the existing HITL form path. Any other deferred + # tool (none today) falls back to a generic scheduling pause. The + # form is built *before* the snapshot is saved so its id can be + # persisted as the pause correlation (ENG-637). + try: + pause_reason: HumanInputRequired | SchedulingPause | None = build_ask_human_pause_reason( + deferred_tool_call=terminal_event.deferred_tool_call, + node_id=self._node_id, + default_node_title=bundle.agent.name or self._node_id, + workflow_run_id=workflow_run_id, + conversation_id=conversation_id, + contacts=AgentSoulConfig.model_validate(bundle.snapshot.config_snapshot_dict).human.contacts, + repository=self._build_human_input_form_repository( + dify_ctx=dify_ctx, workflow_run_id=workflow_run_id + ), + ) + except AskHumanFormBuildError as error: + yield self._failure_event( + inputs=inputs, + process_data=process_data, + metadata=metadata, + error=str(error), + error_type="agent_ask_human_form_build_error", + ) + return + + # ENG-637: persist the awaiting-form id + deferred tool_call id + # next to the snapshot so the resumed node can rebuild + # deferred_tool_results from the submitted form. + pending_form_id: str | None = None + pending_tool_call_id: str | None = None + if isinstance(pause_reason, HumanInputRequired): + pending_form_id = pause_reason.form_id + pending_tool_call_id = terminal_event.deferred_tool_call.tool_call_id + else: + pause_reason = SchedulingPause( + message=terminal_event.message + or "Agent backend run requested workflow pause for external input." + ) self._save_session_snapshot( session_scope=session_scope, backend_run_id=terminal_event.run_id, snapshot=terminal_event.session_snapshot, runtime_layer_specs=extract_runtime_layer_specs(runtime_request.request.composition), metadata=metadata, + pending_form_id=pending_form_id, + pending_tool_call_id=pending_tool_call_id, ) - yield PauseRequestedEvent( - reason=SchedulingPause( - message=terminal_event.message - or "Agent backend run requested workflow pause for external input." - ) - ) + yield PauseRequestedEvent(reason=pause_reason) return # Non-success terminal (failed / cancelled) skips per-output @@ -448,6 +522,27 @@ class DifyAgentNode(Node[DifyAgentNodeData]): ], } + def _build_human_input_form_repository( + self, + *, + dify_ctx: DifyRunContext, + workflow_run_id: str | None, + ) -> HumanInputFormRepository: + """Construct the existing HITL form repository for ask_human form creation. + + Mirrors the Human Input node's repository wiring (``node_runtime``) so the + ask_human form shares the same delivery/debug/console behavior: a + submission actor is only attributed for debugger/explore surfaces. + """ + invoke_source = dify_ctx.invoke_from.value + return HumanInputFormRepositoryImpl( + tenant_id=dify_ctx.tenant_id, + app_id=dify_ctx.app_id, + workflow_execution_id=workflow_run_id, + invoke_source=invoke_source, + submission_actor_id=dify_ctx.user_id if invoke_source in {"debugger", "explore"} else None, + ) + def _save_session_snapshot( self, *, @@ -456,6 +551,8 @@ class DifyAgentNode(Node[DifyAgentNodeData]): snapshot: CompositorSessionSnapshot | None, runtime_layer_specs: list[RuntimeLayerSpec], metadata: dict[str, Any], + pending_form_id: str | None = None, + pending_tool_call_id: str | None = None, ) -> None: if self._session_store is None: return @@ -465,6 +562,8 @@ class DifyAgentNode(Node[DifyAgentNodeData]): backend_run_id=backend_run_id, snapshot=snapshot, runtime_layer_specs=runtime_layer_specs, + pending_form_id=pending_form_id, + pending_tool_call_id=pending_tool_call_id, ) agent_backend = dict(metadata.get("agent_backend") or {}) agent_backend["session_snapshot_persisted"] = snapshot is not None diff --git a/api/core/workflow/nodes/agent_v2/ask_human_hitl.py b/api/core/workflow/nodes/agent_v2/ask_human_hitl.py new file mode 100644 index 0000000000..0b535cf4f1 --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/ask_human_hitl.py @@ -0,0 +1,354 @@ +"""Map a ``dify.ask_human`` deferred tool call onto the existing HITL form path. + +ENG-636. When an Agent backend run ends with a deferred ``dify.ask_human`` tool +call, the workflow Agent node pauses the *outer* workflow through the very same +``HumanInputRequired`` form mechanism the Human Input node uses — reusing the +form repository, delivery channels, and submission endpoints unchanged. This +module is a pure translation layer: model-facing ask_human tool args become +graphon form entities (``HumanInputNodeData`` / ``FormInputConfig`` / +``UserActionConfig``) plus Dify delivery configs. It adds no new HITL behavior. + +The agent-side ``dify.ask_human`` contract is richer than the workflow form +schema in two places, handled here without widening the form vocabulary: + +* ask_human fields carry a human label; graphon ``FormInputConfig`` does not. + Labels are rendered into ``form_content`` next to a ``{{#$output.#}}`` + marker — exactly how the Human Input node positions labelled inputs today. +* ask_human action ids are valid identifiers of any length; graphon + ``UserActionConfig.id`` caps ids at 20 chars. Over-long ids are clamped + deterministically (stable + collision-resistant) so the form always builds; + the human-visible action *label* is always preserved verbatim. +""" + +from __future__ import annotations + +import hashlib +from collections.abc import Mapping, Sequence +from dataclasses import dataclass +from typing import Any, assert_never + +from dify_agent.layers.ask_human import ( + AskHumanAction, + AskHumanField, + AskHumanFileField, + AskHumanFileListField, + AskHumanParagraphField, + AskHumanSelectField, + AskHumanToolArgs, +) +from dify_agent.protocol import DeferredToolCallPayload +from pydantic import ValidationError + +from core.repositories.human_input_repository import FormCreateParams, HumanInputFormRepository +from core.workflow.human_input_adapter import ( + DeliveryChannelConfig, + EmailDeliveryConfig, + EmailDeliveryMethod, + EmailRecipients, + ExternalRecipient, + InteractiveSurfaceDeliveryMethod, +) +from graphon.entities.pause_reason import HumanInputRequired +from graphon.nodes.human_input.entities import ( + FileInputConfig, + FileListInputConfig, + FormInputConfig, + HumanInputNodeData, + ParagraphInputConfig, + SelectInputConfig, + StringListSource, + StringSource, + UserActionConfig, +) +from graphon.nodes.human_input.enums import ButtonStyle, TimeoutUnit, ValueSourceType +from models.agent_config_entities import AgentHumanContactConfig + +# Default ask_human tool name (see ``DifyAskHumanLayerConfig.tool_name``). The +# Agent node only knows how to translate this one deferred tool into a form. +DEFAULT_ASK_HUMAN_TOOL_NAME = "ask_human" + +# Graphon ``UserActionConfig.id`` hard-caps identifiers at 20 chars. +_MAX_ACTION_ID_LEN = 20 +# No timeout lives in the ask_human contract; reuse the Human Input node default. +_DEFAULT_TIMEOUT_HOURS = 36 +_EMAIL_SUBJECT_MAX_LEN = 120 + +# ``ButtonStyle`` has no "destructive"; map the ask_human destructive intent onto +# the closest existing form style rather than extending the HITL vocabulary. +_ACTION_STYLE_TO_BUTTON: dict[str, ButtonStyle] = { + "default": ButtonStyle.DEFAULT, + "primary": ButtonStyle.PRIMARY, + "destructive": ButtonStyle.ACCENT, +} + +# ask_human permits zero actions (a plain acknowledgement); the form surface +# needs at least one submit affordance. +_DEFAULT_SUBMIT_ACTION = UserActionConfig(id="submit", title="Submit", button_style=ButtonStyle.PRIMARY) + + +class AskHumanFormBuildError(ValueError): + """Raised when ask_human tool args cannot be mapped to a HITL form.""" + + +def parse_ask_human_args(raw_args: object) -> AskHumanToolArgs: + """Validate the raw deferred-tool ``args`` payload into ``AskHumanToolArgs``.""" + if isinstance(raw_args, AskHumanToolArgs): + return raw_args + if isinstance(raw_args, Mapping): + try: + return AskHumanToolArgs.model_validate(dict(raw_args)) + except ValidationError as error: + raise AskHumanFormBuildError(f"invalid ask_human args: {error}") from error + raise AskHumanFormBuildError(f"ask_human args must be a mapping, got {type(raw_args).__name__}") + + +def _clamp_action_id(action_id: str) -> str: + """Return a stable graphon-safe (<= 20 char) action id. + + ask_human ids are already valid identifiers; ids within the limit pass + through untouched so the resume round-trip returns the model's exact id. + Over-long ids are truncated with a short content hash to stay deterministic + and collision-resistant while remaining a valid identifier. + """ + if len(action_id) <= _MAX_ACTION_ID_LEN: + return action_id + digest = hashlib.blake2b(action_id.encode("utf-8"), digest_size=3).hexdigest() + prefix = action_id[: _MAX_ACTION_ID_LEN - len(digest) - 1] + return f"{prefix}_{digest}" + + +def _to_form_input(field: AskHumanField) -> FormInputConfig: + match field: + case AskHumanParagraphField(): + default = ( + StringSource(type=ValueSourceType.CONSTANT, value=field.default) if field.default is not None else None + ) + return ParagraphInputConfig(output_variable_name=field.name, default=default) + case AskHumanSelectField(): + return SelectInputConfig( + output_variable_name=field.name, + option_source=StringListSource( + type=ValueSourceType.CONSTANT, + value=[option.value for option in field.options], + ), + ) + case AskHumanFileField(): + return FileInputConfig(output_variable_name=field.name) + case AskHumanFileListField(): + return FileListInputConfig(output_variable_name=field.name, number_limits=field.max_files or 0) + case _: # pragma: no cover - exhaustive over the discriminated union + assert_never(field) + + +def _to_user_actions(actions: Sequence[AskHumanAction]) -> list[UserActionConfig]: + if not actions: + return [_DEFAULT_SUBMIT_ACTION] + return [ + UserActionConfig( + id=_clamp_action_id(action.id), + title=action.label, + button_style=_ACTION_STYLE_TO_BUTTON.get(action.style, ButtonStyle.DEFAULT), + ) + for action in actions + ] + + +def _render_form_content(args: AskHumanToolArgs) -> str: + """Compose the markdown body, positioning each field's label + input marker. + + Graphon ``FormInputConfig`` carries no label, so the field label is written + into the content next to the ``{{#$output.#}}`` marker that the form + surface replaces with the live input — identical to the Human Input node. + """ + blocks: list[str] = [] + if args.title: + blocks.append(f"## {args.title}") + blocks.append(args.question) + if args.markdown: + blocks.append(args.markdown) + for field in args.fields: + label = f"{field.label} *" if field.required else field.label + blocks.append(f"**{label}**\n\n{{{{#$output.{field.name}#}}}}") + return "\n\n".join(blocks) + + +def _resolved_default_values(args: AskHumanToolArgs) -> dict[str, Any]: + """Pre-fill map the form surface reads, keyed by output variable name. + + The graphon select input has no default field, so a select default can only + be conveyed here; paragraph defaults are included for a uniform pre-fill. + """ + defaults: dict[str, Any] = {} + for field in args.fields: + if isinstance(field, AskHumanParagraphField | AskHumanSelectField) and field.default is not None: + defaults[field.name] = field.default + return defaults + + +def ask_human_args_to_node_data(args: AskHumanToolArgs, *, node_title: str) -> HumanInputNodeData: + """Translate validated ask_human args into a synthetic Human Input node config.""" + return HumanInputNodeData( + title=node_title, + form_content=_render_form_content(args), + inputs=[_to_form_input(field) for field in args.fields], + user_actions=_to_user_actions(args.actions), + timeout=_DEFAULT_TIMEOUT_HOURS, + timeout_unit=TimeoutUnit.HOUR, + ) + + +def build_delivery_methods( + contacts: Sequence[AgentHumanContactConfig], + *, + args: AskHumanToolArgs, +) -> list[DeliveryChannelConfig]: + """Build form delivery channels: always the interactive surface, plus email to + the configured human contacts (the recipients chosen in Agent Soul).""" + methods: list[DeliveryChannelConfig] = [InteractiveSurfaceDeliveryMethod()] + + seen: set[str] = set() + emails: list[str] = [] + for contact in contacts: + email = (contact.email or "").strip() + if email and email not in seen: + seen.add(email) + emails.append(email) + + if emails: + subject = (args.title or args.question).strip()[:_EMAIL_SUBJECT_MAX_LEN] + if args.urgency == "high": + subject = f"[Action needed] {subject}" + body = f"{args.question}\n\nOpen the request: {EmailDeliveryConfig.URL_PLACEHOLDER}" + methods.append( + EmailDeliveryMethod( + config=EmailDeliveryConfig( + recipients=EmailRecipients(items=[ExternalRecipient(email=email) for email in emails]), + subject=subject, + body=body, + ) + ) + ) + return methods + + +@dataclass(frozen=True, slots=True) +class AskHumanFormCreated: + """A created ask_human HITL form, owner-agnostic (workflow run or conversation).""" + + form_id: str + args: AskHumanToolArgs + node_data: HumanInputNodeData + node_title: str + resolved_default_values: dict[str, Any] + + +def create_ask_human_form( + *, + deferred_tool_call: DeferredToolCallPayload, + node_id: str, + default_node_title: str, + contacts: Sequence[AgentHumanContactConfig], + repository: HumanInputFormRepository, + workflow_run_id: str | None = None, + conversation_id: str | None = None, + display_in_ui: bool = True, +) -> AskHumanFormCreated: + """Create a HITL form from an ask_human deferred call (caller verified tool_name). + + The form is owned by exactly one of ``workflow_run_id`` (workflow Agent node) + or ``conversation_id`` (Agent v2 chat). Raises ``AskHumanFormBuildError`` on + invalid args, a missing owner, or a repository failure. + """ + if not workflow_run_id and not conversation_id: + raise AskHumanFormBuildError("an ask_human HITL form requires a workflow_run_id or conversation_id") + + args = parse_ask_human_args(deferred_tool_call.args) + node_title = args.title or default_node_title + node_data = ask_human_args_to_node_data(args, node_title=node_title) + resolved_default_values = _resolved_default_values(args) + + try: + form = repository.create_form( + FormCreateParams( + workflow_execution_id=workflow_run_id, + conversation_id=conversation_id, + node_id=node_id, + form_config=node_data, + # No workflow-variable placeholders to resolve — the content is + # fully model-authored, so rendered == template. + rendered_content=node_data.form_content, + delivery_methods=build_delivery_methods(contacts, args=args), + display_in_ui=display_in_ui, + resolved_default_values=resolved_default_values, + ) + ) + except ValueError as error: + raise AskHumanFormBuildError(f"failed to create ask_human HITL form: {error}") from error + + return AskHumanFormCreated( + form_id=form.id, + args=args, + node_data=node_data, + node_title=node_title, + resolved_default_values=resolved_default_values, + ) + + +def build_ask_human_pause_reason( + *, + deferred_tool_call: DeferredToolCallPayload, + node_id: str, + default_node_title: str, + workflow_run_id: str | None, + contacts: Sequence[AgentHumanContactConfig], + repository: HumanInputFormRepository, + conversation_id: str | None = None, + expected_tool_name: str = DEFAULT_ASK_HUMAN_TOOL_NAME, + display_in_ui: bool = True, +) -> HumanInputRequired | None: + """Create a workflow HITL form for an ask_human call and return its pause reason. + + Returns ``None`` when the deferred call is *not* the ask_human tool, letting + the caller fall back to a generic scheduling pause. Raises + ``AskHumanFormBuildError`` when the call is ask_human but its args or the form + cannot be built — the caller should surface that as a node failure rather + than a silent, unresumable pause. + """ + if deferred_tool_call.tool_name != expected_tool_name: + return None + if not workflow_run_id: + raise AskHumanFormBuildError("workflow_run_id is required to create an ask_human HITL form") + + created = create_ask_human_form( + deferred_tool_call=deferred_tool_call, + node_id=node_id, + default_node_title=default_node_title, + contacts=contacts, + repository=repository, + workflow_run_id=workflow_run_id, + # A chatflow agent node also belongs to a conversation; tag the form so it is + # queryable per conversation. None for a pure workflow run (workflow_run_id only). + conversation_id=conversation_id, + display_in_ui=display_in_ui, + ) + return HumanInputRequired( + form_id=created.form_id, + form_content=created.node_data.form_content, + inputs=list(created.node_data.inputs), + actions=list(created.node_data.user_actions), + node_id=node_id, + node_title=created.node_title, + resolved_default_values=created.resolved_default_values, + ) + + +__all__ = [ + "DEFAULT_ASK_HUMAN_TOOL_NAME", + "AskHumanFormBuildError", + "AskHumanFormCreated", + "ask_human_args_to_node_data", + "build_ask_human_pause_reason", + "build_delivery_methods", + "create_ask_human_form", + "parse_ask_human_args", +] diff --git a/api/core/workflow/nodes/agent_v2/ask_human_resume.py b/api/core/workflow/nodes/agent_v2/ask_human_resume.py new file mode 100644 index 0000000000..13c9602722 --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/ask_human_resume.py @@ -0,0 +1,159 @@ +"""Resume an Agent run after a dify.ask_human HITL form reaches a terminal state. + +ENG-638. When the outer workflow resumes (the human submitted the form, or it +timed out), graphon re-executes the Agent node's ``_run``. This module reads the +correlated HITL form (by ``pending_form_id``) and maps it back into the +agent-side ``dify.ask_human`` contract so the node can start a *second* Agent run +that carries the human's answer: + +* submitted -> AskHumanToolResult(status="submitted", action, values) +* timeout / expired -> AskHumanToolResult(status="timeout") +* still waiting (defensive: the host resumed us early) -> re-emit the same + HumanInputRequired pause rebuilt from the stored form definition. + +It only *reads* existing HITL form state — it never mutates the form or the HITL +submission flow. The DB read (``resolve_ask_human_form``) is kept thin so the +mapping (``map_form_to_outcome``) stays pure and unit-testable. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass + +from dify_agent.layers.ask_human import ( + AskHumanResultStatus, + AskHumanSelectedAction, + AskHumanToolResult, +) +from dify_agent.protocol import DeferredToolResultsPayload +from pydantic import JsonValue +from sqlalchemy import select + +from core.db.session_factory import session_factory +from graphon.entities.pause_reason import HumanInputRequired +from graphon.nodes.human_input.entities import FormDefinition +from graphon.nodes.human_input.enums import HumanInputFormStatus +from models.human_input import HumanInputForm + +# A WAITING form has not been answered yet; the other terminal states map onto +# the agent-facing result status. EXPIRED (global timeout) and TIMEOUT +# (node-level) both surface to the model as "timeout" so it can react. +_FORM_STATUS_TO_RESULT: dict[HumanInputFormStatus, AskHumanResultStatus] = { + HumanInputFormStatus.SUBMITTED: "submitted", + HumanInputFormStatus.TIMEOUT: "timeout", + HumanInputFormStatus.EXPIRED: "timeout", +} + + +@dataclass(frozen=True, slots=True) +class AskHumanResumeOutcome: + """Result of inspecting a correlated ask_human form on workflow resume. + + Exactly one of ``deferred_result`` / ``repause`` is set: + * ``deferred_result`` — the form reached a terminal state; start the second run. + * ``repause`` — the form is still waiting; re-emit this pause defensively. + """ + + deferred_result: AskHumanToolResult | None = None + repause: HumanInputRequired | None = None + + +def resolve_ask_human_form(*, form_id: str, tenant_id: str, node_id: str) -> AskHumanResumeOutcome | None: + """Load the correlated form and map it to a resume outcome. + + Returns ``None`` when the form no longer exists (correlation lost) — the + caller should fall back to a normal (non-resume) Agent run. + """ + with session_factory.create_session() as session: + form = session.scalar( + select(HumanInputForm).where( + HumanInputForm.id == form_id, + HumanInputForm.tenant_id == tenant_id, + ) + ) + if form is None: + return None + return map_form_to_outcome( + status=form.status, + selected_action_id=form.selected_action_id, + submitted_data=form.submitted_data, + rendered_content=form.rendered_content, + form_definition=form.form_definition, + form_id=form.id, + node_id=node_id, + ) + + +def map_form_to_outcome( + *, + status: HumanInputFormStatus, + selected_action_id: str | None, + submitted_data: str | None, + rendered_content: str, + form_definition: str, + form_id: str, + node_id: str, +) -> AskHumanResumeOutcome: + """Map a terminal (or still-waiting) HITL form to a resume outcome. Pure.""" + definition = FormDefinition.model_validate_json(form_definition) + if status == HumanInputFormStatus.WAITING: + return AskHumanResumeOutcome(repause=_rebuild_pause(definition=definition, form_id=form_id, node_id=node_id)) + + result_status = _FORM_STATUS_TO_RESULT.get(status, "unavailable") + if result_status != "submitted": + return AskHumanResumeOutcome(deferred_result=AskHumanToolResult(status=result_status)) + return AskHumanResumeOutcome( + deferred_result=AskHumanToolResult( + status="submitted", + action=_selected_action(selected_action_id=selected_action_id, definition=definition), + values=_submitted_values(submitted_data), + rendered_content=rendered_content, + ) + ) + + +def build_deferred_tool_results(*, tool_call_id: str, result: AskHumanToolResult) -> DeferredToolResultsPayload: + """Wrap an ask_human result as the deferred-tool-results payload for resume.""" + return DeferredToolResultsPayload(calls={tool_call_id: result.model_dump(mode="json")}) + + +def _submitted_values(submitted_data: str | None) -> dict[str, JsonValue]: + if not submitted_data: + return {} + parsed = json.loads(submitted_data) + if not isinstance(parsed, dict): + return {} + return {str(key): value for key, value in parsed.items()} + + +def _selected_action(*, selected_action_id: str | None, definition: FormDefinition) -> AskHumanSelectedAction | None: + if selected_action_id is None: + return None + # The form's user_action title is the verbatim ask_human action label set at + # form-build time; fall back to the id only if the action is somehow missing. + label = next( + (action.title for action in definition.user_actions if action.id == selected_action_id), + selected_action_id, + ) + return AskHumanSelectedAction(id=selected_action_id, label=label) + + +def _rebuild_pause(*, definition: FormDefinition, form_id: str, node_id: str) -> HumanInputRequired: + return HumanInputRequired( + form_id=form_id, + form_content=definition.rendered_content or definition.form_content, + inputs=list(definition.inputs), + actions=list(definition.user_actions), + node_id=node_id, + node_title=definition.node_title or node_id, + resolved_default_values=dict(definition.default_values), + ) + + +__all__ = [ + "AskHumanResumeOutcome", + "build_deferred_tool_results", + "map_form_to_outcome", + "resolve_ask_human_form", +] diff --git a/api/core/workflow/nodes/agent_v2/runtime_feature_manifest.py b/api/core/workflow/nodes/agent_v2/runtime_feature_manifest.py index 35da898f1e..8e0578d1a1 100644 --- a/api/core/workflow/nodes/agent_v2/runtime_feature_manifest.py +++ b/api/core/workflow/nodes/agent_v2/runtime_feature_manifest.py @@ -18,13 +18,15 @@ SUPPORTED_AGENT_BACKEND_FEATURES = frozenset( # ENG-623: exposed at runtime as the dify.drive declaration layer # (an index the agent pulls through the back proxy). "skills_files", + # ENG-635: human involvement is exposed at runtime as the dify.ask_human + # deferred tool; a call pauses via the existing HITL form mechanism. + "human", } ) RESERVED_AGENT_BACKEND_FEATURES = frozenset( { "knowledge", - "human", "memory", } ) @@ -85,6 +87,7 @@ def build_runtime_feature_manifest( reserved_status["tools.cli_tools"] = "supported_by_shell_bootstrap" reserved_status["env"] = "supported_by_shell_bootstrap" reserved_status["sandbox"] = "forwarded_to_shell_layer_config" + reserved_status["human"] = "supported_by_ask_human_hitl" if agent_soul.human.contacts else "not_configured" return { "supported": sorted(SUPPORTED_AGENT_BACKEND_FEATURES), diff --git a/api/core/workflow/nodes/agent_v2/runtime_request_builder.py b/api/core/workflow/nodes/agent_v2/runtime_request_builder.py index 984a0a2069..d4aa43898d 100644 --- a/api/core/workflow/nodes/agent_v2/runtime_request_builder.py +++ b/api/core/workflow/nodes/agent_v2/runtime_request_builder.py @@ -5,6 +5,7 @@ from dataclasses import dataclass from typing import Any, Literal, Protocol, assert_never, cast from agenton.compositor import CompositorSessionSnapshot +from dify_agent.layers.ask_human import DifyAskHumanLayerConfig from dify_agent.layers.drive import ( DifyDriveFileConfig, DifyDriveLayerConfig, @@ -22,7 +23,7 @@ from dify_agent.layers.shell import ( DifyShellSandboxConfig, DifyShellSecretRefConfig, ) -from dify_agent.protocol import CreateRunRequest +from dify_agent.protocol import CreateRunRequest, DeferredToolResultsPayload from pydantic import BaseModel from clients.agent_backend import ( @@ -103,6 +104,9 @@ class WorkflowAgentRuntimeBuildContext: # idempotency key so the backend treats each retry as a fresh request. attempt: int = 0 session_snapshot: CompositorSessionSnapshot | None = None + # ENG-638: set when resuming after a submitted ask_human HITL form; threads + # the human's answer back into the second Agent run keyed by tool_call_id. + deferred_tool_results: DeferredToolResultsPayload | None = None @dataclass(frozen=True, slots=True) @@ -217,9 +221,11 @@ class WorkflowAgentRuntimeRequestBuilder: output=self._build_output_config(node_job.declared_outputs), tools=tools_layer, drive_config=drive_config, + ask_human_config=build_ask_human_layer_config(agent_soul), include_shell=dify_config.AGENT_SHELL_ENABLED, shell_config=build_shell_layer_config(agent_soul), session_snapshot=context.session_snapshot, + deferred_tool_results=context.deferred_tool_results, idempotency_key=self._idempotency_key(context), metadata=metadata, ) @@ -496,6 +502,20 @@ def build_shell_layer_config(agent_soul: AgentSoulConfig) -> DifyShellLayerConfi ) +def build_ask_human_layer_config(agent_soul: AgentSoulConfig) -> DifyAskHumanLayerConfig | None: + """Enable the dify.ask_human deferred tool when the soul configures human involvement. + + HITL is opt-in: only when at least one human contact is configured does the + model get the ``ask_human`` tool (recipients for the resulting form come from + those contacts, ENG-635). Returns ``None`` to leave the tool off entirely. + The tool/field guardrails use the layer defaults; ``human.tools`` semantics are + out of scope this round. + """ + if not agent_soul.human.contacts: + return None + return DifyAskHumanLayerConfig() + + def append_runtime_warnings(metadata: dict[str, Any], warnings: list[dict[str, str]]) -> None: """Merge build-time warnings into the metadata runtime-support manifest.""" if not warnings: diff --git a/api/core/workflow/nodes/agent_v2/session_store.py b/api/core/workflow/nodes/agent_v2/session_store.py index 08a83ad531..215fa67b5e 100644 --- a/api/core/workflow/nodes/agent_v2/session_store.py +++ b/api/core/workflow/nodes/agent_v2/session_store.py @@ -47,12 +47,20 @@ class StoredWorkflowAgentSession: session_snapshot: CompositorSessionSnapshot backend_run_id: str | None runtime_layer_specs: list[RuntimeLayerSpec] = field(default_factory=list) + # ENG-637: set while the session is paused on a dify.ask_human deferred call. + pending_form_id: str | None = None + pending_tool_call_id: str | None = None class WorkflowAgentRuntimeSessionStore: """Stores Agent backend session snapshots for workflow Agent node re-entry.""" def load_active_snapshot(self, scope: WorkflowAgentSessionScope) -> CompositorSessionSnapshot | None: + stored = self.load_active_session(scope) + return stored.session_snapshot if stored is not None else None + + def load_active_session(self, scope: WorkflowAgentSessionScope) -> StoredWorkflowAgentSession | None: + """Load the active session row including any pending ask_human correlation.""" if scope.workflow_run_id is None: return None @@ -69,7 +77,14 @@ class WorkflowAgentRuntimeSessionStore: ) if row is None: return None - return CompositorSessionSnapshot.model_validate_json(row.session_snapshot) + return StoredWorkflowAgentSession( + scope=scope, + session_snapshot=CompositorSessionSnapshot.model_validate_json(row.session_snapshot), + backend_run_id=row.backend_run_id, + runtime_layer_specs=_deserialize_specs(row.composition_layer_specs), + pending_form_id=row.pending_form_id, + pending_tool_call_id=row.pending_tool_call_id, + ) def list_active_sessions(self, *, workflow_run_id: str) -> list[StoredWorkflowAgentSession]: with session_factory.create_session() as session: @@ -109,6 +124,8 @@ class WorkflowAgentRuntimeSessionStore: backend_run_id: str, snapshot: CompositorSessionSnapshot | None, runtime_layer_specs: list[RuntimeLayerSpec], + pending_form_id: str | None = None, + pending_tool_call_id: str | None = None, ) -> None: if scope.workflow_run_id is None or snapshot is None: return @@ -141,6 +158,8 @@ class WorkflowAgentRuntimeSessionStore: session_snapshot=snapshot_json, composition_layer_specs=specs_json, status=WorkflowAgentRuntimeSessionStatus.ACTIVE, + pending_form_id=pending_form_id, + pending_tool_call_id=pending_tool_call_id, ) session.add(row) else: @@ -151,6 +170,9 @@ class WorkflowAgentRuntimeSessionStore: row.composition_layer_specs = specs_json row.status = WorkflowAgentRuntimeSessionStatus.ACTIVE row.cleaned_at = None + # Set (or clear, when omitted) the ask_human pause correlation. + row.pending_form_id = pending_form_id + row.pending_tool_call_id = pending_tool_call_id session.commit() def mark_cleaned(self, *, scope: WorkflowAgentSessionScope, backend_run_id: str | None = None) -> None: diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py index feb3bc7a4c..42c83b30f2 100644 --- a/api/extensions/ext_celery.py +++ b/api/extensions/ext_celery.py @@ -153,6 +153,7 @@ def init_app(app: DifyApp) -> Celery: "tasks.trigger_processing_tasks", # async trigger processing "tasks.generate_summary_index_task", # summary index generation "tasks.regenerate_summary_index_task", # summary index regeneration + "tasks.app_generate.resume_agent_app_task", # ENG-635: Agent v2 chat ask_human resume ] day = dify_config.CELERY_BEAT_SCHEDULER_TIME diff --git a/api/migrations/versions/2026_06_15_1052-c167a72a00eb_add_ask_human_pause_correlation_to_.py b/api/migrations/versions/2026_06_15_1052-c167a72a00eb_add_ask_human_pause_correlation_to_.py new file mode 100644 index 0000000000..6bf0bae4bb --- /dev/null +++ b/api/migrations/versions/2026_06_15_1052-c167a72a00eb_add_ask_human_pause_correlation_to_.py @@ -0,0 +1,32 @@ +"""add ask_human pause correlation to agent_runtime_sessions + +Revision ID: c167a72a00eb +Revises: c4d5e6f7a8b9 +Create Date: 2026-06-15 10:52:15.736666 + +""" +from alembic import op +import models as models +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = 'c167a72a00eb' +down_revision = 'c4d5e6f7a8b9' +branch_labels = None +depends_on = None + + +def upgrade(): + # ENG-637: correlate a paused dify.ask_human session to its awaiting HITL + # form and the deferred tool_call id, so the resumed Agent node can rebuild + # deferred_tool_results from the submitted form. Both columns are nullable + # and NULL whenever the session is not paused on human input. + with op.batch_alter_table('agent_runtime_sessions', schema=None) as batch_op: + batch_op.add_column(sa.Column('pending_form_id', models.types.StringUUID(), nullable=True)) + batch_op.add_column(sa.Column('pending_tool_call_id', sa.String(length=255), nullable=True)) + + +def downgrade(): + with op.batch_alter_table('agent_runtime_sessions', schema=None) as batch_op: + batch_op.drop_column('pending_tool_call_id') + batch_op.drop_column('pending_form_id') diff --git a/api/migrations/versions/2026_06_15_1110-d2f1a4b8c3e0_add_conversation_id_to_human_input_forms.py b/api/migrations/versions/2026_06_15_1110-d2f1a4b8c3e0_add_conversation_id_to_human_input_forms.py new file mode 100644 index 0000000000..3af00e3926 --- /dev/null +++ b/api/migrations/versions/2026_06_15_1110-d2f1a4b8c3e0_add_conversation_id_to_human_input_forms.py @@ -0,0 +1,29 @@ +"""add conversation_id to human_input_forms for agent v2 chat hitl + +Revision ID: d2f1a4b8c3e0 +Revises: c167a72a00eb +Create Date: 2026-06-15 11:10:00.000000 + +""" +from alembic import op +import models as models +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = 'd2f1a4b8c3e0' +down_revision = 'c167a72a00eb' +branch_labels = None +depends_on = None + + +def upgrade(): + # ENG-635: Agent v2 chat ask_human forms are owned by a conversation turn + # instead of a workflow run (the new Agent App has no workflow_run_id). + # Nullable; existing workflow-owned forms keep conversation_id NULL. + with op.batch_alter_table('human_input_forms', schema=None) as batch_op: + batch_op.add_column(sa.Column('conversation_id', models.types.StringUUID(), nullable=True)) + + +def downgrade(): + with op.batch_alter_table('human_input_forms', schema=None) as batch_op: + batch_op.drop_column('conversation_id') diff --git a/api/models/agent.py b/api/models/agent.py index e60e09de81..649835f522 100644 --- a/api/models/agent.py +++ b/api/models/agent.py @@ -398,6 +398,12 @@ class AgentRuntimeSession(DefaultFieldsMixin, Base): default=AgentRuntimeSessionStatus.ACTIVE, ) cleaned_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) + # ENG-637: when a run pauses for a dify.ask_human deferred call, these link + # the session to the awaiting HITL form and the deferred tool_call_id, so a + # resumed node can map the submitted form back into deferred_tool_results. + # Both NULL whenever the session is not paused on human input. + pending_form_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True) + pending_tool_call_id: Mapped[str | None] = mapped_column(String(255), nullable=True) # Back-compat alias for the shipped workflow lifecycle code (PR #36724). diff --git a/api/models/human_input.py b/api/models/human_input.py index 7b02e8d29d..d11274bc92 100644 --- a/api/models/human_input.py +++ b/api/models/human_input.py @@ -40,6 +40,13 @@ class HumanInputForm(DefaultFieldsMixin, Base): tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) workflow_run_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True) + # ENG-635: a RUNTIME form is tagged with its owning workflow run and/or its + # conversation. Workflow / Human-Input / agent-node forms always set + # workflow_run_id, and ALSO set conversation_id when the run has a conversation + # (chatflow / advanced-chat). Agent v2 chat ask_human forms set only + # conversation_id (the new Agent App has no workflow_run_id). At least one is set; + # resume routing prefers workflow_run_id when both are present. + conversation_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True) form_kind: Mapped[HumanInputFormKind] = mapped_column( EnumText(HumanInputFormKind), nullable=False, diff --git a/api/services/human_input_service.py b/api/services/human_input_service.py index dcddfe0f2c..cf6645d935 100644 --- a/api/services/human_input_service.py +++ b/api/services/human_input_service.py @@ -223,9 +223,12 @@ class HumanInputService: if result.form_kind != HumanInputFormKind.RUNTIME: return - if result.workflow_run_id is None: - return - self.enqueue_resume(result.workflow_run_id) + # A RUNTIME form is owned by a workflow run (workflow Agent node) or a + # conversation (ENG-635: Agent v2 chat). Route the resume accordingly. + if result.workflow_run_id is not None: + self.enqueue_resume(result.workflow_run_id) + elif result.conversation_id is not None: + self.enqueue_agent_app_resume(conversation_id=result.conversation_id, form_id=result.form_id) def ensure_form_active(self, form: Form) -> None: if form.submitted: @@ -286,6 +289,22 @@ class HumanInputService: logger.warning("App mode %s does not support resume for workflow run %s", app.mode, workflow_run_id) + def enqueue_agent_app_resume(self, *, conversation_id: str, form_id: str) -> None: + """ENG-635: resume an Agent v2 chat after its ask_human form is submitted. + + Enqueues a background turn for the conversation; the Agent App runner + continues the agent run, threading the human's reply into the request as + ``deferred_tool_results``. + """ + from tasks.app_generate.resume_agent_app_task import resume_agent_app_execution + + try: + resume_agent_app_execution.apply_async( + kwargs={"conversation_id": conversation_id, "form_id": form_id}, + ) + except Exception: # pragma: no cover + logger.exception("Failed to enqueue Agent App resume for conversation %s form %s", conversation_id, form_id) + def _load_variable_pool_for_form(self, form: Form) -> ReadOnlyVariablePool | None: workflow_run_id = form.workflow_run_id if workflow_run_id is None: diff --git a/api/tasks/app_generate/resume_agent_app_task.py b/api/tasks/app_generate/resume_agent_app_task.py new file mode 100644 index 0000000000..118a476bc8 --- /dev/null +++ b/api/tasks/app_generate/resume_agent_app_task.py @@ -0,0 +1,70 @@ +"""Background resume of an Agent v2 chat after a submitted ask_human HITL form. + +ENG-635. When a human submits a conversation-owned ask_human form (delivered via +email/webapp), ``HumanInputService`` enqueues this task. It reconstructs the +conversation context and runs one blocking Agent App turn; the runner detects the +answered form and continues the agent run with the human's reply +(``deferred_tool_results``), persisting the assistant answer to the conversation. +""" + +from __future__ import annotations + +import logging + +from celery import shared_task + +from core.app.apps.agent_app.app_generator import AgentAppGenerator +from core.app.entities.app_invoke_entities import InvokeFrom +from extensions.ext_database import db +from models.account import Account +from models.human_input import HumanInputForm +from models.model import App, Conversation, EndUser +from tasks.app_generate.workflow_execute_task import WORKFLOW_BASED_APP_EXECUTION_QUEUE + +logger = logging.getLogger(__name__) + + +@shared_task(queue=WORKFLOW_BASED_APP_EXECUTION_QUEUE, name="resume_agent_app_execution") +def resume_agent_app_execution(*, conversation_id: str, form_id: str) -> None: + form = db.session.get(HumanInputForm, form_id) + if form is None or form.conversation_id != conversation_id: + logger.warning("Agent App resume: form %s missing or conversation mismatch", form_id) + return + + app_model = db.session.get(App, form.app_id) + if app_model is None: + logger.warning("Agent App resume: app %s not found for form %s", form.app_id, form_id) + return + + conversation = db.session.get(Conversation, conversation_id) + if conversation is None: + logger.warning("Agent App resume: conversation %s not found", conversation_id) + return + + user = _resolve_conversation_user(app_model=app_model, conversation=conversation) + if user is None: + logger.warning("Agent App resume: no user resolvable for conversation %s", conversation_id) + return + + try: + AgentAppGenerator().resume_after_form_submission( + app_model=app_model, + user=user, + conversation_id=conversation_id, + invoke_from=InvokeFrom.WEB_APP, + ) + except Exception: + logger.exception("Agent App resume failed for conversation %s form %s", conversation_id, form_id) + finally: + db.session.close() + + +def _resolve_conversation_user(*, app_model: App, conversation: Conversation) -> Account | EndUser | None: + if conversation.from_account_id: + account = db.session.get(Account, conversation.from_account_id) + if account is not None: + account.set_tenant_id(app_model.tenant_id) + return account + if conversation.from_end_user_id: + return db.session.get(EndUser, conversation.from_end_user_id) + return None diff --git a/api/tests/unit_tests/clients/agent_backend/test_request_builder.py b/api/tests/unit_tests/clients/agent_backend/test_request_builder.py index 5ab1912d4c..0fa4d3261b 100644 --- a/api/tests/unit_tests/clients/agent_backend/test_request_builder.py +++ b/api/tests/unit_tests/clients/agent_backend/test_request_builder.py @@ -327,3 +327,49 @@ def test_agent_app_request_builder_adds_shell_layer_when_include_shell(): assert layers[DIFY_SHELL_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID} shell_config = cast(DifyShellLayerConfig, layers[DIFY_SHELL_LAYER_ID].config) assert shell_config.env[0].name == "APP_ENV" + + +# ── ENG-635 / ENG-638: ask_human layer injection + deferred_tool_results ───── + + +def test_ask_human_layer_injected_when_configured(): + + from dify_agent.layers.ask_human import DIFY_ASK_HUMAN_LAYER_TYPE_ID, DifyAskHumanLayerConfig + + from clients.agent_backend.request_builder import DIFY_ASK_HUMAN_LAYER_ID + + run_input = _run_input().model_copy(update={"ask_human_config": DifyAskHumanLayerConfig()}) + request = AgentBackendRunRequestBuilder().build_for_workflow_node(run_input) + + layers = {layer.name: layer for layer in request.composition.layers} + assert DIFY_ASK_HUMAN_LAYER_ID in layers + assert layers[DIFY_ASK_HUMAN_LAYER_ID].type == DIFY_ASK_HUMAN_LAYER_TYPE_ID + # the deferred tool needs the history layer to resume, so history must precede it + names = [layer.name for layer in request.composition.layers] + assert names.index(DIFY_AGENT_HISTORY_LAYER_ID) < names.index(DIFY_ASK_HUMAN_LAYER_ID) + + +def test_no_ask_human_layer_when_unconfigured(): + from clients.agent_backend.request_builder import DIFY_ASK_HUMAN_LAYER_ID + + request = AgentBackendRunRequestBuilder().build_for_workflow_node(_run_input()) + assert all(layer.name != DIFY_ASK_HUMAN_LAYER_ID for layer in request.composition.layers) + + +def test_deferred_tool_results_threaded_into_request(): + from dify_agent.protocol import DeferredToolResultsPayload + + payload = DeferredToolResultsPayload( + calls={ + "tool-call-1": { + "status": "submitted", + "action": {"id": "submit", "label": "Submit"}, + "values": {"x": "y"}, + } + } + ) + run_input = _run_input().model_copy(update={"deferred_tool_results": payload}) + request = AgentBackendRunRequestBuilder().build_for_workflow_node(run_input) + + assert request.deferred_tool_results is not None + assert "tool-call-1" in request.deferred_tool_results.calls diff --git a/api/tests/unit_tests/core/app/apps/agent_app/test_app_generator.py b/api/tests/unit_tests/core/app/apps/agent_app/test_app_generator.py index dcaa31e15e..62073bb73a 100644 --- a/api/tests/unit_tests/core/app/apps/agent_app/test_app_generator.py +++ b/api/tests/unit_tests/core/app/apps/agent_app/test_app_generator.py @@ -238,3 +238,56 @@ class TestGenerateWorker: queue_manager = mocker.MagicMock() self._call(generator, mocker, queue_manager) assert queue_manager.publish_error.called + + +class TestResumeAfterFormSubmission: + """ENG-638: a resume turn re-sends the paused turn's original query so the + composition's user-prompt layer matches the suspended snapshot (never blank).""" + + def _wire(self, generator, mocker: MockerFixture): + generator._resolve_agent = mocker.MagicMock( + return_value=(mocker.MagicMock(id="agent1"), mocker.MagicMock(id="snap1"), mocker.MagicMock()) + ) + generator._init_generate_records = mocker.MagicMock( + return_value=(mocker.MagicMock(id="conv", mode="agent"), mocker.MagicMock(id="msg")) + ) + generator._handle_response = mocker.MagicMock(return_value=None) + mocker.patch(f"{MODULE}.ConversationService.get_conversation", return_value=mocker.MagicMock(id="conv")) + mocker.patch(f"{MODULE}.AgentAppConfigManager.get_app_config", return_value=mocker.MagicMock(variables=[])) + mocker.patch(f"{MODULE}.ModelConfigConverter.convert", return_value=mocker.MagicMock()) + mocker.patch(f"{MODULE}.TraceQueueManager", return_value=mocker.MagicMock()) + mocker.patch(f"{MODULE}.MessageBasedAppQueueManager", return_value=mocker.MagicMock()) + mocker.patch(f"{MODULE}.threading.Thread", return_value=mocker.MagicMock()) + return mocker.patch( + f"{MODULE}.AgentAppGenerateEntity", return_value=mocker.MagicMock(task_id="t", user_id="user") + ) + + def test_resume_resends_paused_turn_query(self, generator, mocker: MockerFixture): + entity = self._wire(generator, mocker) + db_mock = mocker.patch(f"{MODULE}.db") + db_mock.session.scalar.return_value = mocker.MagicMock(query="original question") + + generator.resume_after_form_submission( + app_model=mocker.MagicMock(id="app1", tenant_id="tenant", mode="agent"), + user=DummyAccount("user"), + conversation_id="conv", + invoke_from=InvokeFrom.WEB_APP, + ) + + # The paused turn's query is re-sent verbatim — never blank. + assert entity.call_args.kwargs["query"] == "original question" + + def test_resume_falls_back_to_placeholder_when_no_paused_message(self, generator, mocker: MockerFixture): + entity = self._wire(generator, mocker) + db_mock = mocker.patch(f"{MODULE}.db") + db_mock.session.scalar.return_value = None + + generator.resume_after_form_submission( + app_model=mocker.MagicMock(id="app1", tenant_id="tenant", mode="agent"), + user=DummyAccount("user"), + conversation_id="conv", + invoke_from=InvokeFrom.WEB_APP, + ) + + # No prior user message -> a non-blank placeholder, still never blank. + assert entity.call_args.kwargs["query"] == "(resumed)" diff --git a/api/tests/unit_tests/core/app/apps/agent_app/test_app_runner.py b/api/tests/unit_tests/core/app/apps/agent_app/test_app_runner.py index b41e7b6ab0..48dea5583c 100644 --- a/api/tests/unit_tests/core/app/apps/agent_app/test_app_runner.py +++ b/api/tests/unit_tests/core/app/apps/agent_app/test_app_runner.py @@ -6,9 +6,11 @@ from __future__ import annotations from types import SimpleNamespace from typing import Any, override +from unittest.mock import MagicMock import pytest from agenton.compositor import CompositorSessionSnapshot +from dify_agent.layers.ask_human import AskHumanToolResult from dify_agent.protocol import CancelRunRequest, CancelRunResponse, RuntimeLayerSpec from clients.agent_backend import ( @@ -19,10 +21,11 @@ from clients.agent_backend import ( ) from core.app.apps.agent_app.app_runner import AgentAppRunner from core.app.apps.agent_app.runtime_request_builder import AgentAppRuntimeRequestBuilder -from core.app.apps.agent_app.session_store import AgentAppSessionScope +from core.app.apps.agent_app.session_store import AgentAppSessionScope, StoredAgentAppSession from core.app.apps.exc import GenerateTaskStoppedError from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom from core.app.entities.queue_entities import QueueLLMChunkEvent, QueueMessageEndEvent +from core.workflow.nodes.agent_v2.ask_human_resume import AskHumanResumeOutcome from models.agent_config_entities import AgentSoulConfig @@ -65,17 +68,47 @@ class _RecordingFakeAgentBackendRunClient(FakeAgentBackendRunClient): class _FakeSessionStore: - def __init__(self, loaded: CompositorSessionSnapshot | None = None) -> None: + def __init__( + self, + loaded: CompositorSessionSnapshot | None = None, + loaded_session: StoredAgentAppSession | None = None, + ) -> None: self.loaded = loaded + self._loaded_session = loaded_session self.saved: list[ - tuple[AgentAppSessionScope, str, CompositorSessionSnapshot | None, list[RuntimeLayerSpec]] + tuple[ + AgentAppSessionScope, + str, + CompositorSessionSnapshot | None, + list[RuntimeLayerSpec], + str | None, + str | None, + ] ] = [] def load_active_snapshot(self, scope: AgentAppSessionScope) -> CompositorSessionSnapshot | None: return self.loaded - def save_active_snapshot(self, *, scope, backend_run_id, snapshot, runtime_layer_specs) -> None: - self.saved.append((scope, backend_run_id, snapshot, list(runtime_layer_specs))) + def load_active_session(self, scope: AgentAppSessionScope) -> StoredAgentAppSession | None: + if self._loaded_session is not None: + return self._loaded_session + if self.loaded is None: + return None + return StoredAgentAppSession(scope=scope, session_snapshot=self.loaded, backend_run_id=None) + + def save_active_snapshot( + self, + *, + scope, + backend_run_id, + snapshot, + runtime_layer_specs, + pending_form_id=None, + pending_tool_call_id=None, + ) -> None: + self.saved.append( + (scope, backend_run_id, snapshot, list(runtime_layer_specs), pending_form_id, pending_tool_call_id) + ) def _soul() -> AgentSoulConfig: @@ -144,11 +177,14 @@ def test_successful_turn_publishes_chunk_and_message_end_and_saves_session(): assert end_events[0].llm_result.model == "gpt-4o-mini" # The conversation session snapshot is persisted for multi-turn continuity. assert store.saved - saved_scope, saved_run_id, saved_snapshot, saved_specs = store.saved[0] + saved_scope, saved_run_id, saved_snapshot, saved_specs, pending_form_id, pending_tool_call_id = store.saved[0] assert saved_scope.conversation_id == "conv-1" assert saved_scope.agent_config_snapshot_id == "snap-1" assert saved_run_id == "fake-run-1" assert saved_snapshot is not None + # A successful turn carries no ask_human pause correlation. + assert pending_form_id is None + assert pending_tool_call_id is None assert [spec.name for spec in saved_specs] == [ "agent_soul_prompt", "agent_app_user_prompt", @@ -197,3 +233,68 @@ def test_extract_answer_handles_plain_string_and_dict(): assert AgentAppRunner._extract_answer("plain text") == "plain text" assert AgentAppRunner._extract_answer({"text": "hi"}) == "hi" assert AgentAppRunner._extract_answer({"a": 1}) == '{"a": 1}' + + +def test_ask_human_pauses_turn_creates_form_and_persists_correlation(): + # ENG-635/637: the PAUSED scenario emits a dify.ask_human deferred call, so + # the chat turn ends by creating a conversation-owned HITL form + saving the + # pause correlation, instead of crashing. Stub the form repo (DB-free). + client = FakeAgentBackendRunClient(scenario=FakeAgentBackendScenario.PAUSED) + store = _FakeSessionStore() + qm = _FakeQueueManager() + runner = _runner(client, store) + + fake_repo = MagicMock() + fake_repo.create_form.return_value = MagicMock(id="form-1") + runner._build_form_repository = lambda dify_context: fake_repo # type: ignore[assignment] + + _run(runner, qm) + + # The conversation-owned form was created and the agent's question surfaced. + fake_repo.create_form.assert_called_once() + created_params = fake_repo.create_form.call_args.args[0] + assert created_params.conversation_id == "conv-1" + assert created_params.workflow_execution_id is None + assert [e for e in qm.events if isinstance(e, QueueMessageEndEvent)] + # The pause correlation is persisted so a form submission can resume the run. + assert store.saved + assert store.saved[0][4] == "form-1" + assert store.saved[0][5] == "fake-ask-human-1" + + +def test_submitted_form_resumes_turn_with_deferred_tool_results(monkeypatch): + # ENG-638: a turn that runs while a pending form is answered threads the + # human's reply into the request as deferred_tool_results. + snapshot = CompositorSessionSnapshot(layers=[]) + stored = StoredAgentAppSession( + scope=AgentAppSessionScope( + tenant_id="tenant-1", + app_id="app-1", + conversation_id="conv-1", + agent_id="agent-1", + agent_config_snapshot_id="snap-1", + ), + session_snapshot=snapshot, + backend_run_id="run-0", + pending_form_id="form-1", + pending_tool_call_id="call-1", + ) + store = _FakeSessionStore(loaded_session=stored) + submitted = AskHumanResumeOutcome(deferred_result=AskHumanToolResult(status="submitted", values={"ok": True})) + monkeypatch.setattr( + "core.app.apps.agent_app.app_runner.resolve_ask_human_form", + lambda **_kwargs: submitted, + ) + + client = FakeAgentBackendRunClient() # SUCCESS -> the resumed run completes + qm = _FakeQueueManager() + _run(_runner(client, store), qm) + + assert client.request is not None + assert client.request.deferred_tool_results is not None + assert set(client.request.deferred_tool_results.calls) == {"call-1"} + # ENG-638: the resume composition must keep the user-prompt layer so it + # matches the suspended snapshot's layer names (the agent backend rejects a + # mismatch). A resume therefore re-sends a non-blank query, never blank. + layer_names = [layer.name for layer in client.request.composition.layers] + assert "agent_app_user_prompt" in layer_names diff --git a/api/tests/unit_tests/core/repositories/test_human_input_form_repository_impl.py b/api/tests/unit_tests/core/repositories/test_human_input_form_repository_impl.py index a2e10d924c..5bd35e6d3c 100644 --- a/api/tests/unit_tests/core/repositories/test_human_input_form_repository_impl.py +++ b/api/tests/unit_tests/core/repositories/test_human_input_form_repository_impl.py @@ -288,6 +288,7 @@ class _DummyForm: form_definition: str rendered_content: str expiration_time: datetime + conversation_id: str | None = None form_kind: HumanInputFormKind = HumanInputFormKind.RUNTIME created_at: datetime = dataclasses.field(default_factory=naive_utc_now) selected_action_id: str | None = None diff --git a/api/tests/unit_tests/core/repositories/test_human_input_repository.py b/api/tests/unit_tests/core/repositories/test_human_input_repository.py index edd8be8618..780687bec5 100644 --- a/api/tests/unit_tests/core/repositories/test_human_input_repository.py +++ b/api/tests/unit_tests/core/repositories/test_human_input_repository.py @@ -73,6 +73,7 @@ class _DummyForm: form_definition: str rendered_content: str expiration_time: datetime + conversation_id: str | None = None form_kind: HumanInputFormKind = HumanInputFormKind.RUNTIME created_at: datetime = dataclasses.field(default_factory=naive_utc_now) selected_action_id: str | None = None diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py index 471e17468c..b2a5ef3931 100644 --- a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py @@ -1,8 +1,9 @@ from types import SimpleNamespace from typing import cast -from unittest.mock import patch +from unittest.mock import MagicMock, patch from agenton.compositor import CompositorSessionSnapshot +from dify_agent.layers.ask_human import AskHumanToolResult from dify_agent.protocol import RunStartedEvent, RunSucceededEvent, RunSucceededEventData from clients.agent_backend import ( @@ -15,15 +16,22 @@ from clients.agent_backend import ( from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, DifyRunContext, InvokeFrom, UserFrom from core.workflow.file_reference import build_file_reference from core.workflow.nodes.agent_v2 import DifyAgentNode +from core.workflow.nodes.agent_v2.ask_human_resume import AskHumanResumeOutcome from core.workflow.nodes.agent_v2.binding_resolver import WorkflowAgentBindingBundle, WorkflowAgentBindingResolver from core.workflow.nodes.agent_v2.entities import DifyAgentNodeData from core.workflow.nodes.agent_v2.output_adapter import WorkflowAgentOutputAdapter from core.workflow.nodes.agent_v2.runtime_request_builder import WorkflowAgentRuntimeRequestBuilder -from core.workflow.nodes.agent_v2.session_store import WorkflowAgentRuntimeSessionStore, WorkflowAgentSessionScope +from core.workflow.nodes.agent_v2.session_store import ( + StoredWorkflowAgentSession, + WorkflowAgentRuntimeSessionStore, + WorkflowAgentSessionScope, +) from graphon.entities import GraphInitParams +from graphon.entities.pause_reason import HumanInputRequired from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from graphon.file import File, FileTransferMethod, FileType from graphon.node_events import PauseRequestedEvent, StreamCompletedEvent +from graphon.nodes.human_input.entities import UserActionConfig from graphon.runtime import GraphRuntimeState from graphon.variables.segments import ArrayFileSegment, FileSegment, StringSegment from models.agent import Agent, AgentConfigSnapshot, WorkflowAgentNodeBinding @@ -113,12 +121,16 @@ class FakeBindingResolver(WorkflowAgentBindingResolver): class FakeSessionStore: def __init__(self, snapshot: CompositorSessionSnapshot | None = None) -> None: self.loaded_snapshot = snapshot + # ENG-638: set to simulate resume after a submitted/timed-out form. + self.loaded_session: StoredWorkflowAgentSession | None = None self.saved: list[ tuple[ WorkflowAgentSessionScope, str, CompositorSessionSnapshot | None, list[RuntimeLayerSpec], + str | None, + str | None, ] ] = [] self.cleaned: list[tuple[WorkflowAgentSessionScope, str | None]] = [] @@ -126,6 +138,9 @@ class FakeSessionStore: def load_active_snapshot(self, scope: WorkflowAgentSessionScope) -> CompositorSessionSnapshot | None: return self.loaded_snapshot + def load_active_session(self, scope: WorkflowAgentSessionScope) -> StoredWorkflowAgentSession | None: + return self.loaded_session + def save_active_snapshot( self, *, @@ -133,8 +148,12 @@ class FakeSessionStore: backend_run_id: str, snapshot: CompositorSessionSnapshot | None, runtime_layer_specs: list[RuntimeLayerSpec], + pending_form_id: str | None = None, + pending_tool_call_id: str | None = None, ) -> None: - self.saved.append((scope, backend_run_id, snapshot, list(runtime_layer_specs))) + self.saved.append( + (scope, backend_run_id, snapshot, list(runtime_layer_specs), pending_form_id, pending_tool_call_id) + ) def mark_cleaned( self, @@ -378,10 +397,13 @@ def test_agent_node_saves_success_snapshot_and_reuses_existing_snapshot(): assert len(events) == 1 assert store.saved - scope, backend_run_id, saved_snapshot, saved_specs = store.saved[0] + scope, backend_run_id, saved_snapshot, saved_specs, pending_form_id, pending_tool_call_id = store.saved[0] assert scope.workflow_run_id == "workflow-run-1" assert backend_run_id == "fake-run-1" assert saved_snapshot is not None + # A successful terminal carries no ask_human pause correlation. + assert pending_form_id is None + assert pending_tool_call_id is None assert client.request is not None assert client.request.session_snapshot is existing_snapshot # Persist enough composition shape to replay a cleanup run; plugin layers @@ -462,13 +484,100 @@ def test_agent_node_paused_run_requests_workflow_pause_and_persists_snapshot(): store = FakeSessionStore() node = _node(scenario=FakeAgentBackendScenario.PAUSED, session_store=store) + # ENG-636: the PAUSED scenario emits a dify.ask_human deferred call, so the + # node now builds a HITL form and pauses with HumanInputRequired. Stub the + # form repository so the unit test stays DB-free. + fake_repo = MagicMock() + fake_repo.create_form.return_value = MagicMock(id="form-1") + node._build_human_input_form_repository = lambda *, dify_ctx, workflow_run_id: fake_repo # type: ignore[assignment] + events = list(node._run()) assert len(events) == 1 assert isinstance(events[0], PauseRequestedEvent) + assert isinstance(events[0].reason, HumanInputRequired) + assert events[0].reason.form_id == "form-1" + assert events[0].reason.node_id == "agent-node" + fake_repo.create_form.assert_called_once() assert store.saved assert store.saved[0][1] == "fake-run-1" assert store.saved[0][3], "paused agent run should still persist replayable layer specs" + # ENG-637: the awaiting form + deferred tool_call correlation is persisted. + assert store.saved[0][4] == "form-1" + assert store.saved[0][5] == "fake-ask-human-1" + + +def _pending_session(snapshot: CompositorSessionSnapshot) -> StoredWorkflowAgentSession: + return StoredWorkflowAgentSession( + scope=WorkflowAgentSessionScope( + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + workflow_run_id="workflow-run-1", + node_id="agent-node", + node_execution_id="exec-1", + binding_id="binding-1", + agent_id="agent-1", + agent_config_snapshot_id="snapshot-1", + ), + session_snapshot=snapshot, + backend_run_id="run-0", + pending_form_id="form-1", + pending_tool_call_id="call-1", + ) + + +def test_agent_node_resumes_with_deferred_tool_results_after_submitted_form(monkeypatch): + # ENG-638: a submitted form re-enters _run; the human's answer is threaded + # into the second Agent run as deferred_tool_results. + snapshot = CompositorSessionSnapshot(layers=[]) + store = FakeSessionStore(snapshot=snapshot) + store.loaded_session = _pending_session(snapshot) + + def _fake_resolve(*, form_id: str, tenant_id: str, node_id: str) -> AskHumanResumeOutcome: + assert form_id == "form-1" + return AskHumanResumeOutcome(deferred_result=AskHumanToolResult(status="submitted", values={"note": "ok"})) + + monkeypatch.setattr("core.workflow.nodes.agent_v2.agent_node.resolve_ask_human_form", _fake_resolve) + + client = FakeAgentBackendRunClient() # SUCCESS scenario -> second run completes + node = _node(agent_backend_client=client, session_store=store) + + events = list(node._run()) + + assert client.request is not None + assert client.request.deferred_tool_results is not None + assert set(client.request.deferred_tool_results.calls) == {"call-1"} + assert any(isinstance(event, StreamCompletedEvent) for event in events) + + +def test_agent_node_repauses_when_resumed_form_still_waiting(monkeypatch): + snapshot = CompositorSessionSnapshot(layers=[]) + store = FakeSessionStore(snapshot=snapshot) + store.loaded_session = _pending_session(snapshot) + + repause = HumanInputRequired( + form_id="form-1", + form_content="Approve?", + inputs=[], + actions=[UserActionConfig(id="ok", title="OK")], + node_id="agent-node", + node_title="Budget review", + ) + monkeypatch.setattr( + "core.workflow.nodes.agent_v2.agent_node.resolve_ask_human_form", + lambda **_kwargs: AskHumanResumeOutcome(repause=repause), + ) + + client = FakeAgentBackendRunClient() + node = _node(agent_backend_client=client, session_store=store) + + events = list(node._run()) + + assert len(events) == 1 + assert isinstance(events[0], PauseRequestedEvent) + assert isinstance(events[0].reason, HumanInputRequired) + assert client.request is None # no second Agent run was created def test_agent_node_records_stream_usage_metadata(): diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_ask_human_hitl.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_ask_human_hitl.py new file mode 100644 index 0000000000..5f656f13a2 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_ask_human_hitl.py @@ -0,0 +1,338 @@ +"""Unit tests for the ask_human -> HITL form translation layer (ENG-636).""" + +from __future__ import annotations + +from typing import Any +from unittest.mock import MagicMock + +import pytest +from dify_agent.layers.ask_human import AskHumanToolArgs +from dify_agent.protocol import DeferredToolCallPayload + +from core.repositories.human_input_repository import FormCreateParams, HumanInputFormRepository +from core.workflow.human_input_adapter import ( + EmailDeliveryMethod, + ExternalRecipient, + InteractiveSurfaceDeliveryMethod, +) +from core.workflow.nodes.agent_v2.ask_human_hitl import ( + AskHumanFormBuildError, + ask_human_args_to_node_data, + build_ask_human_pause_reason, + build_delivery_methods, + parse_ask_human_args, +) +from graphon.nodes.human_input.entities import ( + FileInputConfig, + FileListInputConfig, + ParagraphInputConfig, + SelectInputConfig, +) +from graphon.nodes.human_input.enums import ButtonStyle, TimeoutUnit +from models.agent_config_entities import AgentHumanContactConfig + + +def _args(**overrides: Any) -> AskHumanToolArgs: + payload: dict[str, Any] = {"question": "Approve the budget?"} + payload.update(overrides) + return AskHumanToolArgs.model_validate(payload) + + +def _deferred_call(args: dict[str, Any], *, tool_name: str = "ask_human") -> DeferredToolCallPayload: + return DeferredToolCallPayload(tool_call_id="call-1", tool_name=tool_name, args=args) + + +def _fake_repository(form_id: str = "form-123") -> MagicMock: + repo = MagicMock(spec=HumanInputFormRepository) + repo.create_form.return_value = MagicMock(id=form_id) + return repo + + +# ─────────────────────────── parse_ask_human_args ─────────────────────────── + + +def test_parse_ask_human_args_from_mapping() -> None: + parsed = parse_ask_human_args({"question": "Need a decision"}) + assert isinstance(parsed, AskHumanToolArgs) + assert parsed.question == "Need a decision" + + +def test_parse_ask_human_args_passthrough() -> None: + original = _args() + assert parse_ask_human_args(original) is original + + +def test_parse_ask_human_args_invalid_payload_raises() -> None: + with pytest.raises(AskHumanFormBuildError): + parse_ask_human_args({"question": ""}) # blank question is rejected + + +def test_parse_ask_human_args_non_mapping_raises() -> None: + with pytest.raises(AskHumanFormBuildError): + parse_ask_human_args("not a mapping") + + +# ───────────────────────── ask_human_args_to_node_data ────────────────────── + + +def test_node_data_maps_every_field_type() -> None: + args = _args( + fields=[ + {"type": "paragraph", "name": "reason", "label": "Reason", "default": "n/a"}, + { + "type": "select", + "name": "tier", + "label": "Tier", + "options": [{"value": "t1", "label": "Tier 1"}, {"value": "t2", "label": "Tier 2"}], + "default": "t2", + }, + {"type": "file", "name": "doc", "label": "Document"}, + {"type": "file-list", "name": "evidence", "label": "Evidence", "max_files": 3}, + ], + ) + + node_data = ask_human_args_to_node_data(args, node_title="Budget review") + + assert [type(i) for i in node_data.inputs] == [ + ParagraphInputConfig, + SelectInputConfig, + FileInputConfig, + FileListInputConfig, + ] + paragraph, select, _file, file_list = node_data.inputs + assert isinstance(paragraph, ParagraphInputConfig) + assert isinstance(select, SelectInputConfig) + assert isinstance(file_list, FileListInputConfig) + assert paragraph.output_variable_name == "reason" + assert paragraph.default is not None + assert paragraph.default.value == "n/a" + assert select.option_source.value == ["t1", "t2"] + assert file_list.number_limits == 3 + assert node_data.timeout == 36 + assert node_data.timeout_unit == TimeoutUnit.HOUR + assert node_data.title == "Budget review" + + +def test_node_data_form_content_embeds_title_question_and_field_markers() -> None: + args = _args( + title="Decision needed", + markdown="Some **context** here.", + fields=[{"type": "paragraph", "name": "reason", "label": "Reason", "required": True}], + ) + + content = ask_human_args_to_node_data(args, node_title="t").form_content + + assert "## Decision needed" in content + assert "Approve the budget?" in content + assert "Some **context** here." in content + # The label carries a required marker and positions the input via $output. + assert "**Reason ***" in content + assert "{{#$output.reason#}}" in content + + +def test_node_data_maps_action_styles_and_titles() -> None: + args = _args( + actions=[ + {"id": "approve", "label": "Approve", "style": "primary"}, + {"id": "reject", "label": "Reject", "style": "destructive"}, + {"id": "later", "label": "Decide later"}, + ], + ) + + actions = ask_human_args_to_node_data(args, node_title="t").user_actions + + assert [(a.id, a.title, a.button_style) for a in actions] == [ + ("approve", "Approve", ButtonStyle.PRIMARY), + ("reject", "Reject", ButtonStyle.ACCENT), # destructive -> accent + ("later", "Decide later", ButtonStyle.DEFAULT), + ] + + +def test_node_data_synthesizes_submit_action_when_none_given() -> None: + actions = ask_human_args_to_node_data(_args(), node_title="t").user_actions + assert len(actions) == 1 + assert actions[0].id == "submit" + assert actions[0].button_style == ButtonStyle.PRIMARY + + +def test_node_data_clamps_overlong_action_id_deterministically() -> None: + long_id = "approve_the_quarterly_budget_request" # > 20 chars, valid identifier + args = _args(actions=[{"id": long_id, "label": "Approve"}]) + + first = ask_human_args_to_node_data(args, node_title="t").user_actions[0] + second = ask_human_args_to_node_data(args, node_title="t").user_actions[0] + + assert len(first.id) <= 20 + assert first.id.isidentifier() + assert first.id == second.id # stable across builds + assert first.title == "Approve" # label preserved verbatim + + +# ───────────────────────────── build_delivery_methods ────────────────────── + + +def test_delivery_always_includes_interactive_surface() -> None: + methods = build_delivery_methods([], args=_args()) + assert len(methods) == 1 + assert isinstance(methods[0], InteractiveSurfaceDeliveryMethod) + + +def test_delivery_adds_email_for_contacts_and_dedupes() -> None: + contacts = [ + AgentHumanContactConfig(email="a@x.com"), + AgentHumanContactConfig(email="a@x.com"), # duplicate + AgentHumanContactConfig(email=None), # no email + AgentHumanContactConfig(email="b@x.com"), + ] + + methods = build_delivery_methods(contacts, args=_args()) + + email_methods = [m for m in methods if isinstance(m, EmailDeliveryMethod)] + assert len(email_methods) == 1 + recipients = email_methods[0].config.recipients.items + assert [r.email for r in recipients if isinstance(r, ExternalRecipient)] == ["a@x.com", "b@x.com"] + + +def test_delivery_high_urgency_prefixes_subject() -> None: + methods = build_delivery_methods( + [AgentHumanContactConfig(email="a@x.com")], + args=_args(title="Sign off", urgency="high"), + ) + email_method = next(m for m in methods if isinstance(m, EmailDeliveryMethod)) + assert email_method.config.subject.startswith("[Action needed] ") + + +# ─────────────────────────── build_ask_human_pause_reason ─────────────────── + + +def test_pause_reason_none_for_non_ask_human_tool() -> None: + result = build_ask_human_pause_reason( + deferred_tool_call=_deferred_call({"question": "q"}, tool_name="final_output"), + node_id="node-1", + default_node_title="Agent", + workflow_run_id="wf-1", + contacts=[], + repository=_fake_repository(), + ) + assert result is None + + +def test_pause_reason_requires_workflow_run_id() -> None: + with pytest.raises(AskHumanFormBuildError): + build_ask_human_pause_reason( + deferred_tool_call=_deferred_call({"question": "q"}), + node_id="node-1", + default_node_title="Agent", + workflow_run_id="", + contacts=[], + repository=_fake_repository(), + ) + + +def test_pause_reason_builds_form_and_returns_human_input_required() -> None: + repo = _fake_repository(form_id="form-xyz") + contacts = [AgentHumanContactConfig(email="a@x.com")] + + result = build_ask_human_pause_reason( + deferred_tool_call=_deferred_call( + { + "title": "Approve?", + "question": "Please approve", + "fields": [{"type": "paragraph", "name": "note", "label": "Note"}], + "actions": [{"id": "ok", "label": "OK"}], + } + ), + node_id="node-1", + default_node_title="Agent fallback", + workflow_run_id="wf-1", + contacts=contacts, + repository=repo, + ) + + assert result is not None + assert result.form_id == "form-xyz" + assert result.node_id == "node-1" + assert result.node_title == "Approve?" # args.title wins over default + assert [i.output_variable_name for i in result.inputs] == ["note"] + assert [a.id for a in result.actions] == ["ok"] + + params: FormCreateParams = repo.create_form.call_args.args[0] + assert params.workflow_execution_id == "wf-1" + assert params.node_id == "node-1" + # No conversation_id passed -> pure workflow run owns the form by workflow_run_id only. + assert params.conversation_id is None + assert any(isinstance(m, EmailDeliveryMethod) for m in params.delivery_methods) + + +def test_pause_reason_forwards_conversation_id_for_chatflow() -> None: + # ENG-635 (review): an agent node running in a chatflow tags its ask_human form + # with the conversation in addition to the workflow run. + repo = _fake_repository(form_id="form-xyz") + + build_ask_human_pause_reason( + deferred_tool_call=_deferred_call({"question": "Please approve"}), + node_id="node-1", + default_node_title="Agent", + workflow_run_id="wf-1", + conversation_id="conv-1", + contacts=[], + repository=repo, + ) + + params: FormCreateParams = repo.create_form.call_args.args[0] + assert params.workflow_execution_id == "wf-1" + assert params.conversation_id == "conv-1" + + +def test_pause_reason_falls_back_to_default_node_title() -> None: + result = build_ask_human_pause_reason( + deferred_tool_call=_deferred_call({"question": "q with no title"}), + node_id="node-1", + default_node_title="Agent fallback", + workflow_run_id="wf-1", + contacts=[], + repository=_fake_repository(), + ) + assert result is not None + assert result.node_title == "Agent fallback" + + +def test_pause_reason_select_default_flows_into_resolved_defaults() -> None: + repo = _fake_repository() + result = build_ask_human_pause_reason( + deferred_tool_call=_deferred_call( + { + "question": "pick", + "fields": [ + { + "type": "select", + "name": "tier", + "label": "Tier", + "options": [{"value": "t1", "label": "Tier 1"}], + "default": "t1", + } + ], + } + ), + node_id="node-1", + default_node_title="Agent", + workflow_run_id="wf-1", + contacts=[], + repository=repo, + ) + assert result is not None + assert result.resolved_default_values == {"tier": "t1"} + + +def test_pause_reason_wraps_repository_value_error() -> None: + repo = MagicMock(spec=HumanInputFormRepository) + repo.create_form.side_effect = ValueError("db boom") + with pytest.raises(AskHumanFormBuildError): + build_ask_human_pause_reason( + deferred_tool_call=_deferred_call({"question": "q"}), + node_id="node-1", + default_node_title="Agent", + workflow_run_id="wf-1", + contacts=[], + repository=repo, + ) diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_ask_human_resume.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_ask_human_resume.py new file mode 100644 index 0000000000..8f7454313d --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_ask_human_resume.py @@ -0,0 +1,121 @@ +"""Unit tests for mapping a submitted/timed-out HITL form back to ask_human (ENG-638).""" + +from __future__ import annotations + +from datetime import datetime + +from dify_agent.layers.ask_human import AskHumanToolResult + +from core.workflow.nodes.agent_v2.ask_human_resume import ( + build_deferred_tool_results, + map_form_to_outcome, +) +from graphon.entities.pause_reason import HumanInputRequired +from graphon.nodes.human_input.entities import FormDefinition, ParagraphInputConfig, UserActionConfig +from graphon.nodes.human_input.enums import HumanInputFormStatus + + +def _form_definition_json() -> str: + return FormDefinition( + form_content="Approve? {{#$output.note#}}", + inputs=[ParagraphInputConfig(output_variable_name="note")], + user_actions=[UserActionConfig(id="approve", title="Approve"), UserActionConfig(id="reject", title="Reject")], + rendered_content="Approve? ", + expiration_time=datetime(2026, 1, 1), + default_values={"note": "default"}, + node_title="Budget review", + ).model_dump_json() + + +def test_map_submitted_form_to_result() -> None: + outcome = map_form_to_outcome( + status=HumanInputFormStatus.SUBMITTED, + selected_action_id="approve", + submitted_data='{"note": "looks good"}', + rendered_content="Approve? ", + form_definition=_form_definition_json(), + form_id="form-1", + node_id="node-1", + ) + + assert outcome.repause is None + result = outcome.deferred_result + assert result is not None + assert result.status == "submitted" + assert result.action is not None + assert result.action.id == "approve" + assert result.action.label == "Approve" # verbatim label recovered from the form + assert result.values == {"note": "looks good"} + assert result.rendered_content == "Approve? " + + +def test_map_timeout_form_to_timeout_result() -> None: + outcome = map_form_to_outcome( + status=HumanInputFormStatus.TIMEOUT, + selected_action_id=None, + submitted_data=None, + rendered_content="x", + form_definition=_form_definition_json(), + form_id="form-1", + node_id="node-1", + ) + assert outcome.deferred_result is not None + assert outcome.deferred_result.status == "timeout" + assert outcome.deferred_result.action is None + + +def test_map_expired_form_to_timeout_result() -> None: + outcome = map_form_to_outcome( + status=HumanInputFormStatus.EXPIRED, + selected_action_id=None, + submitted_data=None, + rendered_content="x", + form_definition=_form_definition_json(), + form_id="form-1", + node_id="node-1", + ) + assert outcome.deferred_result is not None + assert outcome.deferred_result.status == "timeout" + + +def test_map_waiting_form_rebuilds_pause() -> None: + outcome = map_form_to_outcome( + status=HumanInputFormStatus.WAITING, + selected_action_id=None, + submitted_data=None, + rendered_content="x", + form_definition=_form_definition_json(), + form_id="form-1", + node_id="node-1", + ) + + assert outcome.deferred_result is None + pause = outcome.repause + assert isinstance(pause, HumanInputRequired) + assert pause.form_id == "form-1" + assert pause.node_id == "node-1" + assert pause.node_title == "Budget review" + assert [a.id for a in pause.actions] == ["approve", "reject"] + assert [i.output_variable_name for i in pause.inputs] == ["note"] + + +def test_map_submitted_without_action_id() -> None: + outcome = map_form_to_outcome( + status=HumanInputFormStatus.SUBMITTED, + selected_action_id=None, + submitted_data="{}", + rendered_content="x", + form_definition=_form_definition_json(), + form_id="form-1", + node_id="node-1", + ) + assert outcome.deferred_result is not None + assert outcome.deferred_result.action is None + assert outcome.deferred_result.values == {} + + +def test_build_deferred_tool_results_keys_by_tool_call_id() -> None: + result = AskHumanToolResult(status="timeout") + payload = build_deferred_tool_results(tool_call_id="call-42", result=result) + assert set(payload.calls) == {"call-42"} + assert payload.calls["call-42"] == result.model_dump(mode="json") diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py index 0fffb0617a..3cd77f7f2e 100644 --- a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py @@ -796,3 +796,36 @@ def test_build_drive_layer_config_all_refs_dangling_yields_no_config(): config, warnings = build_drive_layer_config(soul, agent_id="agent-1") assert config is None assert [w["code"] for w in warnings] == ["skill_ref_dangling"] + + +# ── ENG-635: ask_human layer gating + feature manifest ─────────────────────── + + +def test_build_ask_human_layer_config_gated_on_human_contacts(): + from dify_agent.layers.ask_human import DifyAskHumanLayerConfig + + from core.workflow.nodes.agent_v2.runtime_request_builder import build_ask_human_layer_config + + # no human involvement configured -> tool stays off + assert build_ask_human_layer_config(AgentSoulConfig()) is None + + soul = AgentSoulConfig.model_validate( + {"human": {"contacts": [{"id": "c-1", "name": "David", "email": "d@acme.com", "channel": "email"}]}} + ) + config = build_ask_human_layer_config(soul) + assert isinstance(config, DifyAskHumanLayerConfig) + assert config.enabled is True + + +def test_feature_manifest_marks_human_supported_when_configured(): + from core.workflow.nodes.agent_v2.runtime_feature_manifest import build_runtime_feature_manifest + + soul = AgentSoulConfig.model_validate( + {"human": {"contacts": [{"id": "c-1", "name": "David", "email": "d@acme.com", "channel": "email"}]}} + ) + manifest = build_runtime_feature_manifest(soul) + assert "human" in manifest["supported"] + assert "human" not in manifest["reserved"] + assert manifest["reserved_status"]["human"] == "supported_by_ask_human_hitl" + # configured human no longer produces a "not executed" warning + assert all("human" not in w["section"] for w in manifest["unsupported_runtime_warnings"]) diff --git a/api/tests/unit_tests/core/workflow/test_node_runtime.py b/api/tests/unit_tests/core/workflow/test_node_runtime.py index 216ce513f8..bdccea478d 100644 --- a/api/tests/unit_tests/core/workflow/test_node_runtime.py +++ b/api/tests/unit_tests/core/workflow/test_node_runtime.py @@ -625,12 +625,43 @@ def test_dify_human_input_runtime_create_form_filters_debugger_delivery_methods( params = repository.create_form.call_args.args[0] assert params.node_id == "human-input-node" assert params.workflow_execution_id == "workflow-execution-id" + # No conversation_id_getter wired -> a pure workflow run leaves it None. + assert params.conversation_id is None assert params.display_in_ui is True assert len(params.delivery_methods) == 1 assert params.delivery_methods[0].type == DeliveryMethodType.EMAIL assert params.delivery_methods[0].config.recipients.items[0].reference_id == "user-id" +def test_dify_human_input_runtime_create_form_tags_conversation_id_for_chatflow() -> None: + # ENG-635 (review): a chatflow (advanced-chat) run carries a conversation, so its + # Human Input form is tagged with BOTH its workflow run and its conversation — + # making the form queryable per conversation without changing resume routing. + repository = MagicMock() + repository.create_form.return_value = sentinel.form + node_data = HumanInputNodeData( + title="Human Input", + delivery_methods=[WebAppDeliveryMethod(enabled=True, config=_WebAppDeliveryConfig())], + ) + runtime = DifyHumanInputNodeRuntime( + _build_run_context(), + workflow_execution_id_getter=lambda: "workflow-execution-id", + conversation_id_getter=lambda: "conversation-id", + form_repository=repository, + ) + + runtime.create_form( + node_id="human-input-node", + node_data=node_data, + rendered_content="

Rendered

", + resolved_default_values={}, + ) + + params = repository.create_form.call_args.args[0] + assert params.workflow_execution_id == "workflow-execution-id" + assert params.conversation_id == "conversation-id" + + def test_dify_human_input_runtime_preserves_webapp_delivery_for_web_invocations() -> None: repository = MagicMock() repository.create_form.return_value = sentinel.form diff --git a/api/tests/unit_tests/services/test_human_input_service.py b/api/tests/unit_tests/services/test_human_input_service.py index a0434f9b43..01d918cd89 100644 --- a/api/tests/unit_tests/services/test_human_input_service.py +++ b/api/tests/unit_tests/services/test_human_input_service.py @@ -281,6 +281,36 @@ def test_submit_form_by_token_calls_repository_and_enqueue( enqueue_spy.assert_called_once_with(sample_form_record.workflow_run_id) +def test_submit_form_by_token_enqueues_agent_app_resume_for_conversation_form( + sample_form_record, mock_session_factory, mocker: MockerFixture +): + # ENG-635: a conversation-owned (Agent v2 chat) form routes to the chat + # resume, not the workflow resume. + session_factory, _ = mock_session_factory + repo = MagicMock(spec=HumanInputFormSubmissionRepository) + conversation_record = dataclasses.replace( + sample_form_record, + workflow_run_id=None, + conversation_id="conv-1", + ) + repo.get_by_token.return_value = conversation_record + repo.mark_submitted.return_value = conversation_record + service = HumanInputService(session_factory, form_repository=repo) + workflow_enqueue_spy = mocker.patch.object(service, "enqueue_resume") + chat_enqueue_spy = mocker.patch.object(service, "enqueue_agent_app_resume") + + service.submit_form_by_token( + recipient_type=RecipientType.STANDALONE_WEB_APP, + form_token="token", + selected_action_id="submit", + form_data={"field": "value"}, + submission_end_user_id="end-user-id", + ) + + chat_enqueue_spy.assert_called_once_with(conversation_id="conv-1", form_id=conversation_record.form_id) + workflow_enqueue_spy.assert_not_called() + + def test_submit_form_by_token_skips_enqueue_for_delivery_test( sample_form_record, mock_session_factory, mocker: MockerFixture ): diff --git a/api/tests/unit_tests/tasks/test_resume_agent_app_task.py b/api/tests/unit_tests/tasks/test_resume_agent_app_task.py new file mode 100644 index 0000000000..237b43200d --- /dev/null +++ b/api/tests/unit_tests/tasks/test_resume_agent_app_task.py @@ -0,0 +1,138 @@ +"""Unit tests for the ``resume_agent_app_execution`` celery task (ENG-635). + +Every DB access (``db.session.get``) and the generator are patched at the module +level, so the task's branch logic is exercised without a database or live stack. +""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +from pytest_mock import MockerFixture + +from models.account import Account +from models.human_input import HumanInputForm +from models.model import App, Conversation, EndUser +from tasks.app_generate import resume_agent_app_task as mod + +MODULE = "tasks.app_generate.resume_agent_app_task" + + +def _form(conversation_id: str = "conv-1", app_id: str = "app-1") -> MagicMock: + return MagicMock(conversation_id=conversation_id, app_id=app_id) + + +def _wire_db( + mocker: MockerFixture, + *, + form=None, + app=None, + conversation=None, + account=None, + end_user=None, +) -> MagicMock: + """Patch the module ``db`` so ``db.session.get(Model, id)`` dispatches by model.""" + table = { + HumanInputForm: form, + App: app, + Conversation: conversation, + Account: account, + EndUser: end_user, + } + db = mocker.patch(f"{MODULE}.db") + db.session.get.side_effect = lambda model, _id: table.get(model) + return db + + +def test_resume_happy_path_account_user_sets_tenant_and_runs(mocker: MockerFixture): + conversation = MagicMock(from_account_id="acct-1", from_end_user_id=None) + account = MagicMock() + app = MagicMock(tenant_id="tenant-1") + _wire_db(mocker, form=_form(), app=app, conversation=conversation, account=account) + gen = mocker.patch(f"{MODULE}.AgentAppGenerator") + + mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1") + + account.set_tenant_id.assert_called_once_with("tenant-1") + gen.return_value.resume_after_form_submission.assert_called_once() + kwargs = gen.return_value.resume_after_form_submission.call_args.kwargs + assert kwargs["conversation_id"] == "conv-1" + assert kwargs["user"] is account + assert kwargs["app_model"] is app + + +def test_resume_end_user_path(mocker: MockerFixture): + conversation = MagicMock(from_account_id=None, from_end_user_id="eu-1") + end_user = MagicMock() + _wire_db(mocker, form=_form(), app=MagicMock(tenant_id="t"), conversation=conversation, end_user=end_user) + gen = mocker.patch(f"{MODULE}.AgentAppGenerator") + + mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1") + + assert gen.return_value.resume_after_form_submission.call_args.kwargs["user"] is end_user + + +def test_resume_returns_when_form_missing(mocker: MockerFixture): + _wire_db(mocker, form=None) + gen = mocker.patch(f"{MODULE}.AgentAppGenerator") + + mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1") + + gen.assert_not_called() + + +def test_resume_returns_on_conversation_mismatch(mocker: MockerFixture): + _wire_db(mocker, form=_form(conversation_id="other-conv")) + gen = mocker.patch(f"{MODULE}.AgentAppGenerator") + + mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1") + + gen.assert_not_called() + + +def test_resume_returns_when_app_missing(mocker: MockerFixture): + _wire_db(mocker, form=_form(), app=None) + gen = mocker.patch(f"{MODULE}.AgentAppGenerator") + + mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1") + + gen.assert_not_called() + + +def test_resume_returns_when_conversation_missing(mocker: MockerFixture): + _wire_db(mocker, form=_form(), app=MagicMock(), conversation=None) + gen = mocker.patch(f"{MODULE}.AgentAppGenerator") + + mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1") + + gen.assert_not_called() + + +def test_resume_returns_when_no_user_resolvable(mocker: MockerFixture): + conversation = MagicMock(from_account_id=None, from_end_user_id=None) + _wire_db(mocker, form=_form(), app=MagicMock(), conversation=conversation) + gen = mocker.patch(f"{MODULE}.AgentAppGenerator") + + mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1") + + gen.assert_not_called() + + +def test_resume_returns_when_account_id_set_but_account_gone(mocker: MockerFixture): + conversation = MagicMock(from_account_id="acct-x", from_end_user_id=None) + _wire_db(mocker, form=_form(), app=MagicMock(), conversation=conversation, account=None) + gen = mocker.patch(f"{MODULE}.AgentAppGenerator") + + mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1") + + gen.assert_not_called() + + +def test_resume_swallows_generator_exception(mocker: MockerFixture): + conversation = MagicMock(from_account_id="acct-1", from_end_user_id=None) + _wire_db(mocker, form=_form(), app=MagicMock(tenant_id="t"), conversation=conversation, account=MagicMock()) + gen = mocker.patch(f"{MODULE}.AgentAppGenerator") + gen.return_value.resume_after_form_submission.side_effect = RuntimeError("boom") + + # The task must not propagate the failure (it is logged and the session closed). + mod.resume_agent_app_execution(conversation_id="conv-1", form_id="form-1")