Fix/webapp remove code (#26436)

Co-authored-by: GareArc <chen4851@purdue.edu>
This commit is contained in:
Garfield Dai 2025-09-29 15:58:41 +08:00 committed by GitHub
parent 1277a57641
commit 5073ce6e22
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 55 additions and 43 deletions

View File

@ -15,7 +15,6 @@ from libs.datetime_utils import naive_utc_now
from libs.login import current_user, login_required
from models import Account, App, InstalledApp, RecommendedApp
from services.account_service import TenantService
from services.app_service import AppService
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
@ -68,31 +67,26 @@ class InstalledAppsListApi(Resource):
# Pre-filter out apps without setting or with sso_verified
filtered_installed_apps = []
app_id_to_app_code = {}
for installed_app in installed_app_list:
app_id = installed_app["app"].id
webapp_setting = webapp_settings.get(app_id)
if not webapp_setting or webapp_setting.access_mode == "sso_verified":
continue
app_code = AppService.get_app_code_by_id(str(app_id))
app_id_to_app_code[app_id] = app_code
filtered_installed_apps.append(installed_app)
app_codes = list(app_id_to_app_code.values())
# Batch permission check
app_ids = [installed_app["app"].id for installed_app in filtered_installed_apps]
permissions = EnterpriseService.WebAppAuth.batch_is_user_allowed_to_access_webapps(
user_id=user_id,
app_codes=app_codes,
app_ids=app_ids,
)
# Keep only allowed apps
res = []
for installed_app in filtered_installed_apps:
app_id = installed_app["app"].id
app_code = app_id_to_app_code[app_id]
if permissions.get(app_code):
if permissions.get(app_id):
res.append(installed_app)
installed_app_list = res

View File

@ -11,7 +11,6 @@ from controllers.console.wraps import account_initialization_required
from extensions.ext_database import db
from libs.login import login_required
from models import InstalledApp
from services.app_service import AppService
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
@ -57,10 +56,9 @@ def user_allowed_to_access_app(view: Callable[Concatenate[InstalledApp, P], R] |
feature = FeatureService.get_system_features()
if feature.webapp_auth.enabled:
app_id = installed_app.app_id
app_code = AppService.get_app_code_by_id(app_id)
res = EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(
user_id=str(current_user.id),
app_code=app_code,
app_id=app_id,
)
if not res:
raise AppAccessDeniedError()

View File

@ -160,9 +160,8 @@ class AppWebAuthPermission(Resource):
args = parser.parse_args()
app_id = args["appId"]
app_code = AppService.get_app_code_by_id(app_id)
res = True
if WebAppAuthService.is_app_require_permission_check(app_id=app_id):
res = EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(str(user_id), app_code)
res = EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(str(user_id), app_id)
return {"result": res}

View File

@ -12,6 +12,7 @@ from controllers.web.error import WebAppAuthRequiredError
from extensions.ext_database import db
from libs.passport import PassportService
from models.model import App, EndUser, Site
from services.app_service import AppService
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
from services.webapp_auth_service import WebAppAuthService, WebAppAuthType
@ -38,7 +39,7 @@ class PassportResource(Resource):
if app_code is None:
raise Unauthorized("X-App-Code header is missing.")
app_id = AppService.get_app_id_by_code(app_code)
# exchange token for enterprise logined web user
enterprise_user_decoded = decode_enterprise_webapp_user_id(web_app_access_token)
if enterprise_user_decoded:
@ -48,7 +49,7 @@ class PassportResource(Resource):
)
if system_features.webapp_auth.enabled:
app_settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_code(app_code=app_code)
app_settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_id=app_id)
if not app_settings or not app_settings.access_mode == "public":
raise WebAppAuthRequiredError()

View File

