feat(api): implement truncation for draft var

This commit is contained in:
QuantumGhost 2025-08-29 14:40:54 +08:00
parent d7db58cabd
commit 13eb9f7d7d
6 changed files with 358 additions and 31 deletions

View File

@ -13,6 +13,7 @@ from controllers.console.app.error import (
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from controllers.web.error import InvalidArgumentError, NotFoundError
from core.file import helpers as file_helpers
from core.variables.segment_group import SegmentGroup
from core.variables.segments import ArrayFileSegment, FileSegment, Segment
from core.variables.types import SegmentType
@ -41,6 +42,7 @@ def _convert_values_to_json_serializable_object(value: Segment) -> Any:
def _serialize_var_value(variable: WorkflowDraftVariable) -> Any:
"""Serialize variable value. If variable is truncated, return the truncated value."""
value = variable.get_value()
# create a copy of the value to avoid affecting the model cache.
value = value.model_copy(deep=True)
@ -74,6 +76,22 @@ def _serialize_variable_type(workflow_draft_var: WorkflowDraftVariable) -> str:
return value_type.exposed_type().value
def _serialize_full_content(variable: WorkflowDraftVariable) -> dict | None:
    """Describe the offloaded full value of a truncated draft variable.

    Returns ``None`` when the variable is not truncated. Otherwise returns a
    mapping with the stored size in bytes, the exposed value type, the recorded
    length, and a signed attachment download URL for the backing upload file.
    """
    if variable.is_truncated():
        offloaded = variable.variable_file
        # A truncated variable is expected to carry its file record.
        assert offloaded is not None
        return {
            "size_bytes": offloaded.size,
            "value_type": offloaded.value_type.exposed_type().value,
            "length": offloaded.length,
            "download_url": file_helpers.get_signed_file_url(offloaded.upload_file_id, as_attachment=True),
        }
    return None
_WORKFLOW_DRAFT_VARIABLE_WITHOUT_VALUE_FIELDS = {
"id": fields.String,
"type": fields.String(attribute=lambda model: model.get_variable_type()),
@ -83,11 +101,13 @@ _WORKFLOW_DRAFT_VARIABLE_WITHOUT_VALUE_FIELDS = {
"value_type": fields.String(attribute=_serialize_variable_type),
"edited": fields.Boolean(attribute=lambda model: model.edited),
"visible": fields.Boolean,
"is_truncated": fields.Boolean(attribute=lambda model: model.file_id is not None),
}
# Full draft-variable payload: the value-less field set plus the (possibly
# truncated) value and the descriptor of the offloaded full content.
_WORKFLOW_DRAFT_VARIABLE_FIELDS = {
    **_WORKFLOW_DRAFT_VARIABLE_WITHOUT_VALUE_FIELDS,
    "value": fields.Raw(attribute=_serialize_var_value),
    "full_content": fields.Raw(attribute=_serialize_full_content),
}
_WORKFLOW_DRAFT_ENV_VARIABLE_FIELDS = {

View File

@ -461,7 +461,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
stream=stream,
draft_var_saver_factory=self._get_draft_var_saver_factory(invoke_from),
draft_var_saver_factory=self._get_draft_var_saver_factory(invoke_from, account=user),
)
return AdvancedChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)

View File

@ -14,6 +14,7 @@ from core.workflow.repositories.draft_variable_repository import (
)
from factories import file_factory
from libs.orjson import orjson_dumps
from models import Account, EndUser
from services.workflow_draft_variable_service import DraftVariableSaver as DraftVariableSaverImpl
if TYPE_CHECKING:
@ -182,8 +183,9 @@ class BaseAppGenerator:
@final
@staticmethod
def _get_draft_var_saver_factory(invoke_from: InvokeFrom) -> DraftVariableSaverFactory:
def _get_draft_var_saver_factory(invoke_from: InvokeFrom, account: Account | EndUser) -> DraftVariableSaverFactory:
if invoke_from == InvokeFrom.DEBUGGER:
assert isinstance(account, Account)
def draft_var_saver_factory(
session: Session,
@ -200,6 +202,7 @@ class BaseAppGenerator:
node_type=node_type,
node_execution_id=node_execution_id,
enclosing_node_id=enclosing_node_id,
user=account,
)
else:

View File

@ -244,9 +244,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
worker_thread.start()
draft_var_saver_factory = self._get_draft_var_saver_factory(
invoke_from,
)
draft_var_saver_factory = self._get_draft_var_saver_factory(invoke_from, user)
# return response or stream generator
response = self._handle_response(

View File

@ -1,32 +1,43 @@
import dataclasses
import json
import logging
from collections.abc import Mapping, Sequence
from concurrent.futures import ThreadPoolExecutor
from enum import StrEnum
from typing import Any, ClassVar
from sqlalchemy import Engine, orm
from sqlalchemy import Engine, orm, select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.sql.expression import and_, or_
from configs import dify_config
from core.app.entities.app_invoke_entities import InvokeFrom
from core.file.models import File
from core.variables import Segment, StringSegment, Variable
from core.variables.consts import SELECTORS_LENGTH
from core.variables.segments import ArrayFileSegment, FileSegment
from core.variables.segments import (
ArrayFileSegment,
FileSegment,
)
from core.variables.types import SegmentType
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, ENVIRONMENT_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
from core.workflow.enums import SystemVariableKey
from core.workflow.nodes import NodeType
from core.workflow.nodes.variable_assigner.common.helpers import get_updated_variables
from core.workflow.variable_loader import VariableLoader
from extensions.ext_storage import storage
from factories.file_factory import StorageKeyLoader
from factories.variable_factory import build_segment, segment_to_variable
from libs.datetime_utils import naive_utc_now
from libs.uuid_utils import uuidv7
from models import App, Conversation
from models.account import Account
from models.enums import DraftVariableType
from models.workflow import Workflow, WorkflowDraftVariable, is_system_variable_editable
from models.workflow import Workflow, WorkflowDraftVariable, WorkflowDraftVariableFile, is_system_variable_editable
from repositories.factory import DifyAPIRepositoryFactory
from services.file_service import FileService
from services.variable_truncator import VariableTruncator
logger = logging.getLogger(__name__)
@ -37,6 +48,12 @@ class WorkflowDraftVariableList:
total: int | None = None
@dataclasses.dataclass(frozen=True)
class DraftVarFileDeletion:
    """Immutable request to delete the file record attached to a draft variable."""

    # ID of the draft variable that owns the file; used in log messages.
    draft_var_id: str
    # ID of the WorkflowDraftVariableFile row to delete.
    draft_var_file_id: str
class WorkflowDraftVariableError(Exception):
pass
@ -87,7 +104,26 @@ class DraftVarLoader(VariableLoader):
srv = WorkflowDraftVariableService(session)
draft_vars = srv.get_draft_variables_by_selectors(self._app_id, selectors)
# Important:
files: list[File] = []
# FileSegment and ArrayFileSegment are not subject to offloading, so their values
# can be safely accessed before any offloading logic is applied.
for draft_var in draft_vars:
value = draft_var.get_value()
if isinstance(value, FileSegment):
files.append(value.value)
elif isinstance(value, ArrayFileSegment):
files.extend(value.value)
with Session(bind=self._engine) as session:
storage_key_loader = StorageKeyLoader(session, tenant_id=self._tenant_id)
storage_key_loader.load_storage_keys(files)
offloaded_draft_vars = []
for draft_var in draft_vars:
if draft_var.is_truncated():
offloaded_draft_vars.append(draft_var)
continue
segment = draft_var.get_value()
variable = segment_to_variable(
segment=segment,
@ -99,20 +135,51 @@ class DraftVarLoader(VariableLoader):
selector_tuple = self._selector_to_tuple(variable.selector)
variable_by_selector[selector_tuple] = variable
# Important:
files: list[File] = []
for draft_var in draft_vars:
value = draft_var.get_value()
if isinstance(value, FileSegment):
files.append(value.value)
elif isinstance(value, ArrayFileSegment):
files.extend(value.value)
with Session(bind=self._engine) as session:
storage_key_loader = StorageKeyLoader(session, tenant_id=self._tenant_id)
storage_key_loader.load_storage_keys(files)
# Load offloaded variables using multithreading.
# This approach reduces loading time by querying external systems concurrently.
with ThreadPoolExecutor(max_workers=10) as executor:
offloaded_variables = executor.map(self._load_offloaded_variable, offloaded_draft_vars)
for selector, variable in offloaded_variables:
variable_by_selector[selector] = variable
return list(variable_by_selector.values())
def _load_offloaded_variable(self, draft_var: WorkflowDraftVariable) -> tuple[tuple[str, str], Variable]:
    """Rebuild the full variable for a draft variable whose value was offloaded.

    The stored content is read from storage via the upload file's key. String
    values are decoded as text; every other type is parsed as JSON and rebuilt
    with the value type recorded on the variable file.

    Returns a ``((node_id, name), variable)`` pair.
    """
    # This logic mirrors `DraftVariableSaver._try_offload_large_variable` and
    # must remain synchronized with it. Ideally the two would be co-located,
    # but the current code structure makes that difficult.
    variable_file = draft_var.variable_file
    assert variable_file is not None
    upload_file = variable_file.upload_file
    assert upload_file is not None

    raw = storage.load(upload_file.key)
    value_type = variable_file.value_type

    # Annotate explicitly: otherwise the type is inferred as StringSegment from
    # the first branch, which is too narrow for the second.
    segment: Segment
    if value_type == SegmentType.STRING:
        segment = StringSegment(value=raw.decode())
    else:
        # No special handling is needed for ArrayFileSegment, as it is never offloaded.
        segment = WorkflowDraftVariable.build_segment_with_type(value_type, json.loads(raw))

    variable = segment_to_variable(
        segment=segment,
        selector=draft_var.get_selector(),
        id=draft_var.id,
        name=draft_var.name,
        description=draft_var.description,
    )
    return (draft_var.node_id, draft_var.name), variable
class WorkflowDraftVariableService:
_session: Session
@ -138,13 +205,24 @@ class WorkflowDraftVariableService:
)
def get_variable(self, variable_id: str) -> WorkflowDraftVariable | None:
return self._session.query(WorkflowDraftVariable).where(WorkflowDraftVariable.id == variable_id).first()
return (
self._session.query(WorkflowDraftVariable)
.options(orm.selectinload(WorkflowDraftVariable.variable_file))
.where(WorkflowDraftVariable.id == variable_id)
.first()
)
def get_draft_variables_by_selectors(
self,
app_id: str,
selectors: Sequence[list[str]],
) -> list[WorkflowDraftVariable]:
"""
Retrieve WorkflowDraftVariable instances based on app_id and selectors.
The returned WorkflowDraftVariable objects are guaranteed to have their
associated variable_file and variable_file.upload_file relationships preloaded.
"""
ors = []
for selector in selectors:
assert len(selector) >= SELECTORS_LENGTH, f"Invalid selector to get: {selector}"
@ -159,7 +237,14 @@ class WorkflowDraftVariableService:
# combined using `UNION` to fetch all rows.
# Benchmarking indicates that both approaches yield comparable performance.
variables = (
self._session.query(WorkflowDraftVariable).where(WorkflowDraftVariable.app_id == app_id, or_(*ors)).all()
self._session.query(WorkflowDraftVariable)
.options(
orm.selectinload(WorkflowDraftVariable.variable_file).selectinload(
WorkflowDraftVariableFile.upload_file
)
)
.where(WorkflowDraftVariable.app_id == app_id, or_(*ors))
.all()
)
return variables
@ -170,8 +255,10 @@ class WorkflowDraftVariableService:
if page == 1:
total = query.count()
variables = (
# Do not load the `value` field.
query.options(orm.defer(WorkflowDraftVariable.value))
# Do not load the `value` field
query.options(
orm.defer(WorkflowDraftVariable.value, raiseload=True),
)
.order_by(WorkflowDraftVariable.created_at.desc())
.limit(limit)
.offset((page - 1) * limit)
@ -186,7 +273,11 @@ class WorkflowDraftVariableService:
WorkflowDraftVariable.node_id == node_id,
)
query = self._session.query(WorkflowDraftVariable).where(*criteria)
variables = query.order_by(WorkflowDraftVariable.created_at.desc()).all()
variables = (
query.options(orm.selectinload(WorkflowDraftVariable.variable_file))
.order_by(WorkflowDraftVariable.created_at.desc())
.all()
)
return WorkflowDraftVariableList(variables=variables)
def list_node_variables(self, app_id: str, node_id: str) -> WorkflowDraftVariableList:
@ -210,6 +301,7 @@ class WorkflowDraftVariableService:
def _get_variable(self, app_id: str, node_id: str, name: str) -> WorkflowDraftVariable | None:
variable = (
self._session.query(WorkflowDraftVariable)
.options(orm.selectinload(WorkflowDraftVariable.variable_file))
.where(
WorkflowDraftVariable.app_id == app_id,
WorkflowDraftVariable.node_id == node_id,
@ -278,7 +370,7 @@ class WorkflowDraftVariableService:
self._session.flush()
return None
outputs_dict = node_exec.outputs_dict or {}
outputs_dict = node_exec.load_full_outputs(self._session, storage) or {}
# a sentinel value used to check the absence of the output variable key.
absent = object()
@ -323,6 +415,45 @@ class WorkflowDraftVariableService:
return self._reset_node_var_or_sys_var(workflow, variable)
def delete_variable(self, variable: WorkflowDraftVariable):
    """Delete a draft variable and, if it is truncated, its offloaded content.

    For a non-truncated variable only the variable row is deleted. For a
    truncated one the variable is reloaded with its variable file and upload
    file, the stored object is removed from storage, and the upload file,
    variable file and variable rows are all marked for deletion.

    Missing associated records are logged as warnings and whatever rows do
    exist are still deleted. Deletions are registered on the session; this
    method does not commit.
    """
    if not variable.is_truncated():
        self._session.delete(variable)
        return

    variable_query = (
        select(WorkflowDraftVariable)
        .options(
            orm.selectinload(WorkflowDraftVariable.variable_file).selectinload(
                WorkflowDraftVariableFile.upload_file
            ),
        )
        .where(WorkflowDraftVariable.id == variable.id)
    )
    variable_reloaded = self._session.execute(variable_query).scalars().first()
    if variable_reloaded is None:
        # The row is no longer visible to this session; there are no
        # associations left to follow, so only delete the given instance.
        logger.warning(
            "WorkflowDraftVariable not found while reloading for deletion, draft_var_id=%s, file_id=%s",
            variable.id,
            variable.file_id,
        )
        self._session.delete(variable)
        return

    variable_file = variable_reloaded.variable_file
    if variable_file is None:
        logger.warning(
            "Associated WorkflowDraftVariableFile not found, draft_var_id=%s, file_id=%s",
            variable_reloaded.id,
            variable_reloaded.file_id,
        )
        self._session.delete(variable)
        return

    upload_file = variable_file.upload_file
    if upload_file is None:
        logger.warning(
            "Associated UploadFile not found, draft_var_id=%s, file_id=%s, upload_file_id=%s",
            variable_reloaded.id,
            variable_reloaded.file_id,
            variable_file.upload_file_id,
        )
        self._session.delete(variable)
        self._session.delete(variable_file)
        return

    storage.delete(upload_file.key)
    self._session.delete(upload_file)
    # Delete the variable file row too; the original deleted `upload_file`
    # twice and left this row orphaned.
    self._session.delete(variable_file)
    self._session.delete(variable)
def delete_workflow_variables(self, app_id: str):
@ -332,6 +463,38 @@ class WorkflowDraftVariableService:
.delete(synchronize_session=False)
)
def delete_workflow_draft_variable_file(self, deletions: list[DraftVarFileDeletion]):
    """Delete the variable file records (and stored content) named in ``deletions``.

    For each requested deletion the matching WorkflowDraftVariableFile is
    looked up by ID. If it has an upload file, the stored object is removed
    from storage and the upload file row is deleted; the variable file row is
    deleted in either case. Missing records are logged as warnings and
    skipped. Deletions are registered on the session; this method does not
    commit.
    """
    if not deletions:
        # Nothing requested: skip the query entirely.
        return

    variable_files_query = (
        select(WorkflowDraftVariableFile)
        .options(orm.selectinload(WorkflowDraftVariableFile.upload_file))
        .where(WorkflowDraftVariableFile.id.in_([i.draft_var_file_id for i in deletions]))
    )
    variable_files = self._session.execute(variable_files_query).scalars().all()
    variable_files_by_id = {i.id: i for i in variable_files}

    for deletion in deletions:
        variable_file = variable_files_by_id.get(deletion.draft_var_file_id)
        if variable_file is None:
            logger.warning(
                "Associated WorkflowDraftVariableFile not found, draft_var_id=%s, file_id=%s",
                deletion.draft_var_id,
                deletion.draft_var_file_id,
            )
            continue

        upload_file = variable_file.upload_file
        if upload_file is None:
            logger.warning(
                "Associated UploadFile not found, draft_var_id=%s, file_id=%s, upload_file_id=%s",
                deletion.draft_var_id,
                deletion.draft_var_file_id,
                variable_file.upload_file_id,
            )
        else:
            storage.delete(upload_file.key)
            self._session.delete(upload_file)
        # The variable file row is removed whether or not its upload file exists.
        self._session.delete(variable_file)
def delete_node_variables(self, app_id: str, node_id: str):
return self._delete_node_variables(app_id, node_id)
@ -476,6 +639,7 @@ def _batch_upsert_draft_variable(
"visible": stmt.excluded.visible,
"editable": stmt.excluded.editable,
"node_execution_id": stmt.excluded.node_execution_id,
"file_id": stmt.excluded.file_id,
},
)
elif policy == _UpsertPolicy.IGNORE:
@ -495,6 +659,7 @@ def _model_to_insertion_dict(model: WorkflowDraftVariable) -> dict[str, Any]:
"value_type": model.value_type,
"value": model.value,
"node_execution_id": model.node_execution_id,
"file_id": model.file_id,
}
if model.visible is not None:
d["visible"] = model.visible
@ -524,6 +689,28 @@ def _build_segment_for_serialized_values(v: Any) -> Segment:
return build_segment(WorkflowDraftVariable.rebuild_file_types(v))
def _make_filename_trans_table() -> dict[int, str]:
linux_chars = ["/", "\x00"]
windows_chars = [
"<",
">",
":",
'"',
"/",
"\\",
"|",
"?",
"*",
]
windows_chars.extend(chr(i) for i in range(32))
trans_table = dict.fromkeys(linux_chars + windows_chars, "_")
return str.maketrans(trans_table)
_FILENAME_TRANS_TABLE = _make_filename_trans_table()
class DraftVariableSaver:
# _DUMMY_OUTPUT_IDENTITY is a placeholder output for workflow nodes.
# Its sole possible value is `None`.
@ -573,6 +760,7 @@ class DraftVariableSaver:
node_id: str,
node_type: NodeType,
node_execution_id: str,
user: Account,
enclosing_node_id: str | None = None,
):
# Important: `node_execution_id` parameter refers to the primary key (`id`) of the
@ -583,6 +771,7 @@ class DraftVariableSaver:
self._node_id = node_id
self._node_type = node_type
self._node_execution_id = node_execution_id
self._user = user
self._enclosing_node_id = enclosing_node_id
def _create_dummy_output_variable(self):
@ -692,17 +881,133 @@ class DraftVariableSaver:
else:
value_seg = _build_segment_for_serialized_values(value)
draft_vars.append(
WorkflowDraftVariable.new_node_variable(
app_id=self._app_id,
node_id=self._node_id,
self._create_draft_variable(
name=name,
node_execution_id=self._node_execution_id,
value=value_seg,
visible=self._should_variable_be_visible(self._node_id, self._node_type, name),
)
visible=True,
editable=True,
),
# WorkflowDraftVariable.new_node_variable(
# app_id=self._app_id,
# node_id=self._node_id,
# name=name,
# node_execution_id=self._node_execution_id,
# value=value_seg,
# visible=self._should_variable_be_visible(self._node_id, self._node_type, name),
# )
)
return draft_vars
def _generate_filename(self, name: str):
    """Return ``"<node id>-<name>"`` with filename-unsafe characters in the node ID replaced.

    Only the node ID is passed through the translation table; ``name`` is
    appended unchanged.
    """
    return f"{self._node_id.translate(_FILENAME_TRANS_TABLE)}-{name}"
