This commit is contained in:
Yeuoly 2025-10-22 12:41:19 +08:00
parent cb5607fc8c
commit 14acd05846
3 changed files with 16 additions and 17 deletions

View File

@ -11,7 +11,7 @@ from core.workflow.graph_events.graph import GraphRunFailedEvent, GraphRunPaused
from models.engine import db
from models.enums import WorkflowTriggerStatus
from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository
from tasks.workflow_cfs_scheduler.cfs_scheduler import TriggerWorkflowCFSPlanEntity
from tasks.workflow_cfs_scheduler.cfs_scheduler import AsyncWorkflowCFSPlanEntity
logger = logging.getLogger(__name__)
@ -29,7 +29,7 @@ class TriggerPostLayer(GraphEngineLayer):
def __init__(
self,
cfs_plan_scheduler_entity: TriggerWorkflowCFSPlanEntity,
cfs_plan_scheduler_entity: AsyncWorkflowCFSPlanEntity,
start_time: datetime,
trigger_log_id: str,
):

View File

@ -30,7 +30,7 @@ from services.workflow.entities import (
WorkflowScheduleCFSPlanEntity,
WorkflowTaskData,
)
from tasks.workflow_cfs_scheduler.cfs_scheduler import TriggerCFSPlanScheduler, TriggerWorkflowCFSPlanEntity
from tasks.workflow_cfs_scheduler.cfs_scheduler import AsyncWorkflowCFSPlanEntity, AsyncWorkflowCFSPlanScheduler
from tasks.workflow_cfs_scheduler.entities import AsyncWorkflowQueue
@ -38,14 +38,14 @@ from tasks.workflow_cfs_scheduler.entities import AsyncWorkflowQueue
def execute_workflow_professional(task_data_dict: dict[str, Any]):
"""Execute workflow for professional tier with highest priority"""
task_data = WorkflowTaskData.model_validate(task_data_dict)
cfs_plan_scheduler_entity = TriggerWorkflowCFSPlanEntity(
cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE,
schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice,
granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
)
_execute_workflow_common(
task_data,
TriggerCFSPlanScheduler(plan=cfs_plan_scheduler_entity),
AsyncWorkflowCFSPlanScheduler(plan=cfs_plan_scheduler_entity),
cfs_plan_scheduler_entity,
)
@ -54,14 +54,14 @@ def execute_workflow_professional(task_data_dict: dict[str, Any]):
def execute_workflow_team(task_data_dict: dict[str, Any]):
"""Execute workflow for team tier"""
task_data = WorkflowTaskData.model_validate(task_data_dict)
cfs_plan_scheduler_entity = TriggerWorkflowCFSPlanEntity(
cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
queue=AsyncWorkflowQueue.TEAM_QUEUE,
schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice,
granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
)
_execute_workflow_common(
task_data,
TriggerCFSPlanScheduler(plan=cfs_plan_scheduler_entity),
AsyncWorkflowCFSPlanScheduler(plan=cfs_plan_scheduler_entity),
cfs_plan_scheduler_entity,
)
@ -70,22 +70,22 @@ def execute_workflow_team(task_data_dict: dict[str, Any]):
def execute_workflow_sandbox(task_data_dict: dict[str, Any]):
"""Execute workflow for free tier with lower retry limit"""
task_data = WorkflowTaskData.model_validate(task_data_dict)
cfs_plan_scheduler_entity = TriggerWorkflowCFSPlanEntity(
cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
queue=AsyncWorkflowQueue.SANDBOX_QUEUE,
schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice,
granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
)
_execute_workflow_common(
task_data,
TriggerCFSPlanScheduler(plan=cfs_plan_scheduler_entity),
AsyncWorkflowCFSPlanScheduler(plan=cfs_plan_scheduler_entity),
cfs_plan_scheduler_entity,
)
def _execute_workflow_common(
task_data: WorkflowTaskData,
cfs_plan_scheduler: TriggerCFSPlanScheduler,
cfs_plan_scheduler_entity: TriggerWorkflowCFSPlanEntity,
cfs_plan_scheduler: AsyncWorkflowCFSPlanScheduler,
cfs_plan_scheduler_entity: AsyncWorkflowCFSPlanEntity,
):
"""Execute workflow with common logic and trigger log updates."""

View File

@ -3,7 +3,7 @@ from services.workflow.scheduler import CFSPlanScheduler, SchedulerCommand
from tasks.workflow_cfs_scheduler.entities import AsyncWorkflowQueue
class TriggerWorkflowCFSPlanEntity(WorkflowScheduleCFSPlanEntity):
class AsyncWorkflowCFSPlanEntity(WorkflowScheduleCFSPlanEntity):
"""
Trigger workflow CFS plan entity.
"""
@ -11,17 +11,17 @@ class TriggerWorkflowCFSPlanEntity(WorkflowScheduleCFSPlanEntity):
queue: AsyncWorkflowQueue
class TriggerCFSPlanScheduler(CFSPlanScheduler):
class AsyncWorkflowCFSPlanScheduler(CFSPlanScheduler):
"""
Trigger workflow CFS plan scheduler.
"""
plan: AsyncWorkflowCFSPlanEntity
def can_schedule(self) -> SchedulerCommand:
"""
Check if the workflow can be scheduled.
"""
assert isinstance(self.plan, TriggerWorkflowCFSPlanEntity)
if self.plan.queue in [AsyncWorkflowQueue.PROFESSIONAL_QUEUE, AsyncWorkflowQueue.TEAM_QUEUE]:
"""
permitted all paid users to schedule the workflow any time
@ -29,5 +29,4 @@ class TriggerCFSPlanScheduler(CFSPlanScheduler):
return SchedulerCommand.NONE
# FIXME: avoid the sandbox user's workflow at a running state for ever
return SchedulerCommand.NONE
return SchedulerCommand.RESOURCE_LIMIT_REACHED