diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_human_input_join_resume.py b/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_human_input_join_resume.py index 4ffd377967..ad8c0b2a04 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_human_input_join_resume.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_human_input_join_resume.py @@ -114,6 +114,9 @@ class StaticRepo(HumanInputFormRepository): def get_form(self, node_id: str) -> HumanInputFormEntity | None: return self._forms_by_node_id.get(node_id) + def set_forms(self, forms_by_node_id: Mapping[str, HumanInputFormEntity]) -> None: + self._forms_by_node_id = dict(forms_by_node_id) + def create_form(self, params: FormCreateParams) -> HumanInputFormEntity: raise AssertionError("create_form should not be called in resume scenario") @@ -190,8 +193,12 @@ def _build_graph(runtime_state: GraphRuntimeState, repo: HumanInputFormRepositor end_data = EndNodeData( title="End", outputs=[ - OutputVariableEntity(variable="res_a", value_selector=["human_a", "__action_id"]), - OutputVariableEntity(variable="res_b", value_selector=["human_b", "__action_id"]), + OutputVariableEntity(variable="res_a_action", value_selector=["human_a", "__action_id"]), + OutputVariableEntity(variable="res_a_decision", value_selector=["human_a", "decision"]), + OutputVariableEntity(variable="res_a_attachment", value_selector=["human_a", "attachment"]), + OutputVariableEntity(variable="res_b_action", value_selector=["human_b", "__action_id"]), + OutputVariableEntity(variable="res_b_decision", value_selector=["human_b", "decision"]), + OutputVariableEntity(variable="res_b_attachments", value_selector=["human_b", "attachments"]), ], desc=None, ) @@ -229,13 +236,13 @@ def _run_graph(graph: Graph, runtime_state: GraphRuntimeState) -> list[object]: return list(engine.run()) -def _form(submitted: bool, action_id: str | None) -> StaticForm: +def _form(submitted: bool, action_id: str | None, data: Mapping[str, Any] | None = None) -> StaticForm: return StaticForm( form_id="form", rendered="rendered", is_submitted=submitted, action_id=action_id, - data={}, + data=data, status_value=HumanInputFormStatus.SUBMITTED if submitted else HumanInputFormStatus.WAITING, ) @@ -259,7 +266,21 @@ def test_parallel_human_input_join_completes_after_second_resume() -> None: first_resume_state = pause_store.load() first_resume_repo = StaticRepo( { - "human_a": _form(submitted=True, action_id="approve"), + "human_a": _form( + submitted=True, + action_id="approve", + data={ + "decision": "approve", + "attachment": { + "type": "document", + "transfer_method": "remote_url", + "remote_url": "https://example.com/resume.pdf", + "filename": "resume.pdf", + "extension": ".pdf", + "mime_type": "application/pdf", + }, + }, + ), "human_b": _form(submitted=False, action_id=None), } ) @@ -269,19 +290,68 @@ def test_parallel_human_input_join_completes_after_second_resume() -> None: assert isinstance(first_resume_events[0], GraphRunStartedEvent) assert first_resume_events[0].reason is WorkflowStartReason.RESUMPTION assert isinstance(first_resume_events[-1], GraphRunPausedEvent) - pause_store.save(first_resume_state) - - second_resume_state = pause_store.load() - second_resume_repo = StaticRepo( + second_resume_state = first_resume_state + first_resume_repo.set_forms( { - "human_a": _form(submitted=True, action_id="approve"), - "human_b": _form(submitted=True, action_id="approve"), + "human_a": _form( + submitted=True, + action_id="approve", + data={ + "decision": "approve", + "attachment": { + "type": "document", + "transfer_method": "remote_url", + "remote_url": "https://example.com/resume.pdf", + "filename": "resume.pdf", + "extension": ".pdf", + "mime_type": "application/pdf", + }, + }, + ), + "human_b": _form( + submitted=True, + action_id="approve", + data={ + "decision": "reject", + "attachments": [ + { + "type": "image", + "transfer_method": "remote_url", + "remote_url": "https://example.com/a.png", + "filename": "a.png", + "extension": ".png", + "mime_type": "image/png", + }, + { + "type": "image", + "transfer_method": "remote_url", + "remote_url": "https://example.com/b.png", + "filename": "b.png", + "extension": ".png", + "mime_type": "image/png", + }, + ], + }, + ), } ) - second_resume_graph = _build_graph(second_resume_state, second_resume_repo) - second_resume_events = _run_graph(second_resume_graph, second_resume_state) + second_resume_events = _run_graph(first_resume_graph, second_resume_state) assert isinstance(second_resume_events[0], GraphRunStartedEvent) assert second_resume_events[0].reason is WorkflowStartReason.RESUMPTION assert isinstance(second_resume_events[-1], GraphRunSucceededEvent) assert any(isinstance(event, NodeRunSucceededEvent) and event.node_id == "end" for event in second_resume_events) + second_resume_outputs = second_resume_state.outputs + assert second_resume_outputs["res_a_action"] == "approve" + assert second_resume_outputs["res_a_decision"] == "approve" + assert isinstance(second_resume_outputs["res_a_attachment"], File) + res_a_attachment_in_second_outputs = second_resume_outputs["res_a_attachment"] + assert isinstance(res_a_attachment_in_second_outputs, File) + assert res_a_attachment_in_second_outputs.filename == "resume.pdf" + assert res_a_attachment_in_second_outputs.type == FileType.DOCUMENT + assert res_a_attachment_in_second_outputs.transfer_method == FileTransferMethod.REMOTE_URL + assert second_resume_outputs["res_b_action"] == "approve" + assert second_resume_outputs["res_b_decision"] == "reject" + assert isinstance(second_resume_outputs["res_b_attachments"], list) + assert [file.filename for file in second_resume_outputs["res_b_attachments"]] == ["a.png", "b.png"] + assert all(file.type == FileType.IMAGE for file in second_resume_outputs["res_b_attachments"]) diff --git a/api/tests/unit_tests/core/workflow/test_form_input_serialization_compat.py b/api/tests/unit_tests/core/workflow/test_form_input_serialization_compat.py new file mode 100644 index 0000000000..cc83a17dfc --- /dev/null +++ b/api/tests/unit_tests/core/workflow/test_form_input_serialization_compat.py @@ -0,0 +1,338 @@ +import json +from typing import Any + +from pydantic import TypeAdapter + +from core.app.entities.task_entities import HumanInputRequiredResponse +from core.entities.execution_extra_content import ( + HumanInputContent, + HumanInputFormDefinition, +) +from graphon.entities.pause_reason import HumanInputRequired +from graphon.nodes.human_input.entities import ( + FormDefinition, + FormInputConfig, + HumanInputNodeData, +) +from graphon.nodes.human_input.enums import ButtonStyle, TimeoutUnit, ValueSourceType + + +def _legacy_form_input_payloads() -> list[dict[str, Any]]: + return [ + { + "type": "paragraph", + "output_variable_name": "name", + "default": { + "type": "constant", + "selector": [], + "value": "Alice", + }, + }, + { + "type": "select", + "output_variable_name": "decision", + "option_source": { + "type": "constant", + "selector": [], + "value": ["approve", "reject"], + }, + }, + { + "type": "file", + "output_variable_name": "attachment", + "allowed_file_types": ["document"], + "allowed_file_extensions": [], + "allowed_file_upload_methods": ["remote_url"], + }, + { + "type": "file-list", + "output_variable_name": "attachments", + "allowed_file_types": ["document"], + "allowed_file_extensions": [], + "allowed_file_upload_methods": ["remote_url"], + "number_limits": 3, + }, + { + "type": "paragraph", + "output_variable_name": "summary", + "default": None, + }, + ] + + +def _legacy_user_action_payloads() -> list[dict[str, Any]]: + return [ + { + "id": "approve", + "title": "Approve", + "button_style": "primary", + }, + { + "id": "reject", + "title": "Reject", + "button_style": "default", + }, + ] + + +def _validate_legacy_json(model_class: type, payload: dict[str, Any]) -> Any: + adapter = TypeAdapter(model_class) + return adapter.validate_json(json.dumps(payload)) + + +def test_form_input_accepts_current_serialized_payload() -> None: + payload = { + "type": "paragraph", + "output_variable_name": "name", + "default": { + "type": "constant", + "selector": [], + "value": "Alice", + }, + } + + restored = _validate_legacy_json(FormInputConfig, payload) + assert restored.default is not None + assert restored.default.type == ValueSourceType.CONSTANT + + +def test_human_input_node_data_accepts_current_serialized_payload() -> None: + payload = { + "type": "human-input", + "title": "Human Input", + "form_content": "Hello {{#$output.name#}}", + "inputs": _legacy_form_input_payloads(), + "user_actions": _legacy_user_action_payloads(), + "timeout": 2, + "timeout_unit": "day", + } + + restored = _validate_legacy_json(HumanInputNodeData, payload) + assert restored.inputs[0].output_variable_name == "name" + assert restored.timeout_unit == TimeoutUnit.DAY + + +def test_form_definition_accepts_current_serialized_payload() -> None: + payload = { + "form_content": "Please confirm", + "inputs": _legacy_form_input_payloads(), + "user_actions": _legacy_user_action_payloads(), + "rendered_content": "Please confirm", + "expiration_time": "2024-01-01T00:00:00Z", + "default_values": {"name": "Alice"}, + "node_title": "Human Input", + "display_in_ui": True, + } + + restored = _validate_legacy_json(FormDefinition, payload) + assert restored.inputs[2].output_variable_name == "attachment" + assert restored.user_actions[0].id == "approve" + assert restored.user_actions[0].button_style == ButtonStyle.PRIMARY + + +def test_human_input_required_pause_reason_accepts_current_serialized_payload() -> None: + payload = { + "TYPE": "human_input_required", + "form_id": "form-1", + "form_content": "Please confirm", + "inputs": _legacy_form_input_payloads(), + "actions": _legacy_user_action_payloads(), + "node_id": "node-1", + "node_title": "Human Input", + "resolved_default_values": {"name": "Alice"}, + } + + restored = _validate_legacy_json(HumanInputRequired, payload) + assert restored.inputs[1].output_variable_name == "decision" + assert restored.actions[0].id == "approve" + assert restored.TYPE == "human_input_required" + + +def test_human_input_form_definition_accepts_current_serialized_payload() -> None: + payload = { + "form_id": "form-1", + "node_id": "node-1", + "node_title": "Human Input", + "form_content": "Please confirm", + "inputs": _legacy_form_input_payloads(), + "actions": _legacy_user_action_payloads(), + "display_in_ui": True, + "form_token": "token-1", + "resolved_default_values": {"name": "Alice"}, + "expiration_time": 1700000000, + } + + restored = _validate_legacy_json(HumanInputFormDefinition, payload) + assert restored.inputs[3].output_variable_name == "attachments" + assert restored.actions[0].id == "approve" + + +def test_human_input_content_accepts_current_serialized_payload() -> None: + payload = { + "workflow_run_id": "run-1", + "submitted": True, + "form_definition": { + "form_id": "form-1", + "node_id": "node-1", + "node_title": "Human Input", + "form_content": "Please confirm", + "inputs": _legacy_form_input_payloads(), + "actions": _legacy_user_action_payloads(), + "display_in_ui": True, + "form_token": "token-1", + "resolved_default_values": {"name": "Alice"}, + "expiration_time": 1700000000, + }, + "form_submission_data": { + "node_id": "node-1", + "node_title": "Human Input", + "rendered_content": "Please confirm", + "action_id": "approve", + "action_text": "Approve", + }, + "type": "human_input", + } + + restored = _validate_legacy_json(HumanInputContent, payload) + assert restored.form_definition is not None + assert restored.form_definition.inputs[0].output_variable_name == "name" + + +def test_human_input_content_accepts_current_serialized_payload_with_form_data() -> None: + payload = { + "workflow_run_id": "run-1", + "submitted": True, + "form_definition": { + "form_id": "form-1", + "node_id": "node-1", + "node_title": "Human Input", + "form_content": "Please confirm", + "inputs": [ + { + "type": "select", + "output_variable_name": "decision", + "option_source": {"type": "constant", "selector": [], "value": ["approve", "reject"]}, + }, + { + "type": "file", + "output_variable_name": "attachment", + "allowed_file_types": ["document"], + "allowed_file_extensions": [], + "allowed_file_upload_methods": ["remote_url"], + }, + { + "type": "file-list", + "output_variable_name": "attachments", + "allowed_file_types": ["document"], + "allowed_file_extensions": [], + "allowed_file_upload_methods": ["remote_url"], + "number_limits": 3, + }, + ], + "actions": _legacy_user_action_payloads(), + "display_in_ui": True, + "form_token": "token-1", + "resolved_default_values": {"decision": "approve"}, + "expiration_time": 1700000000, + }, + "form_submission_data": { + "node_id": "node-1", + "node_title": "Human Input", + "rendered_content": "Please confirm", + "action_id": "approve", + "action_text": "Approve", + "submitted_data": { + "decision": "approve", + "attachment": { + "type": "document", + "transfer_method": "remote_url", + "remote_url": "https://example.com/file.txt", + "filename": "file.txt", + "extension": ".txt", + "mime_type": "text/plain", + }, + "attachments": [ + { + "type": "document", + "transfer_method": "remote_url", + "remote_url": "https://example.com/first.txt", + "filename": "first.txt", + "extension": ".txt", + "mime_type": "text/plain", + } + ], + }, + }, + "type": "human_input", + } + + restored = HumanInputContent.model_validate_json(json.dumps(payload)) + assert restored.form_submission_data is not None + assert restored.form_submission_data.submitted_data == payload["form_submission_data"]["submitted_data"] + + +def test_human_input_content_accepts_legacy_serialized_payload_with_form_data() -> None: + payload = { + "workflow_run_id": "run-1", + "submitted": True, + "form_definition": { + "form_id": "form-1", + "node_id": "node-1", + "node_title": "Human Input", + "form_content": "Please confirm", + "inputs": _legacy_form_input_payloads(), + "actions": _legacy_user_action_payloads(), + "display_in_ui": True, + "form_token": "token-1", + "resolved_default_values": {"decision": "approve"}, + "expiration_time": 1700000000, + }, + "form_submission_data": { + "node_id": "node-1", + "node_title": "Human Input", + "rendered_content": "Please confirm", + "action_id": "approve", + "action_text": "Approve", + "form_data": { + "decision": "approve", + "attachment": { + "type": "document", + "transfer_method": "remote_url", + "remote_url": "https://example.com/file.txt", + "filename": "file.txt", + "extension": ".txt", + "mime_type": "text/plain", + }, + }, + }, + "type": "human_input", + } + + restored = HumanInputContent.model_validate_json(json.dumps(payload)) + assert restored.form_submission_data is not None + assert restored.form_submission_data.submitted_data is None + + +def test_human_input_required_response_accepts_current_serialized_payload() -> None: + payload = { + "event": "human_input_required", + "task_id": "task-1", + "workflow_run_id": "run-1", + "data": { + "form_id": "form-1", + "node_id": "node-1", + "node_title": "Human Input", + "form_content": "Please confirm", + "inputs": _legacy_form_input_payloads(), + "actions": _legacy_user_action_payloads(), + "display_in_ui": True, + "form_token": "token-1", + "resolved_default_values": {"name": "Alice"}, + "expiration_time": 1700000000, + }, + } + + restored = _validate_legacy_json(HumanInputRequiredResponse, payload) + assert restored.data.inputs[1].output_variable_name == "decision" + assert restored.data.actions[0].id == "approve" + assert restored.event == "human_input_required"