From cba2b9b2ad433cec06402109c1bc515f71358c3d Mon Sep 17 00:00:00 2001 From: "Junyan Qin (Chin)" Date: Mon, 29 Sep 2025 12:57:30 +0800 Subject: [PATCH] fix: switch plugin auto upgrade cache to redis (#26356) --- api/README.md | 4 +- api/extensions/ext_celery.py | 1 + api/schedule/check_upgradable_plugin_task.py | 4 +- ...ss_tenant_plugin_autoupgrade_check_task.py | 108 ++++++++++++++---- 4 files changed, 93 insertions(+), 24 deletions(-) diff --git a/api/README.md b/api/README.md index 5ecf92a4f0..e75ea3d354 100644 --- a/api/README.md +++ b/api/README.md @@ -80,10 +80,10 @@ 1. If you need to handle and debug the async tasks (e.g. dataset importing and documents indexing), please start the worker service. ```bash -uv run celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation +uv run celery -A app.celery worker -P gevent -c 2 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation ``` -Addition, if you want to debug the celery scheduled tasks, you can use the following command in another terminal: +Additionally, if you want to debug the celery scheduled tasks, you can run the following command in another terminal to start the beat service: ```bash uv run celery -A app.celery beat diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py index 585539e2ce..6d7d81ed87 100644 --- a/api/extensions/ext_celery.py +++ b/api/extensions/ext_celery.py @@ -145,6 +145,7 @@ def init_app(app: DifyApp) -> Celery: } if dify_config.ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK and dify_config.MARKETPLACE_ENABLED: imports.append("schedule.check_upgradable_plugin_task") + imports.append("tasks.process_tenant_plugin_autoupgrade_check_task") beat_schedule["check_upgradable_plugin_task"] = { "task": "schedule.check_upgradable_plugin_task.check_upgradable_plugin_task", "schedule": crontab(minute="*/15"), diff --git a/api/schedule/check_upgradable_plugin_task.py b/api/schedule/check_upgradable_plugin_task.py index a9ad27b059..0712100c01 100644 --- a/api/schedule/check_upgradable_plugin_task.py +++ b/api/schedule/check_upgradable_plugin_task.py @@ -6,7 +6,7 @@ import click import app from extensions.ext_database import db from models.account import TenantPluginAutoUpgradeStrategy -from tasks.process_tenant_plugin_autoupgrade_check_task import process_tenant_plugin_autoupgrade_check_task +from tasks import process_tenant_plugin_autoupgrade_check_task as check_task AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL = 15 * 60 # 15 minutes MAX_CONCURRENT_CHECK_TASKS = 20 @@ -43,7 +43,7 @@ def check_upgradable_plugin_task(): for i in range(0, total_strategies, MAX_CONCURRENT_CHECK_TASKS): batch_strategies = strategies[i : i + MAX_CONCURRENT_CHECK_TASKS] for strategy in batch_strategies: - process_tenant_plugin_autoupgrade_check_task.delay( + check_task.process_tenant_plugin_autoupgrade_check_task.delay( strategy.tenant_id, strategy.strategy_setting, strategy.upgrade_time_of_day, diff --git a/api/tasks/process_tenant_plugin_autoupgrade_check_task.py b/api/tasks/process_tenant_plugin_autoupgrade_check_task.py index bae8f1c4db..124971e8e2 100644 --- a/api/tasks/process_tenant_plugin_autoupgrade_check_task.py +++ b/api/tasks/process_tenant_plugin_autoupgrade_check_task.py @@ -1,5 +1,5 @@ +import json import operator -import traceback import typing import click @@ -9,38 +9,106 @@ from core.helper import marketplace from core.helper.marketplace import MarketplacePluginDeclaration from core.plugin.entities.plugin import PluginInstallationSource from core.plugin.impl.plugin import PluginInstaller +from extensions.ext_redis import redis_client from models.account import TenantPluginAutoUpgradeStrategy RETRY_TIMES_OF_ONE_PLUGIN_IN_ONE_TENANT = 3 +CACHE_REDIS_KEY_PREFIX = "plugin_autoupgrade_check_task:cached_plugin_manifests:" +CACHE_REDIS_TTL = 60 * 15 # 15 minutes -cached_plugin_manifests: dict[str, typing.Union[MarketplacePluginDeclaration, None]] = {} +def _get_redis_cache_key(plugin_id: str) -> str: + """Generate Redis cache key for plugin manifest.""" + return f"{CACHE_REDIS_KEY_PREFIX}{plugin_id}" + + +def _get_cached_manifest(plugin_id: str) -> typing.Union[MarketplacePluginDeclaration, None, bool]: + """ + Get cached plugin manifest from Redis. + Returns: + - MarketplacePluginDeclaration: if found in cache + - None: if cached as not found (marketplace returned no result) + - False: if not in cache at all + """ + try: + key = _get_redis_cache_key(plugin_id) + cached_data = redis_client.get(key) + if cached_data is None: + return False + + cached_json = json.loads(cached_data) + if cached_json is None: + return None + + return MarketplacePluginDeclaration.model_validate(cached_json) + except Exception: + return False + + +def _set_cached_manifest(plugin_id: str, manifest: typing.Union[MarketplacePluginDeclaration, None]) -> None: + """ + Cache plugin manifest in Redis. + Args: + plugin_id: The plugin ID + manifest: The manifest to cache, or None if not found in marketplace + """ + try: + key = _get_redis_cache_key(plugin_id) + if manifest is None: + # Cache the fact that this plugin was not found + redis_client.setex(key, CACHE_REDIS_TTL, json.dumps(None)) + else: + # Cache the manifest data + redis_client.setex(key, CACHE_REDIS_TTL, manifest.model_dump_json()) + except Exception: + # If Redis fails, continue without caching + # traceback.print_exc() + pass def marketplace_batch_fetch_plugin_manifests( plugin_ids_plain_list: list[str], ) -> list[MarketplacePluginDeclaration]: - global cached_plugin_manifests - # return marketplace.batch_fetch_plugin_manifests(plugin_ids_plain_list) - not_included_plugin_ids = [ - plugin_id for plugin_id in plugin_ids_plain_list if plugin_id not in cached_plugin_manifests - ] - if not_included_plugin_ids: - manifests = marketplace.batch_fetch_plugin_manifests_ignore_deserialization_error(not_included_plugin_ids) + """Fetch plugin manifests with Redis caching support.""" + cached_manifests: dict[str, typing.Union[MarketplacePluginDeclaration, None]] = {} + not_cached_plugin_ids: list[str] = [] + + # Check Redis cache for each plugin + for plugin_id in plugin_ids_plain_list: + cached_result = _get_cached_manifest(plugin_id) + if cached_result is False: + # Not in cache, need to fetch + not_cached_plugin_ids.append(plugin_id) + else: + # Either found manifest or cached as None (not found in marketplace) + # At this point, cached_result is either MarketplacePluginDeclaration or None + if isinstance(cached_result, bool): + # This should never happen due to the if condition above, but for type safety + continue + cached_manifests[plugin_id] = cached_result + + # Fetch uncached plugins from marketplace + if not_cached_plugin_ids: + manifests = marketplace.batch_fetch_plugin_manifests_ignore_deserialization_error(not_cached_plugin_ids) + + # Cache the fetched manifests for manifest in manifests: - cached_plugin_manifests[manifest.plugin_id] = manifest + cached_manifests[manifest.plugin_id] = manifest + _set_cached_manifest(manifest.plugin_id, manifest) - if ( - len(manifests) == 0 - ): # this indicates that the plugin not found in marketplace, should set None in cache to prevent future check - for plugin_id in not_included_plugin_ids: - cached_plugin_manifests[plugin_id] = None + # Cache plugins that were not found in marketplace + fetched_plugin_ids = {manifest.plugin_id for manifest in manifests} + for plugin_id in not_cached_plugin_ids: + if plugin_id not in fetched_plugin_ids: + cached_manifests[plugin_id] = None + _set_cached_manifest(plugin_id, None) + # Build result list from cached manifests result: list[MarketplacePluginDeclaration] = [] for plugin_id in plugin_ids_plain_list: - final_manifest = cached_plugin_manifests.get(plugin_id) - if final_manifest is not None: - result.append(final_manifest) + cached_manifest: typing.Union[MarketplacePluginDeclaration, None] = cached_manifests.get(plugin_id) + if cached_manifest is not None: + result.append(cached_manifest) return result @@ -157,10 +225,10 @@ def process_tenant_plugin_autoupgrade_check_task( ) except Exception as e: click.echo(click.style(f"Error when upgrading plugin: {e}", fg="red")) - traceback.print_exc() + # traceback.print_exc() break except Exception as e: click.echo(click.style(f"Error when checking upgradable plugin: {e}", fg="red")) - traceback.print_exc() + # traceback.print_exc() return