diff --git a/api/core/helper/marketplace.py b/api/core/helper/marketplace.py index 65bf4fc1db..fe3078923d 100644 --- a/api/core/helper/marketplace.py +++ b/api/core/helper/marketplace.py @@ -25,9 +25,29 @@ def batch_fetch_plugin_manifests(plugin_ids: list[str]) -> Sequence[MarketplaceP url = str(marketplace_api_url / "api/v1/plugins/batch") response = requests.post(url, json={"plugin_ids": plugin_ids}) response.raise_for_status() + return [MarketplacePluginDeclaration(**plugin) for plugin in response.json()["data"]["plugins"]] +def batch_fetch_plugin_manifests_ignore_deserialization_error( + plugin_ids: list[str], +) -> Sequence[MarketplacePluginDeclaration]: + if len(plugin_ids) == 0: + return [] + + url = str(marketplace_api_url / "api/v1/plugins/batch") + response = requests.post(url, json={"plugin_ids": plugin_ids}) + response.raise_for_status() + result: list[MarketplacePluginDeclaration] = [] + for plugin in response.json()["data"]["plugins"]: + try: + result.append(MarketplacePluginDeclaration(**plugin)) + except Exception as e: + pass + + return result + + def record_install_plugin_event(plugin_unique_identifier: str): url = str(marketplace_api_url / "api/v1/stats/plugins/install_count") response = requests.post(url, json={"unique_identifier": plugin_unique_identifier}) diff --git a/api/schedule/check_upgradable_plugin_task.py b/api/schedule/check_upgradable_plugin_task.py index b709689d27..bfe24c7b83 100644 --- a/api/schedule/check_upgradable_plugin_task.py +++ b/api/schedule/check_upgradable_plugin_task.py @@ -1,19 +1,14 @@ import time -import traceback import click import app -from core.helper import marketplace -from core.plugin.entities.plugin import PluginInstallationSource -from core.plugin.impl.plugin import PluginInstaller from extensions.ext_database import db from models.account import TenantPluginAutoUpgradeStrategy +from tasks.process_tenant_plugin_autoupgrade_check_task import process_tenant_plugin_autoupgrade_check_task AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL = 15 * 60 # 15 minutes -RETRY_TIMES_OF_ONE_PLUGIN_IN_ONE_TENANT = 3 - @app.celery.task(queue="plugin") def check_upgradable_plugin_task(): @@ -23,7 +18,7 @@ def check_upgradable_plugin_task(): now_seconds_of_day = time.time() % 86400 # we assume the tz is UTC click.echo(click.style("Now seconds of day: {}".format(now_seconds_of_day), fg="green")) - # get strategies that set to be performed in the next AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL + # 获取需要在下一个AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL内执行的策略 strategies = ( db.session.query(TenantPluginAutoUpgradeStrategy) .filter( @@ -36,113 +31,15 @@ def check_upgradable_plugin_task(): .all() ) - manager = PluginInstaller() - for strategy in strategies: - try: - tenant_id = strategy.tenant_id - strategy_setting = strategy.strategy_setting - upgrade_mode = strategy.upgrade_mode - exclude_plugins = strategy.exclude_plugins - include_plugins = strategy.include_plugins - - if strategy_setting == TenantPluginAutoUpgradeStrategy.StrategySetting.DISABLED: - continue - - # get plugins that need to be checked - plugin_ids: list[tuple[str, str, str]] = [] # plugin_id, version, unique_identifier - - if upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.PARTIAL and include_plugins: - all_plugins = manager.list_plugins(tenant_id) - - for plugin in all_plugins: - if plugin.source == PluginInstallationSource.Marketplace and plugin.plugin_id in include_plugins: - plugin_ids.append((plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier)) - - elif upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.EXCLUDE: - # get all plugins and remove the exclude plugins - all_plugins = manager.list_plugins(tenant_id) - plugin_ids = [ - (plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier) - for plugin in all_plugins - if plugin.source == PluginInstallationSource.Marketplace and plugin.plugin_id not in exclude_plugins - ] - elif upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.ALL: - all_plugins = manager.list_plugins(tenant_id) - plugin_ids = [ - (plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier) - for plugin in all_plugins - if plugin.source == PluginInstallationSource.Marketplace - ] - - if not plugin_ids: - continue - - plugin_ids_plain_list = [plugin_id for plugin_id, _, _ in plugin_ids] - - click.echo(click.style("Fetching manifests for plugins: {}".format(plugin_ids_plain_list), fg="green")) - - # fetch latest versions from marketplace - manifests = marketplace.batch_fetch_plugin_manifests(plugin_ids_plain_list) - - for manifest in manifests: - for plugin_id, version, original_unique_identifier in plugin_ids: - if manifest.plugin_id != plugin_id: - continue - - try: - current_version = version - latest_version = manifest.latest_version - - # @yeuoly review here - def fix_only_checker(latest_version, current_version): - latest_version_tuple = tuple(int(val) for val in latest_version.split(".")) - current_version_tuple = tuple(int(val) for val in current_version.split(".")) - - if ( - latest_version_tuple[0] == current_version_tuple[0] - and latest_version_tuple[1] == current_version_tuple[1] - ): - return latest_version_tuple[2] != current_version_tuple[2] - return False - - version_checker = { - TenantPluginAutoUpgradeStrategy.StrategySetting.LATEST: lambda latest_version, - current_version: latest_version != current_version, - TenantPluginAutoUpgradeStrategy.StrategySetting.FIX_ONLY: fix_only_checker, - } - - if version_checker[strategy_setting](latest_version, current_version): - # execute upgrade - new_unique_identifier = manifest.latest_package_identifier - - marketplace.record_install_plugin_event(new_unique_identifier) - click.echo( - click.style( - "Upgrade plugin: {} -> {}".format( - original_unique_identifier, new_unique_identifier - ), - fg="green", - ) - ) - task_start_resp = manager.upgrade_plugin( - tenant_id, - original_unique_identifier, - new_unique_identifier, - PluginInstallationSource.Marketplace, - { - "plugin_unique_identifier": new_unique_identifier, - }, - ) - except Exception as e: - click.echo(click.style("Error when upgrading plugin: {}".format(e), fg="red")) - traceback.print_exc() - break - - except Exception as e: - click.echo(click.style("Error when checking upgradable plugin: {}".format(e), fg="red")) - traceback.print_exc() - continue + process_tenant_plugin_autoupgrade_check_task.delay( + strategy.tenant_id, + strategy.strategy_setting, + strategy.upgrade_time_of_day, + strategy.upgrade_mode, + strategy.exclude_plugins, + strategy.include_plugins, + ) end_at = time.perf_counter() click.echo( diff --git a/api/tasks/process_tenant_plugin_autoupgrade_check_task.py b/api/tasks/process_tenant_plugin_autoupgrade_check_task.py new file mode 100644 index 0000000000..fc01b33f4e --- /dev/null +++ b/api/tasks/process_tenant_plugin_autoupgrade_check_task.py @@ -0,0 +1,163 @@ +import traceback + +import click +from celery import shared_task + +from core.helper import marketplace +from core.helper.marketplace import MarketplacePluginDeclaration +from core.plugin.entities.plugin import PluginInstallationSource +from core.plugin.impl.plugin import PluginInstaller +from models.account import TenantPluginAutoUpgradeStrategy + +RETRY_TIMES_OF_ONE_PLUGIN_IN_ONE_TENANT = 3 + + +cached_plugin_manifests: dict[str, MarketplacePluginDeclaration] = {} + + +def marketplace_batch_fetch_plugin_manifests( + plugin_ids_plain_list: list[str], +) -> list[MarketplacePluginDeclaration]: + global cached_plugin_manifests + # return marketplace.batch_fetch_plugin_manifests(plugin_ids_plain_list) + not_included_plugin_ids = [ + plugin_id for plugin_id in plugin_ids_plain_list if plugin_id not in cached_plugin_manifests + ] + if not_included_plugin_ids: + manifests = marketplace.batch_fetch_plugin_manifests_ignore_deserialization_error(not_included_plugin_ids) + for manifest in manifests: + cached_plugin_manifests[manifest.plugin_id] = manifest + + if ( + len(manifests) == 0 + ): # this indicates that the plugin not found in marketplace, should set None in cache to prevent future check + for plugin_id in not_included_plugin_ids: + cached_plugin_manifests[plugin_id] = None + + return [ + cached_plugin_manifests[plugin_id] + for plugin_id in plugin_ids_plain_list + if cached_plugin_manifests[plugin_id] is not None + ] + + +@shared_task(queue="plugin") +def process_tenant_plugin_autoupgrade_check_task( + tenant_id: str, + strategy_setting: TenantPluginAutoUpgradeStrategy.StrategySetting, + upgrade_time_of_day: int, + upgrade_mode: TenantPluginAutoUpgradeStrategy.UpgradeMode, + exclude_plugins: list[str], + include_plugins: list[str], +): + try: + manager = PluginInstaller() + + click.echo( + click.style( + "Checking upgradable plugin for tenant: {}".format(tenant_id), + fg="green", + ) + ) + + if strategy_setting == TenantPluginAutoUpgradeStrategy.StrategySetting.DISABLED: + return + + # 获取需要检查的插件 + plugin_ids: list[tuple[str, str, str]] = [] # plugin_id, version, unique_identifier + click.echo(click.style("Upgrade mode: {}".format(upgrade_mode), fg="green")) + + if upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.PARTIAL and include_plugins: + all_plugins = manager.list_plugins(tenant_id) + + for plugin in all_plugins: + if plugin.source == PluginInstallationSource.Marketplace and plugin.plugin_id in include_plugins: + plugin_ids.append( + ( + plugin.plugin_id, + plugin.version, + plugin.plugin_unique_identifier, + ) + ) + + elif upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.EXCLUDE: + # 获取所有插件并移除exclude的 + all_plugins = manager.list_plugins(tenant_id) + plugin_ids = [ + (plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier) + for plugin in all_plugins + if plugin.source == PluginInstallationSource.Marketplace and plugin.plugin_id not in exclude_plugins + ] + elif upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.ALL: + all_plugins = manager.list_plugins(tenant_id) + plugin_ids = [ + (plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier) + for plugin in all_plugins + if plugin.source == PluginInstallationSource.Marketplace + ] + + if not plugin_ids: + return + + plugin_ids_plain_list = [plugin_id for plugin_id, _, _ in plugin_ids] + + manifests = marketplace_batch_fetch_plugin_manifests(plugin_ids_plain_list) + + if not manifests: + return + + for manifest in manifests: + for plugin_id, version, original_unique_identifier in plugin_ids: + if manifest.plugin_id != plugin_id: + continue + + try: + current_version = version + latest_version = manifest.latest_version + + def fix_only_checker(latest_version, current_version): + latest_version_tuple = tuple(int(val) for val in latest_version.split(".")) + current_version_tuple = tuple(int(val) for val in current_version.split(".")) + + if ( + latest_version_tuple[0] == current_version_tuple[0] + and latest_version_tuple[1] == current_version_tuple[1] + ): + return latest_version_tuple[2] != current_version_tuple[2] + return False + + version_checker = { + TenantPluginAutoUpgradeStrategy.StrategySetting.LATEST: lambda latest_version, + current_version: latest_version != current_version, + TenantPluginAutoUpgradeStrategy.StrategySetting.FIX_ONLY: fix_only_checker, + } + + if version_checker[strategy_setting](latest_version, current_version): + # 执行升级 + new_unique_identifier = manifest.latest_package_identifier + + marketplace.record_install_plugin_event(new_unique_identifier) + click.echo( + click.style( + "Upgrade plugin: {} -> {}".format(original_unique_identifier, new_unique_identifier), + fg="green", + ) + ) + task_start_resp = manager.upgrade_plugin( + tenant_id, + original_unique_identifier, + new_unique_identifier, + PluginInstallationSource.Marketplace, + { + "plugin_unique_identifier": new_unique_identifier, + }, + ) + except Exception as e: + click.echo(click.style("Error when upgrading plugin: {}".format(e), fg="red")) + traceback.print_exc() + break + + except Exception as e: + click.echo(click.style("Error when checking upgradable plugin: {}".format(e), fg="red")) + traceback.print_exc() + return