Merge branch 'main' into feat/new-agent-node

This commit is contained in:
zyssyz123 2026-04-13 13:58:02 +08:00 committed by GitHub
commit c7a7c73034
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 42 additions and 30 deletions

View File

@ -61,7 +61,7 @@ from factories import variable_factory
from libs import helper
from .account import Account
from .base import Base, DefaultFieldsMixin, TypeBase
from .base import Base, DefaultFieldsDCMixin, TypeBase
from .engine import db
from .enums import CreatorUserRole, DraftVariableType, ExecutionOffLoadType, WorkflowRunTriggeredFrom
from .types import EnumText, LongText, StringUUID
@ -742,8 +742,8 @@ class WorkflowRun(Base):
exceptions_count: Mapped[int] = mapped_column(sa.Integer, server_default=sa.text("0"), nullable=True)
pause: Mapped[Optional["WorkflowPause"]] = orm.relationship(
"WorkflowPause",
primaryjoin="WorkflowRun.id == foreign(WorkflowPause.workflow_run_id)",
lambda: WorkflowPause,
primaryjoin=lambda: WorkflowRun.id == orm.foreign(WorkflowPause.workflow_run_id),
uselist=False,
# require explicit preloading.
lazy="raise",
@ -1196,6 +1196,18 @@ class WorkflowAppLogCreatedFrom(StrEnum):
raise ValueError(f"invalid workflow app log created from value {value}")
class WorkflowAppLogDict(TypedDict):
id: str
tenant_id: str
app_id: str
workflow_id: str
workflow_run_id: str
created_from: WorkflowAppLogCreatedFrom
created_by_role: CreatorUserRole
created_by: str
created_at: datetime
class WorkflowAppLog(TypeBase):
"""
Workflow App execution log, excluding workflow debugging records.
@ -1273,8 +1285,8 @@ class WorkflowAppLog(TypeBase):
created_by_role = CreatorUserRole(self.created_by_role)
return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None
def to_dict(self):
return {
def to_dict(self) -> WorkflowAppLogDict:
result: WorkflowAppLogDict = {
"id": self.id,
"tenant_id": self.tenant_id,
"app_id": self.app_id,
@ -1285,6 +1297,7 @@ class WorkflowAppLog(TypeBase):
"created_by": self.created_by,
"created_at": self.created_at,
}
return result
class WorkflowArchiveLog(TypeBase):
@ -1941,7 +1954,7 @@ def is_system_variable_editable(name: str) -> bool:
return name in _EDITABLE_SYSTEM_VARIABLE
class WorkflowPause(DefaultFieldsMixin, Base):
class WorkflowPause(DefaultFieldsDCMixin, TypeBase):
"""
WorkflowPause records the paused state and related metadata for a specific workflow run.
@ -1980,6 +1993,11 @@ class WorkflowPause(DefaultFieldsMixin, Base):
nullable=False,
)
# state_object_key stores the object key referencing the serialized runtime state
# of the `GraphEngine`. This object captures the complete execution context of the
# workflow at the moment it was paused, enabling accurate resumption.
state_object_key: Mapped[str] = mapped_column(String(length=255), nullable=False)
# `resumed_at` records the timestamp when the suspended workflow was resumed.
# It is set to `NULL` if the workflow has not been resumed.
#
@ -1988,25 +2006,23 @@ class WorkflowPause(DefaultFieldsMixin, Base):
resumed_at: Mapped[datetime | None] = mapped_column(
sa.DateTime,
nullable=True,
default=None,
)
# state_object_key stores the object key referencing the serialized runtime state
# of the `GraphEngine`. This object captures the complete execution context of the
# workflow at the moment it was paused, enabling accurate resumption.
state_object_key: Mapped[str] = mapped_column(String(length=255), nullable=False)
# Relationship to WorkflowRun
# Relationship to WorkflowRun (uses lambda to resolve across Base/TypeBase registries)
workflow_run: Mapped["WorkflowRun"] = orm.relationship(
lambda: WorkflowRun,
foreign_keys=[workflow_run_id],
# require explicit preloading.
lazy="raise",
uselist=False,
primaryjoin="WorkflowPause.workflow_run_id == WorkflowRun.id",
primaryjoin=lambda: WorkflowPause.workflow_run_id == WorkflowRun.id,
back_populates="pause",
init=False,
)
class WorkflowPauseReason(DefaultFieldsMixin, Base):
class WorkflowPauseReason(DefaultFieldsDCMixin, TypeBase):
__tablename__ = "workflow_pause_reasons"
# `pause_id` represents the identifier of the pause,
@ -2049,16 +2065,20 @@ class WorkflowPauseReason(DefaultFieldsMixin, Base):
lazy="raise",
uselist=False,
primaryjoin="WorkflowPauseReason.pause_id == WorkflowPause.id",
init=False,
)
@classmethod
def from_entity(cls, pause_reason: PauseReason) -> "WorkflowPauseReason":
def from_entity(cls, *, pause_id: str, pause_reason: PauseReason) -> "WorkflowPauseReason":
if isinstance(pause_reason, HumanInputRequired):
return cls(
type_=PauseReasonType.HUMAN_INPUT_REQUIRED, form_id=pause_reason.form_id, node_id=pause_reason.node_id
pause_id=pause_id,
type_=PauseReasonType.HUMAN_INPUT_REQUIRED,
form_id=pause_reason.form_id,
node_id=pause_reason.node_id,
)
elif isinstance(pause_reason, SchedulingPause):
return cls(type_=PauseReasonType.SCHEDULED_PAUSE, message=pause_reason.message, node_id="")
return cls(pause_id=pause_id, type_=PauseReasonType.SCHEDULED_PAUSE, message=pause_reason.message)
else:
raise AssertionError(f"Unknown pause reason type: {pause_reason}")

