test(api): add tests about submission response

This commit is contained in:
QuantumGhost 2026-05-07 10:51:51 +08:00
parent 23e59c6778
commit 37681bce8c
7 changed files with 385 additions and 1 deletions

View File

@ -5,6 +5,7 @@ Part of #32454 — replaces the mock-based unit tests with real database interac
from __future__ import annotations
import json
from collections.abc import Generator
from dataclasses import dataclass
from datetime import timedelta
@ -174,6 +175,10 @@ def _create_submitted_form(
action_id: str = "approve",
action_title: str = "Approve",
node_title: str = "Approval",
form_content: str = "content",
rendered_content: str | None = None,
inputs: list[dict] | None = None,
submitted_data: dict | None = None,
) -> HumanInputForm:
expiration_time = naive_utc_now() + timedelta(days=1)
form_definition = FormDefinition(
@ -191,10 +196,12 @@ def _create_submitted_form(
workflow_run_id=workflow_run_id,
node_id="node-id",
form_definition=form_definition.model_dump_json(),
rendered_content=f"Rendered {action_title}",
rendered_content=rendered_content or f"Rendered {action_title}",
status=HumanInputFormStatus.SUBMITTED,
expiration_time=expiration_time,
selected_action_id=action_id,
submitted_data=None if submitted_data is None else json.dumps(submitted_data),
submitted_at=naive_utc_now(),
)
session.add(form)
session.flush()
@ -349,6 +356,127 @@ class TestGetByMessageIds:
# msg2 has no content
assert result[1] == []
def test_submitted_content_populates_submission_data_from_stored_form_data(
self,
db_session_with_containers: Session,
repository: SQLAlchemyExecutionExtraContentRepository,
test_scope: _TestScope,
) -> None:
workflow_run_id = str(uuid4())
conversation = _create_conversation(db_session_with_containers, test_scope)
msg = _create_message(db_session_with_containers, test_scope, conversation.id, workflow_run_id)
stored_submission_data = {"decision": "approve", "comment": "Looks good"}
form = _create_submitted_form(
db_session_with_containers,
test_scope,
workflow_run_id=workflow_run_id,
submitted_data=stored_submission_data,
)
_create_human_input_content(
db_session_with_containers,
workflow_run_id=workflow_run_id,
message_id=msg.id,
form_id=form.id,
)
db_session_with_containers.commit()
result = repository.get_by_message_ids([msg.id])
content = result[0][0]
assert content.form_submission_data is not None
assert content.form_submission_data.submitted_data == stored_submission_data
def test_submitted_content_exposes_select_and_file_form_data(
self,
db_session_with_containers: Session,
repository: SQLAlchemyExecutionExtraContentRepository,
test_scope: _TestScope,
) -> None:
workflow_run_id = str(uuid4())
conversation = _create_conversation(db_session_with_containers, test_scope)
msg = _create_message(db_session_with_containers, test_scope, conversation.id, workflow_run_id)
submitted_data = {
"decision": "approve",
"attachment": {
"type": "document",
"transfer_method": "remote_url",
"remote_url": "https://example.com/file.txt",
"filename": "file.txt",
"extension": ".txt",
"mime_type": "text/plain",
},
"attachments": [
{
"type": "document",
"transfer_method": "remote_url",
"remote_url": "https://example.com/first.txt",
"filename": "first.txt",
"extension": ".txt",
"mime_type": "text/plain",
},
{
"type": "document",
"transfer_method": "remote_url",
"remote_url": "https://example.com/second.txt",
"filename": "second.txt",
"extension": ".txt",
"mime_type": "text/plain",
},
],
}
form = _create_submitted_form(
db_session_with_containers,
test_scope,
workflow_run_id=workflow_run_id,
form_content=(
"Decision: {{#$output.decision#}}\n"
"Attachment: {{#$output.attachment#}}\n"
"Attachments: {{#$output.attachments#}}"
),
rendered_content=(
"Decision: {{#$output.decision#}}\n"
"Attachment: {{#$output.attachment#}}\n"
"Attachments: {{#$output.attachments#}}"
),
inputs=[
{
"type": "select",
"output_variable_name": "decision",
"option_source": {"type": "constant", "value": ["approve", "reject"]},
},
{
"type": "file",
"output_variable_name": "attachment",
"allowed_file_types": ["document"],
"allowed_file_upload_methods": ["remote_url"],
},
{
"type": "file_list",
"output_variable_name": "attachments",
"allowed_file_types": ["document"],
"allowed_file_upload_methods": ["remote_url"],
"number_limits": 3,
},
],
submitted_data=submitted_data,
)
_create_human_input_content(
db_session_with_containers,
workflow_run_id=workflow_run_id,
message_id=msg.id,
form_id=form.id,
)
db_session_with_containers.commit()
result = repository.get_by_message_ids([msg.id])
content = result[0][0]
assert content.form_submission_data is not None
assert content.form_submission_data.submitted_data == submitted_data
assert content.form_submission_data.rendered_content == (
"Decision: approve\nAttachment: [file]\nAttachments: [2 files]"
)
def test_returns_unsubmitted_form_definition(
self,
db_session_with_containers: Session,

View File

@ -7,6 +7,7 @@ from core.app.entities.queue_entities import QueueHumanInputFormFilledEvent, Que
from core.workflow.system_variables import build_system_variables
from graphon.entities import WorkflowStartReason
from graphon.runtime import GraphRuntimeState, VariablePool
from graphon.variables.segments import StringSegment
def _build_converter():
@ -63,6 +64,37 @@ def test_human_input_form_filled_stream_response_contains_rendered_content():
assert resp.data.action_id == "Approve"
def test_human_input_form_filled_stream_response_serializes_submitted_data():
converter = _build_converter()
converter.workflow_start_to_stream_response(
task_id="task-1",
workflow_run_id="run-1",
workflow_id="wf-1",
reason=WorkflowStartReason.INITIAL,
)
queue_event = QueueHumanInputFormFilledEvent(
node_execution_id="exec-1",
node_id="node-1",
node_type="human-input",
node_title="Human Input",
rendered_content="# Title\nvalue",
action_id="Approve",
action_text="Approve",
submitted_data={
"decision": StringSegment(value="approve"),
"comment": StringSegment(value="looks good"),
},
)
resp = converter.human_input_form_filled_to_stream_response(event=queue_event, task_id="task-1")
assert resp.data.submitted_data == {
"decision": "approve",
"comment": "looks good",
}
def test_human_input_form_timeout_stream_response_contains_timeout_metadata():
converter = _build_converter()
converter.workflow_start_to_stream_response(

View File

@ -9,6 +9,7 @@ from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom
from core.app.entities.queue_entities import (
QueueAgentLogEvent,
QueueHumanInputFormFilledEvent,
QueueIterationCompletedEvent,
QueueLoopCompletedEvent,
QueueNodeExceptionEvent,
@ -30,6 +31,7 @@ from graphon.graph_events import (
NodeRunAgentLogEvent,
NodeRunExceptionEvent,
NodeRunFailedEvent,
NodeRunHumanInputFormFilledEvent,
NodeRunIterationSucceededEvent,
NodeRunLoopFailedEvent,
NodeRunRetryEvent,
@ -39,6 +41,7 @@ from graphon.graph_events import (
)
from graphon.node_events import NodeRunResult
from graphon.runtime import GraphRuntimeState, VariablePool
from graphon.variables.segments import StringSegment
from graphon.variables.variables import StringVariable
@ -361,6 +364,38 @@ class TestWorkflowBasedAppRunner:
assert any(isinstance(event, QueueIterationCompletedEvent) for event in published)
assert any(isinstance(event, QueueLoopCompletedEvent) for event in published)
def test_handle_human_input_form_filled_event_preserves_submitted_data(self):
published: list[object] = []
class _QueueManager:
def publish(self, event, publish_from):
published.append(event)
runner = WorkflowBasedAppRunner(queue_manager=_QueueManager(), app_id="app")
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool(system_variables=default_system_variables()),
start_at=0.0,
)
workflow_entry = SimpleNamespace(graph_engine=SimpleNamespace(graph_runtime_state=graph_runtime_state))
runner._handle_event(
workflow_entry,
NodeRunHumanInputFormFilledEvent(
id="exec",
node_id="node",
node_type=BuiltinNodeTypes.HUMAN_INPUT,
node_title="Human Input",
rendered_content="content",
action_id="approve",
action_text="Approve",
submitted_data={"decision": StringSegment(value="approve")},
),
)
queue_event = published[-1]
assert isinstance(queue_event, QueueHumanInputFormFilledEvent)
assert queue_event.submitted_data == {"decision": StringSegment(value="approve")}
@pytest.mark.parametrize(
("event_factory", "queue_event_cls"),
[

View File

@ -26,6 +26,7 @@ def test_human_input_content_defaults_and_domain_alias() -> None:
rendered_content="Please confirm",
action_id="confirm",
action_text="Confirm",
submitted_data={"answer": "yes"},
)
# Act
@ -41,4 +42,5 @@ def test_human_input_content_defaults_and_domain_alias() -> None:
assert content.type == ExecutionContentType.HUMAN_INPUT
assert content.form_definition is form_definition
assert content.form_submission_data is submission_data
assert content.form_submission_data.submitted_data == {"answer": "yes"}
assert ExecutionExtraContentDomainModel is HumanInputContent

View File

@ -586,6 +586,73 @@ def test_mark_submitted_updates_and_raises_when_missing(monkeypatch: pytest.Monk
assert record.submitted_data == {"k": "v"}
def test_mark_submitted_serializes_select_and_file_payloads(monkeypatch: pytest.MonkeyPatch) -> None:
fixed_now = datetime(2024, 1, 1, 0, 0, 0)
monkeypatch.setattr("core.repositories.human_input_repository.naive_utc_now", lambda: fixed_now)
form = _DummyForm(
id="f-complex",
workflow_run_id=None,
node_id="node",
tenant_id="tenant",
app_id="app",
form_definition=_make_form_definition_json(include_expiration_time=True),
rendered_content="<p>x</p>",
expiration_time=fixed_now,
)
recipient = _DummyRecipient(
id="r-complex",
form_id=form.id,
recipient_type=RecipientType.CONSOLE,
access_token="tok",
)
session = _FakeSession(forms={form.id: form}, recipients={recipient.id: recipient})
_patch_session_factory(monkeypatch, session)
payload = {
"decision": "approve",
"attachment": {
"type": "document",
"transfer_method": "remote_url",
"remote_url": "https://example.com/file.txt",
"filename": "file.txt",
"extension": ".txt",
"mime_type": "text/plain",
},
"attachments": [
{
"type": "document",
"transfer_method": "remote_url",
"remote_url": "https://example.com/first.txt",
"filename": "first.txt",
"extension": ".txt",
"mime_type": "text/plain",
},
{
"type": "document",
"transfer_method": "remote_url",
"remote_url": "https://example.com/second.txt",
"filename": "second.txt",
"extension": ".txt",
"mime_type": "text/plain",
},
],
}
repo = HumanInputFormSubmissionRepository()
record = repo.mark_submitted(
form_id=form.id,
recipient_id=recipient.id,
selected_action_id="approve",
form_data=payload,
submission_user_id="user-1",
submission_end_user_id="end-user-1",
)
assert json.loads(form.submitted_data or "") == payload
assert record.submitted_data == payload
def test_mark_timeout_invalid_status_raises(monkeypatch: pytest.MonkeyPatch) -> None:
form = _DummyForm(
id="f",

View File

@ -400,6 +400,70 @@ def test_dify_human_input_runtime_preserves_webapp_delivery_for_web_invocations(
assert params.delivery_methods[1].config.recipients.include_bound_group is True
def test_dify_human_input_runtime_restore_submitted_data_rehydrates_files() -> None:
runtime = DifyHumanInputNodeRuntime(_build_run_context())
file_value = File(
file_id="file-1",
file_type=FileType.DOCUMENT,
transfer_method=FileTransferMethod.LOCAL_FILE,
related_id="upload-1",
filename="resume.pdf",
extension=".pdf",
mime_type="application/pdf",
size=128,
)
file_list_value = [
File(
file_id="file-2",
file_type=FileType.DOCUMENT,
transfer_method=FileTransferMethod.LOCAL_FILE,
related_id="upload-2",
filename="first.pdf",
extension=".pdf",
mime_type="application/pdf",
size=64,
),
File(
file_id="file-3",
file_type=FileType.DOCUMENT,
transfer_method=FileTransferMethod.REMOTE_URL,
remote_url="https://example.com/second.pdf",
filename="second.pdf",
extension=".pdf",
mime_type="application/pdf",
size=96,
),
]
runtime._file_reference_factory.build_from_mapping = MagicMock(side_effect=[file_value, *file_list_value]) # type: ignore[method-assign]
node_data = HumanInputNodeData(
title="Human Input",
inputs=[
FileInputConfig(output_variable_name="attachment"),
FileListInputConfig(output_variable_name="attachments", number_limits=2),
],
)
restored = runtime.restore_submitted_data(
node_data=node_data,
submitted_data={
"attachment": {"upload_file_id": "upload-1", "type": "document", "transfer_method": "local_file"},
"attachments": [
{"upload_file_id": "upload-2", "type": "document", "transfer_method": "local_file"},
{
"url": "https://example.com/second.pdf",
"type": "document",
"transfer_method": "remote_url",
},
],
},
)
assert restored["attachment"] is file_value
assert restored["attachments"] == file_list_value
assert isinstance(FileSegment(value=restored["attachment"]), FileSegment)
assert isinstance(ArrayFileSegment(value=restored["attachments"]), ArrayFileSegment)
def test_build_dify_llm_file_saver_wires_runtime_adapters(monkeypatch: pytest.MonkeyPatch) -> None:
file_saver_cls = MagicMock(return_value=sentinel.file_saver)
monkeypatch.setattr("graphon.nodes.llm.file_saver.FileSaverImpl", file_saver_cls)

View File

@ -0,0 +1,56 @@
from __future__ import annotations
import json
from datetime import timedelta
from typing import cast
from sqlalchemy.orm import Session, sessionmaker
from graphon.nodes.human_input.entities import FormDefinition, UserActionConfig
from graphon.nodes.human_input.enums import HumanInputFormStatus
from libs.datetime_utils import naive_utc_now
from models.execution_extra_content import HumanInputContent as HumanInputContentModel
from models.human_input import HumanInputForm
from repositories.sqlalchemy_execution_extra_content_repository import SQLAlchemyExecutionExtraContentRepository
def test_map_human_input_content_populates_submission_data_from_stored_form_submission() -> None:
expiration_time = naive_utc_now() + timedelta(days=1)
stored_submission_data = {"decision": "approve", "comment": "Looks good"}
form_definition = FormDefinition(
form_content="content",
inputs=[],
user_actions=[UserActionConfig(id="approve", title="Approve")],
rendered_content="Rendered Approve",
expiration_time=expiration_time,
node_title="Approval",
display_in_ui=True,
)
form = HumanInputForm(
tenant_id="tenant-1",
app_id="app-1",
workflow_run_id="workflow-run-1",
node_id="node-1",
form_definition=form_definition.model_dump_json(),
rendered_content="Rendered Approve",
expiration_time=expiration_time,
selected_action_id="approve",
submitted_data=json.dumps(stored_submission_data),
submitted_at=naive_utc_now(),
status=HumanInputFormStatus.SUBMITTED,
)
form.id = "form-1"
model = HumanInputContentModel.new(
workflow_run_id="workflow-run-1",
form_id=form.id,
message_id="message-1",
)
model.id = "content-1"
model.form = form
repository = SQLAlchemyExecutionExtraContentRepository(cast(sessionmaker[Session], object()))
content = repository._map_human_input_content(model, {})
assert content is not None
assert content.form_submission_data is not None
assert content.form_submission_data.submitted_data == stored_submission_data