From e04f2a0786df994168618abe1832a8ae921f6227 Mon Sep 17 00:00:00 2001 From: Stream Date: Thu, 5 Feb 2026 18:58:17 +0800 Subject: [PATCH] feat: use static manifest for pre-caching all plugin manifests before checking updates (#31942) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Junyan Qin Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- api/core/helper/marketplace.py | 54 +++++++----- api/core/plugin/entities/marketplace.py | 14 +++- api/schedule/check_upgradable_plugin_task.py | 24 ++++++ ...ss_tenant_plugin_autoupgrade_check_task.py | 83 +++++-------------- 4 files changed, 88 insertions(+), 87 deletions(-) diff --git a/api/core/helper/marketplace.py b/api/core/helper/marketplace.py index 25dc4ba9ed..d7b6e82062 100644 --- a/api/core/helper/marketplace.py +++ b/api/core/helper/marketplace.py @@ -6,7 +6,8 @@ from yarl import URL from configs import dify_config from core.helper.download import download_with_size_limit -from core.plugin.entities.marketplace import MarketplacePluginDeclaration +from core.plugin.entities.marketplace import MarketplacePluginDeclaration, MarketplacePluginSnapshot +from extensions.ext_redis import redis_client marketplace_api_url = URL(str(dify_config.MARKETPLACE_API_URL)) logger = logging.getLogger(__name__) @@ -43,28 +44,37 @@ def batch_fetch_plugin_by_ids(plugin_ids: list[str]) -> list[dict]: return data.get("data", {}).get("plugins", []) -def batch_fetch_plugin_manifests_ignore_deserialization_error( - plugin_ids: list[str], -) -> Sequence[MarketplacePluginDeclaration]: - if len(plugin_ids) == 0: - return [] - - url = str(marketplace_api_url / "api/v1/plugins/batch") - response = httpx.post(url, json={"plugin_ids": plugin_ids}, headers={"X-Dify-Version": dify_config.project.version}) - response.raise_for_status() - result: list[MarketplacePluginDeclaration] = [] - for plugin in response.json()["data"]["plugins"]: - try: - result.append(MarketplacePluginDeclaration.model_validate(plugin)) - except Exception: - logger.exception( - "Failed to deserialize marketplace plugin manifest for %s", plugin.get("plugin_id", "unknown") - ) - - return result - - def record_install_plugin_event(plugin_unique_identifier: str): url = str(marketplace_api_url / "api/v1/stats/plugins/install_count") response = httpx.post(url, json={"unique_identifier": plugin_unique_identifier}) response.raise_for_status() + + +def fetch_global_plugin_manifest(cache_key_prefix: str, cache_ttl: int) -> None: + """ + Fetch all plugin manifests from marketplace and cache them in Redis. + This should be called once per check cycle to populate the instance-level cache. + + Args: + cache_key_prefix: Redis key prefix for caching plugin manifests + cache_ttl: Cache TTL in seconds + + Raises: + httpx.HTTPError: If the HTTP request fails + Exception: If any other error occurs during fetching or caching + """ + url = str(marketplace_api_url / "api/v1/dist/plugins/manifest.json") + response = httpx.get(url, headers={"X-Dify-Version": dify_config.project.version}, timeout=30) + response.raise_for_status() + + raw_json = response.json() + plugins_data = raw_json.get("plugins", []) + + # Parse and cache all plugin snapshots + for plugin_data in plugins_data: + plugin_snapshot = MarketplacePluginSnapshot.model_validate(plugin_data) + redis_client.setex( + name=f"{cache_key_prefix}{plugin_snapshot.plugin_id}", + time=cache_ttl, + value=plugin_snapshot.model_dump_json(), + ) diff --git a/api/core/plugin/entities/marketplace.py b/api/core/plugin/entities/marketplace.py index e0762619e6..cf1f7ff0dd 100644 --- a/api/core/plugin/entities/marketplace.py +++ b/api/core/plugin/entities/marketplace.py @@ -1,4 +1,4 @@ -from pydantic import BaseModel, Field, model_validator +from pydantic import BaseModel, Field, computed_field, model_validator from core.model_runtime.entities.provider_entities import ProviderEntity from core.plugin.entities.endpoint import EndpointProviderDeclaration @@ -48,3 +48,15 @@ class MarketplacePluginDeclaration(BaseModel): if "tool" in data and not data["tool"]: del data["tool"] return data + + +class MarketplacePluginSnapshot(BaseModel): + org: str + name: str + latest_version: str + latest_package_identifier: str + latest_package_url: str + + @computed_field + def plugin_id(self) -> str: + return f"{self.org}/{self.name}" diff --git a/api/schedule/check_upgradable_plugin_task.py b/api/schedule/check_upgradable_plugin_task.py index e91ce07be3..13d2f24ca0 100644 --- a/api/schedule/check_upgradable_plugin_task.py +++ b/api/schedule/check_upgradable_plugin_task.py @@ -1,16 +1,24 @@ +import logging import math import time import click import app +from core.helper.marketplace import fetch_global_plugin_manifest from extensions.ext_database import db from models.account import TenantPluginAutoUpgradeStrategy from tasks import process_tenant_plugin_autoupgrade_check_task as check_task +logger = logging.getLogger(__name__) + AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL = 15 * 60 # 15 minutes MAX_CONCURRENT_CHECK_TASKS = 20 +# Import cache constants from the task module +CACHE_REDIS_KEY_PREFIX = check_task.CACHE_REDIS_KEY_PREFIX +CACHE_REDIS_TTL = check_task.CACHE_REDIS_TTL + @app.celery.task(queue="plugin") def check_upgradable_plugin_task(): @@ -40,6 +48,22 @@ def check_upgradable_plugin_task(): ) # make sure all strategies are checked in this interval batch_interval_time = (AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL / batch_chunk_count) if batch_chunk_count > 0 else 0 + if total_strategies == 0: + click.echo(click.style("no strategies to process, skipping plugin manifest fetch.", fg="green")) + return + + # Fetch and cache all plugin manifests before processing tenants + # This reduces load on marketplace from 300k requests to 1 request per check cycle + logger.info("fetching global plugin manifest from marketplace") + try: + fetch_global_plugin_manifest(CACHE_REDIS_KEY_PREFIX, CACHE_REDIS_TTL) + logger.info("successfully fetched and cached global plugin manifest") + except Exception as e: + logger.exception("failed to fetch global plugin manifest") + click.echo(click.style(f"failed to fetch global plugin manifest: {e}", fg="red")) + click.echo(click.style("skipping plugin upgrade check for this cycle", fg="yellow")) + return + for i in range(0, total_strategies, MAX_CONCURRENT_CHECK_TASKS): batch_strategies = strategies[i : i + MAX_CONCURRENT_CHECK_TASKS] for strategy in batch_strategies: diff --git a/api/tasks/process_tenant_plugin_autoupgrade_check_task.py b/api/tasks/process_tenant_plugin_autoupgrade_check_task.py index b5e6508006..6ad04aab0d 100644 --- a/api/tasks/process_tenant_plugin_autoupgrade_check_task.py +++ b/api/tasks/process_tenant_plugin_autoupgrade_check_task.py @@ -6,8 +6,8 @@ import typing import click from celery import shared_task -from core.helper import marketplace -from core.helper.marketplace import MarketplacePluginDeclaration +from core.helper.marketplace import record_install_plugin_event +from core.plugin.entities.marketplace import MarketplacePluginSnapshot from core.plugin.entities.plugin import PluginInstallationSource from core.plugin.impl.plugin import PluginInstaller from extensions.ext_redis import redis_client @@ -16,7 +16,7 @@ from models.account import TenantPluginAutoUpgradeStrategy logger = logging.getLogger(__name__) RETRY_TIMES_OF_ONE_PLUGIN_IN_ONE_TENANT = 3 -CACHE_REDIS_KEY_PREFIX = "plugin_autoupgrade_check_task:cached_plugin_manifests:" +CACHE_REDIS_KEY_PREFIX = "plugin_autoupgrade_check_task:cached_plugin_snapshot:" CACHE_REDIS_TTL = 60 * 60 # 1 hour @@ -25,11 +25,11 @@ def _get_redis_cache_key(plugin_id: str) -> str: return f"{CACHE_REDIS_KEY_PREFIX}{plugin_id}" -def _get_cached_manifest(plugin_id: str) -> typing.Union[MarketplacePluginDeclaration, None, bool]: +def _get_cached_manifest(plugin_id: str) -> typing.Union[MarketplacePluginSnapshot, None, bool]: """ Get cached plugin manifest from Redis. Returns: - - MarketplacePluginDeclaration: if found in cache + - MarketplacePluginSnapshot: if found in cache - None: if cached as not found (marketplace returned no result) - False: if not in cache at all """ @@ -43,76 +43,31 @@ def _get_cached_manifest(plugin_id: str) -> typing.Union[MarketplacePluginDeclar if cached_json is None: return None - return MarketplacePluginDeclaration.model_validate(cached_json) + return MarketplacePluginSnapshot.model_validate(cached_json) except Exception: logger.exception("Failed to get cached manifest for plugin %s", plugin_id) return False -def _set_cached_manifest(plugin_id: str, manifest: typing.Union[MarketplacePluginDeclaration, None]) -> None: - """ - Cache plugin manifest in Redis. - Args: - plugin_id: The plugin ID - manifest: The manifest to cache, or None if not found in marketplace - """ - try: - key = _get_redis_cache_key(plugin_id) - if manifest is None: - # Cache the fact that this plugin was not found - redis_client.setex(key, CACHE_REDIS_TTL, json.dumps(None)) - else: - # Cache the manifest data - redis_client.setex(key, CACHE_REDIS_TTL, manifest.model_dump_json()) - except Exception: - # If Redis fails, continue without caching - # traceback.print_exc() - logger.exception("Failed to set cached manifest for plugin %s", plugin_id) - - def marketplace_batch_fetch_plugin_manifests( plugin_ids_plain_list: list[str], -) -> list[MarketplacePluginDeclaration]: - """Fetch plugin manifests with Redis caching support.""" - cached_manifests: dict[str, typing.Union[MarketplacePluginDeclaration, None]] = {} - not_cached_plugin_ids: list[str] = [] +) -> list[MarketplacePluginSnapshot]: + """ + Fetch plugin manifests from Redis cache only. + This function assumes fetch_global_plugin_manifest() has been called + to pre-populate the cache with all marketplace plugins. + """ + result: list[MarketplacePluginSnapshot] = [] # Check Redis cache for each plugin for plugin_id in plugin_ids_plain_list: cached_result = _get_cached_manifest(plugin_id) - if cached_result is False: - # Not in cache, need to fetch - not_cached_plugin_ids.append(plugin_id) - else: - # Either found manifest or cached as None (not found in marketplace) - # At this point, cached_result is either MarketplacePluginDeclaration or None - if isinstance(cached_result, bool): - # This should never happen due to the if condition above, but for type safety - continue - cached_manifests[plugin_id] = cached_result + if not isinstance(cached_result, MarketplacePluginSnapshot): + # cached_result is False (not in cache) or None (cached as not found) + logger.warning("plugin %s not found in cache, skipping", plugin_id) + continue - # Fetch uncached plugins from marketplace - if not_cached_plugin_ids: - manifests = marketplace.batch_fetch_plugin_manifests_ignore_deserialization_error(not_cached_plugin_ids) - - # Cache the fetched manifests - for manifest in manifests: - cached_manifests[manifest.plugin_id] = manifest - _set_cached_manifest(manifest.plugin_id, manifest) - - # Cache plugins that were not found in marketplace - fetched_plugin_ids = {manifest.plugin_id for manifest in manifests} - for plugin_id in not_cached_plugin_ids: - if plugin_id not in fetched_plugin_ids: - cached_manifests[plugin_id] = None - _set_cached_manifest(plugin_id, None) - - # Build result list from cached manifests - result: list[MarketplacePluginDeclaration] = [] - for plugin_id in plugin_ids_plain_list: - cached_manifest: typing.Union[MarketplacePluginDeclaration, None] = cached_manifests.get(plugin_id) - if cached_manifest is not None: - result.append(cached_manifest) + result.append(cached_result) return result @@ -211,7 +166,7 @@ def process_tenant_plugin_autoupgrade_check_task( # execute upgrade new_unique_identifier = manifest.latest_package_identifier - marketplace.record_install_plugin_event(new_unique_identifier) + record_install_plugin_event(new_unique_identifier) click.echo( click.style( f"Upgrade plugin: {original_unique_identifier} -> {new_unique_identifier}",