View File

@ -41,7 +41,6 @@ from libs.datetime_utils import naive_utc_now
from libs.helper import convert_datetime_to_date
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from libs.time_parser import get_time_threshold
from libs.uuid_utils import uuidv7
from models.enums import WorkflowRunTriggeredFrom
from models.human_input import HumanInputForm
from models.workflow import WorkflowAppLog, WorkflowArchiveLog, WorkflowPause, WorkflowPauseReason, WorkflowRun
@ -744,12 +743,11 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
# Upload the state file
# Create the pause record
pause_model = WorkflowPause()
pause_model.id = str(uuidv7())
pause_model.workflow_id = workflow_run.workflow_id
pause_model.workflow_run_id = workflow_run.id
pause_model.state_object_key = state_obj_key
pause_model.created_at = naive_utc_now()
pause_model = WorkflowPause(
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
state_object_key=state_obj_key,
)
pause_reason_models = []
for reason in pause_reasons:
if isinstance(reason, HumanInputRequired):

View File

@ -220,7 +220,6 @@ class TestDeleteRunsWithRelated:
created_by=test_scope.user_id,
)
pause = WorkflowPause(
id=str(uuid4()),
workflow_id=test_scope.workflow_id,
workflow_run_id=workflow_run.id,
state_object_key=f"workflow-state-{uuid4()}.json",
@ -280,7 +279,6 @@ class TestCountRunsWithRelated:
created_by=test_scope.user_id,
)
pause = WorkflowPause(
id=str(uuid4()),
workflow_id=test_scope.workflow_id,
workflow_run_id=workflow_run.id,
state_object_key=f"workflow-state-{uuid4()}.json",
@ -544,7 +542,6 @@ class TestPrivateWorkflowPauseEntity:
status=WorkflowExecutionStatus.RUNNING,
)
pause = WorkflowPause(
id=str(uuid4()),
workflow_id=test_scope.workflow_id,
workflow_run_id=workflow_run.id,
state_object_key=f"workflow-state-{uuid4()}.json",
@ -574,7 +571,6 @@ class TestPrivateWorkflowPauseEntity:
)
state_key = f"workflow-state-{uuid4()}.json"
pause = WorkflowPause(
id=str(uuid4()),
workflow_id=test_scope.workflow_id,
workflow_run_id=workflow_run.id,
state_object_key=state_key,
@ -606,7 +602,6 @@ class TestPrivateWorkflowPauseEntity:
)
state_key = f"workflow-state-{uuid4()}.json"
pause = WorkflowPause(
id=str(uuid4()),
workflow_id=test_scope.workflow_id,
workflow_run_id=workflow_run.id,
state_object_key=state_key,
@ -672,7 +667,6 @@ class TestBuildHumanInputRequiredReason:
status=WorkflowExecutionStatus.RUNNING,
)
pause = WorkflowPause(
id=str(uuid4()),
workflow_id=test_scope.workflow_id,
workflow_run_id=workflow_run.id,
state_object_key=f"workflow-state-{uuid4()}.json",