From 13eb9f7d7d3072326ae95645c27a2dcf91bc5dda Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Fri, 29 Aug 2025 14:40:54 +0800 Subject: [PATCH] feat(api): implement truncation for draft var --- .../console/app/workflow_draft_variable.py | 20 + .../app/apps/advanced_chat/app_generator.py | 2 +- api/core/app/apps/base_app_generator.py | 5 +- api/core/app/apps/workflow/app_generator.py | 4 +- .../workflow_draft_variable_service.py | 357 ++++++++++++++++-- api/services/workflow_service.py | 1 + 6 files changed, 358 insertions(+), 31 deletions(-) diff --git a/api/controllers/console/app/workflow_draft_variable.py b/api/controllers/console/app/workflow_draft_variable.py index a0b73f7e07..c1d98f6587 100644 --- a/api/controllers/console/app/workflow_draft_variable.py +++ b/api/controllers/console/app/workflow_draft_variable.py @@ -13,6 +13,7 @@ from controllers.console.app.error import ( from controllers.console.app.wraps import get_app_model from controllers.console.wraps import account_initialization_required, setup_required from controllers.web.error import InvalidArgumentError, NotFoundError +from core.file import helpers as file_helpers from core.variables.segment_group import SegmentGroup from core.variables.segments import ArrayFileSegment, FileSegment, Segment from core.variables.types import SegmentType @@ -41,6 +42,7 @@ def _convert_values_to_json_serializable_object(value: Segment) -> Any: def _serialize_var_value(variable: WorkflowDraftVariable) -> Any: + """Serialize variable value. If variable is truncated, return the truncated value.""" value = variable.get_value() # create a copy of the value to avoid affecting the model cache. value = value.model_copy(deep=True) @@ -74,6 +76,22 @@ def _serialize_variable_type(workflow_draft_var: WorkflowDraftVariable) -> str: return value_type.exposed_type().value +def _serialize_full_content(variable: WorkflowDraftVariable) -> dict | None: + """Serialize full_content information for large variables.""" + if not variable.is_truncated(): + return None + + variable_file = variable.variable_file + assert variable_file is not None + + return { + "size_bytes": variable_file.size, + "value_type": variable_file.value_type.exposed_type().value, + "length": variable_file.length, + "download_url": file_helpers.get_signed_file_url(variable_file.upload_file_id, as_attachment=True), + } + + _WORKFLOW_DRAFT_VARIABLE_WITHOUT_VALUE_FIELDS = { "id": fields.String, "type": fields.String(attribute=lambda model: model.get_variable_type()), @@ -83,11 +101,13 @@ _WORKFLOW_DRAFT_VARIABLE_WITHOUT_VALUE_FIELDS = { "value_type": fields.String(attribute=_serialize_variable_type), "edited": fields.Boolean(attribute=lambda model: model.edited), "visible": fields.Boolean, + "is_truncated": fields.Boolean(attribute=lambda model: model.file_id is not None), } _WORKFLOW_DRAFT_VARIABLE_FIELDS = dict( _WORKFLOW_DRAFT_VARIABLE_WITHOUT_VALUE_FIELDS, value=fields.Raw(attribute=_serialize_var_value), + full_content=fields.Raw(attribute=_serialize_full_content), ) _WORKFLOW_DRAFT_ENV_VARIABLE_FIELDS = { diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index 52ae20ee16..bcc5473afc 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -461,7 +461,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, stream=stream, - draft_var_saver_factory=self._get_draft_var_saver_factory(invoke_from), + draft_var_saver_factory=self._get_draft_var_saver_factory(invoke_from, account=user), ) return AdvancedChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from) diff --git a/api/core/app/apps/base_app_generator.py b/api/core/app/apps/base_app_generator.py index b420ffb8bf..32a316276d 100644 --- a/api/core/app/apps/base_app_generator.py +++ b/api/core/app/apps/base_app_generator.py @@ -14,6 +14,7 @@ from core.workflow.repositories.draft_variable_repository import ( ) from factories import file_factory from libs.orjson import orjson_dumps +from models import Account, EndUser from services.workflow_draft_variable_service import DraftVariableSaver as DraftVariableSaverImpl if TYPE_CHECKING: @@ -182,8 +183,9 @@ class BaseAppGenerator: @final @staticmethod - def _get_draft_var_saver_factory(invoke_from: InvokeFrom) -> DraftVariableSaverFactory: + def _get_draft_var_saver_factory(invoke_from: InvokeFrom, account: Account | EndUser) -> DraftVariableSaverFactory: if invoke_from == InvokeFrom.DEBUGGER: + assert isinstance(account, Account) def draft_var_saver_factory( session: Session, @@ -200,6 +202,7 @@ class BaseAppGenerator: node_type=node_type, node_execution_id=node_execution_id, enclosing_node_id=enclosing_node_id, + user=account, ) else: diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index 22b0234604..51519f6fc9 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -244,9 +244,7 @@ class WorkflowAppGenerator(BaseAppGenerator): worker_thread.start() - draft_var_saver_factory = self._get_draft_var_saver_factory( - invoke_from, - ) + draft_var_saver_factory = self._get_draft_var_saver_factory(invoke_from, user) # return response or stream generator response = self._handle_response( diff --git a/api/services/workflow_draft_variable_service.py b/api/services/workflow_draft_variable_service.py index b3b581093e..17435508da 100644 --- a/api/services/workflow_draft_variable_service.py +++ b/api/services/workflow_draft_variable_service.py @@ -1,32 +1,43 @@ import dataclasses +import json import logging from collections.abc import Mapping, Sequence +from concurrent.futures import ThreadPoolExecutor from enum import StrEnum from typing import Any, ClassVar -from sqlalchemy import Engine, orm +from sqlalchemy import Engine, orm, select from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.sql.expression import and_, or_ +from configs import dify_config from core.app.entities.app_invoke_entities import InvokeFrom from core.file.models import File from core.variables import Segment, StringSegment, Variable from core.variables.consts import SELECTORS_LENGTH -from core.variables.segments import ArrayFileSegment, FileSegment +from core.variables.segments import ( + ArrayFileSegment, + FileSegment, +) from core.variables.types import SegmentType from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, ENVIRONMENT_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID from core.workflow.enums import SystemVariableKey from core.workflow.nodes import NodeType from core.workflow.nodes.variable_assigner.common.helpers import get_updated_variables from core.workflow.variable_loader import VariableLoader +from extensions.ext_storage import storage from factories.file_factory import StorageKeyLoader from factories.variable_factory import build_segment, segment_to_variable from libs.datetime_utils import naive_utc_now +from libs.uuid_utils import uuidv7 from models import App, Conversation +from models.account import Account from models.enums import DraftVariableType -from models.workflow import Workflow, WorkflowDraftVariable, is_system_variable_editable +from models.workflow import Workflow, WorkflowDraftVariable, WorkflowDraftVariableFile, is_system_variable_editable from repositories.factory import DifyAPIRepositoryFactory +from services.file_service import FileService +from services.variable_truncator import VariableTruncator logger = logging.getLogger(__name__) @@ -37,6 +48,12 @@ class WorkflowDraftVariableList: total: int | None = None +@dataclasses.dataclass(frozen=True) +class DraftVarFileDeletion: + draft_var_id: str + draft_var_file_id: str + + class WorkflowDraftVariableError(Exception): pass @@ -87,7 +104,26 @@ class DraftVarLoader(VariableLoader): srv = WorkflowDraftVariableService(session) draft_vars = srv.get_draft_variables_by_selectors(self._app_id, selectors) + # Important: + files: list[File] = [] + # FileSegment and ArrayFileSegment are not subject to offloading, so their values + # can be safely accessed before any offloading logic is applied. for draft_var in draft_vars: + value = draft_var.get_value() + if isinstance(value, FileSegment): + files.append(value.value) + elif isinstance(value, ArrayFileSegment): + files.extend(value.value) + with Session(bind=self._engine) as session: + storage_key_loader = StorageKeyLoader(session, tenant_id=self._tenant_id) + storage_key_loader.load_storage_keys(files) + + offloaded_draft_vars = [] + for draft_var in draft_vars: + if draft_var.is_truncated(): + offloaded_draft_vars.append(draft_var) + continue + segment = draft_var.get_value() variable = segment_to_variable( segment=segment, @@ -99,20 +135,51 @@ class DraftVarLoader(VariableLoader): selector_tuple = self._selector_to_tuple(variable.selector) variable_by_selector[selector_tuple] = variable - # Important: - files: list[File] = [] - for draft_var in draft_vars: - value = draft_var.get_value() - if isinstance(value, FileSegment): - files.append(value.value) - elif isinstance(value, ArrayFileSegment): - files.extend(value.value) - with Session(bind=self._engine) as session: - storage_key_loader = StorageKeyLoader(session, tenant_id=self._tenant_id) - storage_key_loader.load_storage_keys(files) + # Load offloaded variables using multithreading. + # This approach reduces loading time by querying external systems concurrently. + with ThreadPoolExecutor(max_workers=10) as executor: + offloaded_variables = executor.map(self._load_offloaded_variable, offloaded_draft_vars) + for selector, variable in offloaded_variables: + variable_by_selector[selector] = variable return list(variable_by_selector.values()) + def _load_offloaded_variable(self, draft_var: WorkflowDraftVariable) -> tuple[tuple[str, str], Variable]: + # This logic is closely tied to `WorkflowDraftVaribleService._try_offload_large_variable` + # and must remain synchronized with it. + # Ideally, these should be co-located for better maintainability. + # However, due to the current code structure, this is not straightforward. + + variable_file = draft_var.variable_file + assert variable_file is not None + upload_file = variable_file.upload_file + assert upload_file is not None + content = storage.load(upload_file.key) + if variable_file.value_type == SegmentType.STRING: + # The inferenced type is StringSegment, which is not correct inside this function. + segment: Segment = StringSegment(value=content.decode()) + + variable = segment_to_variable( + segment=segment, + selector=draft_var.get_selector(), + id=draft_var.id, + name=draft_var.name, + description=draft_var.description, + ) + return (draft_var.node_id, draft_var.name), variable + + deserialized = json.loads(content) + segment = WorkflowDraftVariable.build_segment_with_type(variable_file.value_type, deserialized) + variable = segment_to_variable( + segment=segment, + selector=draft_var.get_selector(), + id=draft_var.id, + name=draft_var.name, + description=draft_var.description, + ) + # No special handling needed for ArrayFileSegment, as we do not offload ArrayFileSegment + return (draft_var.node_id, draft_var.name), variable + class WorkflowDraftVariableService: _session: Session @@ -138,13 +205,24 @@ class WorkflowDraftVariableService: ) def get_variable(self, variable_id: str) -> WorkflowDraftVariable | None: - return self._session.query(WorkflowDraftVariable).where(WorkflowDraftVariable.id == variable_id).first() + return ( + self._session.query(WorkflowDraftVariable) + .options(orm.selectinload(WorkflowDraftVariable.variable_file)) + .where(WorkflowDraftVariable.id == variable_id) + .first() + ) def get_draft_variables_by_selectors( self, app_id: str, selectors: Sequence[list[str]], ) -> list[WorkflowDraftVariable]: + """ + Retrieve WorkflowDraftVariable instances based on app_id and selectors. + + The returned WorkflowDraftVariable objects are guaranteed to have their + associated variable_file and variable_file.upload_file relationships preloaded. + """ ors = [] for selector in selectors: assert len(selector) >= SELECTORS_LENGTH, f"Invalid selector to get: {selector}" @@ -159,7 +237,14 @@ class WorkflowDraftVariableService: # combined using `UNION` to fetch all rows. # Benchmarking indicates that both approaches yield comparable performance. variables = ( - self._session.query(WorkflowDraftVariable).where(WorkflowDraftVariable.app_id == app_id, or_(*ors)).all() + self._session.query(WorkflowDraftVariable) + .options( + orm.selectinload(WorkflowDraftVariable.variable_file).selectinload( + WorkflowDraftVariableFile.upload_file + ) + ) + .where(WorkflowDraftVariable.app_id == app_id, or_(*ors)) + .all() ) return variables @@ -170,8 +255,10 @@ class WorkflowDraftVariableService: if page == 1: total = query.count() variables = ( - # Do not load the `value` field. - query.options(orm.defer(WorkflowDraftVariable.value)) + # Do not load the `value` field + query.options( + orm.defer(WorkflowDraftVariable.value, raiseload=True), + ) .order_by(WorkflowDraftVariable.created_at.desc()) .limit(limit) .offset((page - 1) * limit) @@ -186,7 +273,11 @@ class WorkflowDraftVariableService: WorkflowDraftVariable.node_id == node_id, ) query = self._session.query(WorkflowDraftVariable).where(*criteria) - variables = query.order_by(WorkflowDraftVariable.created_at.desc()).all() + variables = ( + query.options(orm.selectinload(WorkflowDraftVariable.variable_file)) + .order_by(WorkflowDraftVariable.created_at.desc()) + .all() + ) return WorkflowDraftVariableList(variables=variables) def list_node_variables(self, app_id: str, node_id: str) -> WorkflowDraftVariableList: @@ -210,6 +301,7 @@ class WorkflowDraftVariableService: def _get_variable(self, app_id: str, node_id: str, name: str) -> WorkflowDraftVariable | None: variable = ( self._session.query(WorkflowDraftVariable) + .options(orm.selectinload(WorkflowDraftVariable.variable_file)) .where( WorkflowDraftVariable.app_id == app_id, WorkflowDraftVariable.node_id == node_id, @@ -278,7 +370,7 @@ class WorkflowDraftVariableService: self._session.flush() return None - outputs_dict = node_exec.outputs_dict or {} + outputs_dict = node_exec.load_full_outputs(self._session, storage) or {} # a sentinel value used to check the absent of the output variable key. absent = object() @@ -323,6 +415,45 @@ class WorkflowDraftVariableService: return self._reset_node_var_or_sys_var(workflow, variable) def delete_variable(self, variable: WorkflowDraftVariable): + if not variable.is_truncated(): + self._session.delete(variable) + return + + variable_query = ( + select(WorkflowDraftVariable) + .options( + orm.selectinload(WorkflowDraftVariable.variable_file).selectinload( + WorkflowDraftVariableFile.upload_file + ), + ) + .where(WorkflowDraftVariable.id == variable.id) + ) + variable_reloaded = self._session.execute(variable_query).scalars().first() + variable_file = variable_reloaded.variable_file + if variable_file is None: + _logger.warning( + "Associated WorkflowDraftVariableFile not found, draft_var_id=%s, file_id=%s", + variable_reloaded.id, + variable_reloaded.file_id, + ) + self._session.delete(variable) + return + + upload_file = variable_file.upload_file + if upload_file is None: + _logger.warning( + "Associated UploadFile not found, draft_var_id=%s, file_id=%s, upload_file_id=%s", + variable_reloaded.id, + variable_reloaded.file_id, + variable_file.upload_file_id, + ) + self._session.delete(variable) + self._session.delete(variable_file) + return + + storage.delete(upload_file.key) + self._session.delete(upload_file) + self._session.delete(upload_file) self._session.delete(variable) def delete_workflow_variables(self, app_id: str): @@ -332,6 +463,38 @@ class WorkflowDraftVariableService: .delete(synchronize_session=False) ) + def delete_workflow_draft_variable_file(self, deletions: list[DraftVarFileDeletion]): + variable_files_query = ( + select(WorkflowDraftVariableFile) + .options(orm.selectinload(WorkflowDraftVariableFile.upload_file)) + .where(WorkflowDraftVariableFile.id.in_([i.draft_var_file_id for i in deletions])) + ) + variable_files = self._session.execute(variable_files_query).scalars().all() + variable_files_by_id = {i.id: i for i in variable_files} + for i in deletions: + variable_file = variable_files_by_id.get(i.draft_var_file_id) + if variable_file is None: + _logger.warning( + "Associated WorkflowDraftVariableFile not found, draft_var_id=%s, file_id=%s", + i.draft_var_id, + i.draft_var_file_id, + ) + continue + + upload_file = variable_file.upload_file + if upload_file is None: + _logger.warning( + "Associated UploadFile not found, draft_var_id=%s, file_id=%s, upload_file_id=%s", + i.draft_var_id, + i.draft_var_file_id, + variable_file.upload_file_id, + ) + self._session.delete(variable_file) + else: + storage.delete(upload_file.key) + self._session.delete(upload_file) + self._session.delete(variable_file) + def delete_node_variables(self, app_id: str, node_id: str): return self._delete_node_variables(app_id, node_id) @@ -476,6 +639,7 @@ def _batch_upsert_draft_variable( "visible": stmt.excluded.visible, "editable": stmt.excluded.editable, "node_execution_id": stmt.excluded.node_execution_id, + "file_id": stmt.excluded.file_id, }, ) elif policy == _UpsertPolicy.IGNORE: @@ -495,6 +659,7 @@ def _model_to_insertion_dict(model: WorkflowDraftVariable) -> dict[str, Any]: "value_type": model.value_type, "value": model.value, "node_execution_id": model.node_execution_id, + "file_id": model.file_id, } if model.visible is not None: d["visible"] = model.visible @@ -524,6 +689,28 @@ def _build_segment_for_serialized_values(v: Any) -> Segment: return build_segment(WorkflowDraftVariable.rebuild_file_types(v)) +def _make_filename_trans_table() -> dict[int, str]: + linux_chars = ["/", "\x00"] + windows_chars = [ + "<", + ">", + ":", + '"', + "/", + "\\", + "|", + "?", + "*", + ] + windows_chars.extend(chr(i) for i in range(32)) + + trans_table = dict.fromkeys(linux_chars + windows_chars, "_") + return str.maketrans(trans_table) + + +_FILENAME_TRANS_TABLE = _make_filename_trans_table() + + class DraftVariableSaver: # _DUMMY_OUTPUT_IDENTITY is a placeholder output for workflow nodes. # Its sole possible value is `None`. @@ -573,6 +760,7 @@ class DraftVariableSaver: node_id: str, node_type: NodeType, node_execution_id: str, + user: Account, enclosing_node_id: str | None = None, ): # Important: `node_execution_id` parameter refers to the primary key (`id`) of the @@ -583,6 +771,7 @@ class DraftVariableSaver: self._node_id = node_id self._node_type = node_type self._node_execution_id = node_execution_id + self._user = user self._enclosing_node_id = enclosing_node_id def _create_dummy_output_variable(self): @@ -692,17 +881,133 @@ class DraftVariableSaver: else: value_seg = _build_segment_for_serialized_values(value) draft_vars.append( - WorkflowDraftVariable.new_node_variable( - app_id=self._app_id, - node_id=self._node_id, + self._create_draft_variable( name=name, - node_execution_id=self._node_execution_id, value=value_seg, - visible=self._should_variable_be_visible(self._node_id, self._node_type, name), - ) + visible=True, + editable=True, + ), + # WorkflowDraftVariable.new_node_variable( + # app_id=self._app_id, + # node_id=self._node_id, + # name=name, + # node_execution_id=self._node_execution_id, + # value=value_seg, + # visible=self._should_variable_be_visible(self._node_id, self._node_type, name), + # ) ) return draft_vars + def _generate_filename(self, name: str): + node_id_escaped = self._node_id.translate(_FILENAME_TRANS_TABLE) + return f"{node_id_escaped}-{name}" + + def _try_offload_large_variable( + self, + name: str, + value_seg: Segment, + ) -> tuple[Segment, WorkflowDraftVariableFile] | None: + # This logic is closely tied to `DraftVarLoader._load_offloaded_variable` and must remain + # synchronized with it. + # Ideally, these should be co-located for better maintainability. + # However, due to the current code structure, this is not straightforward. + truncator = VariableTruncator( + max_size_bytes=dify_config.WORKFLOW_VARIABLE_TRUNCATION_MAX_SIZE, + array_element_limit=dify_config.WORKFLOW_VARIABLE_TRUNCATION_ARRAY_LENGTH, + string_length_limit=dify_config.WORKFLOW_VARIABLE_TRUNCATION_STRING_LENGTH, + ) + truncation_result = truncator.truncate(value_seg) + if not truncation_result.truncated: + return None + + original_length = None + if isinstance(value_seg.value, (list, dict)): + original_length = len(value_seg.value) + + # Prepare content for storage + if isinstance(value_seg, StringSegment): + # For string types, store as plain text + original_content_serialized = value_seg.value + content_type = "text/plain" + filename = f"{self._generate_filename(name)}.txt" + else: + # For other types, store as JSON + original_content_serialized = json.dumps(value_seg.value, ensure_ascii=False, separators=(",", ":")) + content_type = "application/json" + filename = f"{self._generate_filename(name)}.json" + + original_size = len(original_content_serialized.encode("utf-8")) + + bind = self._session.get_bind() + assert isinstance(bind, Engine) + file_srv = FileService(bind) + + upload_file = file_srv.upload_file( + filename=filename, + content=original_content_serialized.encode(), + mimetype=content_type, + user=self._user, + ) + + # Create WorkflowDraftVariableFile record + variable_file = WorkflowDraftVariableFile( + id=uuidv7(), + upload_file_id=upload_file.id, + size=original_size, + length=original_length, + value_type=value_seg.value_type, + app_id=self._app_id, + tenant_id=self._user.current_tenant_id, + user_id=self._user.id, + ) + engine = bind = self._session.get_bind() + assert isinstance(engine, Engine) + with Session(bind=engine, expire_on_commit=False) as session: + session.add(variable_file) + session.commit() + + return truncation_result.result, variable_file + + def _create_draft_variable( + self, + *, + name: str, + value: Segment, + visible: bool = True, + editable: bool = True, + ) -> WorkflowDraftVariable: + """Create a draft variable with large variable handling and truncation.""" + # Handle Segment values + + offload_result = self._try_offload_large_variable(name, value) + + if offload_result is None: + # Create the draft variable + draft_var = WorkflowDraftVariable.new_node_variable( + app_id=self._app_id, + node_id=self._node_id, + name=name, + node_execution_id=self._node_execution_id, + value=value, + visible=visible, + editable=editable, + ) + return draft_var + else: + truncated, var_file = offload_result + # Create the draft variable + draft_var = WorkflowDraftVariable.new_node_variable( + app_id=self._app_id, + node_id=self._node_id, + name=name, + node_execution_id=self._node_execution_id, + value=truncated, + visible=visible, + editable=False, + file_id=var_file.id, + ) + return draft_var + def save( self, process_data: Mapping[str, Any] | None = None, diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 3a68379789..1fb006bcd6 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -433,6 +433,7 @@ class WorkflowService: node_type=NodeType(workflow_node_execution.node_type), enclosing_node_id=enclosing_node_id, node_execution_id=node_execution.id, + user=account, ) draft_var_saver.save(process_data=node_execution.process_data, outputs=node_execution.outputs) session.commit()