From df9932088f7c683abc78cbebf8794a0879e39ea2 Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Wed, 22 Oct 2025 12:50:00 +0800 Subject: [PATCH] avoid time slice strategy in community edition --- api/configs/feature/__init__.py | 2 +- api/core/app/engine_layers/timeslice_layer.py | 20 ++++++++++++------- api/services/workflow/entities.py | 1 + api/tasks/async_workflow_tasks.py | 9 ++++----- api/tasks/workflow_cfs_scheduler/entities.py | 3 +++ 5 files changed, 22 insertions(+), 13 deletions(-) diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index f5262a2f8b..f7e0e7e865 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -196,7 +196,7 @@ class AsyncWorkflowConfig(BaseSettings): "to avoid this, workflow can be suspended if needed, to achieve" "this, a time-based checker is required, every granularity seconds, " "the checker will check the workflow queue and suspend the workflow", - default=60, + default=120, ge=1, ) diff --git a/api/core/app/engine_layers/timeslice_layer.py b/api/core/app/engine_layers/timeslice_layer.py index 8d4491b93c..f82397deca 100644 --- a/api/core/app/engine_layers/timeslice_layer.py +++ b/api/core/app/engine_layers/timeslice_layer.py @@ -7,6 +7,7 @@ from apscheduler.schedulers.background import BackgroundScheduler # type: ignor from core.workflow.graph_engine.entities.commands import CommandType, GraphEngineCommand from core.workflow.graph_engine.layers.base import GraphEngineLayer from core.workflow.graph_events.base import GraphEngineEvent +from services.workflow.entities import WorkflowScheduleCFSPlanEntity from services.workflow.scheduler import CFSPlanScheduler, SchedulerCommand logger = logging.getLogger(__name__) @@ -30,6 +31,7 @@ class TimeSliceLayer(GraphEngineLayer): super().__init__() self.cfs_plan_scheduler = cfs_plan_scheduler self.stopped = False + self.schedule_id = "" def _checker_job(self, schedule_id: str): """ @@ -66,17 +68,21 @@ class TimeSliceLayer(GraphEngineLayer): Start timer to check if the workflow need to be suspended. """ - schedule_id = uuid.uuid4().hex + if self.cfs_plan_scheduler.plan.schedule_strategy == WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice: + self.schedule_id = uuid.uuid4().hex - self.scheduler.add_job( - lambda: self._checker_job(schedule_id), - "interval", - seconds=self.cfs_plan_scheduler.plan.granularity, - id=schedule_id, - ) + self.scheduler.add_job( + lambda: self._checker_job(self.schedule_id), + "interval", + seconds=self.cfs_plan_scheduler.plan.granularity, + id=self.schedule_id, + ) def on_event(self, event: GraphEngineEvent): pass def on_graph_end(self, error: Exception | None) -> None: self.stopped = True + # remove the scheduler + if self.schedule_id: + self.scheduler.remove_job(self.schedule_id) diff --git a/api/services/workflow/entities.py b/api/services/workflow/entities.py index 4408ecdebd..dd126cdef4 100644 --- a/api/services/workflow/entities.py +++ b/api/services/workflow/entities.py @@ -139,6 +139,7 @@ class WorkflowScheduleCFSPlanEntity(BaseModel): """ TimeSlice = "time-slice" # time-slice based plan + Nop = "nop" # no plan, just run the workflow schedule_strategy: Strategy granularity: int = Field(default=-1) # -1 means infinite diff --git a/api/tasks/async_workflow_tasks.py b/api/tasks/async_workflow_tasks.py index cce0256a42..c662b7c4a7 100644 --- a/api/tasks/async_workflow_tasks.py +++ b/api/tasks/async_workflow_tasks.py @@ -27,11 +27,10 @@ from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWo from services.errors.app import WorkflowNotFoundError from services.workflow.entities import ( TriggerData, - WorkflowScheduleCFSPlanEntity, WorkflowTaskData, ) from tasks.workflow_cfs_scheduler.cfs_scheduler import AsyncWorkflowCFSPlanEntity, AsyncWorkflowCFSPlanScheduler -from tasks.workflow_cfs_scheduler.entities import AsyncWorkflowQueue +from tasks.workflow_cfs_scheduler.entities import AsyncWorkflowQueue, AsyncWorkflowSystemStrategy @shared_task(queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE) @@ -40,7 +39,7 @@ def execute_workflow_professional(task_data_dict: dict[str, Any]): task_data = WorkflowTaskData.model_validate(task_data_dict) cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity( queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE, - schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice, + schedule_strategy=AsyncWorkflowSystemStrategy, granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY, ) _execute_workflow_common( @@ -56,7 +55,7 @@ def execute_workflow_team(task_data_dict: dict[str, Any]): task_data = WorkflowTaskData.model_validate(task_data_dict) cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity( queue=AsyncWorkflowQueue.TEAM_QUEUE, - schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice, + schedule_strategy=AsyncWorkflowSystemStrategy, granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY, ) _execute_workflow_common( @@ -72,7 +71,7 @@ def execute_workflow_sandbox(task_data_dict: dict[str, Any]): task_data = WorkflowTaskData.model_validate(task_data_dict) cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity( queue=AsyncWorkflowQueue.SANDBOX_QUEUE, - schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice, + schedule_strategy=AsyncWorkflowSystemStrategy, granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY, ) _execute_workflow_common( diff --git a/api/tasks/workflow_cfs_scheduler/entities.py b/api/tasks/workflow_cfs_scheduler/entities.py index f348bec275..6990f6968a 100644 --- a/api/tasks/workflow_cfs_scheduler/entities.py +++ b/api/tasks/workflow_cfs_scheduler/entities.py @@ -1,6 +1,7 @@ from enum import StrEnum from configs import dify_config +from services.workflow.entities import WorkflowScheduleCFSPlanEntity # Determine queue names based on edition if dify_config.EDITION == "CLOUD": @@ -8,11 +9,13 @@ if dify_config.EDITION == "CLOUD": _professional_queue = "workflow_professional" _team_queue = "workflow_team" _sandbox_queue = "workflow_sandbox" + AsyncWorkflowSystemStrategy = WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice else: # Community edition: single workflow queue (not dataset) _professional_queue = "workflow" _team_queue = "workflow" _sandbox_queue = "workflow" + AsyncWorkflowSystemStrategy = WorkflowScheduleCFSPlanEntity.Strategy.Nop class AsyncWorkflowQueue(StrEnum):