diff --git a/api/controllers/openapi/_errors.py b/api/controllers/openapi/_errors.py index 38c068bd354..5e82c2614de 100644 --- a/api/controllers/openapi/_errors.py +++ b/api/controllers/openapi/_errors.py @@ -63,6 +63,8 @@ class OpenApiErrorCode(StrEnum): FILE_EXTENSION_BLOCKED = "file_extension_blocked" MEMBER_LIMIT_EXCEEDED = "member_limit_exceeded" MEMBER_LICENSE_EXCEEDED = "member_license_exceeded" + HUMAN_INPUT_FORM_NOT_FOUND = "form_not_found" + RECIPIENT_SURFACE_MISMATCH = "recipient_surface_mismatch" class ErrorDetail(BaseModel): @@ -239,3 +241,16 @@ class MemberLicenseExceeded(OpenApiError): # noqa: N818 error_code = OpenApiErrorCode.MEMBER_LICENSE_EXCEEDED description = "Workspace member license capacity reached." hint = "Contact your workspace administrator to expand the license seat count." + + +class HumanInputFormNotFound(OpenApiError): # noqa: N818 + code = 404 + error_code = OpenApiErrorCode.HUMAN_INPUT_FORM_NOT_FOUND + description = "No human-input form matches this token. It may be wrong, expired, or already submitted." + + +class RecipientSurfaceMismatch(OpenApiError): # noqa: N818 + code = 403 + error_code = OpenApiErrorCode.RECIPIENT_SURFACE_MISMATCH + description = "This form's recipient can't be submitted via the OpenAPI surface." + hint = "Action it through its channel (web app or console)." diff --git a/api/controllers/openapi/human_input_form.py b/api/controllers/openapi/human_input_form.py index 995315150cc..e5930ea8fa2 100644 --- a/api/controllers/openapi/human_input_form.py +++ b/api/controllers/openapi/human_input_form.py @@ -12,16 +12,20 @@ import logging from flask import Response from flask_restx import Resource -from werkzeug.exceptions import BadRequest, NotFound +from werkzeug.exceptions import BadRequest from controllers.common.human_input import HumanInputFormSubmitPayload, stringify_form_default_values from controllers.common.schema import register_schema_models from controllers.openapi import openapi_ns from controllers.openapi._contract import accepts, returns +from controllers.openapi._errors import HumanInputFormNotFound, RecipientSurfaceMismatch from controllers.openapi._models import FormSubmitResponse, HumanInputFormDefinitionResponse from controllers.openapi.auth.composition import auth_router from controllers.openapi.auth.data import AuthData -from core.workflow.human_input_policy import HumanInputSurface, is_recipient_type_allowed_for_surface +from core.workflow.human_input_policy import ( + HumanInputSurface, + is_recipient_type_allowed_for_surface, +) from extensions.ext_database import db from libs.helper import to_timestamp from libs.oauth_bearer import Scope @@ -47,12 +51,12 @@ def _jsonify_form_definition(form) -> Response: def _ensure_form_belongs_to_app(form, app_model: App) -> None: if form.app_id != app_model.id or form.tenant_id != app_model.tenant_id: - raise NotFound("Form not found") + raise HumanInputFormNotFound() def _ensure_form_is_allowed_for_openapi(form) -> None: if not is_recipient_type_allowed_for_surface(form.recipient_type, HumanInputSurface.OPENAPI): - raise NotFound("Form not found") + raise RecipientSurfaceMismatch() @openapi_ns.route("/apps//form/human_input/") @@ -60,11 +64,11 @@ class OpenApiWorkflowHumanInputFormApi(Resource): @openapi_ns.response(200, "Form definition", openapi_ns.models[HumanInputFormDefinitionResponse.__name__]) @auth_router.guard(scope=Scope.APPS_RUN) def get(self, app_id: str, form_token: str, *, auth_data: AuthData): - app_model, caller, caller_kind = auth_data.require_app_context() + app_model, _caller, _caller_kind = auth_data.require_app_context() service = HumanInputService(db.engine) form = service.get_form_by_token(form_token) if form is None: - raise NotFound("Form not found") + raise HumanInputFormNotFound() _ensure_form_belongs_to_app(form, app_model) _ensure_form_is_allowed_for_openapi(form) @@ -80,7 +84,7 @@ class OpenApiWorkflowHumanInputFormApi(Resource): service = HumanInputService(db.engine) form = service.get_form_by_token(form_token) if form is None: - raise NotFound("Form not found") + raise HumanInputFormNotFound() _ensure_form_belongs_to_app(form, app_model) _ensure_form_is_allowed_for_openapi(form) @@ -106,6 +110,6 @@ class OpenApiWorkflowHumanInputFormApi(Resource): submission_end_user_id=submission_end_user_id, ) except FormNotFoundError: - raise NotFound("Form not found") + raise HumanInputFormNotFound() return FormSubmitResponse() diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index c9486b5821f..67f37e78ab9 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -51,8 +51,11 @@ from core.tools.entities.tool_entities import ToolProviderType from core.tools.tool_manager import ToolManager from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE from core.trigger.trigger_manager import TriggerManager -from core.workflow.human_input_forms import load_form_tokens_by_form_id +from core.workflow.human_input_forms import ( + load_form_dispositions_by_form_id, +) from core.workflow.human_input_policy import ( + FormDisposition, HumanInputSurface, enrich_human_input_pause_reasons, resolve_human_input_pause_reason_inputs, @@ -340,13 +343,14 @@ class WorkflowResponseConverter: human_input_form_ids = [reason.form_id for reason in resolved_reasons if isinstance(reason, HumanInputRequired)] expiration_times_by_form_id: dict[str, datetime] = {} display_in_ui_by_form_id: dict[str, bool] = {} - form_token_by_form_id: dict[str, str] = {} + dispositions_by_form_id: dict[str, FormDisposition] = {} if human_input_form_ids: stmt = select( HumanInputForm.id, HumanInputForm.expiration_time, HumanInputForm.form_definition, ).where(HumanInputForm.id.in_(human_input_form_ids)) + hitl_surface = _INVOKE_FROM_TO_HITL_SURFACE.get(self._application_generate_entity.invoke_from) with Session(bind=db.engine) as session: for form_id, expiration_time, form_definition in session.execute(stmt): expiration_times_by_form_id[str(form_id)] = expiration_time @@ -355,17 +359,17 @@ class WorkflowResponseConverter: except (TypeError, json.JSONDecodeError): definition_payload = {} display_in_ui_by_form_id[str(form_id)] = bool(definition_payload.get("display_in_ui")) - form_token_by_form_id = load_form_tokens_by_form_id( + dispositions_by_form_id = load_form_dispositions_by_form_id( human_input_form_ids, session=session, - surface=_INVOKE_FROM_TO_HITL_SURFACE.get(self._application_generate_entity.invoke_from), + surface=hitl_surface, ) # Reconnect paths must preserve the same pause-reason contract as live streams; # otherwise clients see schema drift after resume. pause_reasons = enrich_human_input_pause_reasons( pause_reasons, - form_tokens_by_form_id=form_token_by_form_id, + dispositions_by_form_id=dispositions_by_form_id, expiration_times_by_form_id={ form_id: int(expiration_time.timestamp()) for form_id, expiration_time in expiration_times_by_form_id.items() @@ -379,6 +383,7 @@ class WorkflowResponseConverter: expiration_time = expiration_times_by_form_id.get(reason.form_id) if expiration_time is None: raise ValueError(f"HumanInputForm not found for pause reason, form_id={reason.form_id}") + disposition = dispositions_by_form_id.get(reason.form_id) responses.append( HumanInputRequiredResponse( task_id=task_id, @@ -391,7 +396,8 @@ class WorkflowResponseConverter: inputs=reason.inputs, actions=reason.actions, display_in_ui=display_in_ui_by_form_id.get(reason.form_id, False), - form_token=form_token_by_form_id.get(reason.form_id), + form_token=disposition.form_token if disposition else None, + approval_channels=list(disposition.approval_channels) if disposition else [], resolved_default_values=reason.resolved_default_values, expiration_time=int(expiration_time.timestamp()), ), diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 803fdacf78d..3a8107e0461 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -288,6 +288,7 @@ class HumanInputRequiredResponse(StreamResponse): actions: Sequence[UserActionConfig] = Field(default_factory=list) display_in_ui: bool = False form_token: str | None = None + approval_channels: list[str] = Field(default_factory=list) resolved_default_values: Mapping[str, Any] = Field(default_factory=dict) expiration_time: int = Field(..., description="Unix timestamp in seconds") @@ -311,6 +312,7 @@ class HumanInputRequiredPauseReasonPayload(BaseModel): actions: Sequence[UserActionConfig] = Field(default_factory=list) display_in_ui: bool = False form_token: str | None = None + approval_channels: list[str] = Field(default_factory=list) resolved_default_values: Mapping[str, Any] = Field(default_factory=dict) expiration_time: int @@ -325,6 +327,7 @@ class HumanInputRequiredPauseReasonPayload(BaseModel): actions=data.actions, display_in_ui=data.display_in_ui, form_token=data.form_token, + approval_channels=data.approval_channels, resolved_default_values=data.resolved_default_values, expiration_time=data.expiration_time, ) diff --git a/api/core/workflow/human_input_forms.py b/api/core/workflow/human_input_forms.py index fe3c161a326..b850cd23914 100644 --- a/api/core/workflow/human_input_forms.py +++ b/api/core/workflow/human_input_forms.py @@ -12,60 +12,61 @@ from collections.abc import Sequence from sqlalchemy import select from sqlalchemy.orm import Session -from core.workflow.human_input_policy import HumanInputSurface, get_preferred_form_token +from core.workflow.human_input_policy import ( + FormDisposition, + HumanInputSurface, + disposition_for_surface, +) from extensions.ext_database import db from models.human_input import HumanInputFormRecipient, RecipientType +def load_form_dispositions_by_form_id( + form_ids: Sequence[str], + *, + session: Session | None = None, + surface: HumanInputSurface | None = None, +) -> dict[str, FormDisposition]: + """Resolve each paused form's resume token and approval channels for `surface`.""" + unique_form_ids = list(dict.fromkeys(form_ids)) + if not unique_form_ids: + return {} + + if session is not None: + return _load_form_dispositions_by_form_id(session, unique_form_ids, surface=surface) + + with Session(bind=db.engine, expire_on_commit=False) as new_session: + return _load_form_dispositions_by_form_id(new_session, unique_form_ids, surface=surface) + + +def _load_form_dispositions_by_form_id( + session: Session, + form_ids: Sequence[str], + *, + surface: HumanInputSurface | None, +) -> dict[str, FormDisposition]: + recipients_by_form_id: dict[str, list[tuple[RecipientType, str]]] = {} + stmt = select(HumanInputFormRecipient).where(HumanInputFormRecipient.form_id.in_(form_ids)) + for recipient in session.scalars(stmt): + recipients_by_form_id.setdefault(recipient.form_id, []).append( + (recipient.recipient_type, recipient.access_token or "") + ) + return { + form_id: disposition_for_surface(recipients, surface=surface) + for form_id, recipients in recipients_by_form_id.items() + } + + def load_form_tokens_by_form_id( form_ids: Sequence[str], *, session: Session | None = None, surface: HumanInputSurface | None = None, ) -> dict[str, str]: - """Load the preferred access token for each human input form.""" - unique_form_ids = list(dict.fromkeys(form_ids)) - if not unique_form_ids: - return {} - - if session is not None: - return _load_form_tokens_by_form_id(session, unique_form_ids, surface=surface) - - with Session(bind=db.engine, expire_on_commit=False) as new_session: - return _load_form_tokens_by_form_id(new_session, unique_form_ids, surface=surface) - - -def _load_form_tokens_by_form_id( - session: Session, - form_ids: Sequence[str], - *, - surface: HumanInputSurface | None = None, -) -> dict[str, str]: - recipients_by_form_id: dict[str, list[tuple[RecipientType, str]]] = {} - stmt = select(HumanInputFormRecipient).where(HumanInputFormRecipient.form_id.in_(form_ids)) - for recipient in session.scalars(stmt): - if not recipient.access_token: - continue - recipients_by_form_id.setdefault(recipient.form_id, []).append( - (recipient.recipient_type, recipient.access_token) - ) - - tokens_by_form_id: dict[str, str] = {} - for form_id, recipients in recipients_by_form_id.items(): - token = _get_surface_form_token(recipients, surface=surface) - if token is not None: - tokens_by_form_id[form_id] = token - return tokens_by_form_id - - -def _get_surface_form_token( - recipients: Sequence[tuple[RecipientType, str]], - *, - surface: HumanInputSurface | None, -) -> str | None: - if surface in {HumanInputSurface.SERVICE_API, HumanInputSurface.OPENAPI}: - for recipient_type, token in recipients: - if recipient_type == RecipientType.STANDALONE_WEB_APP and token: - return token - - return get_preferred_form_token(recipients) + """Resume tokens only, for callers that don't surface approval channels.""" + dispositions = load_form_dispositions_by_form_id(form_ids, session=session, surface=surface) + return { + form_id: disposition.form_token + for form_id, disposition in dispositions.items() + if disposition.form_token is not None + } diff --git a/api/core/workflow/human_input_policy.py b/api/core/workflow/human_input_policy.py index e95d753ae96..d6f7df52354 100644 --- a/api/core/workflow/human_input_policy.py +++ b/api/core/workflow/human_input_policy.py @@ -2,14 +2,14 @@ from __future__ import annotations from collections.abc import Mapping, Sequence from enum import StrEnum -from typing import Any +from typing import Any, NamedTuple from graphon.entities.pause_reason import HumanInputRequired, PauseReason, PauseReasonType from graphon.nodes.human_input.entities import FormInputConfig, SelectInputConfig from graphon.nodes.human_input.enums import ValueSourceType from graphon.runtime.graph_runtime_state_protocol import ReadOnlyVariablePool from graphon.variables import ArrayStringSegment -from models.human_input import RecipientType +from models.human_input import ApprovalChannel, RecipientType class HumanInputSurface(StrEnum): @@ -20,7 +20,7 @@ class HumanInputSurface(StrEnum): # SERVICE_API and OPENAPI are intentionally narrower than CONSOLE: token callers # should only be able to act on end-user web forms, not internal console flows. -_ALLOWED_RECIPIENT_TYPES_BY_SURFACE: dict[HumanInputSurface, frozenset[RecipientType]] = { +ALLOWED_RECIPIENT_TYPES_BY_SURFACE: dict[HumanInputSurface, frozenset[RecipientType]] = { HumanInputSurface.SERVICE_API: frozenset({RecipientType.STANDALONE_WEB_APP}), HumanInputSurface.CONSOLE: frozenset({RecipientType.CONSOLE, RecipientType.BACKSTAGE}), HumanInputSurface.OPENAPI: frozenset({RecipientType.STANDALONE_WEB_APP}), @@ -41,7 +41,7 @@ def is_recipient_type_allowed_for_surface( ) -> bool: if recipient_type is None: return False - return recipient_type in _ALLOWED_RECIPIENT_TYPES_BY_SURFACE[surface] + return recipient_type in ALLOWED_RECIPIENT_TYPES_BY_SURFACE[surface] def get_preferred_form_token( @@ -59,10 +59,39 @@ def get_preferred_form_token( return chosen_token +class FormDisposition(NamedTuple): + """How a paused form resolves for one API surface. + + A form's recipients split into those the surface may act on (yielding a resume + `form_token`) and those it may not (their channels named in `approval_channels` + so the caller is told where approval actually happens instead). + """ + + form_token: str | None + approval_channels: list[ApprovalChannel] + + +def disposition_for_surface( + recipients: Sequence[tuple[RecipientType, str]], + *, + surface: HumanInputSurface | None, +) -> FormDisposition: + if surface is None: + return FormDisposition(form_token=get_preferred_form_token(recipients), approval_channels=[]) + allowed = ALLOWED_RECIPIENT_TYPES_BY_SURFACE[surface] + actionable = [(recipient_type, token) for recipient_type, token in recipients if recipient_type in allowed] + return FormDisposition( + form_token=get_preferred_form_token(actionable), + approval_channels=sorted( + {recipient_type.approval_channel for recipient_type, _ in recipients if recipient_type not in allowed} + ), + ) + + def enrich_human_input_pause_reasons( reasons: Sequence[Mapping[str, Any]], *, - form_tokens_by_form_id: Mapping[str, str], + dispositions_by_form_id: Mapping[str, FormDisposition], expiration_times_by_form_id: Mapping[str, int], ) -> list[dict[str, Any]]: enriched: list[dict[str, Any]] = [] @@ -71,7 +100,9 @@ def enrich_human_input_pause_reasons( if updated.get("TYPE") == PauseReasonType.HUMAN_INPUT_REQUIRED: form_id = updated.get("form_id") if isinstance(form_id, str): - updated["form_token"] = form_tokens_by_form_id.get(form_id) + disposition = dispositions_by_form_id.get(form_id) + updated["form_token"] = disposition.form_token if disposition else None + updated["approval_channels"] = list(disposition.approval_channels) if disposition else [] expiration_time = expiration_times_by_form_id.get(form_id) if expiration_time is not None: updated["expiration_time"] = expiration_time diff --git a/api/models/human_input.py b/api/models/human_input.py index d11274bc921..b84579a4e09 100644 --- a/api/models/human_input.py +++ b/api/models/human_input.py @@ -134,20 +134,40 @@ class HumanInputDelivery(DefaultFieldsMixin, Base): ) +class ApprovalChannel(StrEnum): + """Where a paused human input form can be approved, surfaced to API callers.""" + + EMAIL = "email" + WEB_APP = "web_app" + CONSOLE = "console" + + class RecipientType(StrEnum): - # EMAIL_MEMBER member means that the - EMAIL_MEMBER = "email_member" - EMAIL_EXTERNAL = "email_external" + # Second value = the approval channel this recipient maps to (surfaced in `approval_channels`). + EMAIL_MEMBER = "email_member", ApprovalChannel.EMAIL + EMAIL_EXTERNAL = "email_external", ApprovalChannel.EMAIL # STANDALONE_WEB_APP is used by the standalone web app. # # It's not used while running workflows / chatflows containing HumanInput # node inside console. - STANDALONE_WEB_APP = "standalone_web_app" + STANDALONE_WEB_APP = "standalone_web_app", ApprovalChannel.WEB_APP # CONSOLE is used while running workflows / chatflows containing HumanInput # node inside console. (E.G. running installed apps or debugging workflows / chatflows) - CONSOLE = "console" + CONSOLE = "console", ApprovalChannel.CONSOLE # BACKSTAGE is used for backstage input inside console. - BACKSTAGE = "backstage" + BACKSTAGE = "backstage", ApprovalChannel.CONSOLE + + _approval_channel: ApprovalChannel + + def __new__(cls, value: str, approval_channel: ApprovalChannel) -> "RecipientType": + member = str.__new__(cls, value) + member._value_ = value + member._approval_channel = approval_channel + return member + + @property + def approval_channel(self) -> ApprovalChannel: + return self._approval_channel @final diff --git a/api/services/workflow_event_snapshot_service.py b/api/services/workflow_event_snapshot_service.py index dad7dff292e..c693f6318bd 100644 --- a/api/services/workflow_event_snapshot_service.py +++ b/api/services/workflow_event_snapshot_service.py @@ -23,8 +23,11 @@ from core.app.entities.task_entities import ( WorkflowStartStreamResponse, ) from core.app.layers.pause_state_persist_layer import WorkflowResumptionContext -from core.workflow.human_input_forms import load_form_tokens_by_form_id +from core.workflow.human_input_forms import ( + load_form_dispositions_by_form_id, +) from core.workflow.human_input_policy import ( + FormDisposition, HumanInputSurface, enrich_human_input_pause_reasons, resolve_human_input_pause_reason_inputs, @@ -359,7 +362,7 @@ def _build_human_input_required_events( expiration_times_by_form_id: dict[str, int] = {} display_in_ui_by_form_id: dict[str, bool] = {} - form_tokens_by_form_id: dict[str, str] = {} + dispositions_by_form_id: dict[str, FormDisposition] = {} if human_input_form_ids and session_maker is not None: stmt = select(HumanInputForm.id, HumanInputForm.expiration_time, HumanInputForm.form_definition).where( HumanInputForm.id.in_(human_input_form_ids) @@ -372,7 +375,7 @@ def _build_human_input_required_events( except (TypeError, json.JSONDecodeError): definition_payload = {} display_in_ui_by_form_id[str(form_id)] = bool(definition_payload.get("display_in_ui")) - form_tokens_by_form_id = load_form_tokens_by_form_id( + dispositions_by_form_id = load_form_dispositions_by_form_id( human_input_form_ids, session=session, surface=human_input_surface, @@ -393,6 +396,7 @@ def _build_human_input_required_events( reason.inputs, variable_pool=variable_pool, ) + disposition = dispositions_by_form_id.get(form_id) response = HumanInputRequiredResponse( task_id=task_id, @@ -405,7 +409,8 @@ def _build_human_input_required_events( inputs=resolved_inputs, actions=reason.actions, display_in_ui=display_in_ui_by_form_id.get(form_id, False), - form_token=form_tokens_by_form_id.get(form_id), + form_token=disposition.form_token if disposition else None, + approval_channels=list(disposition.approval_channels) if disposition else [], resolved_default_values=reason.resolved_default_values, expiration_time=expiration_time, ), @@ -493,11 +498,11 @@ def _build_pause_event( for form_id in [reason.get("form_id")] if isinstance(form_id, str) ] - form_tokens_by_form_id: dict[str, str] = {} + dispositions_by_form_id: dict[str, FormDisposition] = {} expiration_times_by_form_id: dict[str, int] = {} if human_input_form_ids and session_maker is not None: with session_maker() as session: - form_tokens_by_form_id = load_form_tokens_by_form_id( + dispositions_by_form_id = load_form_dispositions_by_form_id( human_input_form_ids, session=session, surface=human_input_surface, @@ -512,7 +517,7 @@ def _build_pause_event( # otherwise clients see schema drift after resume. reasons = enrich_human_input_pause_reasons( reasons, - form_tokens_by_form_id=form_tokens_by_form_id, + dispositions_by_form_id=dispositions_by_form_id, expiration_times_by_form_id=expiration_times_by_form_id, ) diff --git a/api/tests/unit_tests/controllers/openapi/test_error_contract.py b/api/tests/unit_tests/controllers/openapi/test_error_contract.py index 45a577443b7..788a7215ed2 100644 --- a/api/tests/unit_tests/controllers/openapi/test_error_contract.py +++ b/api/tests/unit_tests/controllers/openapi/test_error_contract.py @@ -26,11 +26,13 @@ from controllers.openapi._errors import ( ErrorBody, ErrorDetail, FilenameNotExists, + HumanInputFormNotFound, MemberLicenseExceeded, MemberLimitExceeded, OpenApiError, OpenApiErrorCode, OpenApiErrorFormatter, + RecipientSurfaceMismatch, ) from controllers.service_api.app.error import ( AppUnavailableError, @@ -319,6 +321,8 @@ ERROR_MATRIX = [ (BlockedFileExtensionError(), 400, "file_extension_blocked"), (MemberLimitExceeded(), 403, "member_limit_exceeded"), (MemberLicenseExceeded(), 403, "member_license_exceeded"), + (HumanInputFormNotFound(), 404, "form_not_found"), + (RecipientSurfaceMismatch(), 403, "recipient_surface_mismatch"), ] diff --git a/api/tests/unit_tests/controllers/openapi/test_human_input_form.py b/api/tests/unit_tests/controllers/openapi/test_human_input_form.py index f8d296deb3e..5659cd6eeff 100644 --- a/api/tests/unit_tests/controllers/openapi/test_human_input_form.py +++ b/api/tests/unit_tests/controllers/openapi/test_human_input_form.py @@ -11,8 +11,9 @@ from unittest.mock import Mock import pytest from flask import Flask -from werkzeug.exceptions import NotFound, UnprocessableEntity +from werkzeug.exceptions import UnprocessableEntity +from controllers.openapi._errors import HumanInputFormNotFound, RecipientSurfaceMismatch from controllers.openapi.auth.data import AuthData from libs.oauth_bearer import Scope, TokenType from models.human_input import RecipientType @@ -89,7 +90,7 @@ class TestOpenApiHumanInputFormGet: caller = SimpleNamespace(id="acct-1") with app.test_request_context("/openapi/v1/apps/app-1/form/human_input/bad"): - with pytest.raises(NotFound): + with pytest.raises(HumanInputFormNotFound): api.get.__wrapped__( api, app_id="app-1", @@ -101,7 +102,10 @@ class TestOpenApiHumanInputFormGet: from controllers.openapi.human_input_form import OpenApiWorkflowHumanInputFormApi form = SimpleNamespace( - app_id="other-app", tenant_id="tenant-1", expiration_time=datetime(2099, 1, 1, tzinfo=UTC) + app_id="other-app", + tenant_id="tenant-1", + recipient_type=RecipientType.STANDALONE_WEB_APP, + expiration_time=datetime(2099, 1, 1, tzinfo=UTC), ) service_mock = Mock() service_mock.get_form_by_token.return_value = form @@ -114,7 +118,7 @@ class TestOpenApiHumanInputFormGet: caller = SimpleNamespace(id="acct-1") with app.test_request_context("/openapi/v1/apps/app-1/form/human_input/tok-1"): - with pytest.raises(NotFound): + with pytest.raises(HumanInputFormNotFound): api.get.__wrapped__( api, app_id="app-1", @@ -142,7 +146,7 @@ class TestOpenApiHumanInputFormGet: caller = SimpleNamespace(id="acct-1") with app.test_request_context("/openapi/v1/apps/app-1/form/human_input/tok-1"): - with pytest.raises(NotFound): + with pytest.raises(RecipientSurfaceMismatch): api.get.__wrapped__( api, app_id="app-1", @@ -234,6 +238,38 @@ class TestOpenApiHumanInputFormPost: ) assert result == ({}, 200) + def test_post_standalone_web_app_recipient_submits( + self, app: Flask, bypass_pipeline, monkeypatch: pytest.MonkeyPatch + ): + from controllers.openapi.human_input_form import OpenApiWorkflowHumanInputFormApi + + form = self._make_form(recipient_type=RecipientType.STANDALONE_WEB_APP) + service_mock = Mock() + service_mock.get_form_by_token.return_value = form + + module = sys.modules["controllers.openapi.human_input_form"] + monkeypatch.setattr(module, "HumanInputService", lambda _engine: service_mock) + monkeypatch.setattr(module, "db", SimpleNamespace(engine=object())) + + api = OpenApiWorkflowHumanInputFormApi() + app_model = SimpleNamespace(id="app-1", tenant_id="tenant-1") + caller = SimpleNamespace(id="anyone") + + with app.test_request_context( + "/openapi/v1/apps/app-1/form/human_input/tok-1", + method="POST", + json={"action": "approve", "inputs": {}}, + ): + result = api.post.__wrapped__( + api, + app_id="app-1", + form_token="tok-1", + auth_data=_make_auth_data(app_model, caller, "end_user"), + ) + + service_mock.submit_form_by_token.assert_called_once() + assert result == ({}, 200) + def test_post_rejects_invalid_body_with_422(self, app: Flask, bypass_pipeline): """Malformed body → 422 via @accepts (was an unmapped pydantic error → 500).""" from controllers.openapi.human_input_form import OpenApiWorkflowHumanInputFormApi diff --git a/api/tests/unit_tests/controllers/service_api/app/test_hitl_service_api.py b/api/tests/unit_tests/controllers/service_api/app/test_hitl_service_api.py index 8686f49a4a8..576b95110a0 100644 --- a/api/tests/unit_tests/controllers/service_api/app/test_hitl_service_api.py +++ b/api/tests/unit_tests/controllers/service_api/app/test_hitl_service_api.py @@ -29,7 +29,7 @@ from core.app.entities.task_entities import ( WorkflowPauseStreamResponse, ) from core.app.layers.pause_state_persist_layer import WorkflowResumptionContext, _WorkflowGenerateEntityWrapper -from core.workflow.human_input_policy import HumanInputSurface +from core.workflow.human_input_policy import FormDisposition, HumanInputSurface from core.workflow.system_variables import build_system_variables from graphon.entities import WorkflowStartReason from graphon.entities.pause_reason import HumanInputRequired, PauseReasonType @@ -592,8 +592,10 @@ class TestHitlServiceApi: monkeypatch.setattr(workflow_response_converter, "db", SimpleNamespace(engine=object())) monkeypatch.setattr( workflow_response_converter, - "load_form_tokens_by_form_id", - lambda form_ids, session=None, surface=None: {"form-1": "token"}, + "load_form_dispositions_by_form_id", + lambda form_ids, session=None, surface=None: { + "form-1": FormDisposition(form_token="token", approval_channels=[]) + }, ) reason = HumanInputRequired( @@ -652,8 +654,10 @@ class TestHitlServiceApi: snapshot = _build_snapshot(WorkflowNodeExecutionStatus.PAUSED) resumption_context = _build_resumption_context("task-ctx") monkeypatch.setattr( - "services.workflow_event_snapshot_service.load_form_tokens_by_form_id", - lambda form_ids, session=None, surface=None: {"form-1": "wtok"}, + "services.workflow_event_snapshot_service.load_form_dispositions_by_form_id", + lambda form_ids, session=None, surface=None: { + "form-1": FormDisposition(form_token="wtok", approval_channels=[]) + }, ) class _SessionContext: diff --git a/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py b/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py index b75f6d44943..28f416ac27f 100644 --- a/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py +++ b/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py @@ -175,6 +175,7 @@ class TestAdvancedChatGenerateTaskPipeline: "actions": [{"id": "approve", "title": "Approve", "button_style": "default"}], "display_in_ui": True, "form_token": "token-1", + "approval_channels": [], "resolved_default_values": {}, "expiration_time": 123, } diff --git a/api/tests/unit_tests/core/app/apps/test_workflow_pause_events.py b/api/tests/unit_tests/core/app/apps/test_workflow_pause_events.py index 319e603b351..098a63a8f91 100644 --- a/api/tests/unit_tests/core/app/apps/test_workflow_pause_events.py +++ b/api/tests/unit_tests/core/app/apps/test_workflow_pause_events.py @@ -26,6 +26,26 @@ from models.account import Account from models.human_input import RecipientType +class _FakeSession: + """Stub session: `execute` feeds the form-expiration query, `scalars` the recipients.""" + + def __init__(self, *, execute_rows=(), scalars_rows=()): + self._execute_rows = execute_rows + self._scalars_rows = scalars_rows + + def execute(self, _stmt): + return list(self._execute_rows) + + def scalars(self, _stmt): + return list(self._scalars_rows) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + class _RecordingWorkflowAppRunner(WorkflowAppRunner): def __init__(self, **kwargs): super().__init__(**kwargs) @@ -97,11 +117,11 @@ def test_graph_run_paused_event_emits_queue_pause_event(): assert queue_event.paused_nodes == ["node-pause-1"] -def _build_converter(): +def _build_converter(*, invoke_from: InvokeFrom = InvokeFrom.SERVICE_API): application_generate_entity = SimpleNamespace( inputs={}, files=[], - invoke_from=InvokeFrom.SERVICE_API, + invoke_from=invoke_from, app_config=SimpleNamespace(app_id="app-id", tenant_id="tenant-id"), ) system_variables = build_system_variables( @@ -131,32 +151,15 @@ def test_queue_workflow_paused_event_to_stream_responses(monkeypatch: pytest.Mon ) expiration_time = datetime(2024, 1, 1, tzinfo=UTC) + session = _FakeSession( + execute_rows=[("form-1", expiration_time, '{"display_in_ui": true}')], + scalars_rows=[ + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.CONSOLE, access_token="console-token"), + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"), + ], + ) - class _FakeSession: - def execute(self, _stmt): - return [("form-1", expiration_time, '{"display_in_ui": true}')] - - def scalars(self, _stmt): - return [ - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.CONSOLE, - access_token="console-token", - ), - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.BACKSTAGE, - access_token="backstage-token", - ), - ] - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc, tb): - return False - - monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: _FakeSession()) + monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: session) monkeypatch.setattr(workflow_response_converter, "db", SimpleNamespace(engine=object())) reason = HumanInputRequired( @@ -195,10 +198,92 @@ def test_queue_workflow_paused_event_to_stream_responses(monkeypatch: pytest.Mon assert hi_resp.data.inputs[0].output_variable_name == "field" assert hi_resp.data.actions[0].id == "approve" assert hi_resp.data.display_in_ui is True - assert hi_resp.data.form_token == "backstage-token" + assert hi_resp.data.form_token is None + assert hi_resp.data.approval_channels == ["console"] assert hi_resp.data.expiration_time == int(expiration_time.timestamp()) +def _build_paused_human_input_response(monkeypatch, recipients): + """Drive the live OPENAPI pause path with the given recipients via a fake session.""" + converter = _build_converter(invoke_from=InvokeFrom.OPENAPI) + converter.workflow_start_to_stream_response( + task_id="task", + workflow_run_id="run-id", + workflow_id="workflow-id", + reason=WorkflowStartReason.INITIAL, + ) + + expiration_time = datetime(2024, 1, 1, tzinfo=UTC) + session = _FakeSession( + execute_rows=[("form-1", expiration_time, '{"display_in_ui": true}')], + scalars_rows=list(recipients), + ) + + monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: session) + monkeypatch.setattr(workflow_response_converter, "db", SimpleNamespace(engine=object())) + + reason = HumanInputRequired( + form_id="form-1", + form_content="Rendered", + inputs=[ParagraphInputConfig(output_variable_name="field")], + actions=[UserActionConfig(id="approve", title="Approve")], + node_id="node-id", + node_title="Human Step", + ) + queue_event = QueueWorkflowPausedEvent( + reasons=[reason], + outputs={}, + paused_nodes=["node-id"], + ) + + runtime_state = GraphRuntimeState(variable_pool=VariablePool(), start_at=0.0) + responses = converter.workflow_pause_to_stream_response( + event=queue_event, + task_id="task", + graph_runtime_state=runtime_state, + ) + assert isinstance(responses[0], HumanInputRequiredResponse) + return responses + + +def test_openapi_pause_without_web_app_recipient_emits_approval_channels(monkeypatch: pytest.MonkeyPatch): + responses = _build_paused_human_input_response( + monkeypatch, + recipients=[ + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.EMAIL_MEMBER, access_token="email-token"), + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"), + ], + ) + + hi_resp = responses[0] + assert hi_resp.data.form_token is None + assert hi_resp.data.approval_channels == ["console", "email"] + + pause_resp = responses[-1] + assert pause_resp.data.reasons[0]["approval_channels"] == ["console", "email"] + + +def test_openapi_pause_with_web_app_recipient_sets_token_and_channels(monkeypatch: pytest.MonkeyPatch): + responses = _build_paused_human_input_response( + monkeypatch, + recipients=[ + SimpleNamespace( + form_id="form-1", + recipient_type=RecipientType.STANDALONE_WEB_APP, + access_token="web-app-token", + ), + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"), + ], + ) + + hi_resp = responses[0] + assert hi_resp.data.form_token == "web-app-token" + assert hi_resp.data.approval_channels == ["console"] + + pause_resp = responses[-1] + assert pause_resp.data.reasons[0]["approval_channels"] == ["console"] + + def test_queue_workflow_paused_event_resolves_variable_select_options(monkeypatch: pytest.MonkeyPatch): converter = _build_converter() converter.workflow_start_to_stream_response( @@ -209,21 +294,9 @@ def test_queue_workflow_paused_event_resolves_variable_select_options(monkeypatc ) expiration_time = datetime(2024, 1, 1, tzinfo=UTC) + session = _FakeSession(execute_rows=[("form-1", expiration_time, '{"display_in_ui": true}')]) - class _FakeSession: - def execute(self, _stmt): - return [("form-1", expiration_time, '{"display_in_ui": true}')] - - def scalars(self, _stmt): - return [] - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc, tb): - return False - - monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: _FakeSession()) + monkeypatch.setattr(workflow_response_converter, "Session", lambda **_: session) monkeypatch.setattr(workflow_response_converter, "db", SimpleNamespace(engine=object())) reason = HumanInputRequired( diff --git a/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py b/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py index ea21a1cc1a6..0aaee900e37 100644 --- a/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py +++ b/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py @@ -134,6 +134,7 @@ class TestWorkflowGenerateTaskPipeline: "actions": [], "display_in_ui": False, "form_token": None, + "approval_channels": [], "resolved_default_values": {}, "expiration_time": 1, } diff --git a/api/tests/unit_tests/core/repositories/test_human_input_form_repository_impl.py b/api/tests/unit_tests/core/repositories/test_human_input_form_repository_impl.py index 5bd35e6d3c2..27c32d47ee2 100644 --- a/api/tests/unit_tests/core/repositories/test_human_input_form_repository_impl.py +++ b/api/tests/unit_tests/core/repositories/test_human_input_form_repository_impl.py @@ -32,6 +32,7 @@ from models.human_input import ( EmailMemberRecipientPayload, HumanInputFormRecipient, RecipientType, + StandaloneWebAppRecipientPayload, ) @@ -307,6 +308,9 @@ class _DummyRecipient: recipient_type: RecipientType access_token: str form: _DummyForm | None = None + recipient_payload: str = dataclasses.field( + default_factory=lambda: StandaloneWebAppRecipientPayload().model_dump_json() + ) class _FakeScalarResult: diff --git a/api/tests/unit_tests/core/workflow/test_enrich_pause_reasons.py b/api/tests/unit_tests/core/workflow/test_enrich_pause_reasons.py new file mode 100644 index 00000000000..f53a8eba734 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/test_enrich_pause_reasons.py @@ -0,0 +1,63 @@ +import pytest + +from core.workflow.human_input_policy import FormDisposition, enrich_human_input_pause_reasons +from graphon.entities.pause_reason import PauseReasonType + +_HUMAN_INPUT_REASON = {"TYPE": PauseReasonType.HUMAN_INPUT_REQUIRED, "form_id": "f1"} + + +@pytest.mark.parametrize( + ("dispositions", "expected_token", "expected_channels"), + [ + ({"f1": FormDisposition(form_token=None, approval_channels=["console", "email"])}, None, ["console", "email"]), + ({"f1": FormDisposition(form_token="tok", approval_channels=[])}, "tok", []), + # form_id absent from the map (no recipient rows) falls back to no token, no channels. + ({}, None, []), + ], +) +def test_enrich_projects_disposition_onto_reason(dispositions, expected_token, expected_channels): + out = enrich_human_input_pause_reasons( + [dict(_HUMAN_INPUT_REASON)], + dispositions_by_form_id=dispositions, + expiration_times_by_form_id={}, + ) + + assert out[0]["form_token"] == expected_token + assert out[0]["approval_channels"] == expected_channels + + +def test_enrich_leaves_non_human_input_reasons_untouched(): + reason = {"TYPE": "something_else", "form_id": "f1"} + + out = enrich_human_input_pause_reasons( + [reason], + dispositions_by_form_id={"f1": FormDisposition(form_token="tok", approval_channels=["email"])}, + expiration_times_by_form_id={}, + ) + + assert out[0] == reason + assert "form_token" not in out[0] + assert "approval_channels" not in out[0] + + +def test_pause_reason_payload_carries_approval_channels_through_factory(): + # from_response_data maps fields by hand; this guards approval_channels/form_token + # (the fields this feature added) against being dropped in that mapping. + from core.app.entities.task_entities import ( + HumanInputRequiredPauseReasonPayload, + HumanInputRequiredResponse, + ) + + data = HumanInputRequiredResponse.Data( + form_id="f", + node_id="n", + node_title="t", + form_content="c", + expiration_time=123, + form_token=None, + approval_channels=["console"], + ) + payload = HumanInputRequiredPauseReasonPayload.from_response_data(data) + + assert payload.approval_channels == ["console"] + assert payload.form_token is None diff --git a/api/tests/unit_tests/core/workflow/test_human_input_forms.py b/api/tests/unit_tests/core/workflow/test_human_input_forms.py index e508815b35e..c84c7d578be 100644 --- a/api/tests/unit_tests/core/workflow/test_human_input_forms.py +++ b/api/tests/unit_tests/core/workflow/test_human_input_forms.py @@ -1,7 +1,16 @@ from types import SimpleNamespace -from core.workflow.human_input_forms import _load_form_tokens_by_form_id, load_form_tokens_by_form_id -from core.workflow.human_input_policy import HumanInputSurface +import pytest + +from core.workflow.human_input_forms import ( + load_form_dispositions_by_form_id, + load_form_tokens_by_form_id, +) +from core.workflow.human_input_policy import ( + FormDisposition, + HumanInputSurface, + disposition_for_surface, +) from models.human_input import RecipientType @@ -13,91 +22,100 @@ class _FakeSession: return self._recipients -def test_load_form_tokens_by_form_id_prefers_backstage_token() -> None: +def _recipient(form_id: str, recipient_type: RecipientType, access_token: str | None) -> SimpleNamespace: + return SimpleNamespace(form_id=form_id, recipient_type=recipient_type, access_token=access_token) + + +@pytest.mark.parametrize( + ("surface", "expected_token"), + [ + # Unfiltered (no surface) picks the highest-priority recipient: backstage. + (None, "backstage-token"), + # SERVICE_API may only act on the web-app recipient. + (HumanInputSurface.SERVICE_API, "web-token"), + ], +) +def test_load_form_tokens_picks_token_for_surface(surface, expected_token) -> None: session = _FakeSession( - recipients=[ - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.STANDALONE_WEB_APP, - access_token="web-token", - ), - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.CONSOLE, - access_token="console-token", - ), - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.BACKSTAGE, - access_token="backstage-token", - ), + [ + _recipient("form-1", RecipientType.STANDALONE_WEB_APP, "web-token"), + _recipient("form-1", RecipientType.CONSOLE, "console-token"), + _recipient("form-1", RecipientType.BACKSTAGE, "backstage-token"), ] ) - assert load_form_tokens_by_form_id(["form-1"], session=session) == {"form-1": "backstage-token"} + assert load_form_tokens_by_form_id(["form-1"], session=session, surface=surface) == {"form-1": expected_token} -def test_load_form_tokens_by_form_id_ignores_unsupported_recipients() -> None: +def test_load_form_tokens_drops_forms_without_actionable_token() -> None: session = _FakeSession( - recipients=[ - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.EMAIL_MEMBER, - access_token="email-token", - ), - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.CONSOLE, - access_token=None, - ), + [ + _recipient("form-1", RecipientType.EMAIL_MEMBER, "email-token"), + _recipient("form-1", RecipientType.CONSOLE, None), ] ) assert load_form_tokens_by_form_id(["form-1"], session=session) == {} -def test_load_form_tokens_by_form_id_uses_shared_priority() -> None: +def test_load_form_tokens_service_api_surface_uses_web_token() -> None: session = _FakeSession( - recipients=[ - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.STANDALONE_WEB_APP, - access_token="web-token", - ), - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.CONSOLE, - access_token="console-token", - ), + [ + _recipient("form-1", RecipientType.STANDALONE_WEB_APP, "web-token"), + _recipient("form-1", RecipientType.CONSOLE, "console-token"), + _recipient("form-1", RecipientType.BACKSTAGE, "backstage-token"), ] ) - assert _load_form_tokens_by_form_id(session, ["form-1"]) == {"form-1": "console-token"} + assert load_form_tokens_by_form_id(["form-1"], session=session, surface=HumanInputSurface.SERVICE_API) == { + "form-1": "web-token" + } -def test_load_form_tokens_by_form_id_uses_web_token_for_service_api_surface() -> None: +def test_load_dispositions_openapi_webapp_form_is_resumable() -> None: session = _FakeSession( - recipients=[ - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.STANDALONE_WEB_APP, - access_token="web-token", - ), - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.CONSOLE, - access_token="console-token", - ), - SimpleNamespace( - form_id="form-1", - recipient_type=RecipientType.BACKSTAGE, - access_token="backstage-token", - ), + [ + _recipient("form-1", RecipientType.STANDALONE_WEB_APP, "web-token"), + _recipient("form-1", RecipientType.BACKSTAGE, "backstage-token"), ] ) - assert load_form_tokens_by_form_id( - ["form-1"], - session=session, - surface=HumanInputSurface.SERVICE_API, - ) == {"form-1": "web-token"} + assert load_form_dispositions_by_form_id(["form-1"], session=session, surface=HumanInputSurface.OPENAPI) == { + "form-1": FormDisposition(form_token="web-token", approval_channels=["console"]) + } + + +def test_load_dispositions_openapi_backstage_only_form_yields_channels_not_token() -> None: + session = _FakeSession([_recipient("form-1", RecipientType.BACKSTAGE, "backstage-token")]) + + assert load_form_dispositions_by_form_id(["form-1"], session=session, surface=HumanInputSurface.OPENAPI) == { + "form-1": FormDisposition(form_token=None, approval_channels=["console"]) + } + + +# disposition_for_surface partitions recipients into a surface-actionable resume +# token plus the approval channels of the recipients the surface may NOT act on. +_WEB = (RecipientType.STANDALONE_WEB_APP, "tok_web") +_BACKSTAGE = (RecipientType.BACKSTAGE, "tok_b") +_CONSOLE = (RecipientType.CONSOLE, "tok_c") +_EMAIL_MEMBER = (RecipientType.EMAIL_MEMBER, "t1") +_EMAIL_EXTERNAL = (RecipientType.EMAIL_EXTERNAL, "t2") + + +@pytest.mark.parametrize( + ("recipients", "surface", "expected"), + [ + # Token surface acts on the web-app recipient; blocked recipients become channels. + ([_BACKSTAGE, _WEB], HumanInputSurface.OPENAPI, FormDisposition("tok_web", ["console"])), + ([_EMAIL_MEMBER, _EMAIL_EXTERNAL], HumanInputSurface.OPENAPI, FormDisposition(None, ["email"])), + ([_EMAIL_MEMBER, _BACKSTAGE], HumanInputSurface.OPENAPI, FormDisposition(None, ["console", "email"])), + # CONSOLE acts on console/backstage; a web-app recipient is blocked → web_app channel. + ([_CONSOLE, _WEB], HumanInputSurface.CONSOLE, FormDisposition("tok_c", ["web_app"])), + ([_WEB], HumanInputSurface.CONSOLE, FormDisposition(None, ["web_app"])), + # No surface: unfiltered priority token, channels never populated. + ([_BACKSTAGE], None, FormDisposition("tok_b", [])), + ([_WEB, _EMAIL_MEMBER], None, FormDisposition("tok_web", [])), + ], +) +def test_disposition_for_surface_partitions_token_and_channels(recipients, surface, expected) -> None: + assert disposition_for_surface(recipients, surface=surface) == expected diff --git a/api/tests/unit_tests/core/workflow/test_human_input_policy.py b/api/tests/unit_tests/core/workflow/test_human_input_policy.py index 651b69216ae..a1c6fee98cf 100644 --- a/api/tests/unit_tests/core/workflow/test_human_input_policy.py +++ b/api/tests/unit_tests/core/workflow/test_human_input_policy.py @@ -12,38 +12,28 @@ from graphon.runtime import VariablePool from models.human_input import RecipientType -def test_service_api_only_allows_public_webapp_forms() -> None: - assert is_recipient_type_allowed_for_surface( - RecipientType.STANDALONE_WEB_APP, - HumanInputSurface.SERVICE_API, - ) - assert not is_recipient_type_allowed_for_surface( - RecipientType.CONSOLE, - HumanInputSurface.SERVICE_API, - ) - assert not is_recipient_type_allowed_for_surface( - RecipientType.BACKSTAGE, - HumanInputSurface.SERVICE_API, - ) - assert not is_recipient_type_allowed_for_surface( - RecipientType.EMAIL_MEMBER, - HumanInputSurface.SERVICE_API, - ) - - -def test_console_only_allows_internal_console_surfaces() -> None: - assert is_recipient_type_allowed_for_surface( - RecipientType.CONSOLE, - HumanInputSurface.CONSOLE, - ) - assert is_recipient_type_allowed_for_surface( - RecipientType.BACKSTAGE, - HumanInputSurface.CONSOLE, - ) - assert not is_recipient_type_allowed_for_surface( - RecipientType.STANDALONE_WEB_APP, - HumanInputSurface.CONSOLE, - ) +# Token surfaces (SERVICE_API, OPENAPI) may act only on public web-app forms; +# CONSOLE may act on internal console/backstage forms. OPENAPI mirrors SERVICE_API +# today but is pinned independently because the two are expected to diverge. +@pytest.mark.parametrize( + ("recipient_type", "surface", "allowed"), + [ + (RecipientType.STANDALONE_WEB_APP, HumanInputSurface.SERVICE_API, True), + (RecipientType.CONSOLE, HumanInputSurface.SERVICE_API, False), + (RecipientType.BACKSTAGE, HumanInputSurface.SERVICE_API, False), + (RecipientType.EMAIL_MEMBER, HumanInputSurface.SERVICE_API, False), + (RecipientType.STANDALONE_WEB_APP, HumanInputSurface.OPENAPI, True), + (RecipientType.CONSOLE, HumanInputSurface.OPENAPI, False), + (RecipientType.BACKSTAGE, HumanInputSurface.OPENAPI, False), + (RecipientType.CONSOLE, HumanInputSurface.CONSOLE, True), + (RecipientType.BACKSTAGE, HumanInputSurface.CONSOLE, True), + (RecipientType.STANDALONE_WEB_APP, HumanInputSurface.CONSOLE, False), + ], +) +def test_recipient_type_allowed_per_surface( + recipient_type: RecipientType, surface: HumanInputSurface, allowed: bool +) -> None: + assert is_recipient_type_allowed_for_surface(recipient_type, surface) is allowed def test_preferred_form_token_uses_shared_priority_order() -> None: @@ -56,6 +46,17 @@ def test_preferred_form_token_uses_shared_priority_order() -> None: assert get_preferred_form_token(recipients) == "backstage-token" +def test_preferred_form_token_skips_prioritized_type_with_empty_token() -> None: + # An empty token is not actionable: the highest-priority recipient that + # actually carries a token wins, not the highest-priority type. + recipients = [ + (RecipientType.BACKSTAGE, ""), + (RecipientType.CONSOLE, "console-token"), + ] + + assert get_preferred_form_token(recipients) == "console-token" + + def test_resolve_variable_select_input_options_uses_runtime_values() -> None: variable_pool = VariablePool() variable_pool.add(("start", "options"), ["approve", "reject"]) diff --git a/api/tests/unit_tests/core/workflow/test_human_input_policy_openapi.py b/api/tests/unit_tests/core/workflow/test_human_input_policy_openapi.py deleted file mode 100644 index b78e821237d..00000000000 --- a/api/tests/unit_tests/core/workflow/test_human_input_policy_openapi.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Tests for OPENAPI surface in HumanInputPolicy and human_input_forms.""" - -from __future__ import annotations - -from core.workflow.human_input_policy import HumanInputSurface, is_recipient_type_allowed_for_surface -from models.human_input import RecipientType - - -def test_openapi_surface_exists(): - assert HumanInputSurface.OPENAPI == "openapi" - - -def test_openapi_allows_standalone_web_app(): - assert is_recipient_type_allowed_for_surface(RecipientType.STANDALONE_WEB_APP, HumanInputSurface.OPENAPI) - - -def test_openapi_rejects_console_recipient(): - assert not is_recipient_type_allowed_for_surface(RecipientType.CONSOLE, HumanInputSurface.OPENAPI) - - -def test_openapi_rejects_backstage_recipient(): - assert not is_recipient_type_allowed_for_surface(RecipientType.BACKSTAGE, HumanInputSurface.OPENAPI) - - -def test_get_surface_form_token_openapi_picks_standalone_web_app(): - """OPENAPI surface should pick STANDALONE_WEB_APP token, same as SERVICE_API.""" - from core.workflow.human_input_forms import _get_surface_form_token - - recipients = [ - (RecipientType.BACKSTAGE, "backstage-token"), - (RecipientType.STANDALONE_WEB_APP, "web-token"), - ] - token = _get_surface_form_token(recipients, surface=HumanInputSurface.OPENAPI) - assert token == "web-token" diff --git a/api/tests/unit_tests/models/test_recipient_type_label.py b/api/tests/unit_tests/models/test_recipient_type_label.py new file mode 100644 index 00000000000..3e98c17ca99 --- /dev/null +++ b/api/tests/unit_tests/models/test_recipient_type_label.py @@ -0,0 +1,21 @@ +import pytest + +from models.human_input import ApprovalChannel, RecipientType + + +@pytest.mark.parametrize( + ("recipient_type", "expected_channel"), + [ + (RecipientType.EMAIL_MEMBER, ApprovalChannel.EMAIL), + (RecipientType.EMAIL_EXTERNAL, ApprovalChannel.EMAIL), + (RecipientType.CONSOLE, ApprovalChannel.CONSOLE), + (RecipientType.BACKSTAGE, ApprovalChannel.CONSOLE), + (RecipientType.STANDALONE_WEB_APP, ApprovalChannel.WEB_APP), + ], +) +def test_approval_channel_collapses_delivery_types( + recipient_type: RecipientType, expected_channel: ApprovalChannel +) -> None: + # Both email types collapse to EMAIL and console/backstage to CONSOLE: + # the user-facing approval channel, not the internal recipient type. + assert recipient_type.approval_channel == expected_channel diff --git a/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service.py b/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service.py index eafbabe1f9b..dc3a6a31205 100644 --- a/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service.py +++ b/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service.py @@ -16,12 +16,14 @@ from core.app.app_config.entities import WorkflowUIBasedAppConfig from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity from core.app.entities.task_entities import StreamEvent from core.app.layers.pause_state_persist_layer import WorkflowResumptionContext, _WorkflowGenerateEntityWrapper +from core.workflow.human_input_policy import FormDisposition, HumanInputSurface from graphon.entities.pause_reason import HumanInputRequired from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionStatus from graphon.nodes.human_input.entities import SelectInputConfig, StringListSource from graphon.nodes.human_input.enums import ValueSourceType from graphon.runtime import GraphRuntimeState, VariablePool from models.enums import CreatorUserRole +from models.human_input import RecipientType from models.model import AppMode from models.workflow import WorkflowRun from repositories.api_workflow_node_execution_repository import WorkflowNodeExecutionSnapshot @@ -763,7 +765,11 @@ def test_build_snapshot_events_preserves_public_form_token(monkeypatch: pytest.M snapshot = _build_snapshot(WorkflowNodeExecutionStatus.PAUSED) resumption_context = _build_resumption_context("task-ctx") monkeypatch.setattr( - service_module, "load_form_tokens_by_form_id", lambda form_ids, session=None, surface=None: {"form-1": "wtok"} + service_module, + "load_form_dispositions_by_form_id", + lambda form_ids, session=None, surface=None: { + "form-1": FormDisposition(form_token="wtok", approval_channels=[]) + }, ) session_maker = _SessionMaker( SimpleNamespace( @@ -803,12 +809,99 @@ def test_build_snapshot_events_preserves_public_form_token(monkeypatch: pytest.M assert pause_data["reasons"][0]["expiration_time"] == int(datetime(2024, 1, 1, tzinfo=UTC).timestamp()) +def _build_recipient_snapshot_events(recipients: Sequence[Any]) -> list[Mapping[str, Any]]: + """Drive the reconnect snapshot pause path for the OPENAPI surface. + + Lets the real disposition loader run against a fake session whose ``scalars`` + yields the given recipients, so the reconnect path derives the same token and + approval channels as the live path for the same recipient set. + """ + workflow_run = _build_workflow_run(WorkflowExecutionStatus.PAUSED) + snapshot = _build_snapshot(WorkflowNodeExecutionStatus.PAUSED) + resumption_context = _build_resumption_context("task-ctx") + expiration_time = datetime(2024, 1, 1, tzinfo=UTC) + session_maker = _SessionMaker( + SimpleNamespace( + execute=lambda _stmt: [("form-1", expiration_time, '{"display_in_ui": true}')], + scalars=lambda _stmt: list(recipients), + ) + ) + pause_entity = _FakePauseEntity( + pause_id="pause-1", + workflow_run_id="run-1", + paused_at_value=expiration_time, + pause_reasons=[ + HumanInputRequired( + form_id="form-1", + form_content="content", + node_id="node-1", + node_title="Human Input", + ) + ], + ) + + return _build_snapshot_events( + workflow_run=workflow_run, + node_snapshots=[snapshot], + task_id="task-ctx", + message_context=None, + pause_entity=pause_entity, + resumption_context=resumption_context, + session_maker=cast(sessionmaker[Session], session_maker), + human_input_surface=HumanInputSurface.OPENAPI, + ) + + +def test_reconnect_pause_without_web_app_recipient_emits_approval_channels() -> None: + events = _build_recipient_snapshot_events( + recipients=[ + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.EMAIL_MEMBER, access_token="email-token"), + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"), + ], + ) + + human_input_event = events[-2] + assert human_input_event["event"] == StreamEvent.HUMAN_INPUT_REQUIRED + assert human_input_event["data"]["form_token"] is None + assert human_input_event["data"]["approval_channels"] == ["console", "email"] + + pause_data = events[-1]["data"] + assert pause_data["reasons"][0]["form_token"] is None + assert pause_data["reasons"][0]["approval_channels"] == ["console", "email"] + + +def test_reconnect_pause_with_web_app_recipient_sets_token_and_channels() -> None: + events = _build_recipient_snapshot_events( + recipients=[ + SimpleNamespace( + form_id="form-1", + recipient_type=RecipientType.STANDALONE_WEB_APP, + access_token="web-app-token", + ), + SimpleNamespace(form_id="form-1", recipient_type=RecipientType.BACKSTAGE, access_token="backstage-token"), + ], + ) + + human_input_event = events[-2] + assert human_input_event["event"] == StreamEvent.HUMAN_INPUT_REQUIRED + assert human_input_event["data"]["form_token"] == "web-app-token" + assert human_input_event["data"]["approval_channels"] == ["console"] + + pause_data = events[-1]["data"] + assert pause_data["reasons"][0]["form_token"] == "web-app-token" + assert pause_data["reasons"][0]["approval_channels"] == ["console"] + + def test_build_snapshot_events_resolves_pause_reason_select_options(monkeypatch: pytest.MonkeyPatch) -> None: workflow_run = _build_workflow_run(WorkflowExecutionStatus.PAUSED) snapshot = _build_snapshot(WorkflowNodeExecutionStatus.PAUSED) resumption_context = _build_resumption_context("task-ctx", select_options=["approve", "reject"]) monkeypatch.setattr( - service_module, "load_form_tokens_by_form_id", lambda form_ids, session=None, surface=None: {"form-1": "wtok"} + service_module, + "load_form_dispositions_by_form_id", + lambda form_ids, session=None, surface=None: { + "form-1": FormDisposition(form_token="wtok", approval_channels=[]) + }, ) session_maker = _SessionMaker( SimpleNamespace( @@ -886,7 +979,11 @@ def test_build_workflow_event_stream_loads_pause_tokens_without_flask_app_contex service_module, "_load_resumption_context", MagicMock(return_value=_build_resumption_context("task-1")) ) monkeypatch.setattr( - service_module, "load_form_tokens_by_form_id", lambda form_ids, session=None, surface=None: {"form-1": "wtok"} + service_module, + "load_form_dispositions_by_form_id", + lambda form_ids, session=None, surface=None: { + "form-1": FormDisposition(form_token="wtok", approval_channels=[]) + }, ) session = SimpleNamespace( diff --git a/cli/src/commands/run/app/hitl-render.test.ts b/cli/src/commands/run/app/hitl-render.test.ts new file mode 100644 index 00000000000..92d91575735 --- /dev/null +++ b/cli/src/commands/run/app/hitl-render.test.ts @@ -0,0 +1,68 @@ +import type { HitlPauseData, HitlPausePayload } from './sse-collector' +import { describe, expect, it } from 'vitest' +import { buildHitlExitObject, renderHitlHint } from './hitl-render' + +function payload(overrides: Partial = {}): HitlPausePayload { + return { + event: 'human_input_required', + task_id: 'task-1', + workflow_run_id: 'run-1', + data: { + form_id: 'form-1', + node_id: 'node-1', + node_title: 'Approve', + form_content: 'Please approve', + inputs: [], + actions: [], + display_in_ui: false, + form_token: null, + approval_channels: ['email'], + resolved_default_values: {}, + expiration_time: 0, + ...overrides, + }, + } +} + +describe('renderHitlHint — non-resumable form (form_token null)', () => { + it.each<[string[], string]>([ + [['email'], 'form delivered via email — resume only from that channel'], + [['console'], 'form delivered via the console — resume only from that channel'], + [['web_app'], 'form delivered via the web app — resume only from that channel'], + [['console', 'email'], 'form delivered via the console or email — resume only from those channels'], + [[], 'form delivered via another channel — resume only from that channel'], + ])('renders %j as the channel note', (channels, expected) => { + const out = renderHitlHint('app-1', payload({ approval_channels: channels }), false) + expect(out).toBe(`hint: workflow paused — ${expected}\n`) + expect(out).not.toContain('difyctl resume') + }) + + it('falls back to a generic note when approval_channels is absent (older server)', () => { + const p = payload() + delete p.data.approval_channels + const out = renderHitlHint('app-1', p, false) + expect(out).toContain('another channel') + }) +}) + +describe('renderHitlHint — resumable form (form_token present)', () => { + it('renders the resume command and ignores approval_channels', () => { + const out = renderHitlHint('app-1', payload({ form_token: 'tok-123', approval_channels: [] }), false) + expect(out).toContain('difyctl resume app app-1 tok-123 --workflow-run-id run-1') + expect(out).not.toContain('delivered via') + }) +}) + +describe('buildHitlExitObject', () => { + it('carries approval_channels into the JSON exit object', () => { + const obj = buildHitlExitObject('app-1', payload({ approval_channels: ['email'] })) + expect(obj.approval_channels).toEqual(['email']) + expect(obj.form_token).toBeNull() + }) + + it('defaults approval_channels to [] when absent', () => { + const p = payload({ form_token: 'tok' }) + delete p.data.approval_channels + expect(buildHitlExitObject('app-1', p).approval_channels).toEqual([]) + }) +}) diff --git a/cli/src/commands/run/app/hitl-render.ts b/cli/src/commands/run/app/hitl-render.ts index 991c63e1112..6eb251f4b38 100644 --- a/cli/src/commands/run/app/hitl-render.ts +++ b/cli/src/commands/run/app/hitl-render.ts @@ -10,6 +10,7 @@ export type HitlExitObject = { node_id: string node_title: string form_token: string | null + approval_channels: string[] form_content: string inputs: unknown[] actions: unknown[] @@ -29,6 +30,7 @@ export function buildHitlExitObject(appId: string, payload: HitlPausePayload): H node_id: d.node_id, node_title: d.node_title, form_token: d.form_token, + approval_channels: d.approval_channels ?? [], form_content: d.form_content, inputs: d.inputs, actions: d.actions, @@ -92,15 +94,35 @@ export function renderHitlOutput(appId: string, payload: HitlPausePayload, isTex return `${renderHitlExit(obj)}\n` } -const EXTERNAL_CHANNEL_NOTE = 'form delivered via email/external channel — resume only from that channel' +// Server approval-channel labels → human wording for the pause hint. +const APPROVAL_CHANNEL_LABELS: Record = { + email: 'email', + console: 'the console', + web_app: 'the web app', +} + +function describeApprovalChannels(channels: string[]): string { + const labels = channels.map(c => APPROVAL_CHANNEL_LABELS[c] ?? c) + if (labels.length <= 1) + return labels[0] ?? 'another channel' + if (labels.length === 2) + return `${labels[0]} or ${labels[1]}` + return `${labels.slice(0, -1).join(', ')}, or ${labels[labels.length - 1]}` +} + +function externalChannelNote(channels: string[]): string { + const where = channels.length > 1 ? 'those channels' : 'that channel' + return `form delivered via ${describeApprovalChannels(channels)} — resume only from ${where}` +} export function renderHitlHint(appId: string, payload: HitlPausePayload, isErrTTY: boolean): string { const d = payload.data const cs = colorScheme(colorEnabled(isErrTTY)) if (d.form_token === null) { + const note = externalChannelNote(d.approval_channels ?? []) if (!isErrTTY) - return `hint: workflow paused — ${EXTERNAL_CHANNEL_NOTE}\n` - return `${cs.warningIcon()} ${cs.bold('workflow paused')} — ${cs.dim(EXTERNAL_CHANNEL_NOTE)}\n` + return `hint: workflow paused — ${note}\n` + return `${cs.warningIcon()} ${cs.bold('workflow paused')} — ${cs.dim(note)}\n` } const actions = (d.actions ?? []) as { id: string }[] let cmd = `difyctl resume app ${appId} ${d.form_token} --workflow-run-id ${payload.workflow_run_id}` diff --git a/cli/src/commands/run/app/sse-collector.ts b/cli/src/commands/run/app/sse-collector.ts index ba329746e73..043a9690d5f 100644 --- a/cli/src/commands/run/app/sse-collector.ts +++ b/cli/src/commands/run/app/sse-collector.ts @@ -13,6 +13,8 @@ export type HitlPauseData = { actions: unknown[] display_in_ui: boolean form_token: string | null + // Channels where the form can be approved when it is not CLI-resumable, e.g. ['email']. + approval_channels?: string[] resolved_default_values: Record expiration_time: number } diff --git a/cli/test/e2e/suites/run/run-app-hitl.e2e.ts b/cli/test/e2e/suites/run/run-app-hitl.e2e.ts index a582bc5282c..58290331c90 100644 --- a/cli/test/e2e/suites/run/run-app-hitl.e2e.ts +++ b/cli/test/e2e/suites/run/run-app-hitl.e2e.ts @@ -386,15 +386,7 @@ describeExternal('E2E / difyctl run app — HITL display_in_ui=false (4.5.8)', ( await fx.cleanup() }) - it('[P1] 4.5.8 HITL pause with display_in_ui=false: JSON contains display_in_ui=false and exit is 0', async () => { - // Spec 4.5.8: when the Human Input node has display_in_ui=false the CLI - // should indicate the form is delivered via an external channel. - // - // Current CLI behaviour (v1.0): the JSON field display_in_ui is correctly - // set to false. The stderr hint still includes the resume command (the - // "form delivered via external channel" hint is not yet implemented in CLI). - // This test verifies the current actual behaviour and will need updating - // once the CLI implements the display_in_ui=false hint distinction. + it('[P1] 4.5.8 HITL pause with display_in_ui=false: external-channel form is not CLI-resumable', async () => { const result = await fx.r([ 'run', 'app', @@ -407,7 +399,8 @@ describeExternal('E2E / difyctl run app — HITL display_in_ui=false (4.5.8)', ( const parsed = assertJson<{ status: string display_in_ui: boolean - form_token: string + form_token: string | null + approval_channels: string[] workflow_run_id: string }>(result) @@ -417,12 +410,13 @@ describeExternal('E2E / difyctl run app — HITL display_in_ui=false (4.5.8)', ( // status must be paused expect(parsed.status).toBe('paused') - // form_token must be present (resume is still possible even for external delivery) - expect(parsed.form_token, 'form_token must be non-empty').toBeTruthy() + // external delivery is not CLI-resumable: no token, channels name the real route + expect(parsed.form_token, 'form_token must be null for external delivery').toBeNull() + expect(parsed.approval_channels, 'approval_channels must name the delivery channel').toContain('email') - // stderr must contain a hint (current behaviour: hint includes resume command) - expect(result.stderr.trim().length, 'stderr must contain a hint').toBeGreaterThan(0) - expect(result.stderr).toMatch(/hint|resume|paused/i) + // stderr hint must describe the channel, not offer a resume command + expect(result.stderr).toMatch(/delivered via|resume only from/i) + expect(result.stderr).not.toMatch(/difyctl resume/i) }) }) diff --git a/packages/contracts/generated/api/openapi/types.gen.ts b/packages/contracts/generated/api/openapi/types.gen.ts index e1217f3f6d7..14f8ef0a818 100644 --- a/packages/contracts/generated/api/openapi/types.gen.ts +++ b/packages/contracts/generated/api/openapi/types.gen.ts @@ -332,6 +332,7 @@ export type OpenApiErrorCode | 'file_too_large' | 'filename_not_exists' | 'forbidden' + | 'form_not_found' | 'internal_server_error' | 'invalid_param' | 'member_license_exceeded' @@ -344,6 +345,7 @@ export type OpenApiErrorCode | 'provider_not_initialize' | 'provider_quota_exceeded' | 'rate_limit_error' + | 'recipient_surface_mismatch' | 'request_entity_too_large' | 'too_many_files' | 'too_many_requests' diff --git a/packages/contracts/generated/api/openapi/zod.gen.ts b/packages/contracts/generated/api/openapi/zod.gen.ts index 51a3cb8f480..57d65e62c0e 100644 --- a/packages/contracts/generated/api/openapi/zod.gen.ts +++ b/packages/contracts/generated/api/openapi/zod.gen.ts @@ -366,6 +366,7 @@ export const zOpenApiErrorCode = z.enum([ 'file_too_large', 'filename_not_exists', 'forbidden', + 'form_not_found', 'internal_server_error', 'invalid_param', 'member_license_exceeded', @@ -378,6 +379,7 @@ export const zOpenApiErrorCode = z.enum([ 'provider_not_initialize', 'provider_quota_exceeded', 'rate_limit_error', + 'recipient_surface_mismatch', 'request_entity_too_large', 'too_many_files', 'too_many_requests',