diff --git a/api/core/app/engine_layers/timeslice_layer.py b/api/core/app/engine_layers/timeslice_layer.py index 4f47379a40..8d4491b93c 100644 --- a/api/core/app/engine_layers/timeslice_layer.py +++ b/api/core/app/engine_layers/timeslice_layer.py @@ -31,6 +31,36 @@ class TimeSliceLayer(GraphEngineLayer): self.cfs_plan_scheduler = cfs_plan_scheduler self.stopped = False + def _checker_job(self, schedule_id: str): + """ + Check if the workflow need to be suspended. + """ + try: + if self.stopped: + self.scheduler.remove_job(schedule_id) + return + + if self.cfs_plan_scheduler.can_schedule() == SchedulerCommand.RESOURCE_LIMIT_REACHED: + # remove the job + self.scheduler.remove_job(schedule_id) + + if not self.command_channel: + logger.exception("No command channel to stop the workflow") + return + + # send command to pause the workflow + self.command_channel.send_command( + GraphEngineCommand( + command_type=CommandType.PAUSE, + payload={ + "reason": SchedulerCommand.RESOURCE_LIMIT_REACHED, + }, + ) + ) + + except Exception: + logger.exception("scheduler error during check if the workflow need to be suspended") + def on_graph_start(self): """ Start timer to check if the workflow need to be suspended. @@ -38,38 +68,12 @@ class TimeSliceLayer(GraphEngineLayer): schedule_id = uuid.uuid4().hex - def runner(): - """ - Whenever the workflow is running, keep checking if we need to suspend it. - Otherwise, return directly. - """ - try: - if self.stopped: - self.scheduler.remove_job(schedule_id) - return - - if self.cfs_plan_scheduler.can_schedule() == SchedulerCommand.RESOURCE_LIMIT_REACHED: - # remove the job - self.scheduler.remove_job(schedule_id) - - if not self.command_channel: - logger.exception("No command channel to stop the workflow") - return - - # send command to pause the workflow - self.command_channel.send_command( - GraphEngineCommand( - command_type=CommandType.PAUSE, - payload={ - "reason": SchedulerCommand.RESOURCE_LIMIT_REACHED, - }, - ) - ) - - except Exception: - logger.exception("scheduler error during check if the workflow need to be suspended") - - self.scheduler.add_job(runner, "interval", seconds=self.cfs_plan_scheduler.plan.granularity, id=schedule_id) + self.scheduler.add_job( + lambda: self._checker_job(schedule_id), + "interval", + seconds=self.cfs_plan_scheduler.plan.granularity, + id=schedule_id, + ) def on_event(self, event: GraphEngineEvent): pass