fix(offline): guard marketplace I/O paths for ENG-421 (#36335)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Xiyuan Chen 2026-05-18 06:53:42 -07:00 committed by GitHub
parent 06f076e0ff
commit 2d5186fb28
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 280 additions and 25 deletions

View File

@ -16,6 +16,7 @@ from pydantic import TypeAdapter
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from configs import dify_config
from core.agent.entities import AgentToolEntity
from core.helper import marketplace
from core.plugin.entities.plugin import PluginInstallationSource
@ -310,6 +311,8 @@ class PluginMigration:
"""
Fetch plugin unique identifier using plugin id.
"""
if not dify_config.MARKETPLACE_ENABLED:
return None
plugin_manifest = marketplace.batch_fetch_plugin_manifests([plugin_id])
if not plugin_manifest:
return None
@ -542,6 +545,11 @@ class PluginMigration:
"""
Install plugins for a tenant.
"""
if plugin_identifiers_map and not dify_config.MARKETPLACE_ENABLED:
raise ValueError(
"Marketplace disabled in offline mode; cannot bulk-install plugins. "
"Pre-upload plugin packages via Console first."
)
manager = PluginInstaller()
# download all the plugins and upload

View File

@ -73,35 +73,43 @@ class PluginService:
cache_not_exists.append(plugin_id)
if cache_not_exists:
manifests = {
manifest.plugin_id: manifest
for manifest in marketplace.batch_fetch_plugin_manifests(cache_not_exists)
}
for plugin_id, manifest in manifests.items():
latest_plugin = PluginService.LatestPluginCache(
plugin_id=plugin_id,
version=manifest.latest_version,
unique_identifier=manifest.latest_package_identifier,
status=manifest.status,
deprecated_reason=manifest.deprecated_reason,
alternative_plugin_id=manifest.alternative_plugin_id,
if not dify_config.MARKETPLACE_ENABLED:
logger.info(
"Marketplace disabled; skipping latest-plugins metadata fetch for %d ids",
len(cache_not_exists),
)
for plugin_id in cache_not_exists:
result[plugin_id] = None
else:
manifests = {
manifest.plugin_id: manifest
for manifest in marketplace.batch_fetch_plugin_manifests(cache_not_exists)
}
# Store in Redis
redis_client.setex(
f"{PluginService.REDIS_KEY_PREFIX}{plugin_id}",
PluginService.REDIS_TTL,
latest_plugin.model_dump_json(),
)
for plugin_id, manifest in manifests.items():
latest_plugin = PluginService.LatestPluginCache(
plugin_id=plugin_id,
version=manifest.latest_version,
unique_identifier=manifest.latest_package_identifier,
status=manifest.status,
deprecated_reason=manifest.deprecated_reason,
alternative_plugin_id=manifest.alternative_plugin_id,
)
result[plugin_id] = latest_plugin
# Store in Redis
redis_client.setex(
f"{PluginService.REDIS_KEY_PREFIX}{plugin_id}",
PluginService.REDIS_TTL,
latest_plugin.model_dump_json(),
)
# pop plugin_id from cache_not_exists
cache_not_exists.remove(plugin_id)
result[plugin_id] = latest_plugin
for plugin_id in cache_not_exists:
result[plugin_id] = None
# pop plugin_id from cache_not_exists
cache_not_exists.remove(plugin_id)
for plugin_id in cache_not_exists:
result[plugin_id] = None
return result
except Exception:

View File

@ -1350,6 +1350,12 @@ class RagPipelineService:
)
return workflow_node_execution_db_model
def _fetch_recommended_plugin_manifests(self, plugin_ids: list[str]) -> list[Any]:
if not dify_config.MARKETPLACE_ENABLED:
logger.info("Marketplace disabled; recommended-plugins list empty")
return []
return marketplace.batch_fetch_plugin_by_ids(plugin_ids)
def get_recommended_plugins(self, type: str) -> dict[str, Any]:
# Query active recommended plugins
stmt = select(PipelineRecommendedPlugin).where(PipelineRecommendedPlugin.active == True)
@ -1372,7 +1378,7 @@ class RagPipelineService:
)
providers_map = {provider.plugin_id: provider.to_dict() for provider in providers}
plugin_manifests = marketplace.batch_fetch_plugin_by_ids(plugin_ids)
plugin_manifests = self._fetch_recommended_plugin_manifests(plugin_ids)
plugin_manifests_map = {manifest["plugin_id"]: manifest for manifest in plugin_manifests}
installed_plugin_list = []

View File

@ -9,6 +9,7 @@ import yaml
from flask_login import current_user
from sqlalchemy import select
from configs import dify_config
from constants import DOCUMENT_EXTENSIONS
from core.plugin.impl.plugin import PluginInstaller
from core.rag.index_processor.constant.index_type import IndexStructureType, IndexTechniqueType
@ -273,6 +274,13 @@ class RagPipelineTransformService:
plugin_unique_identifier = dependency.get("value", {}).get("plugin_unique_identifier")
plugin_id = plugin_unique_identifier.split(":")[0]
if plugin_id not in installed_plugins_ids:
if not dify_config.MARKETPLACE_ENABLED:
logger.warning(
"Marketplace disabled; skipping auto-install of %s. "
"Pre-install via Console if pipeline requires it.",
plugin_id,
)
continue
plugin_unique_identifier = plugin_migration._fetch_plugin_unique_identifier(plugin_id) # type: ignore
if plugin_unique_identifier:
need_install_plugin_unique_identifiers.append(plugin_unique_identifier)

