fix(hitl): scope OpenAPI/Service-API resume to author-configured webapp forms

Pause-time token emission now draws only from the recipient set each API
surface is allowed to act on (emit ⊆ validate), so the CLI/OpenAPI caller is
never handed a token the resume endpoint would reject as 404 (WTA-867).

A form's recipients are partitioned once, per surface, into a single
FormDisposition: the surface-actionable recipient yields `form_token`, while
the rest are reported as `approval_channels` (e.g. ["email", "console"]) so the
caller is told where approval actually happens. Token and channels are two
projections of one decision (disposition_for_surface) loaded by one recipient
query (load_form_dispositions_by_form_id); the live pause path and the
reconnect snapshot path consume the same FormDisposition so they cannot drift.

RecipientType carries its user-facing approval-channel label as an enum tuple
value, set in __new__, so a new recipient type cannot be declared without one.

Tests: consolidate recipient/disposition/enrich tests into parametrized
matrices, add CONSOLE-surface and empty-token coverage, extract a shared fake
session for the pause-event tests.
This commit is contained in:
GareArc 2026-06-16 16:11:29 -07:00
parent 1427b0b098
commit f533e992d4
No known key found for this signature in database
20 changed files with 620 additions and 261 deletions

View File

@ -63,6 +63,8 @@ class OpenApiErrorCode(StrEnum):
FILE_EXTENSION_BLOCKED = "file_extension_blocked"
MEMBER_LIMIT_EXCEEDED = "member_limit_exceeded"
MEMBER_LICENSE_EXCEEDED = "member_license_exceeded"
HUMAN_INPUT_FORM_NOT_FOUND = "form_not_found"
RECIPIENT_SURFACE_MISMATCH = "recipient_surface_mismatch"
class ErrorDetail(BaseModel):
@ -239,3 +241,16 @@ class MemberLicenseExceeded(OpenApiError): # noqa: N818
error_code = OpenApiErrorCode.MEMBER_LICENSE_EXCEEDED
description = "Workspace member license capacity reached."
hint = "Contact your workspace administrator to expand the license seat count."
class HumanInputFormNotFound(OpenApiError): # noqa: N818
code = 404
error_code = OpenApiErrorCode.HUMAN_INPUT_FORM_NOT_FOUND
description = "No human-input form matches this token. It may be wrong, expired, or already submitted."
class RecipientSurfaceMismatch(OpenApiError): # noqa: N818
code = 403
error_code = OpenApiErrorCode.RECIPIENT_SURFACE_MISMATCH
description = "This form's recipient can't be submitted via the OpenAPI surface."
hint = "Action it through its channel (web app or console)."

View File

@ -12,16 +12,20 @@ import logging
from flask import Response
from flask_restx import Resource
from werkzeug.exceptions import BadRequest, NotFound
from werkzeug.exceptions import BadRequest
from controllers.common.human_input import HumanInputFormSubmitPayload, stringify_form_default_values
from controllers.common.schema import register_schema_models
from controllers.openapi import openapi_ns
from controllers.openapi._contract import accepts, returns
from controllers.openapi._errors import HumanInputFormNotFound, RecipientSurfaceMismatch
from controllers.openapi._models import FormSubmitResponse, HumanInputFormDefinitionResponse
from controllers.openapi.auth.composition import auth_router
from controllers.openapi.auth.data import AuthData
from core.workflow.human_input_policy import HumanInputSurface, is_recipient_type_allowed_for_surface
from core.workflow.human_input_policy import (
HumanInputSurface,
is_recipient_type_allowed_for_surface,
)
from extensions.ext_database import db
from libs.helper import to_timestamp
from libs.oauth_bearer import Scope
@ -47,12 +51,12 @@ def _jsonify_form_definition(form) -> Response:
def _ensure_form_belongs_to_app(form, app_model: App) -> None:
if form.app_id != app_model.id or form.tenant_id != app_model.tenant_id:
raise NotFound("Form not found")
raise HumanInputFormNotFound()
def _ensure_form_is_allowed_for_openapi(form) -> None:
if not is_recipient_type_allowed_for_surface(form.recipient_type, HumanInputSurface.OPENAPI):
raise NotFound("Form not found")
raise RecipientSurfaceMismatch()
@openapi_ns.route("/apps/<string:app_id>/form/human_input/<string:form_token>")
@ -60,11 +64,11 @@ class OpenApiWorkflowHumanInputFormApi(Resource):
@openapi_ns.response(200, "Form definition", openapi_ns.models[HumanInputFormDefinitionResponse.__name__])
@auth_router.guard(scope=Scope.APPS_RUN)
def get(self, app_id: str, form_token: str, *, auth_data: AuthData):
app_model, caller, caller_kind = auth_data.require_app_context()
app_model, _caller, _caller_kind = auth_data.require_app_context()
service = HumanInputService(db.engine)
form = service.get_form_by_token(form_token)
if form is None:
raise NotFound("Form not found")
raise HumanInputFormNotFound()
_ensure_form_belongs_to_app(form, app_model)
_ensure_form_is_allowed_for_openapi(form)
@ -80,7 +84,7 @@ class OpenApiWorkflowHumanInputFormApi(Resource):
service = HumanInputService(db.engine)
form = service.get_form_by_token(form_token)
if form is None:
raise NotFound("Form not found")
raise HumanInputFormNotFound()
_ensure_form_belongs_to_app(form, app_model)
_ensure_form_is_allowed_for_openapi(form)
@ -106,6 +110,6 @@ class OpenApiWorkflowHumanInputFormApi(Resource):
submission_end_user_id=submission_end_user_id,
)
except FormNotFoundError:
raise NotFound("Form not found")
raise HumanInputFormNotFound()
return FormSubmitResponse()

View File

