From 2fd337e6109933c839e9bf062782932f02cbe356 Mon Sep 17 00:00:00 2001
From: QuantumGhost
Date: Fri, 29 Aug 2025 14:46:45 +0800
Subject: [PATCH] feat(api): add WorkflowNodeExecutionOffload model

---
 ...c2d166_add_workflownodeexecutionoffload.py |  72 +++++++++
 api/models/__init__.py                        |   2 +
 api/models/enums.py                           |   6 +
 api/models/workflow.py                        | 148 +++++++++++++++++-
 4 files changed, 225 insertions(+), 3 deletions(-)
 create mode 100644 api/migrations/versions/2025_08_21_1559-b45e25c2d166_add_workflownodeexecutionoffload.py

diff --git a/api/migrations/versions/2025_08_21_1559-b45e25c2d166_add_workflownodeexecutionoffload.py b/api/migrations/versions/2025_08_21_1559-b45e25c2d166_add_workflownodeexecutionoffload.py
new file mode 100644
index 0000000000..b2c31f08c7
--- /dev/null
+++ b/api/migrations/versions/2025_08_21_1559-b45e25c2d166_add_workflownodeexecutionoffload.py
@@ -0,0 +1,72 @@
+"""add WorkflowNodeExecutionOffload
+
+Revision ID: b45e25c2d166
+Revises: 76db8b6ed8f1
+Create Date: 2025-08-21 15:59:00.329004
+
+"""
+
+from alembic import op
+import models as models
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "b45e25c2d166"
+down_revision = "76db8b6ed8f1"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    op.create_table(
+        "workflow_node_execution_offload",
+        sa.Column(
+            "id",
+            models.types.StringUUID(),
+            server_default=sa.text("uuidv7()"),
+            nullable=False,
+        ),
+        sa.Column(
+            "created_at",
+            sa.DateTime(),
+            server_default=sa.text("CURRENT_TIMESTAMP"),
+            nullable=False,
+        ),
+        sa.Column(
+            "tenant_id",
+            models.types.StringUUID(),
+            nullable=False,
+        ),
+        sa.Column(
+            "app_id",
+            models.types.StringUUID(),
+            nullable=False,
+        ),
+        sa.Column(
+            "node_execution_id",
+            models.types.StringUUID(),
+            nullable=True,
+        ),
+        sa.Column(
+            "type",
+            sa.String(20),
+            nullable=False,
+        ),
+        sa.Column(
+            "file_id",
+            models.types.StringUUID(),
+            nullable=False,
+        ),
+        sa.PrimaryKeyConstraint("id", name=op.f("workflow_node_execution_offload_pkey")),
+        sa.UniqueConstraint(
+            "node_execution_id",
+            "type",
+            name=op.f("workflow_node_execution_offload_node_execution_id_key"),
+            postgresql_nulls_not_distinct=False,
+        ),
+    )
+
+
+def downgrade():
+    op.drop_table("workflow_node_execution_offload")
diff --git a/api/models/__init__.py b/api/models/__init__.py
index 1b4bdd32e4..a75ca1e9b1 100644
--- a/api/models/__init__.py
+++ b/api/models/__init__.py
@@ -86,6 +86,7 @@ from .workflow import (
     WorkflowAppLog,
     WorkflowAppLogCreatedFrom,
     WorkflowNodeExecutionModel,
+    WorkflowNodeExecutionOffload,
     WorkflowNodeExecutionTriggeredFrom,
     WorkflowRun,
     WorkflowType,
@@ -172,6 +173,7 @@ __all__ = [
     "WorkflowAppLog",
     "WorkflowAppLogCreatedFrom",
     "WorkflowNodeExecutionModel",
+    "WorkflowNodeExecutionOffload",
     "WorkflowNodeExecutionTriggeredFrom",
     "WorkflowRun",
     "WorkflowRunTriggeredFrom",
diff --git a/api/models/enums.py b/api/models/enums.py
index cc9f28a7bb..98ef03e2d9 100644
--- a/api/models/enums.py
+++ b/api/models/enums.py
@@ -30,3 +30,9 @@ class MessageStatus(StrEnum):
     NORMAL = "normal"
     ERROR = "error"
+
+
+class ExecutionOffLoadType(StrEnum):
+    INPUTS = "inputs"
+    PROCESS_DATA = "process_data"
+    OUTPUTS = "outputs"
diff --git a/api/models/workflow.py b/api/models/workflow.py
index 1af64b538b..c79cd45dc0 100644
--- a/api/models/workflow.py
+++ b/api/models/workflow.py
@@ -7,7 +7,7 @@ from typing import TYPE_CHECKING, Any, Optional, Union
 from uuid import uuid4
 
 import sqlalchemy as sa
-from sqlalchemy import DateTime, exists, orm, select
+from sqlalchemy import DateTime, Select, exists, orm, select
 
 from core.file.constants import maybe_file_object
 from core.file.models import File
@@ -15,6 +15,7 @@ from core.variables import utils as variable_utils
 from core.variables.variables import FloatVariable, IntegerVariable, StringVariable
 from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
 from core.workflow.nodes.enums import NodeType
+from extensions.ext_storage import Storage
 from factories.variable_factory import TypeMismatchError, build_segment_with_type
 from libs.datetime_utils import naive_utc_now
 from libs.uuid_utils import uuidv7
@@ -36,7 +37,7 @@ from libs import helper
 from .account import Account
 from .base import Base
 from .engine import db
-from .enums import CreatorUserRole, DraftVariableType
+from .enums import CreatorUserRole, DraftVariableType, ExecutionOffLoadType
 from .types import EnumText, StringUUID
 
 logger = logging.getLogger(__name__)
@@ -612,7 +613,7 @@ class WorkflowNodeExecutionTriggeredFrom(StrEnum):
     WORKFLOW_RUN = "workflow-run"
 
 
-class WorkflowNodeExecutionModel(Base):
+class WorkflowNodeExecutionModel(Base):  # This model is expected to have `offload_data` preloaded in most cases.
     """
     Workflow Node Execution
@@ -728,6 +729,32 @@ class WorkflowNodeExecutionModel(Base):
     created_by: Mapped[str] = mapped_column(StringUUID)
     finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime)
 
+    offload_data: Mapped[list["WorkflowNodeExecutionOffload"]] = orm.relationship(
+        "WorkflowNodeExecutionOffload",
+        primaryjoin="WorkflowNodeExecutionModel.id == foreign(WorkflowNodeExecutionOffload.node_execution_id)",
+        uselist=True,
+        lazy="raise",
+        back_populates="execution",
+    )
+
+    @staticmethod
+    def preload_offload_data(
+        query: Select[tuple["WorkflowNodeExecutionModel"]] | orm.Query["WorkflowNodeExecutionModel"],
+    ):
+        return query.options(orm.selectinload(WorkflowNodeExecutionModel.offload_data))
+
+    @staticmethod
+    def preload_offload_data_and_files(
+        query: Select[tuple["WorkflowNodeExecutionModel"]] | orm.Query["WorkflowNodeExecutionModel"],
+    ):
+        return query.options(
+            orm.selectinload(WorkflowNodeExecutionModel.offload_data).options(
+                # Also preload each offload record's `file` relationship, so callers can read
+                # `offload.file` without triggering a lazy load (both relationships use lazy="raise").
+                orm.selectinload(WorkflowNodeExecutionOffload.file),
+            )
+        )
+
     @property
     def created_by_account(self):
         created_by_role = CreatorUserRole(self.created_by_role)
@@ -779,6 +806,121 @@ class WorkflowNodeExecutionModel(Base):
 
         return extras
 
+    def _get_offload_by_type(self, type_: ExecutionOffLoadType) -> Optional["WorkflowNodeExecutionOffload"]:
+        return next((i for i in self.offload_data if i.type_ == type_), None)
+
+    @property
+    def inputs_truncated(self) -> bool:
+        """Check if inputs were truncated (offloaded to external storage)."""
+        return self._get_offload_by_type(ExecutionOffLoadType.INPUTS) is not None
+
+    @property
+    def outputs_truncated(self) -> bool:
+        """Check if outputs were truncated (offloaded to external storage)."""
+        return self._get_offload_by_type(ExecutionOffLoadType.OUTPUTS) is not None
+
+    @property
+    def process_data_truncated(self) -> bool:
+        """Check if process_data was truncated (offloaded to external storage)."""
+        return self._get_offload_by_type(ExecutionOffLoadType.PROCESS_DATA) is not None
+
+    @staticmethod
+    def _load_full_content(session: orm.Session, file_id: str, storage: Storage):
+        from .model import UploadFile
+
+        stmt = sa.select(UploadFile).where(UploadFile.id == file_id)
+        file = session.scalars(stmt).first()
+        assert file is not None, f"UploadFile with id {file_id} should exist but was not found"
+        content = storage.load(file.key)
+        return json.loads(content)
+
+    def load_full_inputs(self, session: orm.Session, storage: Storage) -> Mapping[str, Any] | None:
+        offload = self._get_offload_by_type(ExecutionOffLoadType.INPUTS)
+        if offload is None:
+            return self.inputs_dict
+
+        return self._load_full_content(session, offload.file_id, storage)
+
+    def load_full_outputs(self, session: orm.Session, storage: Storage) -> Mapping[str, Any] | None:
+        offload: WorkflowNodeExecutionOffload | None = self._get_offload_by_type(ExecutionOffLoadType.OUTPUTS)
+        if offload is None:
+            return self.outputs_dict
+
+        return self._load_full_content(session, offload.file_id, storage)
+
+    def load_full_process_data(self, session: orm.Session, storage: Storage) -> Mapping[str, Any] | None:
+        offload: WorkflowNodeExecutionOffload | None = self._get_offload_by_type(ExecutionOffLoadType.PROCESS_DATA)
+        if offload is None:
+            return self.process_data_dict
+
+        return self._load_full_content(session, offload.file_id, storage)
+
+
+class WorkflowNodeExecutionOffload(Base):
+    __tablename__ = "workflow_node_execution_offload"
+    __table_args__ = (
+        UniqueConstraint(
+            "node_execution_id",
+            "type",
+            # Treat `NULL` as distinct for this unique index, so multiple records may share a
+            # `NULL` node_execution_id; this simplifies the garbage collection process.
+            postgresql_nulls_not_distinct=False,
+        ),
+    )
+    _HASH_COL_SIZE = 64
+
+    id: Mapped[str] = mapped_column(
+        StringUUID,
+        primary_key=True,
+        server_default=sa.text("uuidv7()"),
+    )
+
+    created_at: Mapped[datetime] = mapped_column(
+        DateTime, default=naive_utc_now, server_default=func.current_timestamp()
+    )
+
+    tenant_id: Mapped[str] = mapped_column(StringUUID)
+    app_id: Mapped[str] = mapped_column(StringUUID)
+
+    # `node_execution_id` indicates the `WorkflowNodeExecutionModel` associated with this offload record.
+    # A value of `None` signifies that this offload record is not linked to any execution record
+    # and should be considered for garbage collection.
+    node_execution_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
+    type_: Mapped[ExecutionOffLoadType] = mapped_column(EnumText(ExecutionOffLoadType), name="type", nullable=False)
+
+    # Design Decision: Combining inputs and outputs into a single object was considered to reduce I/O
+    # operations. However, due to the current design of `WorkflowNodeExecutionRepository`,
+    # the `save` method is called at two distinct times:
+    #
+    # - When the node starts execution: the `inputs` field exists, but the `outputs` field is absent
+    # - When the node completes execution (either succeeded or failed): the `outputs` field becomes available
+    #
+    # It's difficult to correlate these two successive calls to `save` for combined storage.
+    # Converting the `WorkflowNodeExecutionRepository` to buffer the first `save` call and flush
+    # when execution completes was also considered, but this would make the execution state unobservable
+    # until completion, significantly damaging the observability of workflow execution.
+    #
+    # Given these constraints, `inputs` and `outputs` are stored separately to maintain real-time
+    # observability and system reliability.
+
+    # `file_id` references the offloaded storage object containing the data.
+    file_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
+
+    execution: Mapped[WorkflowNodeExecutionModel] = orm.relationship(
+        foreign_keys=[node_execution_id],
+        lazy="raise",
+        uselist=False,
+        primaryjoin="WorkflowNodeExecutionOffload.node_execution_id == WorkflowNodeExecutionModel.id",
+        back_populates="offload_data",
+    )
+
+    file: Mapped[Optional["UploadFile"]] = orm.relationship(
+        foreign_keys=[file_id],
+        lazy="raise",
+        uselist=False,
+        primaryjoin="WorkflowNodeExecutionOffload.file_id == UploadFile.id",
+    )
+
 
 class WorkflowAppLogCreatedFrom(Enum):
     """
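
A rough usage sketch of how a caller might combine the new preload helpers with the `load_full_*` accessors. Only the model APIs shown here (`preload_offload_data_and_files`, `load_full_inputs`, `load_full_outputs`) come from the diff above; the `fetch_execution_with_full_io` function and its session/storage wiring are illustrative assumptions, not part of this patch.

# Hypothetical caller, not part of the patch.
from collections.abc import Mapping
from typing import Any

import sqlalchemy as sa
from sqlalchemy import orm

from extensions.ext_storage import Storage
from models import WorkflowNodeExecutionModel


def fetch_execution_with_full_io(
    session: orm.Session, storage: Storage, execution_id: str
) -> tuple[WorkflowNodeExecutionModel, Mapping[str, Any] | None, Mapping[str, Any] | None]:
    # Attach the eager-load options via the new helper so the lazy="raise"
    # relationships (`offload_data` and each record's `file`) are populated up front.
    stmt = sa.select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == execution_id)
    stmt = WorkflowNodeExecutionModel.preload_offload_data_and_files(stmt)
    execution = session.scalars(stmt).one()

    # `load_full_*` falls back to the inline JSON columns when nothing was offloaded,
    # and otherwise downloads the offloaded object from storage and parses it.
    inputs = execution.load_full_inputs(session, storage)
    outputs = execution.load_full_outputs(session, storage)
    return execution, inputs, outputs

Because the relationships are declared with lazy="raise", skipping the preload step makes any access to `offload_data` raise immediately instead of issuing hidden per-row queries, which is presumably why the helpers are provided on the model itself.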
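
Since fully loading offloaded payloads costs a storage round trip, list-style callers would likely surface only the truncation flags. A small illustrative helper (the response keys are made up, not defined by this patch):

from models import WorkflowNodeExecutionModel


def truncation_summary(execution: WorkflowNodeExecutionModel) -> dict[str, bool]:
    # Requires `offload_data` to be preloaded (e.g. via preload_offload_data);
    # otherwise the lazy="raise" relationship raises on access.
    return {
        "inputs_truncated": execution.inputs_truncated,
        "process_data_truncated": execution.process_data_truncated,
        "outputs_truncated": execution.outputs_truncated,
    }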
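
The nullable `node_execution_id` together with the NULLs-distinct unique constraint implies a garbage-collection pass over orphaned offload rows. A minimal sketch of what such a pass could look like; the function name, batch size, and the assumption that the `Storage` interface exposes a `delete` method (only `load` is exercised in this patch) are all hypothetical.

# Hypothetical GC job, not part of the patch.
import sqlalchemy as sa
from sqlalchemy import orm

from extensions.ext_storage import Storage
from models import WorkflowNodeExecutionOffload
from models.model import UploadFile


def collect_orphaned_offloads(session: orm.Session, storage: Storage, batch_size: int = 100) -> int:
    # Offload rows with a NULL node_execution_id are not linked to any execution
    # record and can be reaped together with their backing storage objects.
    stmt = (
        sa.select(WorkflowNodeExecutionOffload)
        .where(WorkflowNodeExecutionOffload.node_execution_id.is_(None))
        .limit(batch_size)
    )
    orphans = session.scalars(stmt).all()
    for orphan in orphans:
        upload = session.get(UploadFile, orphan.file_id)
        if upload is not None:
            # Delete the offloaded payload first, then the metadata rows.
            storage.delete(upload.key)  # assumes Storage provides a `delete` method
            session.delete(upload)
        session.delete(orphan)
    session.commit()
    return len(orphans)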