feat(api): expose submitted_data to frontend

This commit is contained in:
QuantumGhost 2026-05-07 10:40:12 +08:00
parent 9ad5d89b07
commit d6f607f6e7
9 changed files with 146 additions and 41 deletions

View File

@ -388,17 +388,19 @@ class WorkflowResponseConverter:
self, *, event: QueueHumanInputFormFilledEvent, task_id: str
) -> HumanInputFormFilledResponse:
run_id = self._ensure_workflow_run_id()
return HumanInputFormFilledResponse(
task_id=task_id,
workflow_run_id=run_id,
data=HumanInputFormFilledResponse.Data(
node_id=event.node_id,
node_title=event.node_title,
rendered_content=event.rendered_content,
action_id=event.action_id,
action_text=event.action_text,
),
data = HumanInputFormFilledResponse.Data(
node_id=event.node_id,
node_title=event.node_title,
rendered_content=event.rendered_content,
action_id=event.action_id,
action_text=event.action_text,
)
if event.submitted_data is not None:
runtime_type_converter = WorkflowRuntimeTypeConverter()
data.submitted_data = runtime_type_converter.value_to_json_encodable_recursive(event.submitted_data)
return HumanInputFormFilledResponse(task_id=task_id, workflow_run_id=run_id, data=data)
def human_input_form_timeout_to_stream_response(
self, *, event: QueueHumanInputFormTimeoutEvent, task_id: str

View File

@ -432,6 +432,7 @@ class WorkflowBasedAppRunner:
rendered_content=event.rendered_content,
action_id=event.action_id,
action_text=event.action_text,
submitted_data=event.submitted_data,
)
)
elif isinstance(event, NodeRunHumanInputFormTimeoutEvent):

View File