@ -51,8 +51,11 @@ from core.tools.entities.tool_entities import ToolProviderType
from core.tools.tool_manager import ToolManager
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
from core.trigger.trigger_manager import TriggerManager
from core.workflow.human_input_forms import load_form_tokens_by_form_id
from core.workflow.human_input_forms import (
load_form_dispositions_by_form_id,
)
from core.workflow.human_input_policy import (
FormDisposition,
HumanInputSurface,
enrich_human_input_pause_reasons,
resolve_human_input_pause_reason_inputs,
@ -340,13 +343,14 @@ class WorkflowResponseConverter:
human_input_form_ids = [reason.form_id for reason in resolved_reasons if isinstance(reason, HumanInputRequired)]
expiration_times_by_form_id: dict[str, datetime] = {}
display_in_ui_by_form_id: dict[str, bool] = {}
form_token_by_form_id: dict[str, str] = {}
dispositions_by_form_id: dict[str, FormDisposition] = {}
if human_input_form_ids:
stmt = select(
HumanInputForm.id,
HumanInputForm.expiration_time,
HumanInputForm.form_definition,
).where(HumanInputForm.id.in_(human_input_form_ids))
hitl_surface = _INVOKE_FROM_TO_HITL_SURFACE.get(self._application_generate_entity.invoke_from)
with Session(bind=db.engine) as session:
for form_id, expiration_time, form_definition in session.execute(stmt):
expiration_times_by_form_id[str(form_id)] = expiration_time
@ -355,17 +359,17 @@ class WorkflowResponseConverter:
except (TypeError, json.JSONDecodeError):
definition_payload = {}
display_in_ui_by_form_id[str(form_id)] = bool(definition_payload.get("display_in_ui"))
form_token_by_form_id = load_form_tokens_by_form_id(
dispositions_by_form_id = load_form_dispositions_by_form_id(
human_input_form_ids,
session=session,
surface=_INVOKE_FROM_TO_HITL_SURFACE.get(self._application_generate_entity.invoke_from),
surface=hitl_surface,
)
# Reconnect paths must preserve the same pause-reason contract as live streams;
# otherwise clients see schema drift after resume.
pause_reasons = enrich_human_input_pause_reasons(
pause_reasons,
form_tokens_by_form_id=form_token_by_form_id,
dispositions_by_form_id=dispositions_by_form_id,
expiration_times_by_form_id={
form_id: int(expiration_time.timestamp())
for form_id, expiration_time in expiration_times_by_form_id.items()
@ -379,6 +383,7 @@ class WorkflowResponseConverter:
expiration_time = expiration_times_by_form_id.get(reason.form_id)
if expiration_time is None:
raise ValueError(f"HumanInputForm not found for pause reason, form_id={reason.form_id}")
disposition = dispositions_by_form_id.get(reason.form_id)
responses.append(
HumanInputRequiredResponse(
task_id=task_id,
@ -391,7 +396,8 @@ class WorkflowResponseConverter:
inputs=reason.inputs,
actions=reason.actions,
display_in_ui=display_in_ui_by_form_id.get(reason.form_id, False),
form_token=form_token_by_form_id.get(reason.form_id),
form_token=disposition.form_token if disposition else None,
approval_channels=list(disposition.approval_channels) if disposition else [],
resolved_default_values=reason.resolved_default_values,
expiration_time=int(expiration_time.timestamp()),
),

View File

@ -288,6 +288,7 @@ class HumanInputRequiredResponse(StreamResponse):
actions: Sequence[UserActionConfig] = Field(default_factory=list)
display_in_ui: bool = False
form_token: str | None = None
approval_channels: list[str] = Field(default_factory=list)
resolved_default_values: Mapping[str, Any] = Field(default_factory=dict)
expiration_time: int = Field(..., description="Unix timestamp in seconds")
@ -311,6 +312,7 @@ class HumanInputRequiredPauseReasonPayload(BaseModel):
actions: Sequence[UserActionConfig] = Field(default_factory=list)
display_in_ui: bool = False
form_token: str | None = None
approval_channels: list[str] = Field(default_factory=list)
resolved_default_values: Mapping[str, Any] = Field(default_factory=dict)
expiration_time: int
@ -325,6 +327,7 @@ class HumanInputRequiredPauseReasonPayload(BaseModel):
actions=data.actions,
display_in_ui=data.display_in_ui,
form_token=data.form_token,
approval_channels=data.approval_channels,
resolved_default_values=data.resolved_default_values,
expiration_time=data.expiration_time,
)

View File

@ -12,60 +12,61 @@ from collections.abc import Sequence
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.workflow.human_input_policy import HumanInputSurface, get_preferred_form_token
from core.workflow.human_input_policy import (
FormDisposition,
HumanInputSurface,
disposition_for_surface,
)
from extensions.ext_database import db
from models.human_input import HumanInputFormRecipient, RecipientType
def load_form_dispositions_by_form_id(
form_ids: Sequence[str],
*,
session: Session | None = None,
surface: HumanInputSurface | None = None,
) -> dict[str, FormDisposition]:
"""Resolve each paused form's resume token and approval channels for `surface`."""
unique_form_ids = list(dict.fromkeys(form_ids))
if not unique_form_ids:
return {}
if session is not None:
return _load_form_dispositions_by_form_id(session, unique_form_ids, surface=surface)
with Session(bind=db.engine, expire_on_commit=False) as new_session:
return _load_form_dispositions_by_form_id(new_session, unique_form_ids, surface=surface)
def _load_form_dispositions_by_form_id(
session: Session,
form_ids: Sequence[str],
*,
surface: HumanInputSurface | None,
) -> dict[str, FormDisposition]:
recipients_by_form_id: dict[str, list[tuple[RecipientType, str]]] = {}
stmt = select(HumanInputFormRecipient).where(HumanInputFormRecipient.form_id.in_(form_ids))
for recipient in session.scalars(stmt):
recipients_by_form_id.setdefault(recipient.form_id, []).append(
(recipient.recipient_type, recipient.access_token or "")
)
return {
form_id: disposition_for_surface(recipients, surface=surface)
for form_id, recipients in recipients_by_form_id.items()
}
def load_form_tokens_by_form_id(
form_ids: Sequence[str],
*,
session: Session | None = None,
surface: HumanInputSurface | None = None,
) -> dict[str, str]:
"""Load the preferred access token for each human input form."""
unique_form_ids = list(dict.fromkeys(form_ids))
if not unique_form_ids:
return {}
if session is not None:
return _load_form_tokens_by_form_id(session, unique_form_ids, surface=surface)
with Session(bind=db.engine, expire_on_commit=False) as new_session:
return _load_form_tokens_by_form_id(new_session, unique_form_ids, surface=surface)
def _load_form_tokens_by_form_id(
session: Session,
form_ids: Sequence[str],
*,
surface: HumanInputSurface | None = None,
) -> dict[str, str]:
recipients_by_form_id: dict[str, list[tuple[RecipientType, str]]] = {}
stmt = select(HumanInputFormRecipient).where(HumanInputFormRecipient.form_id.in_(form_ids))
for recipient in session.scalars(stmt):
if not recipient.access_token:
continue
recipients_by_form_id.setdefault(recipient.form_id, []).append(
(recipient.recipient_type, recipient.access_token)
)
tokens_by_form_id: dict[str, str] = {}
for form_id, recipients in recipients_by_form_id.items():
token = _get_surface_form_token(recipients, surface=surface)
if token is not None:
tokens_by_form_id[form_id] = token
return tokens_by_form_id
def _get_surface_form_token(
recipients: Sequence[tuple[RecipientType, str]],
*,
surface: HumanInputSurface | None,
) -> str | None:
if surface in {HumanInputSurface.SERVICE_API, HumanInputSurface.OPENAPI}:
for recipient_type, token in recipients:
if recipient_type == RecipientType.STANDALONE_WEB_APP and token:
return token
return get_preferred_form_token(recipients)
"""Resume tokens only, for callers that don't surface approval channels."""
dispositions = load_form_dispositions_by_form_id(form_ids, session=session, surface=surface)
return {
form_id: disposition.form_token
for form_id, disposition in dispositions.items()
if disposition.form_token is not None
}

View File

@ -2,7 +2,7 @@ from __future__ import annotations
from collections.abc import Mapping, Sequence
from enum import StrEnum
from typing import Any
from typing import Any, NamedTuple
from graphon.entities.pause_reason import HumanInputRequired, PauseReason, PauseReasonType
from graphon.nodes.human_input.entities import FormInputConfig, SelectInputConfig
@ -20,7 +20,7 @@ class HumanInputSurface(StrEnum):
# SERVICE_API and OPENAPI are intentionally narrower than CONSOLE: token callers
# should only be able to act on end-user web forms, not internal console flows.
_ALLOWED_RECIPIENT_TYPES_BY_SURFACE: dict[HumanInputSurface, frozenset[RecipientType]] = {
ALLOWED_RECIPIENT_TYPES_BY_SURFACE: dict[HumanInputSurface, frozenset[RecipientType]] = {
HumanInputSurface.SERVICE_API: frozenset({RecipientType.STANDALONE_WEB_APP}),
HumanInputSurface.CONSOLE: frozenset({RecipientType.CONSOLE, RecipientType.BACKSTAGE}),
HumanInputSurface.OPENAPI: frozenset({RecipientType.STANDALONE_WEB_APP}),
@ -41,7 +41,7 @@ def is_recipient_type_allowed_for_surface(
) -> bool:
if recipient_type is None:
return False
return recipient_type in _ALLOWED_RECIPIENT_TYPES_BY_SURFACE[surface]
return recipient_type in ALLOWED_RECIPIENT_TYPES_BY_SURFACE[surface]
def get_preferred_form_token(
@ -59,10 +59,37 @@ def get_preferred_form_token(
return chosen_token
class FormDisposition(NamedTuple):
"""How a paused form resolves for one API surface.
A form's recipients split into those the surface may act on (yielding a resume
`form_token`) and those it may not (their channels named in `approval_channels`
so the caller is told where approval actually happens instead).
"""
form_token: str | None
approval_channels: list[str]
def disposition_for_surface(
recipients: Sequence[tuple[RecipientType, str]],
*,
surface: HumanInputSurface | None,
) -> FormDisposition:
if surface is None:
return FormDisposition(form_token=get_preferred_form_token(recipients), approval_channels=[])
allowed = ALLOWED_RECIPIENT_TYPES_BY_SURFACE[surface]
actionable = [(recipient_type, token) for recipient_type, token in recipients if recipient_type in allowed]
blocked_channels = {
recipient_type.approval_channel_label for recipient_type, _ in recipients if recipient_type not in allowed
}
return FormDisposition(form_token=get_preferred_form_token(actionable), approval_channels=sorted(blocked_channels))
def enrich_human_input_pause_reasons(
reasons: Sequence[Mapping[str, Any]],
*,
form_tokens_by_form_id: Mapping[str, str],
dispositions_by_form_id: Mapping[str, FormDisposition],
expiration_times_by_form_id: Mapping[str, int],
) -> list[dict[str, Any]]:
enriched: list[dict[str, Any]] = []
@ -71,7 +98,9 @@ def enrich_human_input_pause_reasons(
if updated.get("TYPE") == PauseReasonType.HUMAN_INPUT_REQUIRED:
form_id = updated.get("form_id")
if isinstance(form_id, str):
updated["form_token"] = form_tokens_by_form_id.get(form_id)
disposition = dispositions_by_form_id.get(form_id)
updated["form_token"] = disposition.form_token if disposition else None
updated["approval_channels"] = list(disposition.approval_channels) if disposition else []
expiration_time = expiration_times_by_form_id.get(form_id)
if expiration_time is not None:
updated["expiration_time"] = expiration_time

View File

@ -135,19 +135,32 @@ class HumanInputDelivery(DefaultFieldsMixin, Base):
class RecipientType(StrEnum):
# Second value = approval-channel label (surfaced in `approval_channels`).
# EMAIL_MEMBER member means that the
EMAIL_MEMBER = "email_member"
EMAIL_EXTERNAL = "email_external"
EMAIL_MEMBER = "email_member", "email"
EMAIL_EXTERNAL = "email_external", "email"
# STANDALONE_WEB_APP is used by the standalone web app.
#
# It's not used while running workflows / chatflows containing HumanInput
# node inside console.
STANDALONE_WEB_APP = "standalone_web_app"
STANDALONE_WEB_APP = "standalone_web_app", "web_app"
# CONSOLE is used while running workflows / chatflows containing HumanInput
# node inside console. (E.G. running installed apps or debugging workflows / chatflows)
CONSOLE = "console"
CONSOLE = "console", "console"
# BACKSTAGE is used for backstage input inside console.
BACKSTAGE = "backstage"
BACKSTAGE = "backstage", "console"
_approval_channel_label: str
def __new__(cls, value: str, approval_channel_label: str) -> "RecipientType":
member = str.__new__(cls, value)
member._value_ = value
member._approval_channel_label = approval_channel_label
return member
@property
def approval_channel_label(self) -> str:
return self._approval_channel_label
@final

View File

@ -23,8 +23,11 @@ from core.app.entities.task_entities import (
WorkflowStartStreamResponse,
)
from core.app.layers.pause_state_persist_layer import WorkflowResumptionContext
from core.workflow.human_input_forms import load_form_tokens_by_form_id
from core.workflow.human_input_forms import (
load_form_dispositions_by_form_id,
)
from core.workflow.human_input_policy import (
FormDisposition,
HumanInputSurface,
enrich_human_input_pause_reasons,
resolve_human_input_pause_reason_inputs,
@ -359,7 +362,7 @@ def _build_human_input_required_events(
expiration_times_by_form_id: dict[str, int] = {}
display_in_ui_by_form_id: dict[str, bool] = {}
form_tokens_by_form_id: dict[str, str] = {}
dispositions_by_form_id: dict[str, FormDisposition] = {}
if human_input_form_ids and session_maker is not None:
stmt = select(HumanInputForm.id, HumanInputForm.expiration_time, HumanInputForm.form_definition).where(
HumanInputForm.id.in_(human_input_form_ids)
@ -372,7 +375,7 @@ def _build_human_input_required_events(
except (TypeError, json.JSONDecodeError):
definition_payload = {}
display_in_ui_by_form_id[str(form_id)] = bool(definition_payload.get("display_in_ui"))
form_tokens_by_form_id = load_form_tokens_by_form_id(
dispositions_by_form_id = load_form_dispositions_by_form_id(
human_input_form_ids,
session=session,
surface=human_input_surface,
@ -393,6 +396,7 @@ def _build_human_input_required_events(
reason.inputs,
variable_pool=variable_pool,
)
disposition = dispositions_by_form_id.get(form_id)
response = HumanInputRequiredResponse(
task_id=task_id,
@ -405,7 +409,8 @@ def _build_human_input_required_events(
inputs=resolved_inputs,
actions=reason.actions,
display_in_ui=display_in_ui_by_form_id.get(form_id, False),
form_token=form_tokens_by_form_id.get(form_id),
form_token=disposition.form_token if disposition else None,
approval_channels=list(disposition.approval_channels) if disposition else [],
resolved_default_values=reason.resolved_default_values,
expiration_time=expiration_time,
),
@ -493,11 +498,11 @@ def _build_pause_event(
for form_id in [reason.get("form_id")]
if isinstance(form_id, str)
]
form_tokens_by_form_id: dict[str, str] = {}
dispositions_by_form_id: dict[str, FormDisposition] = {}
expiration_times_by_form_id: dict[str, int] = {}
if human_input_form_ids and session_maker is not None:
with session_maker() as session:
form_tokens_by_form_id = load_form_tokens_by_form_id(
dispositions_by_form_id = load_form_dispositions_by_form_id(
human_input_form_ids,
session=session,
surface=human_input_surface,
@ -512,7 +517,7 @@ def _build_pause_event(
# otherwise clients see schema drift after resume.
reasons = enrich_human_input_pause_reasons(
reasons,
form_tokens_by_form_id=form_tokens_by_form_id,
dispositions_by_form_id=dispositions_by_form_id,
expiration_times_by_form_id=expiration_times_by_form_id,
)

View File

@ -26,11 +26,13 @@ from controllers.openapi._errors import (
ErrorBody,
ErrorDetail,
FilenameNotExists,
HumanInputFormNotFound,
MemberLicenseExceeded,
MemberLimitExceeded,
OpenApiError,
OpenApiErrorCode,
OpenApiErrorFormatter,
RecipientSurfaceMismatch,
)
from controllers.service_api.app.error import (
AppUnavailableError,
@ -319,6 +321,8 @@ ERROR_MATRIX = [
(BlockedFileExtensionError(), 400, "file_extension_blocked"),
(MemberLimitExceeded(), 403, "member_limit_exceeded"),
(MemberLicenseExceeded(), 403, "member_license_exceeded"),
(HumanInputFormNotFound(), 404, "form_not_found"),
(RecipientSurfaceMismatch(), 403, "recipient_surface_mismatch"),
]

View File

@ -11,8 +11,9 @@ from unittest.mock import Mock
import pytest
from flask import Flask
from werkzeug.exceptions import NotFound, UnprocessableEntity
from werkzeug.exceptions import UnprocessableEntity
from controllers.openapi._errors import HumanInputFormNotFound, RecipientSurfaceMismatch
from controllers.openapi.auth.data import AuthData
from libs.oauth_bearer import Scope, TokenType
from models.human_input import RecipientType
@ -89,7 +90,7 @@ class TestOpenApiHumanInputFormGet:
caller = SimpleNamespace(id="acct-1")
with app.test_request_context("/openapi/v1/apps/app-1/form/human_input/bad"):
with pytest.raises(NotFound):
with pytest.raises(HumanInputFormNotFound):
api.get.__wrapped__(
api,
app_id="app-1",
@ -101,7 +102,10 @@ class TestOpenApiHumanInputFormGet:
from controllers.openapi.human_input_form import OpenApiWorkflowHumanInputFormApi
form = SimpleNamespace(
app_id="other-app", tenant_id="tenant-1", expiration_time=datetime(2099, 1, 1, tzinfo=UTC)
app_id="other-app",
tenant_id="tenant-1",
recipient_type=RecipientType.STANDALONE_WEB_APP,
expiration_time=datetime(2099, 1, 1, tzinfo=UTC),
)
service_mock = Mock()
service_mock.get_form_by_token.return_value = form
@ -114,7 +118,7 @@ class TestOpenApiHumanInputFormGet:
caller = SimpleNamespace(id="acct-1")
with app.test_request_context("/openapi/v1/apps/app-1/form/human_input/tok-1"):
with pytest.raises(NotFound):
with pytest.raises(HumanInputFormNotFound):
api.get.__wrapped__(
api,
app_id="app-1",
@ -142,7 +146,7 @@ class TestOpenApiHumanInputFormGet:
caller = SimpleNamespace(id="acct-1")
with app.test_request_context("/openapi/v1/apps/app-1/form/human_input/tok-1"):
with pytest.raises(NotFound):
with pytest.raises(RecipientSurfaceMismatch):
api.get.__wrapped__(
api,
app_id="app-1",
@ -234,6 +238,38 @@ class TestOpenApiHumanInputFormPost:
)
assert result == ({}, 200)
def test_post_standalone_web_app_recipient_submits(
self, app: Flask, bypass_pipeline, monkeypatch: pytest.MonkeyPatch
):
from controllers.openapi.human_input_form import OpenApiWorkflowHumanInputFormApi
form = self._make_form(recipient_type=RecipientType.STANDALONE_WEB_APP)
service_mock = Mock()
service_mock.get_form_by_token.return_value = form
module = sys.modules["controllers.openapi.human_input_form"]
monkeypatch.setattr(module, "HumanInputService", lambda _engine: service_mock)
monkeypatch.setattr(module, "db", SimpleNamespace(engine=object()))
api = OpenApiWorkflowHumanInputFormApi()
app_model = SimpleNamespace(id="app-1", tenant_id="tenant-1")
caller = SimpleNamespace(id="anyone")
with app.test_request_context(
"/openapi/v1/apps/app-1/form/human_input/tok-1",
method="POST",
json={"action": "approve", "inputs": {}},
):
result = api.post.__wrapped__(
api,
app_id="app-1",
form_token="tok-1",
auth_data=_make_auth_data(app_model, caller, "end_user"),
)
service_mock.submit_form_by_token.assert_called_once()
assert result == ({}, 200)
def test_post_rejects_invalid_body_with_422(self, app: Flask, bypass_pipeline):
"""Malformed body → 422 via @accepts (was an unmapped pydantic error → 500)."""
from controllers.openapi.human_input_form import OpenApiWorkflowHumanInputFormApi

View File

@ -175,6 +175,7 @@ class TestAdvancedChatGenerateTaskPipeline:
"actions": [{"id": "approve", "title": "Approve", "button_style": "default"}],
"display_in_ui": True,
"form_token": "token-1",
"approval_channels": [],
"resolved_default_values": {},
"expiration_time": 123,
}

View File

@ -26,6 +26,26 @@ from models.account import Account
from models.human_input import RecipientType
class _FakeSession:
"""Stub session: `execute` feeds the form-expiration query, `scalars` the recipients."""
def __init__(self, *, execute_rows=(), scalars_rows=()):
self._execute_rows = execute_rows
self._scalars_rows = scalars_rows
def execute(self, _stmt):
return list(self._execute_rows)
def scalars(self, _stmt):
return list(self._scalars_rows)
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
class _RecordingWorkflowAppRunner(WorkflowAppRunner):
def __init__(self, **kwargs):
super().__init__(**kwargs)
@ -97,11 +117,11 @@ def test_graph_run_paused_event_emits_queue_pause_event():
assert queue_event.paused_nodes == ["node-pause-1"]
def _build_converter():
def _build_converter(*, invoke_from: InvokeFrom = InvokeFrom.SERVICE_API):
application_generate_entity = SimpleNamespace(
inputs={},
files=[],
invoke_from=InvokeFrom.SERVICE_API,
invoke_from=invoke_from,
app_config=SimpleNamespace(app_id="app-id", tenant_id="tenant-id"),
)
system_variables = build_system_variables(
@ -131,32 +151,15 @@ def test_queue_workflow_paused_event_to_stream_responses(monkeypatch: pytest.Mon
)
expiration_time = datetime(2024, 1, 1, tzinfo=UTC)
session = _FakeSession(
execute_rows=[("form-1", expiration_time, '{"display_in_ui": true}')],
scalars_rows=[
SimpleNamespace(form_id="form-1", recipient_type=RecipientType.CONSOLE, access_token="console-token"),
SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"),
],
)
class _FakeSession:
def execute(self, _stmt):
return [("form-1", expiration_time, '{"display_in_ui": true}')]
def scalars(self, _stmt):
return [
SimpleNamespace(
form_id="form-1",
recipient_type=RecipientType.CONSOLE,
access_token="console-token",
),
SimpleNamespace(
form_id="form-1",
recipient_type=RecipientType.BACKSTAGE,
access_token="backstage-token",
),
]
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: _FakeSession())
monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: session)
monkeypatch.setattr(workflow_response_converter, "db", SimpleNamespace(engine=object()))
reason = HumanInputRequired(
@ -195,10 +198,92 @@ def test_queue_workflow_paused_event_to_stream_responses(monkeypatch: pytest.Mon
assert hi_resp.data.inputs[0].output_variable_name == "field"
assert hi_resp.data.actions[0].id == "approve"
assert hi_resp.data.display_in_ui is True
assert hi_resp.data.form_token == "backstage-token"
assert hi_resp.data.form_token is None
assert hi_resp.data.approval_channels == ["console"]
assert hi_resp.data.expiration_time == int(expiration_time.timestamp())
def _build_paused_human_input_response(monkeypatch, recipients):
"""Drive the live OPENAPI pause path with the given recipients via a fake session."""
converter = _build_converter(invoke_from=InvokeFrom.OPENAPI)
converter.workflow_start_to_stream_response(
task_id="task",
workflow_run_id="run-id",
workflow_id="workflow-id",
reason=WorkflowStartReason.INITIAL,
)
expiration_time = datetime(2024, 1, 1, tzinfo=UTC)
session = _FakeSession(
execute_rows=[("form-1", expiration_time, '{"display_in_ui": true}')],
scalars_rows=list(recipients),
)
monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: session)
monkeypatch.setattr(workflow_response_converter, "db", SimpleNamespace(engine=object()))
reason = HumanInputRequired(
form_id="form-1",
form_content="Rendered",
inputs=[ParagraphInputConfig(output_variable_name="field")],
actions=[UserActionConfig(id="approve", title="Approve")],
node_id="node-id",
node_title="Human Step",
)
queue_event = QueueWorkflowPausedEvent(
reasons=[reason],
outputs={},
paused_nodes=["node-id"],
)
runtime_state = GraphRuntimeState(variable_pool=VariablePool(), start_at=0.0)
responses = converter.workflow_pause_to_stream_response(
event=queue_event,
task_id="task",
graph_runtime_state=runtime_state,
)
assert isinstance(responses[0], HumanInputRequiredResponse)
return responses
def test_openapi_pause_without_web_app_recipient_emits_approval_channels(monkeypatch: pytest.MonkeyPatch):
responses = _build_paused_human_input_response(
monkeypatch,
recipients=[
SimpleNamespace(form_id="form-1", recipient_type=RecipientType.EMAIL_MEMBER, access_token="email-token"),
SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"),
],
)
hi_resp = responses[0]
assert hi_resp.data.form_token is None
assert hi_resp.data.approval_channels == ["console", "email"]
pause_resp = responses[-1]
assert pause_resp.data.reasons[0]["approval_channels"] == ["console", "email"]
def test_openapi_pause_with_web_app_recipient_sets_token_and_channels(monkeypatch: pytest.MonkeyPatch):
responses = _build_paused_human_input_response(
monkeypatch,
recipients=[
SimpleNamespace(
form_id="form-1",
recipient_type=RecipientType.STANDALONE_WEB_APP,
access_token="web-app-token",
),
SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"),
],
)
hi_resp = responses[0]
assert hi_resp.data.form_token == "web-app-token"
assert hi_resp.data.approval_channels == ["console"]
pause_resp = responses[-1]
assert pause_resp.data.reasons[0]["approval_channels"] == ["console"]
def test_queue_workflow_paused_event_resolves_variable_select_options(monkeypatch: pytest.MonkeyPatch):
converter = _build_converter()
converter.workflow_start_to_stream_response(
@ -209,21 +294,9 @@ def test_queue_workflow_paused_event_resolves_variable_select_options(monkeypatc
)
expiration_time = datetime(2024, 1, 1, tzinfo=UTC)
session = _FakeSession(execute_rows=[("form-1", expiration_time, '{"display_in_ui": true}')])
class _FakeSession:
def execute(self, _stmt):
return [("form-1", expiration_time, '{"display_in_ui": true}')]
def scalars(self, _stmt):
return []
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: _FakeSession())
monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: session)
monkeypatch.setattr(workflow_response_converter, "db", SimpleNamespace(engine=object()))
reason = HumanInputRequired(

View File

@ -134,6 +134,7 @@ class TestWorkflowGenerateTaskPipeline:
"actions": [],
"display_in_ui": False,
"form_token": None,
"approval_channels": [],
"resolved_default_values": {},
"expiration_time": 1,
}

View File

@ -32,6 +32,7 @@ from models.human_input import (
EmailMemberRecipientPayload,
HumanInputFormRecipient,
RecipientType,
StandaloneWebAppRecipientPayload,
)
@ -307,6 +308,9 @@ class _DummyRecipient:
recipient_type: RecipientType
access_token: str
form: _DummyForm | None = None
recipient_payload: str = dataclasses.field(
default_factory=lambda: StandaloneWebAppRecipientPayload().model_dump_json()
)
class _FakeScalarResult:

View File

@ -0,0 +1,63 @@
import pytest
from core.workflow.human_input_policy import FormDisposition, enrich_human_input_pause_reasons
from graphon.entities.pause_reason import PauseReasonType
_HUMAN_INPUT_REASON = {"TYPE": PauseReasonType.HUMAN_INPUT_REQUIRED, "form_id": "f1"}
@pytest.mark.parametrize(
("dispositions", "expected_token", "expected_channels"),
[
({"f1": FormDisposition(form_token=None, approval_channels=["console", "email"])}, None, ["console", "email"]),
({"f1": FormDisposition(form_token="tok", approval_channels=[])}, "tok", []),
# form_id absent from the map (no recipient rows) falls back to no token, no channels.
({}, None, []),
],
)
def test_enrich_projects_disposition_onto_reason(dispositions, expected_token, expected_channels):
out = enrich_human_input_pause_reasons(
[dict(_HUMAN_INPUT_REASON)],
dispositions_by_form_id=dispositions,
expiration_times_by_form_id={},
)
assert out[0]["form_token"] == expected_token
assert out[0]["approval_channels"] == expected_channels
def test_enrich_leaves_non_human_input_reasons_untouched():
reason = {"TYPE": "something_else", "form_id": "f1"}
out = enrich_human_input_pause_reasons(
[reason],
dispositions_by_form_id={"f1": FormDisposition(form_token="tok", approval_channels=["email"])},
expiration_times_by_form_id={},
)
assert out[0] == reason
assert "form_token" not in out[0]
assert "approval_channels" not in out[0]
def test_pause_reason_payload_carries_approval_channels_through_factory():
# from_response_data maps fields by hand; this guards approval_channels/form_token
# (the fields this feature added) against being dropped in that mapping.
from core.app.entities.task_entities import (
HumanInputRequiredPauseReasonPayload,
HumanInputRequiredResponse,
)
data = HumanInputRequiredResponse.Data(
form_id="f",
node_id="n",
node_title="t",
form_content="c",
expiration_time=123,
form_token=None,
approval_channels=["console"],
)
payload = HumanInputRequiredPauseReasonPayload.from_response_data(data)
assert payload.approval_channels == ["console"]
assert payload.form_token is None

View File

@ -1,7 +1,16 @@
from types import SimpleNamespace
from core.workflow.human_input_forms import _load_form_tokens_by_form_id, load_form_tokens_by_form_id
from core.workflow.human_input_policy import HumanInputSurface
import pytest
from core.workflow.human_input_forms import (
load_form_dispositions_by_form_id,
load_form_tokens_by_form_id,
)
from core.workflow.human_input_policy import (
FormDisposition,
HumanInputSurface,
disposition_for_surface,
)
from models.human_input import RecipientType
@ -13,91 +22,100 @@ class _FakeSession:
return self._recipients
def test_load_form_tokens_by_form_id_prefers_backstage_token() -> None:
def _recipient(form_id: str, recipient_type: RecipientType, access_token: str | None) -> SimpleNamespace:
return SimpleNamespace(form_id=form_id, recipient_type=recipient_type, access_token=access_token)
@pytest.mark.parametrize(
("surface", "expected_token"),
[
# Unfiltered (no surface) picks the highest-priority recipient: backstage.
(None, "backstage-token"),
# SERVICE_API may only act on the web-app recipient.
(HumanInputSurface.SERVICE_API, "web-token"),
],
)
def test_load_form_tokens_picks_token_for_surface(surface, expected_token) -> None:
session = _FakeSession(
recipients=[
SimpleNamespace(
form_id="form-1",
recipient_type=RecipientType.STANDALONE_WEB_APP,
access_token="web-token",
),
SimpleNamespace(
form_id="form-1",
recipient_type=RecipientType.CONSOLE,
access_token="console-token",
),
SimpleNamespace(
form_id="form-1",
recipient_type=RecipientType.BACKSTAGE,
access_token="backstage-token",
),
[
_recipient("form-1", RecipientType.STANDALONE_WEB_APP, "web-token"),
_recipient("form-1", RecipientType.CONSOLE, "console-token"),
_recipient("form-1", RecipientType.BACKSTAGE, "backstage-token"),
]
)
assert load_form_tokens_by_form_id(["form-1"], session=session) == {"form-1": "backstage-token"}
assert load_form_tokens_by_form_id(["form-1"], session=session, surface=surface) == {"form-1": expected_token}
def test_load_form_tokens_by_form_id_ignores_unsupported_recipients() -> None:
def test_load_form_tokens_drops_forms_without_actionable_token() -> None:
session = _FakeSession(
recipients=[
SimpleNamespace(
form_id="form-1",
recipient_type=RecipientType.EMAIL_MEMBER,
access_token="email-token",
),
SimpleNamespace(
form_id="form-1",
recipient_type=RecipientType.CONSOLE,
access_token=None,
),
[
_recipient("form-1", RecipientType.EMAIL_MEMBER, "email-token"),
_recipient("form-1", RecipientType.CONSOLE, None),
]
)
assert load_form_tokens_by_form_id(["form-1"], session=session) == {}
def test_load_form_tokens_by_form_id_uses_shared_priority() -> None:
def test_load_form_tokens_service_api_surface_uses_web_token() -> None:
session = _FakeSession(
recipients=[
SimpleNamespace(
form_id="form-1",
recipient_type=RecipientType.STANDALONE_WEB_APP,
access_token="web-token",
),
SimpleNamespace(
form_id="form-1",
recipient_type=RecipientType.CONSOLE,
access_token="console-token",
),
[
_recipient("form-1", RecipientType.STANDALONE_WEB_APP, "web-token"),
_recipient("form-1", RecipientType.CONSOLE, "console-token"),
_recipient("form-1", RecipientType.BACKSTAGE, "backstage-token"),
]
)
assert _load_form_tokens_by_form_id(session, ["form-1"]) == {"form-1": "console-token"}
assert load_form_tokens_by_form_id(["form-1"], session=session, surface=HumanInputSurface.SERVICE_API) == {
"form-1": "web-token"
}
def test_load_form_tokens_by_form_id_uses_web_token_for_service_api_surface() -> None:
def test_load_dispositions_openapi_webapp_form_is_resumable() -> None:
session = _FakeSession(
recipients=[
SimpleNamespace(
form_id="form-1",
recipient_type=RecipientType.STANDALONE_WEB_APP,
access_token="web-token",
),
SimpleNamespace(
form_id="form-1",
recipient_type=RecipientType.CONSOLE,
access_token="console-token",
),
SimpleNamespace(
form_id="form-1",
recipient_type=RecipientType.BACKSTAGE,
access_token="backstage-token",
),
[
_recipient("form-1", RecipientType.STANDALONE_WEB_APP, "web-token"),
_recipient("form-1", RecipientType.BACKSTAGE, "backstage-token"),
]
)
assert load_form_tokens_by_form_id(
["form-1"],
session=session,
surface=HumanInputSurface.SERVICE_API,
) == {"form-1": "web-token"}
assert load_form_dispositions_by_form_id(["form-1"], session=session, surface=HumanInputSurface.OPENAPI) == {
"form-1": FormDisposition(form_token="web-token", approval_channels=["console"])
}
def test_load_dispositions_openapi_backstage_only_form_yields_channels_not_token() -> None:
session = _FakeSession([_recipient("form-1", RecipientType.BACKSTAGE, "backstage-token")])
assert load_form_dispositions_by_form_id(["form-1"], session=session, surface=HumanInputSurface.OPENAPI) == {
"form-1": FormDisposition(form_token=None, approval_channels=["console"])
}
# disposition_for_surface partitions recipients into a surface-actionable resume
# token plus the approval channels of the recipients the surface may NOT act on.
_WEB = (RecipientType.STANDALONE_WEB_APP, "tok_web")
_BACKSTAGE = (RecipientType.BACKSTAGE, "tok_b")
_CONSOLE = (RecipientType.CONSOLE, "tok_c")
_EMAIL_MEMBER = (RecipientType.EMAIL_MEMBER, "t1")
_EMAIL_EXTERNAL = (RecipientType.EMAIL_EXTERNAL, "t2")
@pytest.mark.parametrize(
("recipients", "surface", "expected"),
[
# Token surface acts on the web-app recipient; blocked recipients become channels.
([_BACKSTAGE, _WEB], HumanInputSurface.OPENAPI, FormDisposition("tok_web", ["console"])),
([_EMAIL_MEMBER, _EMAIL_EXTERNAL], HumanInputSurface.OPENAPI, FormDisposition(None, ["email"])),
([_EMAIL_MEMBER, _BACKSTAGE], HumanInputSurface.OPENAPI, FormDisposition(None, ["console", "email"])),
# CONSOLE acts on console/backstage; a web-app recipient is blocked → web_app channel.
([_CONSOLE, _WEB], HumanInputSurface.CONSOLE, FormDisposition("tok_c", ["web_app"])),
([_WEB], HumanInputSurface.CONSOLE, FormDisposition(None, ["web_app"])),
# No surface: unfiltered priority token, channels never populated.
([_BACKSTAGE], None, FormDisposition("tok_b", [])),
([_WEB, _EMAIL_MEMBER], None, FormDisposition("tok_web", [])),
],
)
def test_disposition_for_surface_partitions_token_and_channels(recipients, surface, expected) -> None:
assert disposition_for_surface(recipients, surface=surface) == expected

View File

@ -12,38 +12,28 @@ from graphon.runtime import VariablePool
from models.human_input import RecipientType
def test_service_api_only_allows_public_webapp_forms() -> None:
assert is_recipient_type_allowed_for_surface(
RecipientType.STANDALONE_WEB_APP,
HumanInputSurface.SERVICE_API,
)
assert not is_recipient_type_allowed_for_surface(
RecipientType.CONSOLE,
HumanInputSurface.SERVICE_API,
)
assert not is_recipient_type_allowed_for_surface(
RecipientType.BACKSTAGE,
HumanInputSurface.SERVICE_API,
)
assert not is_recipient_type_allowed_for_surface(
RecipientType.EMAIL_MEMBER,
HumanInputSurface.SERVICE_API,
)
def test_console_only_allows_internal_console_surfaces() -> None:
assert is_recipient_type_allowed_for_surface(
RecipientType.CONSOLE,
HumanInputSurface.CONSOLE,
)
assert is_recipient_type_allowed_for_surface(
RecipientType.BACKSTAGE,
HumanInputSurface.CONSOLE,
)
assert not is_recipient_type_allowed_for_surface(
RecipientType.STANDALONE_WEB_APP,
HumanInputSurface.CONSOLE,
)
# Token surfaces (SERVICE_API, OPENAPI) may act only on public web-app forms;
# CONSOLE may act on internal console/backstage forms. OPENAPI mirrors SERVICE_API
# today but is pinned independently because the two are expected to diverge.
@pytest.mark.parametrize(
("recipient_type", "surface", "allowed"),
[
(RecipientType.STANDALONE_WEB_APP, HumanInputSurface.SERVICE_API, True),
(RecipientType.CONSOLE, HumanInputSurface.SERVICE_API, False),
(RecipientType.BACKSTAGE, HumanInputSurface.SERVICE_API, False),
(RecipientType.EMAIL_MEMBER, HumanInputSurface.SERVICE_API, False),
(RecipientType.STANDALONE_WEB_APP, HumanInputSurface.OPENAPI, True),
(RecipientType.CONSOLE, HumanInputSurface.OPENAPI, False),
(RecipientType.BACKSTAGE, HumanInputSurface.OPENAPI, False),
(RecipientType.CONSOLE, HumanInputSurface.CONSOLE, True),
(RecipientType.BACKSTAGE, HumanInputSurface.CONSOLE, True),
(RecipientType.STANDALONE_WEB_APP, HumanInputSurface.CONSOLE, False),
],
)
def test_recipient_type_allowed_per_surface(
recipient_type: RecipientType, surface: HumanInputSurface, allowed: bool
) -> None:
assert is_recipient_type_allowed_for_surface(recipient_type, surface) is allowed
def test_preferred_form_token_uses_shared_priority_order() -> None:
@ -56,6 +46,17 @@ def test_preferred_form_token_uses_shared_priority_order() -> None:
assert get_preferred_form_token(recipients) == "backstage-token"
def test_preferred_form_token_skips_prioritized_type_with_empty_token() -> None:
# An empty token is not actionable: the highest-priority recipient that
# actually carries a token wins, not the highest-priority type.
recipients = [
(RecipientType.BACKSTAGE, ""),
(RecipientType.CONSOLE, "console-token"),
]
assert get_preferred_form_token(recipients) == "console-token"
def test_resolve_variable_select_input_options_uses_runtime_values() -> None:
variable_pool = VariablePool()
variable_pool.add(("start", "options"), ["approve", "reject"])

View File

@ -1,34 +0,0 @@
"""Tests for OPENAPI surface in HumanInputPolicy and human_input_forms."""
from __future__ import annotations
from core.workflow.human_input_policy import HumanInputSurface, is_recipient_type_allowed_for_surface
from models.human_input import RecipientType
def test_openapi_surface_exists():
assert HumanInputSurface.OPENAPI == "openapi"
def test_openapi_allows_standalone_web_app():
assert is_recipient_type_allowed_for_surface(RecipientType.STANDALONE_WEB_APP, HumanInputSurface.OPENAPI)
def test_openapi_rejects_console_recipient():
assert not is_recipient_type_allowed_for_surface(RecipientType.CONSOLE, HumanInputSurface.OPENAPI)
def test_openapi_rejects_backstage_recipient():
assert not is_recipient_type_allowed_for_surface(RecipientType.BACKSTAGE, HumanInputSurface.OPENAPI)
def test_get_surface_form_token_openapi_picks_standalone_web_app():
"""OPENAPI surface should pick STANDALONE_WEB_APP token, same as SERVICE_API."""
from core.workflow.human_input_forms import _get_surface_form_token
recipients = [
(RecipientType.BACKSTAGE, "backstage-token"),
(RecipientType.STANDALONE_WEB_APP, "web-token"),
]
token = _get_surface_form_token(recipients, surface=HumanInputSurface.OPENAPI)
assert token == "web-token"

View File

@ -0,0 +1,19 @@
import pytest
from models.human_input import RecipientType
@pytest.mark.parametrize(
("recipient_type", "expected_label"),
[
(RecipientType.EMAIL_MEMBER, "email"),
(RecipientType.EMAIL_EXTERNAL, "email"),
(RecipientType.CONSOLE, "console"),
(RecipientType.BACKSTAGE, "console"),
(RecipientType.STANDALONE_WEB_APP, "web_app"),
],
)
def test_approval_channel_label_collapses_delivery_types(recipient_type: RecipientType, expected_label: str) -> None:
# Both email types collapse to "email" and console/backstage to "console":
# the user-facing approval channel, not the internal recipient type.
assert recipient_type.approval_channel_label == expected_label

View File

@ -16,12 +16,14 @@ from core.app.app_config.entities import WorkflowUIBasedAppConfig
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
from core.app.entities.task_entities import StreamEvent
from core.app.layers.pause_state_persist_layer import WorkflowResumptionContext, _WorkflowGenerateEntityWrapper
from core.workflow.human_input_policy import FormDisposition, HumanInputSurface
from graphon.entities.pause_reason import HumanInputRequired
from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionStatus
from graphon.nodes.human_input.entities import SelectInputConfig, StringListSource
from graphon.nodes.human_input.enums import ValueSourceType
from graphon.runtime import GraphRuntimeState, VariablePool
from models.enums import CreatorUserRole
from models.human_input import RecipientType
from models.model import AppMode
from models.workflow import WorkflowRun
from repositories.api_workflow_node_execution_repository import WorkflowNodeExecutionSnapshot
@ -763,7 +765,11 @@ def test_build_snapshot_events_preserves_public_form_token(monkeypatch: pytest.M
snapshot = _build_snapshot(WorkflowNodeExecutionStatus.PAUSED)
resumption_context = _build_resumption_context("task-ctx")
monkeypatch.setattr(
service_module, "load_form_tokens_by_form_id", lambda form_ids, session=None, surface=None: {"form-1": "wtok"}
service_module,
"load_form_dispositions_by_form_id",
lambda form_ids, session=None, surface=None: {
"form-1": FormDisposition(form_token="wtok", approval_channels=[])
},
)
session_maker = _SessionMaker(
SimpleNamespace(
@ -803,12 +809,99 @@ def test_build_snapshot_events_preserves_public_form_token(monkeypatch: pytest.M
assert pause_data["reasons"][0]["expiration_time"] == int(datetime(2024, 1, 1, tzinfo=UTC).timestamp())
def _build_recipient_snapshot_events(recipients: Sequence[Any]) -> list[Mapping[str, Any]]:
"""Drive the reconnect snapshot pause path for the OPENAPI surface.
Lets the real disposition loader run against a fake session whose ``scalars``
yields the given recipients, so the reconnect path derives the same token and
approval channels as the live path for the same recipient set.
"""
workflow_run = _build_workflow_run(WorkflowExecutionStatus.PAUSED)
snapshot = _build_snapshot(WorkflowNodeExecutionStatus.PAUSED)
resumption_context = _build_resumption_context("task-ctx")
expiration_time = datetime(2024, 1, 1, tzinfo=UTC)
session_maker = _SessionMaker(
SimpleNamespace(
execute=lambda _stmt: [("form-1", expiration_time, '{"display_in_ui": true}')],
scalars=lambda _stmt: list(recipients),
)
)
pause_entity = _FakePauseEntity(
pause_id="pause-1",
workflow_run_id="run-1",
paused_at_value=expiration_time,
pause_reasons=[
HumanInputRequired(
form_id="form-1",
form_content="content",
node_id="node-1",
node_title="Human Input",
)
],
)
return _build_snapshot_events(
workflow_run=workflow_run,
node_snapshots=[snapshot],
task_id="task-ctx",
message_context=None,
pause_entity=pause_entity,
resumption_context=resumption_context,
session_maker=cast(sessionmaker[Session], session_maker),
human_input_surface=HumanInputSurface.OPENAPI,
)
def test_reconnect_pause_without_web_app_recipient_emits_approval_channels() -> None:
events = _build_recipient_snapshot_events(
recipients=[
SimpleNamespace(form_id="form-1", recipient_type=RecipientType.EMAIL_MEMBER, access_token="email-token"),
SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"),
],
)
human_input_event = events[-2]
assert human_input_event["event"] == StreamEvent.HUMAN_INPUT_REQUIRED
assert human_input_event["data"]["form_token"] is None
assert human_input_event["data"]["approval_channels"] == ["console", "email"]
pause_data = events[-1]["data"]
assert pause_data["reasons"][0]["form_token"] is None
assert pause_data["reasons"][0]["approval_channels"] == ["console", "email"]
def test_reconnect_pause_with_web_app_recipient_sets_token_and_channels() -> None:
events = _build_recipient_snapshot_events(
recipients=[
SimpleNamespace(
form_id="form-1",
recipient_type=RecipientType.STANDALONE_WEB_APP,
access_token="web-app-token",
),
SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"),
],
)
human_input_event = events[-2]
assert human_input_event["event"] == StreamEvent.HUMAN_INPUT_REQUIRED
assert human_input_event["data"]["form_token"] == "web-app-token"
assert human_input_event["data"]["approval_channels"] == ["console"]
pause_data = events[-1]["data"]
assert pause_data["reasons"][0]["form_token"] == "web-app-token"
assert pause_data["reasons"][0]["approval_channels"] == ["console"]
def test_build_snapshot_events_resolves_pause_reason_select_options(monkeypatch: pytest.MonkeyPatch) -> None:
workflow_run = _build_workflow_run(WorkflowExecutionStatus.PAUSED)
snapshot = _build_snapshot(WorkflowNodeExecutionStatus.PAUSED)
resumption_context = _build_resumption_context("task-ctx", select_options=["approve", "reject"])
monkeypatch.setattr(
service_module, "load_form_tokens_by_form_id", lambda form_ids, session=None, surface=None: {"form-1": "wtok"}
service_module,
"load_form_dispositions_by_form_id",
lambda form_ids, session=None, surface=None: {
"form-1": FormDisposition(form_token="wtok", approval_channels=[])
},
)
session_maker = _SessionMaker(
SimpleNamespace(
@ -886,7 +979,11 @@ def test_build_workflow_event_stream_loads_pause_tokens_without_flask_app_contex
service_module, "_load_resumption_context", MagicMock(return_value=_build_resumption_context("task-1"))
)
monkeypatch.setattr(
service_module, "load_form_tokens_by_form_id", lambda form_ids, session=None, surface=None: {"form-1": "wtok"}
service_module,
"load_form_dispositions_by_form_id",
lambda form_ids, session=None, surface=None: {
"form-1": FormDisposition(form_token="wtok", approval_channels=[])
},
)
session = SimpleNamespace(