def shutdown_tracer() -> None:
    """Flush pending telemetry by delegating to ``flush_telemetry``."""
    flush_telemetry()


def flush_telemetry() -> None:
    """
    Best-effort flush for telemetry providers.

    This is mainly used by short-lived command processes (e.g. Kubernetes CronJob)
    so counters/histograms are exported before the process exits.

    The global tracer provider is flushed first, then the global meter provider.
    A provider that does not expose ``force_flush`` is skipped.

    No ``Exception`` escapes this function: a failing provider lookup or flush is
    logged as a warning and the next provider is still attempted. That keeps the
    call safe inside a ``finally`` block, where a new exception would replace the
    one that is already propagating.
    """
    flush_logger = logging.getLogger(__name__)
    provider_getters = (
        ("trace", trace.get_tracer_provider),
        ("metrics", metrics.get_meter_provider),
    )

    for signal_name, get_provider in provider_getters:
        try:
            force_flush = getattr(get_provider(), "force_flush", None)
            if force_flush is None:
                # API-only / no-op providers have nothing buffered to flush.
                continue
            # An explicit False is treated as "flush did not complete"; other
            # return values (True, None) are treated as success.
            if force_flush() is False:
                flush_logger.warning("flush_telemetry: %s provider did not finish flushing", signal_name)
        except Exception:
            # Best-effort: report the failure instead of hiding it, but never raise.
            flush_logger.warning("flush_telemetry: failed to flush %s provider", signal_name, exc_info=True)
