refactor: TimeSliceLayer

This commit is contained in:
Yeuoly 2025-10-22 12:13:12 +08:00
parent 7f70d1de1c
commit cb5607fc8c
1 changed files with 36 additions and 32 deletions

View File

@ -31,6 +31,36 @@ class TimeSliceLayer(GraphEngineLayer):
self.cfs_plan_scheduler = cfs_plan_scheduler
self.stopped = False
def _checker_job(self, schedule_id: str):
"""
Check if the workflow need to be suspended.
"""
try:
if self.stopped:
self.scheduler.remove_job(schedule_id)
return
if self.cfs_plan_scheduler.can_schedule() == SchedulerCommand.RESOURCE_LIMIT_REACHED:
# remove the job
self.scheduler.remove_job(schedule_id)
if not self.command_channel:
logger.exception("No command channel to stop the workflow")
return
# send command to pause the workflow
self.command_channel.send_command(
GraphEngineCommand(
command_type=CommandType.PAUSE,
payload={
"reason": SchedulerCommand.RESOURCE_LIMIT_REACHED,
},
)
)
except Exception:
logger.exception("scheduler error during check if the workflow need to be suspended")
def on_graph_start(self):
"""
Start timer to check if the workflow need to be suspended.
@ -38,38 +68,12 @@ class TimeSliceLayer(GraphEngineLayer):
schedule_id = uuid.uuid4().hex
def runner():
"""
Whenever the workflow is running, keep checking if we need to suspend it.
Otherwise, return directly.
"""
try:
if self.stopped:
self.scheduler.remove_job(schedule_id)
return
if self.cfs_plan_scheduler.can_schedule() == SchedulerCommand.RESOURCE_LIMIT_REACHED:
# remove the job
self.scheduler.remove_job(schedule_id)
if not self.command_channel:
logger.exception("No command channel to stop the workflow")
return
# send command to pause the workflow
self.command_channel.send_command(
GraphEngineCommand(
command_type=CommandType.PAUSE,
payload={
"reason": SchedulerCommand.RESOURCE_LIMIT_REACHED,
},
)
)
except Exception:
logger.exception("scheduler error during check if the workflow need to be suspended")
self.scheduler.add_job(runner, "interval", seconds=self.cfs_plan_scheduler.plan.granularity, id=schedule_id)
self.scheduler.add_job(
lambda: self._checker_job(schedule_id),
"interval",
seconds=self.cfs_plan_scheduler.plan.granularity,
id=schedule_id,
)
def on_event(self, event: GraphEngineEvent):
pass