From bdf70cc45a80eb073d17aadd445d3f6950f483cb Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Wed, 20 Aug 2025 20:20:18 +0800 Subject: [PATCH] fix: mypy error --- api/controllers/console/auth/oauth_server.py | 44 ++++++++++++-------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/api/controllers/console/auth/oauth_server.py b/api/controllers/console/auth/oauth_server.py index add1b904ae..ab895994a0 100644 --- a/api/controllers/console/auth/oauth_server.py +++ b/api/controllers/console/auth/oauth_server.py @@ -21,8 +21,8 @@ def oauth_server_client_id_required(view): def decorated(*args, **kwargs): parser = reqparse.RequestParser() parser.add_argument("client_id", type=str, required=True, location="json") - args = parser.parse_args() - client_id = args.get("client_id") + parsed_args = parser.parse_args() + client_id = parsed_args.get("client_id") if not client_id: raise BadRequest("client_id is required") @@ -40,18 +40,26 @@ def oauth_server_client_id_required(view): def oauth_server_access_token_required(view): @wraps(view) def decorated(*args, **kwargs): - oauth_provider_app: OAuthProviderApp = kwargs.get("oauth_provider_app") + oauth_provider_app = kwargs.get("oauth_provider_app") + if not oauth_provider_app or not isinstance(oauth_provider_app, OAuthProviderApp): + raise BadRequest("Invalid oauth_provider_app") if not request.headers.get("Authorization"): raise BadRequest("Authorization is required") - token_type = request.headers.get("Authorization") - if not token_type: - raise BadRequest("token_type is required") - token_type = token_type.split(" ")[0] + authorization_header = request.headers.get("Authorization") + if not authorization_header: + raise BadRequest("Authorization header is required") + + parts = authorization_header.split(" ") + if len(parts) != 2: + raise BadRequest("Invalid Authorization header format") + + token_type = parts[0] if token_type != "Bearer": raise BadRequest("token_type is invalid") - access_token = request.headers.get("Authorization").split(" ")[1] + + access_token = parts[1] if not access_token: raise BadRequest("access_token is required") @@ -72,8 +80,8 @@ class OAuthServerAppApi(Resource): def post(self, oauth_provider_app: OAuthProviderApp): parser = reqparse.RequestParser() parser.add_argument("redirect_uri", type=str, required=True, location="json") - args = parser.parse_args() - redirect_uri = args.get("redirect_uri") + parsed_args = parser.parse_args() + redirect_uri = parsed_args.get("redirect_uri") # check if redirect_uri is valid if redirect_uri not in oauth_provider_app.redirect_uris: @@ -115,22 +123,22 @@ class OAuthServerUserTokenApi(Resource): parser.add_argument("client_secret", type=str, required=False, location="json") parser.add_argument("redirect_uri", type=str, required=False, location="json") parser.add_argument("refresh_token", type=str, required=False, location="json") - args = parser.parse_args() + parsed_args = parser.parse_args() - grant_type = OAuthGrantType(args["grant_type"]) + grant_type = OAuthGrantType(parsed_args["grant_type"]) if grant_type == OAuthGrantType.AUTHORIZATION_CODE: - if not args["code"]: + if not parsed_args["code"]: raise BadRequest("code is required") - if args["client_secret"] != oauth_provider_app.client_secret: + if parsed_args["client_secret"] != oauth_provider_app.client_secret: raise BadRequest("client_secret is invalid") - if args["redirect_uri"] not in oauth_provider_app.redirect_uris: + if parsed_args["redirect_uri"] not in oauth_provider_app.redirect_uris: raise BadRequest("redirect_uri is invalid") access_token, refresh_token = OAuthServerService.sign_oauth_access_token( - grant_type, code=args["code"], client_id=oauth_provider_app.client_id + grant_type, code=parsed_args["code"], client_id=oauth_provider_app.client_id ) return jsonable_encoder( { @@ -141,11 +149,11 @@ class OAuthServerUserTokenApi(Resource): } ) elif grant_type == OAuthGrantType.REFRESH_TOKEN: - if not args["refresh_token"]: + if not parsed_args["refresh_token"]: raise BadRequest("refresh_token is required") access_token, refresh_token = OAuthServerService.sign_oauth_access_token( - grant_type, refresh_token=args["refresh_token"], client_id=oauth_provider_app.client_id + grant_type, refresh_token=parsed_args["refresh_token"], client_id=oauth_provider_app.client_id ) return jsonable_encoder( {