diff --git a/api/core/app/engine_layers/trigger_post_layer.py b/api/core/app/engine_layers/trigger_post_layer.py index 5b31ec2ae1..1309295b1a 100644 --- a/api/core/app/engine_layers/trigger_post_layer.py +++ b/api/core/app/engine_layers/trigger_post_layer.py @@ -11,7 +11,7 @@ from core.workflow.graph_events.graph import GraphRunFailedEvent, GraphRunPaused from models.engine import db from models.enums import WorkflowTriggerStatus from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository -from tasks.workflow_cfs_scheduler.cfs_scheduler import TriggerWorkflowCFSPlanEntity +from tasks.workflow_cfs_scheduler.cfs_scheduler import AsyncWorkflowCFSPlanEntity logger = logging.getLogger(__name__) @@ -29,7 +29,7 @@ class TriggerPostLayer(GraphEngineLayer): def __init__( self, - cfs_plan_scheduler_entity: TriggerWorkflowCFSPlanEntity, + cfs_plan_scheduler_entity: AsyncWorkflowCFSPlanEntity, start_time: datetime, trigger_log_id: str, ): diff --git a/api/tasks/async_workflow_tasks.py b/api/tasks/async_workflow_tasks.py index 5a8274929a..cce0256a42 100644 --- a/api/tasks/async_workflow_tasks.py +++ b/api/tasks/async_workflow_tasks.py @@ -30,7 +30,7 @@ from services.workflow.entities import ( WorkflowScheduleCFSPlanEntity, WorkflowTaskData, ) -from tasks.workflow_cfs_scheduler.cfs_scheduler import TriggerCFSPlanScheduler, TriggerWorkflowCFSPlanEntity +from tasks.workflow_cfs_scheduler.cfs_scheduler import AsyncWorkflowCFSPlanEntity, AsyncWorkflowCFSPlanScheduler from tasks.workflow_cfs_scheduler.entities import AsyncWorkflowQueue @@ -38,14 +38,14 @@ from tasks.workflow_cfs_scheduler.entities import AsyncWorkflowQueue def execute_workflow_professional(task_data_dict: dict[str, Any]): """Execute workflow for professional tier with highest priority""" task_data = WorkflowTaskData.model_validate(task_data_dict) - cfs_plan_scheduler_entity = TriggerWorkflowCFSPlanEntity( + cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity( queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE, schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice, granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY, ) _execute_workflow_common( task_data, - TriggerCFSPlanScheduler(plan=cfs_plan_scheduler_entity), + AsyncWorkflowCFSPlanScheduler(plan=cfs_plan_scheduler_entity), cfs_plan_scheduler_entity, ) @@ -54,14 +54,14 @@ def execute_workflow_professional(task_data_dict: dict[str, Any]): def execute_workflow_team(task_data_dict: dict[str, Any]): """Execute workflow for team tier""" task_data = WorkflowTaskData.model_validate(task_data_dict) - cfs_plan_scheduler_entity = TriggerWorkflowCFSPlanEntity( + cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity( queue=AsyncWorkflowQueue.TEAM_QUEUE, schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice, granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY, ) _execute_workflow_common( task_data, - TriggerCFSPlanScheduler(plan=cfs_plan_scheduler_entity), + AsyncWorkflowCFSPlanScheduler(plan=cfs_plan_scheduler_entity), cfs_plan_scheduler_entity, ) @@ -70,22 +70,22 @@ def execute_workflow_team(task_data_dict: dict[str, Any]): def execute_workflow_sandbox(task_data_dict: dict[str, Any]): """Execute workflow for free tier with lower retry limit""" task_data = WorkflowTaskData.model_validate(task_data_dict) - cfs_plan_scheduler_entity = TriggerWorkflowCFSPlanEntity( + cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity( queue=AsyncWorkflowQueue.SANDBOX_QUEUE, schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice, granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY, ) _execute_workflow_common( task_data, - TriggerCFSPlanScheduler(plan=cfs_plan_scheduler_entity), + AsyncWorkflowCFSPlanScheduler(plan=cfs_plan_scheduler_entity), cfs_plan_scheduler_entity, ) def _execute_workflow_common( task_data: WorkflowTaskData, - cfs_plan_scheduler: TriggerCFSPlanScheduler, - cfs_plan_scheduler_entity: TriggerWorkflowCFSPlanEntity, + cfs_plan_scheduler: AsyncWorkflowCFSPlanScheduler, + cfs_plan_scheduler_entity: AsyncWorkflowCFSPlanEntity, ): """Execute workflow with common logic and trigger log updates.""" diff --git a/api/tasks/workflow_cfs_scheduler/cfs_scheduler.py b/api/tasks/workflow_cfs_scheduler/cfs_scheduler.py index c7a1650d23..218e61f6d9 100644 --- a/api/tasks/workflow_cfs_scheduler/cfs_scheduler.py +++ b/api/tasks/workflow_cfs_scheduler/cfs_scheduler.py @@ -3,7 +3,7 @@ from services.workflow.scheduler import CFSPlanScheduler, SchedulerCommand from tasks.workflow_cfs_scheduler.entities import AsyncWorkflowQueue -class TriggerWorkflowCFSPlanEntity(WorkflowScheduleCFSPlanEntity): +class AsyncWorkflowCFSPlanEntity(WorkflowScheduleCFSPlanEntity): """ Trigger workflow CFS plan entity. """ @@ -11,17 +11,17 @@ class TriggerWorkflowCFSPlanEntity(WorkflowScheduleCFSPlanEntity): queue: AsyncWorkflowQueue -class TriggerCFSPlanScheduler(CFSPlanScheduler): +class AsyncWorkflowCFSPlanScheduler(CFSPlanScheduler): """ Trigger workflow CFS plan scheduler. """ + plan: AsyncWorkflowCFSPlanEntity + def can_schedule(self) -> SchedulerCommand: """ Check if the workflow can be scheduled. """ - assert isinstance(self.plan, TriggerWorkflowCFSPlanEntity) - if self.plan.queue in [AsyncWorkflowQueue.PROFESSIONAL_QUEUE, AsyncWorkflowQueue.TEAM_QUEUE]: """ permitted all paid users to schedule the workflow any time @@ -29,5 +29,4 @@ class TriggerCFSPlanScheduler(CFSPlanScheduler): return SchedulerCommand.NONE # FIXME: avoid the sandbox user's workflow at a running state for ever - - return SchedulerCommand.NONE + return SchedulerCommand.RESOURCE_LIMIT_REACHED