mirror of
https://github.com/langgenius/dify.git
synced 2026-06-26 14:51:13 +08:00
Previously the console api-key list returned every key's full plaintext token, so anyone with console access could retrieve the secret of an already-created key (via the copy button or the raw API response). This is contrary to the reveal-once norm. - List endpoints (app keys, workspace dataset keys, per-dataset keys) now return a masked token (prefix + last 4); the full secret is only ever returned by the create endpoint, at creation time. - Frontend secret-key modal displays the masked token as-is and drops the copy affordance for existing keys (copying a masked value is pointless). Applies to both app and dataset keys since they share the modal and the ApiKeyItem response model.
325 lines
13 KiB
Python
325 lines
13 KiB
Python
from collections.abc import Iterable
|
|
from datetime import datetime
|
|
from typing import override
|
|
from uuid import UUID
|
|
|
|
import flask_restx
|
|
from flask_restx import Resource
|
|
from flask_restx._http import HTTPStatus
|
|
from pydantic import field_validator
|
|
from sqlalchemy import delete, func, or_, select
|
|
from sqlalchemy.orm import sessionmaker
|
|
from werkzeug.exceptions import Forbidden
|
|
|
|
from configs import dify_config
|
|
from controllers.common.schema import register_response_schema_models
|
|
from extensions.ext_database import db
|
|
from fields.base import ResponseModel
|
|
from libs.helper import dump_response, to_timestamp
|
|
from libs.login import login_required
|
|
from models import Account
|
|
from models.dataset import Dataset
|
|
from models.enums import ApiTokenType
|
|
from models.model import ApiToken, App
|
|
from services.api_token_service import ApiTokenCache
|
|
|
|
from . import console_ns
|
|
from .wraps import (
|
|
RBACPermission,
|
|
RBACResourceScope,
|
|
account_initialization_required,
|
|
edit_permission_required,
|
|
rbac_permission_required,
|
|
setup_required,
|
|
with_current_tenant_id,
|
|
with_current_user,
|
|
)
|
|
|
|
|
|
class ApiKeyItem(ResponseModel):
    # Response shape for one API key. Documented with comments rather than a
    # class docstring so the model's generated schema is left untouched.
    id: str
    type: str
    # Carries the full secret when the create endpoint dumps a fresh ApiToken,
    # and the masked form when built through build_masked_api_key_list.
    token: str
    # Populated only for dataset keys tied to one knowledge base; a None value
    # marks a workspace-scoped key (app keys never carry a dataset_id).
    dataset_id: str | None = None
    last_used_at: int | None = None
    created_at: int | None = None

    @field_validator("last_used_at", "created_at", mode="before")
    @classmethod
    def _normalize_timestamp(cls, raw_value: datetime | int | None) -> int | None:
        # Runs in "before" mode, so datetime inputs (e.g. ORM columns) are handed
        # to to_timestamp() prior to the int | None field validation.
        return to_timestamp(raw_value)
|
|
|
|
|
|
class ApiKeyList(ResponseModel):
    # Envelope returned by the list endpoints: every key sits under "data".
    data: list[ApiKeyItem]
|
|
|
|
|
|
# Register both response models with the console namespace. The route classes
# further down read them back through console_ns.models[<Model>.__name__] for
# their @console_ns.response declarations (presumably populated by this call).
register_response_schema_models(console_ns, ApiKeyItem, ApiKeyList)
|
|
|
|
|
|
def mask_api_token(token: str) -> str:
    """Mask a secret token for list responses.

    Reveal-once: the full secret is only returned by the create endpoint. List
    endpoints expose just enough (first 5 characters + last 4) to identify a
    key, never the full value, so an existing key's secret cannot be retrieved
    after creation.

    Args:
        token: The full secret token.

    Returns:
        ``"<first 5>...<last 4>"`` when at least 8 characters of the token stay
        hidden; otherwise the fixed placeholder ``"***"``.
    """
    visible_prefix = 5
    visible_suffix = 4
    # Minimum number of characters that must remain hidden. Without this floor a
    # token only slightly longer than prefix + suffix would be echoed back
    # (almost) in full -- a 9-character token would be revealed entirely.
    min_hidden = 8

    if len(token) < visible_prefix + visible_suffix + min_hidden:
        return "***"
    return f"{token[:visible_prefix]}...{token[-visible_suffix:]}"
|
|
|
|
|
|
def build_masked_api_key_list(api_tokens: Iterable[ApiToken]) -> ApiKeyList:
    """Convert ORM tokens into an ApiKeyList whose token fields are masked."""

    def _to_masked_item(orm_token: ApiToken) -> ApiKeyItem:
        # Validate from the ORM attributes first, then overwrite the token field
        # on the response item with its masked form.
        masked_item = ApiKeyItem.model_validate(orm_token, from_attributes=True)
        masked_item.token = mask_api_token(masked_item.token)
        return masked_item

    return ApiKeyList(data=[_to_masked_item(orm_token) for orm_token in api_tokens])
