This commit is contained in:
hjlarry 2025-12-11 17:20:52 +08:00
parent 9616c6bb7d
commit 8df80e0992
2 changed files with 93 additions and 122 deletions

View File

@ -5,10 +5,10 @@ from collections.abc import Iterable
import click
from sqlalchemy.orm import sessionmaker
from configs import dify_config
from enums.cloud_plan import CloudPlan
from extensions.ext_database import db
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
from repositories.factory import DifyAPIRepositoryFactory
from services.billing_service import BillingService
logger = logging.getLogger(__name__)
@ -35,9 +35,15 @@ class WorkflowRunCleanup:
self.batch_size = batch_size
self.billing_cache: dict[str, CloudPlan | None] = {}
self.repo = repo or DifyAPIRepositoryFactory.create_api_workflow_run_repository(
sessionmaker(bind=db.engine, expire_on_commit=False)
)
if repo:
self.repo = repo
else:
# Lazy import to avoid circular dependency during module import
from repositories.factory import DifyAPIRepositoryFactory
self.repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(
sessionmaker(bind=db.engine, expire_on_commit=False)
)
def run(self) -> None:
click.echo(
@ -110,6 +116,9 @@ class WorkflowRunCleanup:
click.echo(click.style(summary_message, fg="white"))
def _filter_free_tenants(self, tenant_ids: Iterable[str]) -> set[str]:
if not dify_config.BILLING_ENABLED:
return set(tenant_ids)
tenant_id_list = list(tenant_ids)
uncached_tenants = [tenant_id for tenant_id in tenant_id_list if tenant_id not in self.billing_cache]

View File

@ -4,25 +4,53 @@ from typing import Any
import pytest
from services import clear_free_plan_expired_workflow_run_logs as cleanup_module
from services.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup, WorkflowRunRow
from services.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup
class DummySession:
    """Minimal stand-in for a DB session used by the pre-commit tests.

    NOTE(review): in this rendered diff the class's remaining methods
    (__enter__/__exit__, commit) appear further down, interleaved with the
    newly added fakes — confirm against the real test file.
    """

    def __init__(self) -> None:
        # Set to True once commit() is called; asserted on by the tests.
        self.committed = False
class FakeRun:
    """Lightweight record mimicking a workflow-run row for cleanup tests."""

    def __init__(self, run_id: str, tenant_id: str, created_at: datetime.datetime) -> None:
        # Expose exactly the attributes the cleanup job reads from a run row.
        self.id, self.tenant_id, self.created_at = run_id, tenant_id, created_at
def __enter__(self) -> "DummySession":
    # Context-manager entry: yield the session object itself.
    return self
def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
    # Returning None means any exception raised in the with-block propagates.
    return None
class FakeRepo:
    """In-memory stand-in for the workflow-run repository used by the cleanup tests.

    Serves pre-canned batches of runs and records every delete request so the
    tests can assert on exactly which run ids were cleaned up.
    """

    def __init__(self, batches: list[list[FakeRun]], delete_result: dict[str, int] | None = None) -> None:
        self.batches = batches
        # Index of the next batch to hand out.
        self.call_idx = 0
        # One entry per delete call: the list of run ids requested.
        self.deleted: list[list[str]] = []
        if delete_result:
            self.delete_result = delete_result
        else:
            # Default summary template: every counter starts at zero.
            self.delete_result = dict.fromkeys(
                (
                    "runs",
                    "node_executions",
                    "offloads",
                    "app_logs",
                    "trigger_logs",
                    "pauses",
                    "pause_reasons",
                ),
                0,
            )

    def commit(self) -> None:
        # NOTE(review): sets `committed`, which this class never initializes —
        # in the rendered diff this method looks like a leftover of the removed
        # DummySession; confirm its placement against the real file.
        self.committed = True

    def get_runs_batch_for_cleanup(
        self,
        start_after: datetime.datetime | None,
        end_before: datetime.datetime,
        last_seen: tuple[datetime.datetime, str] | None,
        batch_size: int,
    ) -> list[FakeRun]:
        """Return the next canned batch, or an empty list once exhausted.

        The window/pagination arguments are accepted but ignored by this fake.
        """
        if self.call_idx < len(self.batches):
            current = self.batches[self.call_idx]
            self.call_idx += 1
            return current
        return []

    def delete_runs_with_related(self, run_ids: list[str]) -> dict[str, int]:
        """Record the requested ids and return a summary with `runs` filled in."""
        self.deleted.append(list(run_ids))
        summary = dict(self.delete_result)
        summary["runs"] = len(run_ids)
        return summary
def test_filter_free_tenants_billing_disabled(monkeypatch: pytest.MonkeyPatch) -> None:
cleanup = WorkflowRunCleanup(days=30, batch_size=10)
cleanup = WorkflowRunCleanup(days=30, batch_size=10, repo=FakeRepo([]))
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)
@ -38,25 +66,24 @@ def test_filter_free_tenants_billing_disabled(monkeypatch: pytest.MonkeyPatch) -
def test_filter_free_tenants_bulk_mixed(monkeypatch: pytest.MonkeyPatch) -> None:
    """Mixed tenants: cached paid plans are excluded; uncached ones resolve via billing.

    The rendered diff contained both the pre- and post-commit lines of this
    test; this is the post-commit version (repo injected, bulk lookup returns
    a plain plan string, and the uncached tenant counts as free).
    """
    cleanup = WorkflowRunCleanup(days=30, batch_size=10, repo=FakeRepo([]))

    monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True)

    # seed cache to avoid relying on billing service implementation
    cleanup.billing_cache["t_free"] = cleanup_module.CloudPlan.SANDBOX
    cleanup.billing_cache["t_paid"] = cleanup_module.CloudPlan.TEAM

    monkeypatch.setattr(
        cleanup_module.BillingService,
        "get_info_bulk",
        staticmethod(lambda tenant_ids: dict.fromkeys(tenant_ids, "sandbox")),
    )

    free = cleanup._filter_free_tenants({"t_free", "t_paid", "t_missing"})

    # t_free is cached sandbox, t_missing resolves to "sandbox" via the bulk
    # call; t_paid is cached as a paid plan and is excluded.
    assert free == {"t_free", "t_missing"}
