diff --git a/api/services/async_workflow_service.py b/api/services/async_workflow_service.py index 5221f8f391..3b20ec500a 100644 --- a/api/services/async_workflow_service.py +++ b/api/services/async_workflow_service.py @@ -20,7 +20,7 @@ from models.enums import CreatorUserRole from models.model import App, EndUser from models.workflow import Workflow, WorkflowTriggerLog, WorkflowTriggerStatus from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository -from services.errors.llm import InvokeRateLimitError +from services.errors.app import InvokeDailyRateLimitError, WorkflowNotFoundError from services.workflow.entities import AsyncTriggerResponse, TriggerData, WorkflowTaskData from services.workflow.queue_dispatcher import QueueDispatcherManager, QueuePriority from services.workflow.rate_limiter import TenantDailyRateLimiter @@ -69,8 +69,8 @@ class AsyncWorkflowService: Note: The actual workflow execution status must be checked separately via workflow_trigger_log_id Raises: - ValueError: If app or workflow not found - InvokeRateLimitError: If daily rate limit exceeded + WorkflowNotFoundError: If app or workflow not found + InvokeDailyRateLimitError: If daily rate limit exceeded Behavior: - Non-blocking: Returns immediately after queuing @@ -86,7 +86,7 @@ class AsyncWorkflowService: # 1. Validate app exists app_model = session.scalar(select(App).where(App.id == trigger_data.app_id)) if not app_model: - raise ValueError(f"App not found: {trigger_data.app_id}") + raise WorkflowNotFoundError(f"App not found: {trigger_data.app_id}") # 2. Get workflow workflow = cls._get_workflow(workflow_service, app_model, trigger_data.workflow_id) @@ -138,7 +138,7 @@ class AsyncWorkflowService: reset_time = rate_limiter.get_quota_reset_time(trigger_data.tenant_id, tenant_owner_tz) - raise InvokeRateLimitError( + raise InvokeDailyRateLimitError( f"Daily workflow execution limit reached. " f"Limit resets at {reset_time.strftime('%Y-%m-%d %H:%M:%S %Z')}. " f"Remaining quota: {remaining}" @@ -305,17 +305,17 @@ class AsyncWorkflowService: Workflow instance Raises: - ValueError: If workflow not found + WorkflowNotFoundError: If workflow not found """ if workflow_id: # Get specific published workflow workflow = workflow_service.get_published_workflow_by_id(app_model, workflow_id) if not workflow: - raise ValueError(f"Published workflow not found: {workflow_id}") + raise WorkflowNotFoundError(f"Published workflow not found: {workflow_id}") else: # Get default published workflow workflow = workflow_service.get_published_workflow(app_model) if not workflow: - raise ValueError(f"No published workflow found for app: {app_model.id}") + raise WorkflowNotFoundError(f"No published workflow found for app: {app_model.id}") return workflow diff --git a/api/services/errors/app.py b/api/services/errors/app.py index 390716a47f..7bb142eb8d 100644 --- a/api/services/errors/app.py +++ b/api/services/errors/app.py @@ -16,3 +16,8 @@ class WorkflowNotFoundError(Exception): class WorkflowIdFormatError(Exception): pass + + +class InvokeDailyRateLimitError(Exception): + """Raised when daily rate limit is exceeded for workflow invocations.""" + pass diff --git a/api/tasks/async_workflow_tasks.py b/api/tasks/async_workflow_tasks.py index 50824df078..6fc96f5155 100644 --- a/api/tasks/async_workflow_tasks.py +++ b/api/tasks/async_workflow_tasks.py @@ -21,6 +21,7 @@ from models.enums import CreatorUserRole from models.model import App, EndUser, Tenant from models.workflow import Workflow, WorkflowTriggerLog, WorkflowTriggerStatus from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository +from services.errors.app import WorkflowNotFoundError from services.workflow.entities import AsyncTriggerExecutionResult, AsyncTriggerStatus, TriggerData, WorkflowTaskData # Determine queue names based on edition @@ -99,11 +100,11 @@ def _execute_workflow_common(task_data: WorkflowTaskData) -> AsyncTriggerExecuti app_model = session.scalar(select(App).where(App.id == trigger_log.app_id)) if not app_model: - raise ValueError(f"App not found: {trigger_log.app_id}") + raise WorkflowNotFoundError(f"App not found: {trigger_log.app_id}") workflow = session.scalar(select(Workflow).where(Workflow.id == trigger_log.workflow_id)) if not workflow: - raise ValueError(f"Workflow not found: {trigger_log.workflow_id}") + raise WorkflowNotFoundError(f"Workflow not found: {trigger_log.workflow_id}") user = _get_user(session, trigger_log)