|
|
|
|
|
|
def _get_resource(resource_id, tenant_id, resource_model):
    """Load a tenant-owned resource by id, aborting with 404 when it is missing.

    Args:
        resource_id: Primary key of the resource to look up.
        tenant_id: Tenant the resource must belong to.
        resource_model: ORM model class to query; it is filtered on its ``id``
            and ``tenant_id`` attributes.

    Returns:
        The matching ORM instance, detached from the short-lived session used
        for the lookup.

    Raises:
        The HTTP exception raised by ``flask_restx.abort`` (404) when no row
        matches.
    """
    # ``begin()`` commits when the block exits. With the default
    # expire_on_commit=True that commit expires every loaded attribute, and the
    # session is closed right after, so reading any column of the returned
    # object would raise DetachedInstanceError. Disabling expiry keeps the
    # already-loaded column values readable on the detached instance.
    with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
        resource = session.execute(
            select(resource_model).filter_by(id=resource_id, tenant_id=tenant_id)
        ).scalar_one_or_none()

    if resource is None:
        flask_restx.abort(HTTPStatus.NOT_FOUND, message=f"{resource_model.__name__} not found.")

    return resource
|
|
|
|
|
|
class BaseApiKeyListResource(Resource):
    # Shared list/create logic for API keys attached to a parent resource.
    # Subclasses configure it through the class attributes below.
    method_decorators = [account_initialization_required, login_required, setup_required]

    resource_type: ApiTokenType | None = None
    resource_model: type | None = None
    resource_id_field: str | None = None
    token_prefix: str | None = None
    max_keys = 10

    def get(self, resource_id: str, current_tenant_id: str) -> dict[str, object]:
        key_list = self._get_api_key_list(resource_id, current_tenant_id)
        return dump_response(ApiKeyList, key_list)

    def _get_api_key_list(self, resource_id: str, current_tenant_id: str) -> ApiKeyList:
        """Return the masked keys whose owner column equals ``resource_id``.

        The parent resource is looked up for the tenant first; that lookup
        aborts with 404 when it does not exist.
        """
        assert self.resource_id_field is not None, "resource_id_field must be set"

        _get_resource(resource_id, current_tenant_id, self.resource_model)

        conditions = (
            ApiToken.type == self.resource_type,
            getattr(ApiToken, self.resource_id_field) == resource_id,
        )
        matching_tokens = db.session.scalars(select(ApiToken).where(*conditions)).all()
        return build_masked_api_key_list(matching_tokens)

    @edit_permission_required
    def post(self, resource_id: str, current_tenant_id: str) -> tuple[dict[str, object], int]:
        created_token = self._create_api_key(resource_id, current_tenant_id)
        return dump_response(ApiKeyItem, created_token), 201

    def _create_api_key(self, resource_id: str, current_tenant_id: str) -> ApiToken:
        """Create, persist and return a new key for ``resource_id``.

        Aborts with 400 once the resource already holds ``max_keys`` keys. The
        returned ApiToken carries the freshly generated, unmasked token.
        """
        assert self.resource_id_field is not None, "resource_id_field must be set"
        _get_resource(resource_id, current_tenant_id, self.resource_model)

        count_stmt = select(func.count(ApiToken.id)).where(
            ApiToken.type == self.resource_type,
            getattr(ApiToken, self.resource_id_field) == resource_id,
        )
        # scalar() may yield None; treat that as zero existing keys.
        existing_key_count: int = db.session.scalar(count_stmt) or 0

        if existing_key_count >= self.max_keys:
            flask_restx.abort(
                HTTPStatus.BAD_REQUEST,
                message=f"Cannot create more than {self.max_keys} API keys for this resource type.",
                custom="max_keys_exceeded",
            )

        secret = ApiToken.generate_api_key(self.token_prefix or "", 24)
        assert self.resource_type is not None, "resource_type must be set"

        new_token = ApiToken()
        # The owner column name differs per subclass, hence setattr.
        setattr(new_token, self.resource_id_field, resource_id)
        new_token.tenant_id = current_tenant_id
        new_token.token = secret
        new_token.type = self.resource_type

        db.session.add(new_token)
        db.session.commit()
        return new_token
