mirror of
https://github.com/langgenius/dify.git
synced 2026-04-28 03:36:36 +08:00
refactor: optimize TenantDailyRateLimiter to use UTC internally with timezone-aware error messages (#24632)
Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
parent
17908fbf6b
commit
e53edb0fc2
@ -94,8 +94,7 @@ class AsyncWorkflowService:
|
|||||||
# 3. Get dispatcher based on tenant subscription
|
# 3. Get dispatcher based on tenant subscription
|
||||||
dispatcher = dispatcher_manager.get_dispatcher(trigger_data.tenant_id)
|
dispatcher = dispatcher_manager.get_dispatcher(trigger_data.tenant_id)
|
||||||
|
|
||||||
# 4. Get tenant owner timezone for rate limiting
|
# 4. Rate limiting check will be done without timezone first
|
||||||
tenant_owner_tz = rate_limiter._get_tenant_owner_timezone(trigger_data.tenant_id)
|
|
||||||
|
|
||||||
# 5. Determine user role and ID
|
# 5. Determine user role and ID
|
||||||
if isinstance(user, Account):
|
if isinstance(user, Account):
|
||||||
@ -125,16 +124,16 @@ class AsyncWorkflowService:
|
|||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
# 7. Check and consume daily quota
|
# 7. Check and consume daily quota
|
||||||
if not dispatcher.consume_quota(trigger_data.tenant_id, tenant_owner_tz):
|
if not dispatcher.consume_quota(trigger_data.tenant_id):
|
||||||
# Update trigger log status
|
# Update trigger log status
|
||||||
trigger_log.status = WorkflowTriggerStatus.RATE_LIMITED
|
trigger_log.status = WorkflowTriggerStatus.RATE_LIMITED
|
||||||
trigger_log.error = f"Daily limit reached for {dispatcher.get_queue_name()}"
|
trigger_log.error = f"Daily limit reached for {dispatcher.get_queue_name()}"
|
||||||
trigger_log_repo.update(trigger_log)
|
trigger_log_repo.update(trigger_log)
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
remaining = rate_limiter.get_remaining_quota(
|
tenant_owner_tz = rate_limiter._get_tenant_owner_timezone(trigger_data.tenant_id)
|
||||||
trigger_data.tenant_id, dispatcher.get_daily_limit(), tenant_owner_tz
|
|
||||||
)
|
remaining = rate_limiter.get_remaining_quota(trigger_data.tenant_id, dispatcher.get_daily_limit())
|
||||||
|
|
||||||
reset_time = rate_limiter.get_quota_reset_time(trigger_data.tenant_id, tenant_owner_tz)
|
reset_time = rate_limiter.get_quota_reset_time(trigger_data.tenant_id, tenant_owner_tz)
|
||||||
|
|
||||||
|
|||||||
@ -44,36 +44,34 @@ class BaseQueueDispatcher(ABC):
|
|||||||
"""Get task priority level"""
|
"""Get task priority level"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def check_daily_quota(self, tenant_id: str, tenant_owner_tz: str) -> bool:
|
def check_daily_quota(self, tenant_id: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Check if tenant has remaining daily quota
|
Check if tenant has remaining daily quota
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
tenant_id: The tenant identifier
|
tenant_id: The tenant identifier
|
||||||
tenant_owner_tz: Tenant owner's timezone
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True if quota available, False otherwise
|
True if quota available, False otherwise
|
||||||
"""
|
"""
|
||||||
# Check without consuming
|
# Check without consuming
|
||||||
remaining = self.rate_limiter.get_remaining_quota(
|
remaining = self.rate_limiter.get_remaining_quota(
|
||||||
tenant_id=tenant_id, max_daily_limit=self.get_daily_limit(), timezone_str=tenant_owner_tz
|
tenant_id=tenant_id, max_daily_limit=self.get_daily_limit()
|
||||||
)
|
)
|
||||||
return remaining > 0
|
return remaining > 0
|
||||||
|
|
||||||
def consume_quota(self, tenant_id: str, tenant_owner_tz: str) -> bool:
|
def consume_quota(self, tenant_id: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Consume one execution from daily quota
|
Consume one execution from daily quota
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
tenant_id: The tenant identifier
|
tenant_id: The tenant identifier
|
||||||
tenant_owner_tz: Tenant owner's timezone
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True if quota consumed successfully, False if limit reached
|
True if quota consumed successfully, False if limit reached
|
||||||
"""
|
"""
|
||||||
return self.rate_limiter.check_and_consume(
|
return self.rate_limiter.check_and_consume(
|
||||||
tenant_id=tenant_id, max_daily_limit=self.get_daily_limit(), timezone_str=tenant_owner_tz
|
tenant_id=tenant_id, max_daily_limit=self.get_daily_limit()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -1,11 +1,11 @@
|
|||||||
"""
|
"""
|
||||||
Day-based rate limiter for workflow executions.
|
Day-based rate limiter for workflow executions.
|
||||||
|
|
||||||
Implements timezone-aware daily quotas that reset at midnight in the tenant owner's timezone.
|
Implements UTC-based daily quotas that reset at midnight UTC for consistent rate limiting.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from datetime import datetime, time, timedelta
|
from datetime import datetime, time, timedelta
|
||||||
from typing import Optional, Union
|
from typing import Union
|
||||||
|
|
||||||
import pytz
|
import pytz
|
||||||
from redis import Redis
|
from redis import Redis
|
||||||
@ -18,13 +18,13 @@ from models.account import Account, TenantAccountJoin, TenantAccountRole
|
|||||||
|
|
||||||
class TenantDailyRateLimiter:
|
class TenantDailyRateLimiter:
|
||||||
"""
|
"""
|
||||||
Day-based rate limiter that resets at midnight in tenant owner's timezone
|
Day-based rate limiter that resets at midnight UTC
|
||||||
|
|
||||||
This class provides Redis-based rate limiting with the following features:
|
This class provides Redis-based rate limiting with the following features:
|
||||||
- Daily quotas that reset at midnight in tenant owner's timezone
|
- Daily quotas that reset at midnight UTC for consistency
|
||||||
- Atomic check-and-consume operations
|
- Atomic check-and-consume operations
|
||||||
- Automatic cleanup of stale counters
|
- Automatic cleanup of stale counters
|
||||||
- Support for timezone changes without duplicate limits
|
- Timezone-aware error messages for better UX
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, redis_client: Union[Redis, RedisClientWrapper]):
|
def __init__(self, redis_client: Union[Redis, RedisClientWrapper]):
|
||||||
@ -52,57 +52,47 @@ class TenantDailyRateLimiter:
|
|||||||
|
|
||||||
return owner.timezone or "UTC"
|
return owner.timezone or "UTC"
|
||||||
|
|
||||||
def _get_day_key(self, tenant_id: str, timezone_str: str) -> str:
|
def _get_day_key(self, tenant_id: str) -> str:
|
||||||
"""
|
"""
|
||||||
Get Redis key for current day in tenant's timezone
|
Get Redis key for current UTC day
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
tenant_id: The tenant identifier
|
tenant_id: The tenant identifier
|
||||||
timezone_str: Timezone string
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Redis key for the current day
|
Redis key for the current UTC day
|
||||||
"""
|
"""
|
||||||
tz = pytz.timezone(timezone_str)
|
utc_now = datetime.utcnow()
|
||||||
now = datetime.now(tz)
|
date_str = utc_now.strftime("%Y-%m-%d")
|
||||||
date_str = now.strftime("%Y-%m-%d")
|
return f"workflow:daily_limit:{tenant_id}:{date_str}"
|
||||||
return f"workflow:daily_limit:{tenant_id}:{date_str}:{timezone_str}"
|
|
||||||
|
|
||||||
def _get_ttl_seconds(self, timezone_str: str) -> int:
|
def _get_ttl_seconds(self) -> int:
|
||||||
"""
|
"""
|
||||||
Calculate seconds until midnight in given timezone
|
Calculate seconds until UTC midnight
|
||||||
|
|
||||||
Args:
|
|
||||||
timezone_str: Timezone string
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Number of seconds until midnight
|
Number of seconds until UTC midnight
|
||||||
"""
|
"""
|
||||||
tz = pytz.timezone(timezone_str)
|
utc_now = datetime.utcnow()
|
||||||
now = datetime.now(tz)
|
|
||||||
|
# Get next midnight in UTC
|
||||||
|
next_midnight = datetime.combine(utc_now.date() + timedelta(days=1), time.min)
|
||||||
|
|
||||||
|
return int((next_midnight - utc_now).total_seconds())
|
||||||
|
|
||||||
# Get next midnight in the timezone
|
def check_and_consume(self, tenant_id: str, max_daily_limit: int) -> bool:
|
||||||
midnight = tz.localize(datetime.combine(now.date() + timedelta(days=1), time.min))
|
|
||||||
|
|
||||||
return int((midnight - now).total_seconds())
|
|
||||||
|
|
||||||
def check_and_consume(self, tenant_id: str, max_daily_limit: int, timezone_str: Optional[str] = None) -> bool:
|
|
||||||
"""
|
"""
|
||||||
Check if quota available and consume one execution
|
Check if quota available and consume one execution
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
tenant_id: The tenant identifier
|
tenant_id: The tenant identifier
|
||||||
max_daily_limit: Maximum daily limit
|
max_daily_limit: Maximum daily limit
|
||||||
timezone_str: Optional timezone string (will be fetched if not provided)
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True if quota consumed successfully, False if limit reached
|
True if quota consumed successfully, False if limit reached
|
||||||
"""
|
"""
|
||||||
if not timezone_str:
|
key = self._get_day_key(tenant_id)
|
||||||
timezone_str = self._get_tenant_owner_timezone(tenant_id)
|
ttl = self._get_ttl_seconds()
|
||||||
|
|
||||||
key = self._get_day_key(tenant_id, timezone_str)
|
|
||||||
ttl = self._get_ttl_seconds(timezone_str)
|
|
||||||
|
|
||||||
# Check current usage
|
# Check current usage
|
||||||
current = self.redis.get(key)
|
current = self.redis.get(key)
|
||||||
@ -116,7 +106,7 @@ class TenantDailyRateLimiter:
|
|||||||
if current_count < max_daily_limit:
|
if current_count < max_daily_limit:
|
||||||
# Within limit, increment
|
# Within limit, increment
|
||||||
new_count = self.redis.incr(key)
|
new_count = self.redis.incr(key)
|
||||||
# Update TTL in case timezone changed
|
# Update TTL
|
||||||
self.redis.expire(key, ttl)
|
self.redis.expire(key, ttl)
|
||||||
|
|
||||||
# Double-check in case of race condition
|
# Double-check in case of race condition
|
||||||
@ -130,77 +120,63 @@ class TenantDailyRateLimiter:
|
|||||||
# Limit exceeded
|
# Limit exceeded
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_remaining_quota(self, tenant_id: str, max_daily_limit: int, timezone_str: Optional[str] = None) -> int:
|
def get_remaining_quota(self, tenant_id: str, max_daily_limit: int) -> int:
|
||||||
"""
|
"""
|
||||||
Get remaining quota for the day
|
Get remaining quota for the day
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
tenant_id: The tenant identifier
|
tenant_id: The tenant identifier
|
||||||
max_daily_limit: Maximum daily limit
|
max_daily_limit: Maximum daily limit
|
||||||
timezone_str: Optional timezone string (will be fetched if not provided)
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Number of remaining executions for the day
|
Number of remaining executions for the day
|
||||||
"""
|
"""
|
||||||
if not timezone_str:
|
key = self._get_day_key(tenant_id)
|
||||||
timezone_str = self._get_tenant_owner_timezone(tenant_id)
|
|
||||||
|
|
||||||
key = self._get_day_key(tenant_id, timezone_str)
|
|
||||||
used = int(self.redis.get(key) or 0)
|
used = int(self.redis.get(key) or 0)
|
||||||
return max(0, max_daily_limit - used)
|
return max(0, max_daily_limit - used)
|
||||||
|
|
||||||
def get_current_usage(self, tenant_id: str, timezone_str: Optional[str] = None) -> int:
|
def get_current_usage(self, tenant_id: str) -> int:
|
||||||
"""
|
"""
|
||||||
Get current usage for the day
|
Get current usage for the day
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
tenant_id: The tenant identifier
|
tenant_id: The tenant identifier
|
||||||
timezone_str: Optional timezone string (will be fetched if not provided)
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Number of executions used today
|
Number of executions used today
|
||||||
"""
|
"""
|
||||||
if not timezone_str:
|
key = self._get_day_key(tenant_id)
|
||||||
timezone_str = self._get_tenant_owner_timezone(tenant_id)
|
|
||||||
|
|
||||||
key = self._get_day_key(tenant_id, timezone_str)
|
|
||||||
return int(self.redis.get(key) or 0)
|
return int(self.redis.get(key) or 0)
|
||||||
|
|
||||||
def reset_quota(self, tenant_id: str, timezone_str: Optional[str] = None) -> bool:
|
def reset_quota(self, tenant_id: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Reset quota for testing purposes
|
Reset quota for testing purposes
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
tenant_id: The tenant identifier
|
tenant_id: The tenant identifier
|
||||||
timezone_str: Optional timezone string (will be fetched if not provided)
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True if key was deleted, False if key didn't exist
|
True if key was deleted, False if key didn't exist
|
||||||
"""
|
"""
|
||||||
if not timezone_str:
|
key = self._get_day_key(tenant_id)
|
||||||
timezone_str = self._get_tenant_owner_timezone(tenant_id)
|
|
||||||
|
|
||||||
key = self._get_day_key(tenant_id, timezone_str)
|
|
||||||
return bool(self.redis.delete(key))
|
return bool(self.redis.delete(key))
|
||||||
|
|
||||||
def get_quota_reset_time(self, tenant_id: str, timezone_str: Optional[str] = None) -> datetime:
|
def get_quota_reset_time(self, tenant_id: str, timezone_str: str) -> datetime:
|
||||||
"""
|
"""
|
||||||
Get the time when quota will reset (midnight in tenant's timezone)
|
Get the time when quota will reset (next UTC midnight in tenant's timezone)
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
tenant_id: The tenant identifier
|
tenant_id: The tenant identifier
|
||||||
timezone_str: Optional timezone string (will be fetched if not provided)
|
timezone_str: Tenant's timezone for display purposes
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Datetime when quota resets
|
Datetime when quota resets (next UTC midnight in tenant's timezone)
|
||||||
"""
|
"""
|
||||||
if not timezone_str:
|
|
||||||
timezone_str = self._get_tenant_owner_timezone(tenant_id)
|
|
||||||
|
|
||||||
tz = pytz.timezone(timezone_str)
|
tz = pytz.timezone(timezone_str)
|
||||||
now = datetime.now(tz)
|
utc_now = datetime.utcnow()
|
||||||
|
|
||||||
# Get next midnight in the timezone
|
# Get next midnight in UTC, then convert to tenant's timezone
|
||||||
midnight = tz.localize(datetime.combine(now.date() + timedelta(days=1), time.min))
|
next_utc_midnight = datetime.combine(utc_now.date() + timedelta(days=1), time.min)
|
||||||
|
next_utc_midnight = pytz.UTC.localize(next_utc_midnight)
|
||||||
return midnight
|
|
||||||
|
return next_utc_midnight.astimezone(tz)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user