diff --git a/api/tests/unit_tests/services/test_webapp_auth_service.py b/api/tests/unit_tests/services/test_webapp_auth_service.py deleted file mode 100644 index 262c1f1524..0000000000 --- a/api/tests/unit_tests/services/test_webapp_auth_service.py +++ /dev/null @@ -1,379 +0,0 @@ -from __future__ import annotations - -from datetime import UTC, datetime -from types import SimpleNamespace -from typing import Any, cast -from unittest.mock import MagicMock - -import pytest -from pytest_mock import MockerFixture -from werkzeug.exceptions import NotFound, Unauthorized - -from models import Account, AccountStatus -from services.errors.account import AccountLoginError, AccountNotFoundError, AccountPasswordError -from services.webapp_auth_service import WebAppAuthService, WebAppAuthType - -ACCOUNT_LOOKUP_PATH = "services.webapp_auth_service.AccountService.get_account_by_email_with_case_fallback" -TOKEN_GENERATE_PATH = "services.webapp_auth_service.TokenManager.generate_token" -TOKEN_GET_DATA_PATH = "services.webapp_auth_service.TokenManager.get_token_data" - - -def _account(**kwargs: Any) -> Account: - return cast(Account, SimpleNamespace(**kwargs)) - - -@pytest.fixture -def mock_db(mocker: MockerFixture) -> MagicMock: - # Arrange - mocked_db = mocker.patch("services.webapp_auth_service.db") - mocked_db.session = MagicMock() - return mocked_db - - -def test_authenticate_should_raise_account_not_found_when_email_does_not_exist(mocker: MockerFixture) -> None: - # Arrange - mocker.patch(ACCOUNT_LOOKUP_PATH, return_value=None) - - # Act + Assert - with pytest.raises(AccountNotFoundError): - WebAppAuthService.authenticate("user@example.com", "pwd") - - -def test_authenticate_should_raise_account_login_error_when_account_is_banned(mocker: MockerFixture) -> None: - # Arrange - account = SimpleNamespace(status=AccountStatus.BANNED, password="hash", password_salt="salt") - mocker.patch( - ACCOUNT_LOOKUP_PATH, - return_value=account, - ) - - # Act + Assert - with pytest.raises(AccountLoginError, match="Account is banned"): - WebAppAuthService.authenticate("user@example.com", "pwd") - - -@pytest.mark.parametrize("password_value", [None, "hash"]) -def test_authenticate_should_raise_password_error_when_password_is_invalid( - password_value: str | None, - mocker: MockerFixture, -) -> None: - # Arrange - account = SimpleNamespace(status=AccountStatus.ACTIVE, password=password_value, password_salt="salt") - mocker.patch( - ACCOUNT_LOOKUP_PATH, - return_value=account, - ) - mocker.patch("services.webapp_auth_service.compare_password", return_value=False) - - # Act + Assert - with pytest.raises(AccountPasswordError, match="Invalid email or password"): - WebAppAuthService.authenticate("user@example.com", "pwd") - - -def test_authenticate_should_return_account_when_credentials_are_valid(mocker: MockerFixture) -> None: - # Arrange - account = SimpleNamespace(status=AccountStatus.ACTIVE, password="hash", password_salt="salt") - mocker.patch( - ACCOUNT_LOOKUP_PATH, - return_value=account, - ) - mocker.patch("services.webapp_auth_service.compare_password", return_value=True) - - # Act - result = WebAppAuthService.authenticate("user@example.com", "pwd") - - # Assert - assert result is account - - -def test_login_should_return_token_from_internal_token_builder(mocker: MockerFixture) -> None: - # Arrange - account = _account(id="a1", email="u@example.com") - mock_get_token = mocker.patch.object(WebAppAuthService, "_get_account_jwt_token", return_value="jwt-token") - - # Act - result = WebAppAuthService.login(account) - - # Assert - assert result == "jwt-token" - mock_get_token.assert_called_once_with(account=account) - - -def test_get_user_through_email_should_return_none_when_account_not_found(mocker: MockerFixture) -> None: - # Arrange - mocker.patch(ACCOUNT_LOOKUP_PATH, return_value=None) - - # Act - result = WebAppAuthService.get_user_through_email("missing@example.com") - - # Assert - assert result is None - - -def test_get_user_through_email_should_raise_unauthorized_when_account_banned(mocker: MockerFixture) -> None: - # Arrange - account = SimpleNamespace(status=AccountStatus.BANNED) - mocker.patch( - ACCOUNT_LOOKUP_PATH, - return_value=account, - ) - - # Act + Assert - with pytest.raises(Unauthorized, match="Account is banned"): - WebAppAuthService.get_user_through_email("user@example.com") - - -def test_get_user_through_email_should_return_account_when_active(mocker: MockerFixture) -> None: - # Arrange - account = SimpleNamespace(status=AccountStatus.ACTIVE) - mocker.patch( - ACCOUNT_LOOKUP_PATH, - return_value=account, - ) - - # Act - result = WebAppAuthService.get_user_through_email("user@example.com") - - # Assert - assert result is account - - -def test_send_email_code_login_email_should_raise_error_when_email_not_provided() -> None: - # Arrange - # Act + Assert - with pytest.raises(ValueError, match="Email must be provided"): - WebAppAuthService.send_email_code_login_email(account=None, email=None) - - -def test_send_email_code_login_email_should_generate_token_and_send_mail_for_account( - mocker: MockerFixture, -) -> None: - # Arrange - account = _account(email="user@example.com") - mocker.patch("services.webapp_auth_service.secrets.randbelow", side_effect=[1, 2, 3, 4, 5, 6]) - mock_generate_token = mocker.patch(TOKEN_GENERATE_PATH, return_value="token-1") - mock_delay = mocker.patch("services.webapp_auth_service.send_email_code_login_mail_task.delay") - - # Act - result = WebAppAuthService.send_email_code_login_email(account=account, language="en-US") - - # Assert - assert result == "token-1" - mock_generate_token.assert_called_once() - assert mock_generate_token.call_args.kwargs["additional_data"] == {"code": "123456"} - mock_delay.assert_called_once_with(language="en-US", to="user@example.com", code="123456") - - -def test_send_email_code_login_email_should_send_mail_for_email_without_account( - mocker: MockerFixture, -) -> None: - # Arrange - mocker.patch("services.webapp_auth_service.secrets.randbelow", side_effect=[0, 0, 0, 0, 0, 0]) - mocker.patch(TOKEN_GENERATE_PATH, return_value="token-2") - mock_delay = mocker.patch("services.webapp_auth_service.send_email_code_login_mail_task.delay") - - # Act - result = WebAppAuthService.send_email_code_login_email(account=None, email="alt@example.com", language="zh-Hans") - - # Assert - assert result == "token-2" - mock_delay.assert_called_once_with(language="zh-Hans", to="alt@example.com", code="000000") - - -def test_get_email_code_login_data_should_delegate_to_token_manager(mocker: MockerFixture) -> None: - # Arrange - mock_get_data = mocker.patch(TOKEN_GET_DATA_PATH, return_value={"code": "123"}) - - # Act - result = WebAppAuthService.get_email_code_login_data("token-abc") - - # Assert - assert result == {"code": "123"} - mock_get_data.assert_called_once_with("token-abc", "email_code_login") - - -def test_revoke_email_code_login_token_should_delegate_to_token_manager(mocker: MockerFixture) -> None: - # Arrange - mock_revoke = mocker.patch("services.webapp_auth_service.TokenManager.revoke_token") - - # Act - WebAppAuthService.revoke_email_code_login_token("token-xyz") - - # Assert - mock_revoke.assert_called_once_with("token-xyz", "email_code_login") - - -def test_create_end_user_should_raise_not_found_when_site_does_not_exist(mock_db: MagicMock) -> None: - # Arrange - mock_db.session.query.return_value.where.return_value.first.return_value = None - - # Act + Assert - with pytest.raises(NotFound, match="Site not found"): - WebAppAuthService.create_end_user("app-code", "user@example.com") - - -def test_create_end_user_should_raise_not_found_when_app_does_not_exist(mock_db: MagicMock) -> None: - # Arrange - site = SimpleNamespace(app_id="app-1") - app_query = MagicMock() - app_query.where.return_value.first.return_value = None - mock_db.session.query.return_value.where.return_value.first.side_effect = [site, None] - - # Act + Assert - with pytest.raises(NotFound, match="App not found"): - WebAppAuthService.create_end_user("app-code", "user@example.com") - - -def test_create_end_user_should_create_and_commit_end_user_when_data_is_valid(mock_db: MagicMock) -> None: - # Arrange - site = SimpleNamespace(app_id="app-1") - app_model = SimpleNamespace(tenant_id="tenant-1", id="app-1") - mock_db.session.query.return_value.where.return_value.first.side_effect = [site, app_model] - - # Act - result = WebAppAuthService.create_end_user("app-code", "user@example.com") - - # Assert - assert result.tenant_id == "tenant-1" - assert result.app_id == "app-1" - assert result.session_id == "user@example.com" - mock_db.session.add.assert_called_once() - mock_db.session.commit.assert_called_once() - - -def test_get_account_jwt_token_should_build_payload_and_issue_token(mocker: MockerFixture) -> None: - # Arrange - account = _account(id="a1", email="user@example.com") - mocker.patch("services.webapp_auth_service.dify_config.ACCESS_TOKEN_EXPIRE_MINUTES", 60) - mock_issue = mocker.patch("services.webapp_auth_service.PassportService.issue", return_value="jwt-1") - - # Act - token = WebAppAuthService._get_account_jwt_token(account) - - # Assert - assert token == "jwt-1" - payload = mock_issue.call_args.args[0] - assert payload["user_id"] == "a1" - assert payload["session_id"] == "user@example.com" - assert payload["token_source"] == "webapp_login_token" - assert payload["auth_type"] == "internal" - assert payload["exp"] > int(datetime.now(UTC).timestamp()) - - -@pytest.mark.parametrize( - ("access_mode", "expected"), - [ - ("private", True), - ("private_all", True), - ("public", False), - ], -) -def test_is_app_require_permission_check_should_use_access_mode_when_provided( - access_mode: str, - expected: bool, -) -> None: - # Arrange - # Act - result = WebAppAuthService.is_app_require_permission_check(access_mode=access_mode) - - # Assert - assert result is expected - - -def test_is_app_require_permission_check_should_raise_when_no_identifier_provided() -> None: - # Arrange - # Act + Assert - with pytest.raises(ValueError, match="Either app_code or app_id must be provided"): - WebAppAuthService.is_app_require_permission_check() - - -def test_is_app_require_permission_check_should_raise_when_app_id_cannot_be_determined(mocker: MockerFixture) -> None: - # Arrange - mocker.patch("services.webapp_auth_service.AppService.get_app_id_by_code", return_value=None) - - # Act + Assert - with pytest.raises(ValueError, match="App ID could not be determined"): - WebAppAuthService.is_app_require_permission_check(app_code="app-code") - - -def test_is_app_require_permission_check_should_return_true_when_enterprise_mode_requires_it( - mocker: MockerFixture, -) -> None: - # Arrange - mocker.patch("services.webapp_auth_service.AppService.get_app_id_by_code", return_value="app-1") - mocker.patch( - "services.webapp_auth_service.EnterpriseService.WebAppAuth.get_app_access_mode_by_id", - return_value=SimpleNamespace(access_mode="private"), - ) - - # Act - result = WebAppAuthService.is_app_require_permission_check(app_code="app-code") - - # Assert - assert result is True - - -def test_is_app_require_permission_check_should_return_false_when_enterprise_settings_do_not_require_it( - mocker: MockerFixture, -) -> None: - # Arrange - mocker.patch( - "services.webapp_auth_service.EnterpriseService.WebAppAuth.get_app_access_mode_by_id", - return_value=SimpleNamespace(access_mode="public"), - ) - - # Act - result = WebAppAuthService.is_app_require_permission_check(app_id="app-1") - - # Assert - assert result is False - - -@pytest.mark.parametrize( - ("access_mode", "expected"), - [ - ("public", WebAppAuthType.PUBLIC), - ("private", WebAppAuthType.INTERNAL), - ("private_all", WebAppAuthType.INTERNAL), - ("sso_verified", WebAppAuthType.EXTERNAL), - ], -) -def test_get_app_auth_type_should_map_access_modes_correctly( - access_mode: str, - expected: WebAppAuthType, -) -> None: - # Arrange - # Act - result = WebAppAuthService.get_app_auth_type(access_mode=access_mode) - - # Assert - assert result == expected - - -def test_get_app_auth_type_should_resolve_from_app_code(mocker: MockerFixture) -> None: - # Arrange - mocker.patch("services.webapp_auth_service.AppService.get_app_id_by_code", return_value="app-1") - mocker.patch( - "services.webapp_auth_service.EnterpriseService.WebAppAuth.get_app_access_mode_by_id", - return_value=SimpleNamespace(access_mode="private_all"), - ) - - # Act - result = WebAppAuthService.get_app_auth_type(app_code="app-code") - - # Assert - assert result == WebAppAuthType.INTERNAL - - -def test_get_app_auth_type_should_raise_when_no_input_provided() -> None: - # Arrange - # Act + Assert - with pytest.raises(ValueError, match="Either app_code or access_mode must be provided"): - WebAppAuthService.get_app_auth_type() - - -def test_get_app_auth_type_should_raise_when_cannot_determine_type_from_invalid_mode() -> None: - # Arrange - # Act + Assert - with pytest.raises(ValueError, match="Could not determine app authentication type"): - WebAppAuthService.get_app_auth_type(access_mode="unknown")