feat: add workflow schedule trigger support (#24428)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
诗浓 2025-09-10 13:24:23 +08:00 committed by GitHub
parent 07dda61929
commit 4a743e6dc1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 1815 additions and 5 deletions

View File

@ -505,6 +505,12 @@ ENABLE_CLEAN_MESSAGES=false
ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK=false
ENABLE_DATASETS_QUEUE_MONITOR=false
ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK=true
ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK=true
# Interval time in minutes for polling scheduled workflows(default: 1 min)
WORKFLOW_SCHEDULE_POLLER_INTERVAL=1
WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE=100
# Maximum number of scheduled workflows to dispatch per tick (0 for unlimited)
WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK=0
# Position configuration
POSITION_TOOL_PINS=

View File

@ -882,6 +882,22 @@ class CeleryScheduleTasksConfig(BaseSettings):
description="Enable check upgradable plugin task",
default=True,
)
ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK: bool = Field(
description="Enable workflow schedule poller task",
default=True,
)
WORKFLOW_SCHEDULE_POLLER_INTERVAL: int = Field(
description="Workflow schedule poller interval in minutes",
default=1,
)
WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE: int = Field(
description="Maximum number of schedules to process in each poll batch",
default=100,
)
WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK: int = Field(
description="Maximum schedules to dispatch per tick (0=unlimited, circuit breaker)",
default=0,
)
class PositionConfig(BaseSettings):

View File

@ -20,6 +20,7 @@ from core.workflow.nodes.start import StartNode
from core.workflow.nodes.template_transform import TemplateTransformNode
from core.workflow.nodes.tool import ToolNode
from core.workflow.nodes.trigger_plugin import TriggerPluginNode
from core.workflow.nodes.trigger_schedule import TriggerScheduleNode
from core.workflow.nodes.trigger_webhook import TriggerWebhookNode
from core.workflow.nodes.variable_aggregator import VariableAggregatorNode
from core.workflow.nodes.variable_assigner.v1 import VariableAssignerNode as VariableAssignerNodeV1
@ -142,4 +143,8 @@ NODE_TYPE_CLASSES_MAPPING: Mapping[NodeType, Mapping[str, type[BaseNode]]] = {
LATEST_VERSION: TriggerPluginNode,
"1": TriggerPluginNode,
},
NodeType.TRIGGER_SCHEDULE: {
LATEST_VERSION: TriggerScheduleNode,
"1": TriggerScheduleNode,
},
}

View File

@ -0,0 +1,3 @@
from core.workflow.nodes.trigger_schedule.trigger_schedule_node import TriggerScheduleNode
__all__ = ["TriggerScheduleNode"]

View File

@ -0,0 +1,51 @@
from typing import Literal, Optional, Union
from pydantic import BaseModel, Field
from core.workflow.nodes.base import BaseNodeData
class TriggerScheduleNodeData(BaseNodeData):
"""
Trigger Schedule Node Data
"""
mode: str = Field(default="visual", description="Schedule mode: visual or cron")
frequency: Optional[str] = Field(
default=None, description="Frequency for visual mode: hourly, daily, weekly, monthly"
)
cron_expression: Optional[str] = Field(default=None, description="Cron expression for cron mode")
visual_config: Optional[dict] = Field(default=None, description="Visual configuration details")
timezone: str = Field(default="UTC", description="Timezone for schedule execution")
class ScheduleConfig(BaseModel):
node_id: str
cron_expression: str
timezone: str = "UTC"
class SchedulePlanUpdate(BaseModel):
node_id: Optional[str] = None
cron_expression: Optional[str] = None
timezone: Optional[str] = None
class VisualConfig(BaseModel):
"""Visual configuration for schedule trigger"""
# For hourly frequency
on_minute: Optional[int] = Field(default=0, ge=0, le=59, description="Minute of the hour (0-59)")
# For daily, weekly, monthly frequencies
time: Optional[str] = Field(default="12:00 PM", description="Time in 12-hour format (e.g., '2:30 PM')")
# For weekly frequency
weekdays: Optional[list[Literal["sun", "mon", "tue", "wed", "thu", "fri", "sat"]]] = Field(
default=None, description="List of weekdays to run on"
)
# For monthly frequency
monthly_days: Optional[list[Union[int, Literal["last"]]]] = Field(
default=None, description="Days of month to run on (1-31 or 'last')"
)

View File

@ -0,0 +1,31 @@
from core.workflow.nodes.base.exc import BaseNodeError
class ScheduleNodeError(BaseNodeError):
"""Base schedule node error."""
pass
class ScheduleNotFoundError(ScheduleNodeError):
"""Schedule not found error."""
pass
class ScheduleConfigError(ScheduleNodeError):
"""Schedule configuration error."""
pass
class ScheduleExecutionError(ScheduleNodeError):
"""Schedule execution error."""
pass
class TenantOwnerNotFoundError(ScheduleExecutionError):
"""Tenant owner not found error for schedule execution."""
pass

View File

@ -0,0 +1,62 @@
from collections.abc import Mapping
from datetime import UTC, datetime
from typing import Any, Optional
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.nodes.base import BaseNode
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
from core.workflow.nodes.enums import ErrorStrategy, NodeType
from core.workflow.nodes.trigger_schedule.entities import TriggerScheduleNodeData
class TriggerScheduleNode(BaseNode):
_node_type = NodeType.TRIGGER_SCHEDULE
_node_data: TriggerScheduleNodeData
def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = TriggerScheduleNodeData(**data)
def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy
def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config
def _get_title(self) -> str:
return self._node_data.title
def _get_description(self) -> Optional[str]:
return self._node_data.desc
def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData:
return self._node_data
@classmethod
def version(cls) -> str:
return "1"
@classmethod
def get_default_config(cls, filters: Optional[dict] = None) -> dict:
return {
"type": "trigger-schedule",
"config": {
"mode": "visual",
"frequency": "weekly",
"visual_config": {"time": "11:30 AM", "on_minute": 0, "weekdays": ["sun"], "monthly_days": [1]},
"timezone": "UTC",
},
}
def _run(self) -> NodeRunResult:
current_time = datetime.now(UTC)
node_outputs = {"current_time": current_time.isoformat()}
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
outputs=node_outputs,
)

View File

@ -34,10 +34,10 @@ if [[ "${MODE}" == "worker" ]]; then
if [[ -z "${CELERY_QUEUES}" ]]; then
if [[ "${EDITION}" == "CLOUD" ]]; then
# Cloud edition: separate queues for dataset and trigger tasks
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox"
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor"
else
# Community edition (SELF_HOSTED): dataset and workflow have separate queues
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow"
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor"
fi
else
DEFAULT_QUEUES="${CELERY_QUEUES}"

View File

@ -4,6 +4,7 @@ from .create_document_index import handle
from .create_installed_app_when_app_created import handle
from .create_site_record_when_app_created import handle
from .delete_tool_parameters_cache_when_sync_draft_workflow import handle
from .sync_workflow_schedule_when_app_published import handle
from .update_app_dataset_join_when_app_model_config_updated import handle
from .update_app_dataset_join_when_app_published_workflow_updated import handle
from .update_app_triggers_when_app_published_workflow_updated import handle

View File

