From b63b9c32f772945b7d37cd7f61f5cc4443b8feea Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Sat, 18 Oct 2025 20:06:46 +0800 Subject: [PATCH] refactor: models --- api/models/trigger.py | 327 +++++++++++++++++- api/models/workflow.py | 317 ----------------- .../trigger/trigger_provider_service.py | 8 +- api/services/trigger/trigger_service.py | 4 +- api/services/trigger/webhook_service.py | 3 +- api/tasks/trigger_processing_tasks.py | 4 +- 6 files changed, 337 insertions(+), 326 deletions(-) diff --git a/api/models/trigger.py b/api/models/trigger.py index 6eb54cc77f..351b4c2868 100644 --- a/api/models/trigger.py +++ b/api/models/trigger.py @@ -2,7 +2,8 @@ import json import time from collections.abc import Mapping from datetime import datetime -from typing import Any, cast +from enum import StrEnum +from typing import Any, Optional, cast import sqlalchemy as sa from sqlalchemy import DateTime, Index, Integer, String, UniqueConstraint, func @@ -12,8 +13,13 @@ from core.plugin.entities.plugin_daemon import CredentialType from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity from core.trigger.entities.entities import Subscription from core.trigger.utils.endpoint import parse_endpoint_id +from core.workflow.enums import NodeType +from extensions.ext_database import db +from libs.datetime_utils import naive_utc_now from models.base import Base -from models.types import StringUUID +from models.enums import CreatorUserRole +from models.model import Account +from models.types import EnumText, StringUUID class TriggerSubscription(Base): @@ -141,3 +147,320 @@ class TriggerOAuthTenantClient(Base): @property def oauth_params(self) -> Mapping[str, Any]: return cast(Mapping[str, Any], json.loads(self.encrypted_oauth_params or "{}")) + + +class WorkflowTriggerStatus(StrEnum): + """Workflow Trigger Execution Status""" + + PENDING = "pending" + QUEUED = "queued" + RUNNING = "running" + SUCCEEDED = "succeeded" + FAILED = "failed" + RATE_LIMITED = "rate_limited" + RETRYING = "retrying" + + +class WorkflowTriggerLog(Base): + """ + Workflow Trigger Log + + Track async trigger workflow runs with re-invocation capability + + Attributes: + - id (uuid) Trigger Log ID (used as workflow_trigger_log_id) + - tenant_id (uuid) Workspace ID + - app_id (uuid) App ID + - workflow_id (uuid) Workflow ID + - workflow_run_id (uuid) Optional - Associated workflow run ID when execution starts + - root_node_id (string) Optional - Custom starting node ID for workflow execution + - trigger_type (string) Type of trigger: webhook, schedule, plugin + - trigger_data (text) Full trigger data including inputs (JSON) + - inputs (text) Input parameters (JSON) + - outputs (text) Optional - Output content (JSON) + - status (string) Execution status + - error (text) Optional - Error message if failed + - queue_name (string) Celery queue used + - celery_task_id (string) Optional - Celery task ID for tracking + - retry_count (int) Number of retry attempts + - elapsed_time (float) Optional - Time consumption in seconds + - total_tokens (int) Optional - Total tokens used + - created_by_role (string) Creator role: account, end_user + - created_by (string) Creator ID + - created_at (timestamp) Creation time + - triggered_at (timestamp) Optional - When actually triggered + - finished_at (timestamp) Optional - Completion time + """ + + __tablename__ = "workflow_trigger_logs" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="workflow_trigger_log_pkey"), + sa.Index("workflow_trigger_log_tenant_app_idx", "tenant_id", "app_id"), + sa.Index("workflow_trigger_log_status_idx", "status"), + sa.Index("workflow_trigger_log_created_at_idx", "created_at"), + sa.Index("workflow_trigger_log_workflow_run_idx", "workflow_run_id"), + sa.Index("workflow_trigger_log_workflow_id_idx", "workflow_id"), + ) + + id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()")) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + workflow_run_id: Mapped[Optional[str]] = mapped_column(StringUUID, nullable=True) + root_node_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) + + trigger_type: Mapped[str] = mapped_column(String(50), nullable=False) + trigger_data: Mapped[str] = mapped_column(sa.Text, nullable=False) # Full TriggerData as JSON + inputs: Mapped[str] = mapped_column(sa.Text, nullable=False) # Just inputs for easy viewing + outputs: Mapped[Optional[str]] = mapped_column(sa.Text, nullable=True) + + status: Mapped[str] = mapped_column(String(50), nullable=False, default=WorkflowTriggerStatus.PENDING) + error: Mapped[Optional[str]] = mapped_column(sa.Text, nullable=True) + + queue_name: Mapped[str] = mapped_column(String(100), nullable=False) + celery_task_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) + retry_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, default=0) + + elapsed_time: Mapped[Optional[float]] = mapped_column(sa.Float, nullable=True) + total_tokens: Mapped[Optional[int]] = mapped_column(sa.Integer, nullable=True) + + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + created_by_role: Mapped[str] = mapped_column(String(255), nullable=False) + created_by: Mapped[str] = mapped_column(String(255), nullable=False) + + triggered_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) + finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) + + @property + def created_by_account(self): + created_by_role = CreatorUserRole(self.created_by_role) + return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None + + @property + def created_by_end_user(self): + from models.model import EndUser + + created_by_role = CreatorUserRole(self.created_by_role) + return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for API responses""" + return { + "id": self.id, + "tenant_id": self.tenant_id, + "app_id": self.app_id, + "workflow_id": self.workflow_id, + "workflow_run_id": self.workflow_run_id, + "trigger_type": self.trigger_type, + "trigger_data": json.loads(self.trigger_data), + "inputs": json.loads(self.inputs), + "outputs": json.loads(self.outputs) if self.outputs else None, + "status": self.status, + "error": self.error, + "queue_name": self.queue_name, + "celery_task_id": self.celery_task_id, + "retry_count": self.retry_count, + "elapsed_time": self.elapsed_time, + "total_tokens": self.total_tokens, + "created_by_role": self.created_by_role, + "created_by": self.created_by, + "created_at": self.created_at.isoformat() if self.created_at else None, + "triggered_at": self.triggered_at.isoformat() if self.triggered_at else None, + "finished_at": self.finished_at.isoformat() if self.finished_at else None, + } + + +class WorkflowWebhookTrigger(Base): + """ + Workflow Webhook Trigger + + Attributes: + - id (uuid) Primary key + - app_id (uuid) App ID to bind to a specific app + - node_id (varchar) Node ID which node in the workflow + - tenant_id (uuid) Workspace ID + - webhook_id (varchar) Webhook ID for URL: https://api.dify.ai/triggers/webhook/:webhook_id + - created_by (varchar) User ID of the creator + - created_at (timestamp) Creation time + - updated_at (timestamp) Last update time + """ + + __tablename__ = "workflow_webhook_triggers" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="workflow_webhook_trigger_pkey"), + sa.Index("workflow_webhook_trigger_tenant_idx", "tenant_id"), + sa.UniqueConstraint("app_id", "node_id", name="uniq_node"), + sa.UniqueConstraint("webhook_id", name="uniq_webhook_id"), + ) + + id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()")) + app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + node_id: Mapped[str] = mapped_column(String(64), nullable=False) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + webhook_id: Mapped[str] = mapped_column(String(24), nullable=False) + created_by: Mapped[str] = mapped_column(StringUUID, nullable=False) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + updated_at: Mapped[datetime] = mapped_column( + DateTime, + nullable=False, + server_default=func.current_timestamp(), + server_onupdate=func.current_timestamp(), + ) + + +class WorkflowPluginTrigger(Base): + """ + Workflow Plugin Trigger + + Maps plugin triggers to workflow nodes, similar to WorkflowWebhookTrigger + + Attributes: + - id (uuid) Primary key + - app_id (uuid) App ID to bind to a specific app + - node_id (varchar) Node ID which node in the workflow + - tenant_id (uuid) Workspace ID + - provider_id (varchar) Plugin provider ID + - event_name (varchar) trigger name + - subscription_id (varchar) Subscription ID + - created_at (timestamp) Creation time + - updated_at (timestamp) Last update time + """ + + __tablename__ = "workflow_plugin_triggers" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"), + sa.Index("workflow_plugin_trigger_tenant_subscription_idx", "tenant_id", "subscription_id", "event_name"), + sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node_subscription"), + ) + + id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()")) + app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + node_id: Mapped[str] = mapped_column(String(64), nullable=False) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + provider_id: Mapped[str] = mapped_column(String(512), nullable=False) + event_name: Mapped[str] = mapped_column(String(255), nullable=False) + subscription_id: Mapped[str] = mapped_column(String(255), nullable=False) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + updated_at: Mapped[datetime] = mapped_column( + DateTime, + nullable=False, + server_default=func.current_timestamp(), + server_onupdate=func.current_timestamp(), + ) + + +class AppTriggerType(StrEnum): + """App Trigger Type Enum""" + + TRIGGER_WEBHOOK = NodeType.TRIGGER_WEBHOOK.value + TRIGGER_SCHEDULE = NodeType.TRIGGER_SCHEDULE.value + TRIGGER_PLUGIN = NodeType.TRIGGER_PLUGIN.value + + +class AppTriggerStatus(StrEnum): + """App Trigger Status Enum""" + + ENABLED = "enabled" + DISABLED = "disabled" + UNAUTHORIZED = "unauthorized" + + +class AppTrigger(Base): + """ + App Trigger + + Manages multiple triggers for an app with enable/disable and authorization states. + + Attributes: + - id (uuid) Primary key + - tenant_id (uuid) Workspace ID + - app_id (uuid) App ID + - trigger_type (string) Type: webhook, schedule, plugin + - title (string) Trigger title + + - status (string) Status: enabled, disabled, unauthorized, error + - node_id (string) Optional workflow node ID + - created_at (timestamp) Creation time + - updated_at (timestamp) Last update time + """ + + __tablename__ = "app_triggers" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="app_trigger_pkey"), + sa.Index("app_trigger_tenant_app_idx", "tenant_id", "app_id"), + ) + + id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()")) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + node_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=False) + trigger_type: Mapped[str] = mapped_column(EnumText(AppTriggerType, length=50), nullable=False) + title: Mapped[str] = mapped_column(String(255), nullable=False) + provider_name: Mapped[str] = mapped_column(String(255), server_default="", nullable=True) + status: Mapped[str] = mapped_column( + EnumText(AppTriggerStatus, length=50), nullable=False, default=AppTriggerStatus.ENABLED + ) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + updated_at: Mapped[datetime] = mapped_column( + DateTime, + nullable=False, + default=naive_utc_now(), + server_onupdate=func.current_timestamp(), + ) + + +class WorkflowSchedulePlan(Base): + """ + Workflow Schedule Configuration + + Store schedule configurations for time-based workflow triggers. + Uses cron expressions with timezone support for flexible scheduling. + + Attributes: + - id (uuid) Primary key + - app_id (uuid) App ID to bind to a specific app + - node_id (varchar) Starting node ID for workflow execution + - tenant_id (uuid) Workspace ID for multi-tenancy + - cron_expression (varchar) Cron expression defining schedule pattern + - timezone (varchar) Timezone for cron evaluation (e.g., 'Asia/Shanghai') + - next_run_at (timestamp) Next scheduled execution time + - created_at (timestamp) Creation timestamp + - updated_at (timestamp) Last update timestamp + """ + + __tablename__ = "workflow_schedule_plans" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="workflow_schedule_plan_pkey"), + sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node"), + sa.Index("workflow_schedule_plan_next_idx", "next_run_at"), + ) + + id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()")) + app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + node_id: Mapped[str] = mapped_column(String(64), nullable=False) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + + # Schedule configuration + cron_expression: Mapped[str] = mapped_column(String(255), nullable=False) + timezone: Mapped[str] = mapped_column(String(64), nullable=False) + + # Schedule control + next_run_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + updated_at: Mapped[datetime] = mapped_column( + DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp() + ) + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary representation""" + return { + "id": self.id, + "app_id": self.app_id, + "node_id": self.node_id, + "tenant_id": self.tenant_id, + "cron_expression": self.cron_expression, + "timezone": self.timezone, + "next_run_at": self.next_run_at.isoformat() if self.next_run_at else None, + "created_at": self.created_at.isoformat(), + "updated_at": self.updated_at.isoformat(), + } diff --git a/api/models/workflow.py b/api/models/workflow.py index 7c907077cf..1ef5330504 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -1631,320 +1631,3 @@ class WorkflowDraftVariableFile(Base): def is_system_variable_editable(name: str) -> bool: return name in _EDITABLE_SYSTEM_VARIABLE - - -class WorkflowTriggerStatus(StrEnum): - """Workflow Trigger Execution Status""" - - PENDING = "pending" - QUEUED = "queued" - RUNNING = "running" - SUCCEEDED = "succeeded" - FAILED = "failed" - RATE_LIMITED = "rate_limited" - RETRYING = "retrying" - - -class WorkflowTriggerLog(Base): - """ - Workflow Trigger Log - - Track async trigger workflow runs with re-invocation capability - - Attributes: - - id (uuid) Trigger Log ID (used as workflow_trigger_log_id) - - tenant_id (uuid) Workspace ID - - app_id (uuid) App ID - - workflow_id (uuid) Workflow ID - - workflow_run_id (uuid) Optional - Associated workflow run ID when execution starts - - root_node_id (string) Optional - Custom starting node ID for workflow execution - - trigger_type (string) Type of trigger: webhook, schedule, plugin - - trigger_data (text) Full trigger data including inputs (JSON) - - inputs (text) Input parameters (JSON) - - outputs (text) Optional - Output content (JSON) - - status (string) Execution status - - error (text) Optional - Error message if failed - - queue_name (string) Celery queue used - - celery_task_id (string) Optional - Celery task ID for tracking - - retry_count (int) Number of retry attempts - - elapsed_time (float) Optional - Time consumption in seconds - - total_tokens (int) Optional - Total tokens used - - created_by_role (string) Creator role: account, end_user - - created_by (string) Creator ID - - created_at (timestamp) Creation time - - triggered_at (timestamp) Optional - When actually triggered - - finished_at (timestamp) Optional - Completion time - """ - - __tablename__ = "workflow_trigger_logs" - __table_args__ = ( - sa.PrimaryKeyConstraint("id", name="workflow_trigger_log_pkey"), - sa.Index("workflow_trigger_log_tenant_app_idx", "tenant_id", "app_id"), - sa.Index("workflow_trigger_log_status_idx", "status"), - sa.Index("workflow_trigger_log_created_at_idx", "created_at"), - sa.Index("workflow_trigger_log_workflow_run_idx", "workflow_run_id"), - sa.Index("workflow_trigger_log_workflow_id_idx", "workflow_id"), - ) - - id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()")) - tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - workflow_run_id: Mapped[Optional[str]] = mapped_column(StringUUID, nullable=True) - root_node_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) - - trigger_type: Mapped[str] = mapped_column(String(50), nullable=False) - trigger_data: Mapped[str] = mapped_column(sa.Text, nullable=False) # Full TriggerData as JSON - inputs: Mapped[str] = mapped_column(sa.Text, nullable=False) # Just inputs for easy viewing - outputs: Mapped[Optional[str]] = mapped_column(sa.Text, nullable=True) - - status: Mapped[str] = mapped_column(String(50), nullable=False, default=WorkflowTriggerStatus.PENDING) - error: Mapped[Optional[str]] = mapped_column(sa.Text, nullable=True) - - queue_name: Mapped[str] = mapped_column(String(100), nullable=False) - celery_task_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) - retry_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, default=0) - - elapsed_time: Mapped[Optional[float]] = mapped_column(sa.Float, nullable=True) - total_tokens: Mapped[Optional[int]] = mapped_column(sa.Integer, nullable=True) - - created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) - created_by_role: Mapped[str] = mapped_column(String(255), nullable=False) - created_by: Mapped[str] = mapped_column(String(255), nullable=False) - - triggered_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) - finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) - - @property - def created_by_account(self): - created_by_role = CreatorUserRole(self.created_by_role) - return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None - - @property - def created_by_end_user(self): - from models.model import EndUser - - created_by_role = CreatorUserRole(self.created_by_role) - return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None - - def to_dict(self) -> dict: - """Convert to dictionary for API responses""" - return { - "id": self.id, - "tenant_id": self.tenant_id, - "app_id": self.app_id, - "workflow_id": self.workflow_id, - "workflow_run_id": self.workflow_run_id, - "trigger_type": self.trigger_type, - "trigger_data": json.loads(self.trigger_data), - "inputs": json.loads(self.inputs), - "outputs": json.loads(self.outputs) if self.outputs else None, - "status": self.status, - "error": self.error, - "queue_name": self.queue_name, - "celery_task_id": self.celery_task_id, - "retry_count": self.retry_count, - "elapsed_time": self.elapsed_time, - "total_tokens": self.total_tokens, - "created_by_role": self.created_by_role, - "created_by": self.created_by, - "created_at": self.created_at.isoformat() if self.created_at else None, - "triggered_at": self.triggered_at.isoformat() if self.triggered_at else None, - "finished_at": self.finished_at.isoformat() if self.finished_at else None, - } - - -class WorkflowWebhookTrigger(Base): - """ - Workflow Webhook Trigger - - Attributes: - - id (uuid) Primary key - - app_id (uuid) App ID to bind to a specific app - - node_id (varchar) Node ID which node in the workflow - - tenant_id (uuid) Workspace ID - - webhook_id (varchar) Webhook ID for URL: https://api.dify.ai/triggers/webhook/:webhook_id - - created_by (varchar) User ID of the creator - - created_at (timestamp) Creation time - - updated_at (timestamp) Last update time - """ - - __tablename__ = "workflow_webhook_triggers" - __table_args__ = ( - sa.PrimaryKeyConstraint("id", name="workflow_webhook_trigger_pkey"), - sa.Index("workflow_webhook_trigger_tenant_idx", "tenant_id"), - sa.UniqueConstraint("app_id", "node_id", name="uniq_node"), - sa.UniqueConstraint("webhook_id", name="uniq_webhook_id"), - ) - - id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()")) - app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - node_id: Mapped[str] = mapped_column(String(64), nullable=False) - tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - webhook_id: Mapped[str] = mapped_column(String(24), nullable=False) - created_by: Mapped[str] = mapped_column(StringUUID, nullable=False) - created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) - updated_at: Mapped[datetime] = mapped_column( - DateTime, - nullable=False, - server_default=func.current_timestamp(), - server_onupdate=func.current_timestamp(), - ) - - -class WorkflowPluginTrigger(Base): - """ - Workflow Plugin Trigger - - Maps plugin triggers to workflow nodes, similar to WorkflowWebhookTrigger - - Attributes: - - id (uuid) Primary key - - app_id (uuid) App ID to bind to a specific app - - node_id (varchar) Node ID which node in the workflow - - tenant_id (uuid) Workspace ID - - provider_id (varchar) Plugin provider ID - - event_name (varchar) trigger name - - subscription_id (varchar) Subscription ID - - created_at (timestamp) Creation time - - updated_at (timestamp) Last update time - """ - - __tablename__ = "workflow_plugin_triggers" - __table_args__ = ( - sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"), - sa.Index("workflow_plugin_trigger_tenant_subscription_idx", "tenant_id", "subscription_id", "event_name"), - sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node_subscription"), - ) - - id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()")) - app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - node_id: Mapped[str] = mapped_column(String(64), nullable=False) - tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - provider_id: Mapped[str] = mapped_column(String(512), nullable=False) - event_name: Mapped[str] = mapped_column(String(255), nullable=False) - subscription_id: Mapped[str] = mapped_column(String(255), nullable=False) - created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) - updated_at: Mapped[datetime] = mapped_column( - DateTime, - nullable=False, - server_default=func.current_timestamp(), - server_onupdate=func.current_timestamp(), - ) - - -class AppTriggerType(StrEnum): - """App Trigger Type Enum""" - - TRIGGER_WEBHOOK = "trigger-webhook" - TRIGGER_SCHEDULE = "trigger-schedule" - TRIGGER_PLUGIN = "trigger-plugin" - - -class AppTriggerStatus(StrEnum): - """App Trigger Status Enum""" - - ENABLED = "enabled" - DISABLED = "disabled" - UNAUTHORIZED = "unauthorized" - - -class AppTrigger(Base): - """ - App Trigger - - Manages multiple triggers for an app with enable/disable and authorization states. - - Attributes: - - id (uuid) Primary key - - tenant_id (uuid) Workspace ID - - app_id (uuid) App ID - - trigger_type (string) Type: webhook, schedule, plugin - - title (string) Trigger title - - - status (string) Status: enabled, disabled, unauthorized, error - - node_id (string) Optional workflow node ID - - created_at (timestamp) Creation time - - updated_at (timestamp) Last update time - """ - - __tablename__ = "app_triggers" - __table_args__ = ( - sa.PrimaryKeyConstraint("id", name="app_trigger_pkey"), - sa.Index("app_trigger_tenant_app_idx", "tenant_id", "app_id"), - ) - - id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()")) - tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - node_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=False) - trigger_type: Mapped[str] = mapped_column(EnumText(AppTriggerType, length=50), nullable=False) - title: Mapped[str] = mapped_column(String(255), nullable=False) - provider_name: Mapped[str] = mapped_column(String(255), server_default="", nullable=True) - status: Mapped[str] = mapped_column( - EnumText(AppTriggerStatus, length=50), nullable=False, default=AppTriggerStatus.ENABLED - ) - created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) - updated_at: Mapped[datetime] = mapped_column( - DateTime, - nullable=False, - default=naive_utc_now(), - server_onupdate=func.current_timestamp(), - ) - - -class WorkflowSchedulePlan(Base): - """ - Workflow Schedule Configuration - - Store schedule configurations for time-based workflow triggers. - Uses cron expressions with timezone support for flexible scheduling. - - Attributes: - - id (uuid) Primary key - - app_id (uuid) App ID to bind to a specific app - - node_id (varchar) Starting node ID for workflow execution - - tenant_id (uuid) Workspace ID for multi-tenancy - - cron_expression (varchar) Cron expression defining schedule pattern - - timezone (varchar) Timezone for cron evaluation (e.g., 'Asia/Shanghai') - - next_run_at (timestamp) Next scheduled execution time - - created_at (timestamp) Creation timestamp - - updated_at (timestamp) Last update timestamp - """ - - __tablename__ = "workflow_schedule_plans" - __table_args__ = ( - sa.PrimaryKeyConstraint("id", name="workflow_schedule_plan_pkey"), - sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node"), - sa.Index("workflow_schedule_plan_next_idx", "next_run_at"), - ) - - id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()")) - app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - node_id: Mapped[str] = mapped_column(String(64), nullable=False) - tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - - # Schedule configuration - cron_expression: Mapped[str] = mapped_column(String(255), nullable=False) - timezone: Mapped[str] = mapped_column(String(64), nullable=False) - - # Schedule control - next_run_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) - created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) - updated_at: Mapped[datetime] = mapped_column( - DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp() - ) - - def to_dict(self) -> dict: - """Convert to dictionary representation""" - return { - "id": self.id, - "app_id": self.app_id, - "node_id": self.node_id, - "tenant_id": self.tenant_id, - "cron_expression": self.cron_expression, - "timezone": self.timezone, - "next_run_at": self.next_run_at.isoformat() if self.next_run_at else None, - "created_at": self.created_at.isoformat(), - "updated_at": self.updated_at.isoformat(), - } diff --git a/api/services/trigger/trigger_provider_service.py b/api/services/trigger/trigger_provider_service.py index 92f6e99a59..5b6b18c37b 100644 --- a/api/services/trigger/trigger_provider_service.py +++ b/api/services/trigger/trigger_provider_service.py @@ -28,8 +28,12 @@ from core.trigger.utils.encryption import ( from extensions.ext_database import db from extensions.ext_redis import redis_client from models.provider_ids import TriggerProviderID -from models.trigger import TriggerOAuthSystemClient, TriggerOAuthTenantClient, TriggerSubscription -from models.workflow import WorkflowPluginTrigger +from models.trigger import ( + TriggerOAuthSystemClient, + TriggerOAuthTenantClient, + TriggerSubscription, + WorkflowPluginTrigger, +) from services.plugin.plugin_service import PluginService logger = logging.getLogger(__name__) diff --git a/api/services/trigger/trigger_service.py b/api/services/trigger/trigger_service.py index c4f00aabbf..3ff6d99fbf 100644 --- a/api/services/trigger/trigger_service.py +++ b/api/services/trigger/trigger_service.py @@ -22,8 +22,8 @@ from extensions.ext_database import db from extensions.ext_redis import redis_client from models.model import App from models.provider_ids import TriggerProviderID -from models.trigger import TriggerSubscription -from models.workflow import AppTrigger, AppTriggerStatus, Workflow, WorkflowPluginTrigger +from models.trigger import AppTrigger, AppTriggerStatus, TriggerSubscription, WorkflowPluginTrigger +from models.workflow import Workflow from services.trigger.trigger_provider_service import TriggerProviderService from services.trigger.trigger_request_service import TriggerRequestService from services.workflow.entities import PluginTriggerDispatchData diff --git a/api/services/trigger/webhook_service.py b/api/services/trigger/webhook_service.py index 4e52ea4cdf..d661c39a7b 100644 --- a/api/services/trigger/webhook_service.py +++ b/api/services/trigger/webhook_service.py @@ -22,7 +22,8 @@ from extensions.ext_redis import redis_client from factories import file_factory from models.enums import WorkflowRunTriggeredFrom from models.model import App -from models.workflow import AppTrigger, AppTriggerStatus, AppTriggerType, Workflow, WorkflowWebhookTrigger +from models.trigger import AppTrigger, AppTriggerStatus, AppTriggerType, WorkflowWebhookTrigger +from models.workflow import Workflow from services.async_workflow_service import AsyncWorkflowService from services.end_user_service import EndUserService from services.workflow.entities import TriggerData diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index e4343bbde9..8305c1eafb 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -26,8 +26,8 @@ from extensions.ext_database import db from models.enums import WorkflowRunTriggeredFrom from models.model import EndUser from models.provider_ids import TriggerProviderID -from models.trigger import TriggerSubscription -from models.workflow import Workflow, WorkflowPluginTrigger +from models.trigger import TriggerSubscription, WorkflowPluginTrigger +from models.workflow import Workflow from services.async_workflow_service import AsyncWorkflowService from services.end_user_service import EndUserService from services.trigger.trigger_provider_service import TriggerProviderService