From 7f70d1de1caaa7c59c54fd5a3fa108e5e08440cd Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Wed, 22 Oct 2025 12:10:12 +0800 Subject: [PATCH] ASYNC_WORKFLOW_SCHEDULER_GRANULARITY --- api/configs/feature/__init__.py | 2 +- api/core/app/engine_layers/suspend_layer.py | 7 ++++++- api/core/app/engine_layers/timeslice_layer.py | 5 +---- api/tasks/async_workflow_tasks.py | 13 ++++++++++--- api/tasks/workflow_cfs_scheduler/cfs_scheduler.py | 2 +- 5 files changed, 19 insertions(+), 10 deletions(-) diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index bd58132839..f5262a2f8b 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -196,7 +196,7 @@ class AsyncWorkflowConfig(BaseSettings): "to avoid this, workflow can be suspended if needed, to achieve" "this, a time-based checker is required, every granularity seconds, " "the checker will check the workflow queue and suspend the workflow", - default=1, + default=60, ge=1, ) diff --git a/api/core/app/engine_layers/suspend_layer.py b/api/core/app/engine_layers/suspend_layer.py index f981523149..0a107de012 100644 --- a/api/core/app/engine_layers/suspend_layer.py +++ b/api/core/app/engine_layers/suspend_layer.py @@ -1,5 +1,6 @@ from core.workflow.graph_engine.layers.base import GraphEngineLayer from core.workflow.graph_events.base import GraphEngineEvent +from core.workflow.graph_events.graph import GraphRunPausedEvent class SuspendLayer(GraphEngineLayer): @@ -9,7 +10,11 @@ class SuspendLayer(GraphEngineLayer): pass def on_event(self, event: GraphEngineEvent): - pass + """ + Handle the paused event, stash runtime state into storage and wait for resume. + """ + if isinstance(event, GraphRunPausedEvent): + pass def on_graph_end(self, error: Exception | None): """ """ diff --git a/api/core/app/engine_layers/timeslice_layer.py b/api/core/app/engine_layers/timeslice_layer.py index 9c23528714..4f47379a40 100644 --- a/api/core/app/engine_layers/timeslice_layer.py +++ b/api/core/app/engine_layers/timeslice_layer.py @@ -4,7 +4,6 @@ from typing import ClassVar from apscheduler.schedulers.background import BackgroundScheduler # type: ignore -from configs import dify_config from core.workflow.graph_engine.entities.commands import CommandType, GraphEngineCommand from core.workflow.graph_engine.layers.base import GraphEngineLayer from core.workflow.graph_events.base import GraphEngineEvent @@ -70,9 +69,7 @@ class TimeSliceLayer(GraphEngineLayer): except Exception: logger.exception("scheduler error during check if the workflow need to be suspended") - self.scheduler.add_job( - runner, "interval", seconds=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY, id=schedule_id - ) + self.scheduler.add_job(runner, "interval", seconds=self.cfs_plan_scheduler.plan.granularity, id=schedule_id) def on_event(self, event: GraphEngineEvent): pass diff --git a/api/tasks/async_workflow_tasks.py b/api/tasks/async_workflow_tasks.py index 111d2dee0c..5a8274929a 100644 --- a/api/tasks/async_workflow_tasks.py +++ b/api/tasks/async_workflow_tasks.py @@ -12,6 +12,7 @@ from celery import shared_task from sqlalchemy import select from sqlalchemy.orm import Session, sessionmaker +from configs import dify_config from core.app.apps.workflow.app_generator import WorkflowAppGenerator from core.app.engine_layers.timeslice_layer import TimeSliceLayer from core.app.engine_layers.trigger_post_layer import TriggerPostLayer @@ -38,7 +39,9 @@ def execute_workflow_professional(task_data_dict: dict[str, Any]): """Execute workflow for professional tier with highest priority""" task_data = WorkflowTaskData.model_validate(task_data_dict) cfs_plan_scheduler_entity = TriggerWorkflowCFSPlanEntity( - queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE, schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice + queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE, + schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice, + granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY, ) _execute_workflow_common( task_data, @@ -52,7 +55,9 @@ def execute_workflow_team(task_data_dict: dict[str, Any]): """Execute workflow for team tier""" task_data = WorkflowTaskData.model_validate(task_data_dict) cfs_plan_scheduler_entity = TriggerWorkflowCFSPlanEntity( - queue=AsyncWorkflowQueue.TEAM_QUEUE, schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice + queue=AsyncWorkflowQueue.TEAM_QUEUE, + schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice, + granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY, ) _execute_workflow_common( task_data, @@ -66,7 +71,9 @@ def execute_workflow_sandbox(task_data_dict: dict[str, Any]): """Execute workflow for free tier with lower retry limit""" task_data = WorkflowTaskData.model_validate(task_data_dict) cfs_plan_scheduler_entity = TriggerWorkflowCFSPlanEntity( - queue=AsyncWorkflowQueue.SANDBOX_QUEUE, schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice + queue=AsyncWorkflowQueue.SANDBOX_QUEUE, + schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice, + granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY, ) _execute_workflow_common( task_data, diff --git a/api/tasks/workflow_cfs_scheduler/cfs_scheduler.py b/api/tasks/workflow_cfs_scheduler/cfs_scheduler.py index 2a4153dcd2..c7a1650d23 100644 --- a/api/tasks/workflow_cfs_scheduler/cfs_scheduler.py +++ b/api/tasks/workflow_cfs_scheduler/cfs_scheduler.py @@ -24,7 +24,7 @@ class TriggerCFSPlanScheduler(CFSPlanScheduler): if self.plan.queue in [AsyncWorkflowQueue.PROFESSIONAL_QUEUE, AsyncWorkflowQueue.TEAM_QUEUE]: """ - permitted all paid users to schedule the workflow + permitted all paid users to schedule the workflow any time """ return SchedulerCommand.NONE