@ -0,0 +1,86 @@
import logging
from typing import Optional, cast
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.workflow.nodes.trigger_schedule.entities import SchedulePlanUpdate
from events.app_event import app_published_workflow_was_updated
from extensions.ext_database import db
from models import AppMode, Workflow, WorkflowSchedulePlan
from services.schedule_service import ScheduleService
logger = logging.getLogger(__name__)
@app_published_workflow_was_updated.connect
def handle(sender, **kwargs):
"""
Handle app published workflow update event to sync workflow_schedule_plans table.
When a workflow is published, this handler will:
1. Extract schedule trigger nodes from the workflow graph
2. Compare with existing workflow_schedule_plans records
3. Create/update/delete schedule plans as needed
"""
app = sender
if app.mode != AppMode.WORKFLOW.value:
return
published_workflow = kwargs.get("published_workflow")
published_workflow = cast(Workflow, published_workflow)
sync_schedule_from_workflow(tenant_id=app.tenant_id, app_id=app.id, workflow=published_workflow)
def sync_schedule_from_workflow(tenant_id: str, app_id: str, workflow: Workflow) -> Optional[WorkflowSchedulePlan]:
"""
Sync schedule plan from workflow graph configuration.
Args:
tenant_id: Tenant ID
app_id: App ID
workflow: Published workflow instance
Returns:
Updated or created WorkflowSchedulePlan, or None if no schedule node
"""
with Session(db.engine) as session:
schedule_config = ScheduleService.extract_schedule_config(workflow)
existing_plan = session.scalar(
select(WorkflowSchedulePlan).where(
WorkflowSchedulePlan.tenant_id == tenant_id,
WorkflowSchedulePlan.app_id == app_id,
)
)
if not schedule_config:
if existing_plan:
logger.info("No schedule node in workflow for app %s, removing schedule plan", app_id)
ScheduleService.delete_schedule(session=session, schedule_id=existing_plan.id)
session.commit()
return None
if existing_plan:
updates = SchedulePlanUpdate(
node_id=schedule_config.node_id,
cron_expression=schedule_config.cron_expression,
timezone=schedule_config.timezone,
)
updated_plan = ScheduleService.update_schedule(
session=session,
schedule_id=existing_plan.id,
updates=updates,
)
session.commit()
return updated_plan
else:
new_plan = ScheduleService.create_schedule(
session=session,
tenant_id=tenant_id,
app_id=app_id,
config=schedule_config,
)
session.commit()
return new_plan

View File

@ -160,6 +160,12 @@ def init_app(app: DifyApp) -> Celery:
"task": "schedule.clean_workflow_runlogs_precise.clean_workflow_runlogs_precise",
"schedule": crontab(minute="0", hour="2"),
}
if dify_config.ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK:
imports.append("schedule.workflow_schedule_task")
beat_schedule["workflow_schedule_task"] = {
"task": "schedule.workflow_schedule_task.poll_workflow_schedules",
"schedule": timedelta(minutes=dify_config.WORKFLOW_SCHEDULE_POLLER_INTERVAL),
}
celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)
return celery_app

View File

@ -0,0 +1,97 @@
from datetime import UTC, datetime
from typing import Optional
import pytz
from croniter import croniter
def calculate_next_run_at(
cron_expression: str,
timezone: str,
base_time: Optional[datetime] = None,
) -> datetime:
"""
Calculate the next run time for a cron expression in a specific timezone.
Args:
cron_expression: Cron expression string (supports croniter extensions like 'L')
timezone: Timezone string (e.g., 'UTC', 'America/New_York')
base_time: Base time to calculate from (defaults to current UTC time)
Returns:
Next run time in UTC
Note:
Supports croniter's extended syntax including:
- 'L' for last day of month
- Standard 5-field cron expressions
"""
tz = pytz.timezone(timezone)
if base_time is None:
base_time = datetime.now(UTC)
base_time_tz = base_time.astimezone(tz)
cron = croniter(cron_expression, base_time_tz)
next_run_tz = cron.get_next(datetime)
next_run_utc = next_run_tz.astimezone(UTC)
return next_run_utc
def convert_12h_to_24h(time_str: str) -> tuple[int, int]:
"""
Parse 12-hour time format to 24-hour format for cron compatibility.
Args:
time_str: Time string in format "HH:MM AM/PM" (e.g., "12:30 PM")
Returns:
Tuple of (hour, minute) in 24-hour format
Raises:
ValueError: If time string format is invalid or values are out of range
Examples:
- "12:00 AM" -> (0, 0) # Midnight
- "12:00 PM" -> (12, 0) # Noon
- "1:30 PM" -> (13, 30)
- "11:59 PM" -> (23, 59)
"""
if not time_str or not time_str.strip():
raise ValueError("Time string cannot be empty")
parts = time_str.strip().split()
if len(parts) != 2:
raise ValueError(f"Invalid time format: '{time_str}'. Expected 'HH:MM AM/PM'")
time_part, period = parts
period = period.upper()
if period not in ["AM", "PM"]:
raise ValueError(f"Invalid period: '{period}'. Must be 'AM' or 'PM'")
time_parts = time_part.split(":")
if len(time_parts) != 2:
raise ValueError(f"Invalid time format: '{time_part}'. Expected 'HH:MM'")
try:
hour = int(time_parts[0])
minute = int(time_parts[1])
except ValueError as e:
raise ValueError(f"Invalid time values: {e}")
if hour < 1 or hour > 12:
raise ValueError(f"Invalid hour: {hour}. Must be between 1 and 12")
if minute < 0 or minute > 59:
raise ValueError(f"Invalid minute: {minute}. Must be between 0 and 59")
# Handle 12-hour to 24-hour edge cases
if period == "PM" and hour != 12:
hour += 12
elif period == "AM" and hour == 12:
hour = 0
return hour, minute

View File

@ -1,4 +1,4 @@
"""empty message
"""Add workflow trigger logs table
Revision ID: 4558cfabe44e
Revises: 0e154742a5fa

View File

