mirror of https://github.com/langgenius/dify.git
refactor: Use specific error types for workflow execution (#24475)
Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
parent
e78903302f
commit
18f2e6f166
|
|
@ -20,7 +20,7 @@ from models.enums import CreatorUserRole
|
|||
from models.model import App, EndUser
|
||||
from models.workflow import Workflow, WorkflowTriggerLog, WorkflowTriggerStatus
|
||||
from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository
|
||||
from services.errors.llm import InvokeRateLimitError
|
||||
from services.errors.app import InvokeDailyRateLimitError, WorkflowNotFoundError
|
||||
from services.workflow.entities import AsyncTriggerResponse, TriggerData, WorkflowTaskData
|
||||
from services.workflow.queue_dispatcher import QueueDispatcherManager, QueuePriority
|
||||
from services.workflow.rate_limiter import TenantDailyRateLimiter
|
||||
|
|
@ -69,8 +69,8 @@ class AsyncWorkflowService:
|
|||
Note: The actual workflow execution status must be checked separately via workflow_trigger_log_id
|
||||
|
||||
Raises:
|
||||
ValueError: If app or workflow not found
|
||||
InvokeRateLimitError: If daily rate limit exceeded
|
||||
WorkflowNotFoundError: If app or workflow not found
|
||||
InvokeDailyRateLimitError: If daily rate limit exceeded
|
||||
|
||||
Behavior:
|
||||
- Non-blocking: Returns immediately after queuing
|
||||
|
|
@ -86,7 +86,7 @@ class AsyncWorkflowService:
|
|||
# 1. Validate app exists
|
||||
app_model = session.scalar(select(App).where(App.id == trigger_data.app_id))
|
||||
if not app_model:
|
||||
raise ValueError(f"App not found: {trigger_data.app_id}")
|
||||
raise WorkflowNotFoundError(f"App not found: {trigger_data.app_id}")
|
||||
|
||||
# 2. Get workflow
|
||||
workflow = cls._get_workflow(workflow_service, app_model, trigger_data.workflow_id)
|
||||
|
|
@ -138,7 +138,7 @@ class AsyncWorkflowService:
|
|||
|
||||
reset_time = rate_limiter.get_quota_reset_time(trigger_data.tenant_id, tenant_owner_tz)
|
||||
|
||||
raise InvokeRateLimitError(
|
||||
raise InvokeDailyRateLimitError(
|
||||
f"Daily workflow execution limit reached. "
|
||||
f"Limit resets at {reset_time.strftime('%Y-%m-%d %H:%M:%S %Z')}. "
|
||||
f"Remaining quota: {remaining}"
|
||||
|
|
@ -305,17 +305,17 @@ class AsyncWorkflowService:
|
|||
Workflow instance
|
||||
|
||||
Raises:
|
||||
ValueError: If workflow not found
|
||||
WorkflowNotFoundError: If workflow not found
|
||||
"""
|
||||
if workflow_id:
|
||||
# Get specific published workflow
|
||||
workflow = workflow_service.get_published_workflow_by_id(app_model, workflow_id)
|
||||
if not workflow:
|
||||
raise ValueError(f"Published workflow not found: {workflow_id}")
|
||||
raise WorkflowNotFoundError(f"Published workflow not found: {workflow_id}")
|
||||
else:
|
||||
# Get default published workflow
|
||||
workflow = workflow_service.get_published_workflow(app_model)
|
||||
if not workflow:
|
||||
raise ValueError(f"No published workflow found for app: {app_model.id}")
|
||||
raise WorkflowNotFoundError(f"No published workflow found for app: {app_model.id}")
|
||||
|
||||
return workflow
|
||||
|
|
|
|||
|
|
@ -16,3 +16,8 @@ class WorkflowNotFoundError(Exception):
|
|||
|
||||
class WorkflowIdFormatError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class InvokeDailyRateLimitError(Exception):
|
||||
"""Raised when daily rate limit is exceeded for workflow invocations."""
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ from models.enums import CreatorUserRole
|
|||
from models.model import App, EndUser, Tenant
|
||||
from models.workflow import Workflow, WorkflowTriggerLog, WorkflowTriggerStatus
|
||||
from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository
|
||||
from services.errors.app import WorkflowNotFoundError
|
||||
from services.workflow.entities import AsyncTriggerExecutionResult, AsyncTriggerStatus, TriggerData, WorkflowTaskData
|
||||
|
||||
# Determine queue names based on edition
|
||||
|
|
@ -99,11 +100,11 @@ def _execute_workflow_common(task_data: WorkflowTaskData) -> AsyncTriggerExecuti
|
|||
app_model = session.scalar(select(App).where(App.id == trigger_log.app_id))
|
||||
|
||||
if not app_model:
|
||||
raise ValueError(f"App not found: {trigger_log.app_id}")
|
||||
raise WorkflowNotFoundError(f"App not found: {trigger_log.app_id}")
|
||||
|
||||
workflow = session.scalar(select(Workflow).where(Workflow.id == trigger_log.workflow_id))
|
||||
if not workflow:
|
||||
raise ValueError(f"Workflow not found: {trigger_log.workflow_id}")
|
||||
raise WorkflowNotFoundError(f"Workflow not found: {trigger_log.workflow_id}")
|
||||
|
||||
user = _get_user(session, trigger_log)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue