feat: list app and dataset permission keys

This commit is contained in:
fatelei 2026-05-11 16:25:09 +08:00
parent 3aa8abbe14
commit e44252c242
No known key found for this signature in database
GPG Key ID: 2F91DA05646F4EED
6 changed files with 429 additions and 7 deletions

View File

@ -27,6 +27,7 @@ from core.ops.ops_trace_manager import OpsTraceManager
from core.rag.entities import PreProcessingRule, Rule, Segmentation
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.trigger.constants import TRIGGER_NODE_TYPES
from configs import dify_config
from extensions.ext_database import db
from fields.base import ResponseModel
from graphon.enums import WorkflowExecutionStatus
@ -37,6 +38,7 @@ from models.model import IconType
from services.app_dsl_service import AppDslService
from services.app_service import AppService
from services.enterprise.enterprise_service import EnterpriseService
from services.enterprise import rbac_service as enterprise_rbac_service
from services.entities.dsl_entities import ImportMode, ImportStatus
from services.entities.knowledge_entities.knowledge_entities import (
DataSource,
@ -330,6 +332,7 @@ class AppPartial(ResponseModel):
create_user_name: str | None = None
author_name: str | None = None
has_draft_trigger: bool | None = None
permission_keys: list[str] = Field(default_factory=list)
@computed_field(return_type=str | None) # type: ignore
@property
@ -475,6 +478,20 @@ class AppListApi(Resource):
if str(app.id) in res:
app.access_mode = res[str(app.id)].access_mode
if app_pagination.items:
if dify_config.RBAC_ENABLED:
app_ids = [str(app.id) for app in app_pagination.items]
permission_keys_map = enterprise_rbac_service.RBACService.AppPermissions.batch_get(
str(current_tenant_id),
current_user.id,
app_ids,
)
for app in app_pagination.items:
app.permission_keys = permission_keys_map.get(str(app.id), [])
else:
for app in app_pagination.items:
app.permission_keys = []
workflow_capable_app_ids = [
str(app.id) for app in app_pagination.items if app.mode in {"workflow", "advanced-chat"}
]

View File

@ -57,6 +57,7 @@ from models.enums import ApiTokenType, SegmentStatus
from models.provider_ids import ModelProviderID
from services.api_token_service import ApiTokenCache
from services.dataset_service import DatasetPermissionService, DatasetService, DocumentService
from services.enterprise import rbac_service as enterprise_rbac_service
# Register models for flask_restx to avoid dict type issues in Swagger
dataset_base_model = get_or_create_model("DatasetBase", dataset_fields)
@ -127,6 +128,14 @@ def _validate_doc_form(value: str | None) -> str | None:
return value
def _ensure_permission_keys(dataset: Dataset, *, enabled: bool) -> None:
if not enabled:
setattr(dataset, "permission_keys", [])
return
if not isinstance(getattr(dataset, "permission_keys", None), list):
setattr(dataset, "permission_keys", [])
class DatasetCreatePayload(BaseModel):
name: str = Field(..., min_length=1, max_length=40)
description: str = Field("", max_length=400)
@ -329,6 +338,19 @@ class DatasetListApi(Resource):
query.include_all,
)
for dataset in datasets:
_ensure_permission_keys(dataset, enabled=dify_config.RBAC_ENABLED)
if dify_config.RBAC_ENABLED and datasets:
dataset_ids = [str(dataset.id) for dataset in datasets]
permission_keys_map = enterprise_rbac_service.RBACService.DatasetPermissions.batch_get(
str(current_tenant_id),
current_user.id,
dataset_ids,
)
for dataset in datasets:
setattr(dataset, "permission_keys", permission_keys_map.get(str(dataset.id), []))
# check embedding setting
provider_manager = create_plugin_provider_manager(tenant_id=current_tenant_id)
configurations = provider_manager.get_configurations(tenant_id=current_tenant_id)
@ -410,6 +432,7 @@ class DatasetListApi(Resource):
except services.errors.dataset.DatasetNameDuplicateError:
raise DatasetNameDuplicateError()
_ensure_permission_keys(dataset, enabled=dify_config.RBAC_ENABLED)
return marshal(dataset, dataset_detail_fields), 201
@ -434,6 +457,7 @@ class DatasetApi(Resource):
DatasetService.check_dataset_permission(dataset, current_user)
except services.errors.account.NoPermissionError as e:
raise Forbidden(str(e))
_ensure_permission_keys(dataset, enabled=dify_config.RBAC_ENABLED)
data = cast(dict[str, Any], marshal(dataset, dataset_detail_fields))
if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY:
if dataset.embedding_model_provider:
@ -503,6 +527,7 @@ class DatasetApi(Resource):
if dataset is None:
raise NotFound("Dataset not found.")
_ensure_permission_keys(dataset, enabled=dify_config.RBAC_ENABLED)
result_data = cast(dict[str, Any], marshal(dataset, dataset_detail_fields))
tenant_id = current_tenant_id

