From 621b75b3433f84d3e51237d568ba46bda6bd80f0 Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Fri, 29 Aug 2025 14:49:58 +0800 Subject: [PATCH] feat(api): implement truncation for SSE events --- .../common/workflow_response_converter.py | 61 ++++++++++++++----- api/core/app/entities/task_entities.py | 13 ++++ 2 files changed, 59 insertions(+), 15 deletions(-) diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index c8760d3cf0..f94f090cdf 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -54,6 +54,7 @@ from models import ( Account, EndUser, ) +from services.variable_truncator import VariableTruncator class WorkflowResponseConverter: @@ -65,6 +66,7 @@ class WorkflowResponseConverter: ) -> None: self._application_generate_entity = application_generate_entity self._user = user + self._truncator = VariableTruncator.default() def workflow_start_to_stream_response( self, @@ -156,7 +158,8 @@ class WorkflowResponseConverter: title=workflow_node_execution.title, index=workflow_node_execution.index, predecessor_node_id=workflow_node_execution.predecessor_node_id, - inputs=workflow_node_execution.inputs, + inputs=workflow_node_execution.get_response_inputs(), + inputs_truncated=workflow_node_execution.inputs_truncated, created_at=int(workflow_node_execution.created_at.timestamp()), parallel_id=event.parallel_id, parallel_start_node_id=event.parallel_start_node_id, @@ -210,9 +213,12 @@ class WorkflowResponseConverter: index=workflow_node_execution.index, title=workflow_node_execution.title, predecessor_node_id=workflow_node_execution.predecessor_node_id, - inputs=workflow_node_execution.inputs, - process_data=workflow_node_execution.process_data, - outputs=json_converter.to_json_encodable(workflow_node_execution.outputs), + inputs=workflow_node_execution.get_response_inputs(), + inputs_truncated=workflow_node_execution.inputs_truncated, + process_data=workflow_node_execution.get_response_process_data(), + process_data_truncated=workflow_node_execution.process_data_truncated, + outputs=json_converter.to_json_encodable(workflow_node_execution.get_response_outputs()), + outputs_truncated=workflow_node_execution.outputs_truncated, status=workflow_node_execution.status, error=workflow_node_execution.error, elapsed_time=workflow_node_execution.elapsed_time, @@ -255,9 +261,12 @@ class WorkflowResponseConverter: index=workflow_node_execution.index, title=workflow_node_execution.title, predecessor_node_id=workflow_node_execution.predecessor_node_id, - inputs=workflow_node_execution.inputs, - process_data=workflow_node_execution.process_data, - outputs=json_converter.to_json_encodable(workflow_node_execution.outputs), + inputs=workflow_node_execution.get_response_inputs(), + inputs_truncated=workflow_node_execution.inputs_truncated, + process_data=workflow_node_execution.get_response_process_data(), + process_data_truncated=workflow_node_execution.process_data_truncated, + outputs=json_converter.to_json_encodable(workflow_node_execution.get_response_outputs()), + outputs_truncated=workflow_node_execution.outputs_truncated, status=workflow_node_execution.status, error=workflow_node_execution.error, elapsed_time=workflow_node_execution.elapsed_time, @@ -326,6 +335,7 @@ class WorkflowResponseConverter: workflow_execution_id: str, event: QueueIterationStartEvent, ) -> IterationNodeStartStreamResponse: + new_inputs, truncated = self._truncator.truncate_io_mapping(event.inputs or {}) return IterationNodeStartStreamResponse( task_id=task_id, workflow_run_id=workflow_execution_id, @@ -336,7 +346,8 @@ class WorkflowResponseConverter: title=event.node_data.title, created_at=int(time.time()), extras={}, - inputs=event.inputs or {}, + inputs=new_inputs, + inputs_truncated=truncated, metadata=event.metadata or {}, parallel_id=event.parallel_id, parallel_start_node_id=event.parallel_start_node_id, @@ -359,7 +370,9 @@ class WorkflowResponseConverter: node_type=event.node_type.value, title=event.node_data.title, index=event.index, - pre_iteration_output=event.output, + # The `pre_iteration_output` field is not utilized by the frontend. + # Previously, it was assigned the value of `event.output`. + pre_iteration_output={}, created_at=int(time.time()), extras={}, parallel_id=event.parallel_id, @@ -377,6 +390,11 @@ class WorkflowResponseConverter: event: QueueIterationCompletedEvent, ) -> IterationNodeCompletedStreamResponse: json_converter = WorkflowRuntimeTypeConverter() + + new_inputs, inputs_truncated = self._truncator.truncate_io_mapping(event.inputs or {}) + new_outputs, outputs_truncated = self._truncator.truncate_io_mapping( + json_converter.to_json_encodable(event.outputs) or {} + ) return IterationNodeCompletedStreamResponse( task_id=task_id, workflow_run_id=workflow_execution_id, @@ -385,10 +403,12 @@ class WorkflowResponseConverter: node_id=event.node_id, node_type=event.node_type.value, title=event.node_data.title, - outputs=json_converter.to_json_encodable(event.outputs), + outputs=new_outputs, + outputs_truncated=outputs_truncated, created_at=int(time.time()), extras={}, - inputs=event.inputs or {}, + inputs=new_inputs, + inputs_truncated=inputs_truncated, status=WorkflowNodeExecutionStatus.SUCCEEDED if event.error is None else WorkflowNodeExecutionStatus.FAILED, @@ -406,6 +426,7 @@ class WorkflowResponseConverter: def workflow_loop_start_to_stream_response( self, *, task_id: str, workflow_execution_id: str, event: QueueLoopStartEvent ) -> LoopNodeStartStreamResponse: + new_inputs, truncated = self._truncator.truncate_io_mapping(event.inputs or {}) return LoopNodeStartStreamResponse( task_id=task_id, workflow_run_id=workflow_execution_id, @@ -416,7 +437,8 @@ class WorkflowResponseConverter: title=event.node_data.title, created_at=int(time.time()), extras={}, - inputs=event.inputs or {}, + inputs=new_inputs, + inputs_truncated=truncated, metadata=event.metadata or {}, parallel_id=event.parallel_id, parallel_start_node_id=event.parallel_start_node_id, @@ -439,7 +461,9 @@ class WorkflowResponseConverter: node_type=event.node_type.value, title=event.node_data.title, index=event.index, - pre_loop_output=event.output, + # The `pre_loop_output` field is not utilized by the frontend. + # Previously, it was assigned the value of `event.output`. + pre_loop_output={}, created_at=int(time.time()), extras={}, parallel_id=event.parallel_id, @@ -456,6 +480,11 @@ class WorkflowResponseConverter: workflow_execution_id: str, event: QueueLoopCompletedEvent, ) -> LoopNodeCompletedStreamResponse: + json_converter = WorkflowRuntimeTypeConverter() + new_inputs, inputs_truncated = self._truncator.truncate_io_mapping(event.inputs or {}) + new_outputs, outputs_truncated = self._truncator.truncate_io_mapping( + json_converter.to_json_encodable(event.outputs) or {} + ) return LoopNodeCompletedStreamResponse( task_id=task_id, workflow_run_id=workflow_execution_id, @@ -464,10 +493,12 @@ class WorkflowResponseConverter: node_id=event.node_id, node_type=event.node_type.value, title=event.node_data.title, - outputs=WorkflowRuntimeTypeConverter().to_json_encodable(event.outputs), + outputs=new_outputs, + outputs_truncated=outputs_truncated, created_at=int(time.time()), extras={}, - inputs=event.inputs or {}, + inputs=new_inputs, + inputs_truncated=inputs_truncated, status=WorkflowNodeExecutionStatus.SUCCEEDED if event.error is None else WorkflowNodeExecutionStatus.FAILED, diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index a1c0368354..24054881e8 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -260,6 +260,7 @@ class NodeStartStreamResponse(StreamResponse): index: int predecessor_node_id: Optional[str] = None inputs: Optional[Mapping[str, Any]] = None + inputs_truncated: bool = False created_at: int extras: dict = Field(default_factory=dict) parallel_id: Optional[str] = None @@ -317,8 +318,11 @@ class NodeFinishStreamResponse(StreamResponse): index: int predecessor_node_id: Optional[str] = None inputs: Optional[Mapping[str, Any]] = None + inputs_truncated: bool = False process_data: Optional[Mapping[str, Any]] = None + process_data_truncated: bool = False outputs: Optional[Mapping[str, Any]] = None + outputs_truncated: bool = True status: str error: Optional[str] = None elapsed_time: float @@ -386,8 +390,11 @@ class NodeRetryStreamResponse(StreamResponse): index: int predecessor_node_id: Optional[str] = None inputs: Optional[Mapping[str, Any]] = None + inputs_truncated: bool = False process_data: Optional[Mapping[str, Any]] = None + process_data_truncated: bool = False outputs: Optional[Mapping[str, Any]] = None + outputs_truncated: bool = False status: str error: Optional[str] = None elapsed_time: float @@ -506,6 +513,7 @@ class IterationNodeStartStreamResponse(StreamResponse): extras: dict = Field(default_factory=dict) metadata: Mapping = {} inputs: Mapping = {} + inputs_truncated: bool = False parallel_id: Optional[str] = None parallel_start_node_id: Optional[str] = None @@ -557,9 +565,11 @@ class IterationNodeCompletedStreamResponse(StreamResponse): node_type: str title: str outputs: Optional[Mapping] = None + outputs_truncated: bool = False created_at: int extras: Optional[dict] = None inputs: Optional[Mapping] = None + inputs_truncated: bool = False status: WorkflowNodeExecutionStatus error: Optional[str] = None elapsed_time: float @@ -593,6 +603,7 @@ class LoopNodeStartStreamResponse(StreamResponse): extras: dict = Field(default_factory=dict) metadata: Mapping = {} inputs: Mapping = {} + inputs_truncated: bool = False parallel_id: Optional[str] = None parallel_start_node_id: Optional[str] = None @@ -644,9 +655,11 @@ class LoopNodeCompletedStreamResponse(StreamResponse): node_type: str title: str outputs: Optional[Mapping] = None + outputs_truncated: bool = False created_at: int extras: Optional[dict] = None inputs: Optional[Mapping] = None + inputs_truncated: bool = False status: WorkflowNodeExecutionStatus error: Optional[str] = None elapsed_time: float