diff --git a/api/models/model.py b/api/models/model.py index e79b22d272..322f1c5b0e 100644 --- a/api/models/model.py +++ b/api/models/model.py @@ -1564,6 +1564,9 @@ class UploadFile(Base): sa.Index("upload_file_tenant_idx", "tenant_id"), ) + # NOTE: The `id` field is generated within the application to minimize extra roundtrips + # (especially when generating `source_url`). + # The `server_default` serves as a fallback mechanism. id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()")) tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) storage_type: Mapped[str] = mapped_column(String(255), nullable=False) @@ -1622,6 +1625,7 @@ class UploadFile(Base): hash: str | None = None, source_url: str = "", ): + self.id = str(uuid.uuid4()) self.tenant_id = tenant_id self.storage_type = storage_type self.key = key diff --git a/api/models/workflow.py b/api/models/workflow.py index 4d0089fa4e..1af64b538b 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -17,11 +17,12 @@ from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIAB from core.workflow.nodes.enums import NodeType from factories.variable_factory import TypeMismatchError, build_segment_with_type from libs.datetime_utils import naive_utc_now +from libs.uuid_utils import uuidv7 from ._workflow_exc import NodeNotFoundError, WorkflowDataError if TYPE_CHECKING: - from models.model import AppMode + from models.model import AppMode, UploadFile from sqlalchemy import Index, PrimaryKeyConstraint, String, UniqueConstraint, func from sqlalchemy.orm import Mapped, declared_attr, mapped_column @@ -940,7 +941,10 @@ class WorkflowDraftVariable(Base): ] __tablename__ = "workflow_draft_variables" - __table_args__ = (UniqueConstraint(*unique_app_id_node_id_name()),) + __table_args__ = ( + UniqueConstraint(*unique_app_id_node_id_name()), + Index("workflow_draft_variable_file_id_idx", "file_id"), + ) # Required for instance variable annotation. __allow_unmapped__ = True @@ -1001,9 +1005,16 @@ class WorkflowDraftVariable(Base): selector: Mapped[str] = mapped_column(sa.String(255), nullable=False, name="selector") # The data type of this variable's value + # + # If the variable is offloaded, `value_type` represents the type of the truncated value, + # which may differ from the original value's type. Typically, they are the same, + # but in cases where the structurally truncated value still exceeds the size limit, + # text slicing is applied, and the `value_type` is converted to `STRING`. value_type: Mapped[SegmentType] = mapped_column(EnumText(SegmentType, length=20)) # The variable's value serialized as a JSON string + # + # If the variable is offloaded, `value` contains a truncated version, not the full original value. value: Mapped[str] = mapped_column(sa.Text, nullable=False, name="value") # Controls whether the variable should be displayed in the variable inspection panel @@ -1023,6 +1034,35 @@ class WorkflowDraftVariable(Base): default=None, ) + # Reference to WorkflowDraftVariableFile for offloaded large variables + # + # Indicates whether the current draft variable is offloaded. + # If not offloaded, this field will be None. + file_id: Mapped[str | None] = mapped_column( + StringUUID, + nullable=True, + default=None, + comment="Reference to WorkflowDraftVariableFile if variable is offloaded to external storage", + ) + + is_default_value: Mapped[bool] = mapped_column( + sa.Boolean, + nullable=False, + default=False, + comment=( + "Indicates whether the current value is the default for a conversation variable. " + "Always `FALSE` for other types of variables." + ), + ) + + # Relationship to WorkflowDraftVariableFile + variable_file: Mapped[Optional["WorkflowDraftVariableFile"]] = orm.relationship( + foreign_keys=[file_id], + lazy="raise", + uselist=False, + primaryjoin="WorkflowDraftVariableFile.id == WorkflowDraftVariable.file_id", + ) + # Cache for deserialized value # # NOTE(QuantumGhost): This field serves two purposes: @@ -1170,6 +1210,9 @@ class WorkflowDraftVariable(Base): case _: return DraftVariableType.NODE + def is_truncated(self) -> bool: + return self.file_id is not None + @classmethod def _new( cls, @@ -1180,6 +1223,7 @@ class WorkflowDraftVariable(Base): value: Segment, node_execution_id: str | None, description: str = "", + file_id: str | None = None, ) -> "WorkflowDraftVariable": variable = WorkflowDraftVariable() variable.created_at = _naive_utc_datetime() @@ -1189,6 +1233,7 @@ class WorkflowDraftVariable(Base): variable.node_id = node_id variable.name = name variable.set_value(value) + variable.file_id = file_id variable._set_selector(list(variable_utils.to_selector(node_id, name))) variable.node_execution_id = node_execution_id return variable @@ -1244,6 +1289,7 @@ class WorkflowDraftVariable(Base): node_execution_id: str, visible: bool = True, editable: bool = True, + file_id: str | None = None, ) -> "WorkflowDraftVariable": variable = cls._new( app_id=app_id, @@ -1251,6 +1297,7 @@ class WorkflowDraftVariable(Base): name=name, node_execution_id=node_execution_id, value=value, + file_id=file_id, ) variable.visible = visible variable.editable = editable @@ -1261,5 +1308,92 @@ class WorkflowDraftVariable(Base): return self.last_edited_at is not None +class WorkflowDraftVariableFile(Base): + """Stores metadata about files associated with large workflow draft variables. + + This model acts as an intermediary between WorkflowDraftVariable and UploadFile, + allowing for proper cleanup of orphaned files when variables are updated or deleted. + + The MIME type of the stored content is recorded in `UploadFile.mime_type`. + Possible values are 'application/json' for JSON types other than plain text, + and 'text/plain' for JSON strings. + """ + + __tablename__ = "workflow_draft_variable_files" + + # Primary key + id: Mapped[str] = mapped_column( + StringUUID, + primary_key=True, + default=uuidv7, + server_default=sa.text("uuidv7()"), + ) + + created_at: Mapped[datetime] = mapped_column( + DateTime, + nullable=False, + default=_naive_utc_datetime, + server_default=func.current_timestamp(), + ) + + tenant_id: Mapped[str] = mapped_column( + StringUUID, + nullable=False, + comment="The tenant to which the WorkflowDraftVariableFile belongs, referencing Tenant.id", + ) + + app_id: Mapped[str] = mapped_column( + StringUUID, + nullable=False, + comment="The application to which the WorkflowDraftVariableFile belongs, referencing App.id", + ) + + user_id: Mapped[str] = mapped_column( + StringUUID, + nullable=False, + comment="The owner to of the WorkflowDraftVariableFile, referencing Account.id", + ) + + # Reference to the `UploadFile.id` field + upload_file_id: Mapped[str] = mapped_column( + StringUUID, + nullable=False, + comment="Reference to UploadFile containing the large variable data", + ) + + # -------------- metadata about the variable content -------------- + + # The `size` is already recorded in UploadFiles. It is duplicated here to avoid an additional database lookup. + size: Mapped[int | None] = mapped_column( + sa.BigInteger, + nullable=False, + comment="Size of the original variable content in bytes", + ) + + length: Mapped[Optional[int]] = mapped_column( + sa.Integer, + nullable=True, + comment=( + "Length of the original variable content. For array and array-like types, " + "this represents the number of elements. For object types, it indicates the number of keys. " + "For other types, the value is NULL." + ), + ) + + # The `value_type` field records the type of the original value. + value_type: Mapped[SegmentType] = mapped_column( + EnumText(SegmentType, length=20), + nullable=False, + ) + + # Relationship to UploadFile + upload_file: Mapped["UploadFile"] = orm.relationship( + foreign_keys=[upload_file_id], + lazy="raise", + uselist=False, + primaryjoin="WorkflowDraftVariableFile.upload_file_id == UploadFile.id", + ) + + def is_system_variable_editable(name: str) -> bool: return name in _EDITABLE_SYSTEM_VARIABLE