feat: add WorkspacePermission model and associated method for workspace permissions retrieval

This commit is contained in:
GareArc 2026-01-03 18:45:50 -08:00
parent 43932412d0
commit 2bbc01bd01
No known key found for this signature in database
8 changed files with 309 additions and 6 deletions

View File

@ -67,6 +67,11 @@ class ActivateCheckApi(Resource):
invitation = RegisterService.get_invitation_with_case_fallback(workspaceId, args.email, token)
if invitation:
# Check workspace permission for member invitations (secondary check for old tokens)
from libs.workspace_permission import check_workspace_member_invite_permission
check_workspace_member_invite_permission(workspaceId)
data = invitation.get("data", {})
tenant = invitation.get("tenant", None)
workspace_name = tenant.name if tenant else None

View File

@ -107,6 +107,12 @@ class MemberInviteEmailApi(Resource):
inviter = current_user
if not inviter.current_tenant:
raise ValueError("No current tenant")
# Check workspace permission for member invitations
from libs.workspace_permission import check_workspace_member_invite_permission
check_workspace_member_invite_permission(inviter.current_tenant.id)
invitation_results = []
console_web_url = dify_config.CONSOLE_WEB_URL

View File

@ -20,6 +20,7 @@ from controllers.console.error import AccountNotLinkTenantError
from controllers.console.wraps import (
account_initialization_required,
cloud_edition_billing_resource_check,
only_edition_enterprise,
setup_required,
)
from enums.cloud_plan import CloudPlan
@ -28,6 +29,7 @@ from libs.helper import TimestampField
from libs.login import current_account_with_tenant, login_required
from models.account import Tenant, TenantStatus
from services.account_service import TenantService
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
from services.file_service import FileService
from services.workspace_service import WorkspaceService
@ -285,3 +287,31 @@ class WorkspaceInfoApi(Resource):
db.session.commit()
return {"result": "success", "tenant": marshal(WorkspaceService.get_tenant_info(tenant), tenant_fields)}
@console_ns.route("/workspaces/current/permission")
class WorkspacePermissionApi(Resource):
"""Get workspace permissions for the current workspace."""
@setup_required
@login_required
@account_initialization_required
@only_edition_enterprise
def get(self):
"""
Get workspace permission settings.
Returns permission flags that control workspace features like member invitations and owner transfer.
"""
_, current_tenant_id = current_account_with_tenant()
if not current_tenant_id:
raise ValueError("No current tenant")
# Get workspace permissions from enterprise service
permission = EnterpriseService.WorkspacePermission.get_permission(current_tenant_id)
return {
"workspace_id": permission.workspace_id,
"allow_member_invite": permission.allow_member_invite,
"allow_owner_transfer": permission.allow_owner_transfer,
}, 200

View File

@ -286,13 +286,12 @@ def enable_change_email(view: Callable[P, R]):
def is_allow_transfer_owner(view: Callable[P, R]):
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
_, current_tenant_id = current_account_with_tenant()
features = FeatureService.get_features(current_tenant_id)
if features.is_allow_transfer_workspace:
return view(*args, **kwargs)
from libs.workspace_permission import check_workspace_owner_transfer_permission
# otherwise, return 403
abort(403)
_, current_tenant_id = current_account_with_tenant()
# Check both billing/plan level and workspace policy level permissions
check_workspace_owner_transfer_permission(current_tenant_id)
return view(*args, **kwargs)
return decorated

View File

@ -0,0 +1,73 @@
"""
Workspace permission helper functions.
These helpers check both billing/plan level and workspace-specific policy level permissions.
Checks are performed at two levels:
1. Billing/plan level - via FeatureService (e.g., SANDBOX plan restrictions)
2. Workspace policy level - via EnterpriseService (admin-configured per workspace)
"""
import logging
from werkzeug.exceptions import Forbidden
from configs import dify_config
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
logger = logging.getLogger(__name__)
def check_workspace_member_invite_permission(workspace_id: str) -> None:
"""
Check if workspace allows member invitations at both billing and policy levels.
Checks performed:
1. Enterprise policy level - Admin-configured workspace permission
Args:
workspace_id: The workspace ID to check permissions for
Raises:
Forbidden: If either billing plan or workspace policy prohibits member invitations
"""
# Check enterprise workspace policy level (only if enterprise enabled)
if dify_config.ENTERPRISE_ENABLED:
try:
permission = EnterpriseService.WorkspacePermission.get_permission(workspace_id)
if not permission.allow_member_invite:
raise Forbidden("Workspace policy prohibits member invitations")
except Forbidden:
raise
except Exception as e:
logger.error(f"Failed to check workspace invite permission for {workspace_id}: {e}")
def check_workspace_owner_transfer_permission(workspace_id: str) -> None:
"""
Check if workspace allows owner transfer at both billing and policy levels.
Checks performed:
1. Billing/plan level - SANDBOX plan blocks owner transfer
2. Enterprise policy level - Admin-configured workspace permission
Args:
workspace_id: The workspace ID to check permissions for
Raises:
Forbidden: If either billing plan or workspace policy prohibits ownership transfer
"""
features = FeatureService.get_features(workspace_id)
if not features.is_allow_transfer_workspace:
raise Forbidden("Your current plan does not allow workspace ownership transfer")
# Check enterprise workspace policy level (only if enterprise enabled)
if dify_config.ENTERPRISE_ENABLED:
try:
permission = EnterpriseService.WorkspacePermission.get_permission(workspace_id)
if not permission.allow_owner_transfer:
raise Forbidden("Workspace policy prohibits ownership transfer")
except Forbidden:
raise
except Exception as e:
logger.error(f"Failed to check workspace transfer permission for {workspace_id}: {e}")

