dify/api/tests/unit_tests/services/test_model_provider_service.py
Yunlu Wen 3193e8a712
chore: reorg imports (#35308)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-16 08:50:02 +00:00

603 lines
24 KiB
Python

from types import SimpleNamespace
from typing import Any
from unittest.mock import MagicMock
import pytest
from core.entities.model_entities import ModelStatus
from graphon.model_runtime.entities.common_entities import I18nObject
from graphon.model_runtime.entities.model_entities import FetchFrom, ModelType, ParameterRule, ParameterType
from models.provider import ProviderType
from services import model_provider_service as service_module
from services.errors.app_model_config import ProviderNotFoundError
from services.model_provider_service import ModelProviderService
def _create_service_with_mocked_manager() -> tuple[ModelProviderService, MagicMock]:
    """Build a ``ModelProviderService`` whose provider-manager lookup is stubbed.

    Returns:
        A ``(service, manager)`` pair, where ``manager`` is the ``MagicMock``
        handed back by the service's patched ``_get_provider_manager``.
    """
    mocked_manager = MagicMock()
    svc = ModelProviderService()
    # Patch on the instance so each test controls exactly what the service sees.
    svc._get_provider_manager = MagicMock(return_value=mocked_manager)
    return svc, mocked_manager
def _build_provider_configuration(
    *,
    provider_name: str = "openai",
    supported_model_types: list[ModelType] | None = None,
    custom_models: list[Any] | None = None,
    custom_config_available: bool = True,
) -> SimpleNamespace:
    """Assemble a ``SimpleNamespace`` stand-in for a provider configuration.

    Args:
        provider_name: Used both as the provider identifier and as its
            ``en_US`` label.
        supported_model_types: Model types placed on the provider entity;
            ``None`` falls back to ``[ModelType.LLM]``.
        custom_models: Stored unchanged on ``custom_configuration.models``.
        custom_config_available: Value returned by the stub's
            ``is_custom_configuration_available()``.

    Returns:
        A namespace exposing ``provider``, ``preferred_provider_type``,
        ``custom_configuration``, ``system_configuration`` and
        ``is_custom_configuration_available``.
    """
    model_types = [ModelType.LLM] if supported_model_types is None else supported_model_types

    provider_entity = SimpleNamespace(
        provider=provider_name,
        label=I18nObject(en_US=provider_name),
        description=None,
        icon_small=None,
        icon_small_dark=None,
        background=None,
        help=None,
        supported_model_types=model_types,
        configurate_methods=[],
        provider_credential_schema=None,
        model_credential_schema=None,
    )

    custom_provider = SimpleNamespace(
        current_credential_id="cred-1",
        current_credential_name="Credential 1",
        available_credentials=[],
    )
    custom_configuration = SimpleNamespace(
        provider=custom_provider,
        models=custom_models,
        can_added_models=[],
    )

    system_configuration = SimpleNamespace(
        enabled=False,
        current_quota_type=None,
        quota_configurations=[],
    )

    def _is_custom_configuration_available() -> bool:
        # Closes over the keyword argument so each stub reports its own flag.
        return custom_config_available

    return SimpleNamespace(
        provider=provider_entity,
        preferred_provider_type=ProviderType.CUSTOM,
        custom_configuration=custom_configuration,
        system_configuration=system_configuration,
        is_custom_configuration_available=_is_custom_configuration_available,
    )
class TestModelProviderServiceConfiguration:
    """Tests for provider-configuration lookup and provider/model listing."""

    def test__get_provider_configuration_should_return_configuration_when_provider_exists(self) -> None:
        """The configuration stored under the provider key is returned as-is."""
        service, manager = _create_service_with_mocked_manager()
        expected = SimpleNamespace(name="provider-config")
        manager.get_configurations.return_value = {"openai": expected}

        found = service._get_provider_configuration(tenant_id="tenant-1", provider="openai")

        assert found is expected

    def test__get_provider_configuration_should_raise_error_when_provider_is_missing(self) -> None:
        """An unknown provider raises ``ProviderNotFoundError``."""
        service, manager = _create_service_with_mocked_manager()
        manager.get_configurations.return_value = {}

        with pytest.raises(ProviderNotFoundError, match="does not exist"):
            service._get_provider_configuration(tenant_id="tenant-1", provider="missing")

    def test_get_provider_list_should_filter_by_model_type_and_build_no_configure_status(self) -> None:
        """Only providers supporting the requested model type are listed."""
        service, manager = _create_service_with_mocked_manager()
        llm_provider = _build_provider_configuration(
            provider_name="openai",
            supported_model_types=[ModelType.LLM],
            custom_config_available=False,
        )
        embedding_provider = _build_provider_configuration(
            provider_name="embedding",
            supported_model_types=[ModelType.TEXT_EMBEDDING],
            custom_config_available=True,
        )
        manager.get_configurations.return_value = {
            "openai": llm_provider,
            "embedding": embedding_provider,
        }

        providers = service.get_provider_list(tenant_id="tenant-1", model_type=ModelType.LLM.value)

        assert len(providers) == 1
        assert providers[0].provider == "openai"
        # The LLM provider was built with custom_config_available=False.
        assert providers[0].custom_configuration.status.value == "no-configure"

    def test_get_models_by_provider_should_wrap_model_entities_with_tenant_context(self) -> None:
        """Each model returned by ``get_models`` appears in the service response."""
        service, manager = _create_service_with_mocked_manager()

        class _FakeModel:
            """Minimal object exposing the ``model_dump()`` the service consumes."""

            def __init__(self, model_name: str) -> None:
                self.model_name = model_name

            def model_dump(self) -> dict[str, Any]:
                provider_payload = {
                    "provider": "openai",
                    "label": {"en_US": "OpenAI"},
                    "icon_small": None,
                    "icon_small_dark": None,
                    "supported_model_types": [ModelType.LLM],
                }
                return {
                    "model": self.model_name,
                    "label": {"en_US": self.model_name},
                    "model_type": ModelType.LLM,
                    "features": [],
                    "fetch_from": FetchFrom.PREDEFINED_MODEL,
                    "model_properties": {},
                    "deprecated": False,
                    "status": ModelStatus.ACTIVE,
                    "load_balancing_enabled": False,
                    "has_invalid_load_balancing_configs": False,
                    "provider": provider_payload,
                }

        fake_models = [_FakeModel(name) for name in ("gpt-4o", "gpt-4o-mini")]
        configurations = SimpleNamespace(get_models=MagicMock(return_value=fake_models))
        manager.get_configurations.return_value = configurations

        listed = service.get_models_by_provider(tenant_id="tenant-1", provider="openai")

        assert len(listed) == 2
        assert listed[0].model == "gpt-4o"
        assert listed[1].provider.provider == "openai"
        configurations.get_models.assert_called_once_with(provider="openai")
class TestModelProviderServiceDelegation:
    """Tests that credential/model methods forward to the provider configuration."""

    # Each case: (service method, service kwargs, provider-configuration method,
    # expected call arguments -- a tuple for positional, a dict for keyword --
    # and the value the provider-configuration method returns).
    _PROVIDER_CREDENTIAL_CASES = [
        (
            "get_provider_credential",
            {"tenant_id": "tenant-1", "provider": "openai", "credential_id": "cred-1"},
            "get_provider_credential",
            {"credential_id": "cred-1"},
            {"token": "abc"},
        ),
        (
            "validate_provider_credentials",
            {"tenant_id": "tenant-1", "provider": "openai", "credentials": {"token": "abc"}},
            "validate_provider_credentials",
            ({"token": "abc"},),
            None,
        ),
        (
            "create_provider_credential",
            {
                "tenant_id": "tenant-1",
                "provider": "openai",
                "credentials": {"token": "abc"},
                "credential_name": "A",
            },
            "create_provider_credential",
            ({"token": "abc"}, "A"),
            None,
        ),
        (
            "update_provider_credential",
            {
                "tenant_id": "tenant-1",
                "provider": "openai",
                "credentials": {"token": "abc"},
                "credential_id": "cred-1",
                "credential_name": "B",
            },
            "update_provider_credential",
            {"credential_id": "cred-1", "credentials": {"token": "abc"}, "credential_name": "B"},
            None,
        ),
        (
            "remove_provider_credential",
            {"tenant_id": "tenant-1", "provider": "openai", "credential_id": "cred-1"},
            "delete_provider_credential",
            {"credential_id": "cred-1"},
            None,
        ),
        (
            "switch_active_provider_credential",
            {"tenant_id": "tenant-1", "provider": "openai", "credential_id": "cred-1"},
            "switch_active_provider_credential",
            {"credential_id": "cred-1"},
            None,
        ),
    ]

    # Each case: (service method, service kwargs with a string model type,
    # provider-configuration method, expected keyword arguments with the
    # ``ModelType`` enum, and the provider-configuration method's return value).
    _CUSTOM_MODEL_CASES = [
        (
            "get_model_credential",
            {
                "tenant_id": "tenant-1",
                "provider": "openai",
                "model_type": ModelType.LLM.value,
                "model": "gpt-4o",
                "credential_id": "cred-1",
            },
            "get_custom_model_credential",
            {"model_type": ModelType.LLM, "model": "gpt-4o", "credential_id": "cred-1"},
            {"api_key": "x"},
        ),
        (
            "validate_model_credentials",
            {
                "tenant_id": "tenant-1",
                "provider": "openai",
                "model_type": ModelType.LLM.value,
                "model": "gpt-4o",
                "credentials": {"api_key": "x"},
            },
            "validate_custom_model_credentials",
            {"model_type": ModelType.LLM, "model": "gpt-4o", "credentials": {"api_key": "x"}},
            None,
        ),
        (
            "create_model_credential",
            {
                "tenant_id": "tenant-1",
                "provider": "openai",
                "model_type": ModelType.LLM.value,
                "model": "gpt-4o",
                "credentials": {"api_key": "x"},
                "credential_name": "cred-a",
            },
            "create_custom_model_credential",
            {
                "model_type": ModelType.LLM,
                "model": "gpt-4o",
                "credentials": {"api_key": "x"},
                "credential_name": "cred-a",
            },
            None,
        ),
        (
            "update_model_credential",
            {
                "tenant_id": "tenant-1",
                "provider": "openai",
                "model_type": ModelType.LLM.value,
                "model": "gpt-4o",
                "credentials": {"api_key": "x"},
                "credential_id": "cred-1",
                "credential_name": "cred-b",
            },
            "update_custom_model_credential",
            {
                "model_type": ModelType.LLM,
                "model": "gpt-4o",
                "credentials": {"api_key": "x"},
                "credential_id": "cred-1",
                "credential_name": "cred-b",
            },
            None,
        ),
        (
            "remove_model_credential",
            {
                "tenant_id": "tenant-1",
                "provider": "openai",
                "model_type": ModelType.LLM.value,
                "model": "gpt-4o",
                "credential_id": "cred-1",
            },
            "delete_custom_model_credential",
            {"model_type": ModelType.LLM, "model": "gpt-4o", "credential_id": "cred-1"},
            None,
        ),
        (
            "switch_active_custom_model_credential",
            {
                "tenant_id": "tenant-1",
                "provider": "openai",
                "model_type": ModelType.LLM.value,
                "model": "gpt-4o",
                "credential_id": "cred-1",
            },
            "switch_custom_model_credential",
            {"model_type": ModelType.LLM, "model": "gpt-4o", "credential_id": "cred-1"},
            None,
        ),
        (
            "add_model_credential_to_model_list",
            {
                "tenant_id": "tenant-1",
                "provider": "openai",
                "model_type": ModelType.LLM.value,
                "model": "gpt-4o",
                "credential_id": "cred-1",
            },
            "add_model_credential_to_model",
            {"model_type": ModelType.LLM, "model": "gpt-4o", "credential_id": "cred-1"},
            None,
        ),
        (
            "remove_model",
            {
                "tenant_id": "tenant-1",
                "provider": "openai",
                "model_type": ModelType.LLM.value,
                "model": "gpt-4o",
            },
            "delete_custom_model",
            {"model_type": ModelType.LLM, "model": "gpt-4o"},
            None,
        ),
    ]

    @pytest.mark.parametrize(
        ("method_name", "method_kwargs", "provider_method_name", "provider_call_kwargs", "provider_return"),
        _PROVIDER_CREDENTIAL_CASES,
    )
    def test_provider_credential_methods_should_delegate_to_provider_configuration(
        self,
        method_name: str,
        method_kwargs: dict[str, Any],
        provider_method_name: str,
        provider_call_kwargs: Any,
        provider_return: Any,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Provider-credential methods call the matching configuration method.

        ``provider_call_kwargs`` is checked as positional arguments when it is
        a tuple, as keyword arguments when it is a dict, and as a single
        positional argument otherwise.
        """
        svc = ModelProviderService()
        config = MagicMock()
        # MagicMock hands back the same child for repeated attribute access,
        # so this is the very object the service will end up calling.
        delegate = getattr(config, provider_method_name)
        delegate.return_value = provider_return
        config_lookup = MagicMock(return_value=config)
        monkeypatch.setattr(svc, "_get_provider_configuration", config_lookup)

        outcome = getattr(svc, method_name)(**method_kwargs)

        config_lookup.assert_called_once_with("tenant-1", "openai")

        if isinstance(provider_call_kwargs, tuple):
            expected_args, expected_kwargs = provider_call_kwargs, {}
        elif isinstance(provider_call_kwargs, dict):
            expected_args, expected_kwargs = (), provider_call_kwargs
        else:
            expected_args, expected_kwargs = (provider_call_kwargs,), {}
        delegate.assert_called_once_with(*expected_args, **expected_kwargs)

        if method_name == "get_provider_credential":
            assert outcome == {"token": "abc"}

    @pytest.mark.parametrize(
        ("method_name", "method_kwargs", "provider_method_name", "expected_kwargs", "provider_return"),
        _CUSTOM_MODEL_CASES,
    )
    def test_custom_model_methods_should_convert_model_type_and_delegate(
        self,
        method_name: str,
        method_kwargs: dict[str, Any],
        provider_method_name: str,
        expected_kwargs: dict[str, Any],
        provider_return: Any,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Custom-model methods pass a ``ModelType`` enum to the configuration."""
        svc = ModelProviderService()
        config = MagicMock()
        delegate = getattr(config, provider_method_name)
        delegate.return_value = provider_return
        config_lookup = MagicMock(return_value=config)
        monkeypatch.setattr(svc, "_get_provider_configuration", config_lookup)

        outcome = getattr(svc, method_name)(**method_kwargs)

        config_lookup.assert_called_once_with("tenant-1", "openai")
        delegate.assert_called_once_with(**expected_kwargs)
        if method_name == "get_model_credential":
            assert outcome == {"api_key": "x"}
class TestModelProviderServiceListingsAndDefaults:
    """Tests for model listings, parameter rules, defaults, icons and toggles."""

    def test_get_models_by_model_type_should_group_active_non_deprecated_models(self) -> None:
        """Deprecated models are dropped, and providers left without models too."""
        service, manager = _create_service_with_mocked_manager()

        def _provider(name: str, label: str) -> SimpleNamespace:
            return SimpleNamespace(
                provider=name,
                label=I18nObject(en_US=label),
                icon_small=None,
                icon_small_dark=None,
            )

        def _llm_model(provider: SimpleNamespace, model: str, label: str, *, deprecated: bool) -> SimpleNamespace:
            return SimpleNamespace(
                provider=provider,
                model=model,
                label=I18nObject(en_US=label),
                model_type=ModelType.LLM,
                features=[],
                fetch_from=FetchFrom.PREDEFINED_MODEL,
                model_properties={},
                status=ModelStatus.ACTIVE,
                load_balancing_enabled=False,
                deprecated=deprecated,
            )

        openai = _provider("openai", "OpenAI")
        anthropic = _provider("anthropic", "Anthropic")
        entries = [
            _llm_model(openai, "gpt-4o", "GPT-4o", deprecated=False),
            _llm_model(openai, "old-openai", "Old OpenAI", deprecated=True),
            _llm_model(anthropic, "old-anthropic", "Old Anthropic", deprecated=True),
        ]
        configurations = SimpleNamespace(get_models=MagicMock(return_value=entries))
        manager.get_configurations.return_value = configurations

        grouped = service.get_models_by_model_type(tenant_id="tenant-1", model_type=ModelType.LLM.value)

        configurations.get_models.assert_called_once_with(model_type=ModelType.LLM, only_active=True)
        assert len(grouped) == 1
        assert grouped[0].provider == "openai"
        assert len(grouped[0].models) == 1
        assert grouped[0].models[0].model == "gpt-4o"

    @pytest.mark.parametrize(
        ("credentials", "schema", "expected_count"),
        [
            (None, None, 0),
            ({"api_key": "x"}, None, 0),
            (
                {"api_key": "x"},
                SimpleNamespace(
                    parameter_rules=[
                        ParameterRule(
                            name="temperature",
                            label=I18nObject(en_US="Temperature"),
                            type=ParameterType.FLOAT,
                        )
                    ]
                ),
                1,
            ),
        ],
    )
    def test_get_model_parameter_rules_should_handle_missing_credentials_and_schema(
        self,
        credentials: dict[str, Any] | None,
        schema: Any,
        expected_count: int,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Rules come back empty without credentials or without a schema."""
        svc = ModelProviderService()
        config = MagicMock()
        config.get_current_credentials.return_value = credentials
        config.get_model_schema.return_value = schema
        config_lookup = MagicMock(return_value=config)
        monkeypatch.setattr(svc, "_get_provider_configuration", config_lookup)

        rules = svc.get_model_parameter_rules(tenant_id="tenant-1", provider="openai", model="gpt-4o")

        assert len(rules) == expected_count
        config.get_current_credentials.assert_called_once_with(
            model_type=ModelType.LLM,
            model="gpt-4o",
        )
        if not credentials:
            # Without credentials the schema must never be requested.
            config.get_model_schema.assert_not_called()
            return
        config.get_model_schema.assert_called_once_with(
            model_type=ModelType.LLM,
            model="gpt-4o",
            credentials=credentials,
        )

    def test_get_default_model_of_model_type_should_return_response_when_manager_returns_model(self) -> None:
        """A default model from the manager is exposed in the response."""
        service, manager = _create_service_with_mocked_manager()
        default_provider = SimpleNamespace(
            provider="openai",
            label=I18nObject(en_US="OpenAI"),
            icon_small=None,
            supported_model_types=[ModelType.LLM],
        )
        manager.get_default_model.return_value = SimpleNamespace(
            model="gpt-4o",
            model_type=ModelType.LLM,
            provider=default_provider,
        )

        response = service.get_default_model_of_model_type(tenant_id="tenant-1", model_type=ModelType.LLM.value)

        assert response is not None
        assert response.model == "gpt-4o"
        assert response.provider.provider == "openai"
        manager.get_default_model.assert_called_once_with(tenant_id="tenant-1", model_type=ModelType.LLM)

    def test_get_default_model_of_model_type_should_return_none_when_manager_returns_none(self) -> None:
        """``None`` from the manager is passed through as ``None``."""
        service, manager = _create_service_with_mocked_manager()
        manager.get_default_model.return_value = None

        response = service.get_default_model_of_model_type(tenant_id="tenant-1", model_type=ModelType.LLM.value)

        assert response is None

    def test_get_default_model_of_model_type_should_return_none_when_manager_raises_exception(self) -> None:
        """An exception raised by the manager results in ``None``."""
        service, manager = _create_service_with_mocked_manager()
        manager.get_default_model.side_effect = RuntimeError("boom")

        response = service.get_default_model_of_model_type(tenant_id="tenant-1", model_type=ModelType.LLM.value)

        assert response is None

    def test_update_default_model_of_model_type_should_delegate_to_provider_manager(self) -> None:
        """The update is forwarded with the model type converted to its enum."""
        service, manager = _create_service_with_mocked_manager()

        service.update_default_model_of_model_type(
            tenant_id="tenant-1",
            model_type=ModelType.LLM.value,
            provider="openai",
            model="gpt-4o",
        )

        manager.update_default_model_record.assert_called_once_with(
            tenant_id="tenant-1",
            model_type=ModelType.LLM,
            provider="openai",
            model="gpt-4o",
        )

    def test_get_model_provider_icon_should_fetch_icon_bytes_from_factory(
        self,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """The icon tuple produced by the patched factory is returned unchanged."""
        svc = ModelProviderService()
        icon_payload = (b"icon-bytes", "image/png")
        factory = MagicMock()
        factory.get_provider_icon.return_value = icon_payload
        make_factory = MagicMock(return_value=factory)
        monkeypatch.setattr(service_module, "create_plugin_model_provider_factory", make_factory)

        icon = svc.get_model_provider_icon(
            tenant_id="tenant-1",
            provider="openai",
            icon_type="icon_small",
            lang="en_US",
        )

        make_factory.assert_called_once_with(tenant_id="tenant-1")
        factory.get_provider_icon.assert_called_once_with("openai", "icon_small", "en_US")
        assert icon == (b"icon-bytes", "image/png")

    def test_switch_preferred_provider_should_convert_enum_and_delegate(
        self,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """The string provider type reaches the configuration as ``ProviderType``."""
        svc = ModelProviderService()
        config = MagicMock()
        config_lookup = MagicMock(return_value=config)
        monkeypatch.setattr(svc, "_get_provider_configuration", config_lookup)

        svc.switch_preferred_provider(
            tenant_id="tenant-1",
            provider="openai",
            preferred_provider_type=ProviderType.SYSTEM.value,
        )

        config.switch_preferred_provider_type.assert_called_once_with(ProviderType.SYSTEM)

    @pytest.mark.parametrize(
        ("method_name", "provider_method_name"),
        [
            ("enable_model", "enable_model"),
            ("disable_model", "disable_model"),
        ],
    )
    def test_model_enablement_methods_should_convert_model_type_and_delegate(
        self,
        method_name: str,
        provider_method_name: str,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Enable/disable calls reach the configuration with an enum model type."""
        svc = ModelProviderService()
        config = MagicMock()
        config_lookup = MagicMock(return_value=config)
        monkeypatch.setattr(svc, "_get_provider_configuration", config_lookup)

        toggle = getattr(svc, method_name)
        toggle(
            tenant_id="tenant-1",
            provider="openai",
            model="gpt-4o",
            model_type=ModelType.LLM.value,
        )

        delegate = getattr(config, provider_method_name)
        delegate.assert_called_once_with(
            model="gpt-4o",
            model_type=ModelType.LLM,
        )