From e8b0144cf71166d1fd1d10dfca68feb1a754a7d0 Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Sat, 18 Oct 2025 19:09:55 +0800 Subject: [PATCH] refactor: remove common end user operations out of wraps.py and move it into EndUserService --- api/controllers/service_api/wraps.py | 126 +----------------------- api/services/end_user_service.py | 141 +++++++++++++++++++++++++++ 2 files changed, 145 insertions(+), 122 deletions(-) create mode 100644 api/services/end_user_service.py diff --git a/api/controllers/service_api/wraps.py b/api/controllers/service_api/wraps.py index dd58fef1e5..340e605b85 100644 --- a/api/controllers/service_api/wraps.py +++ b/api/controllers/service_api/wraps.py @@ -1,5 +1,5 @@ import time -from collections.abc import Callable, Mapping +from collections.abc import Callable from datetime import timedelta from enum import StrEnum, auto from functools import wraps @@ -13,14 +13,14 @@ from sqlalchemy import select, update from sqlalchemy.orm import Session from werkzeug.exceptions import Forbidden, NotFound, Unauthorized -from core.app.entities.app_invoke_entities import InvokeFrom from extensions.ext_database import db from extensions.ext_redis import redis_client from libs.datetime_utils import naive_utc_now from libs.login import current_user from models import Account, Tenant, TenantAccountJoin, TenantStatus from models.dataset import Dataset, RateLimitLog -from models.model import ApiToken, App, DefaultEndUserSessionID, EndUser +from models.model import ApiToken, App +from services.end_user_service import EndUserService from services.feature_service import FeatureService P = ParamSpec("P") @@ -84,7 +84,7 @@ def validate_app_token(view: Callable[P, R] | None = None, *, fetch_user_arg: Fe if user_id: user_id = str(user_id) - end_user = get_or_create_end_user(app_model, user_id) + end_user = EndUserService.get_or_create_end_user(app_model, user_id) kwargs["end_user"] = end_user # Set EndUser as current logged-in user for flask_login.current_user @@ -309,124 +309,6 @@ def validate_and_get_api_token(scope: str | None = None): return api_token -def get_or_create_end_user(app_model: App, user_id: str | None = None) -> EndUser: - return get_or_create_end_user_by_type(InvokeFrom.SERVICE_API, app_model.tenant_id, app_model.id, user_id) - - -def get_or_create_end_user_by_type( - type: InvokeFrom, tenant_id: str, app_id: str, user_id: str | None = None -) -> EndUser: - if not user_id: - user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID - - with Session(db.engine, expire_on_commit=False) as session: - end_user = ( - session.query(EndUser) - .where( - EndUser.tenant_id == tenant_id, - EndUser.app_id == app_id, - EndUser.session_id == user_id, - EndUser.type == type, - ) - .first() - ) - - if end_user is None: - end_user = EndUser( - tenant_id=tenant_id, - app_id=app_id, - type=type, - is_anonymous=user_id == DefaultEndUserSessionID.DEFAULT_SESSION_ID, - session_id=user_id, - external_user_id=user_id, - ) - session.add(end_user) - session.commit() - - return end_user - - -def create_end_user_batch(type: InvokeFrom, tenant_id: str, app_ids: list[str], user_id: str) -> Mapping[str, EndUser]: - """Create end users in batch. - - Creates end users in batch for the specified tenant and application IDs in O(1) time. - - This batch creation is necessary because trigger subscriptions can span multiple applications, - and trigger events may be dispatched to multiple applications simultaneously. - - For each app_id in app_ids, check if an `EndUser` with the given - `user_id` (as session_id/external_user_id) already exists for the - tenant/app and type `type`. If it exists, return it; otherwise, - create it. Operates with minimal DB I/O by querying and inserting in - batches. - - Returns a mapping of `app_id -> EndUser`. - """ - - # Normalize user_id to default if empty - if not user_id: - user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID - - # Deduplicate app_ids while preserving input order - seen: set[str] = set() - unique_app_ids: list[str] = [] - for app_id in app_ids: - if app_id not in seen: - seen.add(app_id) - unique_app_ids.append(app_id) - - # Result is a simple app_id -> EndUser mapping - result: dict[str, EndUser] = {} - if not unique_app_ids: - return result - - with Session(db.engine, expire_on_commit=False) as session: - # Fetch existing end users for all target apps in a single query - existing_end_users: list[EndUser] = ( - session.query(EndUser) - .where( - EndUser.tenant_id == tenant_id, - EndUser.app_id.in_(unique_app_ids), - EndUser.session_id == user_id, - EndUser.type == type, - ) - .all() - ) - - found_app_ids: set[str] = set() - for eu in existing_end_users: - # If duplicates exist due to weak DB constraints, prefer the first - if eu.app_id not in result: - result[eu.app_id] = eu - found_app_ids.add(eu.app_id) - - # Determine which apps still need an EndUser created - missing_app_ids = [app_id for app_id in unique_app_ids if app_id not in found_app_ids] - - if missing_app_ids: - new_end_users: list[EndUser] = [] - is_anonymous = user_id == DefaultEndUserSessionID.DEFAULT_SESSION_ID - for app_id in missing_app_ids: - new_end_users.append( - EndUser( - tenant_id=tenant_id, - app_id=app_id, - type=type, - is_anonymous=is_anonymous, - session_id=user_id, - external_user_id=user_id, - ) - ) - - session.add_all(new_end_users) - session.commit() - - for eu in new_end_users: - result[eu.app_id] = eu - - return result - - class DatasetApiResource(Resource): method_decorators = [validate_dataset_token] diff --git a/api/services/end_user_service.py b/api/services/end_user_service.py new file mode 100644 index 0000000000..aa4a2e46ec --- /dev/null +++ b/api/services/end_user_service.py @@ -0,0 +1,141 @@ +from collections.abc import Mapping + +from sqlalchemy.orm import Session + +from core.app.entities.app_invoke_entities import InvokeFrom +from extensions.ext_database import db +from models.model import App, DefaultEndUserSessionID, EndUser + + +class EndUserService: + """ + Service for managing end users. + """ + + @classmethod + def get_or_create_end_user(cls, app_model: App, user_id: str | None = None) -> EndUser: + """ + Get or create an end user for a given app. + """ + + return cls.get_or_create_end_user_by_type(InvokeFrom.SERVICE_API, app_model.tenant_id, app_model.id, user_id) + + @classmethod + def get_or_create_end_user_by_type( + cls, type: InvokeFrom, tenant_id: str, app_id: str, user_id: str | None = None + ) -> EndUser: + """ + Get or create an end user for a given app and type. + """ + + if not user_id: + user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID + + with Session(db.engine, expire_on_commit=False) as session: + end_user = ( + session.query(EndUser) + .where( + EndUser.tenant_id == tenant_id, + EndUser.app_id == app_id, + EndUser.session_id == user_id, + EndUser.type == type, + ) + .first() + ) + + if end_user is None: + end_user = EndUser( + tenant_id=tenant_id, + app_id=app_id, + type=type, + is_anonymous=user_id == DefaultEndUserSessionID.DEFAULT_SESSION_ID, + session_id=user_id, + external_user_id=user_id, + ) + session.add(end_user) + session.commit() + + return end_user + + @classmethod + def create_end_user_batch( + cls, type: InvokeFrom, tenant_id: str, app_ids: list[str], user_id: str + ) -> Mapping[str, EndUser]: + """Create end users in batch. + + Creates end users in batch for the specified tenant and application IDs in O(1) time. + + This batch creation is necessary because trigger subscriptions can span multiple applications, + and trigger events may be dispatched to multiple applications simultaneously. + + For each app_id in app_ids, check if an `EndUser` with the given + `user_id` (as session_id/external_user_id) already exists for the + tenant/app and type `type`. If it exists, return it; otherwise, + create it. Operates with minimal DB I/O by querying and inserting in + batches. + + Returns a mapping of `app_id -> EndUser`. + """ + + # Normalize user_id to default if empty + if not user_id: + user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID + + # Deduplicate app_ids while preserving input order + seen: set[str] = set() + unique_app_ids: list[str] = [] + for app_id in app_ids: + if app_id not in seen: + seen.add(app_id) + unique_app_ids.append(app_id) + + # Result is a simple app_id -> EndUser mapping + result: dict[str, EndUser] = {} + if not unique_app_ids: + return result + + with Session(db.engine, expire_on_commit=False) as session: + # Fetch existing end users for all target apps in a single query + existing_end_users: list[EndUser] = ( + session.query(EndUser) + .where( + EndUser.tenant_id == tenant_id, + EndUser.app_id.in_(unique_app_ids), + EndUser.session_id == user_id, + EndUser.type == type, + ) + .all() + ) + + found_app_ids: set[str] = set() + for eu in existing_end_users: + # If duplicates exist due to weak DB constraints, prefer the first + if eu.app_id not in result: + result[eu.app_id] = eu + found_app_ids.add(eu.app_id) + + # Determine which apps still need an EndUser created + missing_app_ids = [app_id for app_id in unique_app_ids if app_id not in found_app_ids] + + if missing_app_ids: + new_end_users: list[EndUser] = [] + is_anonymous = user_id == DefaultEndUserSessionID.DEFAULT_SESSION_ID + for app_id in missing_app_ids: + new_end_users.append( + EndUser( + tenant_id=tenant_id, + app_id=app_id, + type=type, + is_anonymous=is_anonymous, + session_id=user_id, + external_user_id=user_id, + ) + ) + + session.add_all(new_end_users) + session.commit() + + for eu in new_end_users: + result[eu.app_id] = eu + + return result