View File

@ -1376,6 +1376,11 @@ class RegisterService:
normalized_email = email.lower()
"""Invite new member"""
# Check workspace permission for member invitations
from libs.workspace_permission import check_workspace_member_invite_permission
check_workspace_member_invite_permission(tenant.id)
with Session(db.engine) as session:
account = AccountService.get_account_by_email_with_case_fallback(email, session=session)

View File

@ -11,6 +11,22 @@ class WebAppSettings(BaseModel):
default="private",
alias="accessMode",
)
class WorkspacePermission(BaseModel):
workspace_id: str = Field(
description="The ID of the workspace.",
alias="workspaceId",
)
allow_member_invite: bool = Field(
description="Whether to allow members to invite new members to the workspace.",
default=False,
alias="allowMemberInvite",
)
allow_owner_transfer: bool = Field(
description="Whether to allow owners to transfer ownership of the workspace.",
default=False,
alias="allowOwnerTransfer",
)
class EnterpriseService:
@ -43,6 +59,15 @@ class EnterpriseService:
return datetime.fromisoformat(data)
except ValueError as e:
raise ValueError(f"Invalid date format: {data}") from e
class WorkspacePermission:
@classmethod
def get_permission(cls, workspace_id: str):
params = {"id": workspace_id}
data = EnterpriseRequest.send_request("GET", "/workspaces/permission", params=params)
if not data or not "permission" in data:
raise ValueError("No data found.")
return WorkspacePermission.model_validate(data["permission"])
class WebAppAuth:
@classmethod

View File

