feat(api): Introduce WorkflowDraftVariableFile model

This model is used to track offloaded variables values for
`WorkflowDraftVariable`.
This commit is contained in:
QuantumGhost 2025-08-12 15:55:46 +08:00
parent e9c6038192
commit de08b2310c
2 changed files with 140 additions and 2 deletions

View File

@ -1564,6 +1564,9 @@ class UploadFile(Base):
sa.Index("upload_file_tenant_idx", "tenant_id"),
)
# NOTE: The `id` field is generated within the application to minimize extra roundtrips
# (especially when generating `source_url`).
# The `server_default` serves as a fallback mechanism.
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
storage_type: Mapped[str] = mapped_column(String(255), nullable=False)
@ -1622,6 +1625,7 @@ class UploadFile(Base):
hash: str | None = None,
source_url: str = "",
):
self.id = str(uuid.uuid4())
self.tenant_id = tenant_id
self.storage_type = storage_type
self.key = key

View File

@ -17,11 +17,12 @@ from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIAB
from core.workflow.nodes.enums import NodeType
from factories.variable_factory import TypeMismatchError, build_segment_with_type
from libs.datetime_utils import naive_utc_now
from libs.uuid_utils import uuidv7
from ._workflow_exc import NodeNotFoundError, WorkflowDataError
if TYPE_CHECKING:
from models.model import AppMode
from models.model import AppMode, UploadFile
from sqlalchemy import Index, PrimaryKeyConstraint, String, UniqueConstraint, func
from sqlalchemy.orm import Mapped, declared_attr, mapped_column
@ -940,7 +941,10 @@ class WorkflowDraftVariable(Base):
]
__tablename__ = "workflow_draft_variables"
__table_args__ = (UniqueConstraint(*unique_app_id_node_id_name()),)
__table_args__ = (
UniqueConstraint(*unique_app_id_node_id_name()),
Index("workflow_draft_variable_file_id_idx", "file_id"),
)
# Required for instance variable annotation.
__allow_unmapped__ = True
@ -1001,9 +1005,16 @@ class WorkflowDraftVariable(Base):
selector: Mapped[str] = mapped_column(sa.String(255), nullable=False, name="selector")
# The data type of this variable's value
#
# If the variable is offloaded, `value_type` represents the type of the truncated value,
# which may differ from the original value's type. Typically, they are the same,
# but in cases where the structurally truncated value still exceeds the size limit,
# text slicing is applied, and the `value_type` is converted to `STRING`.
value_type: Mapped[SegmentType] = mapped_column(EnumText(SegmentType, length=20))
# The variable's value serialized as a JSON string
#
# If the variable is offloaded, `value` contains a truncated version, not the full original value.
value: Mapped[str] = mapped_column(sa.Text, nullable=False, name="value")
# Controls whether the variable should be displayed in the variable inspection panel
@ -1023,6 +1034,35 @@ class WorkflowDraftVariable(Base):
default=None,
)
# Reference to WorkflowDraftVariableFile for offloaded large variables
#
# Indicates whether the current draft variable is offloaded.
# If not offloaded, this field will be None.
file_id: Mapped[str | None] = mapped_column(
StringUUID,
nullable=True,
default=None,
comment="Reference to WorkflowDraftVariableFile if variable is offloaded to external storage",
)
is_default_value: Mapped[bool] = mapped_column(
sa.Boolean,
nullable=False,
default=False,
comment=(
"Indicates whether the current value is the default for a conversation variable. "
"Always `FALSE` for other types of variables."
),
)
# Relationship to WorkflowDraftVariableFile
variable_file: Mapped[Optional["WorkflowDraftVariableFile"]] = orm.relationship(
foreign_keys=[file_id],
lazy="raise",
uselist=False,
primaryjoin="WorkflowDraftVariableFile.id == WorkflowDraftVariable.file_id",
)
# Cache for deserialized value
#
# NOTE(QuantumGhost): This field serves two purposes:
@ -1170,6 +1210,9 @@ class WorkflowDraftVariable(Base):
case _:
return DraftVariableType.NODE
def is_truncated(self) -> bool:
return self.file_id is not None
@classmethod
def _new(
cls,
@ -1180,6 +1223,7 @@ class WorkflowDraftVariable(Base):
value: Segment,
node_execution_id: str | None,
description: str = "",
file_id: str | None = None,
) -> "WorkflowDraftVariable":
variable = WorkflowDraftVariable()
variable.created_at = _naive_utc_datetime()
@ -1189,6 +1233,7 @@ class WorkflowDraftVariable(Base):
variable.node_id = node_id
variable.name = name
variable.set_value(value)
variable.file_id = file_id
variable._set_selector(list(variable_utils.to_selector(node_id, name)))
variable.node_execution_id = node_execution_id
return variable
@ -1244,6 +1289,7 @@ class WorkflowDraftVariable(Base):
node_execution_id: str,
visible: bool = True,
editable: bool = True,
file_id: str | None = None,
) -> "WorkflowDraftVariable":
variable = cls._new(
app_id=app_id,
@ -1251,6 +1297,7 @@ class WorkflowDraftVariable(Base):
name=name,
node_execution_id=node_execution_id,
value=value,
file_id=file_id,
)
variable.visible = visible
variable.editable = editable
@ -1261,5 +1308,92 @@ class WorkflowDraftVariable(Base):
return self.last_edited_at is not None
class WorkflowDraftVariableFile(Base):
"""Stores metadata about files associated with large workflow draft variables.
This model acts as an intermediary between WorkflowDraftVariable and UploadFile,
allowing for proper cleanup of orphaned files when variables are updated or deleted.
The MIME type of the stored content is recorded in `UploadFile.mime_type`.
Possible values are 'application/json' for JSON types other than plain text,
and 'text/plain' for JSON strings.
"""
__tablename__ = "workflow_draft_variable_files"
# Primary key
id: Mapped[str] = mapped_column(
StringUUID,
primary_key=True,
default=uuidv7,
server_default=sa.text("uuidv7()"),
)
created_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
default=_naive_utc_datetime,
server_default=func.current_timestamp(),
)
tenant_id: Mapped[str] = mapped_column(
StringUUID,
nullable=False,
comment="The tenant to which the WorkflowDraftVariableFile belongs, referencing Tenant.id",
)
app_id: Mapped[str] = mapped_column(
StringUUID,
nullable=False,
comment="The application to which the WorkflowDraftVariableFile belongs, referencing App.id",
)
user_id: Mapped[str] = mapped_column(
StringUUID,
nullable=False,
comment="The owner to of the WorkflowDraftVariableFile, referencing Account.id",
)
# Reference to the `UploadFile.id` field
upload_file_id: Mapped[str] = mapped_column(
StringUUID,
nullable=False,
comment="Reference to UploadFile containing the large variable data",
)
# -------------- metadata about the variable content --------------
# The `size` is already recorded in UploadFiles. It is duplicated here to avoid an additional database lookup.
size: Mapped[int | None] = mapped_column(
sa.BigInteger,
nullable=False,
comment="Size of the original variable content in bytes",
)
length: Mapped[Optional[int]] = mapped_column(
sa.Integer,
nullable=True,
comment=(
"Length of the original variable content. For array and array-like types, "
"this represents the number of elements. For object types, it indicates the number of keys. "
"For other types, the value is NULL."
),
)
# The `value_type` field records the type of the original value.
value_type: Mapped[SegmentType] = mapped_column(
EnumText(SegmentType, length=20),
nullable=False,
)
# Relationship to UploadFile
upload_file: Mapped["UploadFile"] = orm.relationship(
foreign_keys=[upload_file_id],
lazy="raise",
uselist=False,
primaryjoin="WorkflowDraftVariableFile.upload_file_id == UploadFile.id",
)
def is_system_variable_editable(name: str) -> bool:
return name in _EDITABLE_SYSTEM_VARIABLE