From 8df80e0992e69826d7bf57a21a9f0688b1e65292 Mon Sep 17 00:00:00 2001 From: hjlarry Date: Thu, 11 Dec 2025 17:20:52 +0800 Subject: [PATCH] fix CI --- ...ear_free_plan_expired_workflow_run_logs.py | 17 +- ...ear_free_plan_expired_workflow_run_logs.py | 198 +++++++----------- 2 files changed, 93 insertions(+), 122 deletions(-) diff --git a/api/services/clear_free_plan_expired_workflow_run_logs.py b/api/services/clear_free_plan_expired_workflow_run_logs.py index 68ba7787d3..922d0ab43c 100644 --- a/api/services/clear_free_plan_expired_workflow_run_logs.py +++ b/api/services/clear_free_plan_expired_workflow_run_logs.py @@ -5,10 +5,10 @@ from collections.abc import Iterable import click from sqlalchemy.orm import sessionmaker +from configs import dify_config from enums.cloud_plan import CloudPlan from extensions.ext_database import db from repositories.api_workflow_run_repository import APIWorkflowRunRepository -from repositories.factory import DifyAPIRepositoryFactory from services.billing_service import BillingService logger = logging.getLogger(__name__) @@ -35,9 +35,15 @@ class WorkflowRunCleanup: self.batch_size = batch_size self.billing_cache: dict[str, CloudPlan | None] = {} - self.repo = repo or DifyAPIRepositoryFactory.create_api_workflow_run_repository( - sessionmaker(bind=db.engine, expire_on_commit=False) - ) + if repo: + self.repo = repo + else: + # Lazy import to avoid circular dependency during module import + from repositories.factory import DifyAPIRepositoryFactory + + self.repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository( + sessionmaker(bind=db.engine, expire_on_commit=False) + ) def run(self) -> None: click.echo( @@ -110,6 +116,9 @@ class WorkflowRunCleanup: click.echo(click.style(summary_message, fg="white")) def _filter_free_tenants(self, tenant_ids: Iterable[str]) -> set[str]: + if not dify_config.BILLING_ENABLED: + return set(tenant_ids) + tenant_id_list = list(tenant_ids) uncached_tenants = [tenant_id for tenant_id in tenant_id_list if tenant_id not in self.billing_cache] diff --git a/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py b/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py index 57fb8d4e3f..29aefc5014 100644 --- a/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py +++ b/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py @@ -4,25 +4,53 @@ from typing import Any import pytest from services import clear_free_plan_expired_workflow_run_logs as cleanup_module -from services.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup, WorkflowRunRow +from services.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup -class DummySession: - def __init__(self) -> None: - self.committed = False +class FakeRun: + def __init__(self, run_id: str, tenant_id: str, created_at: datetime.datetime) -> None: + self.id = run_id + self.tenant_id = tenant_id + self.created_at = created_at - def __enter__(self) -> "DummySession": - return self - def __exit__(self, exc_type: object, exc: object, tb: object) -> None: - return None +class FakeRepo: + def __init__(self, batches: list[list[FakeRun]], delete_result: dict[str, int] | None = None) -> None: + self.batches = batches + self.call_idx = 0 + self.deleted: list[list[str]] = [] + self.delete_result = delete_result or { + "runs": 0, + "node_executions": 0, + "offloads": 0, + "app_logs": 0, + "trigger_logs": 0, + "pauses": 0, + "pause_reasons": 0, + } - def commit(self) -> None: - self.committed = True + def get_runs_batch_for_cleanup( + self, + start_after: datetime.datetime | None, + end_before: datetime.datetime, + last_seen: tuple[datetime.datetime, str] | None, + batch_size: int, + ) -> list[FakeRun]: + if self.call_idx >= len(self.batches): + return [] + batch = self.batches[self.call_idx] + self.call_idx += 1 + return batch + + def delete_runs_with_related(self, run_ids: list[str]) -> dict[str, int]: + self.deleted.append(list(run_ids)) + result = self.delete_result.copy() + result["runs"] = len(run_ids) + return result def test_filter_free_tenants_billing_disabled(monkeypatch: pytest.MonkeyPatch) -> None: - cleanup = WorkflowRunCleanup(days=30, batch_size=10) + cleanup = WorkflowRunCleanup(days=30, batch_size=10, repo=FakeRepo([])) monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False) @@ -38,25 +66,24 @@ def test_filter_free_tenants_billing_disabled(monkeypatch: pytest.MonkeyPatch) - def test_filter_free_tenants_bulk_mixed(monkeypatch: pytest.MonkeyPatch) -> None: - cleanup = WorkflowRunCleanup(days=30, batch_size=10) + cleanup = WorkflowRunCleanup(days=30, batch_size=10, repo=FakeRepo([])) monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True) - # seed cache to avoid relying on billing service implementation cleanup.billing_cache["t_free"] = cleanup_module.CloudPlan.SANDBOX cleanup.billing_cache["t_paid"] = cleanup_module.CloudPlan.TEAM monkeypatch.setattr( cleanup_module.BillingService, "get_info_bulk", - staticmethod(lambda tenant_ids: {tenant_id: {} for tenant_id in tenant_ids}), + staticmethod(lambda tenant_ids: dict.fromkeys(tenant_ids, "sandbox")), ) free = cleanup._filter_free_tenants({"t_free", "t_paid", "t_missing"}) - assert free == {"t_free"} + assert free == {"t_free", "t_missing"} def test_filter_free_tenants_bulk_failure(monkeypatch: pytest.MonkeyPatch) -> None: - cleanup = WorkflowRunCleanup(days=30, batch_size=10) + cleanup = WorkflowRunCleanup(days=30, batch_size=10, repo=FakeRepo([])) monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True) monkeypatch.setattr( @@ -72,7 +99,15 @@ def test_filter_free_tenants_bulk_failure(monkeypatch: pytest.MonkeyPatch) -> No def test_run_deletes_only_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None: cutoff = datetime.datetime.now() - cleanup = WorkflowRunCleanup(days=30, batch_size=10) + repo = FakeRepo( + batches=[ + [ + FakeRun("run-free", "t_free", cutoff), + FakeRun("run-paid", "t_paid", cutoff), + ] + ] + ) + cleanup = WorkflowRunCleanup(days=30, batch_size=10, repo=repo) monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True) cleanup.billing_cache["t_free"] = cleanup_module.CloudPlan.SANDBOX @@ -80,122 +115,33 @@ def test_run_deletes_only_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr( cleanup_module.BillingService, "get_info_bulk", - staticmethod(lambda tenant_ids: {tenant_id: {} for tenant_id in tenant_ids}), + staticmethod(lambda tenant_ids: dict.fromkeys(tenant_ids, "sandbox")), ) - batches_returned = 0 - - def fake_load_batch(session: DummySession, last_seen: tuple[datetime.datetime, str] | None) -> list[WorkflowRunRow]: - nonlocal batches_returned - if batches_returned > 0: - return [] - batches_returned += 1 - return [ - WorkflowRunRow(id="run-free", tenant_id="t_free", created_at=cutoff), - WorkflowRunRow(id="run-paid", tenant_id="t_paid", created_at=cutoff), - ] - - deleted_ids: list[list[str]] = [] - - def fake_delete_runs(session: DummySession, workflow_run_ids: list[str]) -> dict[str, int]: - deleted_ids.append(list(workflow_run_ids)) - return { - "runs": len(workflow_run_ids), - "node_executions": 0, - "offloads": 0, - "app_logs": 0, - "trigger_logs": 0, - "pauses": 0, - "pause_reasons": 0, - } - - created_sessions: list[DummySession] = [] - - def fake_session_factory(engine: object | None = None) -> DummySession: - session = DummySession() - created_sessions.append(session) - return session - - monkeypatch.setattr(cleanup, "_load_batch", fake_load_batch) - monkeypatch.setattr(cleanup, "_delete_runs", fake_delete_runs) - monkeypatch.setattr(cleanup_module, "Session", fake_session_factory) - - class DummyDB: - engine: object | None = None - - monkeypatch.setattr(cleanup_module, "db", DummyDB()) - cleanup.run() - assert deleted_ids == [["run-free"]] - assert created_sessions - assert created_sessions[0].committed is True + assert repo.deleted == [["run-free"]] def test_run_skips_when_no_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None: cutoff = datetime.datetime.now() - cleanup = WorkflowRunCleanup(days=30, batch_size=10) + repo = FakeRepo(batches=[[FakeRun("run-paid", "t_paid", cutoff)]]) + cleanup = WorkflowRunCleanup(days=30, batch_size=10, repo=repo) monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True) monkeypatch.setattr( cleanup_module.BillingService, "get_info_bulk", - staticmethod(lambda tenant_ids: {tenant_id: {"subscription": {"plan": "TEAM"}} for tenant_id in tenant_ids}), + staticmethod(lambda tenant_ids: dict.fromkeys(tenant_ids, "team")), ) - batches_returned = 0 - - def fake_load_batch(session: DummySession, last_seen: tuple[datetime.datetime, str] | None) -> list[WorkflowRunRow]: - nonlocal batches_returned - if batches_returned > 0: - return [] - batches_returned += 1 - return [WorkflowRunRow(id="run-paid", tenant_id="t_paid", created_at=cutoff)] - - delete_called = False - - def fake_delete_runs(session: DummySession, workflow_run_ids: list[str]) -> dict[str, int]: - nonlocal delete_called - delete_called = True - return { - "runs": 0, - "node_executions": 0, - "offloads": 0, - "app_logs": 0, - "trigger_logs": 0, - "pauses": 0, - "pause_reasons": 0, - } - - def fake_session_factory(engine: object | None = None) -> DummySession: # pragma: no cover - simple factory - return DummySession() - - monkeypatch.setattr(cleanup, "_load_batch", fake_load_batch) - monkeypatch.setattr(cleanup, "_delete_runs", fake_delete_runs) - monkeypatch.setattr(cleanup_module, "Session", fake_session_factory) - monkeypatch.setattr(cleanup_module, "db", type("DummyDB", (), {"engine": None})) - cleanup.run() - assert delete_called is False + assert repo.deleted == [] -def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None: - cleanup = WorkflowRunCleanup(days=30, batch_size=10) - - def fake_load_batch(session: DummySession, last_seen: tuple[datetime.datetime, str] | None) -> list[WorkflowRunRow]: - return [] - - def fake_delete_runs(session: DummySession, workflow_run_ids: list[str]) -> dict[str, int]: - raise AssertionError("should not delete") - - def fake_session_factory(engine: object | None = None) -> DummySession: # pragma: no cover - simple factory - return DummySession() - - monkeypatch.setattr(cleanup, "_load_batch", fake_load_batch) - monkeypatch.setattr(cleanup, "_delete_runs", fake_delete_runs) - monkeypatch.setattr(cleanup_module, "Session", fake_session_factory) - monkeypatch.setattr(cleanup_module, "db", type("DummyDB", (), {"engine": None})) +def test_run_exits_on_empty_batch() -> None: + cleanup = WorkflowRunCleanup(days=30, batch_size=10, repo=FakeRepo([])) cleanup.run() @@ -203,7 +149,11 @@ def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None: def test_between_sets_window_bounds() -> None: start_after = datetime.datetime(2024, 5, 1, 0, 0, 0) end_before = datetime.datetime(2024, 6, 1, 0, 0, 0) - cleanup = WorkflowRunCleanup(days=30, batch_size=10, start_after=start_after, end_before=end_before) + cleanup = WorkflowRunCleanup(days=30, + batch_size=10, + start_after=start_after, + end_before=end_before, + repo=FakeRepo([])) assert cleanup.window_start == start_after assert cleanup.window_end == end_before @@ -211,13 +161,25 @@ def test_between_sets_window_bounds() -> None: def test_between_requires_both_boundaries() -> None: with pytest.raises(ValueError): - WorkflowRunCleanup(days=30, batch_size=10, start_after=datetime.datetime.now(), end_before=None) + WorkflowRunCleanup( + days=30, + batch_size=10, + start_after=datetime.datetime.now(), + end_before=None, + repo=FakeRepo([]) + ) with pytest.raises(ValueError): - WorkflowRunCleanup(days=30, batch_size=10, start_after=None, end_before=datetime.datetime.now()) + WorkflowRunCleanup( + days=30, + batch_size=10, + start_after=None, + end_before=datetime.datetime.now(), + repo=FakeRepo([]) + ) def test_between_requires_end_after_start() -> None: start_after = datetime.datetime(2024, 6, 1, 0, 0, 0) end_before = datetime.datetime(2024, 5, 1, 0, 0, 0) with pytest.raises(ValueError): - WorkflowRunCleanup(days=30, batch_size=10, start_after=start_after, end_before=end_before) + WorkflowRunCleanup(days=30, batch_size=10, start_after=start_after, end_before=end_before, repo=FakeRepo([]))