diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index bd685d5189..27c7d19e85 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -388,17 +388,19 @@ class WorkflowResponseConverter: self, *, event: QueueHumanInputFormFilledEvent, task_id: str ) -> HumanInputFormFilledResponse: run_id = self._ensure_workflow_run_id() - return HumanInputFormFilledResponse( - task_id=task_id, - workflow_run_id=run_id, - data=HumanInputFormFilledResponse.Data( - node_id=event.node_id, - node_title=event.node_title, - rendered_content=event.rendered_content, - action_id=event.action_id, - action_text=event.action_text, - ), + data = HumanInputFormFilledResponse.Data( + node_id=event.node_id, + node_title=event.node_title, + rendered_content=event.rendered_content, + action_id=event.action_id, + action_text=event.action_text, ) + if event.submitted_data is not None: + runtime_type_converter = WorkflowRuntimeTypeConverter() + + data.submitted_data = runtime_type_converter.value_to_json_encodable_recursive(event.submitted_data) + + return HumanInputFormFilledResponse(task_id=task_id, workflow_run_id=run_id, data=data) def human_input_form_timeout_to_stream_response( self, *, event: QueueHumanInputFormTimeoutEvent, task_id: str diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 047b54c86c..03707a6477 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -432,6 +432,7 @@ class WorkflowBasedAppRunner: rendered_content=event.rendered_content, action_id=event.action_id, action_text=event.action_text, + submitted_data=event.submitted_data, ) ) elif isinstance(event, NodeRunHumanInputFormTimeoutEvent): diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 221b7fb058..a0e7881ede 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -11,6 +11,7 @@ from graphon.entities import WorkflowStartReason from graphon.entities.pause_reason import PauseReason from graphon.enums import NodeType, WorkflowNodeExecutionMetadataKey from graphon.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk +from graphon.variables.segments import Segment class QueueEvent(StrEnum): @@ -508,6 +509,10 @@ class QueueHumanInputFormFilledEvent(AppQueueEvent): action_id: str action_text: str + # Keep the field name aligned with Graphon so the app-layer bridge does not + # need to translate between two equivalent payload names. + submitted_data: Mapping[str, Segment] | None = None + class QueueHumanInputFormTimeoutEvent(AppQueueEvent): """ diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 6e4ca69cf0..090c4d6fec 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -9,7 +9,7 @@ from core.rag.entities import RetrievalSourceMetadata from graphon.entities import WorkflowStartReason from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from graphon.model_runtime.entities.llm_entities import LLMResult, LLMUsage -from graphon.nodes.human_input.entities import FormInput, UserAction +from graphon.nodes.human_input.entities import FormInputConfig, UserActionConfig class AnnotationReplyAccount(BaseModel): @@ -283,8 +283,8 @@ class HumanInputRequiredResponse(StreamResponse): node_id: str node_title: str form_content: str - inputs: Sequence[FormInput] = Field(default_factory=list) - actions: Sequence[UserAction] = Field(default_factory=list) + inputs: Sequence[FormInputConfig] = Field(default_factory=list) + actions: Sequence[UserActionConfig] = Field(default_factory=list) display_in_ui: bool = False form_token: str | None = None resolved_default_values: Mapping[str, Any] = Field(default_factory=dict) @@ -307,6 +307,8 @@ class HumanInputFormFilledResponse(StreamResponse): action_id: str action_text: str + submitted_data: Mapping[str, Any] | None = None + event: StreamEvent = StreamEvent.HUMAN_INPUT_FORM_FILLED workflow_run_id: str data: Data diff --git a/api/core/entities/execution_extra_content.py b/api/core/entities/execution_extra_content.py index f11c670069..43252ccb2c 100644 --- a/api/core/entities/execution_extra_content.py +++ b/api/core/entities/execution_extra_content.py @@ -3,7 +3,7 @@ from __future__ import annotations from collections.abc import Mapping, Sequence from typing import Any, TypeAlias -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, ConfigDict, Field, JsonValue from graphon.nodes.human_input.entities import FormInputConfig, UserActionConfig from models.execution_extra_content import ExecutionContentType @@ -19,6 +19,8 @@ class HumanInputFormDefinition(BaseModel): inputs: Sequence[FormInputConfig] = Field(default_factory=list) actions: Sequence[UserActionConfig] = Field(default_factory=list) display_in_ui: bool = False + + # `form_token` is `None` if the corresponding form has been submitted. form_token: str | None = None resolved_default_values: Mapping[str, Any] = Field(default_factory=dict) expiration_time: int @@ -29,16 +31,31 @@ class HumanInputFormSubmissionData(BaseModel): node_id: str node_title: str + + # deprecate: the rendered_content is deprecated and only for historical reasons. rendered_content: str + + # The identifier of action user has chosen. action_id: str + # The button text of the action user has chosen. action_text: str + # submitted_data records the submitted form data. + # Keys correspond to `output_variable_name` of HumanInput inputs. + # Values are serialized JSON forms of runtime values, including file dictionaries. + # + # For form submitted before this field is introduced, this field is populated from + # the stored submission data. + submitted_data: Mapping[str, JsonValue] | None = None + class HumanInputContent(BaseModel): model_config = ConfigDict(frozen=True) workflow_run_id: str submitted: bool + # Both the form_defintion and the form_submission_data are present in + # HumanInputContent. For historical records, the form_definition: HumanInputFormDefinition | None = None form_submission_data: HumanInputFormSubmissionData | None = None type: ExecutionContentType = Field(default=ExecutionContentType.HUMAN_INPUT) diff --git a/api/core/workflow/node_runtime.py b/api/core/workflow/node_runtime.py index b8725853c4..c6cb217dd1 100644 --- a/api/core/workflow/node_runtime.py +++ b/api/core/workflow/node_runtime.py @@ -42,7 +42,12 @@ from graphon.model_runtime.entities.llm_entities import ( from graphon.model_runtime.entities.message_entities import PromptMessage, PromptMessageTool from graphon.model_runtime.entities.model_entities import AIModelEntity from graphon.model_runtime.model_providers.base.large_language_model import LargeLanguageModel -from graphon.nodes.human_input.entities import HumanInputNodeData +from graphon.nodes.human_input.entities import ( + FileInputConfig, + FileListInputConfig, + FormInputConfig, + HumanInputNodeData, +) from graphon.nodes.llm.runtime_protocols import ( PreparedLLMProtocol, PromptMessageSerializerProtocol, @@ -625,6 +630,7 @@ class DifyHumanInputNodeRuntime(HumanInputNodeRuntimeProtocol): self._run_context = resolve_dify_run_context(run_context) self._workflow_execution_id_getter = workflow_execution_id_getter self._form_repository = form_repository + self._file_reference_factory = DifyFileReferenceFactory(self._run_context) def _invoke_source(self) -> str: invoke_from = self._run_context.invoke_from @@ -678,6 +684,23 @@ class DifyHumanInputNodeRuntime(HumanInputNodeRuntimeProtocol): repo = self.build_form_repository() return repo.get_form(node_id) + def restore_submitted_data( + self, + *, + node_data: HumanInputNodeData, + submitted_data: Mapping[str, Any], + ) -> Mapping[str, Any]: + restored_data: dict[str, Any] = dict(submitted_data) + for input_config in node_data.inputs: + output_variable_name = input_config.output_variable_name + if output_variable_name not in submitted_data: + continue + restored_data[output_variable_name] = self._restore_submitted_value( + input_config=input_config, + value=submitted_data[output_variable_name], + ) + return restored_data + def create_form( self, *, @@ -698,6 +721,55 @@ class DifyHumanInputNodeRuntime(HumanInputNodeRuntimeProtocol): ) return repo.create_form(params) + def _restore_submitted_value( + self, + *, + input_config: FormInputConfig, + value: Any, + ) -> Any: + if isinstance(input_config, FileInputConfig): + return self._restore_submitted_file_value( + output_variable_name=input_config.output_variable_name, + value=value, + ) + if isinstance(input_config, FileListInputConfig): + return self._restore_submitted_file_list_value( + output_variable_name=input_config.output_variable_name, + value=value, + ) + return value + + def _restore_submitted_file_value( + self, + *, + output_variable_name: str, + value: Any, + ) -> Any: + if not isinstance(value, Mapping): + msg = ( + "HumanInput file submission must be persisted as a mapping, " + f"output_variable_name={output_variable_name}" + ) + raise ValueError(msg) + return self._file_reference_factory.build_from_mapping(mapping=value) + + def _restore_submitted_file_list_value( + self, + *, + output_variable_name: str, + value: Any, + ) -> list[Any]: + if not isinstance(value, list): + msg = ( + "HumanInput file-list submission must be persisted as a list, " + f"output_variable_name={output_variable_name}" + ) + raise ValueError(msg) + if any(not isinstance(item, Mapping) for item in value): + msg = f"HumanInput file-list submission must contain mappings, output_variable_name={output_variable_name}" + raise ValueError(msg) + return [self._file_reference_factory.build_from_mapping(mapping=item) for item in value] + def build_dify_llm_file_saver( *, diff --git a/api/repositories/sqlalchemy_execution_extra_content_repository.py b/api/repositories/sqlalchemy_execution_extra_content_repository.py index 67f8795d3f..f695fd39d9 100644 --- a/api/repositories/sqlalchemy_execution_extra_content_repository.py +++ b/api/repositories/sqlalchemy_execution_extra_content_repository.py @@ -117,7 +117,7 @@ class SQLAlchemyExecutionExtraContentRepository(ExecutionExtraContentRepository) definition_payload["expiration_time"] = form.expiration_time form_definition = FormDefinition.model_validate(definition_payload) except ValueError: - logger.warning("Failed to load form definition for HumanInputContent(id=%s)", model.id) + logger.warning("Failed to load form definition for HumanInputContent(id=%s)", model.id, exc_info=True) return None node_title = form_definition.node_title or form.node_id display_in_ui = bool(form_definition.display_in_ui) @@ -125,22 +125,20 @@ class SQLAlchemyExecutionExtraContentRepository(ExecutionExtraContentRepository) submitted = form.submitted_at is not None or form.status == HumanInputFormStatus.SUBMITTED if not submitted: form_token = self._resolve_form_token(recipients_by_form_id.get(form.id, [])) - return HumanInputContentDomainModel( - workflow_run_id=model.workflow_run_id, - submitted=False, - form_definition=HumanInputFormDefinition( - form_id=form.id, - node_id=form.node_id, - node_title=node_title, - form_content=form.rendered_content, - inputs=form_definition.inputs, - actions=form_definition.user_actions, - display_in_ui=display_in_ui, - form_token=form_token, - resolved_default_values=form_definition.default_values, - expiration_time=int(form.expiration_time.timestamp()), - ), - ) + else: + form_token = None + form_definition_domain_model = HumanInputFormDefinition( + form_id=form.id, + node_id=form.node_id, + node_title=node_title, + form_content=form.rendered_content, + inputs=form_definition.inputs, + actions=form_definition.user_actions, + display_in_ui=display_in_ui, + form_token=form_token, + resolved_default_values=form_definition.default_values, + expiration_time=int(form.expiration_time.timestamp()), + ) selected_action_id = form.selected_action_id if not selected_action_id: @@ -164,17 +162,20 @@ class SQLAlchemyExecutionExtraContentRepository(ExecutionExtraContentRepository) form.rendered_content, submitted_data, _extract_output_field_names(form_definition.form_content), + form_definition.inputs, ) return HumanInputContentDomainModel( workflow_run_id=model.workflow_run_id, - submitted=True, + submitted=submitted, + form_definition=form_definition_domain_model, form_submission_data=HumanInputFormSubmissionData( node_id=form.node_id, node_title=node_title, rendered_content=rendered_content, action_id=selected_action_id, action_text=action_text, + submitted_data=submitted_data, ), ) diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index d4b9095ce5..71a9724edd 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -55,7 +55,7 @@ from graphon.node_events import NodeRunResult from graphon.nodes import BuiltinNodeTypes from graphon.nodes.base.node import Node from graphon.nodes.http_request import HTTP_REQUEST_CONFIG_FILTER_KEY, build_http_request_config -from graphon.nodes.human_input.entities import HumanInputNodeData, validate_human_input_submission +from graphon.nodes.human_input.entities import HumanInputNodeData from graphon.nodes.human_input.enums import HumanInputFormKind from graphon.nodes.human_input.human_input_node import HumanInputNode from graphon.nodes.start.entities import StartNodeData @@ -78,6 +78,7 @@ from services.errors.app import ( WorkflowHashNotEqualError, WorkflowNotFoundError, ) +from services.human_input_service import HumanInputService from services.workflow.workflow_converter import WorkflowConverter from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError @@ -1047,18 +1048,22 @@ class WorkflowService: ) node_data = node.node_data - validate_human_input_submission( - inputs=node_data.inputs, - user_actions=node_data.user_actions, + human_input_service = HumanInputService(session_factory=sessionmaker(db.engine)) + normalized_form_inputs = human_input_service.validate_and_normalize_submission( + tenant_id=app_model.tenant_id, + form_definition=node_data, selected_action_id=action, form_data=form_inputs, ) rendered_content = node.render_form_content_before_submission() - outputs: dict[str, Any] = dict(form_inputs) + outputs: dict[str, Any] = dict(normalized_form_inputs) outputs["__action_id"] = action outputs["__rendered_content"] = node.render_form_content_with_outputs( - rendered_content, outputs, node_data.outputs_field_names() + rendered_content, + outputs, + node_data.outputs_field_names(), + node_data.inputs, ) enclosing_node_type_and_id = draft_workflow.get_enclosing_node_type_and_id(node_config) diff --git a/api/tests/test_containers_integration_tests/helpers/execution_extra_content.py b/api/tests/test_containers_integration_tests/helpers/execution_extra_content.py index 2fd289dfbc..2a1638d126 100644 --- a/api/tests/test_containers_integration_tests/helpers/execution_extra_content.py +++ b/api/tests/test_containers_integration_tests/helpers/execution_extra_content.py @@ -5,7 +5,7 @@ from datetime import timedelta from decimal import Decimal from uuid import uuid4 -from graphon.nodes.human_input.entities import FormDefinition, UserAction +from graphon.nodes.human_input.entities import FormDefinition, UserActionConfig from libs.datetime_utils import naive_utc_now from models.account import Account, Tenant, TenantAccountJoin from models.enums import ConversationFromSource, InvokeFrom @@ -116,7 +116,7 @@ def create_human_input_message_fixture(db_session) -> HumanInputMessageFixture: form_definition = FormDefinition( form_content="content", inputs=[], - user_actions=[UserAction(id=action_id, title=action_text)], + user_actions=[UserActionConfig(id=action_id, title=action_text)], rendered_content="Rendered block", expiration_time=naive_utc_now() + timedelta(days=1), node_title=node_title,