diff --git a/api/core/app/apps/agent_app/app_generator.py b/api/core/app/apps/agent_app/app_generator.py index 13eaae47eb..92c75276e0 100644 --- a/api/core/app/apps/agent_app/app_generator.py +++ b/api/core/app/apps/agent_app/app_generator.py @@ -246,6 +246,8 @@ class AgentAppGenerator(MessageBasedAppGenerator): "conversation_id": conversation.id, "message_id": message.id, "user_from": UserFrom.ACCOUNT if isinstance(user, Account) else UserFrom.END_USER, + # Resume continues a paused agent run; skip input guards (see _generate_worker). + "is_resume": True, }, ) worker_thread.start() @@ -270,6 +272,7 @@ class AgentAppGenerator(MessageBasedAppGenerator): conversation_id: str, message_id: str, user_from: UserFrom, + is_resume: bool = False, ) -> None: from libs.flask_utils import preserve_flask_contexts @@ -279,20 +282,30 @@ class AgentAppGenerator(MessageBasedAppGenerator): message = self._get_message(message_id) app_config = application_generate_entity.app_config - # Apply app-level input guards (content moderation + annotation - # reply) before reaching the Agent backend, mirroring the EasyUI - # chat / agent-chat runners. These can short-circuit the turn. - app_model = db.session.get(App, app_config.app_id) - if app_model is None: - raise AgentAppGeneratorError("App not found") - handled, query = self._run_input_guards( - application_generate_entity=application_generate_entity, - app_model=app_model, - message=message, - queue_manager=queue_manager, - ) - if handled: - return + if is_resume: + # ENG-638: a resume continues a paused agent run; the human's + # reply is threaded in by the runner as deferred_tool_results. + # The query is the replayed paused-turn message, kept only to + # match the suspended snapshot's layers — it is NOT new + # end-user input, so input guards must NOT run. Moderation or an + # annotation match on the replayed query would short-circuit the + # turn and drop the human reply, stranding the ask_human session. + query = application_generate_entity.query or "" + else: + # Apply app-level input guards (content moderation + annotation + # reply) before reaching the Agent backend, mirroring the EasyUI + # chat / agent-chat runners. These can short-circuit the turn. + app_model = db.session.get(App, app_config.app_id) + if app_model is None: + raise AgentAppGeneratorError("App not found") + handled, query = self._run_input_guards( + application_generate_entity=application_generate_entity, + app_model=app_model, + message=message, + queue_manager=queue_manager, + ) + if handled: + return dify_context = DifyRunContext( tenant_id=app_config.tenant_id, diff --git a/api/tasks/human_input_timeout_tasks.py b/api/tasks/human_input_timeout_tasks.py index fd743205a1..f670d5a338 100644 --- a/api/tasks/human_input_timeout_tasks.py +++ b/api/tasks/human_input_timeout_tasks.py @@ -95,16 +95,30 @@ def check_and_handle_human_input_timeouts(limit: int = 100) -> None: timeout_status=HumanInputFormStatus.EXPIRED if is_global else HumanInputFormStatus.TIMEOUT, reason="global_timeout" if is_global else "node_timeout", ) - assert record.workflow_run_id is not None, "workflow_run_id should not be None for non-test form" if is_global: + # Global timeout applies only to workflow-owned forms + # (_is_global_timeout requires a workflow_run_id): end the run. + assert record.workflow_run_id is not None, "global timeout requires a workflow_run_id" _handle_global_timeout( form_id=record.form_id, workflow_run_id=record.workflow_run_id, node_id=record.node_id, session_factory=session_factory, ) - else: + elif record.workflow_run_id is not None: + # Workflow Agent node / Human Input node form: resume the workflow. service.enqueue_resume(record.workflow_run_id) + elif record.conversation_id is not None: + # ENG-635: Agent v2 chat ask_human form is conversation-owned (no + # workflow_run_id). Resume the chat turn so the timeout is threaded + # back to the agent run as the ask_human deferred_tool_result + # (status="timeout"), mirroring HumanInputService.submit_form_by_token. + service.enqueue_agent_app_resume(conversation_id=record.conversation_id, form_id=record.form_id) + else: + logger.warning( + "Timed-out form %s has neither workflow_run_id nor conversation_id; skipping resume", + record.form_id, + ) except Exception: logger.exception( "Failed to handle timeout for form_id=%s workflow_run_id=%s", diff --git a/api/tests/unit_tests/core/app/apps/agent_app/test_app_generator.py b/api/tests/unit_tests/core/app/apps/agent_app/test_app_generator.py index 62073bb73a..6c36815753 100644 --- a/api/tests/unit_tests/core/app/apps/agent_app/test_app_generator.py +++ b/api/tests/unit_tests/core/app/apps/agent_app/test_app_generator.py @@ -201,17 +201,18 @@ class TestGenerateWorker: mocker.patch(f"{MODULE}.AgentAppRunner", return_value=runner) return runner - def _call(self, generator, mocker: MockerFixture, queue_manager): + def _call(self, generator, mocker: MockerFixture, queue_manager, *, is_resume=False, query="query"): generator._generate_worker( flask_app=mocker.MagicMock(), context=mocker.MagicMock(), application_generate_entity=mocker.MagicMock( - agent_id="a", agent_config_snapshot_id="s", model_conf=mocker.MagicMock(model="m") + agent_id="a", agent_config_snapshot_id="s", model_conf=mocker.MagicMock(model="m"), query=query ), queue_manager=queue_manager, conversation_id="conv", message_id="msg", user_from=UserFrom.END_USER, + is_resume=is_resume, ) def test_happy_path_runs_backend(self, generator, mocker: MockerFixture): @@ -227,6 +228,20 @@ class TestGenerateWorker: self._call(generator, mocker, queue_manager) runner.run.assert_not_called() + def test_resume_skips_input_guards_and_consumes_reply(self, generator, mocker: MockerFixture): + # ENG-638 (review): on resume the replayed query is NOT new end-user input. + # Input guards must be skipped, even if moderation/annotation would match, + # so the run continues and the human reply (deferred_tool_results) is used. + runner = self._wire(generator, mocker, handled=True) # guards WOULD short-circuit + queue_manager = mocker.MagicMock() + + self._call(generator, mocker, queue_manager, is_resume=True, query="the approved reply") + + generator._run_input_guards.assert_not_called() + runner.run.assert_called_once() + # the replayed paused-turn query flows straight to the runner (snapshot match) + assert runner.run.call_args.kwargs["query"] == "the approved reply" + def test_generate_task_stopped_is_swallowed(self, generator, mocker: MockerFixture): self._wire(generator, mocker, run_side_effect=GenerateTaskStoppedError()) queue_manager = mocker.MagicMock() diff --git a/api/tests/unit_tests/tasks/test_human_input_timeout_tasks.py b/api/tests/unit_tests/tasks/test_human_input_timeout_tasks.py index 591da56f49..f2203bae25 100644 --- a/api/tests/unit_tests/tasks/test_human_input_timeout_tasks.py +++ b/api/tests/unit_tests/tasks/test_human_input_timeout_tasks.py @@ -63,6 +63,7 @@ class _FakeFormRepo: return SimpleNamespace( form_id=form_id, workflow_run_id=getattr(form, "workflow_run_id", None), + conversation_id=getattr(form, "conversation_id", None), node_id=getattr(form, "node_id", None), ) @@ -70,11 +71,15 @@ class _FakeFormRepo: class _FakeService: def __init__(self, _session_factory, form_repository=None): self.enqueued: list[str] = [] + self.agent_app_resumed: list[tuple[str, str]] = [] def enqueue_resume(self, workflow_run_id: str | None) -> None: if workflow_run_id is not None: self.enqueued.append(workflow_run_id) + def enqueue_agent_app_resume(self, *, conversation_id: str, form_id: str) -> None: + self.agent_app_resumed.append((conversation_id, form_id)) + def _build_form( *, @@ -84,6 +89,7 @@ def _build_form( expiration_time: datetime, workflow_run_id: str | None, node_id: str, + conversation_id: str | None = None, ) -> SimpleNamespace: return SimpleNamespace( id=form_id, @@ -91,6 +97,7 @@ def _build_form( created_at=created_at, expiration_time=expiration_time, workflow_run_id=workflow_run_id, + conversation_id=conversation_id, node_id=node_id, status=HumanInputFormStatus.WAITING, ) @@ -208,3 +215,45 @@ def test_check_and_handle_human_input_timeouts_omits_global_filter_when_disabled assert stmt is not None stmt_text = str(stmt) assert "created_at <=" not in stmt_text + + +def test_check_and_handle_human_input_timeouts_routes_conversation_owned_form_to_agent_app_resume( + monkeypatch: pytest.MonkeyPatch, +): + # ENG-635 (review): a conversation-owned Agent v2 chat ask_human form has no + # workflow_run_id. On timeout it must enqueue the Agent App resume (so the + # timeout is threaded back as the ask_human result), instead of asserting on + # workflow_run_id — which previously raised and was swallowed by the except. + now = datetime(2025, 1, 1, 12, 0, 0) + monkeypatch.setattr(task_module, "naive_utc_now", lambda: now) + monkeypatch.setattr(task_module.dify_config, "HUMAN_INPUT_GLOBAL_TIMEOUT_SECONDS", 3600) + monkeypatch.setattr(task_module, "db", SimpleNamespace(engine=object())) + + forms = [ + _build_form( + form_id="form-chat", + form_kind=HumanInputFormKind.RUNTIME, + created_at=now - timedelta(minutes=5), + expiration_time=now - timedelta(seconds=1), + workflow_run_id=None, + conversation_id="conv-1", + node_id="agent", + ), + ] + capture: dict[str, Any] = {} + monkeypatch.setattr(task_module, "sessionmaker", lambda *args, **kwargs: _FakeSessionFactory(forms, capture)) + + repo = _FakeFormRepo(form_map={form.id: form for form in forms}) + service = _FakeService(None) + monkeypatch.setattr(task_module, "HumanInputFormSubmissionRepository", lambda: repo) + monkeypatch.setattr(task_module, "HumanInputService", lambda *_args, **_kwargs: service) + monkeypatch.setattr(task_module, "_handle_global_timeout", lambda **_kwargs: None) + + task_module.check_and_handle_human_input_timeouts(limit=100) + + # Node timeout (conversation forms are never "global"), routed to Agent App resume. + assert repo.calls == [ + {"form_id": "form-chat", "timeout_status": HumanInputFormStatus.TIMEOUT, "reason": "node_timeout"} + ] + assert service.agent_app_resumed == [("conv-1", "form-chat")] + assert service.enqueued == []