fix: IDOR on console GET /account/avatar (#35771)

Signed-off-by: NeatGuyCoding <15627489+NeatGuyCoding@users.noreply.github.com>
This commit is contained in:
NeatGuyCoding 2026-05-03 22:42:56 +08:00 committed by GitHub
parent 3708e3eef1
commit 7ba408eebe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 146 additions and 1 deletions

View File

@ -8,6 +8,7 @@ from flask import request
from flask_restx import Resource
from pydantic import BaseModel, Field, field_validator, model_validator
from sqlalchemy import select
from werkzeug.exceptions import NotFound
from configs import dify_config
from constants.languages import supported_language
@ -45,6 +46,8 @@ from libs.helper import EmailStr, extract_remote_ip, timezone
from libs.login import current_account_with_tenant, login_required
from models import AccountIntegrate, InvitationCode
from models.account import AccountStatus, InvitationCodeStatus
from models.enums import CreatorUserRole
from models.model import UploadFile
from services.account_service import AccountService
from services.billing_service import BillingService
from services.errors.account import CurrentPasswordIncorrectError as ServiceCurrentPasswordIncorrectError
@ -322,9 +325,24 @@ class AccountAvatarApi(Resource):
@login_required
@account_initialization_required
def get(self):
current_user, current_tenant_id = current_account_with_tenant()
args = AccountAvatarQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore
avatar = args.avatar
avatar_url = file_helpers.get_signed_file_url(args.avatar)
if avatar.startswith(("http://", "https://")):
return {"avatar_url": avatar}
upload_file = db.session.scalar(select(UploadFile).where(UploadFile.id == avatar).limit(1))
if upload_file is None:
raise NotFound("Avatar file not found")
if upload_file.tenant_id != current_tenant_id:
raise NotFound("Avatar file not found")
if upload_file.created_by_role != CreatorUserRole.ACCOUNT or upload_file.created_by != current_user.id:
raise NotFound("Avatar file not found")
avatar_url = file_helpers.get_signed_file_url(upload_file_id=upload_file.id)
return {"avatar_url": avatar_url}
@console_ns.expect(console_ns.models[AccountAvatarPayload.__name__])

View File

@ -1,6 +1,7 @@
from unittest.mock import MagicMock, PropertyMock, patch
import pytest
from werkzeug.exceptions import NotFound
from controllers.console import console_ns
from controllers.console.auth.error import (
@ -29,6 +30,7 @@ from controllers.console.workspace.error import (
CurrentPasswordIncorrectError,
InvalidAccountDeletionCodeError,
)
from models.enums import CreatorUserRole
from services.errors.account import CurrentPasswordIncorrectError as ServicePwdError
@ -135,6 +137,131 @@ class TestAccountUpdateApis:
assert result["id"] == "u1"
class TestAccountAvatarApiGet:
"""GET /account/avatar must not sign arbitrary upload_file IDs (IDOR)."""
def test_get_avatar_signed_url_when_upload_owned_by_current_account(self, app):
api = AccountAvatarApi()
method = unwrap(api.get)
user = MagicMock()
user.id = "acc-owner"
tenant_id = "tenant-1"
file_id = "550e8400-e29b-41d4-a716-446655440000"
upload_file = MagicMock()
upload_file.id = file_id
upload_file.tenant_id = tenant_id
upload_file.created_by = user.id
upload_file.created_by_role = CreatorUserRole.ACCOUNT
with (
app.test_request_context(f"/account/avatar?avatar={file_id}"),
patch(
"controllers.console.workspace.account.current_account_with_tenant",
return_value=(user, tenant_id),
),
patch("controllers.console.workspace.account.db.session.scalar", return_value=upload_file),
patch(
"controllers.console.workspace.account.file_helpers.get_signed_file_url",
return_value="https://signed/example",
) as sign_mock,
):
result = method(api)
assert result == {"avatar_url": "https://signed/example"}
sign_mock.assert_called_once_with(upload_file_id=file_id)
def test_get_avatar_not_found_when_upload_created_by_other_account_same_tenant(self, app):
api = AccountAvatarApi()
method = unwrap(api.get)
user = MagicMock()
user.id = "acc-a"
tenant_id = "tenant-1"
file_id = "550e8400-e29b-41d4-a716-446655440001"
upload_file = MagicMock()
upload_file.id = file_id
upload_file.tenant_id = tenant_id
upload_file.created_by = "acc-b"
upload_file.created_by_role = CreatorUserRole.ACCOUNT
with (
app.test_request_context(f"/account/avatar?avatar={file_id}"),
patch(
"controllers.console.workspace.account.current_account_with_tenant",
return_value=(user, tenant_id),
),
patch("controllers.console.workspace.account.db.session.scalar", return_value=upload_file),
patch(
"controllers.console.workspace.account.file_helpers.get_signed_file_url",
return_value="https://signed/leak",
) as sign_mock,
):
with pytest.raises(NotFound):
method(api)
sign_mock.assert_not_called()
def test_get_avatar_not_found_when_upload_belongs_to_other_tenant(self, app):
api = AccountAvatarApi()
method = unwrap(api.get)
user = MagicMock()
user.id = "acc-owner"
tenant_id = "tenant-1"
file_id = "550e8400-e29b-41d4-a716-446655440002"
upload_file = MagicMock()
upload_file.id = file_id
upload_file.tenant_id = "tenant-other"
upload_file.created_by = user.id
upload_file.created_by_role = CreatorUserRole.ACCOUNT
with (
app.test_request_context(f"/account/avatar?avatar={file_id}"),
patch(
"controllers.console.workspace.account.current_account_with_tenant",
return_value=(user, tenant_id),
),
patch("controllers.console.workspace.account.db.session.scalar", return_value=upload_file),
patch(
"controllers.console.workspace.account.file_helpers.get_signed_file_url",
return_value="https://signed/leak",
) as sign_mock,
):
with pytest.raises(NotFound):
method(api)
sign_mock.assert_not_called()
def test_get_avatar_https_pass_through_without_signing(self, app):
api = AccountAvatarApi()
method = unwrap(api.get)
user = MagicMock()
user.id = "acc-owner"
tenant_id = "tenant-1"
external = "https://cdn.example/avatar.png"
with (
app.test_request_context(f"/account/avatar?avatar={external}"),
patch(
"controllers.console.workspace.account.current_account_with_tenant",
return_value=(user, tenant_id),
),
patch(
"controllers.console.workspace.account.file_helpers.get_signed_file_url",
return_value="https://signed/should-not-use",
) as sign_mock,
):
result = method(api)
assert result == {"avatar_url": external}
sign_mock.assert_not_called()
class TestAccountPasswordApi:
def test_password_success(self, app):
api = AccountPasswordApi()