|
|
|
|
|
|
class BaseApiKeyResource(Resource):
    # Shared delete logic for a single API key attached to a parent resource.
    # Subclasses configure it through the class attributes below.
    method_decorators = [account_initialization_required, login_required, setup_required]

    resource_type: ApiTokenType | None = None
    resource_model: type | None = None
    resource_id_field: str | None = None

    def delete(
        self, resource_id: str, api_key_id: str, current_tenant_id: str, current_user: Account
    ) -> tuple[str, int]:
        self._delete_api_key(resource_id, api_key_id, current_tenant_id, current_user)
        return "", 204

    def _delete_api_key(
        self,
        resource_id: str,
        api_key_id: str,
        current_tenant_id: str,
        current_user: Account,
    ) -> None:
        """Delete one key of ``resource_id`` and drop its cache entry.

        Aborts with 404 when the parent resource or the key is missing. Raises
        Forbidden when RBAC is disabled and the user is neither admin nor owner.
        """
        assert self.resource_id_field is not None, "resource_id_field must be set"
        _get_resource(resource_id, current_tenant_id, self.resource_model)

        # With RBAC switched off, only admins/owners may delete keys.
        if not (dify_config.RBAC_ENABLED or current_user.is_admin_or_owner):
            raise Forbidden()

        lookup_stmt = (
            select(ApiToken)
            .where(
                getattr(ApiToken, self.resource_id_field) == resource_id,
                ApiToken.type == self.resource_type,
                ApiToken.id == api_key_id,
            )
            .limit(1)
        )
        target = db.session.scalar(lookup_stmt)

        if target is None:
            flask_restx.abort(HTTPStatus.NOT_FOUND, message="API key not found")

        # abort() raises, so target is always set past this point; the assert
        # exists purely to narrow the type for static checkers.
        assert target is not None  # nosec - for type checker only
        # Drop the cache entry first, then remove the row.
        ApiTokenCache.delete(target.token, target.type)

        removal_stmt = delete(ApiToken).where(ApiToken.id == api_key_id)
        db.session.execute(removal_stmt)
        db.session.commit()
|
|
|
|
|
|
@console_ns.route("/apps/<uuid:resource_id>/api-keys")
class AppApiKeyListResource(BaseApiKeyListResource):
    # List and create endpoints for the API keys of a single app.
    resource_type = ApiTokenType.APP
    resource_model = App
    resource_id_field = "app_id"
    token_prefix = "app-"

    @console_ns.doc("get_app_api_keys")
    @console_ns.doc(description="Get all API keys for an app")
    @console_ns.doc(params={"resource_id": "App ID"})
    @console_ns.response(200, "API keys retrieved successfully", console_ns.models[ApiKeyList.__name__])
    @with_current_tenant_id
    def get(self, current_tenant_id: str, resource_id: UUID) -> dict[str, object]:
        """Get all API keys for an app"""
        # The route converter yields a UUID; the shared helpers work with strings.
        app_id = str(resource_id)
        masked_keys = self._get_api_key_list(app_id, current_tenant_id)
        return dump_response(ApiKeyList, masked_keys)

    @console_ns.doc("create_app_api_key")
    @console_ns.doc(description="Create a new API key for an app")
    @console_ns.doc(params={"resource_id": "App ID"})
    @console_ns.response(201, "API key created successfully", console_ns.models[ApiKeyItem.__name__])
    @console_ns.response(400, "Maximum keys exceeded")
    @with_current_tenant_id
    @edit_permission_required
    @rbac_permission_required(RBACResourceScope.APP, RBACPermission.APP_RELEASE_AND_VERSION)
    def post(self, current_tenant_id: str, resource_id: UUID) -> tuple[dict[str, object], int]:
        """Create a new API key for an app"""
        app_id = str(resource_id)
        # Dumped straight from the new ApiToken, without masking.
        created_token = self._create_api_key(app_id, current_tenant_id)
        return dump_response(ApiKeyItem, created_token), 201
|
|
|
|
|
|
@console_ns.route("/apps/<uuid:resource_id>/api-keys/<uuid:api_key_id>")
class AppApiKeyResource(BaseApiKeyResource):
    # Delete endpoint for a single API key of an app.
    resource_type = ApiTokenType.APP
    resource_model = App
    resource_id_field = "app_id"

    @console_ns.doc("delete_app_api_key")
    @console_ns.doc(description="Delete an API key for an app")
    @console_ns.doc(params={"resource_id": "App ID", "api_key_id": "API key ID"})
    @console_ns.response(204, "API key deleted successfully")
    @with_current_user
    @with_current_tenant_id
    @rbac_permission_required(RBACResourceScope.APP, RBACPermission.APP_RELEASE_AND_VERSION)
    def delete(
        self, current_tenant_id: str, current_user: Account, resource_id: UUID, api_key_id: UUID
    ) -> tuple[str, int]:
        """Delete an API key for an app"""
        # The route converters yield UUIDs; the shared helper works with strings.
        app_id = str(resource_id)
        key_id = str(api_key_id)
        self._delete_api_key(app_id, key_id, current_tenant_id, current_user)
        return "", 204