View File

@ -0,0 +1,76 @@
from unittest.mock import MagicMock, patch
import pytest
from pytest_mock import MockerFixture
from services.plugin.plugin_migration import PluginMigration
MIGRATION_MODULE = "services.plugin.plugin_migration"
def test_fetch_plugin_unique_identifier_returns_none_when_disabled(mocker: MockerFixture) -> None:
mocker.patch("services.plugin.plugin_migration.dify_config.MARKETPLACE_ENABLED", False)
batch_fetch = mocker.patch("services.plugin.plugin_migration.marketplace.batch_fetch_plugin_manifests")
result = PluginMigration._fetch_plugin_unique_identifier("langgenius/openai")
assert result is None
batch_fetch.assert_not_called()
def test_fetch_plugin_unique_identifier_calls_marketplace_when_enabled(mocker: MockerFixture) -> None:
mocker.patch("services.plugin.plugin_migration.dify_config.MARKETPLACE_ENABLED", True)
manifest = mocker.MagicMock()
manifest.latest_package_identifier = "langgenius/openai:1.0.0@abc"
mocker.patch(
"services.plugin.plugin_migration.marketplace.batch_fetch_plugin_manifests",
return_value=[manifest],
)
result = PluginMigration._fetch_plugin_unique_identifier("langgenius/openai")
assert result == "langgenius/openai:1.0.0@abc"
class TestHandlePluginInstanceInstall:
def test_raises_when_disabled_and_map_nonempty(self) -> None:
with patch(f"{MIGRATION_MODULE}.dify_config") as mock_cfg:
mock_cfg.MARKETPLACE_ENABLED = False
with pytest.raises(ValueError, match="Marketplace disabled"):
PluginMigration.handle_plugin_instance_install(
"tenant1", {"langgenius/openai": "langgenius/openai:1.0.0@abc"}
)
def test_no_raise_when_disabled_and_map_empty(self) -> None:
with (
patch(f"{MIGRATION_MODULE}.dify_config") as mock_cfg,
patch(f"{MIGRATION_MODULE}.PluginInstaller") as mock_installer_cls,
):
mock_cfg.MARKETPLACE_ENABLED = False
mock_installer = MagicMock()
mock_installer_cls.return_value = mock_installer
mock_installer.install_from_identifiers.return_value = MagicMock(all_installed=True)
result = PluginMigration.handle_plugin_instance_install("tenant1", {})
assert isinstance(result, dict)
def test_proceeds_when_enabled(self) -> None:
with (
patch(f"{MIGRATION_MODULE}.dify_config") as mock_cfg,
patch(f"{MIGRATION_MODULE}.marketplace") as mock_marketplace,
patch(f"{MIGRATION_MODULE}.PluginInstaller") as mock_installer_cls,
):
mock_cfg.MARKETPLACE_ENABLED = True
mock_marketplace.download_plugin_pkg.return_value = b"pkg_data"
mock_installer = MagicMock()
mock_installer_cls.return_value = mock_installer
mock_installer.install_from_identifiers.return_value = MagicMock(all_installed=True)
result = PluginMigration.handle_plugin_instance_install(
"tenant1", {"langgenius/openai": "langgenius/openai:1.0.0@abc"}
)
mock_marketplace.download_plugin_pkg.assert_called_once()
assert "success" in result or "failed" in result

View File

@ -0,0 +1,50 @@
from unittest.mock import MagicMock, patch
MODULE = "services.plugin.plugin_service"
class TestFetchLatestPluginVersion:
def test_skips_marketplace_fetch_when_disabled(self) -> None:
"""Cache misses stay None; marketplace is never called when disabled."""
with (
patch(f"{MODULE}.dify_config") as mock_cfg,
patch(f"{MODULE}.redis_client") as mock_redis,
patch(f"{MODULE}.marketplace") as mock_marketplace,
):
mock_cfg.MARKETPLACE_ENABLED = False
mock_redis.get.return_value = None # all cache misses
from services.plugin.plugin_service import PluginService
result = PluginService.fetch_latest_plugin_version(["langgenius/openai", "langgenius/anthropic"])
mock_marketplace.batch_fetch_plugin_manifests.assert_not_called()
assert result == {"langgenius/openai": None, "langgenius/anthropic": None}
def test_calls_marketplace_fetch_when_enabled(self) -> None:
"""Cache misses trigger marketplace fetch when enabled."""
manifest = MagicMock()
manifest.plugin_id = "langgenius/openai"
manifest.latest_version = "1.0.0"
manifest.latest_package_identifier = "langgenius/openai:1.0.0@abc"
manifest.status = "active"
manifest.deprecated_reason = ""
manifest.alternative_plugin_id = ""
with (
patch(f"{MODULE}.dify_config") as mock_cfg,
patch(f"{MODULE}.redis_client") as mock_redis,
patch(f"{MODULE}.marketplace") as mock_marketplace,
):
mock_cfg.MARKETPLACE_ENABLED = True
mock_redis.get.return_value = None
mock_marketplace.batch_fetch_plugin_manifests.return_value = [manifest]
from services.plugin.plugin_service import PluginService
result = PluginService.fetch_latest_plugin_version(["langgenius/openai"])
# The list arg is mutated by remove() after the call, so check call count + result.
mock_marketplace.batch_fetch_plugin_manifests.assert_called_once()
assert result["langgenius/openai"] is not None
assert result["langgenius/openai"].version == "1.0.0"