@ -11,6 +11,7 @@ from graphon.entities import WorkflowStartReason
from graphon.entities.pause_reason import PauseReason
from graphon.enums import NodeType, WorkflowNodeExecutionMetadataKey
from graphon.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
from graphon.variables.segments import Segment
class QueueEvent(StrEnum):
@ -508,6 +509,10 @@ class QueueHumanInputFormFilledEvent(AppQueueEvent):
action_id: str
action_text: str
# Keep the field name aligned with Graphon so the app-layer bridge does not
# need to translate between two equivalent payload names.
submitted_data: Mapping[str, Segment] | None = None
class QueueHumanInputFormTimeoutEvent(AppQueueEvent):
"""

View File

@ -9,7 +9,7 @@ from core.rag.entities import RetrievalSourceMetadata
from graphon.entities import WorkflowStartReason
from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from graphon.model_runtime.entities.llm_entities import LLMResult, LLMUsage
from graphon.nodes.human_input.entities import FormInput, UserAction
from graphon.nodes.human_input.entities import FormInputConfig, UserActionConfig
class AnnotationReplyAccount(BaseModel):
@ -283,8 +283,8 @@ class HumanInputRequiredResponse(StreamResponse):
node_id: str
node_title: str
form_content: str
inputs: Sequence[FormInput] = Field(default_factory=list)
actions: Sequence[UserAction] = Field(default_factory=list)
inputs: Sequence[FormInputConfig] = Field(default_factory=list)
actions: Sequence[UserActionConfig] = Field(default_factory=list)
display_in_ui: bool = False
form_token: str | None = None
resolved_default_values: Mapping[str, Any] = Field(default_factory=dict)
@ -307,6 +307,8 @@ class HumanInputFormFilledResponse(StreamResponse):
action_id: str
action_text: str
submitted_data: Mapping[str, Any] | None = None
event: StreamEvent = StreamEvent.HUMAN_INPUT_FORM_FILLED
workflow_run_id: str
data: Data

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from collections.abc import Mapping, Sequence
from typing import Any, TypeAlias
from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict, Field, JsonValue
from graphon.nodes.human_input.entities import FormInputConfig, UserActionConfig
from models.execution_extra_content import ExecutionContentType
@ -19,6 +19,8 @@ class HumanInputFormDefinition(BaseModel):
inputs: Sequence[FormInputConfig] = Field(default_factory=list)
actions: Sequence[UserActionConfig] = Field(default_factory=list)
display_in_ui: bool = False
# `form_token` is `None` if the corresponding form has been submitted.
form_token: str | None = None
resolved_default_values: Mapping[str, Any] = Field(default_factory=dict)
expiration_time: int
@ -29,16 +31,31 @@ class HumanInputFormSubmissionData(BaseModel):
node_id: str
node_title: str
# deprecated: `rendered_content` is retained only for historical reasons.
rendered_content: str
# The identifier of action user has chosen.
action_id: str
# The button text of the action user has chosen.
action_text: str
# submitted_data records the submitted form data.
# Keys correspond to `output_variable_name` of HumanInput inputs.
# Values are serialized JSON forms of runtime values, including file dictionaries.
#
# For forms submitted before this field was introduced, it is populated from
# the stored submission data.
submitted_data: Mapping[str, JsonValue] | None = None
class HumanInputContent(BaseModel):
model_config = ConfigDict(frozen=True)
workflow_run_id: str
submitted: bool
# Both the form_definition and the form_submission_data may be present in a
# HumanInputContent; for historical records, one of the two may be absent.
form_definition: HumanInputFormDefinition | None = None
form_submission_data: HumanInputFormSubmissionData | None = None
type: ExecutionContentType = Field(default=ExecutionContentType.HUMAN_INPUT)

View File

@ -42,7 +42,12 @@ from graphon.model_runtime.entities.llm_entities import (
from graphon.model_runtime.entities.message_entities import PromptMessage, PromptMessageTool
from graphon.model_runtime.entities.model_entities import AIModelEntity
from graphon.model_runtime.model_providers.base.large_language_model import LargeLanguageModel
from graphon.nodes.human_input.entities import HumanInputNodeData
from graphon.nodes.human_input.entities import (
FileInputConfig,
FileListInputConfig,
FormInputConfig,
HumanInputNodeData,
)
from graphon.nodes.llm.runtime_protocols import (
PreparedLLMProtocol,
PromptMessageSerializerProtocol,
@ -625,6 +630,7 @@ class DifyHumanInputNodeRuntime(HumanInputNodeRuntimeProtocol):
self._run_context = resolve_dify_run_context(run_context)
self._workflow_execution_id_getter = workflow_execution_id_getter
self._form_repository = form_repository
self._file_reference_factory = DifyFileReferenceFactory(self._run_context)
def _invoke_source(self) -> str:
invoke_from = self._run_context.invoke_from
@ -678,6 +684,23 @@ class DifyHumanInputNodeRuntime(HumanInputNodeRuntimeProtocol):
repo = self.build_form_repository()
return repo.get_form(node_id)
def restore_submitted_data(
    self,
    *,
    node_data: HumanInputNodeData,
    submitted_data: Mapping[str, Any],
) -> Mapping[str, Any]:
    """Rebuild runtime values for a persisted form submission.

    For every input config whose ``output_variable_name`` has an entry in
    ``submitted_data``, the stored value is replaced with its restored
    runtime form via ``_restore_submitted_value``. Keys without a matching
    input config are passed through untouched.
    """
    restored: dict[str, Any] = dict(submitted_data)
    for config in node_data.inputs:
        key = config.output_variable_name
        if key not in submitted_data:
            # Nothing was persisted for this input; keep the copy as-is.
            continue
        restored[key] = self._restore_submitted_value(
            input_config=config,
            value=submitted_data[key],
        )
    return restored
def create_form(
self,
*,
@ -698,6 +721,55 @@ class DifyHumanInputNodeRuntime(HumanInputNodeRuntimeProtocol):
)
return repo.create_form(params)
def _restore_submitted_value(
    self,
    *,
    input_config: FormInputConfig,
    value: Any,
) -> Any:
    """Restore a single persisted submission value to its runtime form.

    File and file-list inputs are rebuilt into file references; every
    other input kind is returned unchanged. The handler order matches the
    original isinstance dispatch (``FileInputConfig`` checked first).
    """
    handlers = (
        (FileInputConfig, self._restore_submitted_file_value),
        (FileListInputConfig, self._restore_submitted_file_list_value),
    )
    for config_cls, restore in handlers:
        if isinstance(input_config, config_cls):
            return restore(
                output_variable_name=input_config.output_variable_name,
                value=value,
            )
    # Scalar inputs need no translation.
    return value
def _restore_submitted_file_value(
self,
*,
output_variable_name: str,
value: Any,
) -> Any:
if not isinstance(value, Mapping):
msg = (
"HumanInput file submission must be persisted as a mapping, "
f"output_variable_name={output_variable_name}"
)
raise ValueError(msg)
return self._file_reference_factory.build_from_mapping(mapping=value)
def _restore_submitted_file_list_value(
self,
*,
output_variable_name: str,
value: Any,
) -> list[Any]:
if not isinstance(value, list):
msg = (
"HumanInput file-list submission must be persisted as a list, "
f"output_variable_name={output_variable_name}"
)
raise ValueError(msg)
if any(not isinstance(item, Mapping) for item in value):
msg = f"HumanInput file-list submission must contain mappings, output_variable_name={output_variable_name}"
raise ValueError(msg)
return [self._file_reference_factory.build_from_mapping(mapping=item) for item in value]
def build_dify_llm_file_saver(
*,

View File

@ -117,7 +117,7 @@ class SQLAlchemyExecutionExtraContentRepository(ExecutionExtraContentRepository)
definition_payload["expiration_time"] = form.expiration_time
form_definition = FormDefinition.model_validate(definition_payload)
except ValueError:
logger.warning("Failed to load form definition for HumanInputContent(id=%s)", model.id)
logger.warning("Failed to load form definition for HumanInputContent(id=%s)", model.id, exc_info=True)
return None
node_title = form_definition.node_title or form.node_id
display_in_ui = bool(form_definition.display_in_ui)
@ -125,22 +125,20 @@ class SQLAlchemyExecutionExtraContentRepository(ExecutionExtraContentRepository)
submitted = form.submitted_at is not None or form.status == HumanInputFormStatus.SUBMITTED
if not submitted:
form_token = self._resolve_form_token(recipients_by_form_id.get(form.id, []))
return HumanInputContentDomainModel(
workflow_run_id=model.workflow_run_id,
submitted=False,
form_definition=HumanInputFormDefinition(
form_id=form.id,
node_id=form.node_id,
node_title=node_title,
form_content=form.rendered_content,
inputs=form_definition.inputs,
actions=form_definition.user_actions,
display_in_ui=display_in_ui,
form_token=form_token,
resolved_default_values=form_definition.default_values,
expiration_time=int(form.expiration_time.timestamp()),
),
)
else:
form_token = None
form_definition_domain_model = HumanInputFormDefinition(
form_id=form.id,
node_id=form.node_id,
node_title=node_title,
form_content=form.rendered_content,
inputs=form_definition.inputs,
actions=form_definition.user_actions,
display_in_ui=display_in_ui,
form_token=form_token,
resolved_default_values=form_definition.default_values,
expiration_time=int(form.expiration_time.timestamp()),
)
selected_action_id = form.selected_action_id
if not selected_action_id:
@ -164,17 +162,20 @@ class SQLAlchemyExecutionExtraContentRepository(ExecutionExtraContentRepository)
form.rendered_content,
submitted_data,
_extract_output_field_names(form_definition.form_content),
form_definition.inputs,
)
return HumanInputContentDomainModel(
workflow_run_id=model.workflow_run_id,
submitted=True,
submitted=submitted,
form_definition=form_definition_domain_model,
form_submission_data=HumanInputFormSubmissionData(
node_id=form.node_id,
node_title=node_title,
rendered_content=rendered_content,
action_id=selected_action_id,
action_text=action_text,
submitted_data=submitted_data,
),
)

View File

@ -55,7 +55,7 @@ from graphon.node_events import NodeRunResult
from graphon.nodes import BuiltinNodeTypes
from graphon.nodes.base.node import Node
from graphon.nodes.http_request import HTTP_REQUEST_CONFIG_FILTER_KEY, build_http_request_config
from graphon.nodes.human_input.entities import HumanInputNodeData, validate_human_input_submission
from graphon.nodes.human_input.entities import HumanInputNodeData
from graphon.nodes.human_input.enums import HumanInputFormKind
from graphon.nodes.human_input.human_input_node import HumanInputNode
from graphon.nodes.start.entities import StartNodeData
@ -78,6 +78,7 @@ from services.errors.app import (
WorkflowHashNotEqualError,
WorkflowNotFoundError,
)
from services.human_input_service import HumanInputService
from services.workflow.workflow_converter import WorkflowConverter
from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError
@ -1047,18 +1048,22 @@ class WorkflowService:
)
node_data = node.node_data
validate_human_input_submission(
inputs=node_data.inputs,
user_actions=node_data.user_actions,
human_input_service = HumanInputService(session_factory=sessionmaker(db.engine))
normalized_form_inputs = human_input_service.validate_and_normalize_submission(
tenant_id=app_model.tenant_id,
form_definition=node_data,
selected_action_id=action,
form_data=form_inputs,
)
rendered_content = node.render_form_content_before_submission()
outputs: dict[str, Any] = dict(form_inputs)
outputs: dict[str, Any] = dict(normalized_form_inputs)
outputs["__action_id"] = action
outputs["__rendered_content"] = node.render_form_content_with_outputs(
rendered_content, outputs, node_data.outputs_field_names()
rendered_content,
outputs,
node_data.outputs_field_names(),
node_data.inputs,
)
enclosing_node_type_and_id = draft_workflow.get_enclosing_node_type_and_id(node_config)

View File

@ -5,7 +5,7 @@ from datetime import timedelta
from decimal import Decimal
from uuid import uuid4
from graphon.nodes.human_input.entities import FormDefinition, UserAction
from graphon.nodes.human_input.entities import FormDefinition, UserActionConfig
from libs.datetime_utils import naive_utc_now
from models.account import Account, Tenant, TenantAccountJoin
from models.enums import ConversationFromSource, InvokeFrom
@ -116,7 +116,7 @@ def create_human_input_message_fixture(db_session) -> HumanInputMessageFixture:
form_definition = FormDefinition(
form_content="content",
inputs=[],
user_actions=[UserAction(id=action_id, title=action_text)],
user_actions=[UserActionConfig(id=action_id, title=action_text)],
rendered_content="Rendered block",
expiration_time=naive_utc_now() + timedelta(days=1),
node_title=node_title,