def _try_offload_large_variable(
    self,
    name: str,
    value_seg: Segment,
) -> tuple[Segment, WorkflowDraftVariableFile] | None:
    """Offload ``value_seg`` to file storage if the truncator shortens it.

    The segment is run through a ``VariableTruncator`` configured from
    ``dify_config``. If nothing was truncated, ``None`` is returned and no
    file is created. Otherwise the original value is uploaded (plain text for
    string segments, compact JSON for everything else) and a
    ``WorkflowDraftVariableFile`` row describing it is committed in its own
    session.

    Returns:
        ``(truncated_segment, variable_file)`` when the value was offloaded,
        otherwise ``None``.
    """
    # This logic is closely tied to `DraftVarLoader._load_offloaded_variable` and must remain
    # synchronized with it.
    # Ideally, these should be co-located for better maintainability.
    # However, due to the current code structure, this is not straightforward.
    truncator = VariableTruncator(
        max_size_bytes=dify_config.WORKFLOW_VARIABLE_TRUNCATION_MAX_SIZE,
        array_element_limit=dify_config.WORKFLOW_VARIABLE_TRUNCATION_ARRAY_LENGTH,
        string_length_limit=dify_config.WORKFLOW_VARIABLE_TRUNCATION_STRING_LENGTH,
    )
    truncation_result = truncator.truncate(value_seg)
    if not truncation_result.truncated:
        return None

    # Element count is only recorded for container values.
    original_length = None
    if isinstance(value_seg.value, (list, dict)):
        original_length = len(value_seg.value)

    # Prepare content for storage
    if isinstance(value_seg, StringSegment):
        # For string types, store as plain text
        original_content_serialized = value_seg.value
        content_type = "text/plain"
        filename = f"{self._generate_filename(name)}.txt"
    else:
        # For other types, store as JSON
        original_content_serialized = json.dumps(value_seg.value, ensure_ascii=False, separators=(",", ":"))
        content_type = "application/json"
        filename = f"{self._generate_filename(name)}.json"

    # Encode once: this path handles large values, so the same bytes are
    # reused for both the size and the upload.
    content_bytes = original_content_serialized.encode("utf-8")

    engine = self._session.get_bind()
    assert isinstance(engine, Engine)
    upload_file = FileService(engine).upload_file(
        filename=filename,
        content=content_bytes,
        mimetype=content_type,
        user=self._user,
    )

    # Create WorkflowDraftVariableFile record
    variable_file = WorkflowDraftVariableFile(
        id=uuidv7(),
        upload_file_id=upload_file.id,
        size=len(content_bytes),
        length=original_length,
        value_type=value_seg.value_type,
        app_id=self._app_id,
        tenant_id=self._user.current_tenant_id,
        user_id=self._user.id,
    )
    # Persist through a separate session on the same engine and commit
    # immediately; `expire_on_commit=False` keeps the returned instance's
    # attributes readable after the session closes.
    with Session(bind=engine, expire_on_commit=False) as session:
        session.add(variable_file)
        session.commit()

    return truncation_result.result, variable_file
