From d9f54f8bd78013a77e01d61464a8edd36cb687d5 Mon Sep 17 00:00:00 2001 From: sxxtony <166789813+sxxtony@users.noreply.github.com> Date: Sun, 12 Apr 2026 22:46:52 -0700 Subject: [PATCH 1/2] refactor: migrate WorkflowPause and WorkflowPauseReason to TypeBase (#34688) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- api/models/workflow.py | 37 +++++++++++-------- .../sqlalchemy_api_workflow_run_repository.py | 12 +++--- ..._sqlalchemy_api_workflow_run_repository.py | 6 --- 3 files changed, 27 insertions(+), 28 deletions(-) diff --git a/api/models/workflow.py b/api/models/workflow.py index bb4d6a7ec9..3bd24b220a 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -61,7 +61,7 @@ from factories import variable_factory from libs import helper from .account import Account -from .base import Base, DefaultFieldsMixin, TypeBase +from .base import Base, DefaultFieldsDCMixin, TypeBase from .engine import db from .enums import CreatorUserRole, DraftVariableType, ExecutionOffLoadType, WorkflowRunTriggeredFrom from .types import EnumText, LongText, StringUUID @@ -742,8 +742,8 @@ class WorkflowRun(Base): exceptions_count: Mapped[int] = mapped_column(sa.Integer, server_default=sa.text("0"), nullable=True) pause: Mapped[Optional["WorkflowPause"]] = orm.relationship( - "WorkflowPause", - primaryjoin="WorkflowRun.id == foreign(WorkflowPause.workflow_run_id)", + lambda: WorkflowPause, + primaryjoin=lambda: WorkflowRun.id == orm.foreign(WorkflowPause.workflow_run_id), uselist=False, # require explicit preloading. lazy="raise", @@ -1941,7 +1941,7 @@ def is_system_variable_editable(name: str) -> bool: return name in _EDITABLE_SYSTEM_VARIABLE -class WorkflowPause(DefaultFieldsMixin, Base): +class WorkflowPause(DefaultFieldsDCMixin, TypeBase): """ WorkflowPause records the paused state and related metadata for a specific workflow run. @@ -1980,6 +1980,11 @@ class WorkflowPause(DefaultFieldsMixin, Base): nullable=False, ) + # state_object_key stores the object key referencing the serialized runtime state + # of the `GraphEngine`. This object captures the complete execution context of the + # workflow at the moment it was paused, enabling accurate resumption. + state_object_key: Mapped[str] = mapped_column(String(length=255), nullable=False) + # `resumed_at` records the timestamp when the suspended workflow was resumed. # It is set to `NULL` if the workflow has not been resumed. # @@ -1988,25 +1993,23 @@ class WorkflowPause(DefaultFieldsMixin, Base): resumed_at: Mapped[datetime | None] = mapped_column( sa.DateTime, nullable=True, + default=None, ) - # state_object_key stores the object key referencing the serialized runtime state - # of the `GraphEngine`. This object captures the complete execution context of the - # workflow at the moment it was paused, enabling accurate resumption. - state_object_key: Mapped[str] = mapped_column(String(length=255), nullable=False) - - # Relationship to WorkflowRun + # Relationship to WorkflowRun (uses lambda to resolve across Base/TypeBase registries) workflow_run: Mapped["WorkflowRun"] = orm.relationship( + lambda: WorkflowRun, foreign_keys=[workflow_run_id], # require explicit preloading. lazy="raise", uselist=False, - primaryjoin="WorkflowPause.workflow_run_id == WorkflowRun.id", + primaryjoin=lambda: WorkflowPause.workflow_run_id == WorkflowRun.id, back_populates="pause", + init=False, ) -class WorkflowPauseReason(DefaultFieldsMixin, Base): +class WorkflowPauseReason(DefaultFieldsDCMixin, TypeBase): __tablename__ = "workflow_pause_reasons" # `pause_id` represents the identifier of the pause, @@ -2049,16 +2052,20 @@ class WorkflowPauseReason(DefaultFieldsMixin, Base): lazy="raise", uselist=False, primaryjoin="WorkflowPauseReason.pause_id == WorkflowPause.id", + init=False, ) @classmethod - def from_entity(cls, pause_reason: PauseReason) -> "WorkflowPauseReason": + def from_entity(cls, *, pause_id: str, pause_reason: PauseReason) -> "WorkflowPauseReason": if isinstance(pause_reason, HumanInputRequired): return cls( - type_=PauseReasonType.HUMAN_INPUT_REQUIRED, form_id=pause_reason.form_id, node_id=pause_reason.node_id + pause_id=pause_id, + type_=PauseReasonType.HUMAN_INPUT_REQUIRED, + form_id=pause_reason.form_id, + node_id=pause_reason.node_id, ) elif isinstance(pause_reason, SchedulingPause): - return cls(type_=PauseReasonType.SCHEDULED_PAUSE, message=pause_reason.message, node_id="") + return cls(pause_id=pause_id, type_=PauseReasonType.SCHEDULED_PAUSE, message=pause_reason.message) else: raise AssertionError(f"Unknown pause reason type: {pause_reason}") diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index 9267be2636..b760696c5e 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -41,7 +41,6 @@ from libs.datetime_utils import naive_utc_now from libs.helper import convert_datetime_to_date from libs.infinite_scroll_pagination import InfiniteScrollPagination from libs.time_parser import get_time_threshold -from libs.uuid_utils import uuidv7 from models.enums import WorkflowRunTriggeredFrom from models.human_input import HumanInputForm from models.workflow import WorkflowAppLog, WorkflowArchiveLog, WorkflowPause, WorkflowPauseReason, WorkflowRun @@ -744,12 +743,11 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): # Upload the state file # Create the pause record - pause_model = WorkflowPause() - pause_model.id = str(uuidv7()) - pause_model.workflow_id = workflow_run.workflow_id - pause_model.workflow_run_id = workflow_run.id - pause_model.state_object_key = state_obj_key - pause_model.created_at = naive_utc_now() + pause_model = WorkflowPause( + workflow_id=workflow_run.workflow_id, + workflow_run_id=workflow_run.id, + state_object_key=state_obj_key, + ) pause_reason_models = [] for reason in pause_reasons: if isinstance(reason, HumanInputRequired): diff --git a/api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_api_workflow_run_repository.py b/api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_api_workflow_run_repository.py index d28cfda159..64c93ac07c 100644 --- a/api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_api_workflow_run_repository.py +++ b/api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_api_workflow_run_repository.py @@ -220,7 +220,6 @@ class TestDeleteRunsWithRelated: created_by=test_scope.user_id, ) pause = WorkflowPause( - id=str(uuid4()), workflow_id=test_scope.workflow_id, workflow_run_id=workflow_run.id, state_object_key=f"workflow-state-{uuid4()}.json", @@ -280,7 +279,6 @@ class TestCountRunsWithRelated: created_by=test_scope.user_id, ) pause = WorkflowPause( - id=str(uuid4()), workflow_id=test_scope.workflow_id, workflow_run_id=workflow_run.id, state_object_key=f"workflow-state-{uuid4()}.json", @@ -544,7 +542,6 @@ class TestPrivateWorkflowPauseEntity: status=WorkflowExecutionStatus.RUNNING, ) pause = WorkflowPause( - id=str(uuid4()), workflow_id=test_scope.workflow_id, workflow_run_id=workflow_run.id, state_object_key=f"workflow-state-{uuid4()}.json", @@ -574,7 +571,6 @@ class TestPrivateWorkflowPauseEntity: ) state_key = f"workflow-state-{uuid4()}.json" pause = WorkflowPause( - id=str(uuid4()), workflow_id=test_scope.workflow_id, workflow_run_id=workflow_run.id, state_object_key=state_key, @@ -606,7 +602,6 @@ class TestPrivateWorkflowPauseEntity: ) state_key = f"workflow-state-{uuid4()}.json" pause = WorkflowPause( - id=str(uuid4()), workflow_id=test_scope.workflow_id, workflow_run_id=workflow_run.id, state_object_key=state_key, @@ -672,7 +667,6 @@ class TestBuildHumanInputRequiredReason: status=WorkflowExecutionStatus.RUNNING, ) pause = WorkflowPause( - id=str(uuid4()), workflow_id=test_scope.workflow_id, workflow_run_id=workflow_run.id, state_object_key=f"workflow-state-{uuid4()}.json", From b0079e55b43b1ef64863d29e91d8319e535a2723 Mon Sep 17 00:00:00 2001 From: Statxc Date: Mon, 13 Apr 2026 07:47:44 +0200 Subject: [PATCH 2/2] refactor(api): type WorkflowAppLog.to_dict with WorkflowAppLogDict TypedDict (#34682) --- api/models/workflow.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/api/models/workflow.py b/api/models/workflow.py index 3bd24b220a..77964b851f 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -1196,6 +1196,18 @@ class WorkflowAppLogCreatedFrom(StrEnum): raise ValueError(f"invalid workflow app log created from value {value}") +class WorkflowAppLogDict(TypedDict): + id: str + tenant_id: str + app_id: str + workflow_id: str + workflow_run_id: str + created_from: WorkflowAppLogCreatedFrom + created_by_role: CreatorUserRole + created_by: str + created_at: datetime + + class WorkflowAppLog(TypeBase): """ Workflow App execution log, excluding workflow debugging records. @@ -1273,8 +1285,8 @@ class WorkflowAppLog(TypeBase): created_by_role = CreatorUserRole(self.created_by_role) return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None - def to_dict(self): - return { + def to_dict(self) -> WorkflowAppLogDict: + result: WorkflowAppLogDict = { "id": self.id, "tenant_id": self.tenant_id, "app_id": self.app_id, @@ -1285,6 +1297,7 @@ class WorkflowAppLog(TypeBase): "created_by": self.created_by, "created_at": self.created_at, } + return result class WorkflowArchiveLog(TypeBase):