fix(api): Agent v2 chat ask_human — resume on timeout + skip input guards on resume (#37492)

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
zyssyz123 2026-06-16 16:11:28 +08:00 committed by GitHub
parent 9ccbfbaf9d
commit f9ed81c3f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 109 additions and 18 deletions

View File

@ -246,6 +246,8 @@ class AgentAppGenerator(MessageBasedAppGenerator):
"conversation_id": conversation.id,
"message_id": message.id,
"user_from": UserFrom.ACCOUNT if isinstance(user, Account) else UserFrom.END_USER,
# Resume continues a paused agent run; skip input guards (see _generate_worker).
"is_resume": True,
},
)
worker_thread.start()
@ -270,6 +272,7 @@ class AgentAppGenerator(MessageBasedAppGenerator):
conversation_id: str,
message_id: str,
user_from: UserFrom,
is_resume: bool = False,
) -> None:
from libs.flask_utils import preserve_flask_contexts
@ -279,20 +282,30 @@ class AgentAppGenerator(MessageBasedAppGenerator):
message = self._get_message(message_id)
app_config = application_generate_entity.app_config
# Apply app-level input guards (content moderation + annotation
# reply) before reaching the Agent backend, mirroring the EasyUI
# chat / agent-chat runners. These can short-circuit the turn.
app_model = db.session.get(App, app_config.app_id)
if app_model is None:
raise AgentAppGeneratorError("App not found")
handled, query = self._run_input_guards(
application_generate_entity=application_generate_entity,
app_model=app_model,
message=message,
queue_manager=queue_manager,
)
if handled:
return
if is_resume:
# ENG-638: a resume continues a paused agent run; the human's
# reply is threaded in by the runner as deferred_tool_results.
# The query is the replayed paused-turn message, kept only to
# match the suspended snapshot's layers — it is NOT new
# end-user input, so input guards must NOT run. Moderation or an
# annotation match on the replayed query would short-circuit the
# turn and drop the human reply, stranding the ask_human session.
query = application_generate_entity.query or ""
else:
# Apply app-level input guards (content moderation + annotation
# reply) before reaching the Agent backend, mirroring the EasyUI
# chat / agent-chat runners. These can short-circuit the turn.
app_model = db.session.get(App, app_config.app_id)
if app_model is None:
raise AgentAppGeneratorError("App not found")
handled, query = self._run_input_guards(
application_generate_entity=application_generate_entity,
app_model=app_model,
message=message,
queue_manager=queue_manager,
)
if handled:
return
dify_context = DifyRunContext(
tenant_id=app_config.tenant_id,

View File

@ -95,16 +95,30 @@ def check_and_handle_human_input_timeouts(limit: int = 100) -> None:
timeout_status=HumanInputFormStatus.EXPIRED if is_global else HumanInputFormStatus.TIMEOUT,
reason="global_timeout" if is_global else "node_timeout",
)
assert record.workflow_run_id is not None, "workflow_run_id should not be None for non-test form"
if is_global:
# Global timeout applies only to workflow-owned forms
# (_is_global_timeout requires a workflow_run_id): end the run.
assert record.workflow_run_id is not None, "global timeout requires a workflow_run_id"
_handle_global_timeout(
form_id=record.form_id,
workflow_run_id=record.workflow_run_id,
node_id=record.node_id,
session_factory=session_factory,
)
else:
elif record.workflow_run_id is not None:
# Workflow Agent node / Human Input node form: resume the workflow.
service.enqueue_resume(record.workflow_run_id)
elif record.conversation_id is not None:
# ENG-635: Agent v2 chat ask_human form is conversation-owned (no
# workflow_run_id). Resume the chat turn so the timeout is threaded
# back to the agent run as the ask_human deferred_tool_result
# (status="timeout"), mirroring HumanInputService.submit_form_by_token.
service.enqueue_agent_app_resume(conversation_id=record.conversation_id, form_id=record.form_id)
else:
logger.warning(
"Timed-out form %s has neither workflow_run_id nor conversation_id; skipping resume",
record.form_id,
)
except Exception:
logger.exception(
"Failed to handle timeout for form_id=%s workflow_run_id=%s",

View File

@ -201,17 +201,18 @@ class TestGenerateWorker:
mocker.patch(f"{MODULE}.AgentAppRunner", return_value=runner)
return runner
def _call(self, generator, mocker: MockerFixture, queue_manager):
def _call(self, generator, mocker: MockerFixture, queue_manager, *, is_resume=False, query="query"):
generator._generate_worker(
flask_app=mocker.MagicMock(),
context=mocker.MagicMock(),
application_generate_entity=mocker.MagicMock(
agent_id="a", agent_config_snapshot_id="s", model_conf=mocker.MagicMock(model="m")
agent_id="a", agent_config_snapshot_id="s", model_conf=mocker.MagicMock(model="m"), query=query
),
queue_manager=queue_manager,
conversation_id="conv",
message_id="msg",
user_from=UserFrom.END_USER,
is_resume=is_resume,
)
def test_happy_path_runs_backend(self, generator, mocker: MockerFixture):
@ -227,6 +228,20 @@ class TestGenerateWorker:
self._call(generator, mocker, queue_manager)
runner.run.assert_not_called()
def test_resume_skips_input_guards_and_consumes_reply(self, generator, mocker: MockerFixture):
# ENG-638 (review): on resume the replayed query is NOT new end-user input.
# Input guards must be skipped, even if moderation/annotation would match,
# so the run continues and the human reply (deferred_tool_results) is used.
runner = self._wire(generator, mocker, handled=True) # guards WOULD short-circuit
queue_manager = mocker.MagicMock()
self._call(generator, mocker, queue_manager, is_resume=True, query="the approved reply")
generator._run_input_guards.assert_not_called()
runner.run.assert_called_once()
# the replayed paused-turn query flows straight to the runner (snapshot match)
assert runner.run.call_args.kwargs["query"] == "the approved reply"
def test_generate_task_stopped_is_swallowed(self, generator, mocker: MockerFixture):
self._wire(generator, mocker, run_side_effect=GenerateTaskStoppedError())
queue_manager = mocker.MagicMock()

View File

@ -63,6 +63,7 @@ class _FakeFormRepo:
return SimpleNamespace(
form_id=form_id,
workflow_run_id=getattr(form, "workflow_run_id", None),
conversation_id=getattr(form, "conversation_id", None),
node_id=getattr(form, "node_id", None),
)
@ -70,11 +71,15 @@ class _FakeFormRepo:
class _FakeService:
def __init__(self, _session_factory, form_repository=None):
self.enqueued: list[str] = []
self.agent_app_resumed: list[tuple[str, str]] = []
def enqueue_resume(self, workflow_run_id: str | None) -> None:
if workflow_run_id is not None:
self.enqueued.append(workflow_run_id)
def enqueue_agent_app_resume(self, *, conversation_id: str, form_id: str) -> None:
self.agent_app_resumed.append((conversation_id, form_id))
def _build_form(
*,
@ -84,6 +89,7 @@ def _build_form(
expiration_time: datetime,
workflow_run_id: str | None,
node_id: str,
conversation_id: str | None = None,
) -> SimpleNamespace:
return SimpleNamespace(
id=form_id,
@ -91,6 +97,7 @@ def _build_form(
created_at=created_at,
expiration_time=expiration_time,
workflow_run_id=workflow_run_id,
conversation_id=conversation_id,
node_id=node_id,
status=HumanInputFormStatus.WAITING,
)
@ -208,3 +215,45 @@ def test_check_and_handle_human_input_timeouts_omits_global_filter_when_disabled
assert stmt is not None
stmt_text = str(stmt)
assert "created_at <=" not in stmt_text
def test_check_and_handle_human_input_timeouts_routes_conversation_owned_form_to_agent_app_resume(
monkeypatch: pytest.MonkeyPatch,
):
# ENG-635 (review): a conversation-owned Agent v2 chat ask_human form has no
# workflow_run_id. On timeout it must enqueue the Agent App resume (so the
# timeout is threaded back as the ask_human result), instead of asserting on
# workflow_run_id — which previously raised and was swallowed by the except.
now = datetime(2025, 1, 1, 12, 0, 0)
monkeypatch.setattr(task_module, "naive_utc_now", lambda: now)
monkeypatch.setattr(task_module.dify_config, "HUMAN_INPUT_GLOBAL_TIMEOUT_SECONDS", 3600)
monkeypatch.setattr(task_module, "db", SimpleNamespace(engine=object()))
forms = [
_build_form(
form_id="form-chat",
form_kind=HumanInputFormKind.RUNTIME,
created_at=now - timedelta(minutes=5),
expiration_time=now - timedelta(seconds=1),
workflow_run_id=None,
conversation_id="conv-1",
node_id="agent",
),
]
capture: dict[str, Any] = {}
monkeypatch.setattr(task_module, "sessionmaker", lambda *args, **kwargs: _FakeSessionFactory(forms, capture))
repo = _FakeFormRepo(form_map={form.id: form for form in forms})
service = _FakeService(None)
monkeypatch.setattr(task_module, "HumanInputFormSubmissionRepository", lambda: repo)
monkeypatch.setattr(task_module, "HumanInputService", lambda *_args, **_kwargs: service)
monkeypatch.setattr(task_module, "_handle_global_timeout", lambda **_kwargs: None)
task_module.check_and_handle_human_input_timeouts(limit=100)
# Node timeout (conversation forms are never "global"), routed to Agent App resume.
assert repo.calls == [
{"form_id": "form-chat", "timeout_status": HumanInputFormStatus.TIMEOUT, "reason": "node_timeout"}
]
assert service.agent_app_resumed == [("conv-1", "form-chat")]
assert service.enqueued == []