mirror of
https://github.com/langgenius/dify.git
synced 2026-05-09 12:59:18 +08:00
feat: add metrics to clean workflow run task
This commit is contained in:
parent
c925d17e8f
commit
f90e0d781a
@ -948,6 +948,8 @@ def clean_workflow_runs(
|
||||
"""
|
||||
Clean workflow runs and related workflow data for free tenants.
|
||||
"""
|
||||
from extensions.otel.runtime import flush_telemetry
|
||||
|
||||
if (start_from is None) ^ (end_before is None):
|
||||
raise click.UsageError("--start-from and --end-before must be provided together.")
|
||||
|
||||
@ -967,13 +969,16 @@ def clean_workflow_runs(
|
||||
start_time = datetime.datetime.now(datetime.UTC)
|
||||
click.echo(click.style(f"Starting workflow run cleanup at {start_time.isoformat()}.", fg="white"))
|
||||
|
||||
WorkflowRunCleanup(
|
||||
days=before_days,
|
||||
batch_size=batch_size,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
dry_run=dry_run,
|
||||
).run()
|
||||
try:
|
||||
WorkflowRunCleanup(
|
||||
days=before_days,
|
||||
batch_size=batch_size,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
dry_run=dry_run,
|
||||
).run()
|
||||
finally:
|
||||
flush_telemetry()
|
||||
|
||||
end_time = datetime.datetime.now(datetime.UTC)
|
||||
elapsed = end_time - start_time
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
@ -5,7 +6,7 @@ from typing import Union
|
||||
|
||||
from celery.signals import worker_init
|
||||
from flask_login import user_loaded_from_request, user_logged_in
|
||||
from opentelemetry import trace
|
||||
from opentelemetry import metrics, trace
|
||||
from opentelemetry.propagate import set_global_textmap
|
||||
from opentelemetry.propagators.b3 import B3Format
|
||||
from opentelemetry.propagators.composite import CompositePropagator
|
||||
@ -31,9 +32,25 @@ def setup_context_propagation() -> None:
|
||||
|
||||
|
||||
def shutdown_tracer() -> None:
|
||||
flush_telemetry()
|
||||
|
||||
|
||||
def flush_telemetry() -> None:
|
||||
"""
|
||||
Best-effort flush for telemetry providers.
|
||||
|
||||
This is mainly used by short-lived command processes (e.g. Kubernetes CronJob)
|
||||
so counters/histograms are exported before the process exits.
|
||||
"""
|
||||
provider = trace.get_tracer_provider()
|
||||
if hasattr(provider, "force_flush"):
|
||||
provider.force_flush()
|
||||
with contextlib.suppress(Exception):
|
||||
provider.force_flush()
|
||||
|
||||
metric_provider = metrics.get_meter_provider()
|
||||
if hasattr(metric_provider, "force_flush"):
|
||||
with contextlib.suppress(Exception):
|
||||
metric_provider.force_flush()
|
||||
|
||||
|
||||
def is_celery_worker():
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from collections.abc import Iterable, Sequence
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import click
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
@ -20,6 +20,155 @@ from services.billing_service import BillingService, SubscriptionPlan
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from opentelemetry.metrics import Counter, Histogram
|
||||
|
||||
|
||||
class WorkflowRunCleanupMetrics:
|
||||
"""
|
||||
Records low-cardinality OpenTelemetry metrics for workflow run cleanup jobs.
|
||||
|
||||
Metrics are emitted with stable labels only (dry_run/window_mode/status)
|
||||
to keep dashboard and alert cardinality predictable in production clusters.
|
||||
"""
|
||||
|
||||
_job_runs_total: "Counter | None"
|
||||
_batches_total: "Counter | None"
|
||||
_runs_scanned_total: "Counter | None"
|
||||
_runs_targeted_total: "Counter | None"
|
||||
_runs_deleted_total: "Counter | None"
|
||||
_runs_skipped_total: "Counter | None"
|
||||
_related_records_total: "Counter | None"
|
||||
_job_duration_seconds: "Histogram | None"
|
||||
_batch_duration_seconds: "Histogram | None"
|
||||
_base_attributes: dict[str, str]
|
||||
|
||||
def __init__(self, *, dry_run: bool, has_window: bool) -> None:
|
||||
self._job_runs_total = None
|
||||
self._batches_total = None
|
||||
self._runs_scanned_total = None
|
||||
self._runs_targeted_total = None
|
||||
self._runs_deleted_total = None
|
||||
self._runs_skipped_total = None
|
||||
self._related_records_total = None
|
||||
self._job_duration_seconds = None
|
||||
self._batch_duration_seconds = None
|
||||
self._base_attributes = {
|
||||
"job_name": "workflow_run_cleanup",
|
||||
"dry_run": str(dry_run).lower(),
|
||||
"window_mode": "between" if has_window else "before_cutoff",
|
||||
}
|
||||
self._init_instruments()
|
||||
|
||||
def _init_instruments(self) -> None:
|
||||
try:
|
||||
from opentelemetry.metrics import get_meter
|
||||
|
||||
meter = get_meter("workflow_run_cleanup", version=dify_config.project.version)
|
||||
self._job_runs_total = meter.create_counter(
|
||||
"workflow_run_cleanup_jobs_total",
|
||||
description="Total number of workflow run cleanup jobs by status.",
|
||||
unit="{job}",
|
||||
)
|
||||
self._batches_total = meter.create_counter(
|
||||
"workflow_run_cleanup_batches_total",
|
||||
description="Total number of processed cleanup batches.",
|
||||
unit="{batch}",
|
||||
)
|
||||
self._runs_scanned_total = meter.create_counter(
|
||||
"workflow_run_cleanup_scanned_runs_total",
|
||||
description="Total workflow runs scanned by cleanup jobs.",
|
||||
unit="{run}",
|
||||
)
|
||||
self._runs_targeted_total = meter.create_counter(
|
||||
"workflow_run_cleanup_targeted_runs_total",
|
||||
description="Total workflow runs targeted by cleanup policy.",
|
||||
unit="{run}",
|
||||
)
|
||||
self._runs_deleted_total = meter.create_counter(
|
||||
"workflow_run_cleanup_deleted_runs_total",
|
||||
description="Total workflow runs deleted by cleanup jobs.",
|
||||
unit="{run}",
|
||||
)
|
||||
self._runs_skipped_total = meter.create_counter(
|
||||
"workflow_run_cleanup_skipped_runs_total",
|
||||
description="Total workflow runs skipped because tenant is paid/unknown.",
|
||||
unit="{run}",
|
||||
)
|
||||
self._related_records_total = meter.create_counter(
|
||||
"workflow_run_cleanup_related_records_total",
|
||||
description="Total related records processed by cleanup jobs.",
|
||||
unit="{record}",
|
||||
)
|
||||
self._job_duration_seconds = meter.create_histogram(
|
||||
"workflow_run_cleanup_job_duration_seconds",
|
||||
description="Duration of workflow run cleanup jobs in seconds.",
|
||||
unit="s",
|
||||
)
|
||||
self._batch_duration_seconds = meter.create_histogram(
|
||||
"workflow_run_cleanup_batch_duration_seconds",
|
||||
description="Duration of workflow run cleanup batch processing in seconds.",
|
||||
unit="s",
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("workflow_run_cleanup_metrics: failed to initialize instruments")
|
||||
|
||||
def _attrs(self, **extra: str) -> dict[str, str]:
|
||||
return {**self._base_attributes, **extra}
|
||||
|
||||
@staticmethod
|
||||
def _add(counter: "Counter | None", value: int, attributes: dict[str, str]) -> None:
|
||||
if not counter or value <= 0:
|
||||
return
|
||||
try:
|
||||
counter.add(value, attributes)
|
||||
except Exception:
|
||||
logger.exception("workflow_run_cleanup_metrics: failed to add counter value")
|
||||
|
||||
@staticmethod
|
||||
def _record(histogram: "Histogram | None", value: float, attributes: dict[str, str]) -> None:
|
||||
if not histogram:
|
||||
return
|
||||
try:
|
||||
histogram.record(value, attributes)
|
||||
except Exception:
|
||||
logger.exception("workflow_run_cleanup_metrics: failed to record histogram value")
|
||||
|
||||
def record_batch(
|
||||
self,
|
||||
*,
|
||||
batch_rows: int,
|
||||
targeted_runs: int,
|
||||
skipped_runs: int,
|
||||
deleted_runs: int,
|
||||
related_counts: dict[str, int] | None,
|
||||
related_action: str | None,
|
||||
batch_duration_seconds: float,
|
||||
) -> None:
|
||||
attributes = self._attrs()
|
||||
self._add(self._batches_total, 1, attributes)
|
||||
self._add(self._runs_scanned_total, batch_rows, attributes)
|
||||
self._add(self._runs_targeted_total, targeted_runs, attributes)
|
||||
self._add(self._runs_skipped_total, skipped_runs, attributes)
|
||||
self._add(self._runs_deleted_total, deleted_runs, attributes)
|
||||
self._record(self._batch_duration_seconds, batch_duration_seconds, attributes)
|
||||
|
||||
if not related_counts or not related_action:
|
||||
return
|
||||
|
||||
for record_type, count in related_counts.items():
|
||||
self._add(
|
||||
self._related_records_total,
|
||||
count,
|
||||
self._attrs(action=related_action, record_type=record_type),
|
||||
)
|
||||
|
||||
def record_completion(self, *, status: str, job_duration_seconds: float) -> None:
|
||||
attributes = self._attrs(status=status)
|
||||
self._add(self._job_runs_total, 1, attributes)
|
||||
self._record(self._job_duration_seconds, job_duration_seconds, attributes)
|
||||
|
||||
|
||||
class WorkflowRunCleanup:
|
||||
def __init__(
|
||||
self,
|
||||
@ -46,6 +195,10 @@ class WorkflowRunCleanup:
|
||||
self.batch_size = batch_size
|
||||
self._cleanup_whitelist: set[str] | None = None
|
||||
self.dry_run = dry_run
|
||||
self._metrics = WorkflowRunCleanupMetrics(
|
||||
dry_run=dry_run,
|
||||
has_window=bool(start_from),
|
||||
)
|
||||
self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD
|
||||
self.workflow_run_repo: APIWorkflowRunRepository
|
||||
if workflow_run_repo:
|
||||
@ -74,153 +227,193 @@ class WorkflowRunCleanup:
|
||||
related_totals = self._empty_related_counts() if self.dry_run else None
|
||||
batch_index = 0
|
||||
last_seen: tuple[datetime.datetime, str] | None = None
|
||||
status = "success"
|
||||
run_start = time.monotonic()
|
||||
max_batch_interval_ms = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL
|
||||
|
||||
max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200))
|
||||
try:
|
||||
while True:
|
||||
batch_start = time.monotonic()
|
||||
|
||||
while True:
|
||||
batch_start = time.monotonic()
|
||||
|
||||
fetch_start = time.monotonic()
|
||||
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
|
||||
start_from=self.window_start,
|
||||
end_before=self.window_end,
|
||||
last_seen=last_seen,
|
||||
batch_size=self.batch_size,
|
||||
)
|
||||
if not run_rows:
|
||||
logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1)
|
||||
break
|
||||
|
||||
batch_index += 1
|
||||
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s): fetched %s rows in %sms",
|
||||
batch_index,
|
||||
len(run_rows),
|
||||
int((time.monotonic() - fetch_start) * 1000),
|
||||
)
|
||||
|
||||
tenant_ids = {row.tenant_id for row in run_rows}
|
||||
|
||||
filter_start = time.monotonic()
|
||||
free_tenants = self._filter_free_tenants(tenant_ids)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms",
|
||||
batch_index,
|
||||
len(free_tenants),
|
||||
len(tenant_ids),
|
||||
int((time.monotonic() - filter_start) * 1000),
|
||||
)
|
||||
|
||||
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
|
||||
paid_or_skipped = len(run_rows) - len(free_runs)
|
||||
|
||||
if not free_runs:
|
||||
skipped_message = (
|
||||
f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)"
|
||||
fetch_start = time.monotonic()
|
||||
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
|
||||
start_from=self.window_start,
|
||||
end_before=self.window_end,
|
||||
last_seen=last_seen,
|
||||
batch_size=self.batch_size,
|
||||
)
|
||||
click.echo(
|
||||
click.style(
|
||||
skipped_message,
|
||||
fg="yellow",
|
||||
)
|
||||
)
|
||||
continue
|
||||
if not run_rows:
|
||||
logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1)
|
||||
break
|
||||
|
||||
total_runs_targeted += len(free_runs)
|
||||
|
||||
if self.dry_run:
|
||||
count_start = time.monotonic()
|
||||
batch_counts = self.workflow_run_repo.count_runs_with_related(
|
||||
free_runs,
|
||||
count_node_executions=self._count_node_executions,
|
||||
count_trigger_logs=self._count_trigger_logs,
|
||||
)
|
||||
batch_index += 1
|
||||
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms",
|
||||
"workflow_run_cleanup (batch #%s): fetched %s rows in %sms",
|
||||
batch_index,
|
||||
int((time.monotonic() - count_start) * 1000),
|
||||
len(run_rows),
|
||||
int((time.monotonic() - fetch_start) * 1000),
|
||||
)
|
||||
if related_totals is not None:
|
||||
for key in related_totals:
|
||||
related_totals[key] += batch_counts.get(key, 0)
|
||||
sample_ids = ", ".join(run.id for run in free_runs[:5])
|
||||
|
||||
tenant_ids = {row.tenant_id for row in run_rows}
|
||||
|
||||
filter_start = time.monotonic()
|
||||
free_tenants = self._filter_free_tenants(tenant_ids)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms",
|
||||
batch_index,
|
||||
len(free_tenants),
|
||||
len(tenant_ids),
|
||||
int((time.monotonic() - filter_start) * 1000),
|
||||
)
|
||||
|
||||
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
|
||||
paid_or_skipped = len(run_rows) - len(free_runs)
|
||||
|
||||
if not free_runs:
|
||||
skipped_message = (
|
||||
f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)"
|
||||
)
|
||||
click.echo(
|
||||
click.style(
|
||||
skipped_message,
|
||||
fg="yellow",
|
||||
)
|
||||
)
|
||||
self._metrics.record_batch(
|
||||
batch_rows=len(run_rows),
|
||||
targeted_runs=0,
|
||||
skipped_runs=paid_or_skipped,
|
||||
deleted_runs=0,
|
||||
related_counts=None,
|
||||
related_action=None,
|
||||
batch_duration_seconds=time.monotonic() - batch_start,
|
||||
)
|
||||
continue
|
||||
|
||||
total_runs_targeted += len(free_runs)
|
||||
|
||||
if self.dry_run:
|
||||
count_start = time.monotonic()
|
||||
batch_counts = self.workflow_run_repo.count_runs_with_related(
|
||||
free_runs,
|
||||
count_node_executions=self._count_node_executions,
|
||||
count_trigger_logs=self._count_trigger_logs,
|
||||
)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms",
|
||||
batch_index,
|
||||
int((time.monotonic() - count_start) * 1000),
|
||||
)
|
||||
if related_totals is not None:
|
||||
for key in related_totals:
|
||||
related_totals[key] += batch_counts.get(key, 0)
|
||||
sample_ids = ", ".join(run.id for run in free_runs[:5])
|
||||
click.echo(
|
||||
click.style(
|
||||
f"[batch #{batch_index}] would delete {len(free_runs)} runs "
|
||||
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
|
||||
fg="yellow",
|
||||
)
|
||||
)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s, dry_run): batch total %sms",
|
||||
batch_index,
|
||||
int((time.monotonic() - batch_start) * 1000),
|
||||
)
|
||||
self._metrics.record_batch(
|
||||
batch_rows=len(run_rows),
|
||||
targeted_runs=len(free_runs),
|
||||
skipped_runs=paid_or_skipped,
|
||||
deleted_runs=0,
|
||||
related_counts={key: batch_counts.get(key, 0) for key in self._empty_related_counts()},
|
||||
related_action="would_delete",
|
||||
batch_duration_seconds=time.monotonic() - batch_start,
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
delete_start = time.monotonic()
|
||||
counts = self.workflow_run_repo.delete_runs_with_related(
|
||||
free_runs,
|
||||
delete_node_executions=self._delete_node_executions,
|
||||
delete_trigger_logs=self._delete_trigger_logs,
|
||||
)
|
||||
delete_ms = int((time.monotonic() - delete_start) * 1000)
|
||||
except Exception:
|
||||
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
|
||||
raise
|
||||
|
||||
total_runs_deleted += counts["runs"]
|
||||
click.echo(
|
||||
click.style(
|
||||
f"[batch #{batch_index}] would delete {len(free_runs)} runs "
|
||||
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
|
||||
fg="yellow",
|
||||
f"[batch #{batch_index}] deleted runs: {counts['runs']} "
|
||||
f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
|
||||
f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
|
||||
f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
|
||||
f"skipped {paid_or_skipped} paid/unknown",
|
||||
fg="green",
|
||||
)
|
||||
)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s, dry_run): batch total %sms",
|
||||
"workflow_run_cleanup (batch #%s): delete %sms, batch total %sms",
|
||||
batch_index,
|
||||
delete_ms,
|
||||
int((time.monotonic() - batch_start) * 1000),
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
delete_start = time.monotonic()
|
||||
counts = self.workflow_run_repo.delete_runs_with_related(
|
||||
free_runs,
|
||||
delete_node_executions=self._delete_node_executions,
|
||||
delete_trigger_logs=self._delete_trigger_logs,
|
||||
self._metrics.record_batch(
|
||||
batch_rows=len(run_rows),
|
||||
targeted_runs=len(free_runs),
|
||||
skipped_runs=paid_or_skipped,
|
||||
deleted_runs=counts["runs"],
|
||||
related_counts={key: counts.get(key, 0) for key in self._empty_related_counts()},
|
||||
related_action="deleted",
|
||||
batch_duration_seconds=time.monotonic() - batch_start,
|
||||
)
|
||||
delete_ms = int((time.monotonic() - delete_start) * 1000)
|
||||
except Exception:
|
||||
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
|
||||
raise
|
||||
|
||||
total_runs_deleted += counts["runs"]
|
||||
click.echo(
|
||||
click.style(
|
||||
f"[batch #{batch_index}] deleted runs: {counts['runs']} "
|
||||
f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
|
||||
f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
|
||||
f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
|
||||
f"skipped {paid_or_skipped} paid/unknown",
|
||||
fg="green",
|
||||
)
|
||||
)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s): delete %sms, batch total %sms",
|
||||
batch_index,
|
||||
delete_ms,
|
||||
int((time.monotonic() - batch_start) * 1000),
|
||||
)
|
||||
# Random sleep between batches to avoid overwhelming the database
|
||||
sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311
|
||||
logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms)
|
||||
time.sleep(sleep_ms / 1000)
|
||||
|
||||
# Random sleep between batches to avoid overwhelming the database
|
||||
sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311
|
||||
logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms)
|
||||
time.sleep(sleep_ms / 1000)
|
||||
|
||||
if self.dry_run:
|
||||
if self.window_start:
|
||||
summary_message = (
|
||||
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
|
||||
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
|
||||
)
|
||||
if self.dry_run:
|
||||
if self.window_start:
|
||||
summary_message = (
|
||||
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
|
||||
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
|
||||
)
|
||||
else:
|
||||
summary_message = (
|
||||
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
|
||||
f"before {self.window_end.isoformat()}"
|
||||
)
|
||||
if related_totals is not None:
|
||||
summary_message = (
|
||||
f"{summary_message}; related records: {self._format_related_counts(related_totals)}"
|
||||
)
|
||||
summary_color = "yellow"
|
||||
else:
|
||||
summary_message = (
|
||||
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
|
||||
f"before {self.window_end.isoformat()}"
|
||||
)
|
||||
if related_totals is not None:
|
||||
summary_message = f"{summary_message}; related records: {self._format_related_counts(related_totals)}"
|
||||
summary_color = "yellow"
|
||||
else:
|
||||
if self.window_start:
|
||||
summary_message = (
|
||||
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs "
|
||||
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
|
||||
)
|
||||
else:
|
||||
summary_message = (
|
||||
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs before {self.window_end.isoformat()}"
|
||||
)
|
||||
summary_color = "white"
|
||||
if self.window_start:
|
||||
summary_message = (
|
||||
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs "
|
||||
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
|
||||
)
|
||||
else:
|
||||
summary_message = (
|
||||
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs "
|
||||
f"before {self.window_end.isoformat()}"
|
||||
)
|
||||
summary_color = "white"
|
||||
|
||||
click.echo(click.style(summary_message, fg=summary_color))
|
||||
click.echo(click.style(summary_message, fg=summary_color))
|
||||
except Exception:
|
||||
status = "failed"
|
||||
raise
|
||||
finally:
|
||||
self._metrics.record_completion(
|
||||
status=status,
|
||||
job_duration_seconds=time.monotonic() - run_start,
|
||||
)
|
||||
|
||||
def _filter_free_tenants(self, tenant_ids: Iterable[str]) -> set[str]:
|
||||
tenant_id_list = list(tenant_ids)
|
||||
|
||||
@ -265,6 +265,61 @@ def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
cleanup.run()
|
||||
|
||||
|
||||
def test_run_records_metrics_on_success(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
cutoff = datetime.datetime.now()
|
||||
repo = FakeRepo(
|
||||
batches=[[FakeRun("run-free", "t_free", cutoff)]],
|
||||
delete_result={
|
||||
"runs": 0,
|
||||
"node_executions": 2,
|
||||
"offloads": 1,
|
||||
"app_logs": 3,
|
||||
"trigger_logs": 4,
|
||||
"pauses": 5,
|
||||
"pause_reasons": 6,
|
||||
},
|
||||
)
|
||||
cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10)
|
||||
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)
|
||||
|
||||
batch_calls: list[dict[str, object]] = []
|
||||
completion_calls: list[dict[str, object]] = []
|
||||
monkeypatch.setattr(cleanup._metrics, "record_batch", lambda **kwargs: batch_calls.append(kwargs))
|
||||
monkeypatch.setattr(cleanup._metrics, "record_completion", lambda **kwargs: completion_calls.append(kwargs))
|
||||
|
||||
cleanup.run()
|
||||
|
||||
assert len(batch_calls) == 1
|
||||
assert batch_calls[0]["batch_rows"] == 1
|
||||
assert batch_calls[0]["targeted_runs"] == 1
|
||||
assert batch_calls[0]["deleted_runs"] == 1
|
||||
assert batch_calls[0]["related_action"] == "deleted"
|
||||
assert len(completion_calls) == 1
|
||||
assert completion_calls[0]["status"] == "success"
|
||||
|
||||
|
||||
def test_run_records_failed_metrics(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
class FailingRepo(FakeRepo):
|
||||
def delete_runs_with_related(
|
||||
self, runs: list[FakeRun], delete_node_executions=None, delete_trigger_logs=None
|
||||
) -> dict[str, int]:
|
||||
raise RuntimeError("delete failed")
|
||||
|
||||
cutoff = datetime.datetime.now()
|
||||
repo = FailingRepo(batches=[[FakeRun("run-free", "t_free", cutoff)]])
|
||||
cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10)
|
||||
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)
|
||||
|
||||
completion_calls: list[dict[str, object]] = []
|
||||
monkeypatch.setattr(cleanup._metrics, "record_completion", lambda **kwargs: completion_calls.append(kwargs))
|
||||
|
||||
with pytest.raises(RuntimeError, match="delete failed"):
|
||||
cleanup.run()
|
||||
|
||||
assert len(completion_calls) == 1
|
||||
assert completion_calls[0]["status"] == "failed"
|
||||
|
||||
|
||||
def test_run_dry_run_skips_deletions(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
cutoff = datetime.datetime.now()
|
||||
repo = FakeRepo(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user