View File

@ -3,8 +3,12 @@ from __future__ import annotations
from enum import StrEnum
from typing import Any, Generic, TypeVar
from sqlalchemy import select
from pydantic import BaseModel, ConfigDict, Field, field_validator
from configs import dify_config
from core.db.session_factory import session_factory
from models import TenantAccountJoin, TenantAccountRole
from services.enterprise.base import EnterpriseRequest
T = TypeVar("T")
@ -79,6 +83,15 @@ class MemberRoleSummary(_RBACModel):
name: str
class ResourcePermissionKeys(_RBACModel):
resource_id: str
permission_keys: list[str] = Field(default_factory=list)
class ResourcePermissionKeysBatchResponse(_RBACModel):
data: list[ResourcePermissionKeys] = Field(default_factory=list)
class AccessPolicy(_RBACModel):
id: str
tenant_id: str = ""
@ -158,11 +171,6 @@ class MemberRolesBatchResponse(_RBACModel):
data: list[MemberRolesResponse] = Field(default_factory=list)
class ResourcePermissionKeys(_RBACModel):
resource_id: str
permission_keys: list[str] = Field(default_factory=list)
class WorkspacePermissionSnapshot(_RBACModel):
permission_keys: list[str] = Field(default_factory=list)
@ -178,6 +186,97 @@ class MyPermissionsResponse(_RBACModel):
dataset: ResourcePermissionSnapshot = Field(default_factory=ResourcePermissionSnapshot)
_LEGACY_MY_PERMISSIONS: dict[TenantAccountRole, dict[str, list[str]]] = {
TenantAccountRole.OWNER: {
"workspace": [
"workspace.member.manage",
"workspace.role.manage",
],
"app": [
"app.acl.view_layout",
"app.acl.test_and_run",
"app.acl.edit",
"app.acl.access_config",
],
"dataset": [
"dataset.acl.readonly",
"dataset.acl.edit",
"dataset.acl.use",
],
},
TenantAccountRole.ADMIN: {
"workspace": [
"workspace.member.manage",
"workspace.role.manage",
],
"app": [
"app.acl.view_layout",
"app.acl.test_and_run",
"app.acl.edit",
"app.acl.access_config",
],
"dataset": [
"dataset.acl.readonly",
"dataset.acl.edit",
"dataset.acl.use",
],
},
TenantAccountRole.EDITOR: {
"app": [
"app.acl.view_layout",
"app.acl.test_and_run",
"app.acl.edit",
"app.acl.access_config",
],
"dataset": [
"dataset.acl.readonly",
"dataset.acl.edit",
"dataset.acl.use",
],
},
TenantAccountRole.NORMAL: {
"app": [
"app.acl.view_layout",
"app.acl.test_and_run",
],
},
TenantAccountRole.DATASET_OPERATOR: {
"dataset": [
"dataset.acl.readonly",
"dataset.acl.edit",
"dataset.acl.use",
],
},
}
def _legacy_my_permissions(tenant_id: str, account_id: str | None) -> MyPermissionsResponse:
if not account_id:
return MyPermissionsResponse()
with session_factory.create_session() as session:
role = session.scalar(
select(TenantAccountJoin.role).where(
TenantAccountJoin.tenant_id == tenant_id,
TenantAccountJoin.account_id == account_id,
)
)
if not role:
return MyPermissionsResponse()
try:
tenant_role = TenantAccountRole(role)
except ValueError:
return MyPermissionsResponse()
permissions = _LEGACY_MY_PERMISSIONS.get(tenant_role, {})
return MyPermissionsResponse(
workspace=WorkspacePermissionSnapshot(permission_keys=list(permissions.get("workspace", []))),
app=ResourcePermissionSnapshot(default_permission_keys=list(permissions.get("app", []))),
dataset=ResourcePermissionSnapshot(default_permission_keys=list(permissions.get("dataset", []))),
)
# ---------- Mutation request models ----------
@ -953,6 +1052,42 @@ class RBACService:
)
return MemberRolesResponse.model_validate(data or {})
class AppPermissions:
@staticmethod
def batch_get(
tenant_id: str,
account_id: str | None,
app_ids: list[str],
) -> dict[str, list[str]]:
if not app_ids:
return {}
data = _inner_call(
"POST",
f"{_INNER_PREFIX}/apps/permission-keys/batch",
tenant_id=tenant_id,
account_id=account_id,
json={"app_ids": app_ids},
)
return _parse_resource_permission_keys_batch(data, resource_id_key="app_id")
class DatasetPermissions:
@staticmethod
def batch_get(
tenant_id: str,
account_id: str | None,
dataset_ids: list[str],
) -> dict[str, list[str]]:
if not dataset_ids:
return {}
data = _inner_call(
"POST",
f"{_INNER_PREFIX}/datasets/permission-keys/batch",
tenant_id=tenant_id,
account_id=account_id,
json={"dataset_ids": dataset_ids},
)
return _parse_resource_permission_keys_batch(data, resource_id_key="dataset_id")
class MyPermissions:
@staticmethod
def get(
@ -962,6 +1097,9 @@ class RBACService:
app_id: str | None = None,
dataset_id: str | None = None,
) -> MyPermissionsResponse:
if not dify_config.RBAC_ENABLED:
return _legacy_my_permissions(tenant_id, account_id)
data = _inner_call(
"GET",
f"{_INNER_PREFIX}/my-permissions",
@ -978,3 +1116,39 @@ class RBACService:
or None,
)
return MyPermissionsResponse.model_validate(data or {})
def _parse_resource_permission_keys_batch(data: Any, *, resource_id_key: str) -> dict[str, list[str]]:
if not data:
return {}
if isinstance(data, dict):
permissions = data.get("permissions")
if isinstance(permissions, dict):
return {str(key): [str(item) for item in (value or [])] for key, value in permissions.items()}
items = data.get("data")
if items is None:
items = data.get("items")
if items is None:
items = data.get("apps") if resource_id_key == "app_id" else data.get("datasets")
if isinstance(items, dict):
items = [
{"resource_id": key, "permission_keys": value}
for key, value in items.items()
]
elif isinstance(data, list):
items = data
else:
items = []
result: dict[str, list[str]] = {}
for item in items or []:
if not isinstance(item, dict):
continue
resource_id = item.get("resource_id") or item.get(resource_id_key)
if not resource_id:
continue
permission_keys = item.get("permission_keys") or []
result[str(resource_id)] = [str(permission_key) for permission_key in permission_keys]
return result

