test(api): add file and file list form type in resumption test

This commit is contained in:
QuantumGhost 2026-05-07 10:58:51 +08:00
parent 02e42fd66f
commit b73a4d2700
2 changed files with 421 additions and 13 deletions

View File

@ -114,6 +114,9 @@ class StaticRepo(HumanInputFormRepository):
def get_form(self, node_id: str) -> HumanInputFormEntity | None:
return self._forms_by_node_id.get(node_id)
def set_forms(self, forms_by_node_id: Mapping[str, HumanInputFormEntity]) -> None:
self._forms_by_node_id = dict(forms_by_node_id)
def create_form(self, params: FormCreateParams) -> HumanInputFormEntity:
raise AssertionError("create_form should not be called in resume scenario")
@ -190,8 +193,12 @@ def _build_graph(runtime_state: GraphRuntimeState, repo: HumanInputFormRepositor
end_data = EndNodeData(
title="End",
outputs=[
OutputVariableEntity(variable="res_a", value_selector=["human_a", "__action_id"]),
OutputVariableEntity(variable="res_b", value_selector=["human_b", "__action_id"]),
OutputVariableEntity(variable="res_a_action", value_selector=["human_a", "__action_id"]),
OutputVariableEntity(variable="res_a_decision", value_selector=["human_a", "decision"]),
OutputVariableEntity(variable="res_a_attachment", value_selector=["human_a", "attachment"]),
OutputVariableEntity(variable="res_b_action", value_selector=["human_b", "__action_id"]),
OutputVariableEntity(variable="res_b_decision", value_selector=["human_b", "decision"]),
OutputVariableEntity(variable="res_b_attachments", value_selector=["human_b", "attachments"]),
],
desc=None,
)
@ -229,13 +236,13 @@ def _run_graph(graph: Graph, runtime_state: GraphRuntimeState) -> list[object]:
return list(engine.run())
def _form(submitted: bool, action_id: str | None) -> StaticForm:
def _form(submitted: bool, action_id: str | None, data: Mapping[str, Any] | None = None) -> StaticForm:
return StaticForm(
form_id="form",
rendered="rendered",
is_submitted=submitted,
action_id=action_id,
data={},
data=data,
status_value=HumanInputFormStatus.SUBMITTED if submitted else HumanInputFormStatus.WAITING,
)
@ -259,7 +266,21 @@ def test_parallel_human_input_join_completes_after_second_resume() -> None:
first_resume_state = pause_store.load()
first_resume_repo = StaticRepo(
{
"human_a": _form(submitted=True, action_id="approve"),
"human_a": _form(
submitted=True,
action_id="approve",
data={
"decision": "approve",
"attachment": {
"type": "document",
"transfer_method": "remote_url",
"remote_url": "https://example.com/resume.pdf",
"filename": "resume.pdf",
"extension": ".pdf",
"mime_type": "application/pdf",
},
},
),
"human_b": _form(submitted=False, action_id=None),
}
)
@ -269,19 +290,68 @@ def test_parallel_human_input_join_completes_after_second_resume() -> None:
assert isinstance(first_resume_events[0], GraphRunStartedEvent)
assert first_resume_events[0].reason is WorkflowStartReason.RESUMPTION
assert isinstance(first_resume_events[-1], GraphRunPausedEvent)
pause_store.save(first_resume_state)
second_resume_state = pause_store.load()
second_resume_repo = StaticRepo(
second_resume_state = first_resume_state
first_resume_repo.set_forms(
{
"human_a": _form(submitted=True, action_id="approve"),
"human_b": _form(submitted=True, action_id="approve"),
"human_a": _form(
submitted=True,
action_id="approve",
data={
"decision": "approve",
"attachment": {
"type": "document",
"transfer_method": "remote_url",
"remote_url": "https://example.com/resume.pdf",
"filename": "resume.pdf",
"extension": ".pdf",
"mime_type": "application/pdf",
},
},
),
"human_b": _form(
submitted=True,
action_id="approve",
data={
"decision": "reject",
"attachments": [
{
"type": "image",
"transfer_method": "remote_url",
"remote_url": "https://example.com/a.png",
"filename": "a.png",
"extension": ".png",
"mime_type": "image/png",
},
{
"type": "image",
"transfer_method": "remote_url",
"remote_url": "https://example.com/b.png",
"filename": "b.png",
"extension": ".png",
"mime_type": "image/png",
},
],
},
),
}
)
second_resume_graph = _build_graph(second_resume_state, second_resume_repo)
second_resume_events = _run_graph(second_resume_graph, second_resume_state)
second_resume_events = _run_graph(first_resume_graph, second_resume_state)
assert isinstance(second_resume_events[0], GraphRunStartedEvent)
assert second_resume_events[0].reason is WorkflowStartReason.RESUMPTION
assert isinstance(second_resume_events[-1], GraphRunSucceededEvent)
assert any(isinstance(event, NodeRunSucceededEvent) and event.node_id == "end" for event in second_resume_events)
second_resume_outputs = second_resume_state.outputs
assert second_resume_outputs["res_a_action"] == "approve"
assert second_resume_outputs["res_a_decision"] == "approve"
assert isinstance(second_resume_outputs["res_a_attachment"], File)
res_a_attachment_in_second_outputs = second_resume_outputs["res_a_attachment"]
assert isinstance(res_a_attachment_in_second_outputs, File)
assert res_a_attachment_in_second_outputs.filename == "resume.pdf"
assert res_a_attachment_in_second_outputs.type == FileType.DOCUMENT
assert res_a_attachment_in_second_outputs.transfer_method == FileTransferMethod.REMOTE_URL
assert second_resume_outputs["res_b_action"] == "approve"
assert second_resume_outputs["res_b_decision"] == "reject"
assert isinstance(second_resume_outputs["res_b_attachments"], list)
assert [file.filename for file in second_resume_outputs["res_b_attachments"]] == ["a.png", "b.png"]
assert all(file.type == FileType.IMAGE for file in second_resume_outputs["res_b_attachments"])

View File

@ -0,0 +1,338 @@
import json
from typing import Any
from pydantic import TypeAdapter
from core.app.entities.task_entities import HumanInputRequiredResponse
from core.entities.execution_extra_content import (
HumanInputContent,
HumanInputFormDefinition,
)
from graphon.entities.pause_reason import HumanInputRequired
from graphon.nodes.human_input.entities import (
FormDefinition,
FormInputConfig,
HumanInputNodeData,
)
from graphon.nodes.human_input.enums import ButtonStyle, TimeoutUnit, ValueSourceType
def _legacy_form_input_payloads() -> list[dict[str, Any]]:
return [
{
"type": "paragraph",
"output_variable_name": "name",
"default": {
"type": "constant",
"selector": [],
"value": "Alice",
},
},
{
"type": "select",
"output_variable_name": "decision",
"option_source": {
"type": "constant",
"selector": [],
"value": ["approve", "reject"],
},
},
{
"type": "file",
"output_variable_name": "attachment",
"allowed_file_types": ["document"],
"allowed_file_extensions": [],
"allowed_file_upload_methods": ["remote_url"],
},
{
"type": "file-list",
"output_variable_name": "attachments",
"allowed_file_types": ["document"],
"allowed_file_extensions": [],
"allowed_file_upload_methods": ["remote_url"],
"number_limits": 3,
},
{
"type": "paragraph",
"output_variable_name": "summary",
"default": None,
},
]
def _legacy_user_action_payloads() -> list[dict[str, Any]]:
return [
{
"id": "approve",
"title": "Approve",
"button_style": "primary",
},
{
"id": "reject",
"title": "Reject",
"button_style": "default",
},
]
def _validate_legacy_json(model_class: type, payload: dict[str, Any]) -> Any:
adapter = TypeAdapter(model_class)
return adapter.validate_json(json.dumps(payload))
def test_form_input_accepts_current_serialized_payload() -> None:
payload = {
"type": "paragraph",
"output_variable_name": "name",
"default": {
"type": "constant",
"selector": [],
"value": "Alice",
},
}
restored = _validate_legacy_json(FormInputConfig, payload)
assert restored.default is not None
assert restored.default.type == ValueSourceType.CONSTANT
def test_human_input_node_data_accepts_current_serialized_payload() -> None:
payload = {
"type": "human-input",
"title": "Human Input",
"form_content": "Hello {{#$output.name#}}",
"inputs": _legacy_form_input_payloads(),
"user_actions": _legacy_user_action_payloads(),
"timeout": 2,
"timeout_unit": "day",
}
restored = _validate_legacy_json(HumanInputNodeData, payload)
assert restored.inputs[0].output_variable_name == "name"
assert restored.timeout_unit == TimeoutUnit.DAY
def test_form_definition_accepts_current_serialized_payload() -> None:
payload = {
"form_content": "Please confirm",
"inputs": _legacy_form_input_payloads(),
"user_actions": _legacy_user_action_payloads(),
"rendered_content": "Please confirm",
"expiration_time": "2024-01-01T00:00:00Z",
"default_values": {"name": "Alice"},
"node_title": "Human Input",
"display_in_ui": True,
}
restored = _validate_legacy_json(FormDefinition, payload)
assert restored.inputs[2].output_variable_name == "attachment"
assert restored.user_actions[0].id == "approve"
assert restored.user_actions[0].button_style == ButtonStyle.PRIMARY
def test_human_input_required_pause_reason_accepts_current_serialized_payload() -> None:
payload = {
"TYPE": "human_input_required",
"form_id": "form-1",
"form_content": "Please confirm",
"inputs": _legacy_form_input_payloads(),
"actions": _legacy_user_action_payloads(),
"node_id": "node-1",
"node_title": "Human Input",
"resolved_default_values": {"name": "Alice"},
}
restored = _validate_legacy_json(HumanInputRequired, payload)
assert restored.inputs[1].output_variable_name == "decision"
assert restored.actions[0].id == "approve"
assert restored.TYPE == "human_input_required"
def test_human_input_form_definition_accepts_current_serialized_payload() -> None:
payload = {
"form_id": "form-1",
"node_id": "node-1",
"node_title": "Human Input",
"form_content": "Please confirm",
"inputs": _legacy_form_input_payloads(),
"actions": _legacy_user_action_payloads(),
"display_in_ui": True,
"form_token": "token-1",
"resolved_default_values": {"name": "Alice"},
"expiration_time": 1700000000,
}
restored = _validate_legacy_json(HumanInputFormDefinition, payload)
assert restored.inputs[3].output_variable_name == "attachments"
assert restored.actions[0].id == "approve"
def test_human_input_content_accepts_current_serialized_payload() -> None:
payload = {
"workflow_run_id": "run-1",
"submitted": True,
"form_definition": {
"form_id": "form-1",
"node_id": "node-1",
"node_title": "Human Input",
"form_content": "Please confirm",
"inputs": _legacy_form_input_payloads(),
"actions": _legacy_user_action_payloads(),
"display_in_ui": True,
"form_token": "token-1",
"resolved_default_values": {"name": "Alice"},
"expiration_time": 1700000000,
},
"form_submission_data": {
"node_id": "node-1",
"node_title": "Human Input",
"rendered_content": "Please confirm",
"action_id": "approve",
"action_text": "Approve",
},
"type": "human_input",
}
restored = _validate_legacy_json(HumanInputContent, payload)
assert restored.form_definition is not None
assert restored.form_definition.inputs[0].output_variable_name == "name"
def test_human_input_content_accepts_current_serialized_payload_with_form_data() -> None:
payload = {
"workflow_run_id": "run-1",
"submitted": True,
"form_definition": {
"form_id": "form-1",
"node_id": "node-1",
"node_title": "Human Input",
"form_content": "Please confirm",
"inputs": [
{
"type": "select",
"output_variable_name": "decision",
"option_source": {"type": "constant", "selector": [], "value": ["approve", "reject"]},
},
{
"type": "file",
"output_variable_name": "attachment",
"allowed_file_types": ["document"],
"allowed_file_extensions": [],
"allowed_file_upload_methods": ["remote_url"],
},
{
"type": "file-list",
"output_variable_name": "attachments",
"allowed_file_types": ["document"],
"allowed_file_extensions": [],
"allowed_file_upload_methods": ["remote_url"],
"number_limits": 3,
},
],
"actions": _legacy_user_action_payloads(),
"display_in_ui": True,
"form_token": "token-1",
"resolved_default_values": {"decision": "approve"},
"expiration_time": 1700000000,
},
"form_submission_data": {
"node_id": "node-1",
"node_title": "Human Input",
"rendered_content": "Please confirm",
"action_id": "approve",
"action_text": "Approve",
"submitted_data": {
"decision": "approve",
"attachment": {
"type": "document",
"transfer_method": "remote_url",
"remote_url": "https://example.com/file.txt",
"filename": "file.txt",
"extension": ".txt",
"mime_type": "text/plain",
},
"attachments": [
{
"type": "document",
"transfer_method": "remote_url",
"remote_url": "https://example.com/first.txt",
"filename": "first.txt",
"extension": ".txt",
"mime_type": "text/plain",
}
],
},
},
"type": "human_input",
}
restored = HumanInputContent.model_validate_json(json.dumps(payload))
assert restored.form_submission_data is not None
assert restored.form_submission_data.submitted_data == payload["form_submission_data"]["submitted_data"]
def test_human_input_content_accepts_legacy_serialized_payload_with_form_data() -> None:
payload = {
"workflow_run_id": "run-1",
"submitted": True,
"form_definition": {
"form_id": "form-1",
"node_id": "node-1",
"node_title": "Human Input",
"form_content": "Please confirm",
"inputs": _legacy_form_input_payloads(),
"actions": _legacy_user_action_payloads(),
"display_in_ui": True,
"form_token": "token-1",
"resolved_default_values": {"decision": "approve"},
"expiration_time": 1700000000,
},
"form_submission_data": {
"node_id": "node-1",
"node_title": "Human Input",
"rendered_content": "Please confirm",
"action_id": "approve",
"action_text": "Approve",
"form_data": {
"decision": "approve",
"attachment": {
"type": "document",
"transfer_method": "remote_url",
"remote_url": "https://example.com/file.txt",
"filename": "file.txt",
"extension": ".txt",
"mime_type": "text/plain",
},
},
},
"type": "human_input",
}
restored = HumanInputContent.model_validate_json(json.dumps(payload))
assert restored.form_submission_data is not None
assert restored.form_submission_data.submitted_data is None
def test_human_input_required_response_accepts_current_serialized_payload() -> None:
payload = {
"event": "human_input_required",
"task_id": "task-1",
"workflow_run_id": "run-1",
"data": {
"form_id": "form-1",
"node_id": "node-1",
"node_title": "Human Input",
"form_content": "Please confirm",
"inputs": _legacy_form_input_payloads(),
"actions": _legacy_user_action_payloads(),
"display_in_ui": True,
"form_token": "token-1",
"resolved_default_values": {"name": "Alice"},
"expiration_time": 1700000000,
},
}
restored = _validate_legacy_json(HumanInputRequiredResponse, payload)
assert restored.data.inputs[1].output_variable_name == "decision"
assert restored.data.actions[0].id == "approve"
assert restored.event == "human_input_required"