From a1b735a4c09974ff25920c975ade19eb3d5a2d42 Mon Sep 17 00:00:00 2001 From: Maries Date: Thu, 20 Nov 2025 10:15:23 +0800 Subject: [PATCH] feat: trigger billing (#28335) Signed-off-by: lyzno1 Co-authored-by: lyzno1 Co-authored-by: lyzno1 <92089059+lyzno1@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- api/configs/feature/__init__.py | 6 +- api/enums/quota_type.py | 209 ++++++++++++++++++ api/libs/email_i18n.py | 78 +++++++ api/models/enums.py | 1 + api/schedule/workflow_schedule_task.py | 27 +-- api/services/app_generate_service.py | 25 +-- api/services/async_workflow_service.py | 28 +-- api/services/billing_service.py | 47 ++++ api/services/errors/app.py | 26 ++- api/services/feature_service.py | 20 ++ api/services/trigger/app_trigger_service.py | 46 ++++ api/services/trigger/webhook_service.py | 21 ++ api/services/workflow/queue_dispatcher.py | 47 +--- api/services/workflow/rate_limiter.py | 183 --------------- api/services/workflow_service.py | 20 +- api/tasks/trigger_processing_tasks.py | 29 ++- .../trigger_subscription_refresh_tasks.py | 10 +- api/tasks/workflow_schedule_tasks.py | 13 ++ .../services/test_app_generate_service.py | 64 ++---- dev/start-worker | 20 ++ .../components/app/app-publisher/index.tsx | 74 +++++-- web/app/components/billing/config.ts | 4 + web/app/components/billing/plan/index.tsx | 31 ++- .../plans/cloud-plan-item/list/index.tsx | 32 +-- .../index.module.css | 30 +++ .../index.stories.tsx | 97 ++++++++ .../trigger-events-limit-modal/index.tsx | 90 ++++++++ web/app/components/billing/type.ts | 21 +- .../components/billing/upgrade-btn/index.tsx | 22 +- .../components/billing/usage-info/index.tsx | 43 ++-- web/app/components/billing/utils/index.ts | 53 ++++- .../workflow-header/features-trigger.tsx | 15 +- .../hooks/use-trigger-events-limit-modal.ts | 130 +++++++++++ web/context/modal-context.test.tsx | 181 +++++++++++++++ web/context/modal-context.tsx | 43 ++++ web/context/provider-context.tsx | 3 +- web/i18n/de-DE/billing.ts | 2 +- web/i18n/en-US/billing.ts | 20 +- web/i18n/en-US/workflow.ts | 5 + web/i18n/es-ES/billing.ts | 2 +- web/i18n/fa-IR/billing.ts | 2 +- web/i18n/fr-FR/billing.ts | 4 +- web/i18n/hi-IN/billing.ts | 2 +- web/i18n/it-IT/billing.ts | 4 +- web/i18n/ja-JP/billing.ts | 32 ++- web/i18n/ja-JP/workflow.ts | 5 + web/i18n/ko-KR/billing.ts | 2 +- web/i18n/pl-PL/billing.ts | 2 +- web/i18n/pt-BR/billing.ts | 2 +- web/i18n/ro-RO/billing.ts | 4 +- web/i18n/ru-RU/billing.ts | 2 +- web/i18n/sl-SI/billing.ts | 2 +- web/i18n/th-TH/billing.ts | 2 +- web/i18n/tr-TR/billing.ts | 2 +- web/i18n/uk-UA/billing.ts | 2 +- web/i18n/vi-VN/billing.ts | 2 +- web/i18n/zh-Hans/billing.ts | 32 ++- web/i18n/zh-Hans/workflow.ts | 5 + web/i18n/zh-Hant/billing.ts | 2 +- web/i18n/zh-Hant/workflow.ts | 5 + web/utils/time.ts | 7 + 61 files changed, 1475 insertions(+), 465 deletions(-) create mode 100644 api/enums/quota_type.py create mode 100644 api/services/trigger/app_trigger_service.py delete mode 100644 api/services/workflow/rate_limiter.py create mode 100644 web/app/components/billing/trigger-events-limit-modal/index.module.css create mode 100644 web/app/components/billing/trigger-events-limit-modal/index.stories.tsx create mode 100644 web/app/components/billing/trigger-events-limit-modal/index.tsx create mode 100644 web/context/hooks/use-trigger-events-limit-modal.ts create mode 100644 web/context/modal-context.test.tsx diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index ff1f983f94..7cce3847b4 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -77,10 +77,6 @@ class AppExecutionConfig(BaseSettings): description="Maximum number of concurrent active requests per app (0 for unlimited)", default=0, ) - APP_DAILY_RATE_LIMIT: NonNegativeInt = Field( - description="Maximum number of requests per app per day", - default=5000, - ) class CodeExecutionSandboxConfig(BaseSettings): @@ -1086,7 +1082,7 @@ class CeleryScheduleTasksConfig(BaseSettings): ) TRIGGER_PROVIDER_CREDENTIAL_THRESHOLD_SECONDS: int = Field( description="Proactive credential refresh threshold in seconds", - default=180, + default=60 * 60, ) TRIGGER_PROVIDER_SUBSCRIPTION_THRESHOLD_SECONDS: int = Field( description="Proactive subscription refresh threshold in seconds", diff --git a/api/enums/quota_type.py b/api/enums/quota_type.py new file mode 100644 index 0000000000..9f511b88ef --- /dev/null +++ b/api/enums/quota_type.py @@ -0,0 +1,209 @@ +import logging +from dataclasses import dataclass +from enum import StrEnum, auto + +logger = logging.getLogger(__name__) + + +@dataclass +class QuotaCharge: + """ + Result of a quota consumption operation. + + Attributes: + success: Whether the quota charge succeeded + charge_id: UUID for refund, or None if failed/disabled + """ + + success: bool + charge_id: str | None + _quota_type: "QuotaType" + + def refund(self) -> None: + """ + Refund this quota charge. + + Safe to call even if charge failed or was disabled. + This method guarantees no exceptions will be raised. + """ + if self.charge_id: + self._quota_type.refund(self.charge_id) + logger.info("Refunded quota for %s with charge_id: %s", self._quota_type.value, self.charge_id) + + +class QuotaType(StrEnum): + """ + Supported quota types for tenant feature usage. + + Add additional types here whenever new billable features become available. + """ + + # Trigger execution quota + TRIGGER = auto() + + # Workflow execution quota + WORKFLOW = auto() + + UNLIMITED = auto() + + @property + def billing_key(self) -> str: + """ + Get the billing key for the feature. + """ + match self: + case QuotaType.TRIGGER: + return "trigger_event" + case QuotaType.WORKFLOW: + return "api_rate_limit" + case _: + raise ValueError(f"Invalid quota type: {self}") + + def consume(self, tenant_id: str, amount: int = 1) -> QuotaCharge: + """ + Consume quota for the feature. + + Args: + tenant_id: The tenant identifier + amount: Amount to consume (default: 1) + + Returns: + QuotaCharge with success status and charge_id for refund + + Raises: + QuotaExceededError: When quota is insufficient + """ + from configs import dify_config + from services.billing_service import BillingService + from services.errors.app import QuotaExceededError + + if not dify_config.BILLING_ENABLED: + logger.debug("Billing disabled, allowing request for %s", tenant_id) + return QuotaCharge(success=True, charge_id=None, _quota_type=self) + + logger.info("Consuming %d %s quota for tenant %s", amount, self.value, tenant_id) + + if amount <= 0: + raise ValueError("Amount to consume must be greater than 0") + + try: + response = BillingService.update_tenant_feature_plan_usage(tenant_id, self.billing_key, delta=amount) + + if response.get("result") != "success": + logger.warning( + "Failed to consume quota for %s, feature %s details: %s", + tenant_id, + self.value, + response.get("detail"), + ) + raise QuotaExceededError(feature=self.value, tenant_id=tenant_id, required=amount) + + charge_id = response.get("history_id") + logger.debug( + "Successfully consumed %d %s quota for tenant %s, charge_id: %s", + amount, + self.value, + tenant_id, + charge_id, + ) + return QuotaCharge(success=True, charge_id=charge_id, _quota_type=self) + + except QuotaExceededError: + raise + except Exception: + # fail-safe: allow request on billing errors + logger.exception("Failed to consume quota for %s, feature %s", tenant_id, self.value) + return unlimited() + + def check(self, tenant_id: str, amount: int = 1) -> bool: + """ + Check if tenant has sufficient quota without consuming. + + Args: + tenant_id: The tenant identifier + amount: Amount to check (default: 1) + + Returns: + True if quota is sufficient, False otherwise + """ + from configs import dify_config + + if not dify_config.BILLING_ENABLED: + return True + + if amount <= 0: + raise ValueError("Amount to check must be greater than 0") + + try: + remaining = self.get_remaining(tenant_id) + return remaining >= amount if remaining != -1 else True + except Exception: + logger.exception("Failed to check quota for %s, feature %s", tenant_id, self.value) + # fail-safe: allow request on billing errors + return True + + def refund(self, charge_id: str) -> None: + """ + Refund quota using charge_id from consume(). + + This method guarantees no exceptions will be raised. + All errors are logged but silently handled. + + Args: + charge_id: The UUID returned from consume() + """ + try: + from configs import dify_config + from services.billing_service import BillingService + + if not dify_config.BILLING_ENABLED: + return + + if not charge_id: + logger.warning("Cannot refund: charge_id is empty") + return + + logger.info("Refunding %s quota with charge_id: %s", self.value, charge_id) + + response = BillingService.refund_tenant_feature_plan_usage(charge_id) + if response.get("result") == "success": + logger.debug("Successfully refunded %s quota, charge_id: %s", self.value, charge_id) + else: + logger.warning("Refund failed for charge_id: %s", charge_id) + + except Exception: + # Catch ALL exceptions - refund must never fail + logger.exception("Failed to refund quota for charge_id: %s", charge_id) + # Don't raise - refund is best-effort and must be silent + + def get_remaining(self, tenant_id: str) -> int: + """ + Get remaining quota for the tenant. + + Args: + tenant_id: The tenant identifier + + Returns: + Remaining quota amount + """ + from services.billing_service import BillingService + + try: + usage_info = BillingService.get_tenant_feature_plan_usage(tenant_id, self.billing_key) + # Assuming the API returns a dict with 'remaining' or 'limit' and 'used' + if isinstance(usage_info, dict): + return usage_info.get("remaining", 0) + # If it returns a simple number, treat it as remaining + return int(usage_info) if usage_info else 0 + except Exception: + logger.exception("Failed to get remaining quota for %s, feature %s", tenant_id, self.value) + return -1 + + +def unlimited() -> QuotaCharge: + """ + Return a quota charge for unlimited quota. + + This is useful for features that are not subject to quota limits, such as the UNLIMITED quota type. + """ + return QuotaCharge(success=True, charge_id=None, _quota_type=QuotaType.UNLIMITED) diff --git a/api/libs/email_i18n.py b/api/libs/email_i18n.py index 37ff1a438e..ff74ccbe8e 100644 --- a/api/libs/email_i18n.py +++ b/api/libs/email_i18n.py @@ -38,6 +38,12 @@ class EmailType(StrEnum): EMAIL_REGISTER = auto() EMAIL_REGISTER_WHEN_ACCOUNT_EXIST = auto() RESET_PASSWORD_WHEN_ACCOUNT_NOT_EXIST_NO_REGISTER = auto() + TRIGGER_EVENTS_LIMIT_SANDBOX = auto() + TRIGGER_EVENTS_LIMIT_PROFESSIONAL = auto() + TRIGGER_EVENTS_USAGE_WARNING_SANDBOX = auto() + TRIGGER_EVENTS_USAGE_WARNING_PROFESSIONAL = auto() + API_RATE_LIMIT_LIMIT_SANDBOX = auto() + API_RATE_LIMIT_WARNING_SANDBOX = auto() class EmailLanguage(StrEnum): @@ -445,6 +451,78 @@ def create_default_email_config() -> EmailI18nConfig: branded_template_path="clean_document_job_mail_template_zh-CN.html", ), }, + EmailType.TRIGGER_EVENTS_LIMIT_SANDBOX: { + EmailLanguage.EN_US: EmailTemplate( + subject="You’ve reached your Sandbox Trigger Events limit", + template_path="trigger_events_limit_template_en-US.html", + branded_template_path="without-brand/trigger_events_limit_template_en-US.html", + ), + EmailLanguage.ZH_HANS: EmailTemplate( + subject="您的 Sandbox 触发事件额度已用尽", + template_path="trigger_events_limit_template_zh-CN.html", + branded_template_path="without-brand/trigger_events_limit_template_zh-CN.html", + ), + }, + EmailType.TRIGGER_EVENTS_LIMIT_PROFESSIONAL: { + EmailLanguage.EN_US: EmailTemplate( + subject="You’ve reached your monthly Trigger Events limit", + template_path="trigger_events_limit_template_en-US.html", + branded_template_path="without-brand/trigger_events_limit_template_en-US.html", + ), + EmailLanguage.ZH_HANS: EmailTemplate( + subject="您的月度触发事件额度已用尽", + template_path="trigger_events_limit_template_zh-CN.html", + branded_template_path="without-brand/trigger_events_limit_template_zh-CN.html", + ), + }, + EmailType.TRIGGER_EVENTS_USAGE_WARNING_SANDBOX: { + EmailLanguage.EN_US: EmailTemplate( + subject="You’re nearing your Sandbox Trigger Events limit", + template_path="trigger_events_usage_warning_template_en-US.html", + branded_template_path="without-brand/trigger_events_usage_warning_template_en-US.html", + ), + EmailLanguage.ZH_HANS: EmailTemplate( + subject="您的 Sandbox 触发事件额度接近上限", + template_path="trigger_events_usage_warning_template_zh-CN.html", + branded_template_path="without-brand/trigger_events_usage_warning_template_zh-CN.html", + ), + }, + EmailType.TRIGGER_EVENTS_USAGE_WARNING_PROFESSIONAL: { + EmailLanguage.EN_US: EmailTemplate( + subject="You’re nearing your Monthly Trigger Events limit", + template_path="trigger_events_usage_warning_template_en-US.html", + branded_template_path="without-brand/trigger_events_usage_warning_template_en-US.html", + ), + EmailLanguage.ZH_HANS: EmailTemplate( + subject="您的月度触发事件额度接近上限", + template_path="trigger_events_usage_warning_template_zh-CN.html", + branded_template_path="without-brand/trigger_events_usage_warning_template_zh-CN.html", + ), + }, + EmailType.API_RATE_LIMIT_LIMIT_SANDBOX: { + EmailLanguage.EN_US: EmailTemplate( + subject="You’ve reached your API Rate Limit", + template_path="api_rate_limit_limit_template_en-US.html", + branded_template_path="without-brand/api_rate_limit_limit_template_en-US.html", + ), + EmailLanguage.ZH_HANS: EmailTemplate( + subject="您的 API 速率额度已用尽", + template_path="api_rate_limit_limit_template_zh-CN.html", + branded_template_path="without-brand/api_rate_limit_limit_template_zh-CN.html", + ), + }, + EmailType.API_RATE_LIMIT_WARNING_SANDBOX: { + EmailLanguage.EN_US: EmailTemplate( + subject="You’re nearing your API Rate Limit", + template_path="api_rate_limit_warning_template_en-US.html", + branded_template_path="without-brand/api_rate_limit_warning_template_en-US.html", + ), + EmailLanguage.ZH_HANS: EmailTemplate( + subject="您的 API 速率额度接近上限", + template_path="api_rate_limit_warning_template_zh-CN.html", + branded_template_path="without-brand/api_rate_limit_warning_template_zh-CN.html", + ), + }, EmailType.EMAIL_REGISTER: { EmailLanguage.EN_US: EmailTemplate( subject="Register Your {application_title} Account", diff --git a/api/models/enums.py b/api/models/enums.py index d06d0d5ebc..8cd3d4cf2a 100644 --- a/api/models/enums.py +++ b/api/models/enums.py @@ -64,6 +64,7 @@ class AppTriggerStatus(StrEnum): ENABLED = "enabled" DISABLED = "disabled" UNAUTHORIZED = "unauthorized" + RATE_LIMITED = "rate_limited" class AppTriggerType(StrEnum): diff --git a/api/schedule/workflow_schedule_task.py b/api/schedule/workflow_schedule_task.py index 41e2232353..d68b9565ec 100644 --- a/api/schedule/workflow_schedule_task.py +++ b/api/schedule/workflow_schedule_task.py @@ -9,7 +9,6 @@ from extensions.ext_database import db from libs.datetime_utils import naive_utc_now from libs.schedule_utils import calculate_next_run_at from models.trigger import AppTrigger, AppTriggerStatus, AppTriggerType, WorkflowSchedulePlan -from services.workflow.queue_dispatcher import QueueDispatcherManager from tasks.workflow_schedule_tasks import run_schedule_trigger logger = logging.getLogger(__name__) @@ -29,7 +28,6 @@ def poll_workflow_schedules() -> None: with session_factory() as session: total_dispatched = 0 - total_rate_limited = 0 # Process in batches until we've handled all due schedules or hit the limit while True: @@ -38,11 +36,10 @@ def poll_workflow_schedules() -> None: if not due_schedules: break - dispatched_count, rate_limited_count = _process_schedules(session, due_schedules) + dispatched_count = _process_schedules(session, due_schedules) total_dispatched += dispatched_count - total_rate_limited += rate_limited_count - logger.debug("Batch processed: %d dispatched, %d rate limited", dispatched_count, rate_limited_count) + logger.debug("Batch processed: %d dispatched", dispatched_count) # Circuit breaker: check if we've hit the per-tick limit (if enabled) if ( @@ -55,8 +52,8 @@ def poll_workflow_schedules() -> None: ) break - if total_dispatched > 0 or total_rate_limited > 0: - logger.info("Total processed: %d dispatched, %d rate limited", total_dispatched, total_rate_limited) + if total_dispatched > 0: + logger.info("Total processed: %d dispatched", total_dispatched) def _fetch_due_schedules(session: Session) -> list[WorkflowSchedulePlan]: @@ -93,15 +90,12 @@ def _fetch_due_schedules(session: Session) -> list[WorkflowSchedulePlan]: return list(due_schedules) -def _process_schedules(session: Session, schedules: list[WorkflowSchedulePlan]) -> tuple[int, int]: +def _process_schedules(session: Session, schedules: list[WorkflowSchedulePlan]) -> int: """Process schedules: check quota, update next run time and dispatch to Celery in parallel.""" if not schedules: - return 0, 0 + return 0 - dispatcher_manager = QueueDispatcherManager() tasks_to_dispatch: list[str] = [] - rate_limited_count = 0 - for schedule in schedules: next_run_at = calculate_next_run_at( schedule.cron_expression, @@ -109,12 +103,7 @@ def _process_schedules(session: Session, schedules: list[WorkflowSchedulePlan]) ) schedule.next_run_at = next_run_at - dispatcher = dispatcher_manager.get_dispatcher(schedule.tenant_id) - if not dispatcher.check_daily_quota(schedule.tenant_id): - logger.info("Tenant %s rate limited, skipping schedule_plan %s", schedule.tenant_id, schedule.id) - rate_limited_count += 1 - else: - tasks_to_dispatch.append(schedule.id) + tasks_to_dispatch.append(schedule.id) if tasks_to_dispatch: job = group(run_schedule_trigger.s(schedule_id) for schedule_id in tasks_to_dispatch) @@ -124,4 +113,4 @@ def _process_schedules(session: Session, schedules: list[WorkflowSchedulePlan]) session.commit() - return len(tasks_to_dispatch), rate_limited_count + return len(tasks_to_dispatch) diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 5b09bd9593..bb1ea742d0 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -10,19 +10,14 @@ from core.app.apps.completion.app_generator import CompletionAppGenerator from core.app.apps.workflow.app_generator import WorkflowAppGenerator from core.app.entities.app_invoke_entities import InvokeFrom from core.app.features.rate_limiting import RateLimit -from enums.cloud_plan import CloudPlan -from libs.helper import RateLimiter +from enums.quota_type import QuotaType, unlimited from models.model import Account, App, AppMode, EndUser from models.workflow import Workflow -from services.billing_service import BillingService -from services.errors.app import WorkflowIdFormatError, WorkflowNotFoundError -from services.errors.llm import InvokeRateLimitError +from services.errors.app import InvokeRateLimitError, QuotaExceededError, WorkflowIdFormatError, WorkflowNotFoundError from services.workflow_service import WorkflowService class AppGenerateService: - system_rate_limiter = RateLimiter("app_daily_rate_limiter", dify_config.APP_DAILY_RATE_LIMIT, 86400) - @classmethod def generate( cls, @@ -42,17 +37,12 @@ class AppGenerateService: :param streaming: streaming :return: """ - # system level rate limiter + quota_charge = unlimited() if dify_config.BILLING_ENABLED: - # check if it's free plan - limit_info = BillingService.get_info(app_model.tenant_id) - if limit_info["subscription"]["plan"] == CloudPlan.SANDBOX: - if cls.system_rate_limiter.is_rate_limited(app_model.tenant_id): - raise InvokeRateLimitError( - "Rate limit exceeded, please upgrade your plan " - f"or your RPD was {dify_config.APP_DAILY_RATE_LIMIT} requests/day" - ) - cls.system_rate_limiter.increment_rate_limit(app_model.tenant_id) + try: + quota_charge = QuotaType.WORKFLOW.consume(app_model.tenant_id) + except QuotaExceededError: + raise InvokeRateLimitError(f"Workflow execution quota limit reached for tenant {app_model.tenant_id}") # app level rate limiter max_active_request = cls._get_max_active_requests(app_model) @@ -124,6 +114,7 @@ class AppGenerateService: else: raise ValueError(f"Invalid app mode {app_model.mode}") except Exception: + quota_charge.refund() rate_limit.exit(request_id) raise finally: diff --git a/api/services/async_workflow_service.py b/api/services/async_workflow_service.py index 034d7ffedb..8d62f121e2 100644 --- a/api/services/async_workflow_service.py +++ b/api/services/async_workflow_service.py @@ -13,18 +13,17 @@ from celery.result import AsyncResult from sqlalchemy import select from sqlalchemy.orm import Session +from enums.quota_type import QuotaType from extensions.ext_database import db -from extensions.ext_redis import redis_client from models.account import Account from models.enums import CreatorUserRole, WorkflowTriggerStatus from models.model import App, EndUser from models.trigger import WorkflowTriggerLog from models.workflow import Workflow from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository -from services.errors.app import InvokeDailyRateLimitError, WorkflowNotFoundError +from services.errors.app import InvokeRateLimitError, QuotaExceededError, WorkflowNotFoundError from services.workflow.entities import AsyncTriggerResponse, TriggerData, WorkflowTaskData from services.workflow.queue_dispatcher import QueueDispatcherManager, QueuePriority -from services.workflow.rate_limiter import TenantDailyRateLimiter from services.workflow_service import WorkflowService from tasks.async_workflow_tasks import ( execute_workflow_professional, @@ -82,7 +81,6 @@ class AsyncWorkflowService: trigger_log_repo = SQLAlchemyWorkflowTriggerLogRepository(session) dispatcher_manager = QueueDispatcherManager() workflow_service = WorkflowService() - rate_limiter = TenantDailyRateLimiter(redis_client) # 1. Validate app exists app_model = session.scalar(select(App).where(App.id == trigger_data.app_id)) @@ -127,25 +125,19 @@ class AsyncWorkflowService: trigger_log = trigger_log_repo.create(trigger_log) session.commit() - # 7. Check and consume daily quota - if not dispatcher.consume_quota(trigger_data.tenant_id): + # 7. Check and consume quota + try: + QuotaType.WORKFLOW.consume(trigger_data.tenant_id) + except QuotaExceededError as e: # Update trigger log status trigger_log.status = WorkflowTriggerStatus.RATE_LIMITED - trigger_log.error = f"Daily limit reached for {dispatcher.get_queue_name()}" + trigger_log.error = f"Quota limit reached: {e}" trigger_log_repo.update(trigger_log) session.commit() - tenant_owner_tz = rate_limiter.get_tenant_owner_timezone(trigger_data.tenant_id) - - remaining = rate_limiter.get_remaining_quota(trigger_data.tenant_id, dispatcher.get_daily_limit()) - - reset_time = rate_limiter.get_quota_reset_time(trigger_data.tenant_id, tenant_owner_tz) - - raise InvokeDailyRateLimitError( - f"Daily workflow execution limit reached. " - f"Limit resets at {reset_time.strftime('%Y-%m-%d %H:%M:%S %Z')}. " - f"Remaining quota: {remaining}" - ) + raise InvokeRateLimitError( + f"Workflow execution quota limit reached for tenant {trigger_data.tenant_id}" + ) from e # 8. Create task data queue_name = dispatcher.get_queue_name() diff --git a/api/services/billing_service.py b/api/services/billing_service.py index 1650bad0f5..1f1aea709f 100644 --- a/api/services/billing_service.py +++ b/api/services/billing_service.py @@ -24,6 +24,13 @@ class BillingService: billing_info = cls._send_request("GET", "/subscription/info", params=params) return billing_info + @classmethod + def get_tenant_feature_plan_usage_info(cls, tenant_id: str): + params = {"tenant_id": tenant_id} + + usage_info = cls._send_request("GET", "/tenant-feature-usage/info", params=params) + return usage_info + @classmethod def get_knowledge_rate_limit(cls, tenant_id: str): params = {"tenant_id": tenant_id} @@ -55,6 +62,44 @@ class BillingService: params = {"prefilled_email": prefilled_email, "tenant_id": tenant_id} return cls._send_request("GET", "/invoices", params=params) + @classmethod + def update_tenant_feature_plan_usage(cls, tenant_id: str, feature_key: str, delta: int) -> dict: + """ + Update tenant feature plan usage. + + Args: + tenant_id: Tenant identifier + feature_key: Feature key (e.g., 'trigger', 'workflow') + delta: Usage delta (positive to add, negative to consume) + + Returns: + Response dict with 'result' and 'history_id' + Example: {"result": "success", "history_id": "uuid"} + """ + return cls._send_request( + "POST", + "/tenant-feature-usage/usage", + params={"tenant_id": tenant_id, "feature_key": feature_key, "delta": delta}, + ) + + @classmethod + def refund_tenant_feature_plan_usage(cls, history_id: str) -> dict: + """ + Refund a previous usage charge. + + Args: + history_id: The history_id returned from update_tenant_feature_plan_usage + + Returns: + Response dict with 'result' and 'history_id' + """ + return cls._send_request("POST", "/tenant-feature-usage/refund", params={"quota_usage_history_id": history_id}) + + @classmethod + def get_tenant_feature_plan_usage(cls, tenant_id: str, feature_key: str): + params = {"tenant_id": tenant_id, "feature_key": feature_key} + return cls._send_request("GET", "/billing/tenant_feature_plan/usage", params=params) + @classmethod @retry( wait=wait_fixed(2), @@ -69,6 +114,8 @@ class BillingService: response = httpx.request(method, url, json=json, params=params, headers=headers) if method == "GET" and response.status_code != httpx.codes.OK: raise ValueError("Unable to retrieve billing information. Please try again later or contact support.") + if method == "POST" and response.status_code != httpx.codes.OK: + raise ValueError(f"Unable to send request to {url}. Please try again later or contact support.") return response.json() @staticmethod diff --git a/api/services/errors/app.py b/api/services/errors/app.py index 338636d9b6..24e4760acc 100644 --- a/api/services/errors/app.py +++ b/api/services/errors/app.py @@ -18,7 +18,29 @@ class WorkflowIdFormatError(Exception): pass -class InvokeDailyRateLimitError(Exception): - """Raised when daily rate limit is exceeded for workflow invocations.""" +class InvokeRateLimitError(Exception): + """Raised when rate limit is exceeded for workflow invocations.""" pass + + +class QuotaExceededError(ValueError): + """Raised when billing quota is exceeded for a feature.""" + + def __init__(self, feature: str, tenant_id: str, required: int): + self.feature = feature + self.tenant_id = tenant_id + self.required = required + super().__init__(f"Quota exceeded for feature '{feature}' (tenant: {tenant_id}). Required: {required}") + + +class TriggerNodeLimitExceededError(ValueError): + """Raised when trigger node count exceeds the plan limit.""" + + def __init__(self, count: int, limit: int): + self.count = count + self.limit = limit + super().__init__( + f"Trigger node count ({count}) exceeds the limit ({limit}) for your subscription plan. " + f"Please upgrade your plan or reduce the number of trigger nodes." + ) diff --git a/api/services/feature_service.py b/api/services/feature_service.py index 44bea57769..8035adc734 100644 --- a/api/services/feature_service.py +++ b/api/services/feature_service.py @@ -54,6 +54,12 @@ class LicenseLimitationModel(BaseModel): return (self.limit - self.size) >= required +class Quota(BaseModel): + usage: int = 0 + limit: int = 0 + reset_date: int = -1 + + class LicenseStatus(StrEnum): NONE = "none" INACTIVE = "inactive" @@ -129,6 +135,8 @@ class FeatureModel(BaseModel): webapp_copyright_enabled: bool = False workspace_members: LicenseLimitationModel = LicenseLimitationModel(enabled=False, size=0, limit=0) is_allow_transfer_workspace: bool = True + trigger_event: Quota = Quota(usage=0, limit=3000, reset_date=0) + api_rate_limit: Quota = Quota(usage=0, limit=5000, reset_date=0) # pydantic configs model_config = ConfigDict(protected_namespaces=()) knowledge_pipeline: KnowledgePipeline = KnowledgePipeline() @@ -236,6 +244,8 @@ class FeatureService: def _fulfill_params_from_billing_api(cls, features: FeatureModel, tenant_id: str): billing_info = BillingService.get_info(tenant_id) + features_usage_info = BillingService.get_tenant_feature_plan_usage_info(tenant_id) + features.billing.enabled = billing_info["enabled"] features.billing.subscription.plan = billing_info["subscription"]["plan"] features.billing.subscription.interval = billing_info["subscription"]["interval"] @@ -246,6 +256,16 @@ class FeatureService: else: features.is_allow_transfer_workspace = False + if "trigger_event" in features_usage_info: + features.trigger_event.usage = features_usage_info["trigger_event"]["usage"] + features.trigger_event.limit = features_usage_info["trigger_event"]["limit"] + features.trigger_event.reset_date = features_usage_info["trigger_event"].get("reset_date", -1) + + if "api_rate_limit" in features_usage_info: + features.api_rate_limit.usage = features_usage_info["api_rate_limit"]["usage"] + features.api_rate_limit.limit = features_usage_info["api_rate_limit"]["limit"] + features.api_rate_limit.reset_date = features_usage_info["api_rate_limit"].get("reset_date", -1) + if "members" in billing_info: features.members.size = billing_info["members"]["size"] features.members.limit = billing_info["members"]["limit"] diff --git a/api/services/trigger/app_trigger_service.py b/api/services/trigger/app_trigger_service.py new file mode 100644 index 0000000000..6d5a719f63 --- /dev/null +++ b/api/services/trigger/app_trigger_service.py @@ -0,0 +1,46 @@ +""" +AppTrigger management service. + +Handles AppTrigger model CRUD operations and status management. +This service centralizes all AppTrigger-related business logic. +""" + +import logging + +from sqlalchemy import update +from sqlalchemy.orm import Session + +from extensions.ext_database import db +from models.enums import AppTriggerStatus +from models.trigger import AppTrigger + +logger = logging.getLogger(__name__) + + +class AppTriggerService: + """Service for managing AppTrigger lifecycle and status.""" + + @staticmethod + def mark_tenant_triggers_rate_limited(tenant_id: str) -> None: + """ + Mark all enabled triggers for a tenant as rate limited due to quota exceeded. + + This method is called when a tenant's quota is exhausted. It updates all + enabled triggers to RATE_LIMITED status to prevent further executions until + quota is restored. + + Args: + tenant_id: Tenant ID whose triggers should be marked as rate limited + + """ + try: + with Session(db.engine) as session: + session.execute( + update(AppTrigger) + .where(AppTrigger.tenant_id == tenant_id, AppTrigger.status == AppTriggerStatus.ENABLED) + .values(status=AppTriggerStatus.RATE_LIMITED) + ) + session.commit() + logger.info("Marked all enabled triggers as rate limited for tenant %s", tenant_id) + except Exception: + logger.exception("Failed to mark all enabled triggers as rate limited for tenant %s", tenant_id) diff --git a/api/services/trigger/webhook_service.py b/api/services/trigger/webhook_service.py index 946764c35c..e2be4934e9 100644 --- a/api/services/trigger/webhook_service.py +++ b/api/services/trigger/webhook_service.py @@ -18,6 +18,7 @@ from core.file.models import FileTransferMethod from core.tools.tool_file_manager import ToolFileManager from core.variables.types import SegmentType from core.workflow.enums import NodeType +from enums.quota_type import QuotaType from extensions.ext_database import db from extensions.ext_redis import redis_client from factories import file_factory @@ -27,6 +28,8 @@ from models.trigger import AppTrigger, WorkflowWebhookTrigger from models.workflow import Workflow from services.async_workflow_service import AsyncWorkflowService from services.end_user_service import EndUserService +from services.errors.app import QuotaExceededError +from services.trigger.app_trigger_service import AppTriggerService from services.workflow.entities import WebhookTriggerData logger = logging.getLogger(__name__) @@ -98,6 +101,12 @@ class WebhookService: raise ValueError(f"App trigger not found for webhook {webhook_id}") # Only check enabled status if not in debug mode + + if app_trigger.status == AppTriggerStatus.RATE_LIMITED: + raise ValueError( + f"Webhook trigger is rate limited for webhook {webhook_id}, please upgrade your plan." + ) + if app_trigger.status != AppTriggerStatus.ENABLED: raise ValueError(f"Webhook trigger is disabled for webhook {webhook_id}") @@ -729,6 +738,18 @@ class WebhookService: user_id=None, ) + # consume quota before triggering workflow execution + try: + QuotaType.TRIGGER.consume(webhook_trigger.tenant_id) + except QuotaExceededError: + AppTriggerService.mark_tenant_triggers_rate_limited(webhook_trigger.tenant_id) + logger.info( + "Tenant %s rate limited, skipping webhook trigger %s", + webhook_trigger.tenant_id, + webhook_trigger.webhook_id, + ) + raise + # Trigger workflow execution asynchronously AsyncWorkflowService.trigger_workflow_async( session, diff --git a/api/services/workflow/queue_dispatcher.py b/api/services/workflow/queue_dispatcher.py index c55de7a085..cc366482c8 100644 --- a/api/services/workflow/queue_dispatcher.py +++ b/api/services/workflow/queue_dispatcher.py @@ -2,16 +2,14 @@ Queue dispatcher system for async workflow execution. Implements an ABC-based pattern for handling different subscription tiers -with appropriate queue routing and rate limiting. +with appropriate queue routing and priority assignment. """ from abc import ABC, abstractmethod from enum import StrEnum from configs import dify_config -from extensions.ext_redis import redis_client from services.billing_service import BillingService -from services.workflow.rate_limiter import TenantDailyRateLimiter class QueuePriority(StrEnum): @@ -25,50 +23,16 @@ class QueuePriority(StrEnum): class BaseQueueDispatcher(ABC): """Abstract base class for queue dispatchers""" - def __init__(self): - self.rate_limiter = TenantDailyRateLimiter(redis_client) - @abstractmethod def get_queue_name(self) -> str: """Get the queue name for this dispatcher""" pass - @abstractmethod - def get_daily_limit(self) -> int: - """Get daily execution limit""" - pass - @abstractmethod def get_priority(self) -> int: """Get task priority level""" pass - def check_daily_quota(self, tenant_id: str) -> bool: - """ - Check if tenant has remaining daily quota - - Args: - tenant_id: The tenant identifier - - Returns: - True if quota available, False otherwise - """ - # Check without consuming - remaining = self.rate_limiter.get_remaining_quota(tenant_id=tenant_id, max_daily_limit=self.get_daily_limit()) - return remaining > 0 - - def consume_quota(self, tenant_id: str) -> bool: - """ - Consume one execution from daily quota - - Args: - tenant_id: The tenant identifier - - Returns: - True if quota consumed successfully, False if limit reached - """ - return self.rate_limiter.check_and_consume(tenant_id=tenant_id, max_daily_limit=self.get_daily_limit()) - class ProfessionalQueueDispatcher(BaseQueueDispatcher): """Dispatcher for professional tier""" @@ -76,9 +40,6 @@ class ProfessionalQueueDispatcher(BaseQueueDispatcher): def get_queue_name(self) -> str: return QueuePriority.PROFESSIONAL - def get_daily_limit(self) -> int: - return int(1e9) - def get_priority(self) -> int: return 100 @@ -89,9 +50,6 @@ class TeamQueueDispatcher(BaseQueueDispatcher): def get_queue_name(self) -> str: return QueuePriority.TEAM - def get_daily_limit(self) -> int: - return int(1e9) - def get_priority(self) -> int: return 50 @@ -102,9 +60,6 @@ class SandboxQueueDispatcher(BaseQueueDispatcher): def get_queue_name(self) -> str: return QueuePriority.SANDBOX - def get_daily_limit(self) -> int: - return dify_config.APP_DAILY_RATE_LIMIT - def get_priority(self) -> int: return 10 diff --git a/api/services/workflow/rate_limiter.py b/api/services/workflow/rate_limiter.py deleted file mode 100644 index 1ccb4e1961..0000000000 --- a/api/services/workflow/rate_limiter.py +++ /dev/null @@ -1,183 +0,0 @@ -""" -Day-based rate limiter for workflow executions. - -Implements UTC-based daily quotas that reset at midnight UTC for consistent rate limiting. -""" - -from datetime import UTC, datetime, time, timedelta -from typing import Union - -import pytz -from redis import Redis -from sqlalchemy import select - -from extensions.ext_database import db -from extensions.ext_redis import RedisClientWrapper -from models.account import Account, TenantAccountJoin, TenantAccountRole - - -class TenantDailyRateLimiter: - """ - Day-based rate limiter that resets at midnight UTC - - This class provides Redis-based rate limiting with the following features: - - Daily quotas that reset at midnight UTC for consistency - - Atomic check-and-consume operations - - Automatic cleanup of stale counters - - Timezone-aware error messages for better UX - """ - - def __init__(self, redis_client: Union[Redis, RedisClientWrapper]): - self.redis = redis_client - - def get_tenant_owner_timezone(self, tenant_id: str) -> str: - """ - Get timezone of tenant owner - - Args: - tenant_id: The tenant identifier - - Returns: - Timezone string (e.g., 'America/New_York', 'UTC') - """ - # Query to get tenant owner's timezone using scalar and select - owner = db.session.scalar( - select(Account) - .join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id) - .where(TenantAccountJoin.tenant_id == tenant_id, TenantAccountJoin.role == TenantAccountRole.OWNER) - ) - - if not owner: - return "UTC" - - return owner.timezone or "UTC" - - def _get_day_key(self, tenant_id: str) -> str: - """ - Get Redis key for current UTC day - - Args: - tenant_id: The tenant identifier - - Returns: - Redis key for the current UTC day - """ - utc_now = datetime.now(UTC) - date_str = utc_now.strftime("%Y-%m-%d") - return f"workflow:daily_limit:{tenant_id}:{date_str}" - - def _get_ttl_seconds(self) -> int: - """ - Calculate seconds until UTC midnight - - Returns: - Number of seconds until UTC midnight - """ - utc_now = datetime.now(UTC) - - # Get next midnight in UTC - next_midnight = datetime.combine(utc_now.date() + timedelta(days=1), time.min) - next_midnight = next_midnight.replace(tzinfo=UTC) - - return int((next_midnight - utc_now).total_seconds()) - - def check_and_consume(self, tenant_id: str, max_daily_limit: int) -> bool: - """ - Check if quota available and consume one execution - - Args: - tenant_id: The tenant identifier - max_daily_limit: Maximum daily limit - - Returns: - True if quota consumed successfully, False if limit reached - """ - key = self._get_day_key(tenant_id) - ttl = self._get_ttl_seconds() - - # Check current usage - current = self.redis.get(key) - - if current is None: - # First execution of the day - set to 1 - self.redis.setex(key, ttl, 1) - return True - - current_count = int(current) - if current_count < max_daily_limit: - # Within limit, increment - new_count = self.redis.incr(key) - # Update TTL - self.redis.expire(key, ttl) - - # Double-check in case of race condition - if new_count <= max_daily_limit: - return True - else: - # Race condition occurred, decrement back - self.redis.decr(key) - return False - else: - # Limit exceeded - return False - - def get_remaining_quota(self, tenant_id: str, max_daily_limit: int) -> int: - """ - Get remaining quota for the day - - Args: - tenant_id: The tenant identifier - max_daily_limit: Maximum daily limit - - Returns: - Number of remaining executions for the day - """ - key = self._get_day_key(tenant_id) - used = int(self.redis.get(key) or 0) - return max(0, max_daily_limit - used) - - def get_current_usage(self, tenant_id: str) -> int: - """ - Get current usage for the day - - Args: - tenant_id: The tenant identifier - - Returns: - Number of executions used today - """ - key = self._get_day_key(tenant_id) - return int(self.redis.get(key) or 0) - - def reset_quota(self, tenant_id: str) -> bool: - """ - Reset quota for testing purposes - - Args: - tenant_id: The tenant identifier - - Returns: - True if key was deleted, False if key didn't exist - """ - key = self._get_day_key(tenant_id) - return bool(self.redis.delete(key)) - - def get_quota_reset_time(self, tenant_id: str, timezone_str: str) -> datetime: - """ - Get the time when quota will reset (next UTC midnight in tenant's timezone) - - Args: - tenant_id: The tenant identifier - timezone_str: Tenant's timezone for display purposes - - Returns: - Datetime when quota resets (next UTC midnight in tenant's timezone) - """ - tz = pytz.timezone(timezone_str) - utc_now = datetime.now(UTC) - - # Get next midnight in UTC, then convert to tenant's timezone - next_utc_midnight = datetime.combine(utc_now.date() + timedelta(days=1), time.min) - next_utc_midnight = pytz.UTC.localize(next_utc_midnight) - - return next_utc_midnight.astimezone(tz) diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index e8088e17c1..b6764f1fa7 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -7,6 +7,7 @@ from typing import Any, cast from sqlalchemy import exists, select from sqlalchemy.orm import Session, sessionmaker +from configs import dify_config from core.app.app_config.entities import VariableEntityType from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager @@ -25,6 +26,7 @@ from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_M from core.workflow.nodes.start.entities import StartNodeData from core.workflow.system_variable import SystemVariable from core.workflow.workflow_entry import WorkflowEntry +from enums.cloud_plan import CloudPlan from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated from extensions.ext_database import db from extensions.ext_storage import storage @@ -35,8 +37,9 @@ from models.model import App, AppMode from models.tools import WorkflowToolProvider from models.workflow import Workflow, WorkflowNodeExecutionModel, WorkflowNodeExecutionTriggeredFrom, WorkflowType from repositories.factory import DifyAPIRepositoryFactory +from services.billing_service import BillingService from services.enterprise.plugin_manager_service import PluginCredentialType -from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError +from services.errors.app import IsDraftWorkflowError, TriggerNodeLimitExceededError, WorkflowHashNotEqualError from services.workflow.workflow_converter import WorkflowConverter from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError @@ -272,6 +275,21 @@ class WorkflowService: # validate graph structure self.validate_graph_structure(graph=draft_workflow.graph_dict) + # billing check + if dify_config.BILLING_ENABLED: + limit_info = BillingService.get_info(app_model.tenant_id) + if limit_info["subscription"]["plan"] == CloudPlan.SANDBOX: + # Check trigger node count limit for SANDBOX plan + trigger_node_count = sum( + 1 + for _, node_data in draft_workflow.walk_nodes() + if (node_type_str := node_data.get("type")) + and isinstance(node_type_str, str) + and NodeType(node_type_str).is_trigger_node + ) + if trigger_node_count > 2: + raise TriggerNodeLimitExceededError(count=trigger_node_count, limit=2) + # create new workflow workflow = Workflow.new( tenant_id=app_model.tenant_id, diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index 985125e66b..2619d8dd28 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -26,14 +26,22 @@ from core.trigger.provider import PluginTriggerProviderController from core.trigger.trigger_manager import TriggerManager from core.workflow.enums import NodeType, WorkflowExecutionStatus from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData +from enums.quota_type import QuotaType, unlimited from extensions.ext_database import db -from models.enums import AppTriggerType, CreatorUserRole, WorkflowRunTriggeredFrom, WorkflowTriggerStatus +from models.enums import ( + AppTriggerType, + CreatorUserRole, + WorkflowRunTriggeredFrom, + WorkflowTriggerStatus, +) from models.model import EndUser from models.provider_ids import TriggerProviderID from models.trigger import TriggerSubscription, WorkflowPluginTrigger, WorkflowTriggerLog from models.workflow import Workflow, WorkflowAppLog, WorkflowAppLogCreatedFrom, WorkflowRun from services.async_workflow_service import AsyncWorkflowService from services.end_user_service import EndUserService +from services.errors.app import QuotaExceededError +from services.trigger.app_trigger_service import AppTriggerService from services.trigger.trigger_provider_service import TriggerProviderService from services.trigger.trigger_request_service import TriggerHttpRequestCachingService from services.trigger.trigger_subscription_operator_service import TriggerSubscriptionOperatorService @@ -287,6 +295,17 @@ def dispatch_triggered_workflow( icon_dark_filename=trigger_entity.identity.icon_dark or "", ) + # consume quota before invoking trigger + quota_charge = unlimited() + try: + quota_charge = QuotaType.TRIGGER.consume(subscription.tenant_id) + except QuotaExceededError: + AppTriggerService.mark_tenant_triggers_rate_limited(subscription.tenant_id) + logger.info( + "Tenant %s rate limited, skipping plugin trigger %s", subscription.tenant_id, plugin_trigger.id + ) + return 0 + node_data: TriggerEventNodeData = TriggerEventNodeData.model_validate(event_node) invoke_response: TriggerInvokeEventResponse | None = None try: @@ -305,6 +324,8 @@ def dispatch_triggered_workflow( payload=payload, ) except PluginInvokeError as e: + quota_charge.refund() + error_message = e.to_user_friendly_error(plugin_name=trigger_entity.identity.name) try: end_user = end_users.get(plugin_trigger.app_id) @@ -326,6 +347,8 @@ def dispatch_triggered_workflow( ) continue except Exception: + quota_charge.refund() + logger.exception( "Failed to invoke trigger event for app %s", plugin_trigger.app_id, @@ -333,6 +356,8 @@ def dispatch_triggered_workflow( continue if invoke_response is not None and invoke_response.cancelled: + quota_charge.refund() + logger.info( "Trigger ignored for app %s with trigger event %s", plugin_trigger.app_id, @@ -366,6 +391,8 @@ def dispatch_triggered_workflow( event_name, ) except Exception: + quota_charge.refund() + logger.exception( "Failed to trigger workflow for app %s", plugin_trigger.app_id, diff --git a/api/tasks/trigger_subscription_refresh_tasks.py b/api/tasks/trigger_subscription_refresh_tasks.py index 11324df881..ed92f3f3c5 100644 --- a/api/tasks/trigger_subscription_refresh_tasks.py +++ b/api/tasks/trigger_subscription_refresh_tasks.py @@ -6,6 +6,7 @@ from typing import Any from celery import shared_task from sqlalchemy.orm import Session +from configs import dify_config from core.plugin.entities.plugin_daemon import CredentialType from core.trigger.utils.locks import build_trigger_refresh_lock_key from extensions.ext_database import db @@ -25,9 +26,10 @@ def _load_subscription(session: Session, tenant_id: str, subscription_id: str) - def _refresh_oauth_if_expired(tenant_id: str, subscription: TriggerSubscription, now: int) -> None: + threshold_seconds: int = int(dify_config.TRIGGER_PROVIDER_CREDENTIAL_THRESHOLD_SECONDS) if ( subscription.credential_expires_at != -1 - and int(subscription.credential_expires_at) <= now + and int(subscription.credential_expires_at) <= now + threshold_seconds and CredentialType.of(subscription.credential_type) == CredentialType.OAUTH2 ): logger.info( @@ -53,13 +55,15 @@ def _refresh_subscription_if_expired( subscription: TriggerSubscription, now: int, ) -> None: - if subscription.expires_at == -1 or int(subscription.expires_at) > now: + threshold_seconds: int = int(dify_config.TRIGGER_PROVIDER_SUBSCRIPTION_THRESHOLD_SECONDS) + if subscription.expires_at == -1 or int(subscription.expires_at) > now + threshold_seconds: logger.debug( - "Subscription not due: tenant=%s subscription_id=%s expires_at=%s now=%s", + "Subscription not due: tenant=%s subscription_id=%s expires_at=%s now=%s threshold=%s", tenant_id, subscription.id, subscription.expires_at, now, + threshold_seconds, ) return diff --git a/api/tasks/workflow_schedule_tasks.py b/api/tasks/workflow_schedule_tasks.py index f0596a8f4a..f54e02a219 100644 --- a/api/tasks/workflow_schedule_tasks.py +++ b/api/tasks/workflow_schedule_tasks.py @@ -8,9 +8,12 @@ from core.workflow.nodes.trigger_schedule.exc import ( ScheduleNotFoundError, TenantOwnerNotFoundError, ) +from enums.quota_type import QuotaType, unlimited from extensions.ext_database import db from models.trigger import WorkflowSchedulePlan from services.async_workflow_service import AsyncWorkflowService +from services.errors.app import QuotaExceededError +from services.trigger.app_trigger_service import AppTriggerService from services.trigger.schedule_service import ScheduleService from services.workflow.entities import ScheduleTriggerData @@ -30,6 +33,7 @@ def run_schedule_trigger(schedule_id: str) -> None: TenantOwnerNotFoundError: If no owner/admin for tenant ScheduleExecutionError: If workflow trigger fails """ + session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) with session_factory() as session: @@ -41,6 +45,14 @@ def run_schedule_trigger(schedule_id: str) -> None: if not tenant_owner: raise TenantOwnerNotFoundError(f"No owner or admin found for tenant {schedule.tenant_id}") + quota_charge = unlimited() + try: + quota_charge = QuotaType.TRIGGER.consume(schedule.tenant_id) + except QuotaExceededError: + AppTriggerService.mark_tenant_triggers_rate_limited(schedule.tenant_id) + logger.info("Tenant %s rate limited, skipping schedule trigger %s", schedule.tenant_id, schedule_id) + return + try: # Production dispatch: Trigger the workflow normally response = AsyncWorkflowService.trigger_workflow_async( @@ -55,6 +67,7 @@ def run_schedule_trigger(schedule_id: str) -> None: ) logger.info("Schedule %s triggered workflow: %s", schedule_id, response.workflow_trigger_log_id) except Exception as e: + quota_charge.refund() raise ScheduleExecutionError( f"Failed to trigger workflow for schedule {schedule_id}, app {schedule.app_id}" ) from e diff --git a/api/tests/test_containers_integration_tests/services/test_app_generate_service.py b/api/tests/test_containers_integration_tests/services/test_app_generate_service.py index 8b8739d557..0f9ed94017 100644 --- a/api/tests/test_containers_integration_tests/services/test_app_generate_service.py +++ b/api/tests/test_containers_integration_tests/services/test_app_generate_service.py @@ -5,12 +5,10 @@ import pytest from faker import Faker from core.app.entities.app_invoke_entities import InvokeFrom -from enums.cloud_plan import CloudPlan from models.model import EndUser from models.workflow import Workflow from services.app_generate_service import AppGenerateService from services.errors.app import WorkflowIdFormatError, WorkflowNotFoundError -from services.errors.llm import InvokeRateLimitError class TestAppGenerateService: @@ -20,10 +18,9 @@ class TestAppGenerateService: def mock_external_service_dependencies(self): """Mock setup for external service dependencies.""" with ( - patch("services.app_generate_service.BillingService") as mock_billing_service, + patch("services.billing_service.BillingService") as mock_billing_service, patch("services.app_generate_service.WorkflowService") as mock_workflow_service, patch("services.app_generate_service.RateLimit") as mock_rate_limit, - patch("services.app_generate_service.RateLimiter") as mock_rate_limiter, patch("services.app_generate_service.CompletionAppGenerator") as mock_completion_generator, patch("services.app_generate_service.ChatAppGenerator") as mock_chat_generator, patch("services.app_generate_service.AgentChatAppGenerator") as mock_agent_chat_generator, @@ -31,9 +28,13 @@ class TestAppGenerateService: patch("services.app_generate_service.WorkflowAppGenerator") as mock_workflow_generator, patch("services.account_service.FeatureService") as mock_account_feature_service, patch("services.app_generate_service.dify_config") as mock_dify_config, + patch("configs.dify_config") as mock_global_dify_config, ): # Setup default mock returns for billing service - mock_billing_service.get_info.return_value = {"subscription": {"plan": CloudPlan.SANDBOX}} + mock_billing_service.update_tenant_feature_plan_usage.return_value = { + "result": "success", + "history_id": "test_history_id", + } # Setup default mock returns for workflow service mock_workflow_service_instance = mock_workflow_service.return_value @@ -47,10 +48,6 @@ class TestAppGenerateService: mock_rate_limit_instance.generate.return_value = ["test_response"] mock_rate_limit_instance.exit.return_value = None - mock_rate_limiter_instance = mock_rate_limiter.return_value - mock_rate_limiter_instance.is_rate_limited.return_value = False - mock_rate_limiter_instance.increment_rate_limit.return_value = None - # Setup default mock returns for app generators mock_completion_generator_instance = mock_completion_generator.return_value mock_completion_generator_instance.generate.return_value = ["completion_response"] @@ -87,11 +84,14 @@ class TestAppGenerateService: mock_dify_config.APP_MAX_ACTIVE_REQUESTS = 100 mock_dify_config.APP_DAILY_RATE_LIMIT = 1000 + mock_global_dify_config.BILLING_ENABLED = False + mock_global_dify_config.APP_MAX_ACTIVE_REQUESTS = 100 + mock_global_dify_config.APP_DAILY_RATE_LIMIT = 1000 + yield { "billing_service": mock_billing_service, "workflow_service": mock_workflow_service, "rate_limit": mock_rate_limit, - "rate_limiter": mock_rate_limiter, "completion_generator": mock_completion_generator, "chat_generator": mock_chat_generator, "agent_chat_generator": mock_agent_chat_generator, @@ -99,6 +99,7 @@ class TestAppGenerateService: "workflow_generator": mock_workflow_generator, "account_feature_service": mock_account_feature_service, "dify_config": mock_dify_config, + "global_dify_config": mock_global_dify_config, } def _create_test_app_and_account(self, db_session_with_containers, mock_external_service_dependencies, mode="chat"): @@ -429,13 +430,9 @@ class TestAppGenerateService: db_session_with_containers, mock_external_service_dependencies, mode="completion" ) - # Setup billing service mock for sandbox plan - mock_external_service_dependencies["billing_service"].get_info.return_value = { - "subscription": {"plan": CloudPlan.SANDBOX} - } - # Set BILLING_ENABLED to True for this test mock_external_service_dependencies["dify_config"].BILLING_ENABLED = True + mock_external_service_dependencies["global_dify_config"].BILLING_ENABLED = True # Setup test arguments args = {"inputs": {"query": fake.text(max_nb_chars=50)}, "response_mode": "streaming"} @@ -448,41 +445,8 @@ class TestAppGenerateService: # Verify the result assert result == ["test_response"] - # Verify billing service was called - mock_external_service_dependencies["billing_service"].get_info.assert_called_once_with(app.tenant_id) - - def test_generate_with_rate_limit_exceeded(self, db_session_with_containers, mock_external_service_dependencies): - """ - Test generation when rate limit is exceeded. - """ - fake = Faker() - app, account = self._create_test_app_and_account( - db_session_with_containers, mock_external_service_dependencies, mode="completion" - ) - - # Setup billing service mock for sandbox plan - mock_external_service_dependencies["billing_service"].get_info.return_value = { - "subscription": {"plan": CloudPlan.SANDBOX} - } - - # Set BILLING_ENABLED to True for this test - mock_external_service_dependencies["dify_config"].BILLING_ENABLED = True - - # Setup system rate limiter to return rate limited - with patch("services.app_generate_service.AppGenerateService.system_rate_limiter") as mock_system_rate_limiter: - mock_system_rate_limiter.is_rate_limited.return_value = True - - # Setup test arguments - args = {"inputs": {"query": fake.text(max_nb_chars=50)}, "response_mode": "streaming"} - - # Execute the method under test and expect rate limit error - with pytest.raises(InvokeRateLimitError) as exc_info: - AppGenerateService.generate( - app_model=app, user=account, args=args, invoke_from=InvokeFrom.SERVICE_API, streaming=True - ) - - # Verify error message - assert "Rate limit exceeded" in str(exc_info.value) + # Verify billing service was called to consume quota + mock_external_service_dependencies["billing_service"].update_tenant_feature_plan_usage.assert_called_once() def test_generate_with_invalid_app_mode(self, db_session_with_containers, mock_external_service_dependencies): """ diff --git a/dev/start-worker b/dev/start-worker index b1e010975b..a01da11d86 100755 --- a/dev/start-worker +++ b/dev/start-worker @@ -11,6 +11,7 @@ show_help() { echo " -c, --concurrency NUM Number of worker processes (default: 1)" echo " -P, --pool POOL Pool implementation (default: gevent)" echo " --loglevel LEVEL Log level (default: INFO)" + echo " -e, --env-file FILE Path to an env file to source before starting" echo " -h, --help Show this help message" echo "" echo "Examples:" @@ -44,6 +45,8 @@ CONCURRENCY=1 POOL="gevent" LOGLEVEL="INFO" +ENV_FILE="" + while [[ $# -gt 0 ]]; do case $1 in -q|--queues) @@ -62,6 +65,10 @@ while [[ $# -gt 0 ]]; do LOGLEVEL="$2" shift 2 ;; + -e|--env-file) + ENV_FILE="$2" + shift 2 + ;; -h|--help) show_help exit 0 @@ -77,6 +84,19 @@ done SCRIPT_DIR="$(dirname "$(realpath "$0")")" cd "$SCRIPT_DIR/.." +if [[ -n "${ENV_FILE}" ]]; then + if [[ ! -f "${ENV_FILE}" ]]; then + echo "Env file ${ENV_FILE} not found" + exit 1 + fi + + echo "Loading environment variables from ${ENV_FILE}" + # Export everything sourced from the env file + set -a + source "${ENV_FILE}" + set +a +fi + # If no queues specified, use edition-based defaults if [[ -z "${QUEUES}" ]]; then # Get EDITION from environment, default to SELF_HOSTED (community edition) diff --git a/web/app/components/app/app-publisher/index.tsx b/web/app/components/app/app-publisher/index.tsx index 64ce869c5d..a11af3b816 100644 --- a/web/app/components/app/app-publisher/index.tsx +++ b/web/app/components/app/app-publisher/index.tsx @@ -49,6 +49,7 @@ import { fetchInstalledAppList } from '@/service/explore' import { AppModeEnum } from '@/types/app' import type { PublishWorkflowParams } from '@/types/workflow' import { basePath } from '@/utils/var' +import UpgradeBtn from '@/app/components/billing/upgrade-btn' const ACCESS_MODE_MAP: Record = { [AccessMode.ORGANIZATION]: { @@ -106,6 +107,7 @@ export type AppPublisherProps = { workflowToolAvailable?: boolean missingStartNode?: boolean hasTriggerNode?: boolean // Whether workflow currently contains any trigger nodes (used to hide missing-start CTA when triggers exist). + startNodeLimitExceeded?: boolean } const PUBLISH_SHORTCUT = ['ctrl', '⇧', 'P'] @@ -127,6 +129,7 @@ const AppPublisher = ({ workflowToolAvailable = true, missingStartNode = false, hasTriggerNode = false, + startNodeLimitExceeded = false, }: AppPublisherProps) => { const { t } = useTranslation() @@ -246,6 +249,13 @@ const AppPublisher = ({ const hasPublishedVersion = !!publishedAt const workflowToolDisabled = !hasPublishedVersion || !workflowToolAvailable const workflowToolMessage = workflowToolDisabled ? t('workflow.common.workflowAsToolDisabledHint') : undefined + const showStartNodeLimitHint = Boolean(startNodeLimitExceeded) + const upgradeHighlightStyle = useMemo(() => ({ + background: 'linear-gradient(97deg, var(--components-input-border-active-prompt-1, rgba(11, 165, 236, 0.95)) -3.64%, var(--components-input-border-active-prompt-2, rgba(21, 90, 239, 0.95)) 45.14%)', + WebkitBackgroundClip: 'text', + backgroundClip: 'text', + WebkitTextFillColor: 'transparent', + }), []) return ( <> @@ -304,29 +314,49 @@ const AppPublisher = ({ /> ) : ( - + ) + } + + {showStartNodeLimitHint && ( +
+

+ {t('workflow.publishLimit.startNodeTitlePrefix')} + {t('workflow.publishLimit.startNodeTitleSuffix')} +

+

+ {t('workflow.publishLimit.startNodeDesc')} +

+ +
+ )} + ) } diff --git a/web/app/components/billing/config.ts b/web/app/components/billing/config.ts index c0a21c1ebf..f343f4b487 100644 --- a/web/app/components/billing/config.ts +++ b/web/app/components/billing/config.ts @@ -90,4 +90,8 @@ export const defaultPlan = { apiRateLimit: ALL_PLANS.sandbox.apiRateLimit, triggerEvents: ALL_PLANS.sandbox.triggerEvents, }, + reset: { + apiRateLimit: null, + triggerEvents: null, + }, } diff --git a/web/app/components/billing/plan/index.tsx b/web/app/components/billing/plan/index.tsx index 4b68fcfb15..b695302965 100644 --- a/web/app/components/billing/plan/index.tsx +++ b/web/app/components/billing/plan/index.tsx @@ -6,15 +6,16 @@ import { useRouter } from 'next/navigation' import { RiBook2Line, RiFileEditLine, - RiFlashlightLine, RiGraduationCapLine, RiGroupLine, - RiSpeedLine, } from '@remixicon/react' import { Plan, SelfHostedPlan } from '../type' +import { NUM_INFINITE } from '../config' +import { getDaysUntilEndOfMonth } from '@/utils/time' import VectorSpaceInfo from '../usage-info/vector-space-info' import AppsInfo from '../usage-info/apps-info' import UpgradeBtn from '../upgrade-btn' +import { ApiAggregate, TriggerAll } from '@/app/components/base/icons/src/vender/workflow' import { useProviderContext } from '@/context/provider-context' import { useAppContext } from '@/context/app-context' import Button from '@/app/components/base/button' @@ -44,9 +45,20 @@ const PlanComp: FC = ({ const { usage, total, + reset, } = plan - const perMonthUnit = ` ${t('billing.usagePage.perMonth')}` - const triggerEventUnit = plan.type === Plan.sandbox ? undefined : perMonthUnit + const triggerEventsResetInDays = type === Plan.professional && total.triggerEvents !== NUM_INFINITE + ? reset.triggerEvents ?? undefined + : undefined + const apiRateLimitResetInDays = (() => { + if (total.apiRateLimit === NUM_INFINITE) + return undefined + if (typeof reset.apiRateLimit === 'number') + return reset.apiRateLimit + if (type === Plan.sandbox) + return getDaysUntilEndOfMonth() + return undefined + })() const [showModal, setShowModal] = React.useState(false) const { mutateAsync } = useEducationVerify() @@ -79,7 +91,6 @@ const PlanComp: FC = ({
{t(`billing.plans.${type}.name`)}
-
{t('billing.currentPlan')}
{t(`billing.plans.${type}.for`)}
@@ -124,18 +135,20 @@ const PlanComp: FC = ({ total={total.annotatedResponse} /> diff --git a/web/app/components/billing/pricing/plans/cloud-plan-item/list/index.tsx b/web/app/components/billing/pricing/plans/cloud-plan-item/list/index.tsx index 0b35ee7e97..7674affc15 100644 --- a/web/app/components/billing/pricing/plans/cloud-plan-item/list/index.tsx +++ b/web/app/components/billing/pricing/plans/cloud-plan-item/list/index.tsx @@ -46,16 +46,10 @@ const List = ({ label={t('billing.plansCommon.documentsRequestQuota', { count: planInfo.documentsRequestQuota })} tooltip={t('billing.plansCommon.documentsRequestQuotaTooltip')} /> - + + - + ) => { + const [visible, setVisible] = useState(args.show ?? true) + useEffect(() => { + setVisible(args.show ?? true) + }, [args.show]) + const handleHide = () => setVisible(false) + return ( + +
+ + +
+
+ ) +} + +const meta = { + title: 'Billing/TriggerEventsLimitModal', + component: TriggerEventsLimitModal, + parameters: { + layout: 'centered', + }, + args: { + show: true, + usage: 120, + total: 120, + resetInDays: 5, + planType: Plan.professional, + }, +} satisfies Meta + +export default meta +type Story = StoryObj + +export const Professional: Story = { + args: { + onDismiss: () => { /* noop */ }, + onUpgrade: () => { /* noop */ }, + }, + render: args =>