mirror of
https://github.com/langgenius/dify.git
synced 2026-05-12 15:58:19 +08:00
91 lines
2.8 KiB
Python
91 lines
2.8 KiB
Python
"""Surface gate.
|
|
|
|
`@accept_subjects(...)` is the route-level form. `SurfaceCheck` (pipeline
|
|
step) is the pipeline-level form. Both delegate to `check_surface` so the
|
|
audit emit + canonical-path message are single-sourced.
|
|
|
|
Subjects come from `libs.oauth_bearer.SubjectType` directly — no parallel
|
|
vocabulary. Caller hits the wrong surface → 403 ``wrong_surface`` + audit
|
|
``openapi.wrong_surface_denied``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable
|
|
from functools import wraps
|
|
from typing import TypeVar
|
|
|
|
from flask import g, request
|
|
from werkzeug.exceptions import Forbidden
|
|
|
|
from controllers.openapi._audit import emit_wrong_surface
|
|
from libs.oauth_bearer import SubjectType
|
|
|
|
_CANONICAL_PATH: dict[SubjectType, str] = {
|
|
SubjectType.ACCOUNT: "/openapi/v1/apps",
|
|
SubjectType.EXTERNAL_SSO: "/openapi/v1/permitted-external-apps",
|
|
}
|
|
|
|
F = TypeVar("F", bound=Callable[..., object])
|
|
|
|
|
|
def check_surface(accepted: frozenset[SubjectType]) -> None:
|
|
"""Enforce that ``g.auth_ctx.subject_type`` is in ``accepted``.
|
|
|
|
Raises ``Forbidden`` with ``wrong_surface`` + canonical-path hint on
|
|
miss; emits ``openapi.wrong_surface_denied`` audit. If ``g.auth_ctx``
|
|
is missing the bearer layer didn't run — that's a wiring bug, not a
|
|
user-driven failure, so surface it as a ``RuntimeError`` instead of
|
|
a silent 403.
|
|
"""
|
|
ctx = getattr(g, "auth_ctx", None)
|
|
if ctx is None:
|
|
raise RuntimeError(
|
|
"check_surface called without g.auth_ctx; "
|
|
"stack validate_bearer or BearerCheck above the surface gate"
|
|
)
|
|
|
|
subject = _coerce_subject_type(getattr(ctx, "subject_type", None))
|
|
if subject in accepted:
|
|
return
|
|
|
|
canonical = _CANONICAL_PATH.get(subject, "/openapi/v1/") if subject else "/openapi/v1/"
|
|
emit_wrong_surface(
|
|
subject_type=subject.value if subject else None,
|
|
attempted_path=request.path,
|
|
client_id=getattr(ctx, "client_id", None),
|
|
token_id=_stringify(getattr(ctx, "token_id", None)),
|
|
)
|
|
raise Forbidden(description=f"wrong_surface (canonical: {canonical})")
|
|
|
|
|
|
def accept_subjects(*accepted: SubjectType) -> Callable[[F], F]:
|
|
accepted_set: frozenset[SubjectType] = frozenset(accepted)
|
|
|
|
def deco(fn: F) -> F:
|
|
@wraps(fn)
|
|
def wrapper(*args: object, **kwargs: object) -> object:
|
|
check_surface(accepted_set)
|
|
return fn(*args, **kwargs)
|
|
|
|
return wrapper # type: ignore[return-value]
|
|
|
|
return deco
|
|
|
|
|
|
def _coerce_subject_type(raw: object) -> SubjectType | None:
|
|
if raw is None:
|
|
return None
|
|
if isinstance(raw, SubjectType):
|
|
return raw
|
|
try:
|
|
return SubjectType(raw)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _stringify(value: object) -> str | None:
|
|
if value is None:
|
|
return None
|
|
return str(value)
|