mirror of
https://github.com/langgenius/dify.git
synced 2026-05-13 08:57:28 +08:00
refactor(auth): standardize failed login audit logging (#35054)
This commit is contained in:
parent
b5bbbdd840
commit
8dd4473432
@ -1,7 +1,10 @@
|
||||
import logging
|
||||
|
||||
import flask_login
|
||||
from flask import make_response, request
|
||||
from flask_restx import Resource
|
||||
from pydantic import BaseModel, Field
|
||||
from werkzeug.exceptions import Unauthorized
|
||||
|
||||
import services
|
||||
from configs import dify_config
|
||||
@ -42,12 +45,13 @@ from libs.token import (
|
||||
)
|
||||
from services.account_service import AccountService, InvitationDetailDict, RegisterService, TenantService
|
||||
from services.billing_service import BillingService
|
||||
from services.entities.auth_entities import LoginPayloadBase
|
||||
from services.entities.auth_entities import LoginFailureReason, LoginPayloadBase
|
||||
from services.errors.account import AccountRegisterError
|
||||
from services.errors.workspace import WorkSpaceNotAllowedCreateError, WorkspacesLimitExceededError
|
||||
from services.feature_service import FeatureService
|
||||
|
||||
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LoginPayload(LoginPayloadBase):
|
||||
@ -91,10 +95,12 @@ class LoginApi(Resource):
|
||||
normalized_email = request_email.lower()
|
||||
|
||||
if dify_config.BILLING_ENABLED and BillingService.is_email_in_freeze(normalized_email):
|
||||
_log_console_login_failure(email=normalized_email, reason=LoginFailureReason.ACCOUNT_IN_FREEZE)
|
||||
raise AccountInFreezeError()
|
||||
|
||||
is_login_error_rate_limit = AccountService.is_login_error_rate_limit(normalized_email)
|
||||
if is_login_error_rate_limit:
|
||||
_log_console_login_failure(email=normalized_email, reason=LoginFailureReason.LOGIN_RATE_LIMITED)
|
||||
raise EmailPasswordLoginLimitError()
|
||||
|
||||
invite_token = args.invite_token
|
||||
@ -110,14 +116,20 @@ class LoginApi(Resource):
|
||||
invitee_email = data.get("email") if data else None
|
||||
invitee_email_normalized = invitee_email.lower() if isinstance(invitee_email, str) else invitee_email
|
||||
if invitee_email_normalized != normalized_email:
|
||||
_log_console_login_failure(
|
||||
email=normalized_email,
|
||||
reason=LoginFailureReason.INVALID_INVITATION_EMAIL,
|
||||
)
|
||||
raise InvalidEmailError()
|
||||
account = _authenticate_account_with_case_fallback(
|
||||
request_email, normalized_email, args.password, invite_token
|
||||
)
|
||||
except services.errors.account.AccountLoginError:
|
||||
_log_console_login_failure(email=normalized_email, reason=LoginFailureReason.ACCOUNT_BANNED)
|
||||
raise AccountBannedError()
|
||||
except services.errors.account.AccountPasswordError as exc:
|
||||
AccountService.add_login_error_rate_limit(normalized_email)
|
||||
_log_console_login_failure(email=normalized_email, reason=LoginFailureReason.INVALID_CREDENTIALS)
|
||||
raise AuthenticationFailedError() from exc
|
||||
# SELF_HOSTED only have one workspace
|
||||
tenants = TenantService.get_join_tenants(account)
|
||||
@ -240,20 +252,27 @@ class EmailCodeLoginApi(Resource):
|
||||
|
||||
token_data = AccountService.get_email_code_login_data(args.token)
|
||||
if token_data is None:
|
||||
_log_console_login_failure(email=user_email, reason=LoginFailureReason.INVALID_EMAIL_CODE_TOKEN)
|
||||
raise InvalidTokenError()
|
||||
|
||||
token_email = token_data.get("email")
|
||||
normalized_token_email = token_email.lower() if isinstance(token_email, str) else token_email
|
||||
if normalized_token_email != user_email:
|
||||
_log_console_login_failure(email=user_email, reason=LoginFailureReason.EMAIL_CODE_EMAIL_MISMATCH)
|
||||
raise InvalidEmailError()
|
||||
|
||||
if token_data["code"] != args.code:
|
||||
_log_console_login_failure(email=user_email, reason=LoginFailureReason.INVALID_EMAIL_CODE)
|
||||
raise EmailCodeError()
|
||||
|
||||
AccountService.revoke_email_code_login_token(args.token)
|
||||
try:
|
||||
account = _get_account_with_case_fallback(original_email)
|
||||
except Unauthorized as exc:
|
||||
_log_console_login_failure(email=user_email, reason=LoginFailureReason.ACCOUNT_BANNED)
|
||||
raise AccountBannedError() from exc
|
||||
except AccountRegisterError:
|
||||
_log_console_login_failure(email=user_email, reason=LoginFailureReason.ACCOUNT_IN_FREEZE)
|
||||
raise AccountInFreezeError()
|
||||
if account:
|
||||
tenants = TenantService.get_join_tenants(account)
|
||||
@ -279,6 +298,7 @@ class EmailCodeLoginApi(Resource):
|
||||
except WorkSpaceNotAllowedCreateError:
|
||||
raise NotAllowedCreateWorkspace()
|
||||
except AccountRegisterError:
|
||||
_log_console_login_failure(email=user_email, reason=LoginFailureReason.ACCOUNT_IN_FREEZE)
|
||||
raise AccountInFreezeError()
|
||||
except WorkspacesLimitExceededError:
|
||||
raise WorkspacesLimitExceeded()
|
||||
@ -336,3 +356,12 @@ def _authenticate_account_with_case_fallback(
|
||||
if original_email == normalized_email:
|
||||
raise
|
||||
return AccountService.authenticate(normalized_email, password, invite_token)
|
||||
|
||||
|
||||
def _log_console_login_failure(*, email: str, reason: LoginFailureReason) -> None:
|
||||
logger.warning(
|
||||
"Console login failed: email=%s reason=%s ip_address=%s",
|
||||
email,
|
||||
reason,
|
||||
extract_remote_ip(request),
|
||||
)
|
||||
|
||||
@ -1,7 +1,10 @@
|
||||
import logging
|
||||
|
||||
from flask import make_response, request
|
||||
from flask_restx import Resource
|
||||
from jwt import InvalidTokenError
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
from werkzeug.exceptions import Unauthorized
|
||||
|
||||
import services
|
||||
from configs import dify_config
|
||||
@ -20,7 +23,7 @@ from controllers.console.wraps import (
|
||||
)
|
||||
from controllers.web import web_ns
|
||||
from controllers.web.wraps import decode_jwt_token
|
||||
from libs.helper import EmailStr
|
||||
from libs.helper import EmailStr, extract_remote_ip
|
||||
from libs.passport import PassportService
|
||||
from libs.password import valid_password
|
||||
from libs.token import (
|
||||
@ -29,9 +32,11 @@ from libs.token import (
|
||||
)
|
||||
from services.account_service import AccountService
|
||||
from services.app_service import AppService
|
||||
from services.entities.auth_entities import LoginPayloadBase
|
||||
from services.entities.auth_entities import LoginFailureReason, LoginPayloadBase
|
||||
from services.webapp_auth_service import WebAppAuthService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LoginPayload(LoginPayloadBase):
|
||||
@field_validator("password")
|
||||
@ -76,14 +81,18 @@ class LoginApi(Resource):
|
||||
def post(self):
|
||||
"""Authenticate user and login."""
|
||||
payload = LoginPayload.model_validate(web_ns.payload or {})
|
||||
normalized_email = payload.email.lower()
|
||||
|
||||
try:
|
||||
account = WebAppAuthService.authenticate(payload.email, payload.password)
|
||||
except services.errors.account.AccountLoginError:
|
||||
_log_web_login_failure(email=normalized_email, reason=LoginFailureReason.ACCOUNT_BANNED)
|
||||
raise AccountBannedError()
|
||||
except services.errors.account.AccountPasswordError:
|
||||
_log_web_login_failure(email=normalized_email, reason=LoginFailureReason.INVALID_CREDENTIALS)
|
||||
raise AuthenticationFailedError()
|
||||
except services.errors.account.AccountNotFoundError:
|
||||
_log_web_login_failure(email=normalized_email, reason=LoginFailureReason.ACCOUNT_NOT_FOUND)
|
||||
raise AuthenticationFailedError()
|
||||
|
||||
token = WebAppAuthService.login(account=account)
|
||||
@ -212,21 +221,30 @@ class EmailCodeLoginApi(Resource):
|
||||
|
||||
token_data = WebAppAuthService.get_email_code_login_data(payload.token)
|
||||
if token_data is None:
|
||||
_log_web_login_failure(email=user_email, reason=LoginFailureReason.INVALID_EMAIL_CODE_TOKEN)
|
||||
raise InvalidTokenError()
|
||||
|
||||
token_email = token_data.get("email")
|
||||
if not isinstance(token_email, str):
|
||||
_log_web_login_failure(email=user_email, reason=LoginFailureReason.EMAIL_CODE_EMAIL_MISMATCH)
|
||||
raise InvalidEmailError()
|
||||
normalized_token_email = token_email.lower()
|
||||
if normalized_token_email != user_email:
|
||||
_log_web_login_failure(email=user_email, reason=LoginFailureReason.EMAIL_CODE_EMAIL_MISMATCH)
|
||||
raise InvalidEmailError()
|
||||
|
||||
if token_data["code"] != payload.code:
|
||||
_log_web_login_failure(email=user_email, reason=LoginFailureReason.INVALID_EMAIL_CODE)
|
||||
raise EmailCodeError()
|
||||
|
||||
WebAppAuthService.revoke_email_code_login_token(payload.token)
|
||||
account = WebAppAuthService.get_user_through_email(token_email)
|
||||
try:
|
||||
account = WebAppAuthService.get_user_through_email(token_email)
|
||||
except Unauthorized as exc:
|
||||
_log_web_login_failure(email=user_email, reason=LoginFailureReason.ACCOUNT_BANNED)
|
||||
raise AccountBannedError() from exc
|
||||
if not account:
|
||||
_log_web_login_failure(email=user_email, reason=LoginFailureReason.ACCOUNT_NOT_FOUND)
|
||||
raise AuthenticationFailedError()
|
||||
|
||||
token = WebAppAuthService.login(account=account)
|
||||
@ -234,3 +252,12 @@ class EmailCodeLoginApi(Resource):
|
||||
response = make_response({"result": "success", "data": {"access_token": token}})
|
||||
# set_access_token_to_cookie(request, response, token, samesite="None", httponly=False)
|
||||
return response
|
||||
|
||||
|
||||
def _log_web_login_failure(*, email: str, reason: LoginFailureReason) -> None:
|
||||
logger.warning(
|
||||
"Web login failed: email=%s reason=%s ip_address=%s",
|
||||
email,
|
||||
reason,
|
||||
extract_remote_ip(request),
|
||||
)
|
||||
|
||||
@ -1,9 +1,25 @@
|
||||
from enum import StrEnum, auto
|
||||
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
|
||||
from libs.helper import EmailStr
|
||||
from libs.password import valid_password
|
||||
|
||||
|
||||
class LoginFailureReason(StrEnum):
|
||||
"""Bounded reason codes for failed login audit logs."""
|
||||
|
||||
ACCOUNT_BANNED = auto()
|
||||
ACCOUNT_IN_FREEZE = auto()
|
||||
ACCOUNT_NOT_FOUND = auto()
|
||||
EMAIL_CODE_EMAIL_MISMATCH = auto()
|
||||
INVALID_CREDENTIALS = auto()
|
||||
INVALID_EMAIL_CODE = auto()
|
||||
INVALID_EMAIL_CODE_TOKEN = auto()
|
||||
INVALID_INVITATION_EMAIL = auto()
|
||||
LOGIN_RATE_LIMITED = auto()
|
||||
|
||||
|
||||
class LoginPayloadBase(BaseModel):
|
||||
email: EmailStr
|
||||
password: str
|
||||
|
||||
@ -14,18 +14,20 @@ from unittest.mock import MagicMock, patch
|
||||
import pytest
|
||||
from flask import Flask
|
||||
from flask_restx import Api
|
||||
from werkzeug.exceptions import Unauthorized
|
||||
|
||||
from controllers.console.auth.error import (
|
||||
AuthenticationFailedError,
|
||||
EmailPasswordLoginLimitError,
|
||||
InvalidEmailError,
|
||||
)
|
||||
from controllers.console.auth.login import LoginApi, LogoutApi
|
||||
from controllers.console.auth.login import EmailCodeLoginApi, LoginApi, LogoutApi
|
||||
from controllers.console.error import (
|
||||
AccountBannedError,
|
||||
AccountInFreezeError,
|
||||
WorkspacesLimitExceeded,
|
||||
)
|
||||
from services.entities.auth_entities import LoginFailureReason
|
||||
from services.errors.account import AccountLoginError, AccountPasswordError
|
||||
|
||||
|
||||
@ -34,6 +36,11 @@ def encode_password(password: str) -> str:
|
||||
return base64.b64encode(password.encode("utf-8")).decode()
|
||||
|
||||
|
||||
def encode_code(code: str) -> str:
|
||||
"""Helper to encode verification code as Base64 for testing."""
|
||||
return base64.b64encode(code.encode("utf-8")).decode()
|
||||
|
||||
|
||||
class TestLoginApi:
|
||||
"""Test cases for the LoginApi endpoint."""
|
||||
|
||||
@ -197,12 +204,17 @@ class TestLoginApi:
|
||||
mock_get_invitation.return_value = None
|
||||
|
||||
# Act & Assert
|
||||
with app.test_request_context(
|
||||
"/login", method="POST", json={"email": "test@example.com", "password": encode_password("password")}
|
||||
):
|
||||
login_api = LoginApi()
|
||||
with pytest.raises(EmailPasswordLoginLimitError):
|
||||
login_api.post()
|
||||
with patch("controllers.console.auth.login.logger.warning") as mock_log_warning:
|
||||
with app.test_request_context(
|
||||
"/login", method="POST", json={"email": "test@example.com", "password": encode_password("password")}
|
||||
):
|
||||
login_api = LoginApi()
|
||||
with pytest.raises(EmailPasswordLoginLimitError):
|
||||
login_api.post()
|
||||
|
||||
assert mock_log_warning.call_count == 1
|
||||
assert mock_log_warning.call_args.args[1] == "test@example.com"
|
||||
assert mock_log_warning.call_args.args[2] == LoginFailureReason.LOGIN_RATE_LIMITED
|
||||
|
||||
@patch("controllers.console.wraps.db")
|
||||
@patch("controllers.console.auth.login.dify_config.BILLING_ENABLED", True)
|
||||
@ -220,12 +232,17 @@ class TestLoginApi:
|
||||
mock_is_frozen.return_value = True
|
||||
|
||||
# Act & Assert
|
||||
with app.test_request_context(
|
||||
"/login", method="POST", json={"email": "frozen@example.com", "password": encode_password("password")}
|
||||
):
|
||||
login_api = LoginApi()
|
||||
with pytest.raises(AccountInFreezeError):
|
||||
login_api.post()
|
||||
with patch("controllers.console.auth.login.logger.warning") as mock_log_warning:
|
||||
with app.test_request_context(
|
||||
"/login", method="POST", json={"email": "frozen@example.com", "password": encode_password("password")}
|
||||
):
|
||||
login_api = LoginApi()
|
||||
with pytest.raises(AccountInFreezeError):
|
||||
login_api.post()
|
||||
|
||||
assert mock_log_warning.call_count == 1
|
||||
assert mock_log_warning.call_args.args[1] == "frozen@example.com"
|
||||
assert mock_log_warning.call_args.args[2] == LoginFailureReason.ACCOUNT_IN_FREEZE
|
||||
|
||||
@patch("controllers.console.wraps.db")
|
||||
@patch("controllers.console.auth.login.dify_config.BILLING_ENABLED", False)
|
||||
@ -257,14 +274,20 @@ class TestLoginApi:
|
||||
mock_authenticate.side_effect = AccountPasswordError("Invalid password")
|
||||
|
||||
# Act & Assert
|
||||
with app.test_request_context(
|
||||
"/login", method="POST", json={"email": "test@example.com", "password": encode_password("WrongPass123!")}
|
||||
):
|
||||
login_api = LoginApi()
|
||||
with pytest.raises(AuthenticationFailedError):
|
||||
login_api.post()
|
||||
with patch("controllers.console.auth.login.logger.warning") as mock_log_warning:
|
||||
with app.test_request_context(
|
||||
"/login",
|
||||
method="POST",
|
||||
json={"email": "test@example.com", "password": encode_password("WrongPass123!")},
|
||||
):
|
||||
login_api = LoginApi()
|
||||
with pytest.raises(AuthenticationFailedError):
|
||||
login_api.post()
|
||||
|
||||
mock_add_rate_limit.assert_called_once_with("test@example.com")
|
||||
assert mock_log_warning.call_count == 1
|
||||
assert mock_log_warning.call_args.args[1] == "test@example.com"
|
||||
assert mock_log_warning.call_args.args[2] == LoginFailureReason.INVALID_CREDENTIALS
|
||||
|
||||
@patch("controllers.console.wraps.db")
|
||||
@patch("controllers.console.auth.login.dify_config.BILLING_ENABLED", False)
|
||||
@ -288,12 +311,19 @@ class TestLoginApi:
|
||||
mock_authenticate.side_effect = AccountLoginError("Account is banned")
|
||||
|
||||
# Act & Assert
|
||||
with app.test_request_context(
|
||||
"/login", method="POST", json={"email": "banned@example.com", "password": encode_password("ValidPass123!")}
|
||||
):
|
||||
login_api = LoginApi()
|
||||
with pytest.raises(AccountBannedError):
|
||||
login_api.post()
|
||||
with patch("controllers.console.auth.login.logger.warning") as mock_log_warning:
|
||||
with app.test_request_context(
|
||||
"/login",
|
||||
method="POST",
|
||||
json={"email": "banned@example.com", "password": encode_password("ValidPass123!")},
|
||||
):
|
||||
login_api = LoginApi()
|
||||
with pytest.raises(AccountBannedError):
|
||||
login_api.post()
|
||||
|
||||
assert mock_log_warning.call_count == 1
|
||||
assert mock_log_warning.call_args.args[1] == "banned@example.com"
|
||||
assert mock_log_warning.call_args.args[2] == LoginFailureReason.ACCOUNT_BANNED
|
||||
|
||||
@patch("controllers.console.wraps.db")
|
||||
@patch("controllers.console.auth.login.dify_config.BILLING_ENABLED", False)
|
||||
@ -417,6 +447,36 @@ class TestLoginApi:
|
||||
mock_add_rate_limit.assert_not_called()
|
||||
mock_reset_rate_limit.assert_called_once_with("upper@example.com")
|
||||
|
||||
@patch("controllers.console.wraps.db")
|
||||
@patch("controllers.console.auth.login.AccountService.get_email_code_login_data")
|
||||
@patch("controllers.console.auth.login.AccountService.revoke_email_code_login_token")
|
||||
@patch("controllers.console.auth.login._get_account_with_case_fallback")
|
||||
def test_email_code_login_logs_banned_account(
|
||||
self,
|
||||
mock_get_account,
|
||||
mock_revoke_token,
|
||||
mock_get_token_data,
|
||||
mock_db,
|
||||
app,
|
||||
):
|
||||
mock_db.session.query.return_value.first.return_value = MagicMock()
|
||||
mock_get_token_data.return_value = {"email": "User@Example.com", "code": "123456"}
|
||||
mock_get_account.side_effect = Unauthorized("Account is banned.")
|
||||
|
||||
with patch("controllers.console.auth.login.logger.warning") as mock_log_warning:
|
||||
with app.test_request_context(
|
||||
"/email-code-login/validity",
|
||||
method="POST",
|
||||
json={"email": "User@Example.com", "code": encode_code("123456"), "token": "token-123"},
|
||||
):
|
||||
with pytest.raises(AccountBannedError):
|
||||
EmailCodeLoginApi().post()
|
||||
|
||||
mock_revoke_token.assert_called_once_with("token-123")
|
||||
assert mock_log_warning.call_count == 1
|
||||
assert mock_log_warning.call_args.args[1] == "user@example.com"
|
||||
assert mock_log_warning.call_args.args[2] == LoginFailureReason.ACCOUNT_BANNED
|
||||
|
||||
|
||||
class TestLogoutApi:
|
||||
"""Test cases for the LogoutApi endpoint."""
|
||||
|
||||
@ -4,9 +4,12 @@ from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from flask import Flask
|
||||
from jwt import InvalidTokenError
|
||||
from werkzeug.exceptions import Unauthorized
|
||||
|
||||
import services.errors.account
|
||||
from controllers.web.login import EmailCodeLoginApi, EmailCodeLoginSendEmailApi, LoginApi, LoginStatusApi, LogoutApi
|
||||
from services.entities.auth_entities import LoginFailureReason
|
||||
|
||||
|
||||
def encode_code(code: str) -> str:
|
||||
@ -115,13 +118,18 @@ class TestLoginApi:
|
||||
def test_login_banned_account(self, mock_auth: MagicMock, app: Flask) -> None:
|
||||
from controllers.console.error import AccountBannedError
|
||||
|
||||
with app.test_request_context(
|
||||
"/web/login",
|
||||
method="POST",
|
||||
json={"email": "user@example.com", "password": base64.b64encode(b"Valid1234").decode()},
|
||||
):
|
||||
with pytest.raises(AccountBannedError):
|
||||
LoginApi().post()
|
||||
with patch("controllers.web.login.logger.warning") as mock_log_warning:
|
||||
with app.test_request_context(
|
||||
"/web/login",
|
||||
method="POST",
|
||||
json={"email": "user@example.com", "password": base64.b64encode(b"Valid1234").decode()},
|
||||
):
|
||||
with pytest.raises(AccountBannedError):
|
||||
LoginApi().post()
|
||||
|
||||
assert mock_log_warning.call_count == 1
|
||||
assert mock_log_warning.call_args.args[1] == "user@example.com"
|
||||
assert mock_log_warning.call_args.args[2] == LoginFailureReason.ACCOUNT_BANNED
|
||||
|
||||
@patch(
|
||||
"controllers.web.login.WebAppAuthService.authenticate",
|
||||
@ -130,13 +138,87 @@ class TestLoginApi:
|
||||
def test_login_wrong_password(self, mock_auth: MagicMock, app: Flask) -> None:
|
||||
from controllers.console.auth.error import AuthenticationFailedError
|
||||
|
||||
with app.test_request_context(
|
||||
"/web/login",
|
||||
method="POST",
|
||||
json={"email": "user@example.com", "password": base64.b64encode(b"Valid1234").decode()},
|
||||
):
|
||||
with pytest.raises(AuthenticationFailedError):
|
||||
LoginApi().post()
|
||||
with patch("controllers.web.login.logger.warning") as mock_log_warning:
|
||||
with app.test_request_context(
|
||||
"/web/login",
|
||||
method="POST",
|
||||
json={"email": "user@example.com", "password": base64.b64encode(b"Valid1234").decode()},
|
||||
):
|
||||
with pytest.raises(AuthenticationFailedError):
|
||||
LoginApi().post()
|
||||
|
||||
assert mock_log_warning.call_count == 1
|
||||
assert mock_log_warning.call_args.args[1] == "user@example.com"
|
||||
assert mock_log_warning.call_args.args[2] == LoginFailureReason.INVALID_CREDENTIALS
|
||||
|
||||
@patch(
|
||||
"controllers.web.login.WebAppAuthService.authenticate",
|
||||
side_effect=services.errors.account.AccountNotFoundError(),
|
||||
)
|
||||
def test_login_account_not_found(self, mock_auth: MagicMock, app: Flask) -> None:
|
||||
from controllers.console.auth.error import AuthenticationFailedError
|
||||
|
||||
with patch("controllers.web.login.logger.warning") as mock_log_warning:
|
||||
with app.test_request_context(
|
||||
"/web/login",
|
||||
method="POST",
|
||||
json={"email": "missing@example.com", "password": base64.b64encode(b"Valid1234").decode()},
|
||||
):
|
||||
with pytest.raises(AuthenticationFailedError):
|
||||
LoginApi().post()
|
||||
|
||||
assert mock_log_warning.call_count == 1
|
||||
assert mock_log_warning.call_args.args[1] == "missing@example.com"
|
||||
assert mock_log_warning.call_args.args[2] == LoginFailureReason.ACCOUNT_NOT_FOUND
|
||||
|
||||
@patch("controllers.web.login.WebAppAuthService.get_email_code_login_data", return_value=None)
|
||||
def test_email_code_login_logs_invalid_token(self, mock_get_token_data: MagicMock, app: Flask) -> None:
|
||||
with patch("controllers.web.login.logger.warning") as mock_log_warning:
|
||||
with app.test_request_context(
|
||||
"/web/email-code-login/validity",
|
||||
method="POST",
|
||||
json={"email": "user@example.com", "code": encode_code("123456"), "token": "token-123"},
|
||||
):
|
||||
with pytest.raises(InvalidTokenError):
|
||||
EmailCodeLoginApi().post()
|
||||
|
||||
mock_get_token_data.assert_called_once_with("token-123")
|
||||
assert mock_log_warning.call_count == 1
|
||||
assert mock_log_warning.call_args.args[1] == "user@example.com"
|
||||
assert mock_log_warning.call_args.args[2] == LoginFailureReason.INVALID_EMAIL_CODE_TOKEN
|
||||
|
||||
@patch("controllers.web.login.WebAppAuthService.revoke_email_code_login_token")
|
||||
@patch(
|
||||
"controllers.web.login.WebAppAuthService.get_user_through_email",
|
||||
side_effect=Unauthorized("Account is banned."),
|
||||
)
|
||||
@patch(
|
||||
"controllers.web.login.WebAppAuthService.get_email_code_login_data",
|
||||
return_value={"email": "User@Example.com", "code": "123456"},
|
||||
)
|
||||
def test_email_code_login_logs_banned_account(
|
||||
self,
|
||||
mock_get_token_data: MagicMock,
|
||||
mock_get_user: MagicMock,
|
||||
mock_revoke_token: MagicMock,
|
||||
app: Flask,
|
||||
) -> None:
|
||||
from controllers.console.error import AccountBannedError
|
||||
|
||||
with patch("controllers.web.login.logger.warning") as mock_log_warning:
|
||||
with app.test_request_context(
|
||||
"/web/email-code-login/validity",
|
||||
method="POST",
|
||||
json={"email": "User@Example.com", "code": encode_code("123456"), "token": "token-123"},
|
||||
):
|
||||
with pytest.raises(AccountBannedError):
|
||||
EmailCodeLoginApi().post()
|
||||
|
||||
mock_get_token_data.assert_called_once_with("token-123")
|
||||
mock_revoke_token.assert_called_once_with("token-123")
|
||||
assert mock_log_warning.call_count == 1
|
||||
assert mock_log_warning.call_args.args[1] == "user@example.com"
|
||||
assert mock_log_warning.call_args.args[2] == LoginFailureReason.ACCOUNT_BANNED
|
||||
|
||||
|
||||
class TestLoginStatusApi:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user