@ -0,0 +1,47 @@
"""Add workflow schedule plan table
Revision ID: c19938f630b6
Revises: 9ee7d347f4c1
Create Date: 2025-08-28 20:52:41.300028
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'c19938f630b6'
down_revision = '9ee7d347f4c1'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('workflow_schedule_plans',
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
sa.Column('app_id', models.types.StringUUID(), nullable=False),
sa.Column('node_id', sa.String(length=64), nullable=False),
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
sa.Column('cron_expression', sa.String(length=255), nullable=False),
sa.Column('timezone', sa.String(length=64), nullable=False),
sa.Column('next_run_at', sa.DateTime(), nullable=True),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.PrimaryKeyConstraint('id', name='workflow_schedule_plan_pkey'),
sa.UniqueConstraint('app_id', 'node_id', name='uniq_app_node')
)
with op.batch_alter_table('workflow_schedule_plans', schema=None) as batch_op:
batch_op.create_index('workflow_schedule_plan_next_idx', ['next_run_at'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('workflow_schedule_plans', schema=None) as batch_op:
batch_op.drop_index('workflow_schedule_plan_next_idx')
op.drop_table('workflow_schedule_plans')
# ### end Alembic commands ###

View File

@ -92,6 +92,7 @@ from .workflow import (
WorkflowNodeExecutionModel,
WorkflowNodeExecutionTriggeredFrom,
WorkflowRun,
WorkflowSchedulePlan,
WorkflowType,
)
@ -185,6 +186,7 @@ __all__ = [
"WorkflowNodeExecutionTriggeredFrom",
"WorkflowRun",
"WorkflowRunTriggeredFrom",
"WorkflowSchedulePlan",
"WorkflowToolProvider",
"WorkflowType",
"db",

View File

@ -1525,3 +1525,60 @@ class AppTrigger(Base):
default=naive_utc_now(),
server_onupdate=func.current_timestamp(),
)
class WorkflowSchedulePlan(Base):
"""
Workflow Schedule Configuration
Store schedule configurations for time-based workflow triggers.
Uses cron expressions with timezone support for flexible scheduling.
Attributes:
- id (uuid) Primary key
- app_id (uuid) App ID to bind to a specific app
- node_id (varchar) Starting node ID for workflow execution
- tenant_id (uuid) Workspace ID for multi-tenancy
- cron_expression (varchar) Cron expression defining schedule pattern
- timezone (varchar) Timezone for cron evaluation (e.g., 'Asia/Shanghai')
- next_run_at (timestamp) Next scheduled execution time
- created_at (timestamp) Creation timestamp
- updated_at (timestamp) Last update timestamp
"""
__tablename__ = "workflow_schedule_plans"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_schedule_plan_pkey"),
sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node"),
sa.Index("workflow_schedule_plan_next_idx", "next_run_at"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
# Schedule configuration
cron_expression: Mapped[str] = mapped_column(String(255), nullable=False)
timezone: Mapped[str] = mapped_column(String(64), nullable=False)
# Schedule control
next_run_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
)
def to_dict(self) -> dict:
"""Convert to dictionary representation"""
return {
"id": self.id,
"app_id": self.app_id,
"node_id": self.node_id,
"tenant_id": self.tenant_id,
"cron_expression": self.cron_expression,
"timezone": self.timezone,
"next_run_at": self.next_run_at.isoformat() if self.next_run_at else None,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
}

View File

@ -88,6 +88,7 @@ dependencies = [
"httpx-sse>=0.4.0",
"sendgrid~=6.12.3",
"flask-restx>=1.3.0",
"croniter>=6.0.0",
]
# Before adding new dependency, consider place it in
# alphabet order (a-z) and suitable group.

View File

@ -0,0 +1,127 @@
import logging
from celery import group, shared_task
from sqlalchemy import and_, select
from sqlalchemy.orm import Session, sessionmaker
from configs import dify_config
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from libs.schedule_utils import calculate_next_run_at
from models.workflow import AppTrigger, AppTriggerStatus, AppTriggerType, WorkflowSchedulePlan
from services.workflow.queue_dispatcher import QueueDispatcherManager
from tasks.workflow_schedule_tasks import run_schedule_trigger
logger = logging.getLogger(__name__)
@shared_task(queue="schedule_poller")
def poll_workflow_schedules() -> None:
"""
Poll and process due workflow schedules.
Streaming flow:
1. Fetch due schedules in batches
2. Process each batch until all due schedules are handled
3. Optional: Limit total dispatches per tick as a circuit breaker
"""
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
with session_factory() as session:
total_dispatched = 0
total_rate_limited = 0
# Process in batches until we've handled all due schedules or hit the limit
while True:
due_schedules = _fetch_due_schedules(session)
if not due_schedules:
break
dispatched_count, rate_limited_count = _process_schedules(session, due_schedules)
total_dispatched += dispatched_count
total_rate_limited += rate_limited_count
logger.debug("Batch processed: %d dispatched, %d rate limited", dispatched_count, rate_limited_count)
# Circuit breaker: check if we've hit the per-tick limit (if enabled)
if (
dify_config.WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK > 0
and total_dispatched >= dify_config.WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK
):
logger.warning(
"Circuit breaker activated: reached dispatch limit (%d), will continue next tick",
dify_config.WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK,
)
break
if total_dispatched > 0 or total_rate_limited > 0:
logger.info("Total processed: %d dispatched, %d rate limited", total_dispatched, total_rate_limited)
def _fetch_due_schedules(session: Session) -> list[WorkflowSchedulePlan]:
"""
Fetch a batch of due schedules, sorted by most overdue first.
Returns up to WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE schedules per call.
Used in a loop to progressively process all due schedules.
"""
now = naive_utc_now()
due_schedules = session.scalars(
(
select(WorkflowSchedulePlan)
.join(
AppTrigger,
and_(
AppTrigger.app_id == WorkflowSchedulePlan.app_id,
AppTrigger.node_id == WorkflowSchedulePlan.node_id,
AppTrigger.trigger_type == AppTriggerType.TRIGGER_SCHEDULE,
),
)
.where(
WorkflowSchedulePlan.next_run_at <= now,
WorkflowSchedulePlan.next_run_at.isnot(None),
AppTrigger.status == AppTriggerStatus.ENABLED,
)
)
.order_by(WorkflowSchedulePlan.next_run_at.asc())
.with_for_update(skip_locked=True)
.limit(dify_config.WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE)
)
return list(due_schedules)
def _process_schedules(session: Session, schedules: list[WorkflowSchedulePlan]) -> tuple[int, int]:
"""Process schedules: check quota, update next run time and dispatch to Celery in parallel."""
if not schedules:
return 0, 0
dispatcher_manager = QueueDispatcherManager()
tasks_to_dispatch = []
rate_limited_count = 0
for schedule in schedules:
next_run_at = calculate_next_run_at(
schedule.cron_expression,
schedule.timezone,
)
schedule.next_run_at = next_run_at
dispatcher = dispatcher_manager.get_dispatcher(schedule.tenant_id)
if not dispatcher.check_daily_quota(schedule.tenant_id):
logger.info("Tenant %s rate limited, skipping schedule_plan %s", schedule.tenant_id, schedule.id)
rate_limited_count += 1
else:
tasks_to_dispatch.append(schedule.id)
if tasks_to_dispatch:
job = group(run_schedule_trigger.s(schedule_id) for schedule_id in tasks_to_dispatch)
job.apply_async()
logger.debug("Dispatched %d tasks in parallel", len(tasks_to_dispatch))
session.commit()
return len(tasks_to_dispatch), rate_limited_count

View File

@ -0,0 +1,274 @@
import json
import logging
from datetime import datetime
from typing import Optional
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.workflow.nodes import NodeType
from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig, SchedulePlanUpdate, VisualConfig
from core.workflow.nodes.trigger_schedule.exc import ScheduleConfigError, ScheduleNotFoundError
from libs.schedule_utils import calculate_next_run_at, convert_12h_to_24h
from models.account import Account, TenantAccountJoin
from models.workflow import Workflow, WorkflowSchedulePlan
logger = logging.getLogger(__name__)
class ScheduleService:
@staticmethod
def create_schedule(
session: Session,
tenant_id: str,
app_id: str,
config: ScheduleConfig,
) -> WorkflowSchedulePlan:
"""
Create a new schedule with validated configuration.
Args:
session: Database session
tenant_id: Tenant ID
app_id: Application ID
config: Validated schedule configuration
Returns:
Created WorkflowSchedulePlan instance
"""
next_run_at = calculate_next_run_at(
config.cron_expression,
config.timezone,
)
schedule = WorkflowSchedulePlan(
tenant_id=tenant_id,
app_id=app_id,
node_id=config.node_id,
cron_expression=config.cron_expression,
timezone=config.timezone,
next_run_at=next_run_at,
)
session.add(schedule)
session.flush()
return schedule
@staticmethod
def update_schedule(
session: Session,
schedule_id: str,
updates: SchedulePlanUpdate,
) -> WorkflowSchedulePlan:
"""
Update an existing schedule with validated configuration.
Args:
session: Database session
schedule_id: Schedule ID to update
updates: Validated update configuration
Raises:
ScheduleNotFoundError: If schedule not found
Returns:
Updated WorkflowSchedulePlan instance
"""
schedule = session.get(WorkflowSchedulePlan, schedule_id)
if not schedule:
raise ScheduleNotFoundError(f"Schedule not found: {schedule_id}")
# If time-related fields are updated, synchronously update the next_run_at.
time_fields_updated = False
if updates.node_id is not None:
schedule.node_id = updates.node_id
if updates.cron_expression is not None:
schedule.cron_expression = updates.cron_expression
time_fields_updated = True
if updates.timezone is not None:
schedule.timezone = updates.timezone
time_fields_updated = True
if time_fields_updated:
schedule.next_run_at = calculate_next_run_at(
schedule.cron_expression,
schedule.timezone,
)
session.flush()
return schedule
@staticmethod
def delete_schedule(
session: Session,
schedule_id: str,
) -> None:
"""
Delete a schedule plan.
Args:
session: Database session
schedule_id: Schedule ID to delete
"""
schedule = session.get(WorkflowSchedulePlan, schedule_id)
if not schedule:
raise ScheduleNotFoundError(f"Schedule not found: {schedule_id}")
session.delete(schedule)
session.flush()
@staticmethod
def get_tenant_owner(session: Session, tenant_id: str) -> Optional[Account]:
"""
Returns an account to execute scheduled workflows on behalf of the tenant.
Prioritizes owner over admin to ensure proper authorization hierarchy.
"""
result = session.execute(
select(TenantAccountJoin)
.where(TenantAccountJoin.tenant_id == tenant_id, TenantAccountJoin.role == "owner")
.limit(1)
).scalar_one_or_none()
if not result:
# Owner may not exist in some tenant configurations, fallback to admin
result = session.execute(
select(TenantAccountJoin)
.where(TenantAccountJoin.tenant_id == tenant_id, TenantAccountJoin.role == "admin")
.limit(1)
).scalar_one_or_none()
if result:
return session.get(Account, result.account_id)
@staticmethod
def update_next_run_at(
session: Session,
schedule_id: str,
) -> datetime:
"""
Advances the schedule to its next execution time after a successful trigger.
Uses current time as base to prevent missing executions during delays.
"""
schedule = session.get(WorkflowSchedulePlan, schedule_id)
if not schedule:
raise ScheduleNotFoundError(f"Schedule not found: {schedule_id}")
# Base on current time to handle execution delays gracefully
next_run_at = calculate_next_run_at(
schedule.cron_expression,
schedule.timezone,
)
schedule.next_run_at = next_run_at
session.flush()
return next_run_at
@staticmethod
def extract_schedule_config(workflow: Workflow) -> Optional[ScheduleConfig]:
"""
Extracts schedule configuration from workflow graph.
Searches for the first schedule trigger node in the workflow and converts
its configuration (either visual or cron mode) into a unified ScheduleConfig.
Args:
workflow: The workflow containing the graph definition
Returns:
ScheduleConfig if a valid schedule node is found, None if no schedule node exists
Raises:
ScheduleConfigError: If graph parsing fails or schedule configuration is invalid
Note:
Currently only returns the first schedule node found.
Multiple schedule nodes in the same workflow are not supported.
"""
try:
graph_data = workflow.graph_dict
except (json.JSONDecodeError, TypeError, AttributeError) as e:
raise ScheduleConfigError(f"Failed to parse workflow graph: {e}")
if not graph_data:
raise ScheduleConfigError("Workflow graph is empty")
nodes = graph_data.get("nodes", [])
for node in nodes:
node_data = node.get("data", {})
if node_data.get("type") != NodeType.TRIGGER_SCHEDULE.value:
continue
mode = node_data.get("mode", "visual")
timezone = node_data.get("timezone", "UTC")
node_id = node.get("id", "start")
cron_expression = None
if mode == "cron":
cron_expression = node_data.get("cron_expression")
if not cron_expression:
raise ScheduleConfigError("Cron expression is required for cron mode")
elif mode == "visual":
frequency = node_data.get("frequency")
visual_config_dict = node_data.get("visual_config", {})
visual_config = VisualConfig(**visual_config_dict)
cron_expression = ScheduleService.visual_to_cron(frequency, visual_config)
else:
raise ScheduleConfigError(f"Invalid schedule mode: {mode}")
return ScheduleConfig(node_id=node_id, cron_expression=cron_expression, timezone=timezone)
@staticmethod
def visual_to_cron(frequency: str, visual_config: VisualConfig) -> str:
"""
Converts user-friendly visual schedule settings to cron expression.
Maintains consistency with frontend UI expectations while supporting croniter's extended syntax.
"""
if frequency == "hourly":
if visual_config.on_minute is None:
raise ScheduleConfigError("on_minute is required for hourly schedules")
return f"{visual_config.on_minute} * * * *"
elif frequency == "daily":
if not visual_config.time:
raise ScheduleConfigError("time is required for daily schedules")
hour, minute = convert_12h_to_24h(visual_config.time)
return f"{minute} {hour} * * *"
elif frequency == "weekly":
if not visual_config.time:
raise ScheduleConfigError("time is required for weekly schedules")
if not visual_config.weekdays:
raise ScheduleConfigError("Weekdays are required for weekly schedules")
hour, minute = convert_12h_to_24h(visual_config.time)
weekday_map = {"sun": "0", "mon": "1", "tue": "2", "wed": "3", "thu": "4", "fri": "5", "sat": "6"}
cron_weekdays = [weekday_map[day] for day in visual_config.weekdays]
return f"{minute} {hour} * * {','.join(sorted(cron_weekdays))}"
elif frequency == "monthly":
if not visual_config.time:
raise ScheduleConfigError("time is required for monthly schedules")
if not visual_config.monthly_days:
raise ScheduleConfigError("Monthly days are required for monthly schedules")
hour, minute = convert_12h_to_24h(visual_config.time)
numeric_days = []
has_last = False
for day in visual_config.monthly_days:
if day == "last":
has_last = True
else:
numeric_days.append(day)
result_days = [str(d) for d in sorted(set(numeric_days))]
if has_last:
result_days.append("L")
return f"{minute} {hour} {','.join(result_days)} * *"
else:
raise ScheduleConfigError(f"Unsupported frequency: {frequency}")

View File

@ -0,0 +1,69 @@
import logging
from datetime import UTC, datetime
from zoneinfo import ZoneInfo
from celery import shared_task
from sqlalchemy.orm import sessionmaker
from core.workflow.nodes.trigger_schedule.exc import (
ScheduleExecutionError,
ScheduleNotFoundError,
TenantOwnerNotFoundError,
)
from extensions.ext_database import db
from models.enums import WorkflowRunTriggeredFrom
from models.workflow import WorkflowSchedulePlan
from services.async_workflow_service import AsyncWorkflowService
from services.schedule_service import ScheduleService
from services.workflow.entities import TriggerData
logger = logging.getLogger(__name__)
@shared_task(queue="schedule_executor")
def run_schedule_trigger(schedule_id: str) -> None:
"""
Execute a scheduled workflow trigger.
Note: No retry logic needed as schedules will run again at next interval.
The execution result is tracked via WorkflowTriggerLog.
Raises:
ScheduleNotFoundError: If schedule doesn't exist
TenantOwnerNotFoundError: If no owner/admin for tenant
ScheduleExecutionError: If workflow trigger fails
"""
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
with session_factory() as session:
schedule = session.get(WorkflowSchedulePlan, schedule_id)
if not schedule:
raise ScheduleNotFoundError(f"Schedule {schedule_id} not found")
tenant_owner = ScheduleService.get_tenant_owner(session, schedule.tenant_id)
if not tenant_owner:
raise TenantOwnerNotFoundError(f"No owner or admin found for tenant {schedule.tenant_id}")
try:
current_utc = datetime.now(UTC)
schedule_tz = ZoneInfo(schedule.timezone) if schedule.timezone else UTC
current_in_tz = current_utc.astimezone(schedule_tz)
inputs = {"current_time": current_in_tz.isoformat()}
response = AsyncWorkflowService.trigger_workflow_async(
session=session,
user=tenant_owner,
trigger_data=TriggerData(
app_id=schedule.app_id,
root_node_id=schedule.node_id,
trigger_type=WorkflowRunTriggeredFrom.SCHEDULE,
inputs=inputs,
tenant_id=schedule.tenant_id,
),
)
logger.info("Schedule %s triggered workflow: %s", schedule_id, response.workflow_trigger_log_id)
except Exception as e:
raise ScheduleExecutionError(
f"Failed to trigger workflow for schedule {schedule_id}, app {schedule.app_id}"
) from e

View File

@ -131,6 +131,10 @@ class TestCelerySSLConfiguration:
mock_config.ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK = False
mock_config.ENABLE_DATASETS_QUEUE_MONITOR = False
mock_config.ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK = False
mock_config.ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK = False
mock_config.WORKFLOW_SCHEDULE_POLLER_INTERVAL = 1
mock_config.WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE = 100
mock_config.WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK = 0
with patch("extensions.ext_celery.dify_config", mock_config):
from dify_app import DifyApp

View File

@ -0,0 +1,780 @@
import unittest
from datetime import UTC, datetime
from unittest.mock import MagicMock, Mock, patch
import pytest
from sqlalchemy.orm import Session
from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig, SchedulePlanUpdate, VisualConfig
from core.workflow.nodes.trigger_schedule.exc import ScheduleConfigError
from events.event_handlers.sync_workflow_schedule_when_app_published import (
sync_schedule_from_workflow,
)
from libs.schedule_utils import calculate_next_run_at, convert_12h_to_24h
from models.account import Account, TenantAccountJoin
from models.workflow import Workflow, WorkflowSchedulePlan
from services.schedule_service import ScheduleService
class TestScheduleService(unittest.TestCase):
"""Test cases for ScheduleService class."""
def test_calculate_next_run_at_valid_cron(self):
"""Test calculating next run time with valid cron expression."""
# Test daily cron at 10:30 AM
cron_expr = "30 10 * * *"
timezone = "UTC"
base_time = datetime(2025, 8, 29, 9, 0, 0, tzinfo=UTC)
next_run = calculate_next_run_at(cron_expr, timezone, base_time)
assert next_run is not None
assert next_run.hour == 10
assert next_run.minute == 30
assert next_run.day == 29
def test_calculate_next_run_at_with_timezone(self):
"""Test calculating next run time with different timezone."""
cron_expr = "0 9 * * *" # 9:00 AM
timezone = "America/New_York"
base_time = datetime(2025, 8, 29, 12, 0, 0, tzinfo=UTC) # 8:00 AM EDT
next_run = calculate_next_run_at(cron_expr, timezone, base_time)
assert next_run is not None
# 9:00 AM EDT = 13:00 UTC (during EDT)
expected_utc_hour = 13
assert next_run.hour == expected_utc_hour
def test_calculate_next_run_at_with_last_day_of_month(self):
"""Test calculating next run time with 'L' (last day) syntax."""
cron_expr = "0 10 L * *" # 10:00 AM on last day of month
timezone = "UTC"
base_time = datetime(2025, 2, 15, 9, 0, 0, tzinfo=UTC)
next_run = calculate_next_run_at(cron_expr, timezone, base_time)
assert next_run is not None
# February 2025 has 28 days
assert next_run.day == 28
assert next_run.month == 2
def test_calculate_next_run_at_invalid_cron(self):
"""Test calculating next run time with invalid cron expression."""
from croniter import CroniterBadCronError
cron_expr = "invalid cron"
timezone = "UTC"
with pytest.raises(CroniterBadCronError):
calculate_next_run_at(cron_expr, timezone)
def test_calculate_next_run_at_invalid_timezone(self):
"""Test calculating next run time with invalid timezone."""
from pytz import UnknownTimeZoneError
cron_expr = "30 10 * * *"
timezone = "Invalid/Timezone"
with pytest.raises(UnknownTimeZoneError):
calculate_next_run_at(cron_expr, timezone)
@patch("libs.schedule_utils.calculate_next_run_at")
def test_create_schedule(self, mock_calculate_next_run):
"""Test creating a new schedule."""
mock_session = MagicMock(spec=Session)
mock_calculate_next_run.return_value = datetime(2025, 8, 30, 10, 30, 0, tzinfo=UTC)
config = ScheduleConfig(
node_id="start",
cron_expression="30 10 * * *",
timezone="UTC",
)
schedule = ScheduleService.create_schedule(
session=mock_session,
tenant_id="test-tenant",
app_id="test-app",
config=config,
)
assert schedule is not None
assert schedule.tenant_id == "test-tenant"
assert schedule.app_id == "test-app"
assert schedule.node_id == "start"
assert schedule.cron_expression == "30 10 * * *"
assert schedule.timezone == "UTC"
assert schedule.next_run_at is not None
mock_session.add.assert_called_once()
mock_session.flush.assert_called_once()
@patch("services.schedule_service.calculate_next_run_at")
def test_update_schedule(self, mock_calculate_next_run):
"""Test updating an existing schedule."""
mock_session = MagicMock(spec=Session)
mock_schedule = Mock(spec=WorkflowSchedulePlan)
mock_schedule.cron_expression = "0 12 * * *"
mock_schedule.timezone = "America/New_York"
mock_session.get.return_value = mock_schedule
mock_calculate_next_run.return_value = datetime(2025, 8, 30, 12, 0, 0, tzinfo=UTC)
updates = SchedulePlanUpdate(
cron_expression="0 12 * * *",
timezone="America/New_York",
)
result = ScheduleService.update_schedule(
session=mock_session,
schedule_id="test-schedule-id",
updates=updates,
)
assert result is not None
assert result.cron_expression == "0 12 * * *"
assert result.timezone == "America/New_York"
mock_calculate_next_run.assert_called_once()
mock_session.flush.assert_called_once()
def test_update_schedule_not_found(self):
"""Test updating a non-existent schedule raises exception."""
from core.workflow.nodes.trigger_schedule.exc import ScheduleNotFoundError
mock_session = MagicMock(spec=Session)
mock_session.get.return_value = None
updates = SchedulePlanUpdate(
cron_expression="0 12 * * *",
)
with pytest.raises(ScheduleNotFoundError) as context:
ScheduleService.update_schedule(
session=mock_session,
schedule_id="non-existent-id",
updates=updates,
)
assert "Schedule not found: non-existent-id" in str(context.value)
mock_session.flush.assert_not_called()
def test_delete_schedule(self):
"""Test deleting a schedule."""
mock_session = MagicMock(spec=Session)
mock_schedule = Mock(spec=WorkflowSchedulePlan)
mock_session.get.return_value = mock_schedule
# Should not raise exception and complete successfully
ScheduleService.delete_schedule(
session=mock_session,
schedule_id="test-schedule-id",
)
mock_session.delete.assert_called_once_with(mock_schedule)
mock_session.flush.assert_called_once()
def test_delete_schedule_not_found(self):
"""Test deleting a non-existent schedule raises exception."""
from core.workflow.nodes.trigger_schedule.exc import ScheduleNotFoundError
mock_session = MagicMock(spec=Session)
mock_session.get.return_value = None
# Should raise ScheduleNotFoundError
with pytest.raises(ScheduleNotFoundError) as context:
ScheduleService.delete_schedule(
session=mock_session,
schedule_id="non-existent-id",
)
assert "Schedule not found: non-existent-id" in str(context.value)
mock_session.delete.assert_not_called()
@patch("services.schedule_service.select")
def test_get_tenant_owner(self, mock_select):
"""Test getting tenant owner account."""
mock_session = MagicMock(spec=Session)
mock_account = Mock(spec=Account)
mock_account.id = "owner-account-id"
# Mock owner query
mock_owner_result = Mock(spec=TenantAccountJoin)
mock_owner_result.account_id = "owner-account-id"
mock_session.execute.return_value.scalar_one_or_none.return_value = mock_owner_result
mock_session.get.return_value = mock_account
result = ScheduleService.get_tenant_owner(
session=mock_session,
tenant_id="test-tenant",
)
assert result is not None
assert result.id == "owner-account-id"
@patch("services.schedule_service.select")
def test_get_tenant_owner_fallback_to_admin(self, mock_select):
"""Test getting tenant owner falls back to admin if no owner."""
mock_session = MagicMock(spec=Session)
mock_account = Mock(spec=Account)
mock_account.id = "admin-account-id"
# Mock admin query (owner returns None)
mock_admin_result = Mock(spec=TenantAccountJoin)
mock_admin_result.account_id = "admin-account-id"
mock_session.execute.return_value.scalar_one_or_none.side_effect = [None, mock_admin_result]
mock_session.get.return_value = mock_account
result = ScheduleService.get_tenant_owner(
session=mock_session,
tenant_id="test-tenant",
)
assert result is not None
assert result.id == "admin-account-id"
@patch("services.schedule_service.calculate_next_run_at")
def test_update_next_run_at(self, mock_calculate_next_run):
"""Test updating next run time after schedule triggered."""
mock_session = MagicMock(spec=Session)
mock_schedule = Mock(spec=WorkflowSchedulePlan)
mock_schedule.cron_expression = "30 10 * * *"
mock_schedule.timezone = "UTC"
mock_session.get.return_value = mock_schedule
next_time = datetime(2025, 8, 31, 10, 30, 0, tzinfo=UTC)
mock_calculate_next_run.return_value = next_time
result = ScheduleService.update_next_run_at(
session=mock_session,
schedule_id="test-schedule-id",
)
assert result == next_time
assert mock_schedule.next_run_at == next_time
mock_session.flush.assert_called_once()
class TestVisualToCron(unittest.TestCase):
"""Test cases for visual configuration to cron conversion."""
def test_visual_to_cron_hourly(self):
"""Test converting hourly visual config to cron."""
visual_config = VisualConfig(on_minute=15)
result = ScheduleService.visual_to_cron("hourly", visual_config)
assert result == "15 * * * *"
def test_visual_to_cron_daily(self):
"""Test converting daily visual config to cron."""
visual_config = VisualConfig(time="2:30 PM")
result = ScheduleService.visual_to_cron("daily", visual_config)
assert result == "30 14 * * *"
def test_visual_to_cron_weekly(self):
"""Test converting weekly visual config to cron."""
visual_config = VisualConfig(
time="10:00 AM",
weekdays=["mon", "wed", "fri"],
)
result = ScheduleService.visual_to_cron("weekly", visual_config)
assert result == "0 10 * * 1,3,5"
def test_visual_to_cron_monthly_with_specific_days(self):
"""Test converting monthly visual config with specific days."""
visual_config = VisualConfig(
time="11:30 AM",
monthly_days=[1, 15],
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
assert result == "30 11 1,15 * *"
def test_visual_to_cron_monthly_with_last_day(self):
"""Test converting monthly visual config with last day using 'L' syntax."""
visual_config = VisualConfig(
time="11:30 AM",
monthly_days=[1, "last"],
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
assert result == "30 11 1,L * *"
def test_visual_to_cron_monthly_only_last_day(self):
"""Test converting monthly visual config with only last day."""
visual_config = VisualConfig(
time="9:00 PM",
monthly_days=["last"],
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
assert result == "0 21 L * *"
def test_visual_to_cron_monthly_with_end_days_and_last(self):
"""Test converting monthly visual config with days 29, 30, 31 and 'last'."""
visual_config = VisualConfig(
time="3:45 PM",
monthly_days=[29, 30, 31, "last"],
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
# Should have 29,30,31,L - the L handles all possible last days
assert result == "45 15 29,30,31,L * *"
def test_visual_to_cron_invalid_frequency(self):
"""Test converting with invalid frequency."""
with pytest.raises(ScheduleConfigError, match="Unsupported frequency: invalid"):
ScheduleService.visual_to_cron("invalid", VisualConfig())
def test_visual_to_cron_weekly_no_weekdays(self):
"""Test converting weekly with no weekdays specified."""
visual_config = VisualConfig(time="10:00 AM")
with pytest.raises(ScheduleConfigError, match="Weekdays are required for weekly schedules"):
ScheduleService.visual_to_cron("weekly", visual_config)
def test_visual_to_cron_hourly_no_minute(self):
"""Test converting hourly with no on_minute specified."""
visual_config = VisualConfig() # on_minute defaults to 0
result = ScheduleService.visual_to_cron("hourly", visual_config)
assert result == "0 * * * *" # Should use default value 0
def test_visual_to_cron_daily_no_time(self):
"""Test converting daily with no time specified."""
visual_config = VisualConfig(time=None)
with pytest.raises(ScheduleConfigError, match="time is required for daily schedules"):
ScheduleService.visual_to_cron("daily", visual_config)
def test_visual_to_cron_weekly_no_time(self):
"""Test converting weekly with no time specified."""
visual_config = VisualConfig(weekdays=["mon"])
visual_config.time = None # Override default
with pytest.raises(ScheduleConfigError, match="time is required for weekly schedules"):
ScheduleService.visual_to_cron("weekly", visual_config)
def test_visual_to_cron_monthly_no_time(self):
"""Test converting monthly with no time specified."""
visual_config = VisualConfig(monthly_days=[1])
visual_config.time = None # Override default
with pytest.raises(ScheduleConfigError, match="time is required for monthly schedules"):
ScheduleService.visual_to_cron("monthly", visual_config)
def test_visual_to_cron_monthly_duplicate_days(self):
"""Test monthly with duplicate days should be deduplicated."""
visual_config = VisualConfig(
time="10:00 AM",
monthly_days=[1, 15, 1, 15, 31], # Duplicates
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
assert result == "0 10 1,15,31 * *" # Should be deduplicated
def test_visual_to_cron_monthly_unsorted_days(self):
"""Test monthly with unsorted days should be sorted."""
visual_config = VisualConfig(
time="2:30 PM",
monthly_days=[20, 5, 15, 1, 10], # Unsorted
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
assert result == "30 14 1,5,10,15,20 * *" # Should be sorted
def test_visual_to_cron_weekly_all_weekdays(self):
"""Test weekly with all weekdays."""
visual_config = VisualConfig(
time="8:00 AM",
weekdays=["sun", "mon", "tue", "wed", "thu", "fri", "sat"],
)
result = ScheduleService.visual_to_cron("weekly", visual_config)
assert result == "0 8 * * 0,1,2,3,4,5,6"
def test_visual_to_cron_hourly_boundary_values(self):
"""Test hourly with boundary minute values."""
# Minimum value
visual_config = VisualConfig(on_minute=0)
result = ScheduleService.visual_to_cron("hourly", visual_config)
assert result == "0 * * * *"
# Maximum value
visual_config = VisualConfig(on_minute=59)
result = ScheduleService.visual_to_cron("hourly", visual_config)
assert result == "59 * * * *"
def test_visual_to_cron_daily_midnight_noon(self):
"""Test daily at special times (midnight and noon)."""
# Midnight
visual_config = VisualConfig(time="12:00 AM")
result = ScheduleService.visual_to_cron("daily", visual_config)
assert result == "0 0 * * *"
# Noon
visual_config = VisualConfig(time="12:00 PM")
result = ScheduleService.visual_to_cron("daily", visual_config)
assert result == "0 12 * * *"
def test_visual_to_cron_monthly_mixed_with_last_and_duplicates(self):
"""Test monthly with mixed days, 'last', and duplicates."""
visual_config = VisualConfig(
time="11:45 PM",
monthly_days=[15, 1, "last", 15, 30, 1, "last"], # Mixed with duplicates
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
assert result == "45 23 1,15,30,L * *" # Deduplicated and sorted with L at end
def test_visual_to_cron_weekly_single_day(self):
"""Test weekly with single weekday."""
visual_config = VisualConfig(
time="6:30 PM",
weekdays=["sun"],
)
result = ScheduleService.visual_to_cron("weekly", visual_config)
assert result == "30 18 * * 0"
def test_visual_to_cron_monthly_all_possible_days(self):
"""Test monthly with all 31 days plus 'last'."""
all_days = list(range(1, 32)) + ["last"]
visual_config = VisualConfig(
time="12:01 AM",
monthly_days=all_days,
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
expected_days = ','.join([str(i) for i in range(1, 32)]) + ',L'
assert result == f"1 0 {expected_days} * *"
def test_visual_to_cron_monthly_no_days(self):
"""Test monthly without any days specified should raise error."""
visual_config = VisualConfig(time="10:00 AM", monthly_days=[])
with pytest.raises(ScheduleConfigError, match="Monthly days are required for monthly schedules"):
ScheduleService.visual_to_cron("monthly", visual_config)
def test_visual_to_cron_weekly_empty_weekdays_list(self):
"""Test weekly with empty weekdays list should raise error."""
visual_config = VisualConfig(time="10:00 AM", weekdays=[])
with pytest.raises(ScheduleConfigError, match="Weekdays are required for weekly schedules"):
ScheduleService.visual_to_cron("weekly", visual_config)
class TestParseTime(unittest.TestCase):
"""Test cases for time parsing function."""
def test_parse_time_am(self):
"""Test parsing AM time."""
hour, minute = convert_12h_to_24h("9:30 AM")
assert hour == 9
assert minute == 30
def test_parse_time_pm(self):
"""Test parsing PM time."""
hour, minute = convert_12h_to_24h("2:45 PM")
assert hour == 14
assert minute == 45
def test_parse_time_noon(self):
"""Test parsing 12:00 PM (noon)."""
hour, minute = convert_12h_to_24h("12:00 PM")
assert hour == 12
assert minute == 0
def test_parse_time_midnight(self):
"""Test parsing 12:00 AM (midnight)."""
hour, minute = convert_12h_to_24h("12:00 AM")
assert hour == 0
assert minute == 0
def test_parse_time_invalid_format(self):
"""Test parsing invalid time format."""
with pytest.raises(ValueError, match="Invalid time format"):
convert_12h_to_24h("25:00")
def test_parse_time_invalid_hour(self):
"""Test parsing invalid hour."""
with pytest.raises(ValueError, match="Invalid hour: 13"):
convert_12h_to_24h("13:00 PM")
def test_parse_time_invalid_minute(self):
"""Test parsing invalid minute."""
with pytest.raises(ValueError, match="Invalid minute: 60"):
convert_12h_to_24h("10:60 AM")
def test_parse_time_empty_string(self):
"""Test parsing empty string."""
with pytest.raises(ValueError, match="Time string cannot be empty"):
convert_12h_to_24h("")
def test_parse_time_invalid_period(self):
"""Test parsing invalid period."""
with pytest.raises(ValueError, match="Invalid period"):
convert_12h_to_24h("10:30 XM")
class TestExtractScheduleConfig(unittest.TestCase):
"""Test cases for extracting schedule configuration from workflow."""
def test_extract_schedule_config_with_cron_mode(self):
"""Test extracting schedule config in cron mode."""
workflow = Mock(spec=Workflow)
workflow.graph_dict = {
"nodes": [
{
"id": "schedule-node",
"data": {
"type": "trigger-schedule",
"mode": "cron",
"cron_expression": "0 10 * * *",
"timezone": "America/New_York",
},
}
]
}
config = ScheduleService.extract_schedule_config(workflow)
assert config is not None
assert config.node_id == "schedule-node"
assert config.cron_expression == "0 10 * * *"
assert config.timezone == "America/New_York"
def test_extract_schedule_config_with_visual_mode(self):
"""Test extracting schedule config in visual mode."""
workflow = Mock(spec=Workflow)
workflow.graph_dict = {
"nodes": [
{
"id": "schedule-node",
"data": {
"type": "trigger-schedule",
"mode": "visual",
"frequency": "daily",
"visual_config": {"time": "10:30 AM"},
"timezone": "UTC",
},
}
]
}
config = ScheduleService.extract_schedule_config(workflow)
assert config is not None
assert config.node_id == "schedule-node"
assert config.cron_expression == "30 10 * * *"
assert config.timezone == "UTC"
def test_extract_schedule_config_no_schedule_node(self):
"""Test extracting config when no schedule node exists."""
workflow = Mock(spec=Workflow)
workflow.graph_dict = {
"nodes": [
{
"id": "other-node",
"data": {"type": "llm"},
}
]
}
config = ScheduleService.extract_schedule_config(workflow)
assert config is None
def test_extract_schedule_config_invalid_graph(self):
"""Test extracting config with invalid graph data."""
workflow = Mock(spec=Workflow)
workflow.graph_dict = None
with pytest.raises(ScheduleConfigError, match="Workflow graph is empty"):
ScheduleService.extract_schedule_config(workflow)
class TestScheduleWithTimezone(unittest.TestCase):
"""Test cases for schedule with timezone handling."""
def test_visual_schedule_with_timezone_integration(self):
"""Test complete flow: visual config → cron → execution in different timezones.
This test verifies that when a user in Shanghai sets a schedule for 10:30 AM,
it runs at 10:30 AM Shanghai time, not 10:30 AM UTC.
"""
# User in Shanghai wants to run a task at 10:30 AM local time
visual_config = VisualConfig(
time="10:30 AM", # This is Shanghai time
monthly_days=[1],
)
# Convert to cron expression
cron_expr = ScheduleService.visual_to_cron("monthly", visual_config)
assert cron_expr is not None
assert cron_expr == "30 10 1 * *" # Direct conversion
# Now test execution with Shanghai timezone
shanghai_tz = "Asia/Shanghai"
# Base time: 2025-01-01 00:00:00 UTC (08:00:00 Shanghai)
base_time = datetime(2025, 1, 1, 0, 0, 0, tzinfo=UTC)
next_run = calculate_next_run_at(cron_expr, shanghai_tz, base_time)
assert next_run is not None
# Should run at 10:30 AM Shanghai time on Jan 1
# 10:30 AM Shanghai = 02:30 AM UTC (Shanghai is UTC+8)
assert next_run.year == 2025
assert next_run.month == 1
assert next_run.day == 1
assert next_run.hour == 2 # 02:30 UTC
assert next_run.minute == 30
def test_visual_schedule_different_timezones_same_local_time(self):
"""Test that same visual config in different timezones runs at different UTC times.
This verifies that a schedule set for "9:00 AM" runs at 9 AM local time
regardless of the timezone.
"""
visual_config = VisualConfig(
time="9:00 AM",
weekdays=["mon"],
)
cron_expr = ScheduleService.visual_to_cron("weekly", visual_config)
assert cron_expr is not None
assert cron_expr == "0 9 * * 1"
# Base time: Sunday 2025-01-05 12:00:00 UTC
base_time = datetime(2025, 1, 5, 12, 0, 0, tzinfo=UTC)
# Test New York (UTC-5 in January)
ny_next = calculate_next_run_at(cron_expr, "America/New_York", base_time)
assert ny_next is not None
# Monday 9 AM EST = Monday 14:00 UTC
assert ny_next.day == 6
assert ny_next.hour == 14 # 9 AM EST = 2 PM UTC
# Test Tokyo (UTC+9)
tokyo_next = calculate_next_run_at(cron_expr, "Asia/Tokyo", base_time)
assert tokyo_next is not None
# Monday 9 AM JST = Monday 00:00 UTC
assert tokyo_next.day == 6
assert tokyo_next.hour == 0 # 9 AM JST = 0 AM UTC
def test_visual_schedule_daily_across_dst_change(self):
"""Test that daily schedules adjust correctly during DST changes.
A schedule set for "10:00 AM" should always run at 10 AM local time,
even when DST changes.
"""
visual_config = VisualConfig(
time="10:00 AM",
)
cron_expr = ScheduleService.visual_to_cron("daily", visual_config)
assert cron_expr is not None
assert cron_expr == "0 10 * * *"
# Test before DST (EST - UTC-5)
winter_base = datetime(2025, 2, 1, 0, 0, 0, tzinfo=UTC)
winter_next = calculate_next_run_at(cron_expr, "America/New_York", winter_base)
assert winter_next is not None
# 10 AM EST = 15:00 UTC
assert winter_next.hour == 15
# Test during DST (EDT - UTC-4)
summer_base = datetime(2025, 6, 1, 0, 0, 0, tzinfo=UTC)
summer_next = calculate_next_run_at(cron_expr, "America/New_York", summer_base)
assert summer_next is not None
# 10 AM EDT = 14:00 UTC
assert summer_next.hour == 14
class TestSyncScheduleFromWorkflow(unittest.TestCase):
"""Test cases for syncing schedule from workflow."""
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.db")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.select")
def test_sync_schedule_create_new(self, mock_select, mock_service, mock_db):
"""Test creating new schedule when none exists."""
mock_session = MagicMock()
mock_db.engine = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
Session = MagicMock(return_value=mock_session)
with patch("events.event_handlers.sync_workflow_schedule_when_app_published.Session", Session):
mock_session.scalar.return_value = None # No existing plan
# Mock extract_schedule_config to return a ScheduleConfig object
mock_config = Mock(spec=ScheduleConfig)
mock_config.node_id = "start"
mock_config.cron_expression = "30 10 * * *"
mock_config.timezone = "UTC"
mock_service.extract_schedule_config.return_value = mock_config
mock_new_plan = Mock(spec=WorkflowSchedulePlan)
mock_service.create_schedule.return_value = mock_new_plan
workflow = Mock(spec=Workflow)
result = sync_schedule_from_workflow("tenant-id", "app-id", workflow)
assert result == mock_new_plan
mock_service.create_schedule.assert_called_once()
mock_session.commit.assert_called_once()
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.db")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.select")
def test_sync_schedule_update_existing(self, mock_select, mock_service, mock_db):
"""Test updating existing schedule."""
mock_session = MagicMock()
mock_db.engine = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
Session = MagicMock(return_value=mock_session)
with patch("events.event_handlers.sync_workflow_schedule_when_app_published.Session", Session):
mock_existing_plan = Mock(spec=WorkflowSchedulePlan)
mock_existing_plan.id = "existing-plan-id"
mock_session.scalar.return_value = mock_existing_plan
# Mock extract_schedule_config to return a ScheduleConfig object
mock_config = Mock(spec=ScheduleConfig)
mock_config.node_id = "start"
mock_config.cron_expression = "0 12 * * *"
mock_config.timezone = "America/New_York"
mock_service.extract_schedule_config.return_value = mock_config
mock_updated_plan = Mock(spec=WorkflowSchedulePlan)
mock_service.update_schedule.return_value = mock_updated_plan
workflow = Mock(spec=Workflow)
result = sync_schedule_from_workflow("tenant-id", "app-id", workflow)
assert result == mock_updated_plan
mock_service.update_schedule.assert_called_once()
# Verify the arguments passed to update_schedule
call_args = mock_service.update_schedule.call_args
assert call_args.kwargs["session"] == mock_session
assert call_args.kwargs["schedule_id"] == "existing-plan-id"
updates_obj = call_args.kwargs["updates"]
assert isinstance(updates_obj, SchedulePlanUpdate)
assert updates_obj.node_id == "start"
assert updates_obj.cron_expression == "0 12 * * *"
assert updates_obj.timezone == "America/New_York"
mock_session.commit.assert_called_once()
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.db")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.select")
def test_sync_schedule_remove_when_no_config(self, mock_select, mock_service, mock_db):
"""Test removing schedule when no schedule config in workflow."""
mock_session = MagicMock()
mock_db.engine = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
Session = MagicMock(return_value=mock_session)
with patch("events.event_handlers.sync_workflow_schedule_when_app_published.Session", Session):
mock_existing_plan = Mock(spec=WorkflowSchedulePlan)
mock_existing_plan.id = "existing-plan-id"
mock_session.scalar.return_value = mock_existing_plan
mock_service.extract_schedule_config.return_value = None # No schedule config
workflow = Mock(spec=Workflow)
result = sync_schedule_from_workflow("tenant-id", "app-id", workflow)
assert result is None
# Now using ScheduleService.delete_schedule instead of session.delete
mock_service.delete_schedule.assert_called_once_with(session=mock_session, schedule_id="existing-plan-id")
mock_session.commit.assert_called_once()
if __name__ == "__main__":
unittest.main()

View File

@ -1161,6 +1161,19 @@ version = "1.7"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/6b/b0/e595ce2a2527e169c3bcd6c33d2473c1918e0b7f6826a043ca1245dd4e5b/crcmod-1.7.tar.gz", hash = "sha256:dc7051a0db5f2bd48665a990d3ec1cc305a466a77358ca4492826f41f283601e", size = 89670 }
[[package]]
name = "croniter"
version = "6.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "python-dateutil" },
{ name = "pytz" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ad/2f/44d1ae153a0e27be56be43465e5cb39b9650c781e001e7864389deb25090/croniter-6.0.0.tar.gz", hash = "sha256:37c504b313956114a983ece2c2b07790b1f1094fe9d81cc94739214748255577", size = 64481, upload-time = "2024-12-17T17:17:47.32Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/07/4b/290b4c3efd6417a8b0c284896de19b1d5855e6dbdb97d2a35e68fa42de85/croniter-6.0.0-py2.py3-none-any.whl", hash = "sha256:2f878c3856f17896979b2a4379ba1f09c83e374931ea15cc835c5dd2eee9b368", size = 25468, upload-time = "2024-12-17T17:17:45.359Z" },
]
[[package]]
name = "cryptography"
version = "45.0.6"
@ -1271,6 +1284,7 @@ dependencies = [
{ name = "cachetools" },
{ name = "celery" },
{ name = "chardet" },
{ name = "croniter" },
{ name = "flask" },
{ name = "flask-compress" },
{ name = "flask-cors" },
@ -1460,6 +1474,7 @@ requires-dist = [
{ name = "cachetools", specifier = "~=5.3.0" },
{ name = "celery", specifier = "~=5.5.2" },
{ name = "chardet", specifier = "~=5.1.0" },
{ name = "croniter", specifier = ">=6.0.0" },
{ name = "flask", specifier = "~=3.1.2" },
{ name = "flask-compress", specifier = "~=1.17" },
{ name = "flask-cors", specifier = "~=6.0.0" },

60
dev/start-beat Executable file
View File

@ -0,0 +1,60 @@
#!/bin/bash
set -x
# Help function
show_help() {
echo "Usage: $0 [OPTIONS]"
echo ""
echo "Options:"
echo " --loglevel LEVEL Log level (default: INFO)"
echo " --scheduler SCHEDULER Scheduler class (default: celery.beat:PersistentScheduler)"
echo " -h, --help Show this help message"
echo ""
echo "Examples:"
echo " $0"
echo " $0 --loglevel DEBUG"
echo " $0 --scheduler django_celery_beat.schedulers:DatabaseScheduler"
echo ""
echo "Description:"
echo " Starts Celery Beat scheduler for periodic task execution."
echo " Beat sends scheduled tasks to worker queues at specified intervals."
}
# Parse command line arguments
LOGLEVEL="INFO"
SCHEDULER="celery.beat:PersistentScheduler"
while [[ $# -gt 0 ]]; do
case $1 in
--loglevel)
LOGLEVEL="$2"
shift 2
;;
--scheduler)
SCHEDULER="$2"
shift 2
;;
-h|--help)
show_help
exit 0
;;
*)
echo "Unknown option: $1"
show_help
exit 1
;;
esac
done
SCRIPT_DIR="$(dirname "$(realpath "$0")")"
cd "$SCRIPT_DIR/.."
echo "Starting Celery Beat with:"
echo " Log Level: ${LOGLEVEL}"
echo " Scheduler: ${SCHEDULER}"
uv --directory api run \
celery -A app.celery beat \
--loglevel ${LOGLEVEL} \
--scheduler ${SCHEDULER}

View File

@ -24,6 +24,8 @@ show_help() {
echo " workflow_professional - Professional tier workflows (cloud edition)"
echo " workflow_team - Team tier workflows (cloud edition)"
echo " workflow_sandbox - Sandbox tier workflows (cloud edition)"
echo " schedule_poller - Schedule polling tasks"
echo " schedule_executor - Schedule execution tasks"
echo " generation - Content generation tasks"
echo " mail - Email notifications"
echo " ops_trace - Operations tracing"
@ -79,10 +81,10 @@ if [[ -z "${QUEUES}" ]]; then
# Configure queues based on edition
if [[ "${EDITION}" == "CLOUD" ]]; then
# Cloud edition: separate queues for dataset and trigger tasks
QUEUES="dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox"
QUEUES="dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor"
else
# Community edition (SELF_HOSTED): dataset and workflow have separate queues
QUEUES="dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow"
QUEUES="dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor"
fi
echo "No queues specified, using edition-based defaults: ${QUEUES}"

View File

@ -1272,3 +1272,7 @@ ENABLE_CLEAN_MESSAGES=false
ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK=false
ENABLE_DATASETS_QUEUE_MONITOR=false
ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK=true
ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK=true
WORKFLOW_SCHEDULE_POLLER_INTERVAL=1
WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE=100
WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK=0

View File

@ -579,6 +579,10 @@ x-shared-env: &shared-api-worker-env
ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK: ${ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK:-false}
ENABLE_DATASETS_QUEUE_MONITOR: ${ENABLE_DATASETS_QUEUE_MONITOR:-false}
ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK: ${ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK:-true}
ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK: ${ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK:-true}
WORKFLOW_SCHEDULE_POLLER_INTERVAL: ${WORKFLOW_SCHEDULE_POLLER_INTERVAL:-1}
WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE: ${WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE:-100}
WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK: ${WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK:-0}
services:
# API service