avoid time slice strategy in community edition

This commit is contained in:
Yeuoly 2025-10-22 12:50:00 +08:00
parent 94ea289c75
commit df9932088f
5 changed files with 22 additions and 13 deletions

View File

@ -196,7 +196,7 @@ class AsyncWorkflowConfig(BaseSettings):
"to avoid this, workflow can be suspended if needed, to achieve"
"this, a time-based checker is required, every granularity seconds, "
"the checker will check the workflow queue and suspend the workflow",
default=60,
default=120,
ge=1,
)

View File

@ -7,6 +7,7 @@ from apscheduler.schedulers.background import BackgroundScheduler # type: ignor
from core.workflow.graph_engine.entities.commands import CommandType, GraphEngineCommand
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_events.base import GraphEngineEvent
from services.workflow.entities import WorkflowScheduleCFSPlanEntity
from services.workflow.scheduler import CFSPlanScheduler, SchedulerCommand
logger = logging.getLogger(__name__)
@ -30,6 +31,7 @@ class TimeSliceLayer(GraphEngineLayer):
super().__init__()
self.cfs_plan_scheduler = cfs_plan_scheduler
self.stopped = False
self.schedule_id = ""
def _checker_job(self, schedule_id: str):
"""
@ -66,17 +68,21 @@ class TimeSliceLayer(GraphEngineLayer):
Start timer to check if the workflow need to be suspended.
"""
schedule_id = uuid.uuid4().hex
if self.cfs_plan_scheduler.plan.schedule_strategy == WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice:
self.schedule_id = uuid.uuid4().hex
self.scheduler.add_job(
lambda: self._checker_job(schedule_id),
"interval",
seconds=self.cfs_plan_scheduler.plan.granularity,
id=schedule_id,
)
self.scheduler.add_job(
lambda: self._checker_job(self.schedule_id),
"interval",
seconds=self.cfs_plan_scheduler.plan.granularity,
id=self.schedule_id,
)
def on_event(self, event: GraphEngineEvent):
pass
def on_graph_end(self, error: Exception | None) -> None:
self.stopped = True
# remove the scheduler
if self.schedule_id:
self.scheduler.remove_job(self.schedule_id)

View File

@ -139,6 +139,7 @@ class WorkflowScheduleCFSPlanEntity(BaseModel):
"""
TimeSlice = "time-slice" # time-slice based plan
Nop = "nop" # no plan, just run the workflow
schedule_strategy: Strategy
granularity: int = Field(default=-1) # -1 means infinite

View File

@ -27,11 +27,10 @@ from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWo
from services.errors.app import WorkflowNotFoundError
from services.workflow.entities import (
TriggerData,
WorkflowScheduleCFSPlanEntity,
WorkflowTaskData,
)
from tasks.workflow_cfs_scheduler.cfs_scheduler import AsyncWorkflowCFSPlanEntity, AsyncWorkflowCFSPlanScheduler
from tasks.workflow_cfs_scheduler.entities import AsyncWorkflowQueue
from tasks.workflow_cfs_scheduler.entities import AsyncWorkflowQueue, AsyncWorkflowSystemStrategy
@shared_task(queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE)
@ -40,7 +39,7 @@ def execute_workflow_professional(task_data_dict: dict[str, Any]):
task_data = WorkflowTaskData.model_validate(task_data_dict)
cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE,
schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice,
schedule_strategy=AsyncWorkflowSystemStrategy,
granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
)
_execute_workflow_common(
@ -56,7 +55,7 @@ def execute_workflow_team(task_data_dict: dict[str, Any]):
task_data = WorkflowTaskData.model_validate(task_data_dict)
cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
queue=AsyncWorkflowQueue.TEAM_QUEUE,
schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice,
schedule_strategy=AsyncWorkflowSystemStrategy,
granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
)
_execute_workflow_common(
@ -72,7 +71,7 @@ def execute_workflow_sandbox(task_data_dict: dict[str, Any]):
task_data = WorkflowTaskData.model_validate(task_data_dict)
cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
queue=AsyncWorkflowQueue.SANDBOX_QUEUE,
schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice,
schedule_strategy=AsyncWorkflowSystemStrategy,
granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
)
_execute_workflow_common(

View File

@ -1,6 +1,7 @@
from enum import StrEnum
from configs import dify_config
from services.workflow.entities import WorkflowScheduleCFSPlanEntity
# Determine queue names based on edition
if dify_config.EDITION == "CLOUD":
@ -8,11 +9,13 @@ if dify_config.EDITION == "CLOUD":
_professional_queue = "workflow_professional"
_team_queue = "workflow_team"
_sandbox_queue = "workflow_sandbox"
AsyncWorkflowSystemStrategy = WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice
else:
# Community edition: single workflow queue (not dataset)
_professional_queue = "workflow"
_team_queue = "workflow"
_sandbox_queue = "workflow"
AsyncWorkflowSystemStrategy = WorkflowScheduleCFSPlanEntity.Strategy.Nop
class AsyncWorkflowQueue(StrEnum):