View File

@ -11,6 +11,8 @@ from typing import Any
import pytest
from flask.views import MethodView
from configs import dify_config
# kombu references MethodView as a global when importing celery/kombu pools.
if not hasattr(builtins, "MethodView"):
builtins.MethodView = MethodView # type: ignore[attr-defined]
@ -196,6 +198,7 @@ def test_app_partial_serialization_uses_aliases(app_models):
create_user_name="Creator",
author_name="Author",
has_draft_trigger=True,
permission_keys=["app.acl.view_layout"],
)
serialized = AppPartial.model_validate(app_obj, from_attributes=True).model_dump(mode="json")
@ -208,6 +211,7 @@ def test_app_partial_serialization_uses_aliases(app_models):
assert serialized["model_config"]["model"] == {"provider": "openai", "name": "gpt-4o"}
assert serialized["workflow"]["id"] == "wf-1"
assert serialized["tags"][0]["name"] == "Utilities"
assert serialized["permission_keys"] == ["app.acl.view_layout"]
def test_app_detail_with_site_includes_nested_serialization(app_models):
@ -271,6 +275,7 @@ def test_app_pagination_aliases_per_page_and_has_next(app_models):
icon="first-icon",
created_at=_ts(15),
updated_at=_ts(15),
permission_keys=["app.acl.edit"],
)
item_two = SimpleNamespace(
id="app-11",
@ -298,3 +303,52 @@ def test_app_pagination_aliases_per_page_and_has_next(app_models):
assert len(serialized["data"]) == 2
assert serialized["data"][0]["icon_url"] == "signed:first-icon"
assert serialized["data"][1]["icon_url"] is None
assert serialized["data"][0]["permission_keys"] == ["app.acl.edit"]
def test_app_list_api_attaches_permission_keys(app, app_module):
method = app_module.AppListApi.get
while hasattr(method, "__wrapped__"):
method = method.__wrapped__
app_obj = SimpleNamespace(
id="app-1",
name="List App",
desc_or_prompt="Summary",
mode_compatible_with_agent="chat",
mode="chat",
created_at=_ts(15),
updated_at=_ts(15),
permission_keys=[],
)
pagination = SimpleNamespace(page=1, per_page=20, total=1, has_next=False, items=[app_obj])
with app.test_request_context("/apps"):
with pytest.MonkeyPatch.context() as monkeypatch:
monkeypatch.setattr(dify_config, "RBAC_ENABLED", True)
monkeypatch.setattr(
app_module,
"current_account_with_tenant",
lambda: (SimpleNamespace(id="acct-1"), "tenant-1"),
)
monkeypatch.setattr(
app_module.AppService,
"get_paginate_apps",
lambda self, user_id, tenant_id, args_dict: pagination,
)
monkeypatch.setattr(
app_module.FeatureService,
"get_system_features",
lambda: SimpleNamespace(webapp_auth=SimpleNamespace(enabled=False)),
)
monkeypatch.setattr(
app_module.enterprise_rbac_service.RBACService.AppPermissions,
"batch_get",
lambda tenant_id, account_id, app_ids: {"app-1": ["app.acl.view_layout", "app.acl.edit"]},
)
resp, status = method(app_module.AppListApi())
assert status == 200
assert app_obj.permission_keys == ["app.acl.view_layout", "app.acl.edit"]
assert resp["data"][0]["permission_keys"] == ["app.acl.view_layout", "app.acl.edit"]

View File

@ -93,6 +93,48 @@ class TestDatasetList:
assert resp["total"] == 1
assert resp["data"][0]["embedding_available"] is True
def test_get_with_rbac_enabled_fetches_permission_keys(self, app):
api = DatasetListApi()
method = unwrap(api.get)
current_user = self._mock_user()
current_user.id = "acct-1"
dataset = MagicMock(id="ds-1")
datasets = [dataset]
marshaled = [self._mock_dataset_dict()]
with app.test_request_context("/datasets"):
with (
patch(
"controllers.console.datasets.datasets.current_account_with_tenant",
return_value=(current_user, "tenant-1"),
),
patch("controllers.console.datasets.datasets.dify_config.RBAC_ENABLED", True),
patch.object(
DatasetService,
"get_datasets",
return_value=(datasets, 1),
),
patch(
"controllers.console.datasets.datasets.enterprise_rbac_service.RBACService.DatasetPermissions.batch_get",
return_value={"ds-1": ["dataset.acl.readonly", "dataset.acl.edit"]},
) as mock_batch_get,
patch(
"controllers.console.datasets.datasets.marshal",
return_value=marshaled,
),
patch.object(
ProviderManager,
"get_configurations",
return_value=MagicMock(get_models=lambda **_: []),
),
):
resp, status = method(api)
assert status == 200
assert dataset.permission_keys == ["dataset.acl.readonly", "dataset.acl.edit"]
mock_batch_get.assert_called_once_with("tenant-1", "acct-1", ["ds-1"])
def test_get_with_ids_filter(self, app):
api = DatasetListApi()
method = unwrap(api.get)

View File

@ -316,7 +316,8 @@ class TestMyPermissions:
"dataset": {"default_permission_keys": [], "overrides": []},
}
out = svc.RBACService.MyPermissions.get("tenant-1", "acct-1")
with patch(f"{MODULE}.dify_config.RBAC_ENABLED", True):
out = svc.RBACService.MyPermissions.get("tenant-1", "acct-1")
call = _call_args(mock_send)
assert call.method == "GET"
@ -325,6 +326,74 @@ class TestMyPermissions:
assert call.params is None
assert out.workspace.permission_keys == ["workspace.member.manage"]
@pytest.mark.parametrize(
("role", "workspace_keys", "app_keys", "dataset_keys"),
[
(
"owner",
["workspace.member.manage", "workspace.role.manage"],
["app.acl.view_layout", "app.acl.test_and_run", "app.acl.edit", "app.acl.access_config"],
["dataset.acl.readonly", "dataset.acl.edit", "dataset.acl.use"],
),
(
"admin",
["workspace.member.manage", "workspace.role.manage"],
["app.acl.view_layout", "app.acl.test_and_run", "app.acl.edit", "app.acl.access_config"],
["dataset.acl.readonly", "dataset.acl.edit", "dataset.acl.use"],
),
(
"editor",
[],
["app.acl.view_layout", "app.acl.test_and_run", "app.acl.edit", "app.acl.access_config"],
["dataset.acl.readonly", "dataset.acl.edit", "dataset.acl.use"],
),
(
"normal",
[],
["app.acl.view_layout", "app.acl.test_and_run"],
[],
),
(
"dataset_operator",
[],
[],
["dataset.acl.readonly", "dataset.acl.edit", "dataset.acl.use"],
),
],
)
def test_get_uses_legacy_role_permissions_when_rbac_disabled(
self,
mock_send: MagicMock,
role: str,
workspace_keys: list[str],
app_keys: list[str],
dataset_keys: list[str],
):
with (
patch(f"{MODULE}.dify_config.RBAC_ENABLED", False),
patch(f"{MODULE}.db.session.scalar", return_value=role),
):
out = svc.RBACService.MyPermissions.get("tenant-1", "acct-1")
mock_send.assert_not_called()
assert out.workspace.permission_keys == workspace_keys
assert out.app.default_permission_keys == app_keys
assert out.dataset.default_permission_keys == dataset_keys
assert out.app.overrides == []
assert out.dataset.overrides == []
def test_get_returns_empty_when_role_missing_and_rbac_disabled(self, mock_send: MagicMock):
with (
patch(f"{MODULE}.dify_config.RBAC_ENABLED", False),
patch(f"{MODULE}.db.session.scalar", return_value=None),
):
out = svc.RBACService.MyPermissions.get("tenant-1", "acct-1")
mock_send.assert_not_called()
assert out.workspace.permission_keys == []
assert out.app.default_permission_keys == []
assert out.dataset.default_permission_keys == []
def test_get_with_single_resource_filters(self, mock_send: MagicMock):
mock_send.return_value = {
"workspace": {"permission_keys": []},
@ -332,7 +401,8 @@ class TestMyPermissions:
"dataset": {"default_permission_keys": [], "overrides": []},
}
out = svc.RBACService.MyPermissions.get("tenant-1", "acct-1", app_id="app-1")
with patch(f"{MODULE}.dify_config.RBAC_ENABLED", True):
out = svc.RBACService.MyPermissions.get("tenant-1", "acct-1", app_id="app-1")
call = _call_args(mock_send)
assert call.method == "GET"
@ -399,6 +469,46 @@ class TestMemberRoles:
assert len(out[0].roles) == 2
class TestResourcePermissions:
def test_app_permissions_batch_get(self, mock_send: MagicMock):
mock_send.return_value = {
"data": [
{"resource_id": "app-1", "permission_keys": ["app.acl.view_layout", "app.acl.edit"]},
{"resource_id": "app-2", "permission_keys": []},
]
}
out = svc.RBACService.AppPermissions.batch_get("tenant-1", "acct-1", ["app-1", "app-2"])
call = _call_args(mock_send)
assert call.method == "POST"
assert call.endpoint == "/rbac/apps/permission-keys/batch"
assert call.json == {"app_ids": ["app-1", "app-2"]}
assert out == {
"app-1": ["app.acl.view_layout", "app.acl.edit"],
"app-2": [],
}
def test_dataset_permissions_batch_get(self, mock_send: MagicMock):
mock_send.return_value = {
"data": [
{"resource_id": "ds-1", "permission_keys": ["dataset.acl.readonly"]},
{"resource_id": "ds-2", "permission_keys": ["dataset.acl.edit"]},
]
}
out = svc.RBACService.DatasetPermissions.batch_get("tenant-1", "acct-1", ["ds-1", "ds-2"])
call = _call_args(mock_send)
assert call.method == "POST"
assert call.endpoint == "/rbac/datasets/permission-keys/batch"
assert call.json == {"dataset_ids": ["ds-1", "ds-2"]}
assert out == {
"ds-1": ["dataset.acl.readonly"],
"ds-2": ["dataset.acl.edit"],
}
class TestListOption:
def test_empty_produces_empty_params(self):
assert svc.ListOption().to_params() == {}