From 65a9233559314e9370d7f6bd3ee6af6d75522c29 Mon Sep 17 00:00:00 2001 From: hjlarry Date: Fri, 16 Jan 2026 13:58:45 +0800 Subject: [PATCH] Add SQL timing logs to workflow run cleanup --- api/commands.py | 15 ++ ...ear_free_plan_expired_workflow_run_logs.py | 184 ++++++++++++------ 2 files changed, 137 insertions(+), 62 deletions(-) diff --git a/api/commands.py b/api/commands.py index e223df74d4..afb7c1a6ec 100644 --- a/api/commands.py +++ b/api/commands.py @@ -881,12 +881,25 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[ is_flag=True, help="Preview cleanup results without deleting any workflow run data.", ) +@click.option( + "--log-sql", + is_flag=True, + help="Log SQL statements and timings for cleanup queries.", +) +@click.option( + "--log-sql-min-ms", + default=0, + show_default=True, + help="Only log SQL statements slower than N milliseconds (0 logs all).", +) def clean_workflow_runs( days: int, batch_size: int, start_from: datetime.datetime | None, end_before: datetime.datetime | None, dry_run: bool, + log_sql: bool, + log_sql_min_ms: int, ): """ Clean workflow runs and related workflow data for free tenants. @@ -903,6 +916,8 @@ def clean_workflow_runs( start_from=start_from, end_before=end_before, dry_run=dry_run, + log_sql=log_sql, + log_sql_min_ms=log_sql_min_ms, ).run() end_time = datetime.datetime.now(datetime.UTC) diff --git a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py index 2213169510..05e2449d5c 100644 --- a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py +++ b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py @@ -1,8 +1,11 @@ import datetime import logging +import time from collections.abc import Iterable, Sequence +from contextlib import contextmanager import click +from sqlalchemy import event from sqlalchemy.orm import Session, sessionmaker from configs import dify_config @@ -28,6 +31,8 @@ class WorkflowRunCleanup: end_before: datetime.datetime | None = None, workflow_run_repo: APIWorkflowRunRepository | None = None, dry_run: bool = False, + log_sql: bool = False, + log_sql_min_ms: int = 0, ): if (start_from is None) ^ (end_before is None): raise ValueError("start_from and end_before must be both set or both omitted.") @@ -45,6 +50,8 @@ class WorkflowRunCleanup: self.batch_size = batch_size self._cleanup_whitelist: set[str] | None = None self.dry_run = dry_run + self.log_sql = log_sql + self.log_sql_min_ms = max(0, log_sql_min_ms) self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD self.workflow_run_repo: APIWorkflowRunRepository if workflow_run_repo: @@ -56,6 +63,38 @@ class WorkflowRunCleanup: session_maker = sessionmaker(bind=db.engine, expire_on_commit=False) self.workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) + @contextmanager + def _sql_logger(self): + if not self.log_sql: + yield + return + + def _before_cursor_execute(conn, cursor, statement, parameters, context, executemany) -> None: + context._dify_sql_start_time = time.monotonic() + context._dify_sql_statement = statement + context._dify_sql_parameters = parameters + + def _after_cursor_execute(conn, cursor, statement, parameters, context, executemany) -> None: + start = getattr(context, "_dify_sql_start_time", None) + if start is None: + return + elapsed_ms = (time.monotonic() - start) * 1000 + if elapsed_ms < self.log_sql_min_ms: + return + logged_statement = getattr(context, "_dify_sql_statement", statement) + logged_parameters = getattr(context, "_dify_sql_parameters", parameters) + click.echo(f"[sql] {elapsed_ms:.1f} ms {logged_statement}") + if logged_parameters: + click.echo(f"[sql] params: {logged_parameters}") + + event.listen(db.engine, "before_cursor_execute", _before_cursor_execute) + event.listen(db.engine, "after_cursor_execute", _after_cursor_execute) + try: + yield + finally: + event.remove(db.engine, "before_cursor_execute", _before_cursor_execute) + event.remove(db.engine, "after_cursor_execute", _after_cursor_execute) + def run(self) -> None: click.echo( click.style( @@ -74,74 +113,95 @@ class WorkflowRunCleanup: batch_index = 0 last_seen: tuple[datetime.datetime, str] | None = None - while True: - run_rows = self.workflow_run_repo.get_runs_batch_by_time_range( - start_from=self.window_start, - end_before=self.window_end, - last_seen=last_seen, - batch_size=self.batch_size, - ) - if not run_rows: - break + with self._sql_logger(): + while True: + batch_start = time.monotonic() + run_rows = self.workflow_run_repo.get_runs_batch_by_time_range( + start_from=self.window_start, + end_before=self.window_end, + last_seen=last_seen, + batch_size=self.batch_size, + ) + fetch_ms = (time.monotonic() - batch_start) * 1000 + if not run_rows: + break - batch_index += 1 - last_seen = (run_rows[-1].created_at, run_rows[-1].id) - tenant_ids = {row.tenant_id for row in run_rows} - free_tenants = self._filter_free_tenants(tenant_ids) - free_runs = [row for row in run_rows if row.tenant_id in free_tenants] - paid_or_skipped = len(run_rows) - len(free_runs) + batch_index += 1 + last_seen = (run_rows[-1].created_at, run_rows[-1].id) + tenant_ids = {row.tenant_id for row in run_rows} + billing_start = time.monotonic() + free_tenants = self._filter_free_tenants(tenant_ids) + billing_ms = (time.monotonic() - billing_start) * 1000 + free_runs = [row for row in run_rows if row.tenant_id in free_tenants] + paid_or_skipped = len(run_rows) - len(free_runs) - if not free_runs: + if self.log_sql: + click.echo( + click.style( + f"[batch #{batch_index}] fetch_ms={fetch_ms:.1f} billing_ms={billing_ms:.1f}", + fg="white", + ) + ) + + if not free_runs: + skipped_message = ( + f"[batch #{batch_index}] skipped (no sandbox runs in batch, " + f"{paid_or_skipped} paid/unknown)" + ) + click.echo( + click.style( + skipped_message, + fg="yellow", + ) + ) + continue + + total_runs_targeted += len(free_runs) + + if self.dry_run: + count_start = time.monotonic() + batch_counts = self.workflow_run_repo.count_runs_with_related( + free_runs, + count_node_executions=self._count_node_executions, + count_trigger_logs=self._count_trigger_logs, + ) + count_ms = (time.monotonic() - count_start) * 1000 + if self.log_sql: + click.echo(click.style(f"[batch #{batch_index}] count_ms={count_ms:.1f}", fg="white")) + if related_totals is not None: + for key in related_totals: + related_totals[key] += batch_counts.get(key, 0) + sample_ids = ", ".join(run.id for run in free_runs[:5]) + click.echo( + click.style( + f"[batch #{batch_index}] would delete {len(free_runs)} runs " + f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown", + fg="yellow", + ) + ) + continue + + try: + counts = self.workflow_run_repo.delete_runs_with_related( + free_runs, + delete_node_executions=self._delete_node_executions, + delete_trigger_logs=self._delete_trigger_logs, + ) + except Exception: + logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0]) + raise + + total_runs_deleted += counts["runs"] click.echo( click.style( - f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)", - fg="yellow", + f"[batch #{batch_index}] deleted runs: {counts['runs']} " + f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, " + f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, " + f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); " + f"skipped {paid_or_skipped} paid/unknown", + fg="green", ) ) - continue - - total_runs_targeted += len(free_runs) - - if self.dry_run: - batch_counts = self.workflow_run_repo.count_runs_with_related( - free_runs, - count_node_executions=self._count_node_executions, - count_trigger_logs=self._count_trigger_logs, - ) - if related_totals is not None: - for key in related_totals: - related_totals[key] += batch_counts.get(key, 0) - sample_ids = ", ".join(run.id for run in free_runs[:5]) - click.echo( - click.style( - f"[batch #{batch_index}] would delete {len(free_runs)} runs " - f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown", - fg="yellow", - ) - ) - continue - - try: - counts = self.workflow_run_repo.delete_runs_with_related( - free_runs, - delete_node_executions=self._delete_node_executions, - delete_trigger_logs=self._delete_trigger_logs, - ) - except Exception: - logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0]) - raise - - total_runs_deleted += counts["runs"] - click.echo( - click.style( - f"[batch #{batch_index}] deleted runs: {counts['runs']} " - f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, " - f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, " - f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); " - f"skipped {paid_or_skipped} paid/unknown", - fg="green", - ) - ) if self.dry_run: if self.window_start: