From 6ab5cf109baa9597e38ec28e19647d0e436413b4 Mon Sep 17 00:00:00 2001 From: Ingram Z Date: Wed, 17 Jun 2026 11:19:26 +0800 Subject: [PATCH] refactor: optimize free plan workflow run cleanup batching (#37227) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../api_workflow_run_repository.py | 72 ++++ .../sqlalchemy_api_workflow_run_repository.py | 187 +++++++++- ...ear_free_plan_expired_workflow_run_logs.py | 103 ++++-- ...alchemy_workflow_run_cleanup_repository.py | 320 ++++++++++++++++++ ...ear_free_plan_expired_workflow_run_logs.py | 66 ++-- ...ear_free_plan_expired_workflow_run_logs.py | 169 ++++++--- 6 files changed, 812 insertions(+), 105 deletions(-) create mode 100644 api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_workflow_run_cleanup_repository.py diff --git a/api/repositories/api_workflow_run_repository.py b/api/repositories/api_workflow_run_repository.py index 72b38e7906..2659e55055 100644 --- a/api/repositories/api_workflow_run_repository.py +++ b/api/repositories/api_workflow_run_repository.py @@ -35,6 +35,7 @@ Example: """ from collections.abc import Callable, Sequence +from dataclasses import dataclass from datetime import datetime from typing import Protocol, TypedDict @@ -65,6 +66,21 @@ class RunsWithRelatedCountsDict(TypedDict): pause_reasons: int +@dataclass(frozen=True) +class WorkflowRunCleanupRef: + """ + Lightweight workflow run reference for retention cleanup scans. + + Cleanup jobs use this DTO when they only need cursor, tenant eligibility, and run-id deletion data. Keeping the + query shape explicit prevents free-plan cleanup from hydrating full WorkflowRun models for rows that may be skipped + after billing checks. + """ + + id: str + tenant_id: str + created_at: datetime + + class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): """ Protocol for service-layer WorkflowRun repository operations. @@ -286,6 +302,36 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): """ ... + def get_cleanup_refs_batch_by_time_range( + self, + start_from: datetime | None, + end_before: datetime, + last_seen: tuple[datetime, str] | None, + batch_size: int, + run_types: Sequence[WorkflowType] | None = None, + tenant_ids: Sequence[str] | None = None, + workflow_ids: Sequence[str] | None = None, + upper_bound: tuple[datetime, str] | None = None, + ) -> Sequence[WorkflowRunCleanupRef]: + """ + Fetch lightweight ended workflow run refs in a time window for cleanup batching. + + Args: + start_from: Optional inclusive lower time boundary. + end_before: Exclusive upper time boundary. + last_seen: Optional exclusive `(created_at, id)` cursor lower bound. + batch_size: Maximum number of refs to return. + run_types: Optional workflow type filter. + tenant_ids: Optional tenant filter. + workflow_ids: Optional workflow ID filter. + upper_bound: Optional inclusive `(created_at, id)` cursor upper bound. Cleanup uses this for a second, + tenant-filtered target query that must stay within the candidate page high-water cursor. + + Returns: + Ordered lightweight cleanup refs containing only id, tenant_id, and created_at. + """ + ... + def get_archived_run_ids( self, session: Session, @@ -370,6 +416,19 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): """ ... + def delete_runs_with_related_by_ids( + self, + run_ids: Sequence[str], + delete_node_executions: Callable[[Session, Sequence[str]], tuple[int, int]] | None = None, + delete_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None, + ) -> RunsWithRelatedCountsDict: + """ + Delete workflow runs and cleanup-owned related records by workflow run IDs. + + This mirrors delete_runs_with_related() for cleanup callers that do not need full WorkflowRun models. + """ + ... + def get_app_logs_by_run_id( self, session: Session, @@ -417,6 +476,19 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): """ ... + def count_runs_with_related_by_ids( + self, + run_ids: Sequence[str], + count_node_executions: Callable[[Session, Sequence[str]], tuple[int, int]] | None = None, + count_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None, + ) -> RunsWithRelatedCountsDict: + """ + Count workflow runs and cleanup-owned related records by workflow run IDs. + + This mirrors count_runs_with_related() for dry-run cleanup callers that do not need full WorkflowRun models. + """ + ... + def create_workflow_pause( self, workflow_run_id: str, diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index cbc9d03e5e..98c605f0a1 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -44,7 +44,11 @@ from libs.time_parser import get_time_threshold from models.enums import WorkflowRunTriggeredFrom from models.human_input import HumanInputForm, HumanInputFormRecipient from models.workflow import WorkflowAppLog, WorkflowArchiveLog, WorkflowPause, WorkflowPauseReason, WorkflowRun -from repositories.api_workflow_run_repository import APIWorkflowRunRepository, RunsWithRelatedCountsDict +from repositories.api_workflow_run_repository import ( + APIWorkflowRunRepository, + RunsWithRelatedCountsDict, + WorkflowRunCleanupRef, +) from repositories.entities.workflow_pause import WorkflowPauseEntity from repositories.types import ( AverageInteractionStats, @@ -420,6 +424,71 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): return session.scalars(stmt).all() + @override + def get_cleanup_refs_batch_by_time_range( + self, + start_from: datetime | None, + end_before: datetime, + last_seen: tuple[datetime, str] | None, + batch_size: int, + run_types: Sequence[WorkflowType] | None = None, + tenant_ids: Sequence[str] | None = None, + workflow_ids: Sequence[str] | None = None, + upper_bound: tuple[datetime, str] | None = None, + ) -> Sequence[WorkflowRunCleanupRef]: + """ + Fetch lightweight ended workflow run refs in a time window for cleanup batching. + + The optional upper_bound is inclusive and is paired with last_seen by free-plan cleanup so a second, + tenant-filtered target query stays within the candidate page already checked against billing. + """ + with self._session_maker() as session: + stmt = ( + select(WorkflowRun.id, WorkflowRun.tenant_id, WorkflowRun.created_at) + .where( + WorkflowRun.created_at < end_before, + WorkflowRun.status.in_(WorkflowExecutionStatus.ended_values()), + ) + .order_by(WorkflowRun.created_at.asc(), WorkflowRun.id.asc()) + .limit(batch_size) + ) + if run_types is not None: + if not run_types: + return [] + stmt = stmt.where(WorkflowRun.type.in_(run_types)) + + if start_from: + stmt = stmt.where(WorkflowRun.created_at >= start_from) + + if tenant_ids: + stmt = stmt.where(WorkflowRun.tenant_id.in_(tenant_ids)) + + if workflow_ids: + stmt = stmt.where(WorkflowRun.workflow_id.in_(workflow_ids)) + + if last_seen: + stmt = stmt.where( + tuple_(WorkflowRun.created_at, WorkflowRun.id) + > tuple_( + sa.literal(last_seen[0], type_=sa.DateTime()), + sa.literal(last_seen[1], type_=WorkflowRun.id.type), + ) + ) + + if upper_bound: + stmt = stmt.where( + tuple_(WorkflowRun.created_at, WorkflowRun.id) + <= tuple_( + sa.literal(upper_bound[0], type_=sa.DateTime()), + sa.literal(upper_bound[1], type_=WorkflowRun.id.type), + ) + ) + + return [ + WorkflowRunCleanupRef(id=run_id, tenant_id=tenant_id, created_at=created_at) + for run_id, tenant_id, created_at in session.execute(stmt).all() + ] + @override def get_archived_run_ids( self, @@ -530,6 +599,56 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): "pause_reasons": pause_reasons_deleted, } + @override + def delete_runs_with_related_by_ids( + self, + run_ids: Sequence[str], + delete_node_executions: Callable[[Session, Sequence[str]], tuple[int, int]] | None = None, + delete_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None, + ) -> RunsWithRelatedCountsDict: + if not run_ids: + return self._empty_runs_with_related_counts() + + run_ids = list(run_ids) + with self._session_maker() as session: + if delete_node_executions: + node_executions_deleted, offloads_deleted = delete_node_executions(session, run_ids) + else: + node_executions_deleted, offloads_deleted = 0, 0 + + app_logs_result = session.execute(delete(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id.in_(run_ids))) + app_logs_deleted = cast(CursorResult, app_logs_result).rowcount or 0 + + pause_stmt = select(WorkflowPause.id).where(WorkflowPause.workflow_run_id.in_(run_ids)) + pause_ids = session.scalars(pause_stmt).all() + pause_reasons_deleted = 0 + pauses_deleted = 0 + + if pause_ids: + pause_reasons_result = session.execute( + delete(WorkflowPauseReason).where(WorkflowPauseReason.pause_id.in_(pause_ids)) + ) + pause_reasons_deleted = cast(CursorResult, pause_reasons_result).rowcount or 0 + pauses_result = session.execute(delete(WorkflowPause).where(WorkflowPause.id.in_(pause_ids))) + pauses_deleted = cast(CursorResult, pauses_result).rowcount or 0 + + trigger_logs_deleted = delete_trigger_logs(session, run_ids) if delete_trigger_logs else 0 + + runs_result = session.execute(delete(WorkflowRun).where(WorkflowRun.id.in_(run_ids))) + runs_deleted = cast(CursorResult, runs_result).rowcount or 0 + + session.commit() + + return { + "runs": runs_deleted, + "node_executions": node_executions_deleted, + "offloads": offloads_deleted, + "app_logs": app_logs_deleted, + "trigger_logs": trigger_logs_deleted, + "pauses": pauses_deleted, + "pause_reasons": pause_reasons_deleted, + } + @override def get_app_logs_by_run_id( self, @@ -711,6 +830,72 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): "pause_reasons": int(pause_reasons_count), } + @override + def count_runs_with_related_by_ids( + self, + run_ids: Sequence[str], + count_node_executions: Callable[[Session, Sequence[str]], tuple[int, int]] | None = None, + count_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None, + ) -> RunsWithRelatedCountsDict: + if not run_ids: + return self._empty_runs_with_related_counts() + + run_ids = list(run_ids) + with self._session_maker() as session: + if count_node_executions: + node_executions_count, offloads_count = count_node_executions(session, run_ids) + else: + node_executions_count, offloads_count = 0, 0 + + runs_count = ( + session.scalar(select(func.count()).select_from(WorkflowRun).where(WorkflowRun.id.in_(run_ids))) or 0 + ) + app_logs_count = ( + session.scalar( + select(func.count()).select_from(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id.in_(run_ids)) + ) + or 0 + ) + + pause_ids = session.scalars( + select(WorkflowPause.id).where(WorkflowPause.workflow_run_id.in_(run_ids)) + ).all() + pauses_count = len(pause_ids) + pause_reasons_count = 0 + if pause_ids: + pause_reasons_count = ( + session.scalar( + select(func.count()) + .select_from(WorkflowPauseReason) + .where(WorkflowPauseReason.pause_id.in_(pause_ids)) + ) + or 0 + ) + + trigger_logs_count = count_trigger_logs(session, run_ids) if count_trigger_logs else 0 + + return { + "runs": int(runs_count), + "node_executions": node_executions_count, + "offloads": offloads_count, + "app_logs": int(app_logs_count), + "trigger_logs": trigger_logs_count, + "pauses": pauses_count, + "pause_reasons": int(pause_reasons_count), + } + + @staticmethod + def _empty_runs_with_related_counts() -> RunsWithRelatedCountsDict: + return { + "runs": 0, + "node_executions": 0, + "offloads": 0, + "app_logs": 0, + "trigger_logs": 0, + "pauses": 0, + "pause_reasons": 0, + } + @override def create_workflow_pause( self, diff --git a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py index 58e8ac57a8..3652997f8a 100644 --- a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py +++ b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py @@ -1,3 +1,10 @@ +"""Cleanup expired workflow run logs for free-plan tenants. + +The cleanup service owns billing eligibility decisions while repositories own database-efficient batch selection and +deletion. Free-plan cleanup intentionally scans lightweight workflow run references first, then re-queries the same +candidate cursor slice with eligible tenant IDs so paid tenants are skipped without hydrating full WorkflowRun models. +""" + import datetime import logging import random @@ -11,8 +18,11 @@ from sqlalchemy.orm import Session, sessionmaker from configs import dify_config from enums.cloud_plan import CloudPlan from extensions.ext_database import db -from models.workflow import WorkflowRun -from repositories.api_workflow_run_repository import APIWorkflowRunRepository, RunsWithRelatedCountsDict +from repositories.api_workflow_run_repository import ( + APIWorkflowRunRepository, + RunsWithRelatedCountsDict, + WorkflowRunCleanupRef, +) from repositories.factory import DifyAPIRepositoryFactory from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository from services.billing_service import BillingService, SubscriptionPlan @@ -186,6 +196,13 @@ _RELATED_RECORD_KEYS = ("node_executions", "offloads", "app_logs", "trigger_logs class WorkflowRunCleanup: + """ + Coordinates free-plan workflow run retention cleanup. + + The cleanup cursor advances by candidate refs, not target refs. This keeps pagination stable + when billing filters out paid or unknown tenants before the repository performs the target lookup. + """ + def __init__( self, days: int, @@ -254,26 +271,28 @@ class WorkflowRunCleanup: batch_start = time.monotonic() fetch_start = time.monotonic() - run_rows = self.workflow_run_repo.get_runs_batch_by_time_range( + candidate_last_seen = last_seen + candidate_refs = self.workflow_run_repo.get_cleanup_refs_batch_by_time_range( start_from=self.window_start, end_before=self.window_end, - last_seen=last_seen, + last_seen=candidate_last_seen, batch_size=self.batch_size, ) - if not run_rows: + if not candidate_refs: logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1) break batch_index += 1 - last_seen = (run_rows[-1].created_at, run_rows[-1].id) + candidate_high_water = self._cursor_from_ref(candidate_refs[-1]) + last_seen = candidate_high_water logger.info( - "workflow_run_cleanup (batch #%s): fetched %s rows in %sms", + "workflow_run_cleanup (batch #%s): fetched %s candidate refs in %sms", batch_index, - len(run_rows), + len(candidate_refs), int((time.monotonic() - fetch_start) * 1000), ) - tenant_ids = {row.tenant_id for row in run_rows} + tenant_ids = {ref.tenant_id for ref in candidate_refs} filter_start = time.monotonic() free_tenants = self._filter_free_tenants(tenant_ids) @@ -285,10 +304,28 @@ class WorkflowRunCleanup: int((time.monotonic() - filter_start) * 1000), ) - free_runs = [row for row in run_rows if row.tenant_id in free_tenants] - paid_or_skipped = len(run_rows) - len(free_runs) + target_refs: Sequence[WorkflowRunCleanupRef] = [] + if free_tenants: + target_fetch_start = time.monotonic() + target_refs = self.workflow_run_repo.get_cleanup_refs_batch_by_time_range( + start_from=self.window_start, + end_before=self.window_end, + last_seen=candidate_last_seen, + batch_size=self.batch_size, + tenant_ids=sorted(free_tenants), + upper_bound=candidate_high_water, + ) + logger.info( + "workflow_run_cleanup (batch #%s): fetched %s target refs in %sms", + batch_index, + len(target_refs), + int((time.monotonic() - target_fetch_start) * 1000), + ) - if not free_runs: + target_run_ids = [ref.id for ref in target_refs] + paid_or_skipped = max(len(candidate_refs) - len(target_run_ids), 0) + + if not target_run_ids: skipped_message = ( f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)" ) @@ -299,7 +336,7 @@ class WorkflowRunCleanup: ) ) self._metrics.record_batch( - batch_rows=len(run_rows), + batch_rows=len(candidate_refs), targeted_runs=0, skipped_runs=paid_or_skipped, deleted_runs=0, @@ -309,13 +346,13 @@ class WorkflowRunCleanup: ) continue - total_runs_targeted += len(free_runs) + total_runs_targeted += len(target_run_ids) if self.dry_run: count_start = time.monotonic() - batch_counts = self.workflow_run_repo.count_runs_with_related( - free_runs, - count_node_executions=self._count_node_executions, + batch_counts = self.workflow_run_repo.count_runs_with_related_by_ids( + target_run_ids, + count_node_executions=self._count_node_executions_by_run_ids, count_trigger_logs=self._count_trigger_logs, ) logger.info( @@ -325,10 +362,10 @@ class WorkflowRunCleanup: ) if related_totals is not None: self._accumulate_related_counts(related_totals, batch_counts) - sample_ids = ", ".join(run.id for run in free_runs[:5]) + sample_ids = ", ".join(target_run_ids[:5]) click.echo( click.style( - f"[batch #{batch_index}] would delete {len(free_runs)} runs " + f"[batch #{batch_index}] would delete {len(target_run_ids)} runs " f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown", fg="yellow", ) @@ -339,8 +376,8 @@ class WorkflowRunCleanup: int((time.monotonic() - batch_start) * 1000), ) self._metrics.record_batch( - batch_rows=len(run_rows), - targeted_runs=len(free_runs), + batch_rows=len(candidate_refs), + targeted_runs=len(target_run_ids), skipped_runs=paid_or_skipped, deleted_runs=0, related_counts={ @@ -354,14 +391,14 @@ class WorkflowRunCleanup: try: delete_start = time.monotonic() - counts = self.workflow_run_repo.delete_runs_with_related( - free_runs, - delete_node_executions=self._delete_node_executions, + counts = self.workflow_run_repo.delete_runs_with_related_by_ids( + target_run_ids, + delete_node_executions=self._delete_node_executions_by_run_ids, delete_trigger_logs=self._delete_trigger_logs, ) delete_ms = int((time.monotonic() - delete_start) * 1000) except Exception: - logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0]) + logger.exception("Failed to delete workflow runs batch ending at %s", candidate_high_water[0]) raise total_runs_deleted += counts["runs"] @@ -382,8 +419,8 @@ class WorkflowRunCleanup: int((time.monotonic() - batch_start) * 1000), ) self._metrics.record_batch( - batch_rows=len(run_rows), - targeted_runs=len(free_runs), + batch_rows=len(candidate_refs), + targeted_runs=len(target_run_ids), skipped_runs=paid_or_skipped, deleted_runs=counts["runs"], related_counts={ @@ -439,7 +476,7 @@ class WorkflowRunCleanup: ) def _filter_free_tenants(self, tenant_ids: Iterable[str]) -> set[str]: - tenant_id_list = list(tenant_ids) + tenant_id_list = sorted(set(tenant_ids)) if not dify_config.BILLING_ENABLED: return set(tenant_id_list) @@ -553,15 +590,17 @@ class WorkflowRunCleanup: totals["pauses"] += batch.get("pauses", 0) totals["pause_reasons"] += batch.get("pause_reasons", 0) - def _count_node_executions(self, session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]: - run_ids = [run.id for run in runs] + @staticmethod + def _cursor_from_ref(ref: WorkflowRunCleanupRef) -> tuple[datetime.datetime, str]: + return ref.created_at, ref.id + + def _count_node_executions_by_run_ids(self, session: Session, run_ids: Sequence[str]) -> tuple[int, int]: repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository( session_maker=sessionmaker(bind=session.get_bind(), expire_on_commit=False) ) return repo.count_by_runs(session, run_ids) - def _delete_node_executions(self, session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]: - run_ids = [run.id for run in runs] + def _delete_node_executions_by_run_ids(self, session: Session, run_ids: Sequence[str]) -> tuple[int, int]: repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository( session_maker=sessionmaker(bind=session.get_bind(), expire_on_commit=False) ) diff --git a/api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_workflow_run_cleanup_repository.py b/api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_workflow_run_cleanup_repository.py new file mode 100644 index 0000000000..a2834dc80a --- /dev/null +++ b/api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_workflow_run_cleanup_repository.py @@ -0,0 +1,320 @@ +"""Integration tests for workflow run cleanup repository queries.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from typing import override +from uuid import uuid4 + +from sqlalchemy import Engine, select +from sqlalchemy.orm import Session, sessionmaker + +from graphon.entities import WorkflowExecution +from graphon.entities.pause_reason import PauseReasonType +from graphon.enums import WorkflowExecutionStatus, WorkflowType +from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom +from models.workflow import WorkflowAppLog, WorkflowAppLogCreatedFrom, WorkflowPause, WorkflowPauseReason, WorkflowRun +from repositories.sqlalchemy_api_workflow_run_repository import DifyAPISQLAlchemyWorkflowRunRepository + + +class _TestWorkflowRunRepository(DifyAPISQLAlchemyWorkflowRunRepository): + """Concrete repository for tests where save() is not under test.""" + + @override + def save(self, execution: WorkflowExecution) -> None: + return None + + +@dataclass +class _TestScope: + """Per-test identifiers for rows created by cleanup repository tests.""" + + tenant_id: str = field(default_factory=lambda: str(uuid4())) + app_id: str = field(default_factory=lambda: str(uuid4())) + workflow_id: str = field(default_factory=lambda: str(uuid4())) + user_id: str = field(default_factory=lambda: str(uuid4())) + + +def _repository(db_session_with_containers: Session) -> DifyAPISQLAlchemyWorkflowRunRepository: + engine = db_session_with_containers.get_bind() + assert isinstance(engine, Engine) + return _TestWorkflowRunRepository(session_maker=sessionmaker(bind=engine, expire_on_commit=False)) + + +def _create_workflow_run( + session: Session, + scope: _TestScope, + *, + status: WorkflowExecutionStatus = WorkflowExecutionStatus.SUCCEEDED, + created_at: datetime, + tenant_id: str | None = None, + workflow_id: str | None = None, + workflow_type: str = WorkflowType.WORKFLOW, +) -> WorkflowRun: + workflow_run = WorkflowRun( + id=str(uuid4()), + tenant_id=tenant_id or scope.tenant_id, + app_id=scope.app_id, + workflow_id=workflow_id or scope.workflow_id, + type=workflow_type, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + version="draft", + graph="{}", + inputs="{}", + status=status, + created_by_role=CreatorUserRole.ACCOUNT, + created_by=scope.user_id, + created_at=created_at, + ) + session.add(workflow_run) + session.commit() + return workflow_run + + +def _add_app_log(session: Session, scope: _TestScope, workflow_run: WorkflowRun) -> None: + session.add( + WorkflowAppLog( + tenant_id=workflow_run.tenant_id, + app_id=scope.app_id, + workflow_id=workflow_run.workflow_id, + workflow_run_id=workflow_run.id, + created_from=WorkflowAppLogCreatedFrom.SERVICE_API, + created_by_role=CreatorUserRole.ACCOUNT, + created_by=scope.user_id, + ) + ) + session.commit() + + +def _add_pause_with_reason(session: Session, scope: _TestScope, workflow_run: WorkflowRun) -> WorkflowPause: + pause = WorkflowPause( + workflow_id=workflow_run.workflow_id, + workflow_run_id=workflow_run.id, + state_object_key=f"workflow-state-{uuid4()}.json", + ) + pause_reason = WorkflowPauseReason( + pause_id=pause.id, + type_=PauseReasonType.SCHEDULED_PAUSE, + message="scheduled pause", + ) + session.add_all([pause, pause_reason]) + session.commit() + return pause + + +class TestGetCleanupRefsBatchByTimeRange: + def test_applies_cursor_window_and_cleanup_filters(self, db_session_with_containers: Session) -> None: + repository = _repository(db_session_with_containers) + scope = _TestScope() + base = datetime(2024, 1, 1, 12, 0, 0) + + _create_workflow_run(db_session_with_containers, scope, created_at=base - timedelta(minutes=1)) + cursor_run = _create_workflow_run(db_session_with_containers, scope, created_at=base) + first_target = _create_workflow_run(db_session_with_containers, scope, created_at=base + timedelta(minutes=1)) + second_target = _create_workflow_run( + db_session_with_containers, + scope, + status=WorkflowExecutionStatus.FAILED, + created_at=base + timedelta(minutes=2), + ) + _create_workflow_run( + db_session_with_containers, + scope, + status=WorkflowExecutionStatus.RUNNING, + created_at=base + timedelta(minutes=1), + ) + _create_workflow_run( + db_session_with_containers, + scope, + created_at=base + timedelta(minutes=1), + tenant_id=str(uuid4()), + ) + _create_workflow_run( + db_session_with_containers, + scope, + created_at=base + timedelta(minutes=1), + workflow_id=str(uuid4()), + ) + _create_workflow_run( + db_session_with_containers, + scope, + created_at=base + timedelta(minutes=1), + workflow_type=WorkflowType.CHAT, + ) + _create_workflow_run(db_session_with_containers, scope, created_at=base + timedelta(minutes=3)) + + refs = repository.get_cleanup_refs_batch_by_time_range( + start_from=base, + end_before=base + timedelta(minutes=4), + last_seen=(cursor_run.created_at, cursor_run.id), + batch_size=10, + run_types=[WorkflowType.WORKFLOW], + tenant_ids=[scope.tenant_id], + workflow_ids=[scope.workflow_id], + upper_bound=(second_target.created_at, second_target.id), + ) + + assert [(ref.id, ref.tenant_id, ref.created_at) for ref in refs] == [ + (first_target.id, scope.tenant_id, first_target.created_at), + (second_target.id, scope.tenant_id, second_target.created_at), + ] + + def test_returns_empty_when_run_type_filter_is_empty(self, db_session_with_containers: Session) -> None: + repository = _repository(db_session_with_containers) + + refs = repository.get_cleanup_refs_batch_by_time_range( + start_from=None, + end_before=datetime(2024, 1, 2), + last_seen=None, + batch_size=10, + run_types=[], + ) + + assert refs == [] + + +class TestCountRunsWithRelatedByIds: + def test_counts_existing_runs_and_related_rows(self, db_session_with_containers: Session) -> None: + repository = _repository(db_session_with_containers) + scope = _TestScope() + workflow_run = _create_workflow_run( + db_session_with_containers, + scope, + created_at=datetime(2024, 1, 1, 12, 0, 0), + ) + missing_run_id = str(uuid4()) + _add_app_log(db_session_with_containers, scope, workflow_run) + _add_pause_with_reason(db_session_with_containers, scope, workflow_run) + counted_node_run_ids: list[str] = [] + counted_trigger_run_ids: list[str] = [] + + counts = repository.count_runs_with_related_by_ids( + [workflow_run.id, missing_run_id], + count_node_executions=lambda _session, run_ids: counted_node_run_ids.extend(run_ids) or (2, 1), + count_trigger_logs=lambda _session, run_ids: counted_trigger_run_ids.extend(run_ids) or 3, + ) + + assert counted_node_run_ids == [workflow_run.id, missing_run_id] + assert counted_trigger_run_ids == [workflow_run.id, missing_run_id] + assert counts == { + "runs": 1, + "node_executions": 2, + "offloads": 1, + "app_logs": 1, + "trigger_logs": 3, + "pauses": 1, + "pause_reasons": 1, + } + + def test_defaults_optional_related_counts(self, db_session_with_containers: Session) -> None: + repository = _repository(db_session_with_containers) + scope = _TestScope() + workflow_run = _create_workflow_run( + db_session_with_containers, + scope, + created_at=datetime(2024, 1, 1, 12, 0, 0), + ) + + counts = repository.count_runs_with_related_by_ids([workflow_run.id]) + + assert counts == { + "runs": 1, + "node_executions": 0, + "offloads": 0, + "app_logs": 0, + "trigger_logs": 0, + "pauses": 0, + "pause_reasons": 0, + } + + +class TestDeleteRunsWithRelatedByIds: + def test_deletes_runs_and_related_rows(self, db_session_with_containers: Session) -> None: + repository = _repository(db_session_with_containers) + scope = _TestScope() + workflow_run = _create_workflow_run( + db_session_with_containers, + scope, + created_at=datetime(2024, 1, 1, 12, 0, 0), + ) + _add_app_log(db_session_with_containers, scope, workflow_run) + pause = _add_pause_with_reason(db_session_with_containers, scope, workflow_run) + pause_id = pause.id + deleted_node_run_ids: list[str] = [] + deleted_trigger_run_ids: list[str] = [] + + counts = repository.delete_runs_with_related_by_ids( + [workflow_run.id], + delete_node_executions=lambda _session, run_ids: deleted_node_run_ids.extend(run_ids) or (2, 1), + delete_trigger_logs=lambda _session, run_ids: deleted_trigger_run_ids.extend(run_ids) or 3, + ) + + assert deleted_node_run_ids == [workflow_run.id] + assert deleted_trigger_run_ids == [workflow_run.id] + assert counts == { + "runs": 1, + "node_executions": 2, + "offloads": 1, + "app_logs": 1, + "trigger_logs": 3, + "pauses": 1, + "pause_reasons": 1, + } + verification_session = Session(bind=db_session_with_containers.get_bind()) + with verification_session: + assert verification_session.get(WorkflowRun, workflow_run.id) is None + assert verification_session.get(WorkflowPause, pause_id) is None + assert ( + verification_session.scalar( + select(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id == workflow_run.id) + ) + is None + ) + assert ( + verification_session.scalar(select(WorkflowPauseReason).where(WorkflowPauseReason.pause_id == pause_id)) + is None + ) + + def test_defaults_optional_related_counts(self, db_session_with_containers: Session) -> None: + repository = _repository(db_session_with_containers) + scope = _TestScope() + workflow_run = _create_workflow_run( + db_session_with_containers, + scope, + created_at=datetime(2024, 1, 1, 12, 0, 0), + ) + + counts = repository.delete_runs_with_related_by_ids([workflow_run.id]) + + assert counts == { + "runs": 1, + "node_executions": 0, + "offloads": 0, + "app_logs": 0, + "trigger_logs": 0, + "pauses": 0, + "pause_reasons": 0, + } + + def test_empty_ids_return_empty_counts(self, db_session_with_containers: Session) -> None: + repository = _repository(db_session_with_containers) + + assert repository.count_runs_with_related_by_ids([]) == { + "runs": 0, + "node_executions": 0, + "offloads": 0, + "app_logs": 0, + "trigger_logs": 0, + "pauses": 0, + "pause_reasons": 0, + } + assert repository.delete_runs_with_related_by_ids([]) == { + "runs": 0, + "node_executions": 0, + "offloads": 0, + "app_logs": 0, + "trigger_logs": 0, + "pauses": 0, + "pause_reasons": 0, + } diff --git a/api/tests/unit_tests/services/retention/workflow_run/test_clear_free_plan_expired_workflow_run_logs.py b/api/tests/unit_tests/services/retention/workflow_run/test_clear_free_plan_expired_workflow_run_logs.py index 7d30645d38..1e15a72f47 100644 --- a/api/tests/unit_tests/services/retention/workflow_run/test_clear_free_plan_expired_workflow_run_logs.py +++ b/api/tests/unit_tests/services/retention/workflow_run/test_clear_free_plan_expired_workflow_run_logs.py @@ -7,15 +7,16 @@ from unittest.mock import MagicMock, patch import pytest +from repositories.api_workflow_run_repository import WorkflowRunCleanupRef from services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup -def make_run(tenant_id: str = "t1", run_id: str = "r1", created_at: datetime.datetime | None = None): - run = MagicMock() - run.tenant_id = tenant_id - run.id = run_id - run.created_at = created_at or datetime.datetime(2024, 1, 1, tzinfo=datetime.UTC) - return run +def make_ref(tenant_id: str = "t1", run_id: str = "r1", created_at: datetime.datetime | None = None): + return WorkflowRunCleanupRef( + id=run_id, + tenant_id=tenant_id, + created_at=created_at or datetime.datetime(2024, 1, 1, tzinfo=datetime.UTC), + ) @pytest.fixture @@ -341,28 +342,28 @@ class TestRunDeleteMode: return WorkflowRunCleanup(days=30, batch_size=10, workflow_run_repo=mock_repo) def test_no_rows_stops_immediately(self, mock_repo): - mock_repo.get_runs_batch_by_time_range.return_value = [] + mock_repo.get_cleanup_refs_batch_by_time_range.return_value = [] c = self._make_cleanup(mock_repo) with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg: cfg.BILLING_ENABLED = False c.run() - mock_repo.delete_runs_with_related.assert_not_called() + mock_repo.delete_runs_with_related_by_ids.assert_not_called() def test_all_paid_skips_delete(self, mock_repo): - run = make_run("t1") - mock_repo.get_runs_batch_by_time_range.side_effect = [[run], []] + ref = make_ref("t1") + mock_repo.get_cleanup_refs_batch_by_time_range.side_effect = [[ref], []] c = self._make_cleanup(mock_repo) # billing disabled -> all free; but let's override _filter_free_tenants to return empty c._filter_free_tenants = MagicMock(return_value=set()) with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg: cfg.BILLING_ENABLED = False c.run() - mock_repo.delete_runs_with_related.assert_not_called() + mock_repo.delete_runs_with_related_by_ids.assert_not_called() def test_runs_deleted_successfully(self, mock_repo): - run = make_run("t1") - mock_repo.get_runs_batch_by_time_range.side_effect = [[run], []] - mock_repo.delete_runs_with_related.return_value = { + ref = make_ref("t1") + mock_repo.get_cleanup_refs_batch_by_time_range.side_effect = [[ref], [ref], []] + mock_repo.delete_runs_with_related_by_ids.return_value = { "runs": 1, "node_executions": 0, "offloads": 0, @@ -376,12 +377,12 @@ class TestRunDeleteMode: cfg.BILLING_ENABLED = False with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.time.sleep"): c.run() - mock_repo.delete_runs_with_related.assert_called_once() + mock_repo.delete_runs_with_related_by_ids.assert_called_once() def test_delete_exception_reraises(self, mock_repo): - run = make_run("t1") - mock_repo.get_runs_batch_by_time_range.side_effect = [[run], []] - mock_repo.delete_runs_with_related.side_effect = RuntimeError("db error") + ref = make_ref("t1") + mock_repo.get_cleanup_refs_batch_by_time_range.side_effect = [[ref], [ref]] + mock_repo.delete_runs_with_related_by_ids.side_effect = RuntimeError("db error") c = self._make_cleanup(mock_repo) with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg: cfg.BILLING_ENABLED = False @@ -389,7 +390,7 @@ class TestRunDeleteMode: c.run() def test_summary_with_window_start(self, mock_repo): - mock_repo.get_runs_batch_by_time_range.return_value = [] + mock_repo.get_cleanup_refs_batch_by_time_range.return_value = [] with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg: cfg.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD = 0 cfg.BILLING_ENABLED = False @@ -421,9 +422,10 @@ class TestRunDryRunMode: ) def test_dry_run_no_delete_called(self, mock_repo): - run = make_run("t1") - mock_repo.get_runs_batch_by_time_range.side_effect = [[run], []] - mock_repo.count_runs_with_related.return_value = { + ref = make_ref("t1") + mock_repo.get_cleanup_refs_batch_by_time_range.side_effect = [[ref], [ref], []] + mock_repo.count_runs_with_related_by_ids.return_value = { + "runs": 1, "node_executions": 2, "offloads": 0, "app_logs": 0, @@ -435,11 +437,11 @@ class TestRunDryRunMode: with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg: cfg.BILLING_ENABLED = False c.run() - mock_repo.delete_runs_with_related.assert_not_called() - mock_repo.count_runs_with_related.assert_called_once() + mock_repo.delete_runs_with_related_by_ids.assert_not_called() + mock_repo.count_runs_with_related_by_ids.assert_called_once() def test_dry_run_summary_with_window_start(self, mock_repo): - mock_repo.get_runs_batch_by_time_range.return_value = [] + mock_repo.get_cleanup_refs_batch_by_time_range.return_value = [] with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg: cfg.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD = 0 cfg.BILLING_ENABLED = False @@ -454,14 +456,14 @@ class TestRunDryRunMode: c.run() def test_dry_run_all_paid_skips_count(self, mock_repo): - run = make_run("t1") - mock_repo.get_runs_batch_by_time_range.side_effect = [[run], []] + ref = make_ref("t1") + mock_repo.get_cleanup_refs_batch_by_time_range.side_effect = [[ref], []] c = self._make_dry_cleanup(mock_repo) c._filter_free_tenants = MagicMock(return_value=set()) with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg: cfg.BILLING_ENABLED = False c.run() - mock_repo.count_runs_with_related.assert_not_called() + mock_repo.count_runs_with_related_by_ids.assert_not_called() # --------------------------------------------------------------------------- @@ -492,7 +494,7 @@ class TestTriggerLogMethods: # --------------------------------------------------------------------------- -# _count_node_executions / _delete_node_executions +# _count_node_executions_by_run_ids / _delete_node_executions_by_run_ids # --------------------------------------------------------------------------- @@ -500,25 +502,23 @@ class TestNodeExecutionMethods: def test_count_node_executions(self, cleanup): session = MagicMock() session.get_bind.return_value = MagicMock() - runs = [make_run("t1", "r1")] with patch( "services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.DifyAPIRepositoryFactory" ) as factory: repo = factory.create_api_workflow_node_execution_repository.return_value repo.count_by_runs.return_value = (10, 2) with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.sessionmaker"): - result = cleanup._count_node_executions(session, runs) + result = cleanup._count_node_executions_by_run_ids(session, ["r1"]) assert result == (10, 2) def test_delete_node_executions(self, cleanup): session = MagicMock() session.get_bind.return_value = MagicMock() - runs = [make_run("t1", "r1")] with patch( "services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.DifyAPIRepositoryFactory" ) as factory: repo = factory.create_api_workflow_node_execution_repository.return_value repo.delete_by_runs.return_value = (5, 1) with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.sessionmaker"): - result = cleanup._delete_node_executions(session, runs) + result = cleanup._delete_node_executions_by_run_ids(session, ["r1"]) assert result == (5, 1) diff --git a/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py b/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py index 6bf78d3411..60488beb24 100644 --- a/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py +++ b/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py @@ -3,38 +3,27 @@ from typing import Any import pytest +from repositories.api_workflow_run_repository import WorkflowRunCleanupRef from services.billing_service import SubscriptionPlan from services.retention.workflow_run import clear_free_plan_expired_workflow_run_logs as cleanup_module from services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup -class FakeRun: - def __init__( - self, - run_id: str, - tenant_id: str, - created_at: datetime.datetime, - app_id: str = "app-1", - workflow_id: str = "wf-1", - triggered_from: str = "workflow-run", - ) -> None: - self.id = run_id - self.tenant_id = tenant_id - self.app_id = app_id - self.workflow_id = workflow_id - self.triggered_from = triggered_from - self.created_at = created_at +def make_ref(run_id: str, tenant_id: str, created_at: datetime.datetime) -> WorkflowRunCleanupRef: + return WorkflowRunCleanupRef(id=run_id, tenant_id=tenant_id, created_at=created_at) class FakeRepo: def __init__( self, - batches: list[list[FakeRun]], + batches: list[list[WorkflowRunCleanupRef]], delete_result: dict[str, int] | None = None, count_result: dict[str, int] | None = None, ) -> None: self.batches = batches - self.call_idx = 0 + self.candidate_call_idx = 0 + self.last_candidate_batch: list[WorkflowRunCleanupRef] = [] + self.cleanup_ref_calls: list[dict[str, object]] = [] self.deleted: list[list[str]] = [] self.counted: list[list[str]] = [] self.delete_result = delete_result or { @@ -56,7 +45,7 @@ class FakeRepo: "pause_reasons": 0, } - def get_runs_batch_by_time_range( + def get_cleanup_refs_batch_by_time_range( self, start_from: datetime.datetime | None, end_before: datetime.datetime, @@ -65,27 +54,50 @@ class FakeRepo: run_types=None, tenant_ids=None, workflow_ids=None, - ) -> list[FakeRun]: - if self.call_idx >= len(self.batches): + upper_bound: tuple[datetime.datetime, str] | None = None, + ) -> list[WorkflowRunCleanupRef]: + self.cleanup_ref_calls.append( + { + "start_from": start_from, + "end_before": end_before, + "last_seen": last_seen, + "batch_size": batch_size, + "run_types": run_types, + "tenant_ids": tenant_ids, + "workflow_ids": workflow_ids, + "upper_bound": upper_bound, + } + ) + if tenant_ids is not None or upper_bound is not None: + refs = self.last_candidate_batch + if tenant_ids is not None: + tenant_id_set = set(tenant_ids) + refs = [ref for ref in refs if ref.tenant_id in tenant_id_set] + if upper_bound is not None: + refs = [ref for ref in refs if (ref.created_at, ref.id) <= upper_bound] + return refs[:batch_size] + + if self.candidate_call_idx >= len(self.batches): return [] - batch = self.batches[self.call_idx] - self.call_idx += 1 + batch = self.batches[self.candidate_call_idx] + self.candidate_call_idx += 1 + self.last_candidate_batch = batch return batch - def delete_runs_with_related( - self, runs: list[FakeRun], delete_node_executions=None, delete_trigger_logs=None + def delete_runs_with_related_by_ids( + self, run_ids: list[str], delete_node_executions=None, delete_trigger_logs=None ) -> dict[str, int]: - self.deleted.append([run.id for run in runs]) + self.deleted.append(list(run_ids)) result = self.delete_result.copy() - result["runs"] = len(runs) + result["runs"] = len(run_ids) return result - def count_runs_with_related( - self, runs: list[FakeRun], count_node_executions=None, count_trigger_logs=None + def count_runs_with_related_by_ids( + self, run_ids: list[str], count_node_executions=None, count_trigger_logs=None ) -> dict[str, int]: - self.counted.append([run.id for run in runs]) + self.counted.append(list(run_ids)) result = self.count_result.copy() - result["runs"] = len(runs) + result["runs"] = len(run_ids) return result @@ -218,8 +230,8 @@ def test_run_deletes_only_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None: repo = FakeRepo( batches=[ [ - FakeRun("run-free", "t_free", cutoff), - FakeRun("run-paid", "t_paid", cutoff), + make_ref("run-free", "t_free", cutoff), + make_ref("run-paid", "t_paid", cutoff), ] ] ) @@ -240,11 +252,43 @@ def test_run_deletes_only_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None: cleanup.run() assert repo.deleted == [["run-free"]] + assert repo.cleanup_ref_calls[1]["tenant_ids"] == ["t_free"] + + +def test_run_filters_candidate_tenants_before_target_query(monkeypatch: pytest.MonkeyPatch) -> None: + cutoff = datetime.datetime.now() + repo = FakeRepo( + batches=[ + [ + make_ref("run-free", "t_free", cutoff), + make_ref("run-paid", "t_paid", cutoff), + ] + ] + ) + cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10) + + monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True) + billing_calls: list[list[str]] = [] + + def fake_bulk(tenant_ids: list[str]) -> dict[str, SubscriptionPlan]: + billing_calls.append(tenant_ids) + return { + "t_free": plan_info("sandbox", -1), + "t_paid": plan_info("team", -1), + } + + monkeypatch.setattr(cleanup_module.BillingService, "get_plan_bulk_with_cache", staticmethod(fake_bulk)) + + cleanup.run() + + assert billing_calls == [["t_free", "t_paid"]] + assert repo.cleanup_ref_calls[1]["tenant_ids"] == ["t_free"] + assert repo.deleted == [["run-free"]] def test_run_skips_when_no_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None: cutoff = datetime.datetime.now() - repo = FakeRepo(batches=[[FakeRun("run-paid", "t_paid", cutoff)]]) + repo = FakeRepo(batches=[[make_ref("run-paid", "t_paid", cutoff)]]) cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10) monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True) @@ -257,6 +301,53 @@ def test_run_skips_when_no_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None cleanup.run() assert repo.deleted == [] + assert len(repo.cleanup_ref_calls) == 2 + + +def test_run_paid_only_records_skipped_metrics(monkeypatch: pytest.MonkeyPatch) -> None: + cutoff = datetime.datetime.now() + repo = FakeRepo(batches=[[make_ref("run-paid", "t_paid", cutoff)]]) + cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10) + + monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True) + monkeypatch.setattr( + cleanup_module.BillingService, + "get_plan_bulk_with_cache", + staticmethod(lambda tenant_ids: {tenant_id: plan_info("team", 1893456000) for tenant_id in tenant_ids}), + ) + batch_calls: list[dict[str, object]] = [] + monkeypatch.setattr(cleanup._metrics, "record_batch", lambda **kwargs: batch_calls.append(kwargs)) + + cleanup.run() + + assert repo.deleted == [] + assert repo.counted == [] + assert batch_calls[0]["batch_rows"] == 1 + assert batch_calls[0]["targeted_runs"] == 0 + assert batch_calls[0]["skipped_runs"] == 1 + assert batch_calls[0]["deleted_runs"] == 0 + + +def test_run_target_query_is_bounded_by_candidate_high_water(monkeypatch: pytest.MonkeyPatch) -> None: + first_created_at = datetime.datetime(2024, 1, 1, 0, 0, 0) + second_created_at = datetime.datetime(2024, 1, 1, 0, 1, 0) + repo = FakeRepo( + batches=[ + [ + make_ref("run-free-1", "t_free", first_created_at), + make_ref("run-free-2", "t_free", second_created_at), + ] + ] + ) + cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=2) + + monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False) + + cleanup.run() + + assert repo.cleanup_ref_calls[1]["last_seen"] is None + assert repo.cleanup_ref_calls[1]["upper_bound"] == (second_created_at, "run-free-2") + assert repo.cleanup_ref_calls[2]["last_seen"] == (second_created_at, "run-free-2") def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None: @@ -268,7 +359,7 @@ def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None: def test_run_records_metrics_on_success(monkeypatch: pytest.MonkeyPatch) -> None: cutoff = datetime.datetime.now() repo = FakeRepo( - batches=[[FakeRun("run-free", "t_free", cutoff)]], + batches=[[make_ref("run-free", "t_free", cutoff)]], delete_result={ "runs": 0, "node_executions": 2, @@ -300,13 +391,13 @@ def test_run_records_metrics_on_success(monkeypatch: pytest.MonkeyPatch) -> None def test_run_records_failed_metrics(monkeypatch: pytest.MonkeyPatch) -> None: class FailingRepo(FakeRepo): - def delete_runs_with_related( - self, runs: list[FakeRun], delete_node_executions=None, delete_trigger_logs=None + def delete_runs_with_related_by_ids( + self, run_ids: list[str], delete_node_executions=None, delete_trigger_logs=None ) -> dict[str, int]: raise RuntimeError("delete failed") cutoff = datetime.datetime.now() - repo = FailingRepo(batches=[[FakeRun("run-free", "t_free", cutoff)]]) + repo = FailingRepo(batches=[[make_ref("run-free", "t_free", cutoff)]]) cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10) monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False) @@ -323,7 +414,7 @@ def test_run_records_failed_metrics(monkeypatch: pytest.MonkeyPatch) -> None: def test_run_dry_run_skips_deletions(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None: cutoff = datetime.datetime.now() repo = FakeRepo( - batches=[[FakeRun("run-free", "t_free", cutoff)]], + batches=[[make_ref("run-free", "t_free", cutoff)]], count_result={ "runs": 0, "node_executions": 2,