mirror of https://github.com/langgenius/dify.git
refactor: remove common end user operations out of wraps.py and move it into EndUserService
This commit is contained in:
parent
2c8c1860ca
commit
e8b0144cf7
|
|
@ -1,5 +1,5 @@
|
|||
import time
|
||||
from collections.abc import Callable, Mapping
|
||||
from collections.abc import Callable
|
||||
from datetime import timedelta
|
||||
from enum import StrEnum, auto
|
||||
from functools import wraps
|
||||
|
|
@ -13,14 +13,14 @@ from sqlalchemy import select, update
|
|||
from sqlalchemy.orm import Session
|
||||
from werkzeug.exceptions import Forbidden, NotFound, Unauthorized
|
||||
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from libs.login import current_user
|
||||
from models import Account, Tenant, TenantAccountJoin, TenantStatus
|
||||
from models.dataset import Dataset, RateLimitLog
|
||||
from models.model import ApiToken, App, DefaultEndUserSessionID, EndUser
|
||||
from models.model import ApiToken, App
|
||||
from services.end_user_service import EndUserService
|
||||
from services.feature_service import FeatureService
|
||||
|
||||
P = ParamSpec("P")
|
||||
|
|
@ -84,7 +84,7 @@ def validate_app_token(view: Callable[P, R] | None = None, *, fetch_user_arg: Fe
|
|||
if user_id:
|
||||
user_id = str(user_id)
|
||||
|
||||
end_user = get_or_create_end_user(app_model, user_id)
|
||||
end_user = EndUserService.get_or_create_end_user(app_model, user_id)
|
||||
kwargs["end_user"] = end_user
|
||||
|
||||
# Set EndUser as current logged-in user for flask_login.current_user
|
||||
|
|
@ -309,124 +309,6 @@ def validate_and_get_api_token(scope: str | None = None):
|
|||
return api_token
|
||||
|
||||
|
||||
def get_or_create_end_user(app_model: App, user_id: str | None = None) -> EndUser:
|
||||
return get_or_create_end_user_by_type(InvokeFrom.SERVICE_API, app_model.tenant_id, app_model.id, user_id)
|
||||
|
||||
|
||||
def get_or_create_end_user_by_type(
|
||||
type: InvokeFrom, tenant_id: str, app_id: str, user_id: str | None = None
|
||||
) -> EndUser:
|
||||
if not user_id:
|
||||
user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID
|
||||
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
end_user = (
|
||||
session.query(EndUser)
|
||||
.where(
|
||||
EndUser.tenant_id == tenant_id,
|
||||
EndUser.app_id == app_id,
|
||||
EndUser.session_id == user_id,
|
||||
EndUser.type == type,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if end_user is None:
|
||||
end_user = EndUser(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
type=type,
|
||||
is_anonymous=user_id == DefaultEndUserSessionID.DEFAULT_SESSION_ID,
|
||||
session_id=user_id,
|
||||
external_user_id=user_id,
|
||||
)
|
||||
session.add(end_user)
|
||||
session.commit()
|
||||
|
||||
return end_user
|
||||
|
||||
|
||||
def create_end_user_batch(type: InvokeFrom, tenant_id: str, app_ids: list[str], user_id: str) -> Mapping[str, EndUser]:
|
||||
"""Create end users in batch.
|
||||
|
||||
Creates end users in batch for the specified tenant and application IDs in O(1) time.
|
||||
|
||||
This batch creation is necessary because trigger subscriptions can span multiple applications,
|
||||
and trigger events may be dispatched to multiple applications simultaneously.
|
||||
|
||||
For each app_id in app_ids, check if an `EndUser` with the given
|
||||
`user_id` (as session_id/external_user_id) already exists for the
|
||||
tenant/app and type `type`. If it exists, return it; otherwise,
|
||||
create it. Operates with minimal DB I/O by querying and inserting in
|
||||
batches.
|
||||
|
||||
Returns a mapping of `app_id -> EndUser`.
|
||||
"""
|
||||
|
||||
# Normalize user_id to default if empty
|
||||
if not user_id:
|
||||
user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID
|
||||
|
||||
# Deduplicate app_ids while preserving input order
|
||||
seen: set[str] = set()
|
||||
unique_app_ids: list[str] = []
|
||||
for app_id in app_ids:
|
||||
if app_id not in seen:
|
||||
seen.add(app_id)
|
||||
unique_app_ids.append(app_id)
|
||||
|
||||
# Result is a simple app_id -> EndUser mapping
|
||||
result: dict[str, EndUser] = {}
|
||||
if not unique_app_ids:
|
||||
return result
|
||||
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
# Fetch existing end users for all target apps in a single query
|
||||
existing_end_users: list[EndUser] = (
|
||||
session.query(EndUser)
|
||||
.where(
|
||||
EndUser.tenant_id == tenant_id,
|
||||
EndUser.app_id.in_(unique_app_ids),
|
||||
EndUser.session_id == user_id,
|
||||
EndUser.type == type,
|
||||
)
|
||||
.all()
|
||||
)
|
||||
|
||||
found_app_ids: set[str] = set()
|
||||
for eu in existing_end_users:
|
||||
# If duplicates exist due to weak DB constraints, prefer the first
|
||||
if eu.app_id not in result:
|
||||
result[eu.app_id] = eu
|
||||
found_app_ids.add(eu.app_id)
|
||||
|
||||
# Determine which apps still need an EndUser created
|
||||
missing_app_ids = [app_id for app_id in unique_app_ids if app_id not in found_app_ids]
|
||||
|
||||
if missing_app_ids:
|
||||
new_end_users: list[EndUser] = []
|
||||
is_anonymous = user_id == DefaultEndUserSessionID.DEFAULT_SESSION_ID
|
||||
for app_id in missing_app_ids:
|
||||
new_end_users.append(
|
||||
EndUser(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
type=type,
|
||||
is_anonymous=is_anonymous,
|
||||
session_id=user_id,
|
||||
external_user_id=user_id,
|
||||
)
|
||||
)
|
||||
|
||||
session.add_all(new_end_users)
|
||||
session.commit()
|
||||
|
||||
for eu in new_end_users:
|
||||
result[eu.app_id] = eu
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class DatasetApiResource(Resource):
|
||||
method_decorators = [validate_dataset_token]
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,141 @@
|
|||
from collections.abc import Mapping
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from extensions.ext_database import db
|
||||
from models.model import App, DefaultEndUserSessionID, EndUser
|
||||
|
||||
|
||||
class EndUserService:
|
||||
"""
|
||||
Service for managing end users.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def get_or_create_end_user(cls, app_model: App, user_id: str | None = None) -> EndUser:
|
||||
"""
|
||||
Get or create an end user for a given app.
|
||||
"""
|
||||
|
||||
return cls.get_or_create_end_user_by_type(InvokeFrom.SERVICE_API, app_model.tenant_id, app_model.id, user_id)
|
||||
|
||||
@classmethod
|
||||
def get_or_create_end_user_by_type(
|
||||
cls, type: InvokeFrom, tenant_id: str, app_id: str, user_id: str | None = None
|
||||
) -> EndUser:
|
||||
"""
|
||||
Get or create an end user for a given app and type.
|
||||
"""
|
||||
|
||||
if not user_id:
|
||||
user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID
|
||||
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
end_user = (
|
||||
session.query(EndUser)
|
||||
.where(
|
||||
EndUser.tenant_id == tenant_id,
|
||||
EndUser.app_id == app_id,
|
||||
EndUser.session_id == user_id,
|
||||
EndUser.type == type,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if end_user is None:
|
||||
end_user = EndUser(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
type=type,
|
||||
is_anonymous=user_id == DefaultEndUserSessionID.DEFAULT_SESSION_ID,
|
||||
session_id=user_id,
|
||||
external_user_id=user_id,
|
||||
)
|
||||
session.add(end_user)
|
||||
session.commit()
|
||||
|
||||
return end_user
|
||||
|
||||
@classmethod
|
||||
def create_end_user_batch(
|
||||
cls, type: InvokeFrom, tenant_id: str, app_ids: list[str], user_id: str
|
||||
) -> Mapping[str, EndUser]:
|
||||
"""Create end users in batch.
|
||||
|
||||
Creates end users in batch for the specified tenant and application IDs in O(1) time.
|
||||
|
||||
This batch creation is necessary because trigger subscriptions can span multiple applications,
|
||||
and trigger events may be dispatched to multiple applications simultaneously.
|
||||
|
||||
For each app_id in app_ids, check if an `EndUser` with the given
|
||||
`user_id` (as session_id/external_user_id) already exists for the
|
||||
tenant/app and type `type`. If it exists, return it; otherwise,
|
||||
create it. Operates with minimal DB I/O by querying and inserting in
|
||||
batches.
|
||||
|
||||
Returns a mapping of `app_id -> EndUser`.
|
||||
"""
|
||||
|
||||
# Normalize user_id to default if empty
|
||||
if not user_id:
|
||||
user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID
|
||||
|
||||
# Deduplicate app_ids while preserving input order
|
||||
seen: set[str] = set()
|
||||
unique_app_ids: list[str] = []
|
||||
for app_id in app_ids:
|
||||
if app_id not in seen:
|
||||
seen.add(app_id)
|
||||
unique_app_ids.append(app_id)
|
||||
|
||||
# Result is a simple app_id -> EndUser mapping
|
||||
result: dict[str, EndUser] = {}
|
||||
if not unique_app_ids:
|
||||
return result
|
||||
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
# Fetch existing end users for all target apps in a single query
|
||||
existing_end_users: list[EndUser] = (
|
||||
session.query(EndUser)
|
||||
.where(
|
||||
EndUser.tenant_id == tenant_id,
|
||||
EndUser.app_id.in_(unique_app_ids),
|
||||
EndUser.session_id == user_id,
|
||||
EndUser.type == type,
|
||||
)
|
||||
.all()
|
||||
)
|
||||
|
||||
found_app_ids: set[str] = set()
|
||||
for eu in existing_end_users:
|
||||
# If duplicates exist due to weak DB constraints, prefer the first
|
||||
if eu.app_id not in result:
|
||||
result[eu.app_id] = eu
|
||||
found_app_ids.add(eu.app_id)
|
||||
|
||||
# Determine which apps still need an EndUser created
|
||||
missing_app_ids = [app_id for app_id in unique_app_ids if app_id not in found_app_ids]
|
||||
|
||||
if missing_app_ids:
|
||||
new_end_users: list[EndUser] = []
|
||||
is_anonymous = user_id == DefaultEndUserSessionID.DEFAULT_SESSION_ID
|
||||
for app_id in missing_app_ids:
|
||||
new_end_users.append(
|
||||
EndUser(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
type=type,
|
||||
is_anonymous=is_anonymous,
|
||||
session_id=user_id,
|
||||
external_user_id=user_id,
|
||||
)
|
||||
)
|
||||
|
||||
session.add_all(new_end_users)
|
||||
session.commit()
|
||||
|
||||
for eu in new_end_users:
|
||||
result[eu.app_id] = eu
|
||||
|
||||
return result
|
||||
Loading…
Reference in New Issue