feat: guard openapi with rbac (#37752)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Yunlu Wen 2026-06-22 17:35:33 +08:00 committed by GitHub
parent 0d7ca17cd1
commit 82d08851be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 655 additions and 97 deletions

View File

@ -0,0 +1,107 @@
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass
from typing import TYPE_CHECKING
from services.enterprise import rbac_service as enterprise_rbac_service
if TYPE_CHECKING:
from services.app_service import AppListBaseParams
from services.enterprise.rbac_service import MyPermissionsResponse
# Permission keys (dot-notation, from MyPermissionsResponse) that grant
# list/preview access to an app. Keep this the single source of truth for both
# the console and OpenAPI app-list endpoints.
APP_LIST_PERMISSION_KEYS: frozenset[str] = frozenset({"app.preview", "app.acl.preview", "app.full_access"})
# Workspace permission key that lets a caller see apps they maintain even when
# those apps are not in their preview whitelist.
_MANAGE_OWN_APPS_PERMISSION_KEY = "app.create_and_management"
def has_app_list_permission(permission_keys: Sequence[str]) -> bool:
"""Return True if any of ``permission_keys`` grants app list/preview access."""
return any(permission_key in APP_LIST_PERMISSION_KEYS for permission_key in permission_keys)
@dataclass(frozen=True)
class AppAccessFilter:
"""Resolved RBAC visibility for app list/read endpoints.
``accessible_app_ids`` of ``None`` means the caller can see every app in the
workspace (unrestricted). Otherwise it is the exact set of app ids the
caller may preview; combined with ``can_manage_own_apps`` it also covers
apps the caller maintains.
"""
accessible_app_ids: set[str] | None
can_manage_own_apps: bool
@classmethod
def unrestricted(cls) -> AppAccessFilter:
"""Filter that imposes no restriction (RBAC disabled / not applicable)."""
return cls(accessible_app_ids=None, can_manage_own_apps=False)
def is_app_accessible(self, app_id: str, maintainer: str | None, account_id: str) -> bool:
"""Whether a single app is visible to the caller under this filter.
Mirrors the service-layer query gate: an app is visible when the filter
is unrestricted, the app id is whitelisted, or the caller maintains it
and holds ``app.create_and_management``.
"""
if self.accessible_app_ids is None:
return True
if app_id in self.accessible_app_ids:
return True
return self.can_manage_own_apps and maintainer is not None and maintainer == account_id
def apply_to_params(self, params: AppListBaseParams) -> None:
if self.accessible_app_ids is None:
return
params.accessible_app_ids = sorted(self.accessible_app_ids)
params.include_own_apps = self.can_manage_own_apps
def resolve_app_access_filter(
tenant_id: str,
account_id: str,
*,
permissions: MyPermissionsResponse | None = None,
) -> AppAccessFilter:
"""Compute the RBAC app-access filter for ``account_id`` in ``tenant_id``.
Pass ``permissions`` when the caller has already fetched the snapshot (the
console controller reuses it for per-app permission keys) to avoid a second
inner-API round trip; otherwise it is fetched here.
"""
if permissions is None:
permissions = enterprise_rbac_service.RBACService.MyPermissions.get(tenant_id, account_id)
whitelist_scope = enterprise_rbac_service.RBACService.AppAccess.whitelist_resources(tenant_id, account_id)
can_manage_own_apps = _MANAGE_OWN_APPS_PERMISSION_KEY in permissions.workspace.permission_keys
has_default_preview = has_app_list_permission(permissions.app.default_permission_keys) or has_app_list_permission(
permissions.workspace.permission_keys
)
permission_app_ids: set[str] | None = None
if not has_default_preview:
# Collect apps the caller can preview via per-app permission overrides.
permission_app_ids = {
override.resource_id
for override in permissions.app.overrides
if has_app_list_permission(override.permission_keys)
}
accessible_app_ids: set[str] | None
if getattr(whitelist_scope, "unrestricted", False):
accessible_app_ids = permission_app_ids
else:
accessible_app_ids = set(whitelist_scope.resource_ids)
if permission_app_ids is not None:
accessible_app_ids |= permission_app_ids
elif has_default_preview:
# Default preview overrides the whitelist restriction.
accessible_app_ids = None
return AppAccessFilter(accessible_app_ids=accessible_app_ids, can_manage_own_apps=can_manage_own_apps)

View File

@ -1,23 +1,3 @@
"""Shared decorator utilities for Dify controller layers.
This module provides decorators that are not tied to any single API group (e.g.
console, inner, service). Currently it exposes the RBAC permission gate, which
can be applied to any blueprint.
Key exports
-----------
``rbac_permission_required`` decorator that enforces enterprise RBAC access
control. When ``RBAC_ENABLED`` is ``False`` it is a no-op.
``RBACPermission``, ``RBACResourceScope`` re-exported from ``core.rbac`` so
callers only need a single import site.
Private helpers
---------------
``_extract_resource_id``, ``_is_resource_owned_by_current_user`` kept module-
private but accessible via the module namespace for unit-test patching.
"""
from collections.abc import Callable
from functools import wraps
@ -32,7 +12,57 @@ from models.dataset import Dataset
from models.model import App
from services.enterprise.rbac_service import RBACService
__all__ = ["RBACPermission", "RBACResourceScope", "rbac_permission_required"]
__all__ = ["RBACPermission", "RBACResourceScope", "enforce_rbac_access", "rbac_permission_required"]
def enforce_rbac_access(
*,
tenant_id: str,
account_id: str,
resource_type: RBACResourceScope,
scene: RBACPermission,
resource_required: bool = True,
path_args: dict[str, object] | None = None,
) -> None:
"""Enforce enterprise RBAC for an explicit account/tenant pair.
This is the flask-login-independent core of the RBAC gate so it can run
inside request-handling layers that resolve the caller themselves (e.g. the
openapi auth pipeline, which has the account on ``AuthData`` before
flask-login is mounted).
No-op when ``RBAC_ENABLED`` is ``False``. For resource-scoped checks the
resource ID is taken from ``path_args`` merged with ``request.view_args``;
resource ownership short-circuits the check. Raises ``Forbidden`` when
access is denied. For workspace-level checks pass ``resource_required=False``
so the RBAC request omits ``resource_id``.
Args:
tenant_id: The tenant the access is evaluated against.
account_id: The account requesting access.
resource_type: The :class:`RBACResourceScope` member (app/dataset/workspace).
scene: The :class:`RBACPermission` permission point, e.g. ``RBACPermission.APP_DELETE``.
resource_required: Whether a concrete resource ID is required.
path_args: Extra path arguments to merge with ``request.view_args``.
"""
if not dify_config.RBAC_ENABLED:
return
check_resource_type = None if resource_type == RBACResourceScope.WORKSPACE else resource_type
resource_id = None
if resource_required and check_resource_type:
resource_id = _extract_resource_id(resource_type, path_args)
if _is_resource_owned_by_current_user(tenant_id, account_id, resource_type, resource_id):
return
allowed = RBACService.CheckAccess.check(
tenant_id,
account_id,
scene=scene,
resource_type=check_resource_type,
resource_id=resource_id,
)
if not allowed:
raise Forbidden()
def rbac_permission_required[**P, R](
@ -41,14 +71,12 @@ def rbac_permission_required[**P, R](
*,
resource_required: bool = True,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""Check enterprise RBAC permissions for the current user.
"""Check enterprise RBAC permissions for the current flask-login user.
When ``RBAC_ENABLED`` is ``False`` the decorator is a no-op and the
request passes through unchanged. When enabled it extracts the resource ID
from ``request.view_args`` for resource-scoped checks, calls the RBAC
service ``check-access`` endpoint, and raises ``Forbidden`` if the access
is denied. For workspace-level checks, set ``resource_required=False`` so
the RBAC request omits ``resource_id``.
request passes through unchanged. When enabled it resolves the current
account/tenant and delegates to :func:`enforce_rbac_access`, raising
``Forbidden`` if access is denied.
Args:
resource_type: The :class:`RBACResourceScope` member (app/dataset/workspace).
@ -63,23 +91,14 @@ def rbac_permission_required[**P, R](
return view(*args, **kwargs)
current_user, current_tenant_id = current_account_with_tenant()
check_resource_type = None if resource_type == RBACResourceScope.WORKSPACE else resource_type
resource_id = None
if resource_required and check_resource_type:
resource_id = _extract_resource_id(resource_type, kwargs)
if _is_resource_owned_by_current_user(current_tenant_id, current_user.id, resource_type, resource_id):
return view(*args, **kwargs)
allowed = RBACService.CheckAccess.check(
current_tenant_id,
current_user.id,
enforce_rbac_access(
tenant_id=current_tenant_id,
account_id=current_user.id,
resource_type=resource_type,
scene=scene,
resource_type=check_resource_type,
resource_id=resource_id,
resource_required=resource_required,
path_args=kwargs,
)
if not allowed:
raise Forbidden()
return view(*args, **kwargs)
return decorated

View File

@ -14,6 +14,7 @@ from werkzeug.datastructures import MultiDict
from werkzeug.exceptions import BadRequest, NotFound
from configs import dify_config
from controllers.common.app_access import resolve_app_access_filter
from controllers.common.fields import RedirectUrlResponse, SimpleResultResponse
from controllers.common.helpers import FileInfo
from controllers.common.schema import (
@ -78,7 +79,6 @@ _TAG_IDS_BRACKET_PATTERN = re.compile(r"^tag_ids\[(\d+)\]$")
_CREATOR_IDS_BRACKET_PATTERN = re.compile(r"^creator_ids\[(\d+)\]$")
AppListMode = Literal["completion", "chat", "advanced-chat", "workflow", "agent-chat", "agent", "channel", "all"]
DEFAULT_APP_LIST_MODE: AppListMode = "all"
APP_LIST_PERMISSION_KEYS = frozenset({"app.preview", "app.acl.preview", "app.full_access"})
class AppListBaseQuery(BaseModel):
@ -167,10 +167,6 @@ def _normalize_app_list_query_args(query_args: MultiDict[str, str]) -> dict[str,
return normalized
def _has_app_list_permission(permission_keys: Sequence[str]) -> bool:
return any(permission_key in APP_LIST_PERMISSION_KEYS for permission_key in permission_keys)
class CreateAppPayload(BaseModel):
name: str = Field(..., min_length=1, description="App name")
description: str | None = Field(default=None, description="App description (max 400 chars)", max_length=400)
@ -612,38 +608,12 @@ class AppListApi(Resource):
current_user_id,
)
if dify_config.RBAC_ENABLED:
whitelist_scope = enterprise_rbac_service.RBACService.AppAccess.whitelist_resources(
access_filter = resolve_app_access_filter(
str(current_tenant_id),
current_user_id,
permissions=permissions,
)
can_manage_own_apps = "app.create_and_management" in permissions.workspace.permission_keys
has_default_preview = _has_app_list_permission(
permissions.app.default_permission_keys
) or _has_app_list_permission(permissions.workspace.permission_keys)
permission_app_ids: set[str] | None = None
if not has_default_preview:
permission_app_ids = {
override.resource_id
for override in permissions.app.overrides
if _has_app_list_permission(override.permission_keys)
}
if getattr(whitelist_scope, "unrestricted", False):
accessible_app_ids = permission_app_ids
else:
accessible_app_ids = set(whitelist_scope.resource_ids)
if permission_app_ids is not None:
accessible_app_ids |= permission_app_ids
elif has_default_preview:
accessible_app_ids = None
if accessible_app_ids:
params.accessible_app_ids = sorted(accessible_app_ids)
params.include_own_apps = can_manage_own_apps
elif accessible_app_ids is not None and can_manage_own_apps:
params.is_created_by_me = True
elif accessible_app_ids is not None:
params.accessible_app_ids = []
access_filter.apply_to_params(params)
# get app list
app_service = AppService()

View File

@ -5,11 +5,12 @@ from typing import cast
from flask_restx import Resource
from sqlalchemy.orm import Session
from controllers.common.wraps import RBACPermission, RBACResourceScope
from controllers.openapi import openapi_ns
from controllers.openapi._contract import accepts, returns
from controllers.openapi._models import AppDslExportQuery, AppDslExportResponse, AppDslImportPayload
from controllers.openapi.auth.composition import auth_router
from controllers.openapi.auth.data import AuthData
from controllers.openapi.auth.data import AuthData, RBACRequirement
from extensions.ext_database import db
from libs.oauth_bearer import Scope, TokenType
from models import Account, App
@ -37,6 +38,11 @@ class AppDslImportApi(Resource):
scope=Scope.WORKSPACE_WRITE,
allowed_token_types=frozenset({TokenType.OAUTH_ACCOUNT}),
allowed_roles=frozenset({TenantAccountRole.EDITOR, TenantAccountRole.ADMIN, TenantAccountRole.OWNER}),
rbac=RBACRequirement(
resource_type=RBACResourceScope.APP,
scene=RBACPermission.APP_IMPORT_EXPORT_DSL,
resource_required=False,
),
)
@returns(200, Import, "Import completed")
@returns(202, Import, "Import pending confirmation")
@ -89,6 +95,11 @@ class AppDslImportConfirmApi(Resource):
scope=Scope.WORKSPACE_WRITE,
allowed_token_types=frozenset({TokenType.OAUTH_ACCOUNT}),
allowed_roles=frozenset({TenantAccountRole.EDITOR, TenantAccountRole.ADMIN, TenantAccountRole.OWNER}),
rbac=RBACRequirement(
resource_type=RBACResourceScope.APP,
scene=RBACPermission.APP_IMPORT_EXPORT_DSL,
resource_required=False,
),
)
@returns(200, Import, "Import confirmed")
@returns(400, Import, "Import failed")
@ -125,6 +136,7 @@ class AppDslExportApi(Resource):
scope=Scope.APPS_READ,
allowed_token_types=frozenset({TokenType.OAUTH_ACCOUNT}),
allowed_roles=frozenset({TenantAccountRole.EDITOR, TenantAccountRole.ADMIN, TenantAccountRole.OWNER}),
rbac=RBACRequirement(resource_type=RBACResourceScope.APP, scene=RBACPermission.APP_IMPORT_EXPORT_DSL),
)
@accepts(query=AppDslExportQuery)
@returns(200, AppDslExportResponse, "Export successful")
@ -155,6 +167,7 @@ class AppDslCheckDependenciesApi(Resource):
scope=Scope.APPS_READ,
allowed_token_types=frozenset({TokenType.OAUTH_ACCOUNT}),
allowed_roles=frozenset({TenantAccountRole.EDITOR, TenantAccountRole.ADMIN, TenantAccountRole.OWNER}),
rbac=RBACRequirement(resource_type=RBACResourceScope.APP, scene=RBACPermission.APP_IMPORT_EXPORT_DSL),
)
@returns(200, CheckDependenciesResult, "Dependencies checked")
def get(self, app_id: str, *, auth_data: AuthData):

View File

@ -19,12 +19,13 @@ from werkzeug.exceptions import (
import services
from controllers.common.fields import EventStreamResponse
from controllers.common.wraps import RBACPermission, RBACResourceScope
from controllers.openapi import openapi_ns
from controllers.openapi._audit import emit_app_run
from controllers.openapi._contract import accepts, returns
from controllers.openapi._models import AppRunRequest, TaskStopResponse
from controllers.openapi.auth.composition import auth_router
from controllers.openapi.auth.data import AuthData
from controllers.openapi.auth.data import AuthData, RBACRequirement
from controllers.service_api.app.error import (
AppUnavailableError,
CompletionRequestError,
@ -136,7 +137,10 @@ _DISPATCH: dict[AppMode, Callable[[App, Any, AppRunRequest], Any]] = {
@openapi_ns.route("/apps/<string:app_id>/run")
class AppRunApi(Resource):
@auth_router.guard(scope=Scope.APPS_RUN)
@auth_router.guard(
scope=Scope.APPS_RUN,
rbac=RBACRequirement(resource_type=RBACResourceScope.APP, scene=RBACPermission.APP_TEST_AND_RUN),
)
@openapi_ns.response(200, "Run result (SSE stream)", openapi_ns.models[EventStreamResponse.__name__])
@accepts(body=AppRunRequest)
def post(self, app_id: str, *, auth_data: AuthData, body: AppRunRequest):
@ -167,7 +171,10 @@ class AppRunApi(Resource):
@openapi_ns.route("/apps/<string:app_id>/tasks/<string:task_id>/stop")
class AppRunTaskStopApi(Resource):
@auth_router.guard(scope=Scope.APPS_RUN)
@auth_router.guard(
scope=Scope.APPS_RUN,
rbac=RBACRequirement(resource_type=RBACResourceScope.APP, scene=RBACPermission.APP_TEST_AND_RUN),
)
@returns(200, TaskStopResponse, description="Task stopped")
def post(self, app_id: str, task_id: str, *, auth_data: AuthData):
app_model, caller, caller_kind = auth_data.require_app_context()

View File

@ -8,7 +8,10 @@ from typing import Any, cast
from flask_restx import Resource
from werkzeug.exceptions import Conflict, NotFound, UnprocessableEntity
from configs import dify_config
from controllers.common.app_access import AppAccessFilter, resolve_app_access_filter
from controllers.common.fields import Parameters
from controllers.common.wraps import RBACPermission, RBACResourceScope
from controllers.openapi import openapi_ns
from controllers.openapi._contract import accepts, returns
from controllers.openapi._input_schema import EMPTY_INPUT_SCHEMA, build_input_schema, resolve_app_config
@ -21,7 +24,7 @@ from controllers.openapi._models import (
AppListRow,
)
from controllers.openapi.auth.composition import auth_router
from controllers.openapi.auth.data import AuthData
from controllers.openapi.auth.data import AuthData, RBACRequirement
from controllers.service_api.app.error import AppUnavailableError
from core.app.app_config.common.parameters_mapping import get_parameters_from_feature_dict
from extensions.ext_database import db
@ -121,7 +124,11 @@ def build_app_describe_response(app: App, fields: set[str] | None) -> AppDescrib
@openapi_ns.route("/apps/<string:app_id>/describe")
class AppDescribeApi(AppReadResource):
@auth_router.guard(scope=Scope.APPS_READ, allowed_token_types=frozenset({TokenType.OAUTH_ACCOUNT}))
@auth_router.guard(
scope=Scope.APPS_READ,
allowed_token_types=frozenset({TokenType.OAUTH_ACCOUNT}),
rbac=RBACRequirement(resource_type=RBACResourceScope.APP, scene=RBACPermission.APP_VIEW_LAYOUT),
)
@returns(200, AppDescribeResponse, description="App description")
@accepts(query=AppDescribeQuery)
def get(self, app_id: str, *, auth_data: AuthData, query: AppDescribeQuery):
@ -148,11 +155,28 @@ class AppListApi(Resource):
else:
parsed_uuid = None
# Compute RBAC-accessible app IDs when RBAC is enabled and the caller is an account.
# ``None`` means unrestricted (caller can see all apps in the workspace);
# an empty set or list means the caller has no accessible apps.
# End-users bypass RBAC here — their access is controlled by scope upstream.
apply_rbac_filter = (
dify_config.RBAC_ENABLED and auth_data.caller_kind != "end_user" and auth_data.account_id is not None
)
access_filter = AppAccessFilter.unrestricted()
if apply_rbac_filter:
access_filter = resolve_app_access_filter(workspace_id, str(auth_data.account_id))
tenant_name: str | None = None
if parsed_uuid is not None:
app: App | None = AppService.get_visible_app_by_id(db.session, str(parsed_uuid))
if app is None or str(app.tenant_id) != workspace_id:
return empty
# Apply RBAC visibility to the UUID fast-path the same way the service
# layer does for paginated queries (id in accessible set OR own app).
if apply_rbac_filter and not access_filter.is_app_accessible(
str(app.id), str(app.maintainer) if app.maintainer else None, str(auth_data.account_id)
):
return empty
tenant_name = TenantService.get_tenant_name(db.session, workspace_id)
item = AppListRow(
id=str(app.id),
@ -177,6 +201,9 @@ class AppListApi(Resource):
openapi_visible=True,
)
if apply_rbac_filter:
access_filter.apply_to_params(params)
pagination = AppService().get_paginate_apps(str(auth_data.account_id), workspace_id, params, db.session)
if pagination is None:
return empty

View File

@ -3,9 +3,11 @@ from __future__ import annotations
from controllers.openapi.auth.conditions import (
EDITION_EE,
HAS_ALLOWED_ROLES,
HAS_RBAC,
LOADED_APP_IS_PRIVATE,
PATH_HAS_APP_ID,
WEBAPP_AUTH_ENABLED,
WEBAPP_RUN_SCOPED,
WORKSPACE_MEMBERSHIP_REQUIRED,
WORKSPACE_SCOPED,
)
@ -25,6 +27,7 @@ from controllers.openapi.auth.verify import (
check_acl,
check_app_api_enabled,
check_private_app_permission,
check_rbac_permission,
check_scope,
check_workspace_member,
check_workspace_mismatch,
@ -47,8 +50,9 @@ account_pipeline = AuthPipeline(
When(WORKSPACE_SCOPED, then=check_workspace_member),
When(PATH_HAS_APP_ID, then=check_workspace_mismatch),
When(HAS_ALLOWED_ROLES, then=check_workspace_role),
When(PATH_HAS_APP_ID & EDITION_EE & WEBAPP_AUTH_ENABLED, then=check_acl),
When(EDITION_EE & LOADED_APP_IS_PRIVATE, then=check_private_app_permission),
When(HAS_RBAC, then=check_rbac_permission),
When(PATH_HAS_APP_ID & EDITION_EE & WEBAPP_AUTH_ENABLED & WEBAPP_RUN_SCOPED, then=check_acl),
When(EDITION_EE & LOADED_APP_IS_PRIVATE & WEBAPP_RUN_SCOPED, then=check_private_app_permission),
],
)

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from collections.abc import Callable
from controllers.openapi.auth.data import AuthData, Edition, RequestContext, current_edition
from libs.oauth_bearer import TokenType
from libs.oauth_bearer import Scope, TokenType
from services.enterprise.enterprise_service import WebAppAccessMode
from services.feature_service import FeatureService
@ -50,8 +50,11 @@ EDITION_SAAS = config_cond(lambda: current_edition() == Edition.SAAS)
WEBAPP_AUTH_ENABLED = config_cond(lambda: FeatureService.get_system_features().webapp_auth.enabled)
WEBAPP_RUN_SCOPED = request_cond(lambda ctx: ctx.scope == Scope.APPS_RUN)
WORKSPACE_MEMBERSHIP_REQUIRED = request_cond(lambda ctx: ctx.workspace_membership)
HAS_ALLOWED_ROLES = request_cond(lambda ctx: ctx.allowed_roles is not None)
HAS_RBAC = request_cond(lambda ctx: ctx.rbac is not None)
# Caller must belong to the resolved tenant: either an app-scoped path (tenant
# from the app) or an explicit workspace-membership path (tenant from request).

View File

@ -8,6 +8,7 @@ from pydantic import BaseModel, ConfigDict, Field
from werkzeug.exceptions import InternalServerError
from configs import dify_config
from core.rbac import RBACPermission, RBACResourceScope
from libs.oauth_bearer import Scope, TokenType
from models.account import Account, Tenant, TenantAccountRole
from models.model import App, EndUser
@ -35,6 +36,14 @@ class ExternalIdentity(BaseModel):
issuer: str | None = None
class RBACRequirement(BaseModel):
model_config = ConfigDict(frozen=True)
resource_type: RBACResourceScope
scene: RBACPermission
resource_required: bool = True
class RequestContext(BaseModel):
model_config = ConfigDict(frozen=True)
@ -43,6 +52,7 @@ class RequestContext(BaseModel):
path_params: dict[str, str]
workspace_membership: bool = False
allowed_roles: frozenset[TenantAccountRole] | None = None
rbac: RBACRequirement | None = None
class AuthData(BaseModel):
@ -59,6 +69,7 @@ class AuthData(BaseModel):
path_params: dict[str, str] = Field(default_factory=dict)
allowed_roles: frozenset[TenantAccountRole] | None = None
rbac: RBACRequirement | None = None
app: App | None = None
tenant: Tenant | None = None

View File

@ -21,6 +21,7 @@ from controllers.openapi.auth.data import (
AuthData,
Edition,
ExternalIdentity,
RBACRequirement,
RequestContext,
current_edition,
)
@ -59,6 +60,7 @@ class AuthPipeline:
scope: Scope | None,
workspace_membership: bool = False,
allowed_roles: frozenset[TenantAccountRole] | None = None,
rbac: RBACRequirement | None = None,
) -> Any:
req_ctx = RequestContext(
token_type=identity.token_type,
@ -66,6 +68,7 @@ class AuthPipeline:
path_params=dict(request.view_args or {}),
workspace_membership=workspace_membership,
allowed_roles=allowed_roles,
rbac=rbac,
)
data = AuthData(
@ -77,6 +80,7 @@ class AuthPipeline:
tenants=dict(identity.verified_tenants),
required_scope=scope,
allowed_roles=allowed_roles,
rbac=rbac,
path_params=dict(req_ctx.path_params),
external_identity=(
ExternalIdentity(email=identity.subject_email, issuer=identity.subject_issuer)
@ -129,6 +133,7 @@ class PipelineRouter:
edition: frozenset[Edition] | None = None,
workspace_membership: bool = False,
allowed_roles: frozenset[TenantAccountRole] | None = None,
rbac: RBACRequirement | None = None,
) -> Callable:
return self._make_decorator(
scope=scope,
@ -136,6 +141,7 @@ class PipelineRouter:
edition=edition,
workspace_membership=workspace_membership,
allowed_roles=allowed_roles,
rbac=rbac,
)
def guard_workspace(
@ -145,6 +151,7 @@ class PipelineRouter:
allowed_token_types: frozenset[TokenType] | None = None,
edition: frozenset[Edition] | None = None,
allowed_roles: frozenset[TenantAccountRole] | None = None,
rbac: RBACRequirement | None = None,
) -> Callable:
return self._make_decorator(
scope=scope,
@ -152,6 +159,7 @@ class PipelineRouter:
edition=edition,
workspace_membership=True,
allowed_roles=allowed_roles,
rbac=rbac,
)
def _make_decorator(
@ -162,6 +170,7 @@ class PipelineRouter:
edition: frozenset[Edition] | None,
workspace_membership: bool,
allowed_roles: frozenset[TenantAccountRole] | None,
rbac: RBACRequirement | None,
) -> Callable:
def decorator(view: Callable) -> Callable:
@wraps(view)
@ -175,6 +184,7 @@ class PipelineRouter:
edition=edition,
workspace_membership=workspace_membership,
allowed_roles=allowed_roles,
rbac=rbac,
)
return decorated
@ -192,6 +202,7 @@ class PipelineRouter:
edition: frozenset[Edition] | None,
workspace_membership: bool = False,
allowed_roles: frozenset[TenantAccountRole] | None = None,
rbac: RBACRequirement | None = None,
) -> Any:
# 404 not 403 — this edition doesn't expose the feature at all
if edition is not None and current_edition() not in edition:
@ -235,6 +246,7 @@ class PipelineRouter:
scope=scope,
workspace_membership=workspace_membership,
allowed_roles=allowed_roles,
rbac=rbac,
)

View File

@ -3,6 +3,8 @@ from __future__ import annotations
from flask import request
from werkzeug.exceptions import Forbidden, NotFound, UnprocessableEntity
from configs import dify_config
from controllers.common.wraps import enforce_rbac_access
from controllers.openapi.auth.data import AuthData
from extensions.ext_database import db
from libs.oauth_bearer import Scope, TokenType
@ -38,6 +40,9 @@ def check_workspace_mismatch(data: AuthData) -> None:
def check_workspace_role(data: AuthData) -> None:
if dify_config.RBAC_ENABLED and data.rbac is not None:
# fine-grained permission check is performed by RBAC
return
if data.allowed_roles is None:
return
if data.tenant_role is None:
@ -46,6 +51,27 @@ def check_workspace_role(data: AuthData) -> None:
raise Forbidden("insufficient workspace role")
def check_rbac_permission(data: AuthData) -> None:
req = data.rbac
if req is None:
return
if not dify_config.RBAC_ENABLED:
return
# Only account callers are subject to RBAC; end_user access is scope-controlled.
if data.caller_kind != "account":
return
if data.account_id is None or data.tenant is None:
raise Forbidden("rbac context missing")
enforce_rbac_access(
tenant_id=str(data.tenant.id),
account_id=str(data.account_id),
resource_type=req.resource_type,
scene=req.scene,
resource_required=req.resource_required,
path_args=dict(data.path_params),
)
def check_app_api_enabled(data: AuthData) -> None:
if data.app is None:
return

View File

@ -16,12 +16,13 @@ from werkzeug.exceptions import BadRequest
from controllers.common.human_input import HumanInputFormSubmitPayload, stringify_form_default_values
from controllers.common.schema import register_schema_models
from controllers.common.wraps import RBACPermission, RBACResourceScope
from controllers.openapi import openapi_ns
from controllers.openapi._contract import accepts, returns
from controllers.openapi._errors import HumanInputFormNotFound, RecipientSurfaceMismatch
from controllers.openapi._models import FormSubmitResponse, HumanInputFormDefinitionResponse
from controllers.openapi.auth.composition import auth_router
from controllers.openapi.auth.data import AuthData
from controllers.openapi.auth.data import AuthData, RBACRequirement
from core.workflow.human_input_policy import (
HumanInputSurface,
is_recipient_type_allowed_for_surface,
@ -62,7 +63,10 @@ def _ensure_form_is_allowed_for_openapi(form) -> None:
@openapi_ns.route("/apps/<string:app_id>/form/human_input/<string:form_token>")
class OpenApiWorkflowHumanInputFormApi(Resource):
@openapi_ns.response(200, "Form definition", openapi_ns.models[HumanInputFormDefinitionResponse.__name__])
@auth_router.guard(scope=Scope.APPS_RUN)
@auth_router.guard(
scope=Scope.APPS_RUN,
rbac=RBACRequirement(resource_type=RBACResourceScope.APP, scene=RBACPermission.APP_TEST_AND_RUN),
)
def get(self, app_id: str, form_token: str, *, auth_data: AuthData):
app_model, _caller, _caller_kind = auth_data.require_app_context()
service = HumanInputService(db.engine)
@ -75,7 +79,10 @@ class OpenApiWorkflowHumanInputFormApi(Resource):
service.ensure_form_active(form)
return _jsonify_form_definition(form)
@auth_router.guard(scope=Scope.APPS_RUN)
@auth_router.guard(
scope=Scope.APPS_RUN,
rbac=RBACRequirement(resource_type=RBACResourceScope.APP, scene=RBACPermission.APP_TEST_AND_RUN),
)
@returns(200, FormSubmitResponse, description="Form submitted")
@accepts(body=HumanInputFormSubmitPayload)
def post(self, app_id: str, form_token: str, *, auth_data: AuthData, body: HumanInputFormSubmitPayload):

View File

@ -19,9 +19,10 @@ from werkzeug.exceptions import NotFound, UnprocessableEntity
from controllers.common.fields import EventStreamResponse
from controllers.common.schema import query_params_from_model
from controllers.common.wraps import RBACPermission, RBACResourceScope
from controllers.openapi import openapi_ns
from controllers.openapi.auth.composition import auth_router
from controllers.openapi.auth.data import AuthData
from controllers.openapi.auth.data import AuthData, RBACRequirement
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
from core.app.apps.base_app_generator import BaseAppGenerator
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
@ -46,7 +47,10 @@ class WorkflowEventsQuery(BaseModel):
class OpenApiWorkflowEventsApi(Resource):
@openapi_ns.doc(params=query_params_from_model(WorkflowEventsQuery))
@openapi_ns.response(200, "SSE event stream", openapi_ns.models[EventStreamResponse.__name__])
@auth_router.guard(scope=Scope.APPS_RUN)
@auth_router.guard(
scope=Scope.APPS_RUN,
rbac=RBACRequirement(resource_type=RBACResourceScope.APP, scene=RBACPermission.APP_TEST_AND_RUN),
)
def get(self, app_id: str, task_id: str, *, auth_data: AuthData):
app_model, caller, caller_kind = auth_data.require_app_context()
app_mode = AppMode.value_of(app_model.mode)

View File

@ -0,0 +1,161 @@
"""Unit tests for controllers.common.app_access RBAC app-id access filtering."""
from __future__ import annotations
import pytest
from controllers.common.app_access import (
APP_LIST_PERMISSION_KEYS,
AppAccessFilter,
has_app_list_permission,
resolve_app_access_filter,
)
from services.app_service import AppListParams
from services.enterprise.rbac_service import (
MyPermissionsResponse,
ResourcePermissionKeys,
ResourcePermissionSnapshot,
ResourceWhitelistResources,
WorkspacePermissionSnapshot,
)
_RBAC_MODULE = "controllers.common.app_access.enterprise_rbac_service"
def _permissions(
*,
workspace_keys: list[str] | None = None,
app_default_keys: list[str] | None = None,
app_overrides: list[ResourcePermissionKeys] | None = None,
) -> MyPermissionsResponse:
return MyPermissionsResponse(
workspace=WorkspacePermissionSnapshot(permission_keys=workspace_keys or []),
app=ResourcePermissionSnapshot(
default_permission_keys=app_default_keys or [],
overrides=app_overrides or [],
),
)
class TestHasAppListPermission:
def test_matches_known_preview_keys(self):
for key in APP_LIST_PERMISSION_KEYS:
assert has_app_list_permission([key])
def test_rejects_unknown_keys(self):
assert not has_app_list_permission(["app.export", "app.delete"])
assert not has_app_list_permission([])
class TestAppAccessFilterIsAppAccessible:
def test_unrestricted_sees_everything(self):
flt = AppAccessFilter.unrestricted()
assert flt.is_app_accessible("app-1", maintainer="someone", account_id="acc-1")
def test_whitelisted_app_is_visible(self):
flt = AppAccessFilter(accessible_app_ids={"app-1"}, can_manage_own_apps=False)
assert flt.is_app_accessible("app-1", maintainer=None, account_id="acc-1")
assert not flt.is_app_accessible("app-2", maintainer=None, account_id="acc-1")
def test_own_app_visible_only_with_manage_permission(self):
own = AppAccessFilter(accessible_app_ids=set(), can_manage_own_apps=True)
assert own.is_app_accessible("app-1", maintainer="acc-1", account_id="acc-1")
assert not own.is_app_accessible("app-1", maintainer="acc-2", account_id="acc-1")
no_manage = AppAccessFilter(accessible_app_ids=set(), can_manage_own_apps=False)
assert not no_manage.is_app_accessible("app-1", maintainer="acc-1", account_id="acc-1")
class TestAppAccessFilterApplyToParams:
def test_unrestricted_leaves_params_untouched(self):
params = AppListParams()
AppAccessFilter.unrestricted().apply_to_params(params)
assert params.accessible_app_ids is None
assert params.include_own_apps is False
assert params.is_created_by_me is None
def test_whitelisted_ids_are_sorted_with_own_apps_flag(self):
params = AppListParams()
AppAccessFilter(accessible_app_ids={"b", "a"}, can_manage_own_apps=True).apply_to_params(params)
assert params.accessible_app_ids == ["a", "b"]
assert params.include_own_apps is True
def test_empty_set_with_manage_falls_back_to_maintained_apps(self):
# Own-app fallback must use maintainer (include_own_apps), consistent
# with is_app_accessible — not created_by (is_created_by_me).
params = AppListParams()
AppAccessFilter(accessible_app_ids=set(), can_manage_own_apps=True).apply_to_params(params)
assert params.accessible_app_ids == []
assert params.include_own_apps is True
assert params.is_created_by_me is None
def test_empty_set_without_manage_sees_nothing(self):
params = AppListParams()
AppAccessFilter(accessible_app_ids=set(), can_manage_own_apps=False).apply_to_params(params)
assert params.accessible_app_ids == []
assert params.include_own_apps is False
assert params.is_created_by_me is None
class TestResolveAppAccessFilter:
def _patch_whitelist(self, monkeypatch: pytest.MonkeyPatch, whitelist: ResourceWhitelistResources) -> None:
monkeypatch.setattr(
f"{_RBAC_MODULE}.RBACService.AppAccess.whitelist_resources",
lambda tenant_id, account_id: whitelist,
)
def test_default_preview_is_unrestricted(self, monkeypatch: pytest.MonkeyPatch):
self._patch_whitelist(monkeypatch, ResourceWhitelistResources(unrestricted=True))
permissions = _permissions(app_default_keys=["app.preview"])
flt = resolve_app_access_filter("tenant-1", "acc-1", permissions=permissions)
assert flt.accessible_app_ids is None
assert flt.can_manage_own_apps is False
def test_default_preview_overrides_whitelist_restriction(self, monkeypatch: pytest.MonkeyPatch):
self._patch_whitelist(monkeypatch, ResourceWhitelistResources(unrestricted=False, resource_ids=["app-9"]))
permissions = _permissions(
workspace_keys=["app.full_access", "app.create_and_management"],
)
flt = resolve_app_access_filter("tenant-1", "acc-1", permissions=permissions)
# Workspace-level preview grant defeats the whitelist restriction.
assert flt.accessible_app_ids is None
assert flt.can_manage_own_apps is True
def test_override_apps_collected_without_default_preview(self, monkeypatch: pytest.MonkeyPatch):
self._patch_whitelist(monkeypatch, ResourceWhitelistResources(unrestricted=True))
permissions = _permissions(
app_overrides=[
ResourcePermissionKeys(resource_id="app-1", permission_keys=["app.preview"]),
ResourcePermissionKeys(resource_id="app-2", permission_keys=["app.export"]),
],
)
flt = resolve_app_access_filter("tenant-1", "acc-1", permissions=permissions)
assert flt.accessible_app_ids == {"app-1"}
def test_whitelist_union_with_override_apps(self, monkeypatch: pytest.MonkeyPatch):
self._patch_whitelist(monkeypatch, ResourceWhitelistResources(unrestricted=False, resource_ids=["app-5"]))
permissions = _permissions(
app_overrides=[ResourcePermissionKeys(resource_id="app-1", permission_keys=["app.acl.preview"])],
)
flt = resolve_app_access_filter("tenant-1", "acc-1", permissions=permissions)
assert flt.accessible_app_ids == {"app-1", "app-5"}
def test_fetches_permissions_when_not_supplied(self, monkeypatch: pytest.MonkeyPatch):
self._patch_whitelist(monkeypatch, ResourceWhitelistResources(unrestricted=False, resource_ids=[]))
monkeypatch.setattr(
f"{_RBAC_MODULE}.RBACService.MyPermissions.get",
lambda tenant_id, account_id: _permissions(workspace_keys=["app.create_and_management"]),
)
flt = resolve_app_access_filter("tenant-1", "acc-1")
assert flt.accessible_app_ids == set()
assert flt.can_manage_own_apps is True

View File

@ -1,16 +1,21 @@
import uuid
from controllers.openapi.auth.composition import account_pipeline, auth_router, external_sso_pipeline
from controllers.openapi.auth.data import RequestContext
from controllers.openapi.auth.data import RBACRequirement, RequestContext
from controllers.openapi.auth.flow import When
from controllers.openapi.auth.pipeline import AuthPipeline, PipelineRoute, PipelineRouter
from controllers.openapi.auth.verify import (
check_acl,
check_private_app_permission,
check_rbac_permission,
check_workspace_member,
check_workspace_mismatch,
check_workspace_role,
)
from libs.oauth_bearer import TokenType
from core.rbac import RBACPermission, RBACResourceScope
from libs.oauth_bearer import Scope, TokenType
from models.account import TenantAccountRole
from services.enterprise.enterprise_service import WebAppAccessMode
def test_account_pipeline_is_auth_pipeline():
@ -29,8 +34,8 @@ def test_account_pipeline_prepare_has_six_entries():
assert len(account_pipeline._prepare) == 6
def test_account_auth_list_has_seven_entries():
assert len(account_pipeline._auth) == 7
def test_account_auth_list_has_eight_entries():
assert len(account_pipeline._auth) == 8
def test_external_sso_pipeline_prepare_has_four_entries():
@ -132,3 +137,89 @@ def test_app_path_selects_workspace_mismatch_check():
def test_workspace_path_skips_workspace_mismatch_check():
steps = _selected_auth_steps(app_id=False, workspace_membership=True, allowed_roles=None)
assert check_workspace_mismatch not in steps
def _selected_webapp_steps(*, scope, app_access_mode):
"""Select auth steps for an EE, webapp-auth-enabled, app-scoped request.
Patches the config-backed conditions (edition + webapp_auth) so the gating
reduces to PATH_HAS_APP_ID, LOADED_APP_IS_PRIVATE, and the request scope.
"""
from unittest.mock import MagicMock, patch
from controllers.openapi.auth.data import AuthData, Edition
ctx = RequestContext(
token_type=TokenType.OAUTH_ACCOUNT,
scope=scope,
path_params={"app_id": str(uuid.uuid4())},
)
data = AuthData(
token_type=TokenType.OAUTH_ACCOUNT,
token_hash="x",
scopes=frozenset({scope}) if scope is not None else frozenset(),
app_access_mode=app_access_mode,
)
features = MagicMock()
features.webapp_auth.enabled = True
selected = []
with (
patch("controllers.openapi.auth.conditions.current_edition", return_value=Edition.EE),
patch("controllers.openapi.auth.conditions.FeatureService.get_system_features", return_value=features),
):
for step in account_pipeline._auth:
if isinstance(step, When):
if step.applies(ctx, data):
selected.append(step._step)
else:
selected.append(step)
return selected
def test_apps_run_scope_selects_webapp_checks():
steps = _selected_webapp_steps(scope=Scope.APPS_RUN, app_access_mode=WebAppAccessMode.PRIVATE)
assert check_acl in steps
assert check_private_app_permission in steps
def test_management_scope_skips_webapp_checks_on_private_app():
# Export DSL et al. carry an app_id but use a management scope; the webapp
# end-user ACL / private-app gate must not block workspace members.
steps = _selected_webapp_steps(scope=Scope.APPS_READ, app_access_mode=WebAppAccessMode.PRIVATE)
assert check_acl not in steps
assert check_private_app_permission not in steps
def _selected_auth_steps_with_rbac(rbac):
ctx = RequestContext(
token_type=TokenType.OAUTH_ACCOUNT,
scope=Scope.APPS_READ,
path_params={"app_id": str(uuid.uuid4())},
rbac=rbac,
)
selected = []
for step in account_pipeline._auth:
if isinstance(step, When):
if step.applies(ctx, None):
selected.append(step._step)
else:
selected.append(step)
return selected
def test_account_pipeline_selects_rbac_step_when_required():
rbac = RBACRequirement(resource_type=RBACResourceScope.APP, scene=RBACPermission.APP_VIEW_LAYOUT)
assert check_rbac_permission in _selected_auth_steps_with_rbac(rbac)
def test_account_pipeline_skips_rbac_step_without_requirement():
assert check_rbac_permission not in _selected_auth_steps_with_rbac(None)
def test_external_sso_pipeline_never_enforces_rbac():
# RBAC is a console (account) concern; external SSO callers are scope-gated.
rbac_steps = [
s._step for s in external_sso_pipeline._auth if isinstance(s, When) and s._step is check_rbac_permission
]
assert rbac_steps == []
assert check_rbac_permission not in external_sso_pipeline._auth

View File

@ -5,19 +5,22 @@ from controllers.openapi.auth.conditions import (
EDITION_EE,
EDITION_SAAS,
HAS_ALLOWED_ROLES,
HAS_RBAC,
LOADED_APP_IS_PRIVATE,
PATH_HAS_APP_ID,
TOKEN_IS_OAUTH_ACCOUNT,
TOKEN_IS_OAUTH_EXTERNAL_SSO,
WEBAPP_AUTH_ENABLED,
WEBAPP_RUN_SCOPED,
WORKSPACE_MEMBERSHIP_REQUIRED,
Cond,
config_cond,
data_cond,
request_cond,
)
from controllers.openapi.auth.data import AuthData, Edition, RequestContext
from libs.oauth_bearer import TokenType
from controllers.openapi.auth.data import AuthData, Edition, RBACRequirement, RequestContext
from core.rbac import RBACPermission, RBACResourceScope
from libs.oauth_bearer import Scope, TokenType
from models.account import TenantAccountRole
from services.enterprise.enterprise_service import WebAppAccessMode
@ -137,6 +140,34 @@ def test_webapp_auth_enabled():
assert WEBAPP_AUTH_ENABLED(_ctx()) is True
def test_webapp_run_scoped_true_for_apps_run():
assert WEBAPP_RUN_SCOPED(_ctx(scope=Scope.APPS_RUN)) is True
def test_webapp_run_scoped_false_for_management_scope():
assert WEBAPP_RUN_SCOPED(_ctx(scope=Scope.APPS_READ)) is False
def test_webapp_run_scoped_false_when_scope_none():
assert WEBAPP_RUN_SCOPED(_ctx()) is False
def _rbac_req():
return RBACRequirement(resource_type=RBACResourceScope.APP, scene=RBACPermission.APP_TEST_AND_RUN)
def test_has_rbac_true():
assert HAS_RBAC(_ctx(rbac=_rbac_req())) is True
def test_has_rbac_false():
assert HAS_RBAC(_ctx(rbac=None)) is False
def test_has_rbac_default():
assert HAS_RBAC(_ctx()) is False
def test_loaded_app_is_private():
data_private = _data(app_access_mode=WebAppAccessMode.PRIVATE)
data_public = _data(app_access_mode=WebAppAccessMode.PUBLIC)

View File

@ -5,17 +5,19 @@ import pytest
from flask import Flask
from werkzeug.exceptions import Forbidden, NotFound
from controllers.openapi.auth.data import AuthData
from controllers.openapi.auth.data import AuthData, RBACRequirement
from controllers.openapi.auth.verify import (
check_acl,
check_app_access,
check_app_api_enabled,
check_private_app_permission,
check_rbac_permission,
check_scope,
check_workspace_member,
check_workspace_mismatch,
check_workspace_role,
)
from core.rbac import RBACPermission, RBACResourceScope
from libs.oauth_bearer import Scope, TokenType
from models.account import Tenant, TenantAccountRole
from models.model import App
@ -75,6 +77,67 @@ def test_check_app_access_raises_when_not_member():
check_app_access(data)
# --- check_rbac_permission ---
_RBAC_REQ = RBACRequirement(resource_type=RBACResourceScope.APP, scene=RBACPermission.APP_VIEW_LAYOUT)
def test_check_rbac_noop_when_no_requirement():
with patch("controllers.openapi.auth.verify.enforce_rbac_access") as mock_enforce:
check_rbac_permission(_data(rbac=None, caller_kind="account"))
mock_enforce.assert_not_called()
def test_check_rbac_noop_when_rbac_disabled():
with (
patch("controllers.openapi.auth.verify.dify_config.RBAC_ENABLED", False),
patch("controllers.openapi.auth.verify.enforce_rbac_access") as mock_enforce,
):
check_rbac_permission(_data(rbac=_RBAC_REQ, caller_kind="account"))
mock_enforce.assert_not_called()
def test_check_rbac_skips_end_user_caller():
with (
patch("controllers.openapi.auth.verify.dify_config.RBAC_ENABLED", True),
patch("controllers.openapi.auth.verify.enforce_rbac_access") as mock_enforce,
):
check_rbac_permission(_data(rbac=_RBAC_REQ, caller_kind="end_user"))
mock_enforce.assert_not_called()
def test_check_rbac_raises_when_context_missing():
with patch("controllers.openapi.auth.verify.dify_config.RBAC_ENABLED", True):
with pytest.raises(Forbidden, match="rbac context missing"):
check_rbac_permission(_data(rbac=_RBAC_REQ, caller_kind="account", account_id=None, tenant=None))
def test_check_rbac_enforces_for_account_caller():
tenant = MagicMock(spec=Tenant)
tenant.id = "t1"
account_id = uuid.uuid4()
data = _data(
rbac=_RBAC_REQ,
caller_kind="account",
account_id=account_id,
tenant=tenant,
path_params={"app_id": "app-1"},
)
with (
patch("controllers.openapi.auth.verify.dify_config.RBAC_ENABLED", True),
patch("controllers.openapi.auth.verify.enforce_rbac_access") as mock_enforce,
):
check_rbac_permission(data)
mock_enforce.assert_called_once_with(
tenant_id="t1",
account_id=str(account_id),
resource_type=RBACResourceScope.APP,
scene=RBACPermission.APP_VIEW_LAYOUT,
resource_required=True,
path_args={"app_id": "app-1"},
)
def test_check_acl_raises_when_app_or_mode_missing():
with pytest.raises(Forbidden):
check_acl(_data(app=None, app_access_mode=None))

View File

@ -20,6 +20,7 @@ def _stub_execute(
edition=None,
workspace_membership=False,
allowed_roles=None,
rbac=None,
):
"""Bypass all auth logic; inject minimal AuthData and call the view directly."""
kwargs["auth_data"] = AuthData(
@ -30,6 +31,7 @@ def _stub_execute(
scopes=frozenset({Scope.FULL}),
required_scope=scope,
allowed_roles=allowed_roles,
rbac=rbac,
)
return view(*args, **kwargs)