mirror of
https://github.com/langgenius/dify.git
synced 2026-06-25 05:22:31 +08:00
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
226 lines
7.8 KiB
Python
226 lines
7.8 KiB
Python
import uuid
|
|
|
|
from controllers.openapi.auth.composition import account_pipeline, auth_router, external_sso_pipeline
|
|
from controllers.openapi.auth.data import RBACRequirement, RequestContext
|
|
from controllers.openapi.auth.flow import When
|
|
from controllers.openapi.auth.pipeline import AuthPipeline, PipelineRoute, PipelineRouter
|
|
from controllers.openapi.auth.verify import (
|
|
check_acl,
|
|
check_private_app_permission,
|
|
check_rbac_permission,
|
|
check_workspace_member,
|
|
check_workspace_mismatch,
|
|
check_workspace_role,
|
|
)
|
|
from core.rbac import RBACPermission, RBACResourceScope
|
|
from libs.oauth_bearer import Scope, TokenType
|
|
from models.account import TenantAccountRole
|
|
from services.enterprise.enterprise_service import WebAppAccessMode
|
|
|
|
|
|
def test_account_pipeline_is_auth_pipeline():
    """The composed account pipeline must be an ``AuthPipeline`` instance."""
    pipeline = account_pipeline
    assert isinstance(pipeline, AuthPipeline)
|
|
|
|
|
|
def test_external_sso_pipeline_is_auth_pipeline():
    """The composed external-SSO pipeline must be an ``AuthPipeline`` instance."""
    pipeline = external_sso_pipeline
    assert isinstance(pipeline, AuthPipeline)
|
|
|
|
|
|
def test_auth_router_is_pipeline_router():
    """The module-level ``auth_router`` must be a ``PipelineRouter`` instance."""
    router = auth_router
    assert isinstance(router, PipelineRouter)
|
|
|
|
|
|
def test_account_pipeline_prepare_has_six_entries():
    """Pin the number of prepare entries registered on the account pipeline."""
    prepare_entries = account_pipeline._prepare
    assert len(prepare_entries) == 6
|
|
|
|
|
|
def test_account_auth_list_has_eight_entries():
    """Pin the number of auth entries registered on the account pipeline."""
    auth_entries = account_pipeline._auth
    assert len(auth_entries) == 8
|
|
|
|
|
|
def test_external_sso_pipeline_prepare_has_four_entries():
    """Pin the number of prepare entries registered on the external-SSO pipeline."""
    prepare_entries = external_sso_pipeline._prepare
    assert len(prepare_entries) == 4
|
|
|
|
|
|
def test_external_sso_auth_list_has_four_entries():
    """Pin the number of auth entries registered on the external-SSO pipeline."""
    auth_entries = external_sso_pipeline._auth
    assert len(auth_entries) == 4
|
|
|
|
|
|
def test_account_pipeline_has_unconditional_load_account():
    """Exactly one account prepare entry is not wrapped in ``When``.

    NOTE(review): per the test name this is presumably the load-account step;
    the assertion itself only checks the count.
    """
    unconditional_count = sum(
        1 for entry in account_pipeline._prepare if not isinstance(entry, When)
    )
    assert unconditional_count == 1
|
|
|
|
|
|
def test_external_sso_pipeline_all_prepare_entries_are_when():
    """Every external-SSO prepare entry must be a conditional ``When`` wrapper."""
    unconditional = [
        entry for entry in external_sso_pipeline._prepare if not isinstance(entry, When)
    ]
    assert not unconditional
|
|
|
|
|
|
def test_account_pipeline_has_one_unconditional_auth_step():
    """Exactly one account auth entry is a bare step rather than a ``When``."""
    unconditional_count = sum(
        1 for entry in account_pipeline._auth if not isinstance(entry, When)
    )
    assert unconditional_count == 1
|
|
|
|
|
|
def test_external_sso_pipeline_has_one_unconditional_auth_step():
    """Exactly one external-SSO auth entry is a bare step rather than a ``When``."""
    unconditional_count = sum(
        1 for entry in external_sso_pipeline._auth if not isinstance(entry, When)
    )
    assert unconditional_count == 1
|
|
|
|
|
|
def test_router_routes_contain_both_token_types():
    """The router must hold a route for both OAuth token types."""
    routes = auth_router._routes
    assert TokenType.OAUTH_ACCOUNT in routes
    assert TokenType.OAUTH_EXTERNAL_SSO in routes
