diff --git a/api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_execution_extra_content_repository.py b/api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_execution_extra_content_repository.py index fbc171a303..1b26a798a1 100644 --- a/api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_execution_extra_content_repository.py +++ b/api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_execution_extra_content_repository.py @@ -5,6 +5,7 @@ Part of #32454 — replaces the mock-based unit tests with real database interac from __future__ import annotations +import json from collections.abc import Generator from dataclasses import dataclass from datetime import timedelta @@ -174,6 +175,10 @@ def _create_submitted_form( action_id: str = "approve", action_title: str = "Approve", node_title: str = "Approval", + form_content: str = "content", + rendered_content: str | None = None, + inputs: list[dict] | None = None, + submitted_data: dict | None = None, ) -> HumanInputForm: expiration_time = naive_utc_now() + timedelta(days=1) form_definition = FormDefinition( @@ -191,10 +196,12 @@ def _create_submitted_form( workflow_run_id=workflow_run_id, node_id="node-id", form_definition=form_definition.model_dump_json(), - rendered_content=f"Rendered {action_title}", + rendered_content=rendered_content or f"Rendered {action_title}", status=HumanInputFormStatus.SUBMITTED, expiration_time=expiration_time, selected_action_id=action_id, + submitted_data=None if submitted_data is None else json.dumps(submitted_data), + submitted_at=naive_utc_now(), ) session.add(form) session.flush() @@ -349,6 +356,127 @@ class TestGetByMessageIds: # msg2 has no content assert result[1] == [] + def test_submitted_content_populates_submission_data_from_stored_form_data( + self, + db_session_with_containers: Session, + repository: SQLAlchemyExecutionExtraContentRepository, + test_scope: _TestScope, + ) -> None: + workflow_run_id = str(uuid4()) + conversation = _create_conversation(db_session_with_containers, test_scope) + msg = _create_message(db_session_with_containers, test_scope, conversation.id, workflow_run_id) + stored_submission_data = {"decision": "approve", "comment": "Looks good"} + form = _create_submitted_form( + db_session_with_containers, + test_scope, + workflow_run_id=workflow_run_id, + submitted_data=stored_submission_data, + ) + _create_human_input_content( + db_session_with_containers, + workflow_run_id=workflow_run_id, + message_id=msg.id, + form_id=form.id, + ) + db_session_with_containers.commit() + + result = repository.get_by_message_ids([msg.id]) + + content = result[0][0] + assert content.form_submission_data is not None + assert content.form_submission_data.submitted_data == stored_submission_data + + def test_submitted_content_exposes_select_and_file_form_data( + self, + db_session_with_containers: Session, + repository: SQLAlchemyExecutionExtraContentRepository, + test_scope: _TestScope, + ) -> None: + workflow_run_id = str(uuid4()) + conversation = _create_conversation(db_session_with_containers, test_scope) + msg = _create_message(db_session_with_containers, test_scope, conversation.id, workflow_run_id) + submitted_data = { + "decision": "approve", + "attachment": { + "type": "document", + "transfer_method": "remote_url", + "remote_url": "https://example.com/file.txt", + "filename": "file.txt", + "extension": ".txt", + "mime_type": "text/plain", + }, + "attachments": [ + { + "type": "document", + "transfer_method": "remote_url", + "remote_url": "https://example.com/first.txt", + "filename": "first.txt", + "extension": ".txt", + "mime_type": "text/plain", + }, + { + "type": "document", + "transfer_method": "remote_url", + "remote_url": "https://example.com/second.txt", + "filename": "second.txt", + "extension": ".txt", + "mime_type": "text/plain", + }, + ], + } + form = _create_submitted_form( + db_session_with_containers, + test_scope, + workflow_run_id=workflow_run_id, + form_content=( + "Decision: {{#$output.decision#}}\n" + "Attachment: {{#$output.attachment#}}\n" + "Attachments: {{#$output.attachments#}}" + ), + rendered_content=( + "Decision: {{#$output.decision#}}\n" + "Attachment: {{#$output.attachment#}}\n" + "Attachments: {{#$output.attachments#}}" + ), + inputs=[ + { + "type": "select", + "output_variable_name": "decision", + "option_source": {"type": "constant", "value": ["approve", "reject"]}, + }, + { + "type": "file", + "output_variable_name": "attachment", + "allowed_file_types": ["document"], + "allowed_file_upload_methods": ["remote_url"], + }, + { + "type": "file_list", + "output_variable_name": "attachments", + "allowed_file_types": ["document"], + "allowed_file_upload_methods": ["remote_url"], + "number_limits": 3, + }, + ], + submitted_data=submitted_data, + ) + _create_human_input_content( + db_session_with_containers, + workflow_run_id=workflow_run_id, + message_id=msg.id, + form_id=form.id, + ) + db_session_with_containers.commit() + + result = repository.get_by_message_ids([msg.id]) + + content = result[0][0] + assert content.form_submission_data is not None + assert content.form_submission_data.submitted_data == submitted_data + assert content.form_submission_data.rendered_content == ( + "Decision: approve\nAttachment: [file]\nAttachments: [2 files]" + ) + def test_returns_unsubmitted_form_definition( self, db_session_with_containers: Session, diff --git a/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_human_input.py b/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_human_input.py index 1bef6f69cd..9df351fb7a 100644 --- a/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_human_input.py +++ b/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_human_input.py @@ -7,6 +7,7 @@ from core.app.entities.queue_entities import QueueHumanInputFormFilledEvent, Que from core.workflow.system_variables import build_system_variables from graphon.entities import WorkflowStartReason from graphon.runtime import GraphRuntimeState, VariablePool +from graphon.variables.segments import StringSegment def _build_converter(): @@ -63,6 +64,37 @@ def test_human_input_form_filled_stream_response_contains_rendered_content(): assert resp.data.action_id == "Approve" +def test_human_input_form_filled_stream_response_serializes_submitted_data(): + converter = _build_converter() + converter.workflow_start_to_stream_response( + task_id="task-1", + workflow_run_id="run-1", + workflow_id="wf-1", + reason=WorkflowStartReason.INITIAL, + ) + + queue_event = QueueHumanInputFormFilledEvent( + node_execution_id="exec-1", + node_id="node-1", + node_type="human-input", + node_title="Human Input", + rendered_content="# Title\nvalue", + action_id="Approve", + action_text="Approve", + submitted_data={ + "decision": StringSegment(value="approve"), + "comment": StringSegment(value="looks good"), + }, + ) + + resp = converter.human_input_form_filled_to_stream_response(event=queue_event, task_id="task-1") + + assert resp.data.submitted_data == { + "decision": "approve", + "comment": "looks good", + } + + def test_human_input_form_timeout_stream_response_contains_timeout_metadata(): converter = _build_converter() converter.workflow_start_to_stream_response( diff --git a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py index 58c7bfa4bc..adabf5d495 100644 --- a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py +++ b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py @@ -9,6 +9,7 @@ from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom from core.app.entities.queue_entities import ( QueueAgentLogEvent, + QueueHumanInputFormFilledEvent, QueueIterationCompletedEvent, QueueLoopCompletedEvent, QueueNodeExceptionEvent, @@ -30,6 +31,7 @@ from graphon.graph_events import ( NodeRunAgentLogEvent, NodeRunExceptionEvent, NodeRunFailedEvent, + NodeRunHumanInputFormFilledEvent, NodeRunIterationSucceededEvent, NodeRunLoopFailedEvent, NodeRunRetryEvent, @@ -39,6 +41,7 @@ from graphon.graph_events import ( ) from graphon.node_events import NodeRunResult from graphon.runtime import GraphRuntimeState, VariablePool +from graphon.variables.segments import StringSegment from graphon.variables.variables import StringVariable @@ -361,6 +364,38 @@ class TestWorkflowBasedAppRunner: assert any(isinstance(event, QueueIterationCompletedEvent) for event in published) assert any(isinstance(event, QueueLoopCompletedEvent) for event in published) + def test_handle_human_input_form_filled_event_preserves_submitted_data(self): + published: list[object] = [] + + class _QueueManager: + def publish(self, event, publish_from): + published.append(event) + + runner = WorkflowBasedAppRunner(queue_manager=_QueueManager(), app_id="app") + graph_runtime_state = GraphRuntimeState( + variable_pool=VariablePool(system_variables=default_system_variables()), + start_at=0.0, + ) + workflow_entry = SimpleNamespace(graph_engine=SimpleNamespace(graph_runtime_state=graph_runtime_state)) + + runner._handle_event( + workflow_entry, + NodeRunHumanInputFormFilledEvent( + id="exec", + node_id="node", + node_type=BuiltinNodeTypes.HUMAN_INPUT, + node_title="Human Input", + rendered_content="content", + action_id="approve", + action_text="Approve", + submitted_data={"decision": StringSegment(value="approve")}, + ), + ) + + queue_event = published[-1] + assert isinstance(queue_event, QueueHumanInputFormFilledEvent) + assert queue_event.submitted_data == {"decision": StringSegment(value="approve")} + @pytest.mark.parametrize( ("event_factory", "queue_event_cls"), [ diff --git a/api/tests/unit_tests/core/entities/test_entities_execution_extra_content.py b/api/tests/unit_tests/core/entities/test_entities_execution_extra_content.py index 6c15e54f26..d0849e7b88 100644 --- a/api/tests/unit_tests/core/entities/test_entities_execution_extra_content.py +++ b/api/tests/unit_tests/core/entities/test_entities_execution_extra_content.py @@ -26,6 +26,7 @@ def test_human_input_content_defaults_and_domain_alias() -> None: rendered_content="Please confirm", action_id="confirm", action_text="Confirm", + submitted_data={"answer": "yes"}, ) # Act @@ -41,4 +42,5 @@ def test_human_input_content_defaults_and_domain_alias() -> None: assert content.type == ExecutionContentType.HUMAN_INPUT assert content.form_definition is form_definition assert content.form_submission_data is submission_data + assert content.form_submission_data.submitted_data == {"answer": "yes"} assert ExecutionExtraContentDomainModel is HumanInputContent diff --git a/api/tests/unit_tests/core/repositories/test_human_input_repository.py b/api/tests/unit_tests/core/repositories/test_human_input_repository.py index 418537675d..edd8be8618 100644 --- a/api/tests/unit_tests/core/repositories/test_human_input_repository.py +++ b/api/tests/unit_tests/core/repositories/test_human_input_repository.py @@ -586,6 +586,73 @@ def test_mark_submitted_updates_and_raises_when_missing(monkeypatch: pytest.Monk assert record.submitted_data == {"k": "v"} +def test_mark_submitted_serializes_select_and_file_payloads(monkeypatch: pytest.MonkeyPatch) -> None: + fixed_now = datetime(2024, 1, 1, 0, 0, 0) + monkeypatch.setattr("core.repositories.human_input_repository.naive_utc_now", lambda: fixed_now) + + form = _DummyForm( + id="f-complex", + workflow_run_id=None, + node_id="node", + tenant_id="tenant", + app_id="app", + form_definition=_make_form_definition_json(include_expiration_time=True), + rendered_content="
x
", + expiration_time=fixed_now, + ) + recipient = _DummyRecipient( + id="r-complex", + form_id=form.id, + recipient_type=RecipientType.CONSOLE, + access_token="tok", + ) + session = _FakeSession(forms={form.id: form}, recipients={recipient.id: recipient}) + _patch_session_factory(monkeypatch, session) + + payload = { + "decision": "approve", + "attachment": { + "type": "document", + "transfer_method": "remote_url", + "remote_url": "https://example.com/file.txt", + "filename": "file.txt", + "extension": ".txt", + "mime_type": "text/plain", + }, + "attachments": [ + { + "type": "document", + "transfer_method": "remote_url", + "remote_url": "https://example.com/first.txt", + "filename": "first.txt", + "extension": ".txt", + "mime_type": "text/plain", + }, + { + "type": "document", + "transfer_method": "remote_url", + "remote_url": "https://example.com/second.txt", + "filename": "second.txt", + "extension": ".txt", + "mime_type": "text/plain", + }, + ], + } + + repo = HumanInputFormSubmissionRepository() + record = repo.mark_submitted( + form_id=form.id, + recipient_id=recipient.id, + selected_action_id="approve", + form_data=payload, + submission_user_id="user-1", + submission_end_user_id="end-user-1", + ) + + assert json.loads(form.submitted_data or "") == payload + assert record.submitted_data == payload + + def test_mark_timeout_invalid_status_raises(monkeypatch: pytest.MonkeyPatch) -> None: form = _DummyForm( id="f", diff --git a/api/tests/unit_tests/core/workflow/test_node_runtime.py b/api/tests/unit_tests/core/workflow/test_node_runtime.py index 1389ea3957..69611c7a17 100644 --- a/api/tests/unit_tests/core/workflow/test_node_runtime.py +++ b/api/tests/unit_tests/core/workflow/test_node_runtime.py @@ -400,6 +400,70 @@ def test_dify_human_input_runtime_preserves_webapp_delivery_for_web_invocations( assert params.delivery_methods[1].config.recipients.include_bound_group is True +def test_dify_human_input_runtime_restore_submitted_data_rehydrates_files() -> None: + runtime = DifyHumanInputNodeRuntime(_build_run_context()) + file_value = File( + file_id="file-1", + file_type=FileType.DOCUMENT, + transfer_method=FileTransferMethod.LOCAL_FILE, + related_id="upload-1", + filename="resume.pdf", + extension=".pdf", + mime_type="application/pdf", + size=128, + ) + file_list_value = [ + File( + file_id="file-2", + file_type=FileType.DOCUMENT, + transfer_method=FileTransferMethod.LOCAL_FILE, + related_id="upload-2", + filename="first.pdf", + extension=".pdf", + mime_type="application/pdf", + size=64, + ), + File( + file_id="file-3", + file_type=FileType.DOCUMENT, + transfer_method=FileTransferMethod.REMOTE_URL, + remote_url="https://example.com/second.pdf", + filename="second.pdf", + extension=".pdf", + mime_type="application/pdf", + size=96, + ), + ] + runtime._file_reference_factory.build_from_mapping = MagicMock(side_effect=[file_value, *file_list_value]) # type: ignore[method-assign] + node_data = HumanInputNodeData( + title="Human Input", + inputs=[ + FileInputConfig(output_variable_name="attachment"), + FileListInputConfig(output_variable_name="attachments", number_limits=2), + ], + ) + + restored = runtime.restore_submitted_data( + node_data=node_data, + submitted_data={ + "attachment": {"upload_file_id": "upload-1", "type": "document", "transfer_method": "local_file"}, + "attachments": [ + {"upload_file_id": "upload-2", "type": "document", "transfer_method": "local_file"}, + { + "url": "https://example.com/second.pdf", + "type": "document", + "transfer_method": "remote_url", + }, + ], + }, + ) + + assert restored["attachment"] is file_value + assert restored["attachments"] == file_list_value + assert isinstance(FileSegment(value=restored["attachment"]), FileSegment) + assert isinstance(ArrayFileSegment(value=restored["attachments"]), ArrayFileSegment) + + def test_build_dify_llm_file_saver_wires_runtime_adapters(monkeypatch: pytest.MonkeyPatch) -> None: file_saver_cls = MagicMock(return_value=sentinel.file_saver) monkeypatch.setattr("graphon.nodes.llm.file_saver.FileSaverImpl", file_saver_cls) diff --git a/api/tests/unit_tests/repositories/test_sqlalchemy_execution_extra_content_repository.py b/api/tests/unit_tests/repositories/test_sqlalchemy_execution_extra_content_repository.py new file mode 100644 index 0000000000..8547acc047 --- /dev/null +++ b/api/tests/unit_tests/repositories/test_sqlalchemy_execution_extra_content_repository.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +import json +from datetime import timedelta +from typing import cast + +from sqlalchemy.orm import Session, sessionmaker + +from graphon.nodes.human_input.entities import FormDefinition, UserActionConfig +from graphon.nodes.human_input.enums import HumanInputFormStatus +from libs.datetime_utils import naive_utc_now +from models.execution_extra_content import HumanInputContent as HumanInputContentModel +from models.human_input import HumanInputForm +from repositories.sqlalchemy_execution_extra_content_repository import SQLAlchemyExecutionExtraContentRepository + + +def test_map_human_input_content_populates_submission_data_from_stored_form_submission() -> None: + expiration_time = naive_utc_now() + timedelta(days=1) + stored_submission_data = {"decision": "approve", "comment": "Looks good"} + form_definition = FormDefinition( + form_content="content", + inputs=[], + user_actions=[UserActionConfig(id="approve", title="Approve")], + rendered_content="Rendered Approve", + expiration_time=expiration_time, + node_title="Approval", + display_in_ui=True, + ) + form = HumanInputForm( + tenant_id="tenant-1", + app_id="app-1", + workflow_run_id="workflow-run-1", + node_id="node-1", + form_definition=form_definition.model_dump_json(), + rendered_content="Rendered Approve", + expiration_time=expiration_time, + selected_action_id="approve", + submitted_data=json.dumps(stored_submission_data), + submitted_at=naive_utc_now(), + status=HumanInputFormStatus.SUBMITTED, + ) + form.id = "form-1" + model = HumanInputContentModel.new( + workflow_run_id="workflow-run-1", + form_id=form.id, + message_id="message-1", + ) + model.id = "content-1" + model.form = form + repository = SQLAlchemyExecutionExtraContentRepository(cast(sessionmaker[Session], object())) + + content = repository._map_human_input_content(model, {}) + + assert content is not None + assert content.form_submission_data is not None + assert content.form_submission_data.submitted_data == stored_submission_data