Kubernetes CronJob) + so counters/histograms are exported before the process exits. + """ provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): - provider.force_flush() + with contextlib.suppress(Exception): + provider.force_flush() + + metric_provider = metrics.get_meter_provider() + if hasattr(metric_provider, "force_flush"): + with contextlib.suppress(Exception): + metric_provider.force_flush() def is_celery_worker(): diff --git a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py index 2c94cb5324..57e411fd26 100644 --- a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py +++ b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py @@ -1,9 +1,9 @@ import datetime import logging -import os import random import time from collections.abc import Iterable, Sequence +from typing import TYPE_CHECKING import click from sqlalchemy.orm import Session, sessionmaker @@ -20,6 +20,155 @@ from services.billing_service import BillingService, SubscriptionPlan logger = logging.getLogger(__name__) +if TYPE_CHECKING: + from opentelemetry.metrics import Counter, Histogram + + +class WorkflowRunCleanupMetrics: + """ + Records low-cardinality OpenTelemetry metrics for workflow run cleanup jobs. + + Metrics are emitted with stable labels only (dry_run/window_mode/status) + to keep dashboard and alert cardinality predictable in production clusters. 
+ """ + + _job_runs_total: "Counter | None" + _batches_total: "Counter | None" + _runs_scanned_total: "Counter | None" + _runs_targeted_total: "Counter | None" + _runs_deleted_total: "Counter | None" + _runs_skipped_total: "Counter | None" + _related_records_total: "Counter | None" + _job_duration_seconds: "Histogram | None" + _batch_duration_seconds: "Histogram | None" + _base_attributes: dict[str, str] + + def __init__(self, *, dry_run: bool, has_window: bool) -> None: + self._job_runs_total = None + self._batches_total = None + self._runs_scanned_total = None + self._runs_targeted_total = None + self._runs_deleted_total = None + self._runs_skipped_total = None + self._related_records_total = None + self._job_duration_seconds = None + self._batch_duration_seconds = None + self._base_attributes = { + "job_name": "workflow_run_cleanup", + "dry_run": str(dry_run).lower(), + "window_mode": "between" if has_window else "before_cutoff", + } + self._init_instruments() + + def _init_instruments(self) -> None: + try: + from opentelemetry.metrics import get_meter + + meter = get_meter("workflow_run_cleanup", version=dify_config.project.version) + self._job_runs_total = meter.create_counter( + "workflow_run_cleanup_jobs_total", + description="Total number of workflow run cleanup jobs by status.", + unit="{job}", + ) + self._batches_total = meter.create_counter( + "workflow_run_cleanup_batches_total", + description="Total number of processed cleanup batches.", + unit="{batch}", + ) + self._runs_scanned_total = meter.create_counter( + "workflow_run_cleanup_scanned_runs_total", + description="Total workflow runs scanned by cleanup jobs.", + unit="{run}", + ) + self._runs_targeted_total = meter.create_counter( + "workflow_run_cleanup_targeted_runs_total", + description="Total workflow runs targeted by cleanup policy.", + unit="{run}", + ) + self._runs_deleted_total = meter.create_counter( + "workflow_run_cleanup_deleted_runs_total", + description="Total workflow runs deleted by 
cleanup jobs.", + unit="{run}", + ) + self._runs_skipped_total = meter.create_counter( + "workflow_run_cleanup_skipped_runs_total", + description="Total workflow runs skipped because tenant is paid/unknown.", + unit="{run}", + ) + self._related_records_total = meter.create_counter( + "workflow_run_cleanup_related_records_total", + description="Total related records processed by cleanup jobs.", + unit="{record}", + ) + self._job_duration_seconds = meter.create_histogram( + "workflow_run_cleanup_job_duration_seconds", + description="Duration of workflow run cleanup jobs in seconds.", + unit="s", + ) + self._batch_duration_seconds = meter.create_histogram( + "workflow_run_cleanup_batch_duration_seconds", + description="Duration of workflow run cleanup batch processing in seconds.", + unit="s", + ) + except Exception: + logger.exception("workflow_run_cleanup_metrics: failed to initialize instruments") + + def _attrs(self, **extra: str) -> dict[str, str]: + return {**self._base_attributes, **extra} + + @staticmethod + def _add(counter: "Counter | None", value: int, attributes: dict[str, str]) -> None: + if not counter or value <= 0: + return + try: + counter.add(value, attributes) + except Exception: + logger.exception("workflow_run_cleanup_metrics: failed to add counter value") + + @staticmethod + def _record(histogram: "Histogram | None", value: float, attributes: dict[str, str]) -> None: + if not histogram: + return + try: + histogram.record(value, attributes) + except Exception: + logger.exception("workflow_run_cleanup_metrics: failed to record histogram value") + + def record_batch( + self, + *, + batch_rows: int, + targeted_runs: int, + skipped_runs: int, + deleted_runs: int, + related_counts: dict[str, int] | None, + related_action: str | None, + batch_duration_seconds: float, + ) -> None: + attributes = self._attrs() + self._add(self._batches_total, 1, attributes) + self._add(self._runs_scanned_total, batch_rows, attributes) + 
self._add(self._runs_targeted_total, targeted_runs, attributes) + self._add(self._runs_skipped_total, skipped_runs, attributes) + self._add(self._runs_deleted_total, deleted_runs, attributes) + self._record(self._batch_duration_seconds, batch_duration_seconds, attributes) + + if not related_counts or not related_action: + return + + for record_type, count in related_counts.items(): + self._add( + self._related_records_total, + count, + self._attrs(action=related_action, record_type=record_type), + ) + + def record_completion(self, *, status: str, job_duration_seconds: float) -> None: + attributes = self._attrs(status=status) + self._add(self._job_runs_total, 1, attributes) + self._record(self._job_duration_seconds, job_duration_seconds, attributes) + + class WorkflowRunCleanup: def __init__( self, @@ -46,6 +195,10 @@ class WorkflowRunCleanup: self.batch_size = batch_size self._cleanup_whitelist: set[str] | None = None self.dry_run = dry_run + self._metrics = WorkflowRunCleanupMetrics( + dry_run=dry_run, + has_window=bool(start_from), + ) self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD self.workflow_run_repo: APIWorkflowRunRepository if workflow_run_repo: @@ -74,153 +227,193 @@ class WorkflowRunCleanup: related_totals = self._empty_related_counts() if self.dry_run else None batch_index = 0 last_seen: tuple[datetime.datetime, str] | None = None + status = "success" + run_start = time.monotonic() + max_batch_interval_ms = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL - max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200)) + try: + while True: + batch_start = time.monotonic() - while True: - batch_start = time.monotonic() - - fetch_start = time.monotonic() - run_rows = self.workflow_run_repo.get_runs_batch_by_time_range( - start_from=self.window_start, - end_before=self.window_end, - last_seen=last_seen, - batch_size=self.batch_size, - ) - if not run_rows: - 
logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1) - break - - batch_index += 1 - last_seen = (run_rows[-1].created_at, run_rows[-1].id) - logger.info( - "workflow_run_cleanup (batch #%s): fetched %s rows in %sms", - batch_index, - len(run_rows), - int((time.monotonic() - fetch_start) * 1000), - ) - - tenant_ids = {row.tenant_id for row in run_rows} - - filter_start = time.monotonic() - free_tenants = self._filter_free_tenants(tenant_ids) - logger.info( - "workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms", - batch_index, - len(free_tenants), - len(tenant_ids), - int((time.monotonic() - filter_start) * 1000), - ) - - free_runs = [row for row in run_rows if row.tenant_id in free_tenants] - paid_or_skipped = len(run_rows) - len(free_runs) - - if not free_runs: - skipped_message = ( - f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)" + fetch_start = time.monotonic() + run_rows = self.workflow_run_repo.get_runs_batch_by_time_range( + start_from=self.window_start, + end_before=self.window_end, + last_seen=last_seen, + batch_size=self.batch_size, ) - click.echo( - click.style( - skipped_message, - fg="yellow", - ) - ) - continue + if not run_rows: + logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1) + break - total_runs_targeted += len(free_runs) - - if self.dry_run: - count_start = time.monotonic() - batch_counts = self.workflow_run_repo.count_runs_with_related( - free_runs, - count_node_executions=self._count_node_executions, - count_trigger_logs=self._count_trigger_logs, - ) + batch_index += 1 + last_seen = (run_rows[-1].created_at, run_rows[-1].id) logger.info( - "workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms", + "workflow_run_cleanup (batch #%s): fetched %s rows in %sms", batch_index, - int((time.monotonic() - count_start) * 1000), + len(run_rows), + int((time.monotonic() - fetch_start) 
* 1000), ) - if related_totals is not None: - for key in related_totals: - related_totals[key] += batch_counts.get(key, 0) - sample_ids = ", ".join(run.id for run in free_runs[:5]) + + tenant_ids = {row.tenant_id for row in run_rows} + + filter_start = time.monotonic() + free_tenants = self._filter_free_tenants(tenant_ids) + logger.info( + "workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms", + batch_index, + len(free_tenants), + len(tenant_ids), + int((time.monotonic() - filter_start) * 1000), + ) + + free_runs = [row for row in run_rows if row.tenant_id in free_tenants] + paid_or_skipped = len(run_rows) - len(free_runs) + + if not free_runs: + skipped_message = ( + f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)" + ) + click.echo( + click.style( + skipped_message, + fg="yellow", + ) + ) + self._metrics.record_batch( + batch_rows=len(run_rows), + targeted_runs=0, + skipped_runs=paid_or_skipped, + deleted_runs=0, + related_counts=None, + related_action=None, + batch_duration_seconds=time.monotonic() - batch_start, + ) + continue + + total_runs_targeted += len(free_runs) + + if self.dry_run: + count_start = time.monotonic() + batch_counts = self.workflow_run_repo.count_runs_with_related( + free_runs, + count_node_executions=self._count_node_executions, + count_trigger_logs=self._count_trigger_logs, + ) + logger.info( + "workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms", + batch_index, + int((time.monotonic() - count_start) * 1000), + ) + if related_totals is not None: + for key in related_totals: + related_totals[key] += batch_counts.get(key, 0) + sample_ids = ", ".join(run.id for run in free_runs[:5]) + click.echo( + click.style( + f"[batch #{batch_index}] would delete {len(free_runs)} runs " + f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown", + fg="yellow", + ) + ) + logger.info( + "workflow_run_cleanup (batch #%s, dry_run): batch total %sms", 
+ batch_index, + int((time.monotonic() - batch_start) * 1000), + ) + self._metrics.record_batch( + batch_rows=len(run_rows), + targeted_runs=len(free_runs), + skipped_runs=paid_or_skipped, + deleted_runs=0, + related_counts={key: batch_counts.get(key, 0) for key in self._empty_related_counts()}, + related_action="would_delete", + batch_duration_seconds=time.monotonic() - batch_start, + ) + continue + + try: + delete_start = time.monotonic() + counts = self.workflow_run_repo.delete_runs_with_related( + free_runs, + delete_node_executions=self._delete_node_executions, + delete_trigger_logs=self._delete_trigger_logs, + ) + delete_ms = int((time.monotonic() - delete_start) * 1000) + except Exception: + logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0]) + raise + + total_runs_deleted += counts["runs"] click.echo( click.style( - f"[batch #{batch_index}] would delete {len(free_runs)} runs " - f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown", - fg="yellow", + f"[batch #{batch_index}] deleted runs: {counts['runs']} " + f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, " + f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, " + f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); " + f"skipped {paid_or_skipped} paid/unknown", + fg="green", ) ) logger.info( - "workflow_run_cleanup (batch #%s, dry_run): batch total %sms", + "workflow_run_cleanup (batch #%s): delete %sms, batch total %sms", batch_index, + delete_ms, int((time.monotonic() - batch_start) * 1000), ) - continue - - try: - delete_start = time.monotonic() - counts = self.workflow_run_repo.delete_runs_with_related( - free_runs, - delete_node_executions=self._delete_node_executions, - delete_trigger_logs=self._delete_trigger_logs, + self._metrics.record_batch( + batch_rows=len(run_rows), + targeted_runs=len(free_runs), + skipped_runs=paid_or_skipped, + deleted_runs=counts["runs"], + related_counts={key: 
counts.get(key, 0) for key in self._empty_related_counts()}, + related_action="deleted", + batch_duration_seconds=time.monotonic() - batch_start, ) - delete_ms = int((time.monotonic() - delete_start) * 1000) - except Exception: - logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0]) - raise - total_runs_deleted += counts["runs"] - click.echo( - click.style( - f"[batch #{batch_index}] deleted runs: {counts['runs']} " - f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, " - f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, " - f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); " - f"skipped {paid_or_skipped} paid/unknown", - fg="green", - ) - ) - logger.info( - "workflow_run_cleanup (batch #%s): delete %sms, batch total %sms", - batch_index, - delete_ms, - int((time.monotonic() - batch_start) * 1000), - ) + # Random sleep between batches to avoid overwhelming the database + sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311 + logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms) + time.sleep(sleep_ms / 1000) - # Random sleep between batches to avoid overwhelming the database - sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311 - logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms) - time.sleep(sleep_ms / 1000) - - if self.dry_run: - if self.window_start: - summary_message = ( - f"Dry run complete. Would delete {total_runs_targeted} workflow runs " - f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}" - ) + if self.dry_run: + if self.window_start: + summary_message = ( + f"Dry run complete. Would delete {total_runs_targeted} workflow runs " + f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}" + ) + else: + summary_message = ( + f"Dry run complete. 
Would delete {total_runs_targeted} workflow runs " + f"before {self.window_end.isoformat()}" + ) + if related_totals is not None: + summary_message = ( + f"{summary_message}; related records: {self._format_related_counts(related_totals)}" + ) + summary_color = "yellow" else: - summary_message = ( - f"Dry run complete. Would delete {total_runs_targeted} workflow runs " - f"before {self.window_end.isoformat()}" - ) - if related_totals is not None: - summary_message = f"{summary_message}; related records: {self._format_related_counts(related_totals)}" - summary_color = "yellow" - else: - if self.window_start: - summary_message = ( - f"Cleanup complete. Deleted {total_runs_deleted} workflow runs " - f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}" - ) - else: - summary_message = ( - f"Cleanup complete. Deleted {total_runs_deleted} workflow runs before {self.window_end.isoformat()}" - ) - summary_color = "white" + if self.window_start: + summary_message = ( + f"Cleanup complete. Deleted {total_runs_deleted} workflow runs " + f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}" + ) + else: + summary_message = ( + f"Cleanup complete. 
def test_run_records_metrics_on_success(monkeypatch: pytest.MonkeyPatch) -> None:
    """A real (non dry-run) cleanup reports one batch and exactly one "success" completion."""
    now = datetime.datetime.now()
    # NOTE(review): "runs" is seeded as 0 while deleted_runs is expected to be 1 below —
    # presumably FakeRepo derives "runs" from the batch it receives; confirm against FakeRepo.
    seeded_counts = {
        "runs": 0,
        "node_executions": 2,
        "offloads": 1,
        "app_logs": 3,
        "trigger_logs": 4,
        "pauses": 5,
        "pause_reasons": 6,
    }
    fake_repo = FakeRepo(
        batches=[[FakeRun("run-free", "t_free", now)]],
        delete_result=seeded_counts,
    )
    cleanup = create_cleanup(monkeypatch, repo=fake_repo, days=30, batch_size=10)
    monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)

    # Replace both recorder methods with spies that keep the keyword arguments.
    batch_calls: list[dict[str, object]] = []
    completion_calls: list[dict[str, object]] = []

    def spy_on_batch(**kwargs: object) -> None:
        batch_calls.append(kwargs)

    def spy_on_completion(**kwargs: object) -> None:
        completion_calls.append(kwargs)

    monkeypatch.setattr(cleanup._metrics, "record_batch", spy_on_batch)
    monkeypatch.setattr(cleanup._metrics, "record_completion", spy_on_completion)

    cleanup.run()

    assert len(batch_calls) == 1
    only_batch = batch_calls[0]
    assert only_batch["batch_rows"] == 1
    assert only_batch["targeted_runs"] == 1
    assert only_batch["deleted_runs"] == 1
    assert only_batch["related_action"] == "deleted"

    assert len(completion_calls) == 1
    assert completion_calls[0]["status"] == "success"
def test_run_records_failed_metrics(monkeypatch: pytest.MonkeyPatch) -> None:
    """When the repository delete raises, the error propagates and one "failed" completion is recorded."""

    class FailingRepo(FakeRepo):
        # Same call shape as the method it overrides, but it always fails.
        def delete_runs_with_related(
            self, runs: list[FakeRun], delete_node_executions=None, delete_trigger_logs=None
        ) -> dict[str, int]:
            raise RuntimeError("delete failed")

    now = datetime.datetime.now()
    failing_repo = FailingRepo(batches=[[FakeRun("run-free", "t_free", now)]])
    cleanup = create_cleanup(monkeypatch, repo=failing_repo, days=30, batch_size=10)
    monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)

    # Only the completion recorder is replaced; batch recording stays untouched.
    completion_calls: list[dict[str, object]] = []

    def spy_on_completion(**kwargs: object) -> None:
        completion_calls.append(kwargs)

    monkeypatch.setattr(cleanup._metrics, "record_completion", spy_on_completion)

    with pytest.raises(RuntimeError, match="delete failed"):
        cleanup.run()

    assert len(completion_calls) == 1
    assert completion_calls[0]["status"] == "failed"