mirror of https://github.com/langgenius/dify.git
feat(api): implement truncation for SSE events
This commit is contained in:
parent
6b9d2e98b9
commit
621b75b343
|
|
@ -54,6 +54,7 @@ from models import (
|
|||
Account,
|
||||
EndUser,
|
||||
)
|
||||
from services.variable_truncator import VariableTruncator
|
||||
|
||||
|
||||
class WorkflowResponseConverter:
|
||||
|
|
@ -65,6 +66,7 @@ class WorkflowResponseConverter:
|
|||
) -> None:
|
||||
self._application_generate_entity = application_generate_entity
|
||||
self._user = user
|
||||
self._truncator = VariableTruncator.default()
|
||||
|
||||
def workflow_start_to_stream_response(
|
||||
self,
|
||||
|
|
@ -156,7 +158,8 @@ class WorkflowResponseConverter:
|
|||
title=workflow_node_execution.title,
|
||||
index=workflow_node_execution.index,
|
||||
predecessor_node_id=workflow_node_execution.predecessor_node_id,
|
||||
inputs=workflow_node_execution.inputs,
|
||||
inputs=workflow_node_execution.get_response_inputs(),
|
||||
inputs_truncated=workflow_node_execution.inputs_truncated,
|
||||
created_at=int(workflow_node_execution.created_at.timestamp()),
|
||||
parallel_id=event.parallel_id,
|
||||
parallel_start_node_id=event.parallel_start_node_id,
|
||||
|
|
@ -210,9 +213,12 @@ class WorkflowResponseConverter:
|
|||
index=workflow_node_execution.index,
|
||||
title=workflow_node_execution.title,
|
||||
predecessor_node_id=workflow_node_execution.predecessor_node_id,
|
||||
inputs=workflow_node_execution.inputs,
|
||||
process_data=workflow_node_execution.process_data,
|
||||
outputs=json_converter.to_json_encodable(workflow_node_execution.outputs),
|
||||
inputs=workflow_node_execution.get_response_inputs(),
|
||||
inputs_truncated=workflow_node_execution.inputs_truncated,
|
||||
process_data=workflow_node_execution.get_response_process_data(),
|
||||
process_data_truncated=workflow_node_execution.process_data_truncated,
|
||||
outputs=json_converter.to_json_encodable(workflow_node_execution.get_response_outputs()),
|
||||
outputs_truncated=workflow_node_execution.outputs_truncated,
|
||||
status=workflow_node_execution.status,
|
||||
error=workflow_node_execution.error,
|
||||
elapsed_time=workflow_node_execution.elapsed_time,
|
||||
|
|
@ -255,9 +261,12 @@ class WorkflowResponseConverter:
|
|||
index=workflow_node_execution.index,
|
||||
title=workflow_node_execution.title,
|
||||
predecessor_node_id=workflow_node_execution.predecessor_node_id,
|
||||
inputs=workflow_node_execution.inputs,
|
||||
process_data=workflow_node_execution.process_data,
|
||||
outputs=json_converter.to_json_encodable(workflow_node_execution.outputs),
|
||||
inputs=workflow_node_execution.get_response_inputs(),
|
||||
inputs_truncated=workflow_node_execution.inputs_truncated,
|
||||
process_data=workflow_node_execution.get_response_process_data(),
|
||||
process_data_truncated=workflow_node_execution.process_data_truncated,
|
||||
outputs=json_converter.to_json_encodable(workflow_node_execution.get_response_outputs()),
|
||||
outputs_truncated=workflow_node_execution.outputs_truncated,
|
||||
status=workflow_node_execution.status,
|
||||
error=workflow_node_execution.error,
|
||||
elapsed_time=workflow_node_execution.elapsed_time,
|
||||
|
|
@ -326,6 +335,7 @@ class WorkflowResponseConverter:
|
|||
workflow_execution_id: str,
|
||||
event: QueueIterationStartEvent,
|
||||
) -> IterationNodeStartStreamResponse:
|
||||
new_inputs, truncated = self._truncator.truncate_io_mapping(event.inputs or {})
|
||||
return IterationNodeStartStreamResponse(
|
||||
task_id=task_id,
|
||||
workflow_run_id=workflow_execution_id,
|
||||
|
|
@ -336,7 +346,8 @@ class WorkflowResponseConverter:
|
|||
title=event.node_data.title,
|
||||
created_at=int(time.time()),
|
||||
extras={},
|
||||
inputs=event.inputs or {},
|
||||
inputs=new_inputs,
|
||||
inputs_truncated=truncated,
|
||||
metadata=event.metadata or {},
|
||||
parallel_id=event.parallel_id,
|
||||
parallel_start_node_id=event.parallel_start_node_id,
|
||||
|
|
@ -359,7 +370,9 @@ class WorkflowResponseConverter:
|
|||
node_type=event.node_type.value,
|
||||
title=event.node_data.title,
|
||||
index=event.index,
|
||||
pre_iteration_output=event.output,
|
||||
# The `pre_iteration_output` field is not utilized by the frontend.
|
||||
# Previously, it was assigned the value of `event.output`.
|
||||
pre_iteration_output={},
|
||||
created_at=int(time.time()),
|
||||
extras={},
|
||||
parallel_id=event.parallel_id,
|
||||
|
|
@ -377,6 +390,11 @@ class WorkflowResponseConverter:
|
|||
event: QueueIterationCompletedEvent,
|
||||
) -> IterationNodeCompletedStreamResponse:
|
||||
json_converter = WorkflowRuntimeTypeConverter()
|
||||
|
||||
new_inputs, inputs_truncated = self._truncator.truncate_io_mapping(event.inputs or {})
|
||||
new_outputs, outputs_truncated = self._truncator.truncate_io_mapping(
|
||||
json_converter.to_json_encodable(event.outputs) or {}
|
||||
)
|
||||
return IterationNodeCompletedStreamResponse(
|
||||
task_id=task_id,
|
||||
workflow_run_id=workflow_execution_id,
|
||||
|
|
@ -385,10 +403,12 @@ class WorkflowResponseConverter:
|
|||
node_id=event.node_id,
|
||||
node_type=event.node_type.value,
|
||||
title=event.node_data.title,
|
||||
outputs=json_converter.to_json_encodable(event.outputs),
|
||||
outputs=new_outputs,
|
||||
outputs_truncated=outputs_truncated,
|
||||
created_at=int(time.time()),
|
||||
extras={},
|
||||
inputs=event.inputs or {},
|
||||
inputs=new_inputs,
|
||||
inputs_truncated=inputs_truncated,
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED
|
||||
if event.error is None
|
||||
else WorkflowNodeExecutionStatus.FAILED,
|
||||
|
|
@ -406,6 +426,7 @@ class WorkflowResponseConverter:
|
|||
def workflow_loop_start_to_stream_response(
|
||||
self, *, task_id: str, workflow_execution_id: str, event: QueueLoopStartEvent
|
||||
) -> LoopNodeStartStreamResponse:
|
||||
new_inputs, truncated = self._truncator.truncate_io_mapping(event.inputs or {})
|
||||
return LoopNodeStartStreamResponse(
|
||||
task_id=task_id,
|
||||
workflow_run_id=workflow_execution_id,
|
||||
|
|
@ -416,7 +437,8 @@ class WorkflowResponseConverter:
|
|||
title=event.node_data.title,
|
||||
created_at=int(time.time()),
|
||||
extras={},
|
||||
inputs=event.inputs or {},
|
||||
inputs=new_inputs,
|
||||
inputs_truncated=truncated,
|
||||
metadata=event.metadata or {},
|
||||
parallel_id=event.parallel_id,
|
||||
parallel_start_node_id=event.parallel_start_node_id,
|
||||
|
|
@ -439,7 +461,9 @@ class WorkflowResponseConverter:
|
|||
node_type=event.node_type.value,
|
||||
title=event.node_data.title,
|
||||
index=event.index,
|
||||
pre_loop_output=event.output,
|
||||
# The `pre_loop_output` field is not utilized by the frontend.
|
||||
# Previously, it was assigned the value of `event.output`.
|
||||
pre_loop_output={},
|
||||
created_at=int(time.time()),
|
||||
extras={},
|
||||
parallel_id=event.parallel_id,
|
||||
|
|
@ -456,6 +480,11 @@ class WorkflowResponseConverter:
|
|||
workflow_execution_id: str,
|
||||
event: QueueLoopCompletedEvent,
|
||||
) -> LoopNodeCompletedStreamResponse:
|
||||
json_converter = WorkflowRuntimeTypeConverter()
|
||||
new_inputs, inputs_truncated = self._truncator.truncate_io_mapping(event.inputs or {})
|
||||
new_outputs, outputs_truncated = self._truncator.truncate_io_mapping(
|
||||
json_converter.to_json_encodable(event.outputs) or {}
|
||||
)
|
||||
return LoopNodeCompletedStreamResponse(
|
||||
task_id=task_id,
|
||||
workflow_run_id=workflow_execution_id,
|
||||
|
|
@ -464,10 +493,12 @@ class WorkflowResponseConverter:
|
|||
node_id=event.node_id,
|
||||
node_type=event.node_type.value,
|
||||
title=event.node_data.title,
|
||||
outputs=WorkflowRuntimeTypeConverter().to_json_encodable(event.outputs),
|
||||
outputs=new_outputs,
|
||||
outputs_truncated=outputs_truncated,
|
||||
created_at=int(time.time()),
|
||||
extras={},
|
||||
inputs=event.inputs or {},
|
||||
inputs=new_inputs,
|
||||
inputs_truncated=inputs_truncated,
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED
|
||||
if event.error is None
|
||||
else WorkflowNodeExecutionStatus.FAILED,
|
||||
|
|
|
|||
|
|
@ -260,6 +260,7 @@ class NodeStartStreamResponse(StreamResponse):
|
|||
index: int
|
||||
predecessor_node_id: Optional[str] = None
|
||||
inputs: Optional[Mapping[str, Any]] = None
|
||||
inputs_truncated: bool = False
|
||||
created_at: int
|
||||
extras: dict = Field(default_factory=dict)
|
||||
parallel_id: Optional[str] = None
|
||||
|
|
@ -317,8 +318,11 @@ class NodeFinishStreamResponse(StreamResponse):
|
|||
index: int
|
||||
predecessor_node_id: Optional[str] = None
|
||||
inputs: Optional[Mapping[str, Any]] = None
|
||||
inputs_truncated: bool = False
|
||||
process_data: Optional[Mapping[str, Any]] = None
|
||||
process_data_truncated: bool = False
|
||||
outputs: Optional[Mapping[str, Any]] = None
|
||||
outputs_truncated: bool = True
|
||||
status: str
|
||||
error: Optional[str] = None
|
||||
elapsed_time: float
|
||||
|
|
@ -386,8 +390,11 @@ class NodeRetryStreamResponse(StreamResponse):
|
|||
index: int
|
||||
predecessor_node_id: Optional[str] = None
|
||||
inputs: Optional[Mapping[str, Any]] = None
|
||||
inputs_truncated: bool = False
|
||||
process_data: Optional[Mapping[str, Any]] = None
|
||||
process_data_truncated: bool = False
|
||||
outputs: Optional[Mapping[str, Any]] = None
|
||||
outputs_truncated: bool = False
|
||||
status: str
|
||||
error: Optional[str] = None
|
||||
elapsed_time: float
|
||||
|
|
@ -506,6 +513,7 @@ class IterationNodeStartStreamResponse(StreamResponse):
|
|||
extras: dict = Field(default_factory=dict)
|
||||
metadata: Mapping = {}
|
||||
inputs: Mapping = {}
|
||||
inputs_truncated: bool = False
|
||||
parallel_id: Optional[str] = None
|
||||
parallel_start_node_id: Optional[str] = None
|
||||
|
||||
|
|
@ -557,9 +565,11 @@ class IterationNodeCompletedStreamResponse(StreamResponse):
|
|||
node_type: str
|
||||
title: str
|
||||
outputs: Optional[Mapping] = None
|
||||
outputs_truncated: bool = False
|
||||
created_at: int
|
||||
extras: Optional[dict] = None
|
||||
inputs: Optional[Mapping] = None
|
||||
inputs_truncated: bool = False
|
||||
status: WorkflowNodeExecutionStatus
|
||||
error: Optional[str] = None
|
||||
elapsed_time: float
|
||||
|
|
@ -593,6 +603,7 @@ class LoopNodeStartStreamResponse(StreamResponse):
|
|||
extras: dict = Field(default_factory=dict)
|
||||
metadata: Mapping = {}
|
||||
inputs: Mapping = {}
|
||||
inputs_truncated: bool = False
|
||||
parallel_id: Optional[str] = None
|
||||
parallel_start_node_id: Optional[str] = None
|
||||
|
||||
|
|
@ -644,9 +655,11 @@ class LoopNodeCompletedStreamResponse(StreamResponse):
|
|||
node_type: str
|
||||
title: str
|
||||
outputs: Optional[Mapping] = None
|
||||
outputs_truncated: bool = False
|
||||
created_at: int
|
||||
extras: Optional[dict] = None
|
||||
inputs: Optional[Mapping] = None
|
||||
inputs_truncated: bool = False
|
||||
status: WorkflowNodeExecutionStatus
|
||||
error: Optional[str] = None
|
||||
elapsed_time: float
|
||||
|
|
|
|||
Loading…
Reference in New Issue