From f533e992d46931afa3f2cafd00b415b9ef8c2d19 Mon Sep 17 00:00:00 2001 From: GareArc Date: Tue, 16 Jun 2026 16:11:29 -0700 Subject: [PATCH] fix(hitl): scope OpenAPI/Service-API resume to author-configured webapp forms MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pause-time token emission now draws only from the recipient set each API surface is allowed to act on (emit ⊆ validate), so the CLI/OpenAPI caller is never handed a token the resume endpoint would reject as 404 (WTA-867). A form's recipients are partitioned once, per surface, into a single FormDisposition: the surface-actionable recipient yields `form_token`, while the rest are reported as `approval_channels` (e.g. ["email", "console"]) so the caller is told where approval actually happens. Token and channels are two projections of one decision (disposition_for_surface) loaded by one recipient query (load_form_dispositions_by_form_id); the live pause path and the reconnect snapshot path consume the same FormDisposition so they cannot drift. RecipientType carries its user-facing approval-channel label as an enum tuple value, set in __new__, so a new recipient type cannot be declared without one. Tests: consolidate recipient/disposition/enrich tests into parametrized matrices, add CONSOLE-surface and empty-token coverage, extract a shared fake session for the pause-event tests. --- api/controllers/openapi/_errors.py | 15 ++ api/controllers/openapi/human_input_form.py | 20 ++- .../common/workflow_response_converter.py | 18 +- api/core/app/entities/task_entities.py | 3 + api/core/workflow/human_input_forms.py | 95 +++++------ api/core/workflow/human_input_policy.py | 39 ++++- api/models/human_input.py | 23 ++- .../workflow_event_snapshot_service.py | 19 ++- .../openapi/test_error_contract.py | 4 + .../openapi/test_human_input_form.py | 46 ++++- .../test_generate_task_pipeline_core.py | 1 + .../app/apps/test_workflow_pause_events.py | 157 +++++++++++++----- .../test_generate_task_pipeline_core.py | 1 + .../test_human_input_form_repository_impl.py | 4 + .../workflow/test_enrich_pause_reasons.py | 63 +++++++ .../core/workflow/test_human_input_forms.py | 152 +++++++++-------- .../core/workflow/test_human_input_policy.py | 65 ++++---- .../test_human_input_policy_openapi.py | 34 ---- .../models/test_recipient_type_label.py | 19 +++ .../test_workflow_event_snapshot_service.py | 103 +++++++++++- 20 files changed, 620 insertions(+), 261 deletions(-) create mode 100644 api/tests/unit_tests/core/workflow/test_enrich_pause_reasons.py delete mode 100644 api/tests/unit_tests/core/workflow/test_human_input_policy_openapi.py create mode 100644 api/tests/unit_tests/models/test_recipient_type_label.py diff --git a/api/controllers/openapi/_errors.py b/api/controllers/openapi/_errors.py index 38c068bd35..5e82c2614d 100644 --- a/api/controllers/openapi/_errors.py +++ b/api/controllers/openapi/_errors.py @@ -63,6 +63,8 @@ class OpenApiErrorCode(StrEnum): FILE_EXTENSION_BLOCKED = "file_extension_blocked" MEMBER_LIMIT_EXCEEDED = "member_limit_exceeded" MEMBER_LICENSE_EXCEEDED = "member_license_exceeded" + HUMAN_INPUT_FORM_NOT_FOUND = "form_not_found" + RECIPIENT_SURFACE_MISMATCH = "recipient_surface_mismatch" class ErrorDetail(BaseModel): @@ -239,3 +241,16 @@ class MemberLicenseExceeded(OpenApiError): # noqa: N818 error_code = OpenApiErrorCode.MEMBER_LICENSE_EXCEEDED description = "Workspace member license capacity reached." hint = "Contact your workspace administrator to expand the license seat count." + + +class HumanInputFormNotFound(OpenApiError): # noqa: N818 + code = 404 + error_code = OpenApiErrorCode.HUMAN_INPUT_FORM_NOT_FOUND + description = "No human-input form matches this token. It may be wrong, expired, or already submitted." + + +class RecipientSurfaceMismatch(OpenApiError): # noqa: N818 + code = 403 + error_code = OpenApiErrorCode.RECIPIENT_SURFACE_MISMATCH + description = "This form's recipient can't be submitted via the OpenAPI surface." + hint = "Action it through its channel (web app or console)." diff --git a/api/controllers/openapi/human_input_form.py b/api/controllers/openapi/human_input_form.py index 995315150c..e5930ea8fa 100644 --- a/api/controllers/openapi/human_input_form.py +++ b/api/controllers/openapi/human_input_form.py @@ -12,16 +12,20 @@ import logging from flask import Response from flask_restx import Resource -from werkzeug.exceptions import BadRequest, NotFound +from werkzeug.exceptions import BadRequest from controllers.common.human_input import HumanInputFormSubmitPayload, stringify_form_default_values from controllers.common.schema import register_schema_models from controllers.openapi import openapi_ns from controllers.openapi._contract import accepts, returns +from controllers.openapi._errors import HumanInputFormNotFound, RecipientSurfaceMismatch from controllers.openapi._models import FormSubmitResponse, HumanInputFormDefinitionResponse from controllers.openapi.auth.composition import auth_router from controllers.openapi.auth.data import AuthData -from core.workflow.human_input_policy import HumanInputSurface, is_recipient_type_allowed_for_surface +from core.workflow.human_input_policy import ( + HumanInputSurface, + is_recipient_type_allowed_for_surface, +) from extensions.ext_database import db from libs.helper import to_timestamp from libs.oauth_bearer import Scope @@ -47,12 +51,12 @@ def _jsonify_form_definition(form) -> Response: def _ensure_form_belongs_to_app(form, app_model: App) -> None: if form.app_id != app_model.id or form.tenant_id != app_model.tenant_id: - raise NotFound("Form not found") + raise HumanInputFormNotFound() def _ensure_form_is_allowed_for_openapi(form) -> None: if not is_recipient_type_allowed_for_surface(form.recipient_type, HumanInputSurface.OPENAPI): - raise NotFound("Form not found") + raise RecipientSurfaceMismatch() @openapi_ns.route("/apps//form/human_input/") @@ -60,11 +64,11 @@ class OpenApiWorkflowHumanInputFormApi(Resource): @openapi_ns.response(200, "Form definition", openapi_ns.models[HumanInputFormDefinitionResponse.__name__]) @auth_router.guard(scope=Scope.APPS_RUN) def get(self, app_id: str, form_token: str, *, auth_data: AuthData): - app_model, caller, caller_kind = auth_data.require_app_context() + app_model, _caller, _caller_kind = auth_data.require_app_context() service = HumanInputService(db.engine) form = service.get_form_by_token(form_token) if form is None: - raise NotFound("Form not found") + raise HumanInputFormNotFound() _ensure_form_belongs_to_app(form, app_model) _ensure_form_is_allowed_for_openapi(form) @@ -80,7 +84,7 @@ class OpenApiWorkflowHumanInputFormApi(Resource): service = HumanInputService(db.engine) form = service.get_form_by_token(form_token) if form is None: - raise NotFound("Form not found") + raise HumanInputFormNotFound() _ensure_form_belongs_to_app(form, app_model) _ensure_form_is_allowed_for_openapi(form) @@ -106,6 +110,6 @@ class OpenApiWorkflowHumanInputFormApi(Resource): submission_end_user_id=submission_end_user_id, ) except FormNotFoundError: - raise NotFound("Form not found") + raise HumanInputFormNotFound() return FormSubmitResponse() diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index 502b1907ba..93493a4ad8 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -51,8 +51,11 @@ from core.tools.entities.tool_entities import ToolProviderType from core.tools.tool_manager import ToolManager from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE from core.trigger.trigger_manager import TriggerManager -from core.workflow.human_input_forms import load_form_tokens_by_form_id +from core.workflow.human_input_forms import ( + load_form_dispositions_by_form_id, +) from core.workflow.human_input_policy import ( + FormDisposition, HumanInputSurface, enrich_human_input_pause_reasons, resolve_human_input_pause_reason_inputs, @@ -340,13 +343,14 @@ class WorkflowResponseConverter: human_input_form_ids = [reason.form_id for reason in resolved_reasons if isinstance(reason, HumanInputRequired)] expiration_times_by_form_id: dict[str, datetime] = {} display_in_ui_by_form_id: dict[str, bool] = {} - form_token_by_form_id: dict[str, str] = {} + dispositions_by_form_id: dict[str, FormDisposition] = {} if human_input_form_ids: stmt = select( HumanInputForm.id, HumanInputForm.expiration_time, HumanInputForm.form_definition, ).where(HumanInputForm.id.in_(human_input_form_ids)) + hitl_surface = _INVOKE_FROM_TO_HITL_SURFACE.get(self._application_generate_entity.invoke_from) with Session(bind=db.engine) as session: for form_id, expiration_time, form_definition in session.execute(stmt): expiration_times_by_form_id[str(form_id)] = expiration_time @@ -355,17 +359,17 @@ class WorkflowResponseConverter: except (TypeError, json.JSONDecodeError): definition_payload = {} display_in_ui_by_form_id[str(form_id)] = bool(definition_payload.get("display_in_ui")) - form_token_by_form_id = load_form_tokens_by_form_id( + dispositions_by_form_id = load_form_dispositions_by_form_id( human_input_form_ids, session=session, - surface=_INVOKE_FROM_TO_HITL_SURFACE.get(self._application_generate_entity.invoke_from), + surface=hitl_surface, ) # Reconnect paths must preserve the same pause-reason contract as live streams; # otherwise clients see schema drift after resume. pause_reasons = enrich_human_input_pause_reasons( pause_reasons, - form_tokens_by_form_id=form_token_by_form_id, + dispositions_by_form_id=dispositions_by_form_id, expiration_times_by_form_id={ form_id: int(expiration_time.timestamp()) for form_id, expiration_time in expiration_times_by_form_id.items() @@ -379,6 +383,7 @@ class WorkflowResponseConverter: expiration_time = expiration_times_by_form_id.get(reason.form_id) if expiration_time is None: raise ValueError(f"HumanInputForm not found for pause reason, form_id={reason.form_id}") + disposition = dispositions_by_form_id.get(reason.form_id) responses.append( HumanInputRequiredResponse( task_id=task_id, @@ -391,7 +396,8 @@ class WorkflowResponseConverter: inputs=reason.inputs, actions=reason.actions, display_in_ui=display_in_ui_by_form_id.get(reason.form_id, False), - form_token=form_token_by_form_id.get(reason.form_id), + form_token=disposition.form_token if disposition else None, + approval_channels=list(disposition.approval_channels) if disposition else [], resolved_default_values=reason.resolved_default_values, expiration_time=int(expiration_time.timestamp()), ), diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index defec9f946..9cac379b7e 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -288,6 +288,7 @@ class HumanInputRequiredResponse(StreamResponse): actions: Sequence[UserActionConfig] = Field(default_factory=list) display_in_ui: bool = False form_token: str | None = None + approval_channels: list[str] = Field(default_factory=list) resolved_default_values: Mapping[str, Any] = Field(default_factory=dict) expiration_time: int = Field(..., description="Unix timestamp in seconds") @@ -311,6 +312,7 @@ class HumanInputRequiredPauseReasonPayload(BaseModel): actions: Sequence[UserActionConfig] = Field(default_factory=list) display_in_ui: bool = False form_token: str | None = None + approval_channels: list[str] = Field(default_factory=list) resolved_default_values: Mapping[str, Any] = Field(default_factory=dict) expiration_time: int @@ -325,6 +327,7 @@ class HumanInputRequiredPauseReasonPayload(BaseModel): actions=data.actions, display_in_ui=data.display_in_ui, form_token=data.form_token, + approval_channels=data.approval_channels, resolved_default_values=data.resolved_default_values, expiration_time=data.expiration_time, ) diff --git a/api/core/workflow/human_input_forms.py b/api/core/workflow/human_input_forms.py index fe3c161a32..b850cd2391 100644 --- a/api/core/workflow/human_input_forms.py +++ b/api/core/workflow/human_input_forms.py @@ -12,60 +12,61 @@ from collections.abc import Sequence from sqlalchemy import select from sqlalchemy.orm import Session -from core.workflow.human_input_policy import HumanInputSurface, get_preferred_form_token +from core.workflow.human_input_policy import ( + FormDisposition, + HumanInputSurface, + disposition_for_surface, +) from extensions.ext_database import db from models.human_input import HumanInputFormRecipient, RecipientType +def load_form_dispositions_by_form_id( + form_ids: Sequence[str], + *, + session: Session | None = None, + surface: HumanInputSurface | None = None, +) -> dict[str, FormDisposition]: + """Resolve each paused form's resume token and approval channels for `surface`.""" + unique_form_ids = list(dict.fromkeys(form_ids)) + if not unique_form_ids: + return {} + + if session is not None: + return _load_form_dispositions_by_form_id(session, unique_form_ids, surface=surface) + + with Session(bind=db.engine, expire_on_commit=False) as new_session: + return _load_form_dispositions_by_form_id(new_session, unique_form_ids, surface=surface) + + +def _load_form_dispositions_by_form_id( + session: Session, + form_ids: Sequence[str], + *, + surface: HumanInputSurface | None, +) -> dict[str, FormDisposition]: + recipients_by_form_id: dict[str, list[tuple[RecipientType, str]]] = {} + stmt = select(HumanInputFormRecipient).where(HumanInputFormRecipient.form_id.in_(form_ids)) + for recipient in session.scalars(stmt): + recipients_by_form_id.setdefault(recipient.form_id, []).append( + (recipient.recipient_type, recipient.access_token or "") + ) + return { + form_id: disposition_for_surface(recipients, surface=surface) + for form_id, recipients in recipients_by_form_id.items() + } + + def load_form_tokens_by_form_id( form_ids: Sequence[str], *, session: Session | None = None, surface: HumanInputSurface | None = None, ) -> dict[str, str]: - """Load the preferred access token for each human input form.""" - unique_form_ids = list(dict.fromkeys(form_ids)) - if not unique_form_ids: - return {} - - if session is not None: - return _load_form_tokens_by_form_id(session, unique_form_ids, surface=surface) - - with Session(bind=db.engine, expire_on_commit=False) as new_session: - return _load_form_tokens_by_form_id(new_session, unique_form_ids, surface=surface) - - -def _load_form_tokens_by_form_id( - session: Session, - form_ids: Sequence[str], - *, - surface: HumanInputSurface | None = None, -) -> dict[str, str]: - recipients_by_form_id: dict[str, list[tuple[RecipientType, str]]] = {} - stmt = select(HumanInputFormRecipient).where(HumanInputFormRecipient.form_id.in_(form_ids)) - for recipient in session.scalars(stmt): - if not recipient.access_token: - continue - recipients_by_form_id.setdefault(recipient.form_id, []).append( - (recipient.recipient_type, recipient.access_token) - ) - - tokens_by_form_id: dict[str, str] = {} - for form_id, recipients in recipients_by_form_id.items(): - token = _get_surface_form_token(recipients, surface=surface) - if token is not None: - tokens_by_form_id[form_id] = token - return tokens_by_form_id - - -def _get_surface_form_token( - recipients: Sequence[tuple[RecipientType, str]], - *, - surface: HumanInputSurface | None, -) -> str | None: - if surface in {HumanInputSurface.SERVICE_API, HumanInputSurface.OPENAPI}: - for recipient_type, token in recipients: - if recipient_type == RecipientType.STANDALONE_WEB_APP and token: - return token - - return get_preferred_form_token(recipients) + """Resume tokens only, for callers that don't surface approval channels.""" + dispositions = load_form_dispositions_by_form_id(form_ids, session=session, surface=surface) + return { + form_id: disposition.form_token + for form_id, disposition in dispositions.items() + if disposition.form_token is not None + } diff --git a/api/core/workflow/human_input_policy.py b/api/core/workflow/human_input_policy.py index e95d753ae9..3df1952ce9 100644 --- a/api/core/workflow/human_input_policy.py +++ b/api/core/workflow/human_input_policy.py @@ -2,7 +2,7 @@ from __future__ import annotations from collections.abc import Mapping, Sequence from enum import StrEnum -from typing import Any +from typing import Any, NamedTuple from graphon.entities.pause_reason import HumanInputRequired, PauseReason, PauseReasonType from graphon.nodes.human_input.entities import FormInputConfig, SelectInputConfig @@ -20,7 +20,7 @@ class HumanInputSurface(StrEnum): # SERVICE_API and OPENAPI are intentionally narrower than CONSOLE: token callers # should only be able to act on end-user web forms, not internal console flows. -_ALLOWED_RECIPIENT_TYPES_BY_SURFACE: dict[HumanInputSurface, frozenset[RecipientType]] = { +ALLOWED_RECIPIENT_TYPES_BY_SURFACE: dict[HumanInputSurface, frozenset[RecipientType]] = { HumanInputSurface.SERVICE_API: frozenset({RecipientType.STANDALONE_WEB_APP}), HumanInputSurface.CONSOLE: frozenset({RecipientType.CONSOLE, RecipientType.BACKSTAGE}), HumanInputSurface.OPENAPI: frozenset({RecipientType.STANDALONE_WEB_APP}), @@ -41,7 +41,7 @@ def is_recipient_type_allowed_for_surface( ) -> bool: if recipient_type is None: return False - return recipient_type in _ALLOWED_RECIPIENT_TYPES_BY_SURFACE[surface] + return recipient_type in ALLOWED_RECIPIENT_TYPES_BY_SURFACE[surface] def get_preferred_form_token( @@ -59,10 +59,37 @@ def get_preferred_form_token( return chosen_token +class FormDisposition(NamedTuple): + """How a paused form resolves for one API surface. + + A form's recipients split into those the surface may act on (yielding a resume + `form_token`) and those it may not (their channels named in `approval_channels` + so the caller is told where approval actually happens instead). + """ + + form_token: str | None + approval_channels: list[str] + + +def disposition_for_surface( + recipients: Sequence[tuple[RecipientType, str]], + *, + surface: HumanInputSurface | None, +) -> FormDisposition: + if surface is None: + return FormDisposition(form_token=get_preferred_form_token(recipients), approval_channels=[]) + allowed = ALLOWED_RECIPIENT_TYPES_BY_SURFACE[surface] + actionable = [(recipient_type, token) for recipient_type, token in recipients if recipient_type in allowed] + blocked_channels = { + recipient_type.approval_channel_label for recipient_type, _ in recipients if recipient_type not in allowed + } + return FormDisposition(form_token=get_preferred_form_token(actionable), approval_channels=sorted(blocked_channels)) + + def enrich_human_input_pause_reasons( reasons: Sequence[Mapping[str, Any]], *, - form_tokens_by_form_id: Mapping[str, str], + dispositions_by_form_id: Mapping[str, FormDisposition], expiration_times_by_form_id: Mapping[str, int], ) -> list[dict[str, Any]]: enriched: list[dict[str, Any]] = [] @@ -71,7 +98,9 @@ def enrich_human_input_pause_reasons( if updated.get("TYPE") == PauseReasonType.HUMAN_INPUT_REQUIRED: form_id = updated.get("form_id") if isinstance(form_id, str): - updated["form_token"] = form_tokens_by_form_id.get(form_id) + disposition = dispositions_by_form_id.get(form_id) + updated["form_token"] = disposition.form_token if disposition else None + updated["approval_channels"] = list(disposition.approval_channels) if disposition else [] expiration_time = expiration_times_by_form_id.get(form_id) if expiration_time is not None: updated["expiration_time"] = expiration_time diff --git a/api/models/human_input.py b/api/models/human_input.py index d11274bc92..7ae4c9403a 100644 --- a/api/models/human_input.py +++ b/api/models/human_input.py @@ -135,19 +135,32 @@ class HumanInputDelivery(DefaultFieldsMixin, Base): class RecipientType(StrEnum): + # Second value = approval-channel label (surfaced in `approval_channels`). # EMAIL_MEMBER member means that the - EMAIL_MEMBER = "email_member" - EMAIL_EXTERNAL = "email_external" + EMAIL_MEMBER = "email_member", "email" + EMAIL_EXTERNAL = "email_external", "email" # STANDALONE_WEB_APP is used by the standalone web app. # # It's not used while running workflows / chatflows containing HumanInput # node inside console. - STANDALONE_WEB_APP = "standalone_web_app" + STANDALONE_WEB_APP = "standalone_web_app", "web_app" # CONSOLE is used while running workflows / chatflows containing HumanInput # node inside console. (E.G. running installed apps or debugging workflows / chatflows) - CONSOLE = "console" + CONSOLE = "console", "console" # BACKSTAGE is used for backstage input inside console. - BACKSTAGE = "backstage" + BACKSTAGE = "backstage", "console" + + _approval_channel_label: str + + def __new__(cls, value: str, approval_channel_label: str) -> "RecipientType": + member = str.__new__(cls, value) + member._value_ = value + member._approval_channel_label = approval_channel_label + return member + + @property + def approval_channel_label(self) -> str: + return self._approval_channel_label @final diff --git a/api/services/workflow_event_snapshot_service.py b/api/services/workflow_event_snapshot_service.py index dad7dff292..c693f6318b 100644 --- a/api/services/workflow_event_snapshot_service.py +++ b/api/services/workflow_event_snapshot_service.py @@ -23,8 +23,11 @@ from core.app.entities.task_entities import ( WorkflowStartStreamResponse, ) from core.app.layers.pause_state_persist_layer import WorkflowResumptionContext -from core.workflow.human_input_forms import load_form_tokens_by_form_id +from core.workflow.human_input_forms import ( + load_form_dispositions_by_form_id, +) from core.workflow.human_input_policy import ( + FormDisposition, HumanInputSurface, enrich_human_input_pause_reasons, resolve_human_input_pause_reason_inputs, @@ -359,7 +362,7 @@ def _build_human_input_required_events( expiration_times_by_form_id: dict[str, int] = {} display_in_ui_by_form_id: dict[str, bool] = {} - form_tokens_by_form_id: dict[str, str] = {} + dispositions_by_form_id: dict[str, FormDisposition] = {} if human_input_form_ids and session_maker is not None: stmt = select(HumanInputForm.id, HumanInputForm.expiration_time, HumanInputForm.form_definition).where( HumanInputForm.id.in_(human_input_form_ids) @@ -372,7 +375,7 @@ def _build_human_input_required_events( except (TypeError, json.JSONDecodeError): definition_payload = {} display_in_ui_by_form_id[str(form_id)] = bool(definition_payload.get("display_in_ui")) - form_tokens_by_form_id = load_form_tokens_by_form_id( + dispositions_by_form_id = load_form_dispositions_by_form_id( human_input_form_ids, session=session, surface=human_input_surface, @@ -393,6 +396,7 @@ def _build_human_input_required_events( reason.inputs, variable_pool=variable_pool, ) + disposition = dispositions_by_form_id.get(form_id) response = HumanInputRequiredResponse( task_id=task_id, @@ -405,7 +409,8 @@ def _build_human_input_required_events( inputs=resolved_inputs, actions=reason.actions, display_in_ui=display_in_ui_by_form_id.get(form_id, False), - form_token=form_tokens_by_form_id.get(form_id), + form_token=disposition.form_token if disposition else None, + approval_channels=list(disposition.approval_channels) if disposition else [], resolved_default_values=reason.resolved_default_values, expiration_time=expiration_time, ), @@ -493,11 +498,11 @@ def _build_pause_event( for form_id in [reason.get("form_id")] if isinstance(form_id, str) ] - form_tokens_by_form_id: dict[str, str] = {} + dispositions_by_form_id: dict[str, FormDisposition] = {} expiration_times_by_form_id: dict[str, int] = {} if human_input_form_ids and session_maker is not None: with session_maker() as session: - form_tokens_by_form_id = load_form_tokens_by_form_id( + dispositions_by_form_id = load_form_dispositions_by_form_id( human_input_form_ids, session=session, surface=human_input_surface, @@ -512,7 +517,7 @@ def _build_pause_event( # otherwise clients see schema drift after resume. reasons = enrich_human_input_pause_reasons( reasons, - form_tokens_by_form_id=form_tokens_by_form_id, + dispositions_by_form_id=dispositions_by_form_id, expiration_times_by_form_id=expiration_times_by_form_id, ) diff --git a/api/tests/unit_tests/controllers/openapi/test_error_contract.py b/api/tests/unit_tests/controllers/openapi/test_error_contract.py index 45a577443b..788a7215ed 100644 --- a/api/tests/unit_tests/controllers/openapi/test_error_contract.py +++ b/api/tests/unit_tests/controllers/openapi/test_error_contract.py @@ -26,11 +26,13 @@ from controllers.openapi._errors import ( ErrorBody, ErrorDetail, FilenameNotExists, + HumanInputFormNotFound, MemberLicenseExceeded, MemberLimitExceeded, OpenApiError, OpenApiErrorCode, OpenApiErrorFormatter, + RecipientSurfaceMismatch, ) from controllers.service_api.app.error import ( AppUnavailableError, @@ -319,6 +321,8 @@ ERROR_MATRIX = [ (BlockedFileExtensionError(), 400, "file_extension_blocked"), (MemberLimitExceeded(), 403, "member_limit_exceeded"), (MemberLicenseExceeded(), 403, "member_license_exceeded"), + (HumanInputFormNotFound(), 404, "form_not_found"), + (RecipientSurfaceMismatch(), 403, "recipient_surface_mismatch"), ] diff --git a/api/tests/unit_tests/controllers/openapi/test_human_input_form.py b/api/tests/unit_tests/controllers/openapi/test_human_input_form.py index f8d296deb3..5659cd6eef 100644 --- a/api/tests/unit_tests/controllers/openapi/test_human_input_form.py +++ b/api/tests/unit_tests/controllers/openapi/test_human_input_form.py @@ -11,8 +11,9 @@ from unittest.mock import Mock import pytest from flask import Flask -from werkzeug.exceptions import NotFound, UnprocessableEntity +from werkzeug.exceptions import UnprocessableEntity +from controllers.openapi._errors import HumanInputFormNotFound, RecipientSurfaceMismatch from controllers.openapi.auth.data import AuthData from libs.oauth_bearer import Scope, TokenType from models.human_input import RecipientType @@ -89,7 +90,7 @@ class TestOpenApiHumanInputFormGet: caller = SimpleNamespace(id="acct-1") with app.test_request_context("/openapi/v1/apps/app-1/form/human_input/bad"): - with pytest.raises(NotFound): + with pytest.raises(HumanInputFormNotFound): api.get.__wrapped__( api, app_id="app-1", @@ -101,7 +102,10 @@ class TestOpenApiHumanInputFormGet: from controllers.openapi.human_input_form import OpenApiWorkflowHumanInputFormApi form = SimpleNamespace( - app_id="other-app", tenant_id="tenant-1", expiration_time=datetime(2099, 1, 1, tzinfo=UTC) + app_id="other-app", + tenant_id="tenant-1", + recipient_type=RecipientType.STANDALONE_WEB_APP, + expiration_time=datetime(2099, 1, 1, tzinfo=UTC), ) service_mock = Mock() service_mock.get_form_by_token.return_value = form @@ -114,7 +118,7 @@ class TestOpenApiHumanInputFormGet: caller = SimpleNamespace(id="acct-1") with app.test_request_context("/openapi/v1/apps/app-1/form/human_input/tok-1"): - with pytest.raises(NotFound): + with pytest.raises(HumanInputFormNotFound): api.get.__wrapped__( api, app_id="app-1", @@ -142,7 +146,7 @@ class TestOpenApiHumanInputFormGet: caller = SimpleNamespace(id="acct-1") with app.test_request_context("/openapi/v1/apps/app-1/form/human_input/tok-1"): - with pytest.raises(NotFound): + with pytest.raises(RecipientSurfaceMismatch): api.get.__wrapped__( api, app_id="app-1", @@ -234,6 +238,38 @@ class TestOpenApiHumanInputFormPost: ) assert result == ({}, 200) + def test_post_standalone_web_app_recipient_submits( + self, app: Flask, bypass_pipeline, monkeypatch: pytest.MonkeyPatch + ): + from controllers.openapi.human_input_form import OpenApiWorkflowHumanInputFormApi + + form = self._make_form(recipient_type=RecipientType.STANDALONE_WEB_APP) + service_mock = Mock() + service_mock.get_form_by_token.return_value = form + + module = sys.modules["controllers.openapi.human_input_form"] + monkeypatch.setattr(module, "HumanInputService", lambda _engine: service_mock) + monkeypatch.setattr(module, "db", SimpleNamespace(engine=object())) + + api = OpenApiWorkflowHumanInputFormApi() + app_model = SimpleNamespace(id="app-1", tenant_id="tenant-1") + caller = SimpleNamespace(id="anyone") + + with app.test_request_context( + "/openapi/v1/apps/app-1/form/human_input/tok-1", + method="POST", + json={"action": "approve", "inputs": {}}, + ): + result = api.post.__wrapped__( + api, + app_id="app-1", + form_token="tok-1", + auth_data=_make_auth_data(app_model, caller, "end_user"), + ) + + service_mock.submit_form_by_token.assert_called_once() + assert result == ({}, 200) + def test_post_rejects_invalid_body_with_422(self, app: Flask, bypass_pipeline): """Malformed body → 422 via @accepts (was an unmapped pydantic error → 500).""" from controllers.openapi.human_input_form import OpenApiWorkflowHumanInputFormApi diff --git a/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py b/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py index b75f6d4494..28f416ac27 100644 --- a/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py +++ b/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py @@ -175,6 +175,7 @@ class TestAdvancedChatGenerateTaskPipeline: "actions": [{"id": "approve", "title": "Approve", "button_style": "default"}], "display_in_ui": True, "form_token": "token-1", + "approval_channels": [], "resolved_default_values": {}, "expiration_time": 123, } diff --git a/api/tests/unit_tests/core/app/apps/test_workflow_pause_events.py b/api/tests/unit_tests/core/app/apps/test_workflow_pause_events.py index 319e603b35..098a63a8f9 100644 --- a/api/tests/unit_tests/core/app/apps/test_workflow_pause_events.py +++ b/api/tests/unit_tests/core/app/apps/test_workflow_pause_events.py @@ -26,6 +26,26 @@ from models.account import Account from models.human_input import RecipientType +class _FakeSession: + """Stub session: `execute` feeds the form-expiration query, `scalars` the recipients.""" + + def __init__(self, *, execute_rows=(), scalars_rows=()): + self._execute_rows = execute_rows + self._scalars_rows = scalars_rows + + def execute(self, _stmt): + return list(self._execute_rows) + + def scalars(self, _stmt): + return list(self._scalars_rows) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + class _RecordingWorkflowAppRunner(WorkflowAppRunner): def __init__(self, **kwargs): super().__init__(**kwargs) @@ -97,11 +117,11 @@ def test_graph_run_paused_event_emits_queue_pause_event(): assert queue_event.paused_nodes == ["node-pause-1"] -def _build_converter(): +def _build_converter(*, invoke_from: InvokeFrom = InvokeFrom.SERVICE_API): application_generate_entity = SimpleNamespace( inputs={}, files=[], - invoke_from=InvokeFrom.SERVICE_API, + invoke_from=invoke_from, app_config=SimpleNamespace(app_id="app-id", tenant_id="tenant-id"), ) system_variables = build_system_variables( @@ -131,32 +151,15 @@ def test_queue_workflow_paused_event_to_stream_responses(monkeypatch: pytest.Mon ) expiration_time = datetime(2024, 1, 1, tzinfo=UTC) + session = _FakeSession( + execute_rows=[("form-1", expiration_time, '{"display_in_ui": true}')], + scalars_rows=[ + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.CONSOLE, access_token="console-token"), + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"), + ], + ) - class _FakeSession: - def execute(self, _stmt): - return [("form-1", expiration_time, '{"display_in_ui": true}')] - - def scalars(self, _stmt): - return [ - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.CONSOLE, - access_token="console-token", - ), - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.BACKSTAGE, - access_token="backstage-token", - ), - ] - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc, tb): - return False - - monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: _FakeSession()) + monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: session) monkeypatch.setattr(workflow_response_converter, "db", SimpleNamespace(engine=object())) reason = HumanInputRequired( @@ -195,10 +198,92 @@ def test_queue_workflow_paused_event_to_stream_responses(monkeypatch: pytest.Mon assert hi_resp.data.inputs[0].output_variable_name == "field" assert hi_resp.data.actions[0].id == "approve" assert hi_resp.data.display_in_ui is True - assert hi_resp.data.form_token == "backstage-token" + assert hi_resp.data.form_token is None + assert hi_resp.data.approval_channels == ["console"] assert hi_resp.data.expiration_time == int(expiration_time.timestamp()) +def _build_paused_human_input_response(monkeypatch, recipients): + """Drive the live OPENAPI pause path with the given recipients via a fake session.""" + converter = _build_converter(invoke_from=InvokeFrom.OPENAPI) + converter.workflow_start_to_stream_response( + task_id="task", + workflow_run_id="run-id", + workflow_id="workflow-id", + reason=WorkflowStartReason.INITIAL, + ) + + expiration_time = datetime(2024, 1, 1, tzinfo=UTC) + session = _FakeSession( + execute_rows=[("form-1", expiration_time, '{"display_in_ui": true}')], + scalars_rows=list(recipients), + ) + + monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: session) + monkeypatch.setattr(workflow_response_converter, "db", SimpleNamespace(engine=object())) + + reason = HumanInputRequired( + form_id="form-1", + form_content="Rendered", + inputs=[ParagraphInputConfig(output_variable_name="field")], + actions=[UserActionConfig(id="approve", title="Approve")], + node_id="node-id", + node_title="Human Step", + ) + queue_event = QueueWorkflowPausedEvent( + reasons=[reason], + outputs={}, + paused_nodes=["node-id"], + ) + + runtime_state = GraphRuntimeState(variable_pool=VariablePool(), start_at=0.0) + responses = converter.workflow_pause_to_stream_response( + event=queue_event, + task_id="task", + graph_runtime_state=runtime_state, + ) + assert isinstance(responses[0], HumanInputRequiredResponse) + return responses + + +def test_openapi_pause_without_web_app_recipient_emits_approval_channels(monkeypatch: pytest.MonkeyPatch): + responses = _build_paused_human_input_response( + monkeypatch, + recipients=[ + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.EMAIL_MEMBER, access_token="email-token"), + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"), + ], + ) + + hi_resp = responses[0] + assert hi_resp.data.form_token is None + assert hi_resp.data.approval_channels == ["console", "email"] + + pause_resp = responses[-1] + assert pause_resp.data.reasons[0]["approval_channels"] == ["console", "email"] + + +def test_openapi_pause_with_web_app_recipient_sets_token_and_channels(monkeypatch: pytest.MonkeyPatch): + responses = _build_paused_human_input_response( + monkeypatch, + recipients=[ + SimpleNamespace( + form_id="form-1", + recipient_type=RecipientType.STANDALONE_WEB_APP, + access_token="web-app-token", + ), + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"), + ], + ) + + hi_resp = responses[0] + assert hi_resp.data.form_token == "web-app-token" + assert hi_resp.data.approval_channels == ["console"] + + pause_resp = responses[-1] + assert pause_resp.data.reasons[0]["approval_channels"] == ["console"] + + def test_queue_workflow_paused_event_resolves_variable_select_options(monkeypatch: pytest.MonkeyPatch): converter = _build_converter() converter.workflow_start_to_stream_response( @@ -209,21 +294,9 @@ def test_queue_workflow_paused_event_resolves_variable_select_options(monkeypatc ) expiration_time = datetime(2024, 1, 1, tzinfo=UTC) + session = _FakeSession(execute_rows=[("form-1", expiration_time, '{"display_in_ui": true}')]) - class _FakeSession: - def execute(self, _stmt): - return [("form-1", expiration_time, '{"display_in_ui": true}')] - - def scalars(self, _stmt): - return [] - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc, tb): - return False - - monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: _FakeSession()) + monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: session) monkeypatch.setattr(workflow_response_converter, "db", SimpleNamespace(engine=object())) reason = HumanInputRequired( diff --git a/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py b/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py index ea21a1cc1a..0aaee900e3 100644 --- a/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py +++ b/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py @@ -134,6 +134,7 @@ class TestWorkflowGenerateTaskPipeline: "actions": [], "display_in_ui": False, "form_token": None, + "approval_channels": [], "resolved_default_values": {}, "expiration_time": 1, } diff --git a/api/tests/unit_tests/core/repositories/test_human_input_form_repository_impl.py b/api/tests/unit_tests/core/repositories/test_human_input_form_repository_impl.py index 5bd35e6d3c..27c32d47ee 100644 --- a/api/tests/unit_tests/core/repositories/test_human_input_form_repository_impl.py +++ b/api/tests/unit_tests/core/repositories/test_human_input_form_repository_impl.py @@ -32,6 +32,7 @@ from models.human_input import ( EmailMemberRecipientPayload, HumanInputFormRecipient, RecipientType, + StandaloneWebAppRecipientPayload, ) @@ -307,6 +308,9 @@ class _DummyRecipient: recipient_type: RecipientType access_token: str form: _DummyForm | None = None + recipient_payload: str = dataclasses.field( + default_factory=lambda: StandaloneWebAppRecipientPayload().model_dump_json() + ) class _FakeScalarResult: diff --git a/api/tests/unit_tests/core/workflow/test_enrich_pause_reasons.py b/api/tests/unit_tests/core/workflow/test_enrich_pause_reasons.py new file mode 100644 index 0000000000..f53a8eba73 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/test_enrich_pause_reasons.py @@ -0,0 +1,63 @@ +import pytest + +from core.workflow.human_input_policy import FormDisposition, enrich_human_input_pause_reasons +from graphon.entities.pause_reason import PauseReasonType + +_HUMAN_INPUT_REASON = {"TYPE": PauseReasonType.HUMAN_INPUT_REQUIRED, "form_id": "f1"} + + +@pytest.mark.parametrize( + ("dispositions", "expected_token", "expected_channels"), + [ + ({"f1": FormDisposition(form_token=None, approval_channels=["console", "email"])}, None, ["console", "email"]), + ({"f1": FormDisposition(form_token="tok", approval_channels=[])}, "tok", []), + # form_id absent from the map (no recipient rows) falls back to no token, no channels. + ({}, None, []), + ], +) +def test_enrich_projects_disposition_onto_reason(dispositions, expected_token, expected_channels): + out = enrich_human_input_pause_reasons( + [dict(_HUMAN_INPUT_REASON)], + dispositions_by_form_id=dispositions, + expiration_times_by_form_id={}, + ) + + assert out[0]["form_token"] == expected_token + assert out[0]["approval_channels"] == expected_channels + + +def test_enrich_leaves_non_human_input_reasons_untouched(): + reason = {"TYPE": "something_else", "form_id": "f1"} + + out = enrich_human_input_pause_reasons( + [reason], + dispositions_by_form_id={"f1": FormDisposition(form_token="tok", approval_channels=["email"])}, + expiration_times_by_form_id={}, + ) + + assert out[0] == reason + assert "form_token" not in out[0] + assert "approval_channels" not in out[0] + + +def test_pause_reason_payload_carries_approval_channels_through_factory(): + # from_response_data maps fields by hand; this guards approval_channels/form_token + # (the fields this feature added) against being dropped in that mapping. + from core.app.entities.task_entities import ( + HumanInputRequiredPauseReasonPayload, + HumanInputRequiredResponse, + ) + + data = HumanInputRequiredResponse.Data( + form_id="f", + node_id="n", + node_title="t", + form_content="c", + expiration_time=123, + form_token=None, + approval_channels=["console"], + ) + payload = HumanInputRequiredPauseReasonPayload.from_response_data(data) + + assert payload.approval_channels == ["console"] + assert payload.form_token is None diff --git a/api/tests/unit_tests/core/workflow/test_human_input_forms.py b/api/tests/unit_tests/core/workflow/test_human_input_forms.py index e508815b35..c84c7d578b 100644 --- a/api/tests/unit_tests/core/workflow/test_human_input_forms.py +++ b/api/tests/unit_tests/core/workflow/test_human_input_forms.py @@ -1,7 +1,16 @@ from types import SimpleNamespace -from core.workflow.human_input_forms import _load_form_tokens_by_form_id, load_form_tokens_by_form_id -from core.workflow.human_input_policy import HumanInputSurface +import pytest + +from core.workflow.human_input_forms import ( + load_form_dispositions_by_form_id, + load_form_tokens_by_form_id, +) +from core.workflow.human_input_policy import ( + FormDisposition, + HumanInputSurface, + disposition_for_surface, +) from models.human_input import RecipientType @@ -13,91 +22,100 @@ class _FakeSession: return self._recipients -def test_load_form_tokens_by_form_id_prefers_backstage_token() -> None: +def _recipient(form_id: str, recipient_type: RecipientType, access_token: str | None) -> SimpleNamespace: + return SimpleNamespace(form_id=form_id, recipient_type=recipient_type, access_token=access_token) + + +@pytest.mark.parametrize( + ("surface", "expected_token"), + [ + # Unfiltered (no surface) picks the highest-priority recipient: backstage. + (None, "backstage-token"), + # SERVICE_API may only act on the web-app recipient. + (HumanInputSurface.SERVICE_API, "web-token"), + ], +) +def test_load_form_tokens_picks_token_for_surface(surface, expected_token) -> None: session = _FakeSession( - recipients=[ - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.STANDALONE_WEB_APP, - access_token="web-token", - ), - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.CONSOLE, - access_token="console-token", - ), - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.BACKSTAGE, - access_token="backstage-token", - ), + [ + _recipient("form-1", RecipientType.STANDALONE_WEB_APP, "web-token"), + _recipient("form-1", RecipientType.CONSOLE, "console-token"), + _recipient("form-1", RecipientType.BACKSTAGE, "backstage-token"), ] ) - assert load_form_tokens_by_form_id(["form-1"], session=session) == {"form-1": "backstage-token"} + assert load_form_tokens_by_form_id(["form-1"], session=session, surface=surface) == {"form-1": expected_token} -def test_load_form_tokens_by_form_id_ignores_unsupported_recipients() -> None: +def test_load_form_tokens_drops_forms_without_actionable_token() -> None: session = _FakeSession( - recipients=[ - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.EMAIL_MEMBER, - access_token="email-token", - ), - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.CONSOLE, - access_token=None, - ), + [ + _recipient("form-1", RecipientType.EMAIL_MEMBER, "email-token"), + _recipient("form-1", RecipientType.CONSOLE, None), ] ) assert load_form_tokens_by_form_id(["form-1"], session=session) == {} -def test_load_form_tokens_by_form_id_uses_shared_priority() -> None: +def test_load_form_tokens_service_api_surface_uses_web_token() -> None: session = _FakeSession( - recipients=[ - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.STANDALONE_WEB_APP, - access_token="web-token", - ), - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.CONSOLE, - access_token="console-token", - ), + [ + _recipient("form-1", RecipientType.STANDALONE_WEB_APP, "web-token"), + _recipient("form-1", RecipientType.CONSOLE, "console-token"), + _recipient("form-1", RecipientType.BACKSTAGE, "backstage-token"), ] ) - assert _load_form_tokens_by_form_id(session, ["form-1"]) == {"form-1": "console-token"} + assert load_form_tokens_by_form_id(["form-1"], session=session, surface=HumanInputSurface.SERVICE_API) == { + "form-1": "web-token" + } -def test_load_form_tokens_by_form_id_uses_web_token_for_service_api_surface() -> None: +def test_load_dispositions_openapi_webapp_form_is_resumable() -> None: session = _FakeSession( - recipients=[ - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.STANDALONE_WEB_APP, - access_token="web-token", - ), - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.CONSOLE, - access_token="console-token", - ), - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.BACKSTAGE, - access_token="backstage-token", - ), + [ + _recipient("form-1", RecipientType.STANDALONE_WEB_APP, "web-token"), + _recipient("form-1", RecipientType.BACKSTAGE, "backstage-token"), ] ) - assert load_form_tokens_by_form_id( - ["form-1"], - session=session, - surface=HumanInputSurface.SERVICE_API, - ) == {"form-1": "web-token"} + assert load_form_dispositions_by_form_id(["form-1"], session=session, surface=HumanInputSurface.OPENAPI) == { + "form-1": FormDisposition(form_token="web-token", approval_channels=["console"]) + } + + +def test_load_dispositions_openapi_backstage_only_form_yields_channels_not_token() -> None: + session = _FakeSession([_recipient("form-1", RecipientType.BACKSTAGE, "backstage-token")]) + + assert load_form_dispositions_by_form_id(["form-1"], session=session, surface=HumanInputSurface.OPENAPI) == { + "form-1": FormDisposition(form_token=None, approval_channels=["console"]) + } + + +# disposition_for_surface partitions recipients into a surface-actionable resume +# token plus the approval channels of the recipients the surface may NOT act on. +_WEB = (RecipientType.STANDALONE_WEB_APP, "tok_web") +_BACKSTAGE = (RecipientType.BACKSTAGE, "tok_b") +_CONSOLE = (RecipientType.CONSOLE, "tok_c") +_EMAIL_MEMBER = (RecipientType.EMAIL_MEMBER, "t1") +_EMAIL_EXTERNAL = (RecipientType.EMAIL_EXTERNAL, "t2") + + +@pytest.mark.parametrize( + ("recipients", "surface", "expected"), + [ + # Token surface acts on the web-app recipient; blocked recipients become channels. + ([_BACKSTAGE, _WEB], HumanInputSurface.OPENAPI, FormDisposition("tok_web", ["console"])), + ([_EMAIL_MEMBER, _EMAIL_EXTERNAL], HumanInputSurface.OPENAPI, FormDisposition(None, ["email"])), + ([_EMAIL_MEMBER, _BACKSTAGE], HumanInputSurface.OPENAPI, FormDisposition(None, ["console", "email"])), + # CONSOLE acts on console/backstage; a web-app recipient is blocked → web_app channel. + ([_CONSOLE, _WEB], HumanInputSurface.CONSOLE, FormDisposition("tok_c", ["web_app"])), + ([_WEB], HumanInputSurface.CONSOLE, FormDisposition(None, ["web_app"])), + # No surface: unfiltered priority token, channels never populated. + ([_BACKSTAGE], None, FormDisposition("tok_b", [])), + ([_WEB, _EMAIL_MEMBER], None, FormDisposition("tok_web", [])), + ], +) +def test_disposition_for_surface_partitions_token_and_channels(recipients, surface, expected) -> None: + assert disposition_for_surface(recipients, surface=surface) == expected diff --git a/api/tests/unit_tests/core/workflow/test_human_input_policy.py b/api/tests/unit_tests/core/workflow/test_human_input_policy.py index 651b69216a..a1c6fee98c 100644 --- a/api/tests/unit_tests/core/workflow/test_human_input_policy.py +++ b/api/tests/unit_tests/core/workflow/test_human_input_policy.py @@ -12,38 +12,28 @@ from graphon.runtime import VariablePool from models.human_input import RecipientType -def test_service_api_only_allows_public_webapp_forms() -> None: - assert is_recipient_type_allowed_for_surface( - RecipientType.STANDALONE_WEB_APP, - HumanInputSurface.SERVICE_API, - ) - assert not is_recipient_type_allowed_for_surface( - RecipientType.CONSOLE, - HumanInputSurface.SERVICE_API, - ) - assert not is_recipient_type_allowed_for_surface( - RecipientType.BACKSTAGE, - HumanInputSurface.SERVICE_API, - ) - assert not is_recipient_type_allowed_for_surface( - RecipientType.EMAIL_MEMBER, - HumanInputSurface.SERVICE_API, - ) - - -def test_console_only_allows_internal_console_surfaces() -> None: - assert is_recipient_type_allowed_for_surface( - RecipientType.CONSOLE, - HumanInputSurface.CONSOLE, - ) - assert is_recipient_type_allowed_for_surface( - RecipientType.BACKSTAGE, - HumanInputSurface.CONSOLE, - ) - assert not is_recipient_type_allowed_for_surface( - RecipientType.STANDALONE_WEB_APP, - HumanInputSurface.CONSOLE, - ) +# Token surfaces (SERVICE_API, OPENAPI) may act only on public web-app forms; +# CONSOLE may act on internal console/backstage forms. OPENAPI mirrors SERVICE_API +# today but is pinned independently because the two are expected to diverge. +@pytest.mark.parametrize( + ("recipient_type", "surface", "allowed"), + [ + (RecipientType.STANDALONE_WEB_APP, HumanInputSurface.SERVICE_API, True), + (RecipientType.CONSOLE, HumanInputSurface.SERVICE_API, False), + (RecipientType.BACKSTAGE, HumanInputSurface.SERVICE_API, False), + (RecipientType.EMAIL_MEMBER, HumanInputSurface.SERVICE_API, False), + (RecipientType.STANDALONE_WEB_APP, HumanInputSurface.OPENAPI, True), + (RecipientType.CONSOLE, HumanInputSurface.OPENAPI, False), + (RecipientType.BACKSTAGE, HumanInputSurface.OPENAPI, False), + (RecipientType.CONSOLE, HumanInputSurface.CONSOLE, True), + (RecipientType.BACKSTAGE, HumanInputSurface.CONSOLE, True), + (RecipientType.STANDALONE_WEB_APP, HumanInputSurface.CONSOLE, False), + ], +) +def test_recipient_type_allowed_per_surface( + recipient_type: RecipientType, surface: HumanInputSurface, allowed: bool +) -> None: + assert is_recipient_type_allowed_for_surface(recipient_type, surface) is allowed def test_preferred_form_token_uses_shared_priority_order() -> None: @@ -56,6 +46,17 @@ def test_preferred_form_token_uses_shared_priority_order() -> None: assert get_preferred_form_token(recipients) == "backstage-token" +def test_preferred_form_token_skips_prioritized_type_with_empty_token() -> None: + # An empty token is not actionable: the highest-priority recipient that + # actually carries a token wins, not the highest-priority type. + recipients = [ + (RecipientType.BACKSTAGE, ""), + (RecipientType.CONSOLE, "console-token"), + ] + + assert get_preferred_form_token(recipients) == "console-token" + + def test_resolve_variable_select_input_options_uses_runtime_values() -> None: variable_pool = VariablePool() variable_pool.add(("start", "options"), ["approve", "reject"]) diff --git a/api/tests/unit_tests/core/workflow/test_human_input_policy_openapi.py b/api/tests/unit_tests/core/workflow/test_human_input_policy_openapi.py deleted file mode 100644 index b78e821237..0000000000 --- a/api/tests/unit_tests/core/workflow/test_human_input_policy_openapi.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Tests for OPENAPI surface in HumanInputPolicy and human_input_forms.""" - -from __future__ import annotations - -from core.workflow.human_input_policy import HumanInputSurface, is_recipient_type_allowed_for_surface -from models.human_input import RecipientType - - -def test_openapi_surface_exists(): - assert HumanInputSurface.OPENAPI == "openapi" - - -def test_openapi_allows_standalone_web_app(): - assert is_recipient_type_allowed_for_surface(RecipientType.STANDALONE_WEB_APP, HumanInputSurface.OPENAPI) - - -def test_openapi_rejects_console_recipient(): - assert not is_recipient_type_allowed_for_surface(RecipientType.CONSOLE, HumanInputSurface.OPENAPI) - - -def test_openapi_rejects_backstage_recipient(): - assert not is_recipient_type_allowed_for_surface(RecipientType.BACKSTAGE, HumanInputSurface.OPENAPI) - - -def test_get_surface_form_token_openapi_picks_standalone_web_app(): - """OPENAPI surface should pick STANDALONE_WEB_APP token, same as SERVICE_API.""" - from core.workflow.human_input_forms import _get_surface_form_token - - recipients = [ - (RecipientType.BACKSTAGE, "backstage-token"), - (RecipientType.STANDALONE_WEB_APP, "web-token"), - ] - token = _get_surface_form_token(recipients, surface=HumanInputSurface.OPENAPI) - assert token == "web-token" diff --git a/api/tests/unit_tests/models/test_recipient_type_label.py b/api/tests/unit_tests/models/test_recipient_type_label.py new file mode 100644 index 0000000000..2433bd315f --- /dev/null +++ b/api/tests/unit_tests/models/test_recipient_type_label.py @@ -0,0 +1,19 @@ +import pytest + +from models.human_input import RecipientType + + +@pytest.mark.parametrize( + ("recipient_type", "expected_label"), + [ + (RecipientType.EMAIL_MEMBER, "email"), + (RecipientType.EMAIL_EXTERNAL, "email"), + (RecipientType.CONSOLE, "console"), + (RecipientType.BACKSTAGE, "console"), + (RecipientType.STANDALONE_WEB_APP, "web_app"), + ], +) +def test_approval_channel_label_collapses_delivery_types(recipient_type: RecipientType, expected_label: str) -> None: + # Both email types collapse to "email" and console/backstage to "console": + # the user-facing approval channel, not the internal recipient type. + assert recipient_type.approval_channel_label == expected_label diff --git a/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service.py b/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service.py index eafbabe1f9..dc3a6a3120 100644 --- a/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service.py +++ b/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service.py @@ -16,12 +16,14 @@ from core.app.app_config.entities import WorkflowUIBasedAppConfig from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity from core.app.entities.task_entities import StreamEvent from core.app.layers.pause_state_persist_layer import WorkflowResumptionContext, _WorkflowGenerateEntityWrapper +from core.workflow.human_input_policy import FormDisposition, HumanInputSurface from graphon.entities.pause_reason import HumanInputRequired from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionStatus from graphon.nodes.human_input.entities import SelectInputConfig, StringListSource from graphon.nodes.human_input.enums import ValueSourceType from graphon.runtime import GraphRuntimeState, VariablePool from models.enums import CreatorUserRole +from models.human_input import RecipientType from models.model import AppMode from models.workflow import WorkflowRun from repositories.api_workflow_node_execution_repository import WorkflowNodeExecutionSnapshot @@ -763,7 +765,11 @@ def test_build_snapshot_events_preserves_public_form_token(monkeypatch: pytest.M snapshot = _build_snapshot(WorkflowNodeExecutionStatus.PAUSED) resumption_context = _build_resumption_context("task-ctx") monkeypatch.setattr( - service_module, "load_form_tokens_by_form_id", lambda form_ids, session=None, surface=None: {"form-1": "wtok"} + service_module, + "load_form_dispositions_by_form_id", + lambda form_ids, session=None, surface=None: { + "form-1": FormDisposition(form_token="wtok", approval_channels=[]) + }, ) session_maker = _SessionMaker( SimpleNamespace( @@ -803,12 +809,99 @@ def test_build_snapshot_events_preserves_public_form_token(monkeypatch: pytest.M assert pause_data["reasons"][0]["expiration_time"] == int(datetime(2024, 1, 1, tzinfo=UTC).timestamp()) +def _build_recipient_snapshot_events(recipients: Sequence[Any]) -> list[Mapping[str, Any]]: + """Drive the reconnect snapshot pause path for the OPENAPI surface. + + Lets the real disposition loader run against a fake session whose ``scalars`` + yields the given recipients, so the reconnect path derives the same token and + approval channels as the live path for the same recipient set. + """ + workflow_run = _build_workflow_run(WorkflowExecutionStatus.PAUSED) + snapshot = _build_snapshot(WorkflowNodeExecutionStatus.PAUSED) + resumption_context = _build_resumption_context("task-ctx") + expiration_time = datetime(2024, 1, 1, tzinfo=UTC) + session_maker = _SessionMaker( + SimpleNamespace( + execute=lambda _stmt: [("form-1", expiration_time, '{"display_in_ui": true}')], + scalars=lambda _stmt: list(recipients), + ) + ) + pause_entity = _FakePauseEntity( + pause_id="pause-1", + workflow_run_id="run-1", + paused_at_value=expiration_time, + pause_reasons=[ + HumanInputRequired( + form_id="form-1", + form_content="content", + node_id="node-1", + node_title="Human Input", + ) + ], + ) + + return _build_snapshot_events( + workflow_run=workflow_run, + node_snapshots=[snapshot], + task_id="task-ctx", + message_context=None, + pause_entity=pause_entity, + resumption_context=resumption_context, + session_maker=cast(sessionmaker[Session], session_maker), + human_input_surface=HumanInputSurface.OPENAPI, + ) + + +def test_reconnect_pause_without_web_app_recipient_emits_approval_channels() -> None: + events = _build_recipient_snapshot_events( + recipients=[ + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.EMAIL_MEMBER, access_token="email-token"), + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"), + ], + ) + + human_input_event = events[-2] + assert human_input_event["event"] == StreamEvent.HUMAN_INPUT_REQUIRED + assert human_input_event["data"]["form_token"] is None + assert human_input_event["data"]["approval_channels"] == ["console", "email"] + + pause_data = events[-1]["data"] + assert pause_data["reasons"][0]["form_token"] is None + assert pause_data["reasons"][0]["approval_channels"] == ["console", "email"] + + +def test_reconnect_pause_with_web_app_recipient_sets_token_and_channels() -> None: + events = _build_recipient_snapshot_events( + recipients=[ + SimpleNamespace( + form_id="form-1", + recipient_type=RecipientType.STANDALONE_WEB_APP, + access_token="web-app-token", + ), + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"), + ], + ) + + human_input_event = events[-2] + assert human_input_event["event"] == StreamEvent.HUMAN_INPUT_REQUIRED + assert human_input_event["data"]["form_token"] == "web-app-token" + assert human_input_event["data"]["approval_channels"] == ["console"] + + pause_data = events[-1]["data"] + assert pause_data["reasons"][0]["form_token"] == "web-app-token" + assert pause_data["reasons"][0]["approval_channels"] == ["console"] + + def test_build_snapshot_events_resolves_pause_reason_select_options(monkeypatch: pytest.MonkeyPatch) -> None: workflow_run = _build_workflow_run(WorkflowExecutionStatus.PAUSED) snapshot = _build_snapshot(WorkflowNodeExecutionStatus.PAUSED) resumption_context = _build_resumption_context("task-ctx", select_options=["approve", "reject"]) monkeypatch.setattr( - service_module, "load_form_tokens_by_form_id", lambda form_ids, session=None, surface=None: {"form-1": "wtok"} + service_module, + "load_form_dispositions_by_form_id", + lambda form_ids, session=None, surface=None: { + "form-1": FormDisposition(form_token="wtok", approval_channels=[]) + }, ) session_maker = _SessionMaker( SimpleNamespace( @@ -886,7 +979,11 @@ def test_build_workflow_event_stream_loads_pause_tokens_without_flask_app_contex service_module, "_load_resumption_context", MagicMock(return_value=_build_resumption_context("task-1")) ) monkeypatch.setattr( - service_module, "load_form_tokens_by_form_id", lambda form_ids, session=None, surface=None: {"form-1": "wtok"} + service_module, + "load_form_dispositions_by_form_id", + lambda form_ids, session=None, surface=None: { + "form-1": FormDisposition(form_token="wtok", approval_channels=[]) + }, ) session = SimpleNamespace(