feat(api): add require_scope decorator (Phase A.4)

Route-level scope gate; pairs with validate_bearer. Bearer holding the
catch-all SCOPE_FULL ('full', carried by dfoa_) passes any check;
narrower bearers (dfoe_, future PATs) need the exact scope listed in
the route decorator.

No v1.0 route applies it yet — apps/datasets controllers will be the
first consumers when those plans land. Programming-error guard: if
@require_scope runs without @validate_bearer above it, raises
RuntimeError instead of silently allowing.

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
This commit is contained in:
GareArc 2026-04-26 23:27:48 -07:00
parent 4214583ae5
commit 501c0b8746
No known key found for this signature in database
2 changed files with 112 additions and 0 deletions

View File

@ -404,6 +404,37 @@ def bearer_feature_required(fn: Callable) -> Callable:
return inner
# "full" is the catch-all scope carried by dfoa_ tokens; any scope check
# passes when the bearer holds it. dfoe_ ships with apps:run and a few
# narrower scopes; PATs (future) carry only what the user requested at
# mint time.
SCOPE_FULL = "full"
def require_scope(scope: str) -> Callable:
"""Route-level scope gate — must run AFTER validate_bearer so that
g.auth_ctx is set. Raises Forbidden('insufficient_scope: <scope>')
when the bearer lacks both the requested scope and the catch-all.
"""
def wrap(fn: Callable) -> Callable:
@wraps(fn)
def inner(*args, **kwargs):
ctx = getattr(g, "auth_ctx", None)
if ctx is None:
raise RuntimeError(
"require_scope used without validate_bearer; "
"stack @validate_bearer above @require_scope"
)
if SCOPE_FULL not in ctx.scopes and scope not in ctx.scopes:
raise Forbidden(f"insufficient_scope: {scope}")
return fn(*args, **kwargs)
return inner
return wrap
# ============================================================================
# Wiring — called once from the app factory
# ============================================================================

View File

@ -0,0 +1,81 @@
"""require_scope is a route-level gate run after validate_bearer.
Tests use a fake auth_ctx attached directly to flask.g no
authenticator wiring needed.
"""
from __future__ import annotations
import uuid
import pytest
from flask import Flask, g
from werkzeug.exceptions import Forbidden
from libs.oauth_bearer import (
SCOPE_FULL,
AuthContext,
SubjectType,
require_scope,
)
@pytest.fixture
def app() -> Flask:
app = Flask(__name__)
app.config["TESTING"] = True
return app
def _ctx(scopes: frozenset[str]) -> AuthContext:
return AuthContext(
subject_type=SubjectType.ACCOUNT,
subject_email="user@example.com",
subject_issuer="dify:account",
account_id=uuid.uuid4(),
scopes=scopes,
token_id=uuid.uuid4(),
source="oauth_account",
expires_at=None,
)
def test_require_scope_allows_when_scope_present(app: Flask):
@require_scope("apps:read")
def view():
return "ok"
with app.test_request_context():
g.auth_ctx = _ctx(frozenset({"apps:read"}))
assert view() == "ok"
def test_require_scope_rejects_when_scope_missing(app: Flask):
@require_scope("apps:write")
def view():
return "ok"
with app.test_request_context():
g.auth_ctx = _ctx(frozenset({"apps:read"}))
with pytest.raises(Forbidden) as exc:
view()
assert "insufficient_scope: apps:write" in str(exc.value.description)
def test_require_scope_full_passes_any_check(app: Flask):
@require_scope("apps:write")
def view():
return "ok"
with app.test_request_context():
g.auth_ctx = _ctx(frozenset({SCOPE_FULL}))
assert view() == "ok"
def test_require_scope_without_validate_bearer_raises_runtime_error(app: Flask):
@require_scope("apps:read")
def view():
return "ok"
with app.test_request_context():
# No g.auth_ctx — validate_bearer was forgotten
with pytest.raises(RuntimeError, match="stack @validate_bearer above @require_scope"):
view()