|
|
|
|
|
|
def test_external_sso_route_has_ee_required_edition():
    """The external-SSO route is a ``PipelineRoute`` restricted to the EE edition."""
    from controllers.openapi.auth.data import Edition

    sso_route = auth_router._routes[TokenType.OAUTH_EXTERNAL_SSO]
    assert isinstance(sso_route, PipelineRoute)

    expected_editions = frozenset({Edition.EE})
    assert sso_route.required_edition == expected_editions
|
|
|
|
|
|
def test_account_route_has_no_required_edition():
    """The account route is a ``PipelineRoute`` with ``required_edition`` unset."""
    account_route = auth_router._routes[TokenType.OAUTH_ACCOUNT]
    assert isinstance(account_route, PipelineRoute)
    assert account_route.required_edition is None
|
|
|
|
|
|
def _selected_auth_steps(*, app_id: bool, workspace_membership: bool, allowed_roles):
    """Return the account-pipeline auth steps that apply to a synthetic request.

    Builds an OAuth-account ``RequestContext`` with no scope. ``path_params``
    carries a random ``app_id`` only when *app_id* is true, and
    *workspace_membership* / *allowed_roles* are forwarded unchanged.

    Bare entries of ``account_pipeline._auth`` are always included; ``When``
    entries contribute their wrapped ``_step`` only if ``applies(ctx, None)``
    is truthy.
    """
    path_params = {"app_id": str(uuid.uuid4())} if app_id else {}
    request_ctx = RequestContext(
        token_type=TokenType.OAUTH_ACCOUNT,
        scope=None,
        path_params=path_params,
        workspace_membership=workspace_membership,
        allowed_roles=allowed_roles,
    )

    chosen = []
    for entry in account_pipeline._auth:
        if not isinstance(entry, When):
            chosen.append(entry)
        elif entry.applies(request_ctx, None):
            chosen.append(entry._step)
    return chosen
|
|
|
|
|
|
# Role set passed as ``allowed_roles`` by the role-check selection test.
_ALL_ROLES = frozenset(
    (
        TenantAccountRole.OWNER,
        TenantAccountRole.ADMIN,
        TenantAccountRole.NORMAL,
    )
)
|
|
|
|
|
|
def test_workspace_path_selects_membership_check():
    """A workspace-membership request selects the member check but no role check."""
    selected = _selected_auth_steps(
        app_id=False,
        workspace_membership=True,
        allowed_roles=None,
    )
    assert check_workspace_member in selected
    assert check_workspace_role not in selected
|
|
|
|
|
|
def test_app_path_selects_membership_check():
    """A request carrying an ``app_id`` selects the member check but no role check."""
    selected = _selected_auth_steps(
        app_id=True,
        workspace_membership=False,
        allowed_roles=None,
    )
    assert check_workspace_member in selected
    assert check_workspace_role not in selected
|
|
|
|
|
|
def test_roles_set_selects_both_membership_and_role_check():
    """Supplying ``allowed_roles`` selects both the member and the role check."""
    selected = _selected_auth_steps(
        app_id=False,
        workspace_membership=True,
        allowed_roles=_ALL_ROLES,
    )
    assert check_workspace_member in selected
    assert check_workspace_role in selected
|
|
|
|
|
|
def test_plain_path_selects_no_membership_or_role_step():
    """With no app id, membership flag, or roles, neither workspace check is selected."""
    selected = _selected_auth_steps(
        app_id=False,
        workspace_membership=False,
        allowed_roles=None,
    )
    assert check_workspace_member not in selected
    assert check_workspace_role not in selected
|
|
|
|
|
|
def test_app_path_selects_workspace_mismatch_check():
    """A request carrying an ``app_id`` selects the workspace-mismatch check."""
    selected = _selected_auth_steps(
        app_id=True,
        workspace_membership=False,
        allowed_roles=None,
    )
    assert check_workspace_mismatch in selected
|
|
|
|
|
|
def test_workspace_path_skips_workspace_mismatch_check():
    """Without an ``app_id`` the workspace-mismatch check is not selected."""
    selected = _selected_auth_steps(
        app_id=False,
        workspace_membership=True,
        allowed_roles=None,
    )
    assert check_workspace_mismatch not in selected
