test(api): add tests for delivery and file inputs

This commit is contained in:
QuantumGhost 2026-05-09 02:13:37 +08:00
parent c4b2985361
commit e099ba8679
3 changed files with 326 additions and 31 deletions

View File

@ -1,8 +1,12 @@
import json
import uuid
from io import BytesIO
from unittest.mock import MagicMock
import pytest
from flask.testing import FlaskClient
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.workflow.human_input_adapter import (
EmailDeliveryConfig,
@ -11,14 +15,21 @@ from core.workflow.human_input_adapter import (
ExternalRecipient,
)
from graphon.enums import BuiltinNodeTypes
from graphon.nodes.human_input.entities import HumanInputNodeData
from graphon.nodes.human_input.entities import FileInputConfig, HumanInputNodeData
from graphon.nodes.human_input.enums import HumanInputFormKind
from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole
from models.model import App, AppMode
from models.human_input import HumanInputForm, HumanInputFormRecipient, HumanInputFormUploadFile
from models.model import App, AppMode, UploadFile
from models.workflow import Workflow, WorkflowType
from services.workflow_service import WorkflowService
def _create_app_with_draft_workflow(session, *, delivery_method_id: uuid.UUID) -> tuple[App, Account]:
def _create_app_with_draft_workflow(
session: Session,
*,
delivery_method_id: uuid.UUID,
include_file_input: bool = False,
) -> tuple[App, Account]:
tenant = Tenant(name="Test Tenant")
account = Account(name="Tester", email="tester@example.com")
session.add_all([tenant, account])
@ -65,7 +76,7 @@ def _create_app_with_draft_workflow(session, *, delivery_method_id: uuid.UUID) -
title="Human Input",
delivery_methods=[email_method],
form_content="Hello Human Input",
inputs=[],
inputs=[FileInputConfig(output_variable_name="attachment")] if include_file_input else [],
user_actions=[],
).model_dump(mode="json")
node_data["type"] = BuiltinNodeTypes.HUMAN_INPUT
@ -110,3 +121,71 @@ def test_human_input_delivery_test_sends_email(
assert send_mock.call_count == 1
assert send_mock.call_args.kwargs["to"] == "recipient@example.com"
def test_human_input_delivery_test_form_accepts_file_upload(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
monkeypatch: pytest.MonkeyPatch,
) -> None:
delivery_method_id = uuid.uuid4()
app, account = _create_app_with_draft_workflow(
db_session_with_containers,
delivery_method_id=delivery_method_id,
include_file_input=True,
)
monkeypatch.setattr("services.human_input_delivery_test_service.mail.is_inited", lambda: True)
monkeypatch.setattr("services.human_input_delivery_test_service.mail.send", MagicMock())
WorkflowService().test_human_input_delivery(
app_model=app,
account=account,
node_id="human-node",
delivery_method_id=str(delivery_method_id),
)
form = db_session_with_containers.scalar(
select(HumanInputForm)
.where(
HumanInputForm.app_id == app.id,
HumanInputForm.form_kind == HumanInputFormKind.DELIVERY_TEST,
HumanInputForm.workflow_run_id.is_(None),
)
.limit(1)
)
assert form is not None
recipient = db_session_with_containers.scalar(
select(HumanInputFormRecipient).where(HumanInputFormRecipient.form_id == form.id).limit(1)
)
assert recipient is not None
assert recipient.access_token is not None
token_response = test_client_with_containers.post(f"/api/form/human_input/{recipient.access_token}/upload-token")
assert token_response.status_code == 200
upload_token = token_response.get_json()["upload_token"]
upload_response = test_client_with_containers.post(
"/api/form/human_input/files/upload",
data={"file": (BytesIO(b"delivery test content"), "evidence.txt")},
content_type="multipart/form-data",
headers={"Authorization": f"Bearer {upload_token}"},
)
assert upload_response.status_code == 201, upload_response.get_data(as_text=True)
upload_file_id = upload_response.get_json()["id"]
db_session_with_containers.expire_all()
upload_file = db_session_with_containers.get(UploadFile, upload_file_id)
assert upload_file is not None
assert upload_file.tenant_id == app.tenant_id
assert upload_file.created_by == account.id
link = db_session_with_containers.scalar(
select(HumanInputFormUploadFile)
.where(
HumanInputFormUploadFile.form_id == form.id,
HumanInputFormUploadFile.upload_file_id == upload_file_id,
)
.limit(1)
)
assert link is not None

View File

@ -11,6 +11,7 @@ from unittest.mock import Mock
import pytest
from werkzeug.exceptions import NotFound
from controllers.common.human_input import HumanInputFormSubmitPayload
from controllers.service_api.app.human_input_form import WorkflowHumanInputFormApi
from models.human_input import RecipientType
from tests.unit_tests.controllers.service_api.conftest import _unwrap
@ -145,6 +146,71 @@ class TestWorkflowHumanInputFormApi:
submission_end_user_id="end-user-1",
)
def test_post_accepts_select_file_and_file_list_inputs(self, app, monkeypatch: pytest.MonkeyPatch) -> None:
form = SimpleNamespace(
app_id="app-1",
tenant_id="tenant-1",
recipient_type=RecipientType.STANDALONE_WEB_APP,
)
service_mock = Mock()
service_mock.get_form_by_token.return_value = form
workflow_module = sys.modules["controllers.service_api.app.human_input_form"]
monkeypatch.setattr(workflow_module, "HumanInputService", lambda _engine: service_mock)
monkeypatch.setattr(workflow_module, "db", SimpleNamespace(engine=object()))
api = WorkflowHumanInputFormApi()
handler = _unwrap(api.post)
app_model = SimpleNamespace(id="app-1", tenant_id="tenant-1")
end_user = SimpleNamespace(id="end-user-1")
inputs = {
"decision": "approve",
"attachment": {
"transfer_method": "local_file",
"upload_file_id": "4e0d1b87-52f2-49f6-b8c6-95cd9c954b3e",
"type": "document",
},
"attachments": [
{
"transfer_method": "local_file",
"upload_file_id": "1a77f0df-c0e6-461c-987c-e72526f341ee",
"type": "document",
},
{
"transfer_method": "remote_url",
"url": "https://example.com/report.pdf",
"type": "document",
},
],
}
with app.test_request_context(
"/form/human_input/token-1",
method="POST",
json={"inputs": inputs, "action": "approve", "user": "external-1"},
):
response, status = handler(api, app_model=app_model, end_user=end_user, form_token="token-1")
assert response == {}
assert status == 200
service_mock.submit_form_by_token.assert_called_once_with(
recipient_type=RecipientType.STANDALONE_WEB_APP,
form_token="token-1",
selected_action_id="approve",
form_data=inputs,
submission_end_user_id="end-user-1",
)
def test_submit_payload_schema_documents_select_file_and_file_list_inputs(self) -> None:
schema = HumanInputFormSubmitPayload.model_json_schema()
inputs_schema = schema["properties"]["inputs"]
assert "select input" in inputs_schema["description"]
examples = inputs_schema["examples"]
assert examples[0]["decision"] == "approve"
assert examples[0]["attachment"]["transfer_method"] == "local_file"
assert examples[0]["attachment"]["upload_file_id"] == "4e0d1b87-52f2-49f6-b8c6-95cd9c954b3e"
assert examples[0]["attachments"][1]["transfer_method"] == "remote_url"
@pytest.mark.parametrize(
"recipient_type",
[

View File

@ -1,38 +1,46 @@
from __future__ import annotations
from datetime import timedelta
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
from sqlalchemy import create_engine, select
from sqlalchemy.orm import sessionmaker
import models.account as account_module
import services.human_input_file_upload_service as service_module
from graphon.nodes.human_input.enums import HumanInputFormStatus
from graphon.enums import WorkflowExecutionStatus
from graphon.nodes.human_input.enums import HumanInputFormKind, HumanInputFormStatus
from libs.datetime_utils import naive_utc_now
from models.account import Account, Tenant, TenantAccountJoin
from models.base import Base
from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom
from models.human_input import (
HumanInputForm,
HumanInputFormRecipient,
HumanInputFormUploadFile,
HumanInputFormUploadToken,
)
from models.model import EndUser
from services.human_input_file_upload_service import (
HITL_UPLOAD_TOKEN_PREFIX,
HUMAN_INPUT_END_USER_SESSION_PREFIX,
HUMAN_INPUT_END_USER_TYPE,
HumanInputFileUploadService,
)
from models.model import App, AppMode, EndUser
from models.workflow import WorkflowRun, WorkflowType
from services.human_input_file_upload_service import HITL_UPLOAD_TOKEN_PREFIX, HumanInputFileUploadService
from services.human_input_service import FormSubmittedError
@pytest.fixture
def session_maker():
def session_maker(monkeypatch: pytest.MonkeyPatch):
engine = create_engine("sqlite:///:memory:")
monkeypatch.setattr(account_module, "db", SimpleNamespace(engine=engine))
Base.metadata.create_all(
engine,
tables=[
Tenant.__table__,
Account.__table__,
TenantAccountJoin.__table__,
App.__table__,
EndUser.__table__,
WorkflowRun.__table__,
HumanInputForm.__table__,
HumanInputFormRecipient.__table__,
HumanInputFormUploadToken.__table__,
@ -49,23 +57,113 @@ def session_maker():
HumanInputFormUploadToken.__table__,
HumanInputFormRecipient.__table__,
HumanInputForm.__table__,
WorkflowRun.__table__,
EndUser.__table__,
App.__table__,
TenantAccountJoin.__table__,
Account.__table__,
Tenant.__table__,
],
)
engine.dispose()
def _create_waiting_form(session_maker) -> tuple[str, str]:
def _create_waiting_form(
session_maker,
*,
created_by_role: CreatorUserRole = CreatorUserRole.ACCOUNT,
form_kind: HumanInputFormKind = HumanInputFormKind.RUNTIME,
) -> tuple[str, str, str]:
form_id = "00000000-0000-0000-0000-000000000001"
recipient_id = "00000000-0000-0000-0000-000000000002"
workflow_run_id = None
if form_kind == HumanInputFormKind.RUNTIME:
workflow_run_id = "00000000-0000-0000-0000-000000000012"
tenant_id = "00000000-0000-0000-0000-000000000010"
app_id = "00000000-0000-0000-0000-000000000011"
now = naive_utc_now()
created_by = (
"00000000-0000-0000-0000-000000000020"
if created_by_role == CreatorUserRole.ACCOUNT
else "00000000-0000-0000-0000-000000000021"
)
with session_maker.begin() as session:
tenant = Tenant(name="tenant-1")
tenant.id = tenant_id
session.add(tenant)
if created_by_role == CreatorUserRole.ACCOUNT:
account = Account(name="owner", email="owner@example.com")
account.id = created_by
session.add(account)
session.add(
TenantAccountJoin(
tenant_id=tenant_id,
account_id=created_by,
current=True,
)
)
app_creator = created_by
else:
end_user = EndUser(
tenant_id=tenant_id,
app_id=app_id,
type="web_app",
is_anonymous=False,
session_id="session-1",
external_user_id="external-1",
)
end_user.id = created_by
session.add(end_user)
app_creator = "00000000-0000-0000-0000-000000000020"
account = Account(name="owner", email="owner@example.com")
account.id = app_creator
session.add(account)
session.add(
TenantAccountJoin(
tenant_id=tenant_id,
account_id=app_creator,
current=True,
)
)
app = App(
tenant_id=tenant_id,
name="app-1",
description="",
mode=AppMode.WORKFLOW,
icon_type="emoji",
icon="app",
icon_background="#ffffff",
enable_site=True,
enable_api=True,
created_by=app_creator,
updated_by=app_creator,
)
app.id = app_id
session.add(app)
if workflow_run_id is not None:
workflow_run = WorkflowRun(
tenant_id=tenant_id,
app_id=app_id,
workflow_id="00000000-0000-0000-0000-000000000013",
type=WorkflowType.WORKFLOW,
triggered_from=WorkflowRunTriggeredFrom.APP_RUN,
version="1",
graph="{}",
inputs="{}",
status=WorkflowExecutionStatus.RUNNING,
created_by_role=created_by_role,
created_by=created_by,
created_at=now,
)
workflow_run.id = workflow_run_id
session.add(workflow_run)
session.add(
HumanInputForm(
id=form_id,
tenant_id="00000000-0000-0000-0000-000000000010",
app_id="00000000-0000-0000-0000-000000000011",
workflow_run_id="00000000-0000-0000-0000-000000000012",
tenant_id=tenant_id,
app_id=app_id,
workflow_run_id=workflow_run_id,
form_kind=form_kind,
node_id="node-1",
form_definition="{}",
rendered_content="content",
@ -83,40 +181,53 @@ def _create_waiting_form(session_maker) -> tuple[str, str]:
access_token="form-token-1",
)
)
return form_id, recipient_id
return form_id, recipient_id, created_by
def test_issue_upload_token_creates_technical_end_user_and_token(
def test_issue_upload_token_persists_token_without_technical_end_user(
monkeypatch: pytest.MonkeyPatch,
session_maker,
) -> None:
form_id, recipient_id = _create_waiting_form(session_maker)
form_id, recipient_id, _created_by = _create_waiting_form(session_maker)
monkeypatch.setattr(service_module.secrets, "token_urlsafe", lambda _bytes: "random-value")
token = HumanInputFileUploadService(session_maker).issue_upload_token("form-token-1")
assert token.upload_token == f"{HITL_UPLOAD_TOKEN_PREFIX}random-value"
with session_maker() as session:
end_user = session.scalar(select(EndUser).where(EndUser.type == HUMAN_INPUT_END_USER_TYPE))
assert end_user is not None
assert end_user.session_id == f"{HUMAN_INPUT_END_USER_SESSION_PREFIX}{recipient_id}"
token_model = session.scalar(select(HumanInputFormUploadToken))
assert token_model is not None
assert token_model.form_id == form_id
assert token_model.recipient_id == recipient_id
assert token_model.end_user_id == end_user.id
assert token_model.token == token.upload_token
assert session.scalar(select(EndUser).where(EndUser.type == "human-input")) is None
def test_validate_upload_token_and_record_file(session_maker) -> None:
form_id, recipient_id = _create_waiting_form(session_maker)
def test_validate_upload_token_returns_account_owner_and_record_file_link(session_maker) -> None:
form_id, recipient_id, created_by = _create_waiting_form(session_maker, created_by_role=CreatorUserRole.ACCOUNT)
token = HumanInputFileUploadService(session_maker).issue_upload_token("form-token-1")
workflow_run_repository = MagicMock()
workflow_run_repository.get_workflow_run_by_id.return_value = SimpleNamespace(
tenant_id="00000000-0000-0000-0000-000000000010",
app_id="00000000-0000-0000-0000-000000000011",
created_by_role=CreatorUserRole.ACCOUNT,
created_by=created_by,
)
context = HumanInputFileUploadService(session_maker).validate_upload_token(token.upload_token)
context = HumanInputFileUploadService(
session_maker,
workflow_run_repository=workflow_run_repository,
).validate_upload_token(token.upload_token)
assert context.form_id == form_id
assert context.recipient_id == recipient_id
assert context.end_user.type == HUMAN_INPUT_END_USER_TYPE
assert isinstance(context.owner, Account)
assert context.owner.id == created_by
assert context.owner.current_tenant_id == "00000000-0000-0000-0000-000000000010"
workflow_run_repository.get_workflow_run_by_id.assert_called_once_with(
tenant_id="00000000-0000-0000-0000-000000000010",
app_id="00000000-0000-0000-0000-000000000011",
run_id="00000000-0000-0000-0000-000000000012",
)
HumanInputFileUploadService(session_maker).record_upload_file(
context=context,
@ -126,13 +237,52 @@ def test_validate_upload_token_and_record_file(session_maker) -> None:
with session_maker() as session:
link = session.scalar(select(HumanInputFormUploadFile))
assert link is not None
assert link.tenant_id == context.tenant_id
assert link.app_id == context.app_id
assert link.form_id == form_id
assert link.upload_token_id == context.upload_token_id
assert link.end_user_id == context.end_user.id
def test_validate_upload_token_returns_end_user_owner(session_maker) -> None:
form_id, recipient_id, created_by = _create_waiting_form(session_maker, created_by_role=CreatorUserRole.END_USER)
token = HumanInputFileUploadService(session_maker).issue_upload_token("form-token-1")
workflow_run_repository = MagicMock()
workflow_run_repository.get_workflow_run_by_id.return_value = SimpleNamespace(
tenant_id="00000000-0000-0000-0000-000000000010",
app_id="00000000-0000-0000-0000-000000000011",
created_by_role=CreatorUserRole.END_USER,
created_by=created_by,
)
context = HumanInputFileUploadService(
session_maker,
workflow_run_repository=workflow_run_repository,
).validate_upload_token(token.upload_token)
assert context.form_id == form_id
assert context.recipient_id == recipient_id
assert isinstance(context.owner, EndUser)
assert context.owner.id == created_by
def test_validate_upload_token_allows_delivery_test_form(session_maker) -> None:
form_id, recipient_id, _created_by = _create_waiting_form(
session_maker,
form_kind=HumanInputFormKind.DELIVERY_TEST,
)
token = HumanInputFileUploadService(session_maker).issue_upload_token("form-token-1")
context = HumanInputFileUploadService(session_maker).validate_upload_token(token.upload_token)
assert context.form_id == form_id
assert context.recipient_id == recipient_id
assert isinstance(context.owner, Account)
assert context.owner.id == "00000000-0000-0000-0000-000000000020"
assert context.owner.current_tenant_id == "00000000-0000-0000-0000-000000000010"
def test_validate_upload_token_rejects_submitted_form(session_maker) -> None:
form_id, _recipient_id = _create_waiting_form(session_maker)
form_id, _recipient_id, _created_by = _create_waiting_form(session_maker)
token = HumanInputFileUploadService(session_maker).issue_upload_token("form-token-1")
with session_maker.begin() as session:
form = session.get(HumanInputForm, form_id)