|
|
|
|
|
|
@console_ns.route("/datasets/<uuid:resource_id>/api-keys")
class DatasetApiKeyListResource(BaseApiKeyListResource):
    """Per-dataset API keys: keys created here are bound to a single dataset.

    Binding is stored in ``ApiToken.dataset_id`` and enforced by
    ``validate_dataset_token`` (controllers/service_api/wraps.py). Workspace-scoped
    keys (NULL ``dataset_id``) are managed by ``DatasetApiKeyApi`` in
    controllers/console/datasets/datasets.py.
    """

    resource_type = ApiTokenType.DATASET
    resource_model = Dataset
    resource_id_field = "dataset_id"
    # Shares its prefix with workspace-scoped dataset keys (datasets.py); the
    # scope lives in the dataset_id column rather than in the token text.
    token_prefix = "dataset-"

    @console_ns.doc("get_dataset_api_keys")
    @console_ns.doc(description="Get all API keys that can access a dataset")
    @console_ns.doc(params={"resource_id": "Dataset ID"})
    @console_ns.response(200, "API keys retrieved successfully", console_ns.models[ApiKeyList.__name__])
    @with_current_tenant_id
    def get(self, current_tenant_id: str, resource_id: UUID) -> dict[str, object]:
        """Get all API keys that can access a dataset"""
        # The route converter yields a UUID; the shared helpers work with strings.
        dataset_id = str(resource_id)
        masked_keys = self._get_api_key_list(dataset_id, current_tenant_id)
        return dump_response(ApiKeyList, masked_keys)

    @override
    def _get_api_key_list(self, resource_id: str, current_tenant_id: str) -> ApiKeyList:
        # Differs from the base listing: besides the keys bound to this dataset,
        # the tenant's workspace-scoped keys (NULL dataset_id) are included too,
        # so the dataset page presents every key that can reach the dataset.
        _get_resource(resource_id, current_tenant_id, self.resource_model)

        reachable_stmt = select(ApiToken).where(
            ApiToken.type == self.resource_type,
            ApiToken.tenant_id == current_tenant_id,
            or_(ApiToken.dataset_id == resource_id, ApiToken.dataset_id.is_(None)),
        )
        reachable_tokens = db.session.scalars(reachable_stmt).all()
        return build_masked_api_key_list(reachable_tokens)

    @console_ns.doc("create_dataset_api_key")
    @console_ns.doc(description="Create a new API key bound to a single dataset")
    @console_ns.doc(params={"resource_id": "Dataset ID"})
    @console_ns.response(201, "API key created successfully", console_ns.models[ApiKeyItem.__name__])
    @console_ns.response(400, "Maximum keys exceeded")
    @with_current_tenant_id
    @edit_permission_required
    @rbac_permission_required(RBACResourceScope.DATASET, RBACPermission.DATASET_API_KEY_MANAGE)
    def post(self, current_tenant_id: str, resource_id: UUID) -> tuple[dict[str, object], int]:
        """Create a new API key bound to a single dataset"""
        dataset_id = str(resource_id)
        # Dumped straight from the new ApiToken, without masking.
        created_token = self._create_api_key(dataset_id, current_tenant_id)
        return dump_response(ApiKeyItem, created_token), 201
|
|
|
|
|
|
@console_ns.route("/datasets/<uuid:resource_id>/api-keys/<uuid:api_key_id>")
class DatasetApiKeyResource(BaseApiKeyResource):
    # Delete endpoint for a single API key bound to a dataset.
    resource_type = ApiTokenType.DATASET
    resource_model = Dataset
    resource_id_field = "dataset_id"

    @console_ns.doc("delete_dataset_api_key")
    @console_ns.doc(description="Delete an API key for a dataset")
    @console_ns.doc(params={"resource_id": "Dataset ID", "api_key_id": "API key ID"})
    @console_ns.response(204, "API key deleted successfully")
    @with_current_user
    @with_current_tenant_id
    @rbac_permission_required(RBACResourceScope.DATASET, RBACPermission.DATASET_API_KEY_MANAGE)
    def delete(
        self, current_tenant_id: str, current_user: Account, resource_id: UUID, api_key_id: UUID
    ) -> tuple[str, int]:
        """Delete an API key for a dataset"""
        # The route converters yield UUIDs; the shared helper works with strings.
        dataset_id = str(resource_id)
        key_id = str(api_key_id)
        self._delete_api_key(dataset_id, key_id, current_tenant_id, current_user)
        return "", 204
|