|
|
|
|
|
|
def _selected_webapp_steps(*, scope, app_access_mode):
    """Return the account auth steps chosen for an app-scoped request on EE.

    The edition lookup and the system-features lookup in the conditions module
    are patched (edition -> EE, ``webapp_auth.enabled`` -> True) so that the
    gating reduces to PATH_HAS_APP_ID, LOADED_APP_IS_PRIVATE, and the request
    scope.

    Bare entries of ``account_pipeline._auth`` are always included; ``When``
    entries contribute their wrapped ``_step`` only if ``applies(ctx, data)``
    is truthy while the patches are active.
    """
    from unittest.mock import MagicMock, patch

    from controllers.openapi.auth.data import AuthData, Edition

    request_ctx = RequestContext(
        token_type=TokenType.OAUTH_ACCOUNT,
        scope=scope,
        path_params={"app_id": str(uuid.uuid4())},
    )
    granted_scopes = frozenset({scope}) if scope is not None else frozenset()
    auth_data = AuthData(
        token_type=TokenType.OAUTH_ACCOUNT,
        token_hash="x",
        scopes=granted_scopes,
        app_access_mode=app_access_mode,
    )

    # Stand-in for the system features object with webapp auth switched on.
    system_features = MagicMock()
    system_features.webapp_auth.enabled = True

    chosen = []
    with (
        patch("controllers.openapi.auth.conditions.current_edition", return_value=Edition.EE),
        patch(
            "controllers.openapi.auth.conditions.FeatureService.get_system_features",
            return_value=system_features,
        ),
    ):
        for entry in account_pipeline._auth:
            if not isinstance(entry, When):
                chosen.append(entry)
            elif entry.applies(request_ctx, auth_data):
                chosen.append(entry._step)
    return chosen
|
|
|
|
|
|
def test_apps_run_scope_selects_webapp_checks():
    """With the APPS_RUN scope on a private app, both webapp checks are selected."""
    selected = _selected_webapp_steps(
        scope=Scope.APPS_RUN,
        app_access_mode=WebAppAccessMode.PRIVATE,
    )
    assert check_acl in selected
    assert check_private_app_permission in selected
|
|
|
|
|
|
def test_management_scope_skips_webapp_checks_on_private_app():
    """With the APPS_READ scope on a private app, neither webapp check is selected."""
    # Export DSL et al. carry an app_id but use a management scope; the webapp
    # end-user ACL / private-app gate must not block workspace members.
    selected = _selected_webapp_steps(
        scope=Scope.APPS_READ,
        app_access_mode=WebAppAccessMode.PRIVATE,
    )
    assert check_acl not in selected
    assert check_private_app_permission not in selected
|
|
|
|
|
|
def _selected_auth_steps_with_rbac(rbac):
    """Return the account auth steps chosen for an app request with *rbac* set.

    Builds an OAuth-account ``RequestContext`` with the APPS_READ scope, a
    random ``app_id`` path parameter, and the given *rbac* value (may be
    ``None``).

    Bare entries of ``account_pipeline._auth`` are always included; ``When``
    entries contribute their wrapped ``_step`` only if ``applies(ctx, None)``
    is truthy.
    """
    request_ctx = RequestContext(
        token_type=TokenType.OAUTH_ACCOUNT,
        scope=Scope.APPS_READ,
        path_params={"app_id": str(uuid.uuid4())},
        rbac=rbac,
    )

    chosen = []
    for entry in account_pipeline._auth:
        if not isinstance(entry, When):
            chosen.append(entry)
        elif entry.applies(request_ctx, None):
            chosen.append(entry._step)
    return chosen
|
|
|
|
|
|
def test_account_pipeline_selects_rbac_step_when_required():
    """An RBAC requirement on the context selects ``check_rbac_permission``."""
    requirement = RBACRequirement(
        resource_type=RBACResourceScope.APP,
        scene=RBACPermission.APP_VIEW_LAYOUT,
    )
    selected = _selected_auth_steps_with_rbac(requirement)
    assert check_rbac_permission in selected
|
|
|
|
|
|
def test_account_pipeline_skips_rbac_step_without_requirement():
    """Without an RBAC requirement, ``check_rbac_permission`` is not selected."""
    selected = _selected_auth_steps_with_rbac(None)
    assert check_rbac_permission not in selected
|
|
|
|
|
|
def test_external_sso_pipeline_never_enforces_rbac():
    """The external-SSO auth list has no RBAC step, whether wrapped in ``When`` or bare."""
    # RBAC is a console (account) concern; external SSO callers are scope-gated.
    auth_entries = external_sso_pipeline._auth
    wraps_rbac = any(
        isinstance(entry, When) and entry._step is check_rbac_permission
        for entry in auth_entries
    )
    assert not wraps_rbac
    assert check_rbac_permission not in auth_entries
|