mirror of
https://github.com/langgenius/dify.git
synced 2026-04-25 01:26:57 +08:00
Merge remote-tracking branch 'origin/feat/trigger' into feat/trigger
This commit is contained in:
commit
37805184d9
@ -196,7 +196,7 @@ class AsyncWorkflowConfig(BaseSettings):
|
|||||||
"to avoid this, workflow can be suspended if needed, to achieve"
|
"to avoid this, workflow can be suspended if needed, to achieve"
|
||||||
"this, a time-based checker is required, every granularity seconds, "
|
"this, a time-based checker is required, every granularity seconds, "
|
||||||
"the checker will check the workflow queue and suspend the workflow",
|
"the checker will check the workflow queue and suspend the workflow",
|
||||||
default=60,
|
default=120,
|
||||||
ge=1,
|
ge=1,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -7,6 +7,7 @@ from apscheduler.schedulers.background import BackgroundScheduler # type: ignor
|
|||||||
from core.workflow.graph_engine.entities.commands import CommandType, GraphEngineCommand
|
from core.workflow.graph_engine.entities.commands import CommandType, GraphEngineCommand
|
||||||
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
||||||
from core.workflow.graph_events.base import GraphEngineEvent
|
from core.workflow.graph_events.base import GraphEngineEvent
|
||||||
|
from services.workflow.entities import WorkflowScheduleCFSPlanEntity
|
||||||
from services.workflow.scheduler import CFSPlanScheduler, SchedulerCommand
|
from services.workflow.scheduler import CFSPlanScheduler, SchedulerCommand
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -30,6 +31,7 @@ class TimeSliceLayer(GraphEngineLayer):
|
|||||||
super().__init__()
|
super().__init__()
|
||||||
self.cfs_plan_scheduler = cfs_plan_scheduler
|
self.cfs_plan_scheduler = cfs_plan_scheduler
|
||||||
self.stopped = False
|
self.stopped = False
|
||||||
|
self.schedule_id = ""
|
||||||
|
|
||||||
def _checker_job(self, schedule_id: str):
|
def _checker_job(self, schedule_id: str):
|
||||||
"""
|
"""
|
||||||
@ -66,17 +68,21 @@ class TimeSliceLayer(GraphEngineLayer):
|
|||||||
Start timer to check if the workflow need to be suspended.
|
Start timer to check if the workflow need to be suspended.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
schedule_id = uuid.uuid4().hex
|
if self.cfs_plan_scheduler.plan.schedule_strategy == WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice:
|
||||||
|
self.schedule_id = uuid.uuid4().hex
|
||||||
|
|
||||||
self.scheduler.add_job(
|
self.scheduler.add_job(
|
||||||
lambda: self._checker_job(schedule_id),
|
lambda: self._checker_job(self.schedule_id),
|
||||||
"interval",
|
"interval",
|
||||||
seconds=self.cfs_plan_scheduler.plan.granularity,
|
seconds=self.cfs_plan_scheduler.plan.granularity,
|
||||||
id=schedule_id,
|
id=self.schedule_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
def on_event(self, event: GraphEngineEvent):
|
def on_event(self, event: GraphEngineEvent):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def on_graph_end(self, error: Exception | None) -> None:
|
def on_graph_end(self, error: Exception | None) -> None:
|
||||||
self.stopped = True
|
self.stopped = True
|
||||||
|
# remove the scheduler
|
||||||
|
if self.schedule_id:
|
||||||
|
self.scheduler.remove_job(self.schedule_id)
|
||||||
|
|||||||
@ -139,6 +139,7 @@ class WorkflowScheduleCFSPlanEntity(BaseModel):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
TimeSlice = "time-slice" # time-slice based plan
|
TimeSlice = "time-slice" # time-slice based plan
|
||||||
|
Nop = "nop" # no plan, just run the workflow
|
||||||
|
|
||||||
schedule_strategy: Strategy
|
schedule_strategy: Strategy
|
||||||
granularity: int = Field(default=-1) # -1 means infinite
|
granularity: int = Field(default=-1) # -1 means infinite
|
||||||
|
|||||||
@ -27,11 +27,10 @@ from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWo
|
|||||||
from services.errors.app import WorkflowNotFoundError
|
from services.errors.app import WorkflowNotFoundError
|
||||||
from services.workflow.entities import (
|
from services.workflow.entities import (
|
||||||
TriggerData,
|
TriggerData,
|
||||||
WorkflowScheduleCFSPlanEntity,
|
|
||||||
WorkflowTaskData,
|
WorkflowTaskData,
|
||||||
)
|
)
|
||||||
from tasks.workflow_cfs_scheduler.cfs_scheduler import AsyncWorkflowCFSPlanEntity, AsyncWorkflowCFSPlanScheduler
|
from tasks.workflow_cfs_scheduler.cfs_scheduler import AsyncWorkflowCFSPlanEntity, AsyncWorkflowCFSPlanScheduler
|
||||||
from tasks.workflow_cfs_scheduler.entities import AsyncWorkflowQueue
|
from tasks.workflow_cfs_scheduler.entities import AsyncWorkflowQueue, AsyncWorkflowSystemStrategy
|
||||||
|
|
||||||
|
|
||||||
@shared_task(queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE)
|
@shared_task(queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE)
|
||||||
@ -40,7 +39,7 @@ def execute_workflow_professional(task_data_dict: dict[str, Any]):
|
|||||||
task_data = WorkflowTaskData.model_validate(task_data_dict)
|
task_data = WorkflowTaskData.model_validate(task_data_dict)
|
||||||
cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
|
cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
|
||||||
queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE,
|
queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE,
|
||||||
schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice,
|
schedule_strategy=AsyncWorkflowSystemStrategy,
|
||||||
granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
|
granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
|
||||||
)
|
)
|
||||||
_execute_workflow_common(
|
_execute_workflow_common(
|
||||||
@ -56,7 +55,7 @@ def execute_workflow_team(task_data_dict: dict[str, Any]):
|
|||||||
task_data = WorkflowTaskData.model_validate(task_data_dict)
|
task_data = WorkflowTaskData.model_validate(task_data_dict)
|
||||||
cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
|
cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
|
||||||
queue=AsyncWorkflowQueue.TEAM_QUEUE,
|
queue=AsyncWorkflowQueue.TEAM_QUEUE,
|
||||||
schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice,
|
schedule_strategy=AsyncWorkflowSystemStrategy,
|
||||||
granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
|
granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
|
||||||
)
|
)
|
||||||
_execute_workflow_common(
|
_execute_workflow_common(
|
||||||
@ -72,7 +71,7 @@ def execute_workflow_sandbox(task_data_dict: dict[str, Any]):
|
|||||||
task_data = WorkflowTaskData.model_validate(task_data_dict)
|
task_data = WorkflowTaskData.model_validate(task_data_dict)
|
||||||
cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
|
cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
|
||||||
queue=AsyncWorkflowQueue.SANDBOX_QUEUE,
|
queue=AsyncWorkflowQueue.SANDBOX_QUEUE,
|
||||||
schedule_strategy=WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice,
|
schedule_strategy=AsyncWorkflowSystemStrategy,
|
||||||
granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
|
granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
|
||||||
)
|
)
|
||||||
_execute_workflow_common(
|
_execute_workflow_common(
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
from enum import StrEnum
|
from enum import StrEnum
|
||||||
|
|
||||||
from configs import dify_config
|
from configs import dify_config
|
||||||
|
from services.workflow.entities import WorkflowScheduleCFSPlanEntity
|
||||||
|
|
||||||
# Determine queue names based on edition
|
# Determine queue names based on edition
|
||||||
if dify_config.EDITION == "CLOUD":
|
if dify_config.EDITION == "CLOUD":
|
||||||
@ -8,11 +9,13 @@ if dify_config.EDITION == "CLOUD":
|
|||||||
_professional_queue = "workflow_professional"
|
_professional_queue = "workflow_professional"
|
||||||
_team_queue = "workflow_team"
|
_team_queue = "workflow_team"
|
||||||
_sandbox_queue = "workflow_sandbox"
|
_sandbox_queue = "workflow_sandbox"
|
||||||
|
AsyncWorkflowSystemStrategy = WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice
|
||||||
else:
|
else:
|
||||||
# Community edition: single workflow queue (not dataset)
|
# Community edition: single workflow queue (not dataset)
|
||||||
_professional_queue = "workflow"
|
_professional_queue = "workflow"
|
||||||
_team_queue = "workflow"
|
_team_queue = "workflow"
|
||||||
_sandbox_queue = "workflow"
|
_sandbox_queue = "workflow"
|
||||||
|
AsyncWorkflowSystemStrategy = WorkflowScheduleCFSPlanEntity.Strategy.Nop
|
||||||
|
|
||||||
|
|
||||||
class AsyncWorkflowQueue(StrEnum):
|
class AsyncWorkflowQueue(StrEnum):
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user