diff --git a/api/commands.py b/api/commands.py index 93ddc379fb..2197050dc8 100644 --- a/api/commands.py +++ b/api/commands.py @@ -936,6 +936,12 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[ is_flag=True, help="Preview cleanup results without deleting any workflow run data.", ) +@click.option( + "--task-label", + default="daily", + show_default=True, + help="Stable label value used to distinguish multiple cleanup CronJobs in metrics.", +) def clean_workflow_runs( before_days: int, batch_size: int, @@ -944,6 +950,7 @@ def clean_workflow_runs( start_from: datetime.datetime | None, end_before: datetime.datetime | None, dry_run: bool, + task_label: str, ): """ Clean workflow runs and related workflow data for free tenants. @@ -976,6 +983,7 @@ def clean_workflow_runs( start_from=start_from, end_before=end_before, dry_run=dry_run, + task_label=task_label, ).run() finally: flush_telemetry() @@ -2620,12 +2628,19 @@ def migrate_oss( help="Graceful period in days after subscription expiration, will be ignored when billing is disabled.", ) @click.option("--dry-run", is_flag=True, default=False, help="Show messages logs would be cleaned without deleting") +@click.option( + "--task-label", + default="daily", + show_default=True, + help="Stable label value used to distinguish multiple cleanup CronJobs in metrics.", +) def clean_expired_messages( batch_size: int, graceful_period: int, start_from: datetime.datetime, end_before: datetime.datetime, dry_run: bool, + task_label: str, ): """ Clean expired messages and related data for tenants based on clean policy. @@ -2648,6 +2663,7 @@ def clean_expired_messages( end_before=end_before, batch_size=batch_size, dry_run=dry_run, + task_label=task_label, ) stats = service.run() diff --git a/api/services/retention/conversation/messages_clean_service.py b/api/services/retention/conversation/messages_clean_service.py index 86bf57c64e..c54d98be59 100644 --- a/api/services/retention/conversation/messages_clean_service.py +++ b/api/services/retention/conversation/messages_clean_service.py @@ -40,7 +40,7 @@ class MessagesCleanupMetrics: """ Records low-cardinality OpenTelemetry metrics for expired message cleanup jobs. - We keep labels stable (dry_run/window_mode/status) so these metrics remain + We keep labels stable (dry_run/window_mode/task_label/status) so these metrics remain dashboard-friendly for long-running CronJob executions. """ @@ -53,7 +53,7 @@ class MessagesCleanupMetrics: _batch_duration_seconds: "Histogram | None" _base_attributes: dict[str, str] - def __init__(self, *, dry_run: bool, has_window: bool) -> None: + def __init__(self, *, dry_run: bool, has_window: bool, task_label: str) -> None: self._job_runs_total = None self._batches_total = None self._messages_scanned_total = None @@ -65,6 +65,7 @@ class MessagesCleanupMetrics: "job_name": "messages_cleanup", "dry_run": str(dry_run).lower(), "window_mode": "between" if has_window else "before_cutoff", + "task_label": task_label, } self._init_instruments() @@ -168,6 +169,7 @@ class MessagesCleanService: start_from: datetime.datetime | None = None, batch_size: int = 1000, dry_run: bool = False, + task_label: str = "daily", ) -> None: """ Initialize the service with cleanup parameters. @@ -178,13 +180,20 @@ class MessagesCleanService: start_from: Optional start time (inclusive) of the range batch_size: Number of messages to process per batch dry_run: Whether to perform a dry run (no actual deletion) + task_label: Stable task label to distinguish multiple cleanup CronJobs """ self._policy = policy self._end_before = end_before self._start_from = start_from self._batch_size = batch_size self._dry_run = dry_run - self._metrics = MessagesCleanupMetrics(dry_run=dry_run, has_window=bool(start_from)) + normalized_task_label = task_label.strip() + self._task_label = normalized_task_label or "daily" + self._metrics = MessagesCleanupMetrics( + dry_run=dry_run, + has_window=bool(start_from), + task_label=self._task_label, + ) @classmethod def from_time_range( @@ -194,6 +203,7 @@ class MessagesCleanService: end_before: datetime.datetime, batch_size: int = 1000, dry_run: bool = False, + task_label: str = "daily", ) -> "MessagesCleanService": """ Create a service instance for cleaning messages within a specific time range. @@ -206,6 +216,7 @@ class MessagesCleanService: end_before: End time (exclusive) of the range batch_size: Number of messages to process per batch dry_run: Whether to perform a dry run (no actual deletion) + task_label: Stable task label to distinguish multiple cleanup CronJobs Returns: MessagesCleanService instance @@ -233,6 +244,7 @@ class MessagesCleanService: start_from=start_from, batch_size=batch_size, dry_run=dry_run, + task_label=task_label, ) @classmethod @@ -242,6 +254,7 @@ class MessagesCleanService: days: int = 30, batch_size: int = 1000, dry_run: bool = False, + task_label: str = "daily", ) -> "MessagesCleanService": """ Create a service instance for cleaning messages older than specified days. @@ -251,6 +264,7 @@ class MessagesCleanService: days: Number of days to look back from now batch_size: Number of messages to process per batch dry_run: Whether to perform a dry run (no actual deletion) + task_label: Stable task label to distinguish multiple cleanup CronJobs Returns: MessagesCleanService instance @@ -274,7 +288,14 @@ class MessagesCleanService: policy.__class__.__name__, ) - return cls(policy=policy, end_before=end_before, start_from=None, batch_size=batch_size, dry_run=dry_run) + return cls( + policy=policy, + end_before=end_before, + start_from=None, + batch_size=batch_size, + dry_run=dry_run, + task_label=task_label, + ) def run(self) -> dict[str, int]: """ diff --git a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py index 57e411fd26..70c45b191e 100644 --- a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py +++ b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py @@ -28,7 +28,7 @@ class WorkflowRunCleanupMetrics: """ Records low-cardinality OpenTelemetry metrics for workflow run cleanup jobs. - Metrics are emitted with stable labels only (dry_run/window_mode/status) + Metrics are emitted with stable labels only (dry_run/window_mode/task_label/status) to keep dashboard and alert cardinality predictable in production clusters. """ @@ -43,7 +43,7 @@ class WorkflowRunCleanupMetrics: _batch_duration_seconds: "Histogram | None" _base_attributes: dict[str, str] - def __init__(self, *, dry_run: bool, has_window: bool) -> None: + def __init__(self, *, dry_run: bool, has_window: bool, task_label: str) -> None: self._job_runs_total = None self._batches_total = None self._runs_scanned_total = None @@ -57,6 +57,7 @@ class WorkflowRunCleanupMetrics: "job_name": "workflow_run_cleanup", "dry_run": str(dry_run).lower(), "window_mode": "between" if has_window else "before_cutoff", + "task_label": task_label, } self._init_instruments() @@ -178,6 +179,7 @@ class WorkflowRunCleanup: end_before: datetime.datetime | None = None, workflow_run_repo: APIWorkflowRunRepository | None = None, dry_run: bool = False, + task_label: str = "daily", ): if (start_from is None) ^ (end_before is None): raise ValueError("start_from and end_before must be both set or both omitted.") @@ -195,9 +197,12 @@ class WorkflowRunCleanup: self.batch_size = batch_size self._cleanup_whitelist: set[str] | None = None self.dry_run = dry_run + normalized_task_label = task_label.strip() + self.task_label = normalized_task_label or "daily" self._metrics = WorkflowRunCleanupMetrics( dry_run=dry_run, has_window=bool(start_from), + task_label=self.task_label, ) self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD self.workflow_run_repo: APIWorkflowRunRepository