dify/api/controllers/inner_api/wraps.py
盐粒 Yanli 0ea0647dd0
feat(agent): wire knowledge base retrieval into runtime (#37577)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-06-17 09:27:38 +00:00

98 lines
3.0 KiB
Python

from base64 import b64encode
from collections.abc import Callable
from functools import wraps
from hashlib import sha1
from hmac import compare_digest
from hmac import new as hmac_new

from flask import abort, request

from configs import dify_config
from core.db.session_factory import session_factory
from libs.exception import BaseHTTPException
from models.model import EndUser
class InnerApiUnauthorizedError(BaseHTTPException):
    """HTTP 401 error for inner API requests that fail key authentication."""

    # HTTP status code sent with the error response.
    code = 401
    # Machine-readable identifier for this error.
    error_code = "inner_api_unauthorized"
    # Human-readable message.
    description = "Unauthorized."
def inner_api_only[**P, R](view: Callable[P, R]) -> Callable[P, R]:
"""Restrict access to callers authenticated with the shared inner API key."""
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
if not dify_config.INNER_API:
abort(404)
inner_api_key = request.headers.get("X-Inner-Api-Key")
if not inner_api_key or inner_api_key != dify_config.INNER_API_KEY:
raise InnerApiUnauthorizedError()
return view(*args, **kwargs)
return decorated
def billing_inner_api_only[**P, R](view: Callable[P, R]) -> Callable[P, R]:
return inner_api_only(view)
def enterprise_inner_api_only[**P, R](view: Callable[P, R]) -> Callable[P, R]:
return inner_api_only(view)
def enterprise_inner_api_user_auth[**P, R](view: Callable[P, R]) -> Callable[P, R]:
"""Inject an EndUser for valid inner API HMAC auth, otherwise pass the request through unchanged."""
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
if not dify_config.INNER_API:
return view(*args, **kwargs)
# get header 'X-Inner-Api-Key'
authorization = request.headers.get("Authorization")
if not authorization:
return view(*args, **kwargs)
parts = authorization.split(":")
if len(parts) != 2:
return view(*args, **kwargs)
user_id, token = parts
if " " in user_id:
user_id = user_id.split(" ")[1]
inner_api_key = request.headers.get("X-Inner-Api-Key", "")
data_to_sign = f"DIFY {user_id}"
signature = hmac_new(inner_api_key.encode("utf-8"), data_to_sign.encode("utf-8"), sha1)
signature_base64 = b64encode(signature.digest()).decode("utf-8")
if signature_base64 != token:
return view(*args, **kwargs)
with session_factory.create_session() as session:
kwargs["user"] = session.get(EndUser, user_id)
return view(*args, **kwargs)
return decorated
def plugin_inner_api_only[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
if not dify_config.PLUGIN_DAEMON_KEY:
abort(404)
# get header 'X-Inner-Api-Key'
inner_api_key = request.headers.get("X-Inner-Api-Key")
if not inner_api_key or inner_api_key != dify_config.INNER_API_KEY_FOR_PLUGIN:
abort(404)
return view(*args, **kwargs)
return decorated