refactor: models

This commit is contained in:
Yeuoly 2025-10-18 20:06:46 +08:00
parent 65c6203ad7
commit b63b9c32f7
6 changed files with 337 additions and 326 deletions

View File

@ -2,7 +2,8 @@ import json
import time
from collections.abc import Mapping
from datetime import datetime
from typing import Any, cast
from enum import StrEnum
from typing import Any, Optional, cast
import sqlalchemy as sa
from sqlalchemy import DateTime, Index, Integer, String, UniqueConstraint, func
@ -12,8 +13,13 @@ from core.plugin.entities.plugin_daemon import CredentialType
from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity
from core.trigger.entities.entities import Subscription
from core.trigger.utils.endpoint import parse_endpoint_id
from core.workflow.enums import NodeType
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.base import Base
from models.types import StringUUID
from models.enums import CreatorUserRole
from models.model import Account
from models.types import EnumText, StringUUID
class TriggerSubscription(Base):
@ -141,3 +147,320 @@ class TriggerOAuthTenantClient(Base):
@property
def oauth_params(self) -> Mapping[str, Any]:
return cast(Mapping[str, Any], json.loads(self.encrypted_oauth_params or "{}"))
class WorkflowTriggerStatus(StrEnum):
"""Workflow Trigger Execution Status"""
PENDING = "pending"
QUEUED = "queued"
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED = "failed"
RATE_LIMITED = "rate_limited"
RETRYING = "retrying"
class WorkflowTriggerLog(Base):
"""
Workflow Trigger Log
Track async trigger workflow runs with re-invocation capability
Attributes:
- id (uuid) Trigger Log ID (used as workflow_trigger_log_id)
- tenant_id (uuid) Workspace ID
- app_id (uuid) App ID
- workflow_id (uuid) Workflow ID
- workflow_run_id (uuid) Optional - Associated workflow run ID when execution starts
- root_node_id (string) Optional - Custom starting node ID for workflow execution
- trigger_type (string) Type of trigger: webhook, schedule, plugin
- trigger_data (text) Full trigger data including inputs (JSON)
- inputs (text) Input parameters (JSON)
- outputs (text) Optional - Output content (JSON)
- status (string) Execution status
- error (text) Optional - Error message if failed
- queue_name (string) Celery queue used
- celery_task_id (string) Optional - Celery task ID for tracking
- retry_count (int) Number of retry attempts
- elapsed_time (float) Optional - Time consumption in seconds
- total_tokens (int) Optional - Total tokens used
- created_by_role (string) Creator role: account, end_user
- created_by (string) Creator ID
- created_at (timestamp) Creation time
- triggered_at (timestamp) Optional - When actually triggered
- finished_at (timestamp) Optional - Completion time
"""
__tablename__ = "workflow_trigger_logs"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_trigger_log_pkey"),
sa.Index("workflow_trigger_log_tenant_app_idx", "tenant_id", "app_id"),
sa.Index("workflow_trigger_log_status_idx", "status"),
sa.Index("workflow_trigger_log_created_at_idx", "created_at"),
sa.Index("workflow_trigger_log_workflow_run_idx", "workflow_run_id"),
sa.Index("workflow_trigger_log_workflow_id_idx", "workflow_id"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_run_id: Mapped[Optional[str]] = mapped_column(StringUUID, nullable=True)
root_node_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
trigger_type: Mapped[str] = mapped_column(String(50), nullable=False)
trigger_data: Mapped[str] = mapped_column(sa.Text, nullable=False) # Full TriggerData as JSON
inputs: Mapped[str] = mapped_column(sa.Text, nullable=False) # Just inputs for easy viewing
outputs: Mapped[Optional[str]] = mapped_column(sa.Text, nullable=True)
status: Mapped[str] = mapped_column(String(50), nullable=False, default=WorkflowTriggerStatus.PENDING)
error: Mapped[Optional[str]] = mapped_column(sa.Text, nullable=True)
queue_name: Mapped[str] = mapped_column(String(100), nullable=False)
celery_task_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
retry_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, default=0)
elapsed_time: Mapped[Optional[float]] = mapped_column(sa.Float, nullable=True)
total_tokens: Mapped[Optional[int]] = mapped_column(sa.Integer, nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
created_by_role: Mapped[str] = mapped_column(String(255), nullable=False)
created_by: Mapped[str] = mapped_column(String(255), nullable=False)
triggered_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
@property
def created_by_account(self):
created_by_role = CreatorUserRole(self.created_by_role)
return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None
@property
def created_by_end_user(self):
from models.model import EndUser
created_by_role = CreatorUserRole(self.created_by_role)
return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for API responses"""
return {
"id": self.id,
"tenant_id": self.tenant_id,
"app_id": self.app_id,
"workflow_id": self.workflow_id,
"workflow_run_id": self.workflow_run_id,
"trigger_type": self.trigger_type,
"trigger_data": json.loads(self.trigger_data),
"inputs": json.loads(self.inputs),
"outputs": json.loads(self.outputs) if self.outputs else None,
"status": self.status,
"error": self.error,
"queue_name": self.queue_name,
"celery_task_id": self.celery_task_id,
"retry_count": self.retry_count,
"elapsed_time": self.elapsed_time,
"total_tokens": self.total_tokens,
"created_by_role": self.created_by_role,
"created_by": self.created_by,
"created_at": self.created_at.isoformat() if self.created_at else None,
"triggered_at": self.triggered_at.isoformat() if self.triggered_at else None,
"finished_at": self.finished_at.isoformat() if self.finished_at else None,
}
class WorkflowWebhookTrigger(Base):
"""
Workflow Webhook Trigger
Attributes:
- id (uuid) Primary key
- app_id (uuid) App ID to bind to a specific app
- node_id (varchar) Node ID which node in the workflow
- tenant_id (uuid) Workspace ID
- webhook_id (varchar) Webhook ID for URL: https://api.dify.ai/triggers/webhook/:webhook_id
- created_by (varchar) User ID of the creator
- created_at (timestamp) Creation time
- updated_at (timestamp) Last update time
"""
__tablename__ = "workflow_webhook_triggers"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_webhook_trigger_pkey"),
sa.Index("workflow_webhook_trigger_tenant_idx", "tenant_id"),
sa.UniqueConstraint("app_id", "node_id", name="uniq_node"),
sa.UniqueConstraint("webhook_id", name="uniq_webhook_id"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
webhook_id: Mapped[str] = mapped_column(String(24), nullable=False)
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.current_timestamp(),
server_onupdate=func.current_timestamp(),
)
class WorkflowPluginTrigger(Base):
"""
Workflow Plugin Trigger
Maps plugin triggers to workflow nodes, similar to WorkflowWebhookTrigger
Attributes:
- id (uuid) Primary key
- app_id (uuid) App ID to bind to a specific app
- node_id (varchar) Node ID which node in the workflow
- tenant_id (uuid) Workspace ID
- provider_id (varchar) Plugin provider ID
- event_name (varchar) trigger name
- subscription_id (varchar) Subscription ID
- created_at (timestamp) Creation time
- updated_at (timestamp) Last update time
"""
__tablename__ = "workflow_plugin_triggers"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"),
sa.Index("workflow_plugin_trigger_tenant_subscription_idx", "tenant_id", "subscription_id", "event_name"),
sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node_subscription"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
provider_id: Mapped[str] = mapped_column(String(512), nullable=False)
event_name: Mapped[str] = mapped_column(String(255), nullable=False)
subscription_id: Mapped[str] = mapped_column(String(255), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.current_timestamp(),
server_onupdate=func.current_timestamp(),
)
class AppTriggerType(StrEnum):
"""App Trigger Type Enum"""
TRIGGER_WEBHOOK = NodeType.TRIGGER_WEBHOOK.value
TRIGGER_SCHEDULE = NodeType.TRIGGER_SCHEDULE.value
TRIGGER_PLUGIN = NodeType.TRIGGER_PLUGIN.value
class AppTriggerStatus(StrEnum):
"""App Trigger Status Enum"""
ENABLED = "enabled"
DISABLED = "disabled"
UNAUTHORIZED = "unauthorized"
class AppTrigger(Base):
"""
App Trigger
Manages multiple triggers for an app with enable/disable and authorization states.
Attributes:
- id (uuid) Primary key
- tenant_id (uuid) Workspace ID
- app_id (uuid) App ID
- trigger_type (string) Type: webhook, schedule, plugin
- title (string) Trigger title
- status (string) Status: enabled, disabled, unauthorized, error
- node_id (string) Optional workflow node ID
- created_at (timestamp) Creation time
- updated_at (timestamp) Last update time
"""
__tablename__ = "app_triggers"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="app_trigger_pkey"),
sa.Index("app_trigger_tenant_app_idx", "tenant_id", "app_id"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=False)
trigger_type: Mapped[str] = mapped_column(EnumText(AppTriggerType, length=50), nullable=False)
title: Mapped[str] = mapped_column(String(255), nullable=False)
provider_name: Mapped[str] = mapped_column(String(255), server_default="", nullable=True)
status: Mapped[str] = mapped_column(
EnumText(AppTriggerStatus, length=50), nullable=False, default=AppTriggerStatus.ENABLED
)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
default=naive_utc_now(),
server_onupdate=func.current_timestamp(),
)
class WorkflowSchedulePlan(Base):
"""
Workflow Schedule Configuration
Store schedule configurations for time-based workflow triggers.
Uses cron expressions with timezone support for flexible scheduling.
Attributes:
- id (uuid) Primary key
- app_id (uuid) App ID to bind to a specific app
- node_id (varchar) Starting node ID for workflow execution
- tenant_id (uuid) Workspace ID for multi-tenancy
- cron_expression (varchar) Cron expression defining schedule pattern
- timezone (varchar) Timezone for cron evaluation (e.g., 'Asia/Shanghai')
- next_run_at (timestamp) Next scheduled execution time
- created_at (timestamp) Creation timestamp
- updated_at (timestamp) Last update timestamp
"""
__tablename__ = "workflow_schedule_plans"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_schedule_plan_pkey"),
sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node"),
sa.Index("workflow_schedule_plan_next_idx", "next_run_at"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
# Schedule configuration
cron_expression: Mapped[str] = mapped_column(String(255), nullable=False)
timezone: Mapped[str] = mapped_column(String(64), nullable=False)
# Schedule control
next_run_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
)
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary representation"""
return {
"id": self.id,
"app_id": self.app_id,
"node_id": self.node_id,
"tenant_id": self.tenant_id,
"cron_expression": self.cron_expression,
"timezone": self.timezone,
"next_run_at": self.next_run_at.isoformat() if self.next_run_at else None,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
}

View File

@ -1631,320 +1631,3 @@ class WorkflowDraftVariableFile(Base):
def is_system_variable_editable(name: str) -> bool:
return name in _EDITABLE_SYSTEM_VARIABLE
class WorkflowTriggerStatus(StrEnum):
"""Workflow Trigger Execution Status"""
PENDING = "pending"
QUEUED = "queued"
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED = "failed"
RATE_LIMITED = "rate_limited"
RETRYING = "retrying"
class WorkflowTriggerLog(Base):
"""
Workflow Trigger Log
Track async trigger workflow runs with re-invocation capability
Attributes:
- id (uuid) Trigger Log ID (used as workflow_trigger_log_id)
- tenant_id (uuid) Workspace ID
- app_id (uuid) App ID
- workflow_id (uuid) Workflow ID
- workflow_run_id (uuid) Optional - Associated workflow run ID when execution starts
- root_node_id (string) Optional - Custom starting node ID for workflow execution
- trigger_type (string) Type of trigger: webhook, schedule, plugin
- trigger_data (text) Full trigger data including inputs (JSON)
- inputs (text) Input parameters (JSON)
- outputs (text) Optional - Output content (JSON)
- status (string) Execution status
- error (text) Optional - Error message if failed
- queue_name (string) Celery queue used
- celery_task_id (string) Optional - Celery task ID for tracking
- retry_count (int) Number of retry attempts
- elapsed_time (float) Optional - Time consumption in seconds
- total_tokens (int) Optional - Total tokens used
- created_by_role (string) Creator role: account, end_user
- created_by (string) Creator ID
- created_at (timestamp) Creation time
- triggered_at (timestamp) Optional - When actually triggered
- finished_at (timestamp) Optional - Completion time
"""
__tablename__ = "workflow_trigger_logs"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_trigger_log_pkey"),
sa.Index("workflow_trigger_log_tenant_app_idx", "tenant_id", "app_id"),
sa.Index("workflow_trigger_log_status_idx", "status"),
sa.Index("workflow_trigger_log_created_at_idx", "created_at"),
sa.Index("workflow_trigger_log_workflow_run_idx", "workflow_run_id"),
sa.Index("workflow_trigger_log_workflow_id_idx", "workflow_id"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_run_id: Mapped[Optional[str]] = mapped_column(StringUUID, nullable=True)
root_node_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
trigger_type: Mapped[str] = mapped_column(String(50), nullable=False)
trigger_data: Mapped[str] = mapped_column(sa.Text, nullable=False) # Full TriggerData as JSON
inputs: Mapped[str] = mapped_column(sa.Text, nullable=False) # Just inputs for easy viewing
outputs: Mapped[Optional[str]] = mapped_column(sa.Text, nullable=True)
status: Mapped[str] = mapped_column(String(50), nullable=False, default=WorkflowTriggerStatus.PENDING)
error: Mapped[Optional[str]] = mapped_column(sa.Text, nullable=True)
queue_name: Mapped[str] = mapped_column(String(100), nullable=False)
celery_task_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
retry_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, default=0)
elapsed_time: Mapped[Optional[float]] = mapped_column(sa.Float, nullable=True)
total_tokens: Mapped[Optional[int]] = mapped_column(sa.Integer, nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
created_by_role: Mapped[str] = mapped_column(String(255), nullable=False)
created_by: Mapped[str] = mapped_column(String(255), nullable=False)
triggered_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
@property
def created_by_account(self):
created_by_role = CreatorUserRole(self.created_by_role)
return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None
@property
def created_by_end_user(self):
from models.model import EndUser
created_by_role = CreatorUserRole(self.created_by_role)
return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None
def to_dict(self) -> dict:
"""Convert to dictionary for API responses"""
return {
"id": self.id,
"tenant_id": self.tenant_id,
"app_id": self.app_id,
"workflow_id": self.workflow_id,
"workflow_run_id": self.workflow_run_id,
"trigger_type": self.trigger_type,
"trigger_data": json.loads(self.trigger_data),
"inputs": json.loads(self.inputs),
"outputs": json.loads(self.outputs) if self.outputs else None,
"status": self.status,
"error": self.error,
"queue_name": self.queue_name,
"celery_task_id": self.celery_task_id,
"retry_count": self.retry_count,
"elapsed_time": self.elapsed_time,
"total_tokens": self.total_tokens,
"created_by_role": self.created_by_role,
"created_by": self.created_by,
"created_at": self.created_at.isoformat() if self.created_at else None,
"triggered_at": self.triggered_at.isoformat() if self.triggered_at else None,
"finished_at": self.finished_at.isoformat() if self.finished_at else None,
}
class WorkflowWebhookTrigger(Base):
"""
Workflow Webhook Trigger
Attributes:
- id (uuid) Primary key
- app_id (uuid) App ID to bind to a specific app
- node_id (varchar) Node ID which node in the workflow
- tenant_id (uuid) Workspace ID
- webhook_id (varchar) Webhook ID for URL: https://api.dify.ai/triggers/webhook/:webhook_id
- created_by (varchar) User ID of the creator
- created_at (timestamp) Creation time
- updated_at (timestamp) Last update time
"""
__tablename__ = "workflow_webhook_triggers"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_webhook_trigger_pkey"),
sa.Index("workflow_webhook_trigger_tenant_idx", "tenant_id"),
sa.UniqueConstraint("app_id", "node_id", name="uniq_node"),
sa.UniqueConstraint("webhook_id", name="uniq_webhook_id"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
webhook_id: Mapped[str] = mapped_column(String(24), nullable=False)
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.current_timestamp(),
server_onupdate=func.current_timestamp(),
)
class WorkflowPluginTrigger(Base):
"""
Workflow Plugin Trigger
Maps plugin triggers to workflow nodes, similar to WorkflowWebhookTrigger
Attributes:
- id (uuid) Primary key
- app_id (uuid) App ID to bind to a specific app
- node_id (varchar) Node ID which node in the workflow
- tenant_id (uuid) Workspace ID
- provider_id (varchar) Plugin provider ID
- event_name (varchar) trigger name
- subscription_id (varchar) Subscription ID
- created_at (timestamp) Creation time
- updated_at (timestamp) Last update time
"""
__tablename__ = "workflow_plugin_triggers"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"),
sa.Index("workflow_plugin_trigger_tenant_subscription_idx", "tenant_id", "subscription_id", "event_name"),
sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node_subscription"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
provider_id: Mapped[str] = mapped_column(String(512), nullable=False)
event_name: Mapped[str] = mapped_column(String(255), nullable=False)
subscription_id: Mapped[str] = mapped_column(String(255), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.current_timestamp(),
server_onupdate=func.current_timestamp(),
)
class AppTriggerType(StrEnum):
"""App Trigger Type Enum"""
TRIGGER_WEBHOOK = "trigger-webhook"
TRIGGER_SCHEDULE = "trigger-schedule"
TRIGGER_PLUGIN = "trigger-plugin"
class AppTriggerStatus(StrEnum):
"""App Trigger Status Enum"""
ENABLED = "enabled"
DISABLED = "disabled"
UNAUTHORIZED = "unauthorized"
class AppTrigger(Base):
"""
App Trigger
Manages multiple triggers for an app with enable/disable and authorization states.
Attributes:
- id (uuid) Primary key
- tenant_id (uuid) Workspace ID
- app_id (uuid) App ID
- trigger_type (string) Type: webhook, schedule, plugin
- title (string) Trigger title
- status (string) Status: enabled, disabled, unauthorized, error
- node_id (string) Optional workflow node ID
- created_at (timestamp) Creation time
- updated_at (timestamp) Last update time
"""
__tablename__ = "app_triggers"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="app_trigger_pkey"),
sa.Index("app_trigger_tenant_app_idx", "tenant_id", "app_id"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=False)
trigger_type: Mapped[str] = mapped_column(EnumText(AppTriggerType, length=50), nullable=False)
title: Mapped[str] = mapped_column(String(255), nullable=False)
provider_name: Mapped[str] = mapped_column(String(255), server_default="", nullable=True)
status: Mapped[str] = mapped_column(
EnumText(AppTriggerStatus, length=50), nullable=False, default=AppTriggerStatus.ENABLED
)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
default=naive_utc_now(),
server_onupdate=func.current_timestamp(),
)
class WorkflowSchedulePlan(Base):
"""
Workflow Schedule Configuration
Store schedule configurations for time-based workflow triggers.
Uses cron expressions with timezone support for flexible scheduling.
Attributes:
- id (uuid) Primary key
- app_id (uuid) App ID to bind to a specific app
- node_id (varchar) Starting node ID for workflow execution
- tenant_id (uuid) Workspace ID for multi-tenancy
- cron_expression (varchar) Cron expression defining schedule pattern
- timezone (varchar) Timezone for cron evaluation (e.g., 'Asia/Shanghai')
- next_run_at (timestamp) Next scheduled execution time
- created_at (timestamp) Creation timestamp
- updated_at (timestamp) Last update timestamp
"""
__tablename__ = "workflow_schedule_plans"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_schedule_plan_pkey"),
sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node"),
sa.Index("workflow_schedule_plan_next_idx", "next_run_at"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
# Schedule configuration
cron_expression: Mapped[str] = mapped_column(String(255), nullable=False)
timezone: Mapped[str] = mapped_column(String(64), nullable=False)
# Schedule control
next_run_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
)
def to_dict(self) -> dict:
"""Convert to dictionary representation"""
return {
"id": self.id,
"app_id": self.app_id,
"node_id": self.node_id,
"tenant_id": self.tenant_id,
"cron_expression": self.cron_expression,
"timezone": self.timezone,
"next_run_at": self.next_run_at.isoformat() if self.next_run_at else None,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
}

