mirror of
https://github.com/langgenius/dify.git
synced 2026-04-16 18:39:18 +08:00
603 lines
24 KiB
Python
603 lines
24 KiB
Python
from types import SimpleNamespace
|
|
from typing import Any
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
|
|
from core.entities.model_entities import ModelStatus
|
|
from graphon.model_runtime.entities.common_entities import I18nObject
|
|
from graphon.model_runtime.entities.model_entities import FetchFrom, ModelType, ParameterRule, ParameterType
|
|
from models.provider import ProviderType
|
|
from services import model_provider_service as service_module
|
|
from services.errors.app_model_config import ProviderNotFoundError
|
|
from services.model_provider_service import ModelProviderService
|
|
|
|
|
|
def _create_service_with_mocked_manager() -> tuple[ModelProviderService, MagicMock]:
|
|
manager = MagicMock()
|
|
service = ModelProviderService()
|
|
service._get_provider_manager = MagicMock(return_value=manager)
|
|
return service, manager
|
|
|
|
|
|
def _build_provider_configuration(
|
|
*,
|
|
provider_name: str = "openai",
|
|
supported_model_types: list[ModelType] | None = None,
|
|
custom_models: list[Any] | None = None,
|
|
custom_config_available: bool = True,
|
|
) -> SimpleNamespace:
|
|
if supported_model_types is None:
|
|
supported_model_types = [ModelType.LLM]
|
|
|
|
return SimpleNamespace(
|
|
provider=SimpleNamespace(
|
|
provider=provider_name,
|
|
label=I18nObject(en_US=provider_name),
|
|
description=None,
|
|
icon_small=None,
|
|
icon_small_dark=None,
|
|
background=None,
|
|
help=None,
|
|
supported_model_types=supported_model_types,
|
|
configurate_methods=[],
|
|
provider_credential_schema=None,
|
|
model_credential_schema=None,
|
|
),
|
|
preferred_provider_type=ProviderType.CUSTOM,
|
|
custom_configuration=SimpleNamespace(
|
|
provider=SimpleNamespace(
|
|
current_credential_id="cred-1",
|
|
current_credential_name="Credential 1",
|
|
available_credentials=[],
|
|
),
|
|
models=custom_models,
|
|
can_added_models=[],
|
|
),
|
|
system_configuration=SimpleNamespace(enabled=False, current_quota_type=None, quota_configurations=[]),
|
|
is_custom_configuration_available=lambda: custom_config_available,
|
|
)
|
|
|
|
|
|
class TestModelProviderServiceConfiguration:
|
|
def test__get_provider_configuration_should_return_configuration_when_provider_exists(self) -> None:
|
|
service, manager = _create_service_with_mocked_manager()
|
|
provider_configuration = SimpleNamespace(name="provider-config")
|
|
manager.get_configurations.return_value = {"openai": provider_configuration}
|
|
|
|
result = service._get_provider_configuration(tenant_id="tenant-1", provider="openai")
|
|
|
|
assert result is provider_configuration
|
|
|
|
def test__get_provider_configuration_should_raise_error_when_provider_is_missing(self) -> None:
|
|
service, manager = _create_service_with_mocked_manager()
|
|
manager.get_configurations.return_value = {}
|
|
|
|
with pytest.raises(ProviderNotFoundError, match="does not exist"):
|
|
service._get_provider_configuration(tenant_id="tenant-1", provider="missing")
|
|
|
|
def test_get_provider_list_should_filter_by_model_type_and_build_no_configure_status(self) -> None:
|
|
service, manager = _create_service_with_mocked_manager()
|
|
allowed = _build_provider_configuration(
|
|
provider_name="openai",
|
|
supported_model_types=[ModelType.LLM],
|
|
custom_config_available=False,
|
|
)
|
|
filtered = _build_provider_configuration(
|
|
provider_name="embedding",
|
|
supported_model_types=[ModelType.TEXT_EMBEDDING],
|
|
custom_config_available=True,
|
|
)
|
|
manager.get_configurations.return_value = {"openai": allowed, "embedding": filtered}
|
|
|
|
result = service.get_provider_list(tenant_id="tenant-1", model_type=ModelType.LLM.value)
|
|
|
|
assert len(result) == 1
|
|
assert result[0].provider == "openai"
|
|
assert result[0].custom_configuration.status.value == "no-configure"
|
|
|
|
def test_get_models_by_provider_should_wrap_model_entities_with_tenant_context(self) -> None:
|
|
service, manager = _create_service_with_mocked_manager()
|
|
|
|
class _Model:
|
|
def __init__(self, model_name: str) -> None:
|
|
self.model_name = model_name
|
|
|
|
def model_dump(self) -> dict[str, Any]:
|
|
return {
|
|
"model": self.model_name,
|
|
"label": {"en_US": self.model_name},
|
|
"model_type": ModelType.LLM,
|
|
"features": [],
|
|
"fetch_from": FetchFrom.PREDEFINED_MODEL,
|
|
"model_properties": {},
|
|
"deprecated": False,
|
|
"status": ModelStatus.ACTIVE,
|
|
"load_balancing_enabled": False,
|
|
"has_invalid_load_balancing_configs": False,
|
|
"provider": {
|
|
"provider": "openai",
|
|
"label": {"en_US": "OpenAI"},
|
|
"icon_small": None,
|
|
"icon_small_dark": None,
|
|
"supported_model_types": [ModelType.LLM],
|
|
},
|
|
}
|
|
|
|
provider_configurations = SimpleNamespace(
|
|
get_models=MagicMock(return_value=[_Model("gpt-4o"), _Model("gpt-4o-mini")])
|
|
)
|
|
manager.get_configurations.return_value = provider_configurations
|
|
|
|
result = service.get_models_by_provider(tenant_id="tenant-1", provider="openai")
|
|
|
|
assert len(result) == 2
|
|
assert result[0].model == "gpt-4o"
|
|
assert result[1].provider.provider == "openai"
|
|
provider_configurations.get_models.assert_called_once_with(provider="openai")
|
|
|
|
|
|
class TestModelProviderServiceDelegation:
|
|
@pytest.mark.parametrize(
|
|
("method_name", "method_kwargs", "provider_method_name", "provider_call_kwargs", "provider_return"),
|
|
[
|
|
(
|
|
"get_provider_credential",
|
|
{"tenant_id": "tenant-1", "provider": "openai", "credential_id": "cred-1"},
|
|
"get_provider_credential",
|
|
{"credential_id": "cred-1"},
|
|
{"token": "abc"},
|
|
),
|
|
(
|
|
"validate_provider_credentials",
|
|
{"tenant_id": "tenant-1", "provider": "openai", "credentials": {"token": "abc"}},
|
|
"validate_provider_credentials",
|
|
({"token": "abc"},),
|
|
None,
|
|
),
|
|
(
|
|
"create_provider_credential",
|
|
{
|
|
"tenant_id": "tenant-1",
|
|
"provider": "openai",
|
|
"credentials": {"token": "abc"},
|
|
"credential_name": "A",
|
|
},
|
|
"create_provider_credential",
|
|
({"token": "abc"}, "A"),
|
|
None,
|
|
),
|
|
(
|
|
"update_provider_credential",
|
|
{
|
|
"tenant_id": "tenant-1",
|
|
"provider": "openai",
|
|
"credentials": {"token": "abc"},
|
|
"credential_id": "cred-1",
|
|
"credential_name": "B",
|
|
},
|
|
"update_provider_credential",
|
|
{"credential_id": "cred-1", "credentials": {"token": "abc"}, "credential_name": "B"},
|
|
None,
|
|
),
|
|
(
|
|
"remove_provider_credential",
|
|
{"tenant_id": "tenant-1", "provider": "openai", "credential_id": "cred-1"},
|
|
"delete_provider_credential",
|
|
{"credential_id": "cred-1"},
|
|
None,
|
|
),
|
|
(
|
|
"switch_active_provider_credential",
|
|
{"tenant_id": "tenant-1", "provider": "openai", "credential_id": "cred-1"},
|
|
"switch_active_provider_credential",
|
|
{"credential_id": "cred-1"},
|
|
None,
|
|
),
|
|
],
|
|
)
|
|
def test_provider_credential_methods_should_delegate_to_provider_configuration(
|
|
self,
|
|
method_name: str,
|
|
method_kwargs: dict[str, Any],
|
|
provider_method_name: str,
|
|
provider_call_kwargs: Any,
|
|
provider_return: Any,
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
service = ModelProviderService()
|
|
provider_configuration = MagicMock()
|
|
getattr(provider_configuration, provider_method_name).return_value = provider_return
|
|
get_provider_config_mock = MagicMock(return_value=provider_configuration)
|
|
monkeypatch.setattr(service, "_get_provider_configuration", get_provider_config_mock)
|
|
|
|
result = getattr(service, method_name)(**method_kwargs)
|
|
|
|
get_provider_config_mock.assert_called_once_with("tenant-1", "openai")
|
|
provider_method = getattr(provider_configuration, provider_method_name)
|
|
if isinstance(provider_call_kwargs, tuple):
|
|
provider_method.assert_called_once_with(*provider_call_kwargs)
|
|
elif isinstance(provider_call_kwargs, dict):
|
|
provider_method.assert_called_once_with(**provider_call_kwargs)
|
|
else:
|
|
provider_method.assert_called_once_with(provider_call_kwargs)
|
|
if method_name == "get_provider_credential":
|
|
assert result == {"token": "abc"}
|
|
|
|
@pytest.mark.parametrize(
|
|
("method_name", "method_kwargs", "provider_method_name", "expected_kwargs", "provider_return"),
|
|
[
|
|
(
|
|
"get_model_credential",
|
|
{
|
|
"tenant_id": "tenant-1",
|
|
"provider": "openai",
|
|
"model_type": ModelType.LLM.value,
|
|
"model": "gpt-4o",
|
|
"credential_id": "cred-1",
|
|
},
|
|
"get_custom_model_credential",
|
|
{"model_type": ModelType.LLM, "model": "gpt-4o", "credential_id": "cred-1"},
|
|
{"api_key": "x"},
|
|
),
|
|
(
|
|
"validate_model_credentials",
|
|
{
|
|
"tenant_id": "tenant-1",
|
|
"provider": "openai",
|
|
"model_type": ModelType.LLM.value,
|
|
"model": "gpt-4o",
|
|
"credentials": {"api_key": "x"},
|
|
},
|
|
"validate_custom_model_credentials",
|
|
{"model_type": ModelType.LLM, "model": "gpt-4o", "credentials": {"api_key": "x"}},
|
|
None,
|
|
),
|
|
(
|
|
"create_model_credential",
|
|
{
|
|
"tenant_id": "tenant-1",
|
|
"provider": "openai",
|
|
"model_type": ModelType.LLM.value,
|
|
"model": "gpt-4o",
|
|
"credentials": {"api_key": "x"},
|
|
"credential_name": "cred-a",
|
|
},
|
|
"create_custom_model_credential",
|
|
{
|
|
"model_type": ModelType.LLM,
|
|
"model": "gpt-4o",
|
|
"credentials": {"api_key": "x"},
|
|
"credential_name": "cred-a",
|
|
},
|
|
None,
|
|
),
|
|
(
|
|
"update_model_credential",
|
|
{
|
|
"tenant_id": "tenant-1",
|
|
"provider": "openai",
|
|
"model_type": ModelType.LLM.value,
|
|
"model": "gpt-4o",
|
|
"credentials": {"api_key": "x"},
|
|
"credential_id": "cred-1",
|
|
"credential_name": "cred-b",
|
|
},
|
|
"update_custom_model_credential",
|
|
{
|
|
"model_type": ModelType.LLM,
|
|
"model": "gpt-4o",
|
|
"credentials": {"api_key": "x"},
|
|
"credential_id": "cred-1",
|
|
"credential_name": "cred-b",
|
|
},
|
|
None,
|
|
),
|
|
(
|
|
"remove_model_credential",
|
|
{
|
|
"tenant_id": "tenant-1",
|
|
"provider": "openai",
|
|
"model_type": ModelType.LLM.value,
|
|
"model": "gpt-4o",
|
|
"credential_id": "cred-1",
|
|
},
|
|
"delete_custom_model_credential",
|
|
{"model_type": ModelType.LLM, "model": "gpt-4o", "credential_id": "cred-1"},
|
|
None,
|
|
),
|
|
(
|
|
"switch_active_custom_model_credential",
|
|
{
|
|
"tenant_id": "tenant-1",
|
|
"provider": "openai",
|
|
"model_type": ModelType.LLM.value,
|
|
"model": "gpt-4o",
|
|
"credential_id": "cred-1",
|
|
},
|
|
"switch_custom_model_credential",
|
|
{"model_type": ModelType.LLM, "model": "gpt-4o", "credential_id": "cred-1"},
|
|
None,
|
|
),
|
|
(
|
|
"add_model_credential_to_model_list",
|
|
{
|
|
"tenant_id": "tenant-1",
|
|
"provider": "openai",
|
|
"model_type": ModelType.LLM.value,
|
|
"model": "gpt-4o",
|
|
"credential_id": "cred-1",
|
|
},
|
|
"add_model_credential_to_model",
|
|
{"model_type": ModelType.LLM, "model": "gpt-4o", "credential_id": "cred-1"},
|
|
None,
|
|
),
|
|
(
|
|
"remove_model",
|
|
{
|
|
"tenant_id": "tenant-1",
|
|
"provider": "openai",
|
|
"model_type": ModelType.LLM.value,
|
|
"model": "gpt-4o",
|
|
},
|
|
"delete_custom_model",
|
|
{"model_type": ModelType.LLM, "model": "gpt-4o"},
|
|
None,
|
|
),
|
|
],
|
|
)
|
|
def test_custom_model_methods_should_convert_model_type_and_delegate(
|
|
self,
|
|
method_name: str,
|
|
method_kwargs: dict[str, Any],
|
|
provider_method_name: str,
|
|
expected_kwargs: dict[str, Any],
|
|
provider_return: Any,
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
service = ModelProviderService()
|
|
provider_configuration = MagicMock()
|
|
getattr(provider_configuration, provider_method_name).return_value = provider_return
|
|
get_provider_config_mock = MagicMock(return_value=provider_configuration)
|
|
monkeypatch.setattr(service, "_get_provider_configuration", get_provider_config_mock)
|
|
|
|
result = getattr(service, method_name)(**method_kwargs)
|
|
|
|
get_provider_config_mock.assert_called_once_with("tenant-1", "openai")
|
|
getattr(provider_configuration, provider_method_name).assert_called_once_with(**expected_kwargs)
|
|
if method_name == "get_model_credential":
|
|
assert result == {"api_key": "x"}
|
|
|
|
|
|
class TestModelProviderServiceListingsAndDefaults:
|
|
def test_get_models_by_model_type_should_group_active_non_deprecated_models(self) -> None:
|
|
service, manager = _create_service_with_mocked_manager()
|
|
openai_provider = SimpleNamespace(
|
|
provider="openai",
|
|
label=I18nObject(en_US="OpenAI"),
|
|
icon_small=None,
|
|
icon_small_dark=None,
|
|
)
|
|
anthropic_provider = SimpleNamespace(
|
|
provider="anthropic",
|
|
label=I18nObject(en_US="Anthropic"),
|
|
icon_small=None,
|
|
icon_small_dark=None,
|
|
)
|
|
models = [
|
|
SimpleNamespace(
|
|
provider=openai_provider,
|
|
model="gpt-4o",
|
|
label=I18nObject(en_US="GPT-4o"),
|
|
model_type=ModelType.LLM,
|
|
features=[],
|
|
fetch_from=FetchFrom.PREDEFINED_MODEL,
|
|
model_properties={},
|
|
status=ModelStatus.ACTIVE,
|
|
load_balancing_enabled=False,
|
|
deprecated=False,
|
|
),
|
|
SimpleNamespace(
|
|
provider=openai_provider,
|
|
model="old-openai",
|
|
label=I18nObject(en_US="Old OpenAI"),
|
|
model_type=ModelType.LLM,
|
|
features=[],
|
|
fetch_from=FetchFrom.PREDEFINED_MODEL,
|
|
model_properties={},
|
|
status=ModelStatus.ACTIVE,
|
|
load_balancing_enabled=False,
|
|
deprecated=True,
|
|
),
|
|
SimpleNamespace(
|
|
provider=anthropic_provider,
|
|
model="old-anthropic",
|
|
label=I18nObject(en_US="Old Anthropic"),
|
|
model_type=ModelType.LLM,
|
|
features=[],
|
|
fetch_from=FetchFrom.PREDEFINED_MODEL,
|
|
model_properties={},
|
|
status=ModelStatus.ACTIVE,
|
|
load_balancing_enabled=False,
|
|
deprecated=True,
|
|
),
|
|
]
|
|
provider_configurations = SimpleNamespace(get_models=MagicMock(return_value=models))
|
|
manager.get_configurations.return_value = provider_configurations
|
|
|
|
result = service.get_models_by_model_type(tenant_id="tenant-1", model_type=ModelType.LLM.value)
|
|
|
|
provider_configurations.get_models.assert_called_once_with(model_type=ModelType.LLM, only_active=True)
|
|
assert len(result) == 1
|
|
assert result[0].provider == "openai"
|
|
assert len(result[0].models) == 1
|
|
assert result[0].models[0].model == "gpt-4o"
|
|
|
|
@pytest.mark.parametrize(
|
|
("credentials", "schema", "expected_count"),
|
|
[
|
|
(None, None, 0),
|
|
({"api_key": "x"}, None, 0),
|
|
(
|
|
{"api_key": "x"},
|
|
SimpleNamespace(
|
|
parameter_rules=[
|
|
ParameterRule(
|
|
name="temperature",
|
|
label=I18nObject(en_US="Temperature"),
|
|
type=ParameterType.FLOAT,
|
|
)
|
|
]
|
|
),
|
|
1,
|
|
),
|
|
],
|
|
)
|
|
def test_get_model_parameter_rules_should_handle_missing_credentials_and_schema(
|
|
self,
|
|
credentials: dict[str, Any] | None,
|
|
schema: Any,
|
|
expected_count: int,
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
service = ModelProviderService()
|
|
provider_configuration = MagicMock()
|
|
provider_configuration.get_current_credentials.return_value = credentials
|
|
provider_configuration.get_model_schema.return_value = schema
|
|
monkeypatch.setattr(service, "_get_provider_configuration", MagicMock(return_value=provider_configuration))
|
|
|
|
result = service.get_model_parameter_rules(tenant_id="tenant-1", provider="openai", model="gpt-4o")
|
|
|
|
assert len(result) == expected_count
|
|
provider_configuration.get_current_credentials.assert_called_once_with(
|
|
model_type=ModelType.LLM,
|
|
model="gpt-4o",
|
|
)
|
|
if credentials:
|
|
provider_configuration.get_model_schema.assert_called_once_with(
|
|
model_type=ModelType.LLM,
|
|
model="gpt-4o",
|
|
credentials=credentials,
|
|
)
|
|
else:
|
|
provider_configuration.get_model_schema.assert_not_called()
|
|
|
|
def test_get_default_model_of_model_type_should_return_response_when_manager_returns_model(self) -> None:
|
|
service, manager = _create_service_with_mocked_manager()
|
|
manager.get_default_model.return_value = SimpleNamespace(
|
|
model="gpt-4o",
|
|
model_type=ModelType.LLM,
|
|
provider=SimpleNamespace(
|
|
provider="openai",
|
|
label=I18nObject(en_US="OpenAI"),
|
|
icon_small=None,
|
|
supported_model_types=[ModelType.LLM],
|
|
),
|
|
)
|
|
|
|
result = service.get_default_model_of_model_type(tenant_id="tenant-1", model_type=ModelType.LLM.value)
|
|
|
|
assert result is not None
|
|
assert result.model == "gpt-4o"
|
|
assert result.provider.provider == "openai"
|
|
manager.get_default_model.assert_called_once_with(tenant_id="tenant-1", model_type=ModelType.LLM)
|
|
|
|
def test_get_default_model_of_model_type_should_return_none_when_manager_returns_none(self) -> None:
|
|
service, manager = _create_service_with_mocked_manager()
|
|
manager.get_default_model.return_value = None
|
|
|
|
result = service.get_default_model_of_model_type(tenant_id="tenant-1", model_type=ModelType.LLM.value)
|
|
|
|
assert result is None
|
|
|
|
def test_get_default_model_of_model_type_should_return_none_when_manager_raises_exception(self) -> None:
|
|
service, manager = _create_service_with_mocked_manager()
|
|
manager.get_default_model.side_effect = RuntimeError("boom")
|
|
|
|
result = service.get_default_model_of_model_type(tenant_id="tenant-1", model_type=ModelType.LLM.value)
|
|
|
|
assert result is None
|
|
|
|
def test_update_default_model_of_model_type_should_delegate_to_provider_manager(self) -> None:
|
|
service, manager = _create_service_with_mocked_manager()
|
|
|
|
service.update_default_model_of_model_type(
|
|
tenant_id="tenant-1",
|
|
model_type=ModelType.LLM.value,
|
|
provider="openai",
|
|
model="gpt-4o",
|
|
)
|
|
|
|
manager.update_default_model_record.assert_called_once_with(
|
|
tenant_id="tenant-1",
|
|
model_type=ModelType.LLM,
|
|
provider="openai",
|
|
model="gpt-4o",
|
|
)
|
|
|
|
def test_get_model_provider_icon_should_fetch_icon_bytes_from_factory(
|
|
self,
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
service = ModelProviderService()
|
|
factory_instance = MagicMock()
|
|
factory_instance.get_provider_icon.return_value = (b"icon-bytes", "image/png")
|
|
factory_constructor = MagicMock(return_value=factory_instance)
|
|
monkeypatch.setattr(service_module, "create_plugin_model_provider_factory", factory_constructor)
|
|
|
|
result = service.get_model_provider_icon(
|
|
tenant_id="tenant-1",
|
|
provider="openai",
|
|
icon_type="icon_small",
|
|
lang="en_US",
|
|
)
|
|
|
|
factory_constructor.assert_called_once_with(tenant_id="tenant-1")
|
|
factory_instance.get_provider_icon.assert_called_once_with("openai", "icon_small", "en_US")
|
|
assert result == (b"icon-bytes", "image/png")
|
|
|
|
def test_switch_preferred_provider_should_convert_enum_and_delegate(
|
|
self,
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
service = ModelProviderService()
|
|
provider_configuration = MagicMock()
|
|
monkeypatch.setattr(service, "_get_provider_configuration", MagicMock(return_value=provider_configuration))
|
|
|
|
service.switch_preferred_provider(
|
|
tenant_id="tenant-1",
|
|
provider="openai",
|
|
preferred_provider_type=ProviderType.SYSTEM.value,
|
|
)
|
|
|
|
provider_configuration.switch_preferred_provider_type.assert_called_once_with(ProviderType.SYSTEM)
|
|
|
|
@pytest.mark.parametrize(
|
|
("method_name", "provider_method_name"),
|
|
[
|
|
("enable_model", "enable_model"),
|
|
("disable_model", "disable_model"),
|
|
],
|
|
)
|
|
def test_model_enablement_methods_should_convert_model_type_and_delegate(
|
|
self,
|
|
method_name: str,
|
|
provider_method_name: str,
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
service = ModelProviderService()
|
|
provider_configuration = MagicMock()
|
|
monkeypatch.setattr(service, "_get_provider_configuration", MagicMock(return_value=provider_configuration))
|
|
|
|
getattr(service, method_name)(
|
|
tenant_id="tenant-1",
|
|
provider="openai",
|
|
model="gpt-4o",
|
|
model_type=ModelType.LLM.value,
|
|
)
|
|
|
|
getattr(provider_configuration, provider_method_name).assert_called_once_with(
|
|
model="gpt-4o",
|
|
model_type=ModelType.LLM,
|
|
)
|