diff --git a/api/libs/oauth_bearer.py b/api/libs/oauth_bearer.py index 01f320d90b..eaa7f2d5f8 100644 --- a/api/libs/oauth_bearer.py +++ b/api/libs/oauth_bearer.py @@ -404,6 +404,37 @@ def bearer_feature_required(fn: Callable) -> Callable: return inner +# "full" is the catch-all scope carried by dfoa_ tokens; any scope check +# passes when the bearer holds it. dfoe_ ships with apps:run and a few +# narrower scopes; PATs (future) carry only what the user requested at +# mint time. +SCOPE_FULL = "full" + + +def require_scope(scope: str) -> Callable: + """Route-level scope gate — must run AFTER validate_bearer so that + g.auth_ctx is set. Raises Forbidden('insufficient_scope: ') + when the bearer lacks both the requested scope and the catch-all. + """ + + def wrap(fn: Callable) -> Callable: + @wraps(fn) + def inner(*args, **kwargs): + ctx = getattr(g, "auth_ctx", None) + if ctx is None: + raise RuntimeError( + "require_scope used without validate_bearer; " + "stack @validate_bearer above @require_scope" + ) + if SCOPE_FULL not in ctx.scopes and scope not in ctx.scopes: + raise Forbidden(f"insufficient_scope: {scope}") + return fn(*args, **kwargs) + + return inner + + return wrap + + # ============================================================================ # Wiring — called once from the app factory # ============================================================================ diff --git a/api/tests/unit_tests/libs/test_oauth_bearer_require_scope.py b/api/tests/unit_tests/libs/test_oauth_bearer_require_scope.py new file mode 100644 index 0000000000..4545b38690 --- /dev/null +++ b/api/tests/unit_tests/libs/test_oauth_bearer_require_scope.py @@ -0,0 +1,81 @@ +"""require_scope is a route-level gate run after validate_bearer. +Tests use a fake auth_ctx attached directly to flask.g — no +authenticator wiring needed. +""" +from __future__ import annotations + +import uuid + +import pytest +from flask import Flask, g +from werkzeug.exceptions import Forbidden + +from libs.oauth_bearer import ( + SCOPE_FULL, + AuthContext, + SubjectType, + require_scope, +) + + +@pytest.fixture +def app() -> Flask: + app = Flask(__name__) + app.config["TESTING"] = True + return app + + +def _ctx(scopes: frozenset[str]) -> AuthContext: + return AuthContext( + subject_type=SubjectType.ACCOUNT, + subject_email="user@example.com", + subject_issuer="dify:account", + account_id=uuid.uuid4(), + scopes=scopes, + token_id=uuid.uuid4(), + source="oauth_account", + expires_at=None, + ) + + +def test_require_scope_allows_when_scope_present(app: Flask): + @require_scope("apps:read") + def view(): + return "ok" + + with app.test_request_context(): + g.auth_ctx = _ctx(frozenset({"apps:read"})) + assert view() == "ok" + + +def test_require_scope_rejects_when_scope_missing(app: Flask): + @require_scope("apps:write") + def view(): + return "ok" + + with app.test_request_context(): + g.auth_ctx = _ctx(frozenset({"apps:read"})) + with pytest.raises(Forbidden) as exc: + view() + assert "insufficient_scope: apps:write" in str(exc.value.description) + + +def test_require_scope_full_passes_any_check(app: Flask): + @require_scope("apps:write") + def view(): + return "ok" + + with app.test_request_context(): + g.auth_ctx = _ctx(frozenset({SCOPE_FULL})) + assert view() == "ok" + + +def test_require_scope_without_validate_bearer_raises_runtime_error(app: Flask): + @require_scope("apps:read") + def view(): + return "ok" + + with app.test_request_context(): + # No g.auth_ctx — validate_bearer was forgotten + with pytest.raises(RuntimeError, match="stack @validate_bearer above @require_scope"): + view()