From d1cdc85a2ea262133104fd81eb73964d870f5e84 Mon Sep 17 00:00:00 2001 From: hj24 Date: Fri, 6 Feb 2026 13:58:49 +0800 Subject: [PATCH] chore: add random sleep for workflow cleanup --- ...ear_free_plan_expired_workflow_run_logs.py | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py index c3e0dce399..2c94cb5324 100644 --- a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py +++ b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py @@ -1,5 +1,8 @@ import datetime import logging +import os +import random +import time from collections.abc import Iterable, Sequence import click @@ -72,7 +75,12 @@ class WorkflowRunCleanup: batch_index = 0 last_seen: tuple[datetime.datetime, str] | None = None + max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200)) + while True: + batch_start = time.monotonic() + + fetch_start = time.monotonic() run_rows = self.workflow_run_repo.get_runs_batch_by_time_range( start_from=self.window_start, end_before=self.window_end, @@ -80,12 +88,30 @@ class WorkflowRunCleanup: batch_size=self.batch_size, ) if not run_rows: + logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1) break batch_index += 1 last_seen = (run_rows[-1].created_at, run_rows[-1].id) + logger.info( + "workflow_run_cleanup (batch #%s): fetched %s rows in %sms", + batch_index, + len(run_rows), + int((time.monotonic() - fetch_start) * 1000), + ) + tenant_ids = {row.tenant_id for row in run_rows} + + filter_start = time.monotonic() free_tenants = self._filter_free_tenants(tenant_ids) + logger.info( + "workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms", + batch_index, + len(free_tenants), + len(tenant_ids), + int((time.monotonic() - filter_start) * 1000), + ) + free_runs = [row for row in run_rows if row.tenant_id in free_tenants] paid_or_skipped = len(run_rows) - len(free_runs) @@ -104,11 +130,17 @@ class WorkflowRunCleanup: total_runs_targeted += len(free_runs) if self.dry_run: + count_start = time.monotonic() batch_counts = self.workflow_run_repo.count_runs_with_related( free_runs, count_node_executions=self._count_node_executions, count_trigger_logs=self._count_trigger_logs, ) + logger.info( + "workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms", + batch_index, + int((time.monotonic() - count_start) * 1000), + ) if related_totals is not None: for key in related_totals: related_totals[key] += batch_counts.get(key, 0) @@ -120,14 +152,21 @@ class WorkflowRunCleanup: fg="yellow", ) ) + logger.info( + "workflow_run_cleanup (batch #%s, dry_run): batch total %sms", + batch_index, + int((time.monotonic() - batch_start) * 1000), + ) continue try: + delete_start = time.monotonic() counts = self.workflow_run_repo.delete_runs_with_related( free_runs, delete_node_executions=self._delete_node_executions, delete_trigger_logs=self._delete_trigger_logs, ) + delete_ms = int((time.monotonic() - delete_start) * 1000) except Exception: logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0]) raise @@ -143,6 +182,17 @@ class WorkflowRunCleanup: fg="green", ) ) + logger.info( + "workflow_run_cleanup (batch #%s): delete %sms, batch total %sms", + batch_index, + delete_ms, + int((time.monotonic() - batch_start) * 1000), + ) + + # Random sleep between batches to avoid overwhelming the database + sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311 + logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms) + time.sleep(sleep_ms / 1000) if self.dry_run: if self.window_start: