mirror of
https://github.com/langgenius/dify.git
synced 2026-06-17 23:21:12 +08:00
refactor: optimize free plan workflow run cleanup batching (#37227)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
parent
8ca8b3d59a
commit
6ab5cf109b
@ -35,6 +35,7 @@ Example:
|
||||
"""
|
||||
|
||||
from collections.abc import Callable, Sequence
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Protocol, TypedDict
|
||||
|
||||
@ -65,6 +66,21 @@ class RunsWithRelatedCountsDict(TypedDict):
|
||||
pause_reasons: int
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class WorkflowRunCleanupRef:
|
||||
"""
|
||||
Lightweight workflow run reference for retention cleanup scans.
|
||||
|
||||
Cleanup jobs use this DTO when they only need cursor, tenant eligibility, and run-id deletion data. Keeping the
|
||||
query shape explicit prevents free-plan cleanup from hydrating full WorkflowRun models for rows that may be skipped
|
||||
after billing checks.
|
||||
"""
|
||||
|
||||
id: str
|
||||
tenant_id: str
|
||||
created_at: datetime
|
||||
|
||||
|
||||
class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
|
||||
"""
|
||||
Protocol for service-layer WorkflowRun repository operations.
|
||||
@ -286,6 +302,36 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
|
||||
"""
|
||||
...
|
||||
|
||||
def get_cleanup_refs_batch_by_time_range(
    self,
    start_from: datetime | None,
    end_before: datetime,
    last_seen: tuple[datetime, str] | None,
    batch_size: int,
    run_types: Sequence[WorkflowType] | None = None,
    tenant_ids: Sequence[str] | None = None,
    workflow_ids: Sequence[str] | None = None,
    upper_bound: tuple[datetime, str] | None = None,
) -> Sequence[WorkflowRunCleanupRef]:
    """
    Fetch lightweight ended workflow run refs in a time window for cleanup batching.

    Pagination is keyset-based on ``(created_at, id)``: pass the cursor of the last ref from the previous page as
    ``last_seen`` to continue. When both ``last_seen`` and ``upper_bound`` are given, the result is confined to the
    cursor slice that is strictly after ``last_seen`` and up to and including ``upper_bound``.

    Args:
        start_from: Optional inclusive lower time boundary.
        end_before: Exclusive upper time boundary.
        last_seen: Optional exclusive `(created_at, id)` cursor lower bound.
        batch_size: Maximum number of refs to return.
        run_types: Optional workflow type filter.
        tenant_ids: Optional tenant filter.
        workflow_ids: Optional workflow ID filter.
        upper_bound: Optional inclusive `(created_at, id)` cursor upper bound. Cleanup uses this for a second,
            tenant-filtered target query that must stay within the candidate page high-water cursor.

    Returns:
        Ordered lightweight cleanup refs containing only id, tenant_id, and created_at.
    """
    ...
|
||||
|
||||
def get_archived_run_ids(
|
||||
self,
|
||||
session: Session,
|
||||
@ -370,6 +416,19 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
|
||||
"""
|
||||
...
|
||||
|
||||
def delete_runs_with_related_by_ids(
    self,
    run_ids: Sequence[str],
    delete_node_executions: Callable[[Session, Sequence[str]], tuple[int, int]] | None = None,
    delete_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None,
) -> RunsWithRelatedCountsDict:
    """
    Delete workflow runs and cleanup-owned related records by workflow run IDs.

    This mirrors delete_runs_with_related() for cleanup callers that do not need full WorkflowRun models.

    Args:
        run_ids: IDs of the workflow runs to delete.
        delete_node_executions: Optional callback invoked with the session and the run IDs; its two-int result is
            reported as the node execution and offload counts.
        delete_trigger_logs: Optional callback invoked with the session and the run IDs; its int result is
            reported as the trigger log count.

    Returns:
        Per-table counts of the rows that were deleted.
    """
    ...
|
||||
|
||||
def get_app_logs_by_run_id(
|
||||
self,
|
||||
session: Session,
|
||||
@ -417,6 +476,19 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
|
||||
"""
|
||||
...
|
||||
|
||||
def count_runs_with_related_by_ids(
    self,
    run_ids: Sequence[str],
    count_node_executions: Callable[[Session, Sequence[str]], tuple[int, int]] | None = None,
    count_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None,
) -> RunsWithRelatedCountsDict:
    """
    Count workflow runs and cleanup-owned related records by workflow run IDs.

    This mirrors count_runs_with_related() for dry-run cleanup callers that do not need full WorkflowRun models.

    Args:
        run_ids: IDs of the workflow runs to count.
        count_node_executions: Optional callback invoked with the session and the run IDs; its two-int result is
            reported as the node execution and offload counts.
        count_trigger_logs: Optional callback invoked with the session and the run IDs; its int result is
            reported as the trigger log count.

    Returns:
        Per-table counts of the rows a matching delete would cover.
    """
    ...
|
||||
|
||||
def create_workflow_pause(
|
||||
self,
|
||||
workflow_run_id: str,
|
||||
|
||||
@ -44,7 +44,11 @@ from libs.time_parser import get_time_threshold
|
||||
from models.enums import WorkflowRunTriggeredFrom
|
||||
from models.human_input import HumanInputForm, HumanInputFormRecipient
|
||||
from models.workflow import WorkflowAppLog, WorkflowArchiveLog, WorkflowPause, WorkflowPauseReason, WorkflowRun
|
||||
from repositories.api_workflow_run_repository import APIWorkflowRunRepository, RunsWithRelatedCountsDict
|
||||
from repositories.api_workflow_run_repository import (
|
||||
APIWorkflowRunRepository,
|
||||
RunsWithRelatedCountsDict,
|
||||
WorkflowRunCleanupRef,
|
||||
)
|
||||
from repositories.entities.workflow_pause import WorkflowPauseEntity
|
||||
from repositories.types import (
|
||||
AverageInteractionStats,
|
||||
@ -420,6 +424,71 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
|
||||
return session.scalars(stmt).all()
|
||||
|
||||
@override
|
||||
def get_cleanup_refs_batch_by_time_range(
|
||||
self,
|
||||
start_from: datetime | None,
|
||||
end_before: datetime,
|
||||
last_seen: tuple[datetime, str] | None,
|
||||
batch_size: int,
|
||||
run_types: Sequence[WorkflowType] | None = None,
|
||||
tenant_ids: Sequence[str] | None = None,
|
||||
workflow_ids: Sequence[str] | None = None,
|
||||
upper_bound: tuple[datetime, str] | None = None,
|
||||
) -> Sequence[WorkflowRunCleanupRef]:
|
||||
"""
|
||||
Fetch lightweight ended workflow run refs in a time window for cleanup batching.
|
||||
|
||||
The optional upper_bound is inclusive and is paired with last_seen by free-plan cleanup so a second,
|
||||
tenant-filtered target query stays within the candidate page already checked against billing.
|
||||
"""
|
||||
with self._session_maker() as session:
|
||||
stmt = (
|
||||
select(WorkflowRun.id, WorkflowRun.tenant_id, WorkflowRun.created_at)
|
||||
.where(
|
||||
WorkflowRun.created_at < end_before,
|
||||
WorkflowRun.status.in_(WorkflowExecutionStatus.ended_values()),
|
||||
)
|
||||
.order_by(WorkflowRun.created_at.asc(), WorkflowRun.id.asc())
|
||||
.limit(batch_size)
|
||||
)
|
||||
if run_types is not None:
|
||||
if not run_types:
|
||||
return []
|
||||
stmt = stmt.where(WorkflowRun.type.in_(run_types))
|
||||
|
||||
if start_from:
|
||||
stmt = stmt.where(WorkflowRun.created_at >= start_from)
|
||||
|
||||
if tenant_ids:
|
||||
stmt = stmt.where(WorkflowRun.tenant_id.in_(tenant_ids))
|
||||
|
||||
if workflow_ids:
|
||||
stmt = stmt.where(WorkflowRun.workflow_id.in_(workflow_ids))
|
||||
|
||||
if last_seen:
|
||||
stmt = stmt.where(
|
||||
tuple_(WorkflowRun.created_at, WorkflowRun.id)
|
||||
> tuple_(
|
||||
sa.literal(last_seen[0], type_=sa.DateTime()),
|
||||
sa.literal(last_seen[1], type_=WorkflowRun.id.type),
|
||||
)
|
||||
)
|
||||
|
||||
if upper_bound:
|
||||
stmt = stmt.where(
|
||||
tuple_(WorkflowRun.created_at, WorkflowRun.id)
|
||||
<= tuple_(
|
||||
sa.literal(upper_bound[0], type_=sa.DateTime()),
|
||||
sa.literal(upper_bound[1], type_=WorkflowRun.id.type),
|
||||
)
|
||||
)
|
||||
|
||||
return [
|
||||
WorkflowRunCleanupRef(id=run_id, tenant_id=tenant_id, created_at=created_at)
|
||||
for run_id, tenant_id, created_at in session.execute(stmt).all()
|
||||
]
|
||||
|
||||
@override
|
||||
def get_archived_run_ids(
|
||||
self,
|
||||
@ -530,6 +599,56 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
"pause_reasons": pause_reasons_deleted,
|
||||
}
|
||||
|
||||
@override
|
||||
def delete_runs_with_related_by_ids(
|
||||
self,
|
||||
run_ids: Sequence[str],
|
||||
delete_node_executions: Callable[[Session, Sequence[str]], tuple[int, int]] | None = None,
|
||||
delete_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None,
|
||||
) -> RunsWithRelatedCountsDict:
|
||||
if not run_ids:
|
||||
return self._empty_runs_with_related_counts()
|
||||
|
||||
run_ids = list(run_ids)
|
||||
with self._session_maker() as session:
|
||||
if delete_node_executions:
|
||||
node_executions_deleted, offloads_deleted = delete_node_executions(session, run_ids)
|
||||
else:
|
||||
node_executions_deleted, offloads_deleted = 0, 0
|
||||
|
||||
app_logs_result = session.execute(delete(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id.in_(run_ids)))
|
||||
app_logs_deleted = cast(CursorResult, app_logs_result).rowcount or 0
|
||||
|
||||
pause_stmt = select(WorkflowPause.id).where(WorkflowPause.workflow_run_id.in_(run_ids))
|
||||
pause_ids = session.scalars(pause_stmt).all()
|
||||
pause_reasons_deleted = 0
|
||||
pauses_deleted = 0
|
||||
|
||||
if pause_ids:
|
||||
pause_reasons_result = session.execute(
|
||||
delete(WorkflowPauseReason).where(WorkflowPauseReason.pause_id.in_(pause_ids))
|
||||
)
|
||||
pause_reasons_deleted = cast(CursorResult, pause_reasons_result).rowcount or 0
|
||||
pauses_result = session.execute(delete(WorkflowPause).where(WorkflowPause.id.in_(pause_ids)))
|
||||
pauses_deleted = cast(CursorResult, pauses_result).rowcount or 0
|
||||
|
||||
trigger_logs_deleted = delete_trigger_logs(session, run_ids) if delete_trigger_logs else 0
|
||||
|
||||
runs_result = session.execute(delete(WorkflowRun).where(WorkflowRun.id.in_(run_ids)))
|
||||
runs_deleted = cast(CursorResult, runs_result).rowcount or 0
|
||||
|
||||
session.commit()
|
||||
|
||||
return {
|
||||
"runs": runs_deleted,
|
||||
"node_executions": node_executions_deleted,
|
||||
"offloads": offloads_deleted,
|
||||
"app_logs": app_logs_deleted,
|
||||
"trigger_logs": trigger_logs_deleted,
|
||||
"pauses": pauses_deleted,
|
||||
"pause_reasons": pause_reasons_deleted,
|
||||
}
|
||||
|
||||
@override
|
||||
def get_app_logs_by_run_id(
|
||||
self,
|
||||
@ -711,6 +830,72 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
"pause_reasons": int(pause_reasons_count),
|
||||
}
|
||||
|
||||
@override
|
||||
def count_runs_with_related_by_ids(
|
||||
self,
|
||||
run_ids: Sequence[str],
|
||||
count_node_executions: Callable[[Session, Sequence[str]], tuple[int, int]] | None = None,
|
||||
count_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None,
|
||||
) -> RunsWithRelatedCountsDict:
|
||||
if not run_ids:
|
||||
return self._empty_runs_with_related_counts()
|
||||
|
||||
run_ids = list(run_ids)
|
||||
with self._session_maker() as session:
|
||||
if count_node_executions:
|
||||
node_executions_count, offloads_count = count_node_executions(session, run_ids)
|
||||
else:
|
||||
node_executions_count, offloads_count = 0, 0
|
||||
|
||||
runs_count = (
|
||||
session.scalar(select(func.count()).select_from(WorkflowRun).where(WorkflowRun.id.in_(run_ids))) or 0
|
||||
)
|
||||
app_logs_count = (
|
||||
session.scalar(
|
||||
select(func.count()).select_from(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id.in_(run_ids))
|
||||
)
|
||||
or 0
|
||||
)
|
||||
|
||||
pause_ids = session.scalars(
|
||||
select(WorkflowPause.id).where(WorkflowPause.workflow_run_id.in_(run_ids))
|
||||
).all()
|
||||
pauses_count = len(pause_ids)
|
||||
pause_reasons_count = 0
|
||||
if pause_ids:
|
||||
pause_reasons_count = (
|
||||
session.scalar(
|
||||
select(func.count())
|
||||
.select_from(WorkflowPauseReason)
|
||||
.where(WorkflowPauseReason.pause_id.in_(pause_ids))
|
||||
)
|
||||
or 0
|
||||
)
|
||||
|
||||
trigger_logs_count = count_trigger_logs(session, run_ids) if count_trigger_logs else 0
|
||||
|
||||
return {
|
||||
"runs": int(runs_count),
|
||||
"node_executions": node_executions_count,
|
||||
"offloads": offloads_count,
|
||||
"app_logs": int(app_logs_count),
|
||||
"trigger_logs": trigger_logs_count,
|
||||
"pauses": pauses_count,
|
||||
"pause_reasons": int(pause_reasons_count),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _empty_runs_with_related_counts() -> RunsWithRelatedCountsDict:
|
||||
return {
|
||||
"runs": 0,
|
||||
"node_executions": 0,
|
||||
"offloads": 0,
|
||||
"app_logs": 0,
|
||||
"trigger_logs": 0,
|
||||
"pauses": 0,
|
||||
"pause_reasons": 0,
|
||||
}
|
||||
|
||||
@override
|
||||
def create_workflow_pause(
|
||||
self,
|
||||
|
||||
@ -1,3 +1,10 @@
|
||||
"""Cleanup expired workflow run logs for free-plan tenants.
|
||||
|
||||
The cleanup service owns billing eligibility decisions while repositories own database-efficient batch selection and
|
||||
deletion. Free-plan cleanup intentionally scans lightweight workflow run references first, then re-queries the same
|
||||
candidate cursor slice with eligible tenant IDs so paid tenants are skipped without hydrating full WorkflowRun models.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import random
|
||||
@ -11,8 +18,11 @@ from sqlalchemy.orm import Session, sessionmaker
|
||||
from configs import dify_config
|
||||
from enums.cloud_plan import CloudPlan
|
||||
from extensions.ext_database import db
|
||||
from models.workflow import WorkflowRun
|
||||
from repositories.api_workflow_run_repository import APIWorkflowRunRepository, RunsWithRelatedCountsDict
|
||||
from repositories.api_workflow_run_repository import (
|
||||
APIWorkflowRunRepository,
|
||||
RunsWithRelatedCountsDict,
|
||||
WorkflowRunCleanupRef,
|
||||
)
|
||||
from repositories.factory import DifyAPIRepositoryFactory
|
||||
from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository
|
||||
from services.billing_service import BillingService, SubscriptionPlan
|
||||
@ -186,6 +196,13 @@ _RELATED_RECORD_KEYS = ("node_executions", "offloads", "app_logs", "trigger_logs
|
||||
|
||||
|
||||
class WorkflowRunCleanup:
|
||||
"""
|
||||
Coordinates free-plan workflow run retention cleanup.
|
||||
|
||||
The cleanup cursor advances by candidate refs, not target refs. This keeps pagination stable
|
||||
when billing filters out paid or unknown tenants before the repository performs the target lookup.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
days: int,
|
||||
@ -254,26 +271,28 @@ class WorkflowRunCleanup:
|
||||
batch_start = time.monotonic()
|
||||
|
||||
fetch_start = time.monotonic()
|
||||
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
|
||||
candidate_last_seen = last_seen
|
||||
candidate_refs = self.workflow_run_repo.get_cleanup_refs_batch_by_time_range(
|
||||
start_from=self.window_start,
|
||||
end_before=self.window_end,
|
||||
last_seen=last_seen,
|
||||
last_seen=candidate_last_seen,
|
||||
batch_size=self.batch_size,
|
||||
)
|
||||
if not run_rows:
|
||||
if not candidate_refs:
|
||||
logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1)
|
||||
break
|
||||
|
||||
batch_index += 1
|
||||
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
|
||||
candidate_high_water = self._cursor_from_ref(candidate_refs[-1])
|
||||
last_seen = candidate_high_water
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s): fetched %s rows in %sms",
|
||||
"workflow_run_cleanup (batch #%s): fetched %s candidate refs in %sms",
|
||||
batch_index,
|
||||
len(run_rows),
|
||||
len(candidate_refs),
|
||||
int((time.monotonic() - fetch_start) * 1000),
|
||||
)
|
||||
|
||||
tenant_ids = {row.tenant_id for row in run_rows}
|
||||
tenant_ids = {ref.tenant_id for ref in candidate_refs}
|
||||
|
||||
filter_start = time.monotonic()
|
||||
free_tenants = self._filter_free_tenants(tenant_ids)
|
||||
@ -285,10 +304,28 @@ class WorkflowRunCleanup:
|
||||
int((time.monotonic() - filter_start) * 1000),
|
||||
)
|
||||
|
||||
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
|
||||
paid_or_skipped = len(run_rows) - len(free_runs)
|
||||
target_refs: Sequence[WorkflowRunCleanupRef] = []
|
||||
if free_tenants:
|
||||
target_fetch_start = time.monotonic()
|
||||
target_refs = self.workflow_run_repo.get_cleanup_refs_batch_by_time_range(
|
||||
start_from=self.window_start,
|
||||
end_before=self.window_end,
|
||||
last_seen=candidate_last_seen,
|
||||
batch_size=self.batch_size,
|
||||
tenant_ids=sorted(free_tenants),
|
||||
upper_bound=candidate_high_water,
|
||||
)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s): fetched %s target refs in %sms",
|
||||
batch_index,
|
||||
len(target_refs),
|
||||
int((time.monotonic() - target_fetch_start) * 1000),
|
||||
)
|
||||
|
||||
if not free_runs:
|
||||
target_run_ids = [ref.id for ref in target_refs]
|
||||
paid_or_skipped = max(len(candidate_refs) - len(target_run_ids), 0)
|
||||
|
||||
if not target_run_ids:
|
||||
skipped_message = (
|
||||
f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)"
|
||||
)
|
||||
@ -299,7 +336,7 @@ class WorkflowRunCleanup:
|
||||
)
|
||||
)
|
||||
self._metrics.record_batch(
|
||||
batch_rows=len(run_rows),
|
||||
batch_rows=len(candidate_refs),
|
||||
targeted_runs=0,
|
||||
skipped_runs=paid_or_skipped,
|
||||
deleted_runs=0,
|
||||
@ -309,13 +346,13 @@ class WorkflowRunCleanup:
|
||||
)
|
||||
continue
|
||||
|
||||
total_runs_targeted += len(free_runs)
|
||||
total_runs_targeted += len(target_run_ids)
|
||||
|
||||
if self.dry_run:
|
||||
count_start = time.monotonic()
|
||||
batch_counts = self.workflow_run_repo.count_runs_with_related(
|
||||
free_runs,
|
||||
count_node_executions=self._count_node_executions,
|
||||
batch_counts = self.workflow_run_repo.count_runs_with_related_by_ids(
|
||||
target_run_ids,
|
||||
count_node_executions=self._count_node_executions_by_run_ids,
|
||||
count_trigger_logs=self._count_trigger_logs,
|
||||
)
|
||||
logger.info(
|
||||
@ -325,10 +362,10 @@ class WorkflowRunCleanup:
|
||||
)
|
||||
if related_totals is not None:
|
||||
self._accumulate_related_counts(related_totals, batch_counts)
|
||||
sample_ids = ", ".join(run.id for run in free_runs[:5])
|
||||
sample_ids = ", ".join(target_run_ids[:5])
|
||||
click.echo(
|
||||
click.style(
|
||||
f"[batch #{batch_index}] would delete {len(free_runs)} runs "
|
||||
f"[batch #{batch_index}] would delete {len(target_run_ids)} runs "
|
||||
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
|
||||
fg="yellow",
|
||||
)
|
||||
@ -339,8 +376,8 @@ class WorkflowRunCleanup:
|
||||
int((time.monotonic() - batch_start) * 1000),
|
||||
)
|
||||
self._metrics.record_batch(
|
||||
batch_rows=len(run_rows),
|
||||
targeted_runs=len(free_runs),
|
||||
batch_rows=len(candidate_refs),
|
||||
targeted_runs=len(target_run_ids),
|
||||
skipped_runs=paid_or_skipped,
|
||||
deleted_runs=0,
|
||||
related_counts={
|
||||
@ -354,14 +391,14 @@ class WorkflowRunCleanup:
|
||||
|
||||
try:
|
||||
delete_start = time.monotonic()
|
||||
counts = self.workflow_run_repo.delete_runs_with_related(
|
||||
free_runs,
|
||||
delete_node_executions=self._delete_node_executions,
|
||||
counts = self.workflow_run_repo.delete_runs_with_related_by_ids(
|
||||
target_run_ids,
|
||||
delete_node_executions=self._delete_node_executions_by_run_ids,
|
||||
delete_trigger_logs=self._delete_trigger_logs,
|
||||
)
|
||||
delete_ms = int((time.monotonic() - delete_start) * 1000)
|
||||
except Exception:
|
||||
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
|
||||
logger.exception("Failed to delete workflow runs batch ending at %s", candidate_high_water[0])
|
||||
raise
|
||||
|
||||
total_runs_deleted += counts["runs"]
|
||||
@ -382,8 +419,8 @@ class WorkflowRunCleanup:
|
||||
int((time.monotonic() - batch_start) * 1000),
|
||||
)
|
||||
self._metrics.record_batch(
|
||||
batch_rows=len(run_rows),
|
||||
targeted_runs=len(free_runs),
|
||||
batch_rows=len(candidate_refs),
|
||||
targeted_runs=len(target_run_ids),
|
||||
skipped_runs=paid_or_skipped,
|
||||
deleted_runs=counts["runs"],
|
||||
related_counts={
|
||||
@ -439,7 +476,7 @@ class WorkflowRunCleanup:
|
||||
)
|
||||
|
||||
def _filter_free_tenants(self, tenant_ids: Iterable[str]) -> set[str]:
|
||||
tenant_id_list = list(tenant_ids)
|
||||
tenant_id_list = sorted(set(tenant_ids))
|
||||
|
||||
if not dify_config.BILLING_ENABLED:
|
||||
return set(tenant_id_list)
|
||||
@ -553,15 +590,17 @@ class WorkflowRunCleanup:
|
||||
totals["pauses"] += batch.get("pauses", 0)
|
||||
totals["pause_reasons"] += batch.get("pause_reasons", 0)
|
||||
|
||||
def _count_node_executions(self, session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]:
|
||||
run_ids = [run.id for run in runs]
|
||||
@staticmethod
|
||||
def _cursor_from_ref(ref: WorkflowRunCleanupRef) -> tuple[datetime.datetime, str]:
|
||||
return ref.created_at, ref.id
|
||||
|
||||
def _count_node_executions_by_run_ids(self, session: Session, run_ids: Sequence[str]) -> tuple[int, int]:
    """
    Count node executions for the given run IDs via the node execution repository.

    The repository is built on a session maker bound to the same bind as *session*, and the count itself is run
    against *session*. Returns whatever ``count_by_runs`` reports as a two-int tuple.
    """
    bound_session_maker = sessionmaker(bind=session.get_bind(), expire_on_commit=False)
    node_execution_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(
        session_maker=bound_session_maker
    )
    return node_execution_repo.count_by_runs(session, run_ids)
|
||||
|
||||
def _delete_node_executions(self, session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]:
|
||||
run_ids = [run.id for run in runs]
|
||||
def _delete_node_executions_by_run_ids(self, session: Session, run_ids: Sequence[str]) -> tuple[int, int]:
|
||||
repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(
|
||||
session_maker=sessionmaker(bind=session.get_bind(), expire_on_commit=False)
|
||||
)
|
||||
|
||||
@ -0,0 +1,320 @@
|
||||
"""Integration tests for workflow run cleanup repository queries."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timedelta
|
||||
from typing import override
|
||||
from uuid import uuid4
|
||||
|
||||
from sqlalchemy import Engine, select
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from graphon.entities import WorkflowExecution
|
||||
from graphon.entities.pause_reason import PauseReasonType
|
||||
from graphon.enums import WorkflowExecutionStatus, WorkflowType
|
||||
from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom
|
||||
from models.workflow import WorkflowAppLog, WorkflowAppLogCreatedFrom, WorkflowPause, WorkflowPauseReason, WorkflowRun
|
||||
from repositories.sqlalchemy_api_workflow_run_repository import DifyAPISQLAlchemyWorkflowRunRepository
|
||||
|
||||
|
||||
class _TestWorkflowRunRepository(DifyAPISQLAlchemyWorkflowRunRepository):
    """Concrete repository for tests where save() is not under test."""

    @override
    def save(self, execution: WorkflowExecution) -> None:
        """Accept the execution and do nothing; these tests never exercise persistence through save()."""
|
||||
|
||||
|
||||
@dataclass
|
||||
class _TestScope:
|
||||
"""Per-test identifiers for rows created by cleanup repository tests."""
|
||||
|
||||
tenant_id: str = field(default_factory=lambda: str(uuid4()))
|
||||
app_id: str = field(default_factory=lambda: str(uuid4()))
|
||||
workflow_id: str = field(default_factory=lambda: str(uuid4()))
|
||||
user_id: str = field(default_factory=lambda: str(uuid4()))
|
||||
|
||||
|
||||
def _repository(db_session_with_containers: Session) -> DifyAPISQLAlchemyWorkflowRunRepository:
    """Build the test repository on a fresh session maker bound to the fixture session's engine."""
    bind = db_session_with_containers.get_bind()
    assert isinstance(bind, Engine)
    session_factory = sessionmaker(bind=bind, expire_on_commit=False)
    return _TestWorkflowRunRepository(session_maker=session_factory)
|
||||
|
||||
|
||||
def _create_workflow_run(
    session: Session,
    scope: _TestScope,
    *,
    status: WorkflowExecutionStatus = WorkflowExecutionStatus.SUCCEEDED,
    created_at: datetime,
    tenant_id: str | None = None,
    workflow_id: str | None = None,
    workflow_type: str = WorkflowType.WORKFLOW,
) -> WorkflowRun:
    """
    Insert and commit a workflow run, returning the persisted model.

    ``tenant_id`` and ``workflow_id`` fall back to the scope's values when not supplied; the app and creator
    always come from the scope.
    """
    owner_tenant_id = tenant_id or scope.tenant_id
    owning_workflow_id = workflow_id or scope.workflow_id

    run = WorkflowRun(
        id=str(uuid4()),
        tenant_id=owner_tenant_id,
        app_id=scope.app_id,
        workflow_id=owning_workflow_id,
        type=workflow_type,
        triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
        version="draft",
        graph="{}",
        inputs="{}",
        status=status,
        created_by_role=CreatorUserRole.ACCOUNT,
        created_by=scope.user_id,
        created_at=created_at,
    )
    session.add(run)
    session.commit()
    return run
|
||||
|
||||
|
||||
def _add_app_log(session: Session, scope: _TestScope, workflow_run: WorkflowRun) -> None:
    """Insert and commit one app log row pointing at *workflow_run*."""
    app_log = WorkflowAppLog(
        tenant_id=workflow_run.tenant_id,
        app_id=scope.app_id,
        workflow_id=workflow_run.workflow_id,
        workflow_run_id=workflow_run.id,
        created_from=WorkflowAppLogCreatedFrom.SERVICE_API,
        created_by_role=CreatorUserRole.ACCOUNT,
        created_by=scope.user_id,
    )
    session.add(app_log)
    session.commit()
|
||||
|
||||
|
||||
def _add_pause_with_reason(session: Session, scope: _TestScope, workflow_run: WorkflowRun) -> WorkflowPause:
    """Insert and commit a pause for *workflow_run* plus one scheduled-pause reason, returning the pause."""
    pause = WorkflowPause(
        workflow_id=workflow_run.workflow_id,
        workflow_run_id=workflow_run.id,
        state_object_key=f"workflow-state-{uuid4()}.json",
    )
    # NOTE(review): pause.id is read before any flush, so this presumably relies on the model assigning its
    # ID at construction time — confirm against the WorkflowPause definition.
    session.add_all(
        [
            pause,
            WorkflowPauseReason(
                pause_id=pause.id,
                type_=PauseReasonType.SCHEDULED_PAUSE,
                message="scheduled pause",
            ),
        ]
    )
    session.commit()
    return pause
|
||||
|
||||
|
||||
class TestGetCleanupRefsBatchByTimeRange:
    """Integration tests for the lightweight cleanup ref query."""

    def test_applies_cursor_window_and_cleanup_filters(self, db_session_with_containers: Session) -> None:
        """Only runs inside the window, cursor slice and every filter are returned, in cursor order."""
        repository = _repository(db_session_with_containers)
        scope = _TestScope()
        base = datetime(2024, 1, 1, 12, 0, 0)
        one_minute_in = base + timedelta(minutes=1)

        def add_run(**overrides) -> WorkflowRun:
            return _create_workflow_run(db_session_with_containers, scope, **overrides)

        # Created before start_from.
        add_run(created_at=base - timedelta(minutes=1))
        # Used as the exclusive last_seen cursor, so it must not come back.
        cursor_run = add_run(created_at=base)
        first_target = add_run(created_at=one_minute_in)
        # Used as the inclusive upper_bound, so it must come back.
        second_target = add_run(status=WorkflowExecutionStatus.FAILED, created_at=base + timedelta(minutes=2))
        # Still running, so expected to be filtered out by the ended-status condition.
        add_run(status=WorkflowExecutionStatus.RUNNING, created_at=one_minute_in)
        # One decoy per filter: other tenant, other workflow, other workflow type.
        add_run(created_at=one_minute_in, tenant_id=str(uuid4()))
        add_run(created_at=one_minute_in, workflow_id=str(uuid4()))
        add_run(created_at=one_minute_in, workflow_type=WorkflowType.CHAT)
        # Inside the time window but past upper_bound.
        add_run(created_at=base + timedelta(minutes=3))

        refs = repository.get_cleanup_refs_batch_by_time_range(
            start_from=base,
            end_before=base + timedelta(minutes=4),
            last_seen=(cursor_run.created_at, cursor_run.id),
            batch_size=10,
            run_types=[WorkflowType.WORKFLOW],
            tenant_ids=[scope.tenant_id],
            workflow_ids=[scope.workflow_id],
            upper_bound=(second_target.created_at, second_target.id),
        )

        actual = [(ref.id, ref.tenant_id, ref.created_at) for ref in refs]
        expected = [
            (first_target.id, scope.tenant_id, first_target.created_at),
            (second_target.id, scope.tenant_id, second_target.created_at),
        ]
        assert actual == expected

    def test_returns_empty_when_run_type_filter_is_empty(self, db_session_with_containers: Session) -> None:
        """An explicitly empty run type filter yields no refs."""
        repository = _repository(db_session_with_containers)

        refs = repository.get_cleanup_refs_batch_by_time_range(
            start_from=None,
            end_before=datetime(2024, 1, 2),
            last_seen=None,
            batch_size=10,
            run_types=[],
        )

        assert refs == []
|
||||
|
||||
|
||||
class TestCountRunsWithRelatedByIds:
    """Integration tests for counting runs and related rows by run ID."""

    def test_counts_existing_runs_and_related_rows(self, db_session_with_containers: Session) -> None:
        """Existing rows are counted, unknown run IDs add nothing, and callbacks see every requested ID."""
        repository = _repository(db_session_with_containers)
        scope = _TestScope()
        workflow_run = _create_workflow_run(
            db_session_with_containers,
            scope,
            created_at=datetime(2024, 1, 1, 12, 0, 0),
        )
        missing_run_id = str(uuid4())
        _add_app_log(db_session_with_containers, scope, workflow_run)
        _add_pause_with_reason(db_session_with_containers, scope, workflow_run)
        counted_node_run_ids: list[str] = []
        counted_trigger_run_ids: list[str] = []

        def count_node_executions(_session, run_ids):
            counted_node_run_ids.extend(run_ids)
            return (2, 1)

        def count_trigger_logs(_session, run_ids):
            counted_trigger_run_ids.extend(run_ids)
            return 3

        requested_ids = [workflow_run.id, missing_run_id]
        counts = repository.count_runs_with_related_by_ids(
            requested_ids,
            count_node_executions=count_node_executions,
            count_trigger_logs=count_trigger_logs,
        )

        assert counted_node_run_ids == [workflow_run.id, missing_run_id]
        assert counted_trigger_run_ids == [workflow_run.id, missing_run_id]
        assert counts == {
            "runs": 1,
            "node_executions": 2,
            "offloads": 1,
            "app_logs": 1,
            "trigger_logs": 3,
            "pauses": 1,
            "pause_reasons": 1,
        }

    def test_defaults_optional_related_counts(self, db_session_with_containers: Session) -> None:
        """Without callbacks or related rows, only the run itself is counted."""
        repository = _repository(db_session_with_containers)
        scope = _TestScope()
        workflow_run = _create_workflow_run(
            db_session_with_containers,
            scope,
            created_at=datetime(2024, 1, 1, 12, 0, 0),
        )

        counts = repository.count_runs_with_related_by_ids([workflow_run.id])

        expected = {
            "runs": 1,
            "node_executions": 0,
            "offloads": 0,
            "app_logs": 0,
            "trigger_logs": 0,
            "pauses": 0,
            "pause_reasons": 0,
        }
        assert counts == expected
|
||||
|
||||
|
||||
class TestDeleteRunsWithRelatedByIds:
    """Integration tests for deleting runs and related rows by run ID."""

    def test_deletes_runs_and_related_rows(self, db_session_with_containers: Session) -> None:
        """The run, its app log, pause and pause reason are removed and reported; callbacks see the run IDs."""
        repository = _repository(db_session_with_containers)
        scope = _TestScope()
        workflow_run = _create_workflow_run(
            db_session_with_containers,
            scope,
            created_at=datetime(2024, 1, 1, 12, 0, 0),
        )
        _add_app_log(db_session_with_containers, scope, workflow_run)
        pause = _add_pause_with_reason(db_session_with_containers, scope, workflow_run)
        # Capture the ID up front so the checks below do not touch the deleted pause instance.
        pause_id = pause.id
        deleted_node_run_ids: list[str] = []
        deleted_trigger_run_ids: list[str] = []

        def delete_node_executions(_session, run_ids):
            deleted_node_run_ids.extend(run_ids)
            return (2, 1)

        def delete_trigger_logs(_session, run_ids):
            deleted_trigger_run_ids.extend(run_ids)
            return 3

        counts = repository.delete_runs_with_related_by_ids(
            [workflow_run.id],
            delete_node_executions=delete_node_executions,
            delete_trigger_logs=delete_trigger_logs,
        )

        assert deleted_node_run_ids == [workflow_run.id]
        assert deleted_trigger_run_ids == [workflow_run.id]
        assert counts == {
            "runs": 1,
            "node_executions": 2,
            "offloads": 1,
            "app_logs": 1,
            "trigger_logs": 3,
            "pauses": 1,
            "pause_reasons": 1,
        }

        # Verify through a separate session rather than the fixture session that created the rows.
        with Session(bind=db_session_with_containers.get_bind()) as verification_session:
            assert verification_session.get(WorkflowRun, workflow_run.id) is None
            assert verification_session.get(WorkflowPause, pause_id) is None

            remaining_app_log = verification_session.scalar(
                select(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id == workflow_run.id)
            )
            assert remaining_app_log is None

            remaining_pause_reason = verification_session.scalar(
                select(WorkflowPauseReason).where(WorkflowPauseReason.pause_id == pause_id)
            )
            assert remaining_pause_reason is None

    def test_defaults_optional_related_counts(self, db_session_with_containers: Session) -> None:
        """Without callbacks or related rows, only the run itself is reported as deleted."""
        repository = _repository(db_session_with_containers)
        scope = _TestScope()
        workflow_run = _create_workflow_run(
            db_session_with_containers,
            scope,
            created_at=datetime(2024, 1, 1, 12, 0, 0),
        )

        counts = repository.delete_runs_with_related_by_ids([workflow_run.id])

        expected = {
            "runs": 1,
            "node_executions": 0,
            "offloads": 0,
            "app_logs": 0,
            "trigger_logs": 0,
            "pauses": 0,
            "pause_reasons": 0,
        }
        assert counts == expected

    def test_empty_ids_return_empty_counts(self, db_session_with_containers: Session) -> None:
        """Both the count and delete variants return all-zero counts for an empty ID list."""
        repository = _repository(db_session_with_containers)
        all_zero = {
            "runs": 0,
            "node_executions": 0,
            "offloads": 0,
            "app_logs": 0,
            "trigger_logs": 0,
            "pauses": 0,
            "pause_reasons": 0,
        }

        assert repository.count_runs_with_related_by_ids([]) == all_zero
        assert repository.delete_runs_with_related_by_ids([]) == all_zero
|
||||
@ -7,15 +7,16 @@ from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from repositories.api_workflow_run_repository import WorkflowRunCleanupRef
|
||||
from services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup
|
||||
|
||||
|
||||
def make_run(tenant_id: str = "t1", run_id: str = "r1", created_at: datetime.datetime | None = None):
|
||||
run = MagicMock()
|
||||
run.tenant_id = tenant_id
|
||||
run.id = run_id
|
||||
run.created_at = created_at or datetime.datetime(2024, 1, 1, tzinfo=datetime.UTC)
|
||||
return run
|
||||
def make_ref(tenant_id: str = "t1", run_id: str = "r1", created_at: datetime.datetime | None = None):
    """Build a ``WorkflowRunCleanupRef`` for tests.

    A falsy ``created_at`` is replaced by 2024-01-01 00:00 UTC.
    """
    timestamp = created_at if created_at else datetime.datetime(2024, 1, 1, tzinfo=datetime.UTC)
    return WorkflowRunCleanupRef(id=run_id, tenant_id=tenant_id, created_at=timestamp)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -341,28 +342,28 @@ class TestRunDeleteMode:
|
||||
return WorkflowRunCleanup(days=30, batch_size=10, workflow_run_repo=mock_repo)
|
||||
|
||||
def test_no_rows_stops_immediately(self, mock_repo):
|
||||
mock_repo.get_runs_batch_by_time_range.return_value = []
|
||||
mock_repo.get_cleanup_refs_batch_by_time_range.return_value = []
|
||||
c = self._make_cleanup(mock_repo)
|
||||
with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg:
|
||||
cfg.BILLING_ENABLED = False
|
||||
c.run()
|
||||
mock_repo.delete_runs_with_related.assert_not_called()
|
||||
mock_repo.delete_runs_with_related_by_ids.assert_not_called()
|
||||
|
||||
def test_all_paid_skips_delete(self, mock_repo):
|
||||
run = make_run("t1")
|
||||
mock_repo.get_runs_batch_by_time_range.side_effect = [[run], []]
|
||||
ref = make_ref("t1")
|
||||
mock_repo.get_cleanup_refs_batch_by_time_range.side_effect = [[ref], []]
|
||||
c = self._make_cleanup(mock_repo)
|
||||
# billing disabled -> all free; but let's override _filter_free_tenants to return empty
|
||||
c._filter_free_tenants = MagicMock(return_value=set())
|
||||
with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg:
|
||||
cfg.BILLING_ENABLED = False
|
||||
c.run()
|
||||
mock_repo.delete_runs_with_related.assert_not_called()
|
||||
mock_repo.delete_runs_with_related_by_ids.assert_not_called()
|
||||
|
||||
def test_runs_deleted_successfully(self, mock_repo):
|
||||
run = make_run("t1")
|
||||
mock_repo.get_runs_batch_by_time_range.side_effect = [[run], []]
|
||||
mock_repo.delete_runs_with_related.return_value = {
|
||||
ref = make_ref("t1")
|
||||
mock_repo.get_cleanup_refs_batch_by_time_range.side_effect = [[ref], [ref], []]
|
||||
mock_repo.delete_runs_with_related_by_ids.return_value = {
|
||||
"runs": 1,
|
||||
"node_executions": 0,
|
||||
"offloads": 0,
|
||||
@ -376,12 +377,12 @@ class TestRunDeleteMode:
|
||||
cfg.BILLING_ENABLED = False
|
||||
with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.time.sleep"):
|
||||
c.run()
|
||||
mock_repo.delete_runs_with_related.assert_called_once()
|
||||
mock_repo.delete_runs_with_related_by_ids.assert_called_once()
|
||||
|
||||
def test_delete_exception_reraises(self, mock_repo):
|
||||
run = make_run("t1")
|
||||
mock_repo.get_runs_batch_by_time_range.side_effect = [[run], []]
|
||||
mock_repo.delete_runs_with_related.side_effect = RuntimeError("db error")
|
||||
ref = make_ref("t1")
|
||||
mock_repo.get_cleanup_refs_batch_by_time_range.side_effect = [[ref], [ref]]
|
||||
mock_repo.delete_runs_with_related_by_ids.side_effect = RuntimeError("db error")
|
||||
c = self._make_cleanup(mock_repo)
|
||||
with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg:
|
||||
cfg.BILLING_ENABLED = False
|
||||
@ -389,7 +390,7 @@ class TestRunDeleteMode:
|
||||
c.run()
|
||||
|
||||
def test_summary_with_window_start(self, mock_repo):
|
||||
mock_repo.get_runs_batch_by_time_range.return_value = []
|
||||
mock_repo.get_cleanup_refs_batch_by_time_range.return_value = []
|
||||
with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg:
|
||||
cfg.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD = 0
|
||||
cfg.BILLING_ENABLED = False
|
||||
@ -421,9 +422,10 @@ class TestRunDryRunMode:
|
||||
)
|
||||
|
||||
def test_dry_run_no_delete_called(self, mock_repo):
|
||||
run = make_run("t1")
|
||||
mock_repo.get_runs_batch_by_time_range.side_effect = [[run], []]
|
||||
mock_repo.count_runs_with_related.return_value = {
|
||||
ref = make_ref("t1")
|
||||
mock_repo.get_cleanup_refs_batch_by_time_range.side_effect = [[ref], [ref], []]
|
||||
mock_repo.count_runs_with_related_by_ids.return_value = {
|
||||
"runs": 1,
|
||||
"node_executions": 2,
|
||||
"offloads": 0,
|
||||
"app_logs": 0,
|
||||
@ -435,11 +437,11 @@ class TestRunDryRunMode:
|
||||
with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg:
|
||||
cfg.BILLING_ENABLED = False
|
||||
c.run()
|
||||
mock_repo.delete_runs_with_related.assert_not_called()
|
||||
mock_repo.count_runs_with_related.assert_called_once()
|
||||
mock_repo.delete_runs_with_related_by_ids.assert_not_called()
|
||||
mock_repo.count_runs_with_related_by_ids.assert_called_once()
|
||||
|
||||
def test_dry_run_summary_with_window_start(self, mock_repo):
|
||||
mock_repo.get_runs_batch_by_time_range.return_value = []
|
||||
mock_repo.get_cleanup_refs_batch_by_time_range.return_value = []
|
||||
with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg:
|
||||
cfg.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD = 0
|
||||
cfg.BILLING_ENABLED = False
|
||||
@ -454,14 +456,14 @@ class TestRunDryRunMode:
|
||||
c.run()
|
||||
|
||||
def test_dry_run_all_paid_skips_count(self, mock_repo):
|
||||
run = make_run("t1")
|
||||
mock_repo.get_runs_batch_by_time_range.side_effect = [[run], []]
|
||||
ref = make_ref("t1")
|
||||
mock_repo.get_cleanup_refs_batch_by_time_range.side_effect = [[ref], []]
|
||||
c = self._make_dry_cleanup(mock_repo)
|
||||
c._filter_free_tenants = MagicMock(return_value=set())
|
||||
with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg:
|
||||
cfg.BILLING_ENABLED = False
|
||||
c.run()
|
||||
mock_repo.count_runs_with_related.assert_not_called()
|
||||
mock_repo.count_runs_with_related_by_ids.assert_not_called()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -492,7 +494,7 @@ class TestTriggerLogMethods:
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _count_node_executions / _delete_node_executions
|
||||
# _count_node_executions_by_run_ids / _delete_node_executions_by_run_ids
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@ -500,25 +502,23 @@ class TestNodeExecutionMethods:
|
||||
def test_count_node_executions(self, cleanup):
|
||||
session = MagicMock()
|
||||
session.get_bind.return_value = MagicMock()
|
||||
runs = [make_run("t1", "r1")]
|
||||
with patch(
|
||||
"services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.DifyAPIRepositoryFactory"
|
||||
) as factory:
|
||||
repo = factory.create_api_workflow_node_execution_repository.return_value
|
||||
repo.count_by_runs.return_value = (10, 2)
|
||||
with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.sessionmaker"):
|
||||
result = cleanup._count_node_executions(session, runs)
|
||||
result = cleanup._count_node_executions_by_run_ids(session, ["r1"])
|
||||
assert result == (10, 2)
|
||||
|
||||
def test_delete_node_executions(self, cleanup):
|
||||
session = MagicMock()
|
||||
session.get_bind.return_value = MagicMock()
|
||||
runs = [make_run("t1", "r1")]
|
||||
with patch(
|
||||
"services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.DifyAPIRepositoryFactory"
|
||||
) as factory:
|
||||
repo = factory.create_api_workflow_node_execution_repository.return_value
|
||||
repo.delete_by_runs.return_value = (5, 1)
|
||||
with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.sessionmaker"):
|
||||
result = cleanup._delete_node_executions(session, runs)
|
||||
result = cleanup._delete_node_executions_by_run_ids(session, ["r1"])
|
||||
assert result == (5, 1)
|
||||
|
||||
@ -3,38 +3,27 @@ from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from repositories.api_workflow_run_repository import WorkflowRunCleanupRef
|
||||
from services.billing_service import SubscriptionPlan
|
||||
from services.retention.workflow_run import clear_free_plan_expired_workflow_run_logs as cleanup_module
|
||||
from services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup
|
||||
|
||||
|
||||
class FakeRun:
    """Plain attribute holder standing in for a workflow run in tests."""

    def __init__(
        self,
        run_id: str,
        tenant_id: str,
        created_at: datetime.datetime,
        app_id: str = "app-1",
        workflow_id: str = "wf-1",
        triggered_from: str = "workflow-run",
    ) -> None:
        # ``run_id`` is exposed as ``id``; the remaining arguments keep their names.
        self.id, self.tenant_id, self.created_at = run_id, tenant_id, created_at
        self.app_id, self.workflow_id, self.triggered_from = app_id, workflow_id, triggered_from
|
||||
def make_ref(run_id: str, tenant_id: str, created_at: datetime.datetime) -> WorkflowRunCleanupRef:
    """Build a ``WorkflowRunCleanupRef`` from a run id, tenant id and creation time."""
    ref = WorkflowRunCleanupRef(
        id=run_id,
        tenant_id=tenant_id,
        created_at=created_at,
    )
    return ref
|
||||
|
||||
|
||||
class FakeRepo:
|
||||
def __init__(
|
||||
self,
|
||||
batches: list[list[FakeRun]],
|
||||
batches: list[list[WorkflowRunCleanupRef]],
|
||||
delete_result: dict[str, int] | None = None,
|
||||
count_result: dict[str, int] | None = None,
|
||||
) -> None:
|
||||
self.batches = batches
|
||||
self.call_idx = 0
|
||||
self.candidate_call_idx = 0
|
||||
self.last_candidate_batch: list[WorkflowRunCleanupRef] = []
|
||||
self.cleanup_ref_calls: list[dict[str, object]] = []
|
||||
self.deleted: list[list[str]] = []
|
||||
self.counted: list[list[str]] = []
|
||||
self.delete_result = delete_result or {
|
||||
@ -56,7 +45,7 @@ class FakeRepo:
|
||||
"pause_reasons": 0,
|
||||
}
|
||||
|
||||
def get_runs_batch_by_time_range(
|
||||
def get_cleanup_refs_batch_by_time_range(
|
||||
self,
|
||||
start_from: datetime.datetime | None,
|
||||
end_before: datetime.datetime,
|
||||
@ -65,27 +54,50 @@ class FakeRepo:
|
||||
run_types=None,
|
||||
tenant_ids=None,
|
||||
workflow_ids=None,
|
||||
) -> list[FakeRun]:
|
||||
if self.call_idx >= len(self.batches):
|
||||
upper_bound: tuple[datetime.datetime, str] | None = None,
|
||||
) -> list[WorkflowRunCleanupRef]:
|
||||
self.cleanup_ref_calls.append(
|
||||
{
|
||||
"start_from": start_from,
|
||||
"end_before": end_before,
|
||||
"last_seen": last_seen,
|
||||
"batch_size": batch_size,
|
||||
"run_types": run_types,
|
||||
"tenant_ids": tenant_ids,
|
||||
"workflow_ids": workflow_ids,
|
||||
"upper_bound": upper_bound,
|
||||
}
|
||||
)
|
||||
if tenant_ids is not None or upper_bound is not None:
|
||||
refs = self.last_candidate_batch
|
||||
if tenant_ids is not None:
|
||||
tenant_id_set = set(tenant_ids)
|
||||
refs = [ref for ref in refs if ref.tenant_id in tenant_id_set]
|
||||
if upper_bound is not None:
|
||||
refs = [ref for ref in refs if (ref.created_at, ref.id) <= upper_bound]
|
||||
return refs[:batch_size]
|
||||
|
||||
if self.candidate_call_idx >= len(self.batches):
|
||||
return []
|
||||
batch = self.batches[self.call_idx]
|
||||
self.call_idx += 1
|
||||
batch = self.batches[self.candidate_call_idx]
|
||||
self.candidate_call_idx += 1
|
||||
self.last_candidate_batch = batch
|
||||
return batch
|
||||
|
||||
def delete_runs_with_related(
|
||||
self, runs: list[FakeRun], delete_node_executions=None, delete_trigger_logs=None
|
||||
def delete_runs_with_related_by_ids(
|
||||
self, run_ids: list[str], delete_node_executions=None, delete_trigger_logs=None
|
||||
) -> dict[str, int]:
|
||||
self.deleted.append([run.id for run in runs])
|
||||
self.deleted.append(list(run_ids))
|
||||
result = self.delete_result.copy()
|
||||
result["runs"] = len(runs)
|
||||
result["runs"] = len(run_ids)
|
||||
return result
|
||||
|
||||
def count_runs_with_related(
|
||||
self, runs: list[FakeRun], count_node_executions=None, count_trigger_logs=None
|
||||
def count_runs_with_related_by_ids(
|
||||
self, run_ids: list[str], count_node_executions=None, count_trigger_logs=None
|
||||
) -> dict[str, int]:
|
||||
self.counted.append([run.id for run in runs])
|
||||
self.counted.append(list(run_ids))
|
||||
result = self.count_result.copy()
|
||||
result["runs"] = len(runs)
|
||||
result["runs"] = len(run_ids)
|
||||
return result
|
||||
|
||||
|
||||
@ -218,8 +230,8 @@ def test_run_deletes_only_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
repo = FakeRepo(
|
||||
batches=[
|
||||
[
|
||||
FakeRun("run-free", "t_free", cutoff),
|
||||
FakeRun("run-paid", "t_paid", cutoff),
|
||||
make_ref("run-free", "t_free", cutoff),
|
||||
make_ref("run-paid", "t_paid", cutoff),
|
||||
]
|
||||
]
|
||||
)
|
||||
@ -240,11 +252,43 @@ def test_run_deletes_only_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
cleanup.run()
|
||||
|
||||
assert repo.deleted == [["run-free"]]
|
||||
assert repo.cleanup_ref_calls[1]["tenant_ids"] == ["t_free"]
|
||||
|
||||
|
||||
def test_run_filters_candidate_tenants_before_target_query(monkeypatch: pytest.MonkeyPatch) -> None:
    """Billing is asked about every candidate tenant; only the sandbox tenant reaches the second ref query."""
    now = datetime.datetime.now()
    candidate_refs = [
        make_ref("run-free", "t_free", now),
        make_ref("run-paid", "t_paid", now),
    ]
    repo = FakeRepo(batches=[candidate_refs])
    cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10)

    monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True)
    billing_calls: list[list[str]] = []

    def fake_bulk(tenant_ids: list[str]) -> dict[str, SubscriptionPlan]:
        # Remember the lookup so the test can assert on the tenants billing was asked about.
        billing_calls.append(tenant_ids)
        plan_by_tenant = (("t_free", "sandbox"), ("t_paid", "team"))
        return {tenant: plan_info(plan, -1) for tenant, plan in plan_by_tenant}

    monkeypatch.setattr(cleanup_module.BillingService, "get_plan_bulk_with_cache", staticmethod(fake_bulk))

    cleanup.run()

    assert billing_calls == [["t_free", "t_paid"]]
    second_ref_call = repo.cleanup_ref_calls[1]
    assert second_ref_call["tenant_ids"] == ["t_free"]
    assert repo.deleted == [["run-free"]]
|
||||
|
||||
|
||||
def test_run_skips_when_no_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
cutoff = datetime.datetime.now()
|
||||
repo = FakeRepo(batches=[[FakeRun("run-paid", "t_paid", cutoff)]])
|
||||
repo = FakeRepo(batches=[[make_ref("run-paid", "t_paid", cutoff)]])
|
||||
cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10)
|
||||
|
||||
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True)
|
||||
@ -257,6 +301,53 @@ def test_run_skips_when_no_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None
|
||||
cleanup.run()
|
||||
|
||||
assert repo.deleted == []
|
||||
assert len(repo.cleanup_ref_calls) == 2
|
||||
|
||||
|
||||
def test_run_paid_only_records_skipped_metrics(monkeypatch: pytest.MonkeyPatch) -> None:
    """When every tenant is on a ``team`` plan nothing is deleted or counted and the batch is reported as skipped."""
    now = datetime.datetime.now()
    repo = FakeRepo(batches=[[make_ref("run-paid", "t_paid", now)]])
    cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10)

    def team_plan_for_all(tenant_ids):
        return {tenant_id: plan_info("team", 1893456000) for tenant_id in tenant_ids}

    monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True)
    monkeypatch.setattr(
        cleanup_module.BillingService,
        "get_plan_bulk_with_cache",
        staticmethod(team_plan_for_all),
    )

    batch_calls: list[dict[str, object]] = []

    def capture_batch(**kwargs):
        # Stand-in for the metrics recorder: keep the keyword arguments for later assertions.
        batch_calls.append(kwargs)

    monkeypatch.setattr(cleanup._metrics, "record_batch", capture_batch)

    cleanup.run()

    assert repo.deleted == []
    assert repo.counted == []
    first_batch = batch_calls[0]
    assert first_batch["batch_rows"] == 1
    assert first_batch["targeted_runs"] == 0
    assert first_batch["skipped_runs"] == 1
    assert first_batch["deleted_runs"] == 0
|
||||
|
||||
|
||||
def test_run_target_query_is_bounded_by_candidate_high_water(monkeypatch: pytest.MonkeyPatch) -> None:
    """The second ref query is capped at the last candidate's cursor; the third resumes from that cursor."""
    earlier = datetime.datetime(2024, 1, 1, 0, 0, 0)
    later = datetime.datetime(2024, 1, 1, 0, 1, 0)
    candidate_refs = [
        make_ref("run-free-1", "t_free", earlier),
        make_ref("run-free-2", "t_free", later),
    ]
    repo = FakeRepo(batches=[candidate_refs])
    cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=2)

    monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)

    cleanup.run()

    # (created_at, id) of the last ref in the candidate batch.
    high_water = (later, "run-free-2")
    second_call = repo.cleanup_ref_calls[1]
    assert second_call["last_seen"] is None
    assert second_call["upper_bound"] == high_water
    third_call = repo.cleanup_ref_calls[2]
    assert third_call["last_seen"] == high_water