View File

@ -0,0 +1,36 @@
from pytest_mock import MockerFixture
from services.rag_pipeline.rag_pipeline import RagPipelineService
def _make_service() -> RagPipelineService:
return RagPipelineService.__new__(RagPipelineService)
def test_fetch_recommended_plugin_manifests_returns_empty_when_disabled(
mocker: MockerFixture,
) -> None:
mocker.patch("services.rag_pipeline.rag_pipeline.dify_config.MARKETPLACE_ENABLED", False)
batch_fetch = mocker.patch("services.rag_pipeline.rag_pipeline.marketplace.batch_fetch_plugin_by_ids")
service = _make_service()
result = service._fetch_recommended_plugin_manifests(["langgenius/openai"])
assert result == []
batch_fetch.assert_not_called()
def test_fetch_recommended_plugin_manifests_returns_data_when_enabled(
mocker: MockerFixture,
) -> None:
mocker.patch("services.rag_pipeline.rag_pipeline.dify_config.MARKETPLACE_ENABLED", True)
expected = [{"plugin_id": "langgenius/openai", "name": "OpenAI"}]
mocker.patch(
"services.rag_pipeline.rag_pipeline.marketplace.batch_fetch_plugin_by_ids",
return_value=expected,
)
service = _make_service()
result = service._fetch_recommended_plugin_manifests(["langgenius/openai"])
assert result == expected

View File

@ -1,8 +1,10 @@
import logging
from datetime import UTC, datetime
from types import SimpleNamespace
from typing import cast
import pytest
from pytest_mock import MockerFixture
from models.dataset import Dataset
from services.entities.knowledge_entities.rag_pipeline_entities import KnowledgeConfiguration
@ -514,3 +516,64 @@ def test_deal_document_data_upload_file_with_existing_file(mocker) -> None:
assert document.data_source_type == "local_file"
assert "real_file_id" in document.data_source_info
assert add_mock.call_count >= 2
def _make_service():
return RagPipelineTransformService.__new__(RagPipelineTransformService)
def test_deal_dependencies_skips_marketplace_when_disabled(mocker: MockerFixture, caplog) -> None:
mocker.patch(
"services.rag_pipeline.rag_pipeline_transform_service.dify_config.MARKETPLACE_ENABLED",
False,
)
installer = mocker.patch("services.rag_pipeline.rag_pipeline_transform_service.PluginInstaller").return_value
installer.list_plugins.return_value = []
mocker.patch("services.rag_pipeline.rag_pipeline_transform_service.PluginMigration")
install_call = mocker.patch(
"services.rag_pipeline.rag_pipeline_transform_service.PluginService.install_from_marketplace_pkg"
)
pipeline_yaml = {
"dependencies": [
{
"type": "marketplace",
"value": {"plugin_unique_identifier": "langgenius/openai:1.0.0@abc"},
}
]
}
service = _make_service()
with caplog.at_level(logging.WARNING):
service._deal_dependencies(pipeline_yaml, "tenant-1")
install_call.assert_not_called()
assert any("Marketplace disabled" in rec.message for rec in caplog.records)
def test_deal_dependencies_installs_when_enabled(mocker: MockerFixture) -> None:
mocker.patch(
"services.rag_pipeline.rag_pipeline_transform_service.dify_config.MARKETPLACE_ENABLED",
True,
)
installer = mocker.patch("services.rag_pipeline.rag_pipeline_transform_service.PluginInstaller").return_value
installer.list_plugins.return_value = []
migration = mocker.patch("services.rag_pipeline.rag_pipeline_transform_service.PluginMigration").return_value
migration._fetch_plugin_unique_identifier.return_value = "langgenius/openai:1.0.0@abc"
install_call = mocker.patch(
"services.rag_pipeline.rag_pipeline_transform_service.PluginService.install_from_marketplace_pkg"
)
pipeline_yaml = {
"dependencies": [
{
"type": "marketplace",
"value": {"plugin_unique_identifier": "langgenius/openai:1.0.0@abc"},
}
]
}
service = _make_service()
service._deal_dependencies(pipeline_yaml, "tenant-1")
install_call.assert_called_once_with("tenant-1", ["langgenius/openai:1.0.0@abc"])