def _create_draft_variable(
    self,
    *,
    name: str,
    value: Segment,
    visible: bool = True,
    editable: bool = True,
) -> WorkflowDraftVariable:
    """Build a node draft variable, offloading and truncating oversized values.

    When the value is offloaded, the variable stores the truncated segment,
    references the created variable file, and is always non-editable
    regardless of ``editable``. Otherwise the value is stored as given.
    """
    # Arguments common to both the plain and the offloaded variants.
    shared: dict[str, Any] = {
        "app_id": self._app_id,
        "node_id": self._node_id,
        "name": name,
        "node_execution_id": self._node_execution_id,
        "visible": visible,
    }

    offloaded = self._try_offload_large_variable(name, value)
    if offloaded is not None:
        truncated_value, var_file = offloaded
        return WorkflowDraftVariable.new_node_variable(
            **shared,
            value=truncated_value,
            editable=False,
            file_id=var_file.id,
        )

    return WorkflowDraftVariable.new_node_variable(
        **shared,
        value=value,
        editable=editable,
    )
def save(
self,
process_data: Mapping[str, Any] | None = None,

View File

@ -433,6 +433,7 @@ class WorkflowService:
node_type=NodeType(workflow_node_execution.node_type),
enclosing_node_id=enclosing_node_id,
node_execution_id=node_execution.id,
user=account,
)
draft_var_saver.save(process_data=node_execution.process_data, outputs=node_execution.outputs)
session.commit()