def test_filter_free_tenants_bulk_failure(monkeypatch: pytest.MonkeyPatch) -> None:
cleanup = WorkflowRunCleanup(days=30, batch_size=10)
cleanup = WorkflowRunCleanup(days=30, batch_size=10, repo=FakeRepo([]))
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True)
monkeypatch.setattr(
@ -72,7 +99,15 @@ def test_filter_free_tenants_bulk_failure(monkeypatch: pytest.MonkeyPatch) -> No
def test_run_deletes_only_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None:
    """run() deletes runs of free-plan tenants only (post-commit, repo-based version).

    The rendered diff interleaved the removed session/monkeypatch machinery with
    the new FakeRepo-based flow; this keeps only the post-commit version.
    """
    cutoff = datetime.datetime.now()
    repo = FakeRepo(
        batches=[
            [
                FakeRun("run-free", "t_free", cutoff),
                FakeRun("run-paid", "t_paid", cutoff),
            ]
        ]
    )
    cleanup = WorkflowRunCleanup(days=30, batch_size=10, repo=repo)

    monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True)

    cleanup.billing_cache["t_free"] = cleanup_module.CloudPlan.SANDBOX
    # NOTE(review): the renderer appears to have elided the line that seeds
    # t_paid as a paid plan; it is required for the assertion below to hold
    # (the bulk lookup returns "sandbox" for anything uncached) — confirm
    # against the original test file.
    cleanup.billing_cache["t_paid"] = cleanup_module.CloudPlan.TEAM

    monkeypatch.setattr(
        cleanup_module.BillingService,
        "get_info_bulk",
        staticmethod(lambda tenant_ids: dict.fromkeys(tenant_ids, "sandbox")),
    )

    cleanup.run()

    # Only the free tenant's run is deleted; the paid tenant's run survives.
    assert repo.deleted == [["run-free"]]
def test_run_skips_when_no_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None:
    """run() performs no deletions when every tenant resolves to a paid plan.

    Post-commit version: the removed session/_load_batch/_delete_runs
    monkeypatching is replaced by the injected FakeRepo.
    """
    cutoff = datetime.datetime.now()
    repo = FakeRepo(batches=[[FakeRun("run-paid", "t_paid", cutoff)]])
    cleanup = WorkflowRunCleanup(days=30, batch_size=10, repo=repo)

    monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True)
    monkeypatch.setattr(
        cleanup_module.BillingService,
        "get_info_bulk",
        # Every tenant resolves to "team" (a paid plan), so nothing is free.
        staticmethod(lambda tenant_ids: dict.fromkeys(tenant_ids, "team")),
    )

    cleanup.run()

    # No delete call was made for the paid tenant's run.
    assert repo.deleted == []
def test_run_exits_on_empty_batch() -> None:
    """run() terminates cleanly when the repository yields no batches.

    Post-commit version: a FakeRepo with no batches replaces the removed
    session-factory and _load_batch monkeypatching.
    """
    cleanup = WorkflowRunCleanup(days=30, batch_size=10, repo=FakeRepo([]))
    cleanup.run()
@ -203,7 +149,11 @@ def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None:
def test_between_sets_window_bounds() -> None:
    """Explicit start_after/end_before become the cleanup window bounds."""
    start_after = datetime.datetime(2024, 5, 1, 0, 0, 0)
    end_before = datetime.datetime(2024, 6, 1, 0, 0, 0)
    cleanup = WorkflowRunCleanup(
        days=30,
        batch_size=10,
        start_after=start_after,
        end_before=end_before,
        repo=FakeRepo([]),
    )

    assert cleanup.window_start == start_after
    assert cleanup.window_end == end_before
@ -211,13 +161,25 @@ def test_between_sets_window_bounds() -> None:
def test_between_requires_both_boundaries() -> None:
    """Supplying only one of start_after/end_before raises ValueError."""
    with pytest.raises(ValueError):
        WorkflowRunCleanup(
            days=30,
            batch_size=10,
            start_after=datetime.datetime.now(),
            end_before=None,
            repo=FakeRepo([]),
        )

    with pytest.raises(ValueError):
        WorkflowRunCleanup(
            days=30,
            batch_size=10,
            start_after=None,
            end_before=datetime.datetime.now(),
            repo=FakeRepo([]),
        )
def test_between_requires_end_after_start() -> None:
    """An end_before that is not after start_after raises ValueError."""
    start_after = datetime.datetime(2024, 6, 1, 0, 0, 0)
    end_before = datetime.datetime(2024, 5, 1, 0, 0, 0)
    with pytest.raises(ValueError):
        WorkflowRunCleanup(days=30, batch_size=10, start_after=start_after, end_before=end_before, repo=FakeRepo([]))