View File

@ -28,8 +28,12 @@ from core.trigger.utils.encryption import (
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.provider_ids import TriggerProviderID
from models.trigger import TriggerOAuthSystemClient, TriggerOAuthTenantClient, TriggerSubscription
from models.workflow import WorkflowPluginTrigger
from models.trigger import (
TriggerOAuthSystemClient,
TriggerOAuthTenantClient,
TriggerSubscription,
WorkflowPluginTrigger,
)
from services.plugin.plugin_service import PluginService
logger = logging.getLogger(__name__)

View File

@ -22,8 +22,8 @@ from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.model import App
from models.provider_ids import TriggerProviderID
from models.trigger import TriggerSubscription
from models.workflow import AppTrigger, AppTriggerStatus, Workflow, WorkflowPluginTrigger
from models.trigger import AppTrigger, AppTriggerStatus, TriggerSubscription, WorkflowPluginTrigger
from models.workflow import Workflow
from services.trigger.trigger_provider_service import TriggerProviderService
from services.trigger.trigger_request_service import TriggerRequestService
from services.workflow.entities import PluginTriggerDispatchData

View File

@ -22,7 +22,8 @@ from extensions.ext_redis import redis_client
from factories import file_factory
from models.enums import WorkflowRunTriggeredFrom
from models.model import App
from models.workflow import AppTrigger, AppTriggerStatus, AppTriggerType, Workflow, WorkflowWebhookTrigger
from models.trigger import AppTrigger, AppTriggerStatus, AppTriggerType, WorkflowWebhookTrigger
from models.workflow import Workflow
from services.async_workflow_service import AsyncWorkflowService
from services.end_user_service import EndUserService
from services.workflow.entities import TriggerData

View File

@ -26,8 +26,8 @@ from extensions.ext_database import db
from models.enums import WorkflowRunTriggeredFrom
from models.model import EndUser
from models.provider_ids import TriggerProviderID
from models.trigger import TriggerSubscription
from models.workflow import Workflow, WorkflowPluginTrigger
from models.trigger import TriggerSubscription, WorkflowPluginTrigger
from models.workflow import Workflow
from services.async_workflow_service import AsyncWorkflowService
from services.end_user_service import EndUserService
from services.trigger.trigger_provider_service import TriggerProviderService