@ -13,6 +13,7 @@ from controllers.web.error import WebAppAuthAccessDeniedError, WebAppAuthRequire
from extensions.ext_database import db
from libs.passport import PassportService
from models.model import App, EndUser, Site
from services.app_service import AppService
from services.enterprise.enterprise_service import EnterpriseService, WebAppSettings
from services.feature_service import FeatureService
from services.webapp_auth_service import WebAppAuthService
@ -37,7 +38,11 @@ def validate_jwt_token(view: Callable[Concatenate[App, EndUser, P], R] | None =
def decode_jwt_token():
system_features = FeatureService.get_system_features()
app_code = str(request.headers.get("X-App-Code"))
app_code = request.headers.get("X-App-Code")
if not app_code:
app_code = None
else:
app_code = str(app_code)
try:
auth_header = request.headers.get("Authorization")
if auth_header is None:
@ -51,15 +56,30 @@ def decode_jwt_token():
if auth_scheme != "bearer":
raise Unauthorized("Invalid Authorization header format. Expected 'Bearer <api-key>' format.")
# Check for invalid token values
if tk in ["undefined", "null", "None", ""]:
raise Unauthorized("Invalid token provided.")
decoded = PassportService().verify(tk)
app_code = decoded.get("app_code")
# Preserve app_code from header if JWT token doesn't contain one
jwt_app_code = decoded.get("app_code")
if jwt_app_code:
app_code = jwt_app_code
app_id = decoded.get("app_id")
# Validate required fields from JWT token
if not app_id:
raise Unauthorized("Invalid token: missing app_id.")
if not app_code:
raise Unauthorized("Invalid token: missing app_code.")
with Session(db.engine, expire_on_commit=False) as session:
app_model = session.scalar(select(App).where(App.id == app_id))
site = session.scalar(select(Site).where(Site.code == app_code))
if not app_model:
raise NotFound()
if not app_code or not site:
if not site:
raise BadRequest("Site URL is no longer valid.")
if app_model.enable_site is False:
raise BadRequest("Site is disabled.")
@ -72,7 +92,12 @@ def decode_jwt_token():
app_web_auth_enabled = False
webapp_settings = None
if system_features.webapp_auth.enabled:
webapp_settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_code(app_code=app_code)
if not app_code:
raise BadRequest("App code is required for webapp authentication.")
if app_code in ["undefined", "null", "None", ""]:
raise BadRequest("Invalid app code provided.")
app_id = AppService.get_app_id_by_code(app_code)
webapp_settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_id)
if not webapp_settings:
raise NotFound("Web app settings not found.")
app_web_auth_enabled = webapp_settings.access_mode != "public"
@ -87,8 +112,11 @@ def decode_jwt_token():
if system_features.webapp_auth.enabled:
if not app_code:
raise Unauthorized("Please re-login to access the web app.")
if app_code in ["undefined", "null", "None", ""]:
raise Unauthorized("Invalid app code provided.")
app_id = AppService.get_app_id_by_code(app_code)
app_web_auth_enabled = (
EnterpriseService.WebAppAuth.get_app_access_mode_by_code(app_code=str(app_code)).access_mode != "public"
EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_id=app_id).access_mode != "public"
)
if app_web_auth_enabled:
raise WebAppAuthRequiredError()
@ -129,7 +157,10 @@ def _validate_user_accessibility(
raise WebAppAuthRequiredError("Web app settings not found.")
if WebAppAuthService.is_app_require_permission_check(access_mode=webapp_settings.access_mode):
if not EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(user_id, app_code=app_code):
if not app_code or app_code in ["undefined", "null", "None", ""]:
raise WebAppAuthAccessDeniedError("Invalid app code for permission check.")
app_id = AppService.get_app_id_by_code(app_code)
if not EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(user_id, app_id):
raise WebAppAuthAccessDeniedError()
auth_type = decoded.get("auth_type")

View File

@ -46,17 +46,17 @@ class EnterpriseService:
class WebAppAuth:
@classmethod
def is_user_allowed_to_access_webapp(cls, user_id: str, app_code: str):
params = {"userId": user_id, "appCode": app_code}
def is_user_allowed_to_access_webapp(cls, user_id: str, app_id: str):
params = {"userId": user_id, "appId": app_id}
data = EnterpriseRequest.send_request("GET", "/webapp/permission", params=params)
return data.get("result", False)
@classmethod
def batch_is_user_allowed_to_access_webapps(cls, user_id: str, app_codes: list[str]):
if not app_codes:
def batch_is_user_allowed_to_access_webapps(cls, user_id: str, app_ids: list[str]):
if not app_ids:
return {}
body = {"userId": user_id, "appCodes": app_codes}
body = {"userId": user_id, "appIds": app_ids}
data = EnterpriseRequest.send_request("POST", "/webapp/permission/batch", json=body)
if not data:
raise ValueError("No data found.")
@ -92,16 +92,6 @@ class EnterpriseService:
return ret
@classmethod
def get_app_access_mode_by_code(cls, app_code: str) -> WebAppSettings:
if not app_code:
raise ValueError("app_code must be provided.")
params = {"appCode": app_code}
data = EnterpriseRequest.send_request("GET", "/webapp/access-mode/code", params=params)
if not data:
raise ValueError("No data found.")
return WebAppSettings(**data)
@classmethod
def update_app_access_mode(cls, app_id: str, access_mode: str):
if not app_id:

View File

@ -172,7 +172,8 @@ class WebAppAuthService:
return WebAppAuthType.EXTERNAL
if app_code:
webapp_settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_code(app_code)
app_id = AppService.get_app_id_by_code(app_code)
webapp_settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_id)
return cls.get_app_auth_type(access_mode=webapp_settings.access_mode)
raise ValueError("Could not determine app authentication type.")

View File

@ -35,9 +35,7 @@ class TestWebAppAuthService:
mock_enterprise_service.WebAppAuth.get_app_access_mode_by_id.return_value = type(
"MockWebAppAuth", (), {"access_mode": "private"}
)()
mock_enterprise_service.WebAppAuth.get_app_access_mode_by_code.return_value = type(
"MockWebAppAuth", (), {"access_mode": "private"}
)()
# Note: get_app_access_mode_by_code method was removed in refactoring
yield {
"passport_service": mock_passport_service,
@ -866,7 +864,7 @@ class TestWebAppAuthService:
mock_webapp_auth = type("MockWebAppAuth", (), {"access_mode": "sso_verified"})()
mock_external_service_dependencies[
"enterprise_service"
].WebAppAuth.get_app_access_mode_by_code.return_value = mock_webapp_auth
].WebAppAuth.get_app_access_mode_by_id.return_value = mock_webapp_auth
# Act: Execute authentication type determination
result = WebAppAuthService.get_app_auth_type(app_code="mock_app_code")
@ -877,7 +875,7 @@ class TestWebAppAuthService:
# Verify mock service was called correctly
mock_external_service_dependencies[
"enterprise_service"
].WebAppAuth.get_app_access_mode_by_code.assert_called_once_with("mock_app_code")
].WebAppAuth.get_app_access_mode_by_id.assert_called_once_with("mock_app_id")
def test_get_app_auth_type_no_parameters(self, db_session_with_containers, mock_external_service_dependencies):
"""