From c2520f7cb47c6a969973158be250a71a880cf486 Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Tue, 20 May 2025 15:08:06 +0800 Subject: [PATCH] fix: bugs --- api/extensions/ext_celery.py | 2 +- api/schedule/check_upgradable_plugin_task.py | 88 ++++++++++++-------- 2 files changed, 56 insertions(+), 34 deletions(-) diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py index d52d6a75a8..be89705858 100644 --- a/api/extensions/ext_celery.py +++ b/api/extensions/ext_celery.py @@ -110,7 +110,7 @@ def init_app(app: DifyApp) -> Celery: # every 15 minutes "check_upgradable_plugin_task": { "task": "schedule.check_upgradable_plugin_task.check_upgradable_plugin_task", - "schedule": timedelta(minutes=15), + "schedule": crontab(minute="*/15"), }, } celery_app.conf.update(beat_schedule=beat_schedule, imports=imports) diff --git a/api/schedule/check_upgradable_plugin_task.py b/api/schedule/check_upgradable_plugin_task.py index 95e9d6d16a..96b05a814e 100644 --- a/api/schedule/check_upgradable_plugin_task.py +++ b/api/schedule/check_upgradable_plugin_task.py @@ -1,4 +1,5 @@ import time +import traceback import click @@ -14,12 +15,13 @@ AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL = 15 * 60 # 15 minutes RETRY_TIMES_OF_ONE_PLUGIN_IN_ONE_TENANT = 3 -@app.celery.task(queue="dataset") +@app.celery.task(queue="plugin") def check_upgradable_plugin_task(): click.echo(click.style("Start check upgradable plugin.", fg="green")) start_at = time.perf_counter() now_seconds_of_day = time.time() % 86400 # we assume the tz is UTC + click.echo(click.style("Now seconds of day: {}".format(now_seconds_of_day), fg="green")) # get strategies that set to be performed in the next AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL strategies = ( @@ -69,50 +71,70 @@ def check_upgradable_plugin_task(): if not plugin_ids: continue + plugin_ids_plain_list = [plugin_id for plugin_id, _, _ in plugin_ids] + + click.echo(click.style("Fetching manifests for plugins: {}".format(plugin_ids_plain_list), fg="green")) + # fetch latest versions from marketplace - manifests = marketplace.batch_fetch_plugin_manifests(plugin_ids) + manifests = marketplace.batch_fetch_plugin_manifests(plugin_ids_plain_list) for manifest in manifests: for plugin_id, version, original_unique_identifier in plugin_ids: - current_version = version - latest_version = manifest.latest_version + if manifest.plugin_id != plugin_id: + continue - # @yeuoly review here - def fix_only_checker(latest_version, current_version): - latest_version_tuple = tuple(int(val) for val in latest_version.split(".")) - current_version_tuple = tuple(int(val) for val in current_version.split(".")) + try: + current_version = version + latest_version = manifest.latest_version - if ( - latest_version_tuple[0] == current_version_tuple[0] - and latest_version_tuple[1] == current_version_tuple[1] - ): - return latest_version_tuple[2] != current_version_tuple[2] - return False + # @yeuoly review here + def fix_only_checker(latest_version, current_version): + latest_version_tuple = tuple(int(val) for val in latest_version.split(".")) + current_version_tuple = tuple(int(val) for val in current_version.split(".")) - version_checker = { - TenantPluginAutoUpgradeStrategy.StrategySetting.LATEST: lambda latest_version, - current_version: latest_version != current_version, - TenantPluginAutoUpgradeStrategy.StrategySetting.FIX_ONLY: fix_only_checker, - } + if ( + latest_version_tuple[0] == current_version_tuple[0] + and latest_version_tuple[1] == current_version_tuple[1] + ): + return latest_version_tuple[2] != current_version_tuple[2] + return False - if version_checker[strategy_setting](latest_version, current_version): - # execute upgrade - new_unique_identifier = manifest.latest_package_identifier + version_checker = { + TenantPluginAutoUpgradeStrategy.StrategySetting.LATEST: lambda latest_version, + current_version: latest_version != current_version, + TenantPluginAutoUpgradeStrategy.StrategySetting.FIX_ONLY: fix_only_checker, + } - marketplace.record_install_plugin_event(new_unique_identifier) - click.echo(click.style("Upgrade plugin: {}".format(new_unique_identifier), fg="green")) - task_start_resp = manager.upgrade_plugin( - tenant_id, - original_unique_identifier, - new_unique_identifier, - PluginInstallationSource.Marketplace, - { - "plugin_unique_identifier": new_unique_identifier, - }, - ) + if version_checker[strategy_setting](latest_version, current_version): + # execute upgrade + new_unique_identifier = manifest.latest_package_identifier + + marketplace.record_install_plugin_event(new_unique_identifier) + click.echo( + click.style( + "Upgrade plugin: {} -> {}".format( + original_unique_identifier, new_unique_identifier + ), + fg="green", + ) + ) + task_start_resp = manager.upgrade_plugin( + tenant_id, + original_unique_identifier, + new_unique_identifier, + PluginInstallationSource.Marketplace, + { + "plugin_unique_identifier": new_unique_identifier, + }, + ) + except Exception as e: + click.echo(click.style("Error when upgrading plugin: {}".format(e), fg="red")) + traceback.print_exc() + break except Exception as e: click.echo(click.style("Error when checking upgradable plugin: {}".format(e), fg="red")) + traceback.print_exc() continue end_at = time.perf_counter()