@ -0,0 +1,160 @@
import pytest
from unittest.mock import Mock, patch
from werkzeug.exceptions import Forbidden
from libs.workspace_permission import (
check_workspace_member_invite_permission,
check_workspace_owner_transfer_permission,
)
class TestWorkspacePermissionHelper:
"""Test workspace permission helper functions."""
@patch("libs.workspace_permission.dify_config")
@patch("libs.workspace_permission.FeatureService")
def test_community_edition_allows_invite(self, mock_feature_service, mock_config):
"""Community edition should always allow invitations without calling enterprise service."""
mock_config.ENTERPRISE_ENABLED = False
mock_features = Mock()
mock_feature_service.get_features.return_value = mock_features
# Should not raise
check_workspace_member_invite_permission("test-workspace-id")
# FeatureService should be called but EnterpriseService should not
mock_feature_service.get_features.assert_called_once_with("test-workspace-id")
@patch("libs.workspace_permission.dify_config")
@patch("libs.workspace_permission.FeatureService")
def test_community_edition_allows_transfer(self, mock_feature_service, mock_config):
"""Community edition should check billing plan but not call enterprise service."""
mock_config.ENTERPRISE_ENABLED = False
mock_features = Mock()
mock_features.is_allow_transfer_workspace = True
mock_feature_service.get_features.return_value = mock_features
# Should not raise
check_workspace_owner_transfer_permission("test-workspace-id")
mock_feature_service.get_features.assert_called_once_with("test-workspace-id")
@patch("libs.workspace_permission.EnterpriseService")
@patch("libs.workspace_permission.dify_config")
@patch("libs.workspace_permission.FeatureService")
def test_enterprise_blocks_invite_when_disabled(
self, mock_feature_service, mock_config, mock_enterprise_service
):
"""Enterprise edition should block invitations when workspace policy is False."""
mock_config.ENTERPRISE_ENABLED = True
mock_features = Mock()
mock_feature_service.get_features.return_value = mock_features
mock_permission = Mock()
mock_permission.allow_member_invite = False
mock_enterprise_service.WorkspacePermission.get_permission.return_value = mock_permission
with pytest.raises(Forbidden, match="Workspace policy prohibits member invitations"):
check_workspace_member_invite_permission("test-workspace-id")
mock_enterprise_service.WorkspacePermission.get_permission.assert_called_once_with("test-workspace-id")
@patch("libs.workspace_permission.EnterpriseService")
@patch("libs.workspace_permission.dify_config")
@patch("libs.workspace_permission.FeatureService")
def test_enterprise_allows_invite_when_enabled(
self, mock_feature_service, mock_config, mock_enterprise_service
):
"""Enterprise edition should allow invitations when workspace policy is True."""
mock_config.ENTERPRISE_ENABLED = True
mock_features = Mock()
mock_feature_service.get_features.return_value = mock_features
mock_permission = Mock()
mock_permission.allow_member_invite = True
mock_enterprise_service.WorkspacePermission.get_permission.return_value = mock_permission
# Should not raise
check_workspace_member_invite_permission("test-workspace-id")
mock_enterprise_service.WorkspacePermission.get_permission.assert_called_once_with("test-workspace-id")
@patch("libs.workspace_permission.EnterpriseService")
@patch("libs.workspace_permission.dify_config")
@patch("libs.workspace_permission.FeatureService")
def test_billing_plan_blocks_transfer(self, mock_feature_service, mock_config, mock_enterprise_service):
"""SANDBOX billing plan should block owner transfer before checking enterprise policy."""
mock_config.ENTERPRISE_ENABLED = True
mock_features = Mock()
mock_features.is_allow_transfer_workspace = False # SANDBOX plan
mock_feature_service.get_features.return_value = mock_features
with pytest.raises(Forbidden, match="Your current plan does not allow workspace ownership transfer"):
check_workspace_owner_transfer_permission("test-workspace-id")
# Enterprise service should NOT be called since billing plan already blocks
mock_enterprise_service.WorkspacePermission.get_permission.assert_not_called()
@patch("libs.workspace_permission.EnterpriseService")
@patch("libs.workspace_permission.dify_config")
@patch("libs.workspace_permission.FeatureService")
def test_enterprise_blocks_transfer_when_disabled(
self, mock_feature_service, mock_config, mock_enterprise_service
):
"""Enterprise edition should block transfer when workspace policy is False."""
mock_config.ENTERPRISE_ENABLED = True
mock_features = Mock()
mock_features.is_allow_transfer_workspace = True # Billing plan allows
mock_feature_service.get_features.return_value = mock_features
mock_permission = Mock()
mock_permission.allow_owner_transfer = False # Workspace policy blocks
mock_enterprise_service.WorkspacePermission.get_permission.return_value = mock_permission
with pytest.raises(Forbidden, match="Workspace policy prohibits ownership transfer"):
check_workspace_owner_transfer_permission("test-workspace-id")
mock_enterprise_service.WorkspacePermission.get_permission.assert_called_once_with("test-workspace-id")
@patch("libs.workspace_permission.EnterpriseService")
@patch("libs.workspace_permission.dify_config")
@patch("libs.workspace_permission.FeatureService")
def test_enterprise_allows_transfer_when_both_enabled(
self, mock_feature_service, mock_config, mock_enterprise_service
):
"""Enterprise edition should allow transfer when both billing and workspace policy allow."""
mock_config.ENTERPRISE_ENABLED = True
mock_features = Mock()
mock_features.is_allow_transfer_workspace = True # Billing plan allows
mock_feature_service.get_features.return_value = mock_features
mock_permission = Mock()
mock_permission.allow_owner_transfer = True # Workspace policy allows
mock_enterprise_service.WorkspacePermission.get_permission.return_value = mock_permission
# Should not raise
check_workspace_owner_transfer_permission("test-workspace-id")
mock_enterprise_service.WorkspacePermission.get_permission.assert_called_once_with("test-workspace-id")
@patch("libs.workspace_permission.logger")
@patch("libs.workspace_permission.EnterpriseService")
@patch("libs.workspace_permission.dify_config")
@patch("libs.workspace_permission.FeatureService")
def test_enterprise_service_error_fails_open(
self, mock_feature_service, mock_config, mock_enterprise_service, mock_logger
):
"""On enterprise service error, should fail-open (allow) and log error."""
mock_config.ENTERPRISE_ENABLED = True
mock_features = Mock()
mock_feature_service.get_features.return_value = mock_features
# Simulate enterprise service error
mock_enterprise_service.WorkspacePermission.get_permission.side_effect = Exception("Service unavailable")
# Should not raise (fail-open)
check_workspace_member_invite_permission("test-workspace-id")
# Should log the error
mock_logger.error.assert_called_once()
assert "Failed to check workspace invite permission" in str(mock_logger.error.call_args)