|
||||
|
||||
|
||||
def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
@ -268,7 +359,7 @@ def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
def test_run_records_metrics_on_success(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
cutoff = datetime.datetime.now()
|
||||
repo = FakeRepo(
|
||||
batches=[[FakeRun("run-free", "t_free", cutoff)]],
|
||||
batches=[[make_ref("run-free", "t_free", cutoff)]],
|
||||
delete_result={
|
||||
"runs": 0,
|
||||
"node_executions": 2,
|
||||
@ -300,13 +391,13 @@ def test_run_records_metrics_on_success(monkeypatch: pytest.MonkeyPatch) -> None
|
||||
|
||||
def test_run_records_failed_metrics(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
class FailingRepo(FakeRepo):
|
||||
def delete_runs_with_related(
|
||||
self, runs: list[FakeRun], delete_node_executions=None, delete_trigger_logs=None
|
||||
def delete_runs_with_related_by_ids(
|
||||
self, run_ids: list[str], delete_node_executions=None, delete_trigger_logs=None
|
||||
) -> dict[str, int]:
|
||||
raise RuntimeError("delete failed")
|
||||
|
||||
cutoff = datetime.datetime.now()
|
||||
repo = FailingRepo(batches=[[FakeRun("run-free", "t_free", cutoff)]])
|
||||
repo = FailingRepo(batches=[[make_ref("run-free", "t_free", cutoff)]])
|
||||
cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10)
|
||||
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)
|
||||
|
||||
@ -323,7 +414,7 @@ def test_run_records_failed_metrics(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
def test_run_dry_run_skips_deletions(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
cutoff = datetime.datetime.now()
|
||||
repo = FakeRepo(
|
||||
batches=[[FakeRun("run-free", "t_free", cutoff)]],
|
||||
batches=[[make_ref("run-free", "t_free", cutoff)]],
|
||||
count_result={
|
||||
"runs": 0,
|
||||
"node_executions": 2,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user