From 5d722c19a74312cc109855f2fa0b2b4566a5b340 Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Wed, 9 Jul 2025 10:48:02 +0800 Subject: [PATCH] feat: add switch to config celery schedule tasks --- api/.env.example | 10 +++ api/configs/feature/__init__.py | 36 +++++++++++ api/extensions/ext_celery.py | 67 +++++++++++--------- api/schedule/check_upgradable_plugin_task.py | 3 +- 4 files changed, 84 insertions(+), 32 deletions(-) diff --git a/api/.env.example b/api/.env.example index baa9c382c8..01fcad599e 100644 --- a/api/.env.example +++ b/api/.env.example @@ -451,6 +451,16 @@ APP_MAX_ACTIVE_REQUESTS=0 # Celery beat configuration CELERY_BEAT_SCHEDULER_TIME=1 +# Celery schedule tasks configuration +ENABLE_CLEAN_EMBEDDING_CACHE_TASK=false +ENABLE_CLEAN_UNUSED_DATASETS_TASK=false +ENABLE_CREATE_TIDB_SERVERLESS_TASK=false +ENABLE_UPDATE_TIDB_SERVERLESS_STATUS_TASK=false +ENABLE_CLEAN_MESSAGES=false +ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK=false +ENABLE_DATASETS_QUEUE_MONITOR=false +ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK=true + # Position configuration POSITION_TOOL_PINS= POSITION_TOOL_INCLUDES= diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index df15b92c35..c2eaa89b6e 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -779,6 +779,41 @@ class CeleryBeatConfig(BaseSettings): ) +class CeleryScheduleTasksConfig(BaseSettings): + ENABLE_CLEAN_EMBEDDING_CACHE_TASK: bool = Field( + description="Enable clean embedding cache task", + default=False, + ) + ENABLE_CLEAN_UNUSED_DATASETS_TASK: bool = Field( + description="Enable clean unused datasets task", + default=False, + ) + ENABLE_CREATE_TIDB_SERVERLESS_TASK: bool = Field( + description="Enable create tidb service job task", + default=False, + ) + ENABLE_UPDATE_TIDB_SERVERLESS_STATUS_TASK: bool = Field( + description="Enable update tidb service job status task", + default=False, + ) + ENABLE_CLEAN_MESSAGES: bool = Field( + description="Enable clean messages task", + default=False, + ) + ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK: bool = Field( + description="Enable mail clean document notify task", + default=False, + ) + ENABLE_DATASETS_QUEUE_MONITOR: bool = Field( + description="Enable queue monitor task", + default=False, + ) + ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK: bool = Field( + description="Enable check upgradable plugin task", + default=True, + ) + + class PositionConfig(BaseSettings): POSITION_PROVIDER_PINS: str = Field( description="Comma-separated list of pinned model providers", @@ -907,5 +942,6 @@ class FeatureConfig( # hosted services config HostedServiceConfig, CeleryBeatConfig, + CeleryScheduleTasksConfig, ): pass diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py index be89705858..2c2846ba26 100644 --- a/api/extensions/ext_celery.py +++ b/api/extensions/ext_celery.py @@ -64,55 +64,62 @@ def init_app(app: DifyApp) -> Celery: celery_app.set_default() app.extensions["celery"] = celery_app - imports = [ - "schedule.clean_embedding_cache_task", - "schedule.clean_unused_datasets_task", - "schedule.create_tidb_serverless_task", - "schedule.update_tidb_serverless_status_task", - "schedule.clean_messages", - "schedule.mail_clean_document_notify_task", - "schedule.queue_monitor_task", - "schedule.check_upgradable_plugin_task", - ] + imports = [] day = dify_config.CELERY_BEAT_SCHEDULER_TIME - beat_schedule = { - "clean_embedding_cache_task": { + + # if you add a new task, please add the switch to CeleryScheduleTasksConfig + beat_schedule = {} + if dify_config.ENABLE_CLEAN_EMBEDDING_CACHE_TASK: + imports.append("schedule.clean_embedding_cache_task") + beat_schedule["clean_embedding_cache_task"] = { "task": "schedule.clean_embedding_cache_task.clean_embedding_cache_task", "schedule": timedelta(days=day), - }, - "clean_unused_datasets_task": { + } + if dify_config.ENABLE_CLEAN_UNUSED_DATASETS_TASK: + imports.append("schedule.clean_unused_datasets_task") + beat_schedule["clean_unused_datasets_task"] = { "task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task", "schedule": timedelta(days=day), - }, - "create_tidb_serverless_task": { + } + if dify_config.ENABLE_CREATE_TIDB_SERVERLESS_TASK: + imports.append("schedule.create_tidb_serverless_task") + beat_schedule["create_tidb_serverless_task"] = { "task": "schedule.create_tidb_serverless_task.create_tidb_serverless_task", "schedule": crontab(minute="0", hour="*"), - }, - "update_tidb_serverless_status_task": { + } + if dify_config.ENABLE_UPDATE_TIDB_SERVERLESS_STATUS_TASK: + imports.append("schedule.update_tidb_serverless_status_task") + beat_schedule["update_tidb_serverless_status_task"] = { "task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task", "schedule": timedelta(minutes=10), - }, - "clean_messages": { + } + if dify_config.ENABLE_CLEAN_MESSAGES: + imports.append("schedule.clean_messages") + beat_schedule["clean_messages"] = { "task": "schedule.clean_messages.clean_messages", "schedule": timedelta(days=day), - }, - # every Monday - "mail_clean_document_notify_task": { + } + if dify_config.ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK: + imports.append("schedule.mail_clean_document_notify_task") + beat_schedule["mail_clean_document_notify_task"] = { "task": "schedule.mail_clean_document_notify_task.mail_clean_document_notify_task", "schedule": crontab(minute="0", hour="10", day_of_week="1"), - }, - "datasets-queue-monitor": { + } + if dify_config.ENABLE_DATASETS_QUEUE_MONITOR: + imports.append("schedule.queue_monitor_task") + beat_schedule["datasets-queue-monitor"] = { "task": "schedule.queue_monitor_task.queue_monitor_task", "schedule": timedelta( minutes=dify_config.QUEUE_MONITOR_INTERVAL if dify_config.QUEUE_MONITOR_INTERVAL else 30 ), - }, - # every 15 minutes - "check_upgradable_plugin_task": { + } + if dify_config.ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK: + imports.append("schedule.check_upgradable_plugin_task") + beat_schedule["check_upgradable_plugin_task"] = { "task": "schedule.check_upgradable_plugin_task.check_upgradable_plugin_task", "schedule": crontab(minute="*/15"), - }, - } + } + celery_app.conf.update(beat_schedule=beat_schedule, imports=imports) return celery_app diff --git a/api/schedule/check_upgradable_plugin_task.py b/api/schedule/check_upgradable_plugin_task.py index bfe24c7b83..c1d6018827 100644 --- a/api/schedule/check_upgradable_plugin_task.py +++ b/api/schedule/check_upgradable_plugin_task.py @@ -15,10 +15,9 @@ def check_upgradable_plugin_task(): click.echo(click.style("Start check upgradable plugin.", fg="green")) start_at = time.perf_counter() - now_seconds_of_day = time.time() % 86400 # we assume the tz is UTC + now_seconds_of_day = time.time() % 86400 - 30 # we assume the tz is UTC click.echo(click.style("Now seconds of day: {}".format(now_seconds_of_day), fg="green")) - # 获取需要在下一个AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL内执行的策略 strategies = ( db.session.query(TenantPluginAutoUpgradeStrategy) .filter(