diff --git a/api/repositories/api_workflow_run_repository.py b/api/repositories/api_workflow_run_repository.py index 1a2a539c80..100589804c 100644 --- a/api/repositories/api_workflow_run_repository.py +++ b/api/repositories/api_workflow_run_repository.py @@ -36,7 +36,7 @@ Example: from collections.abc import Callable, Sequence from datetime import datetime -from typing import Protocol +from typing import Protocol, TypedDict from graphon.entities.pause_reason import PauseReason from graphon.enums import WorkflowType @@ -55,6 +55,16 @@ from repositories.types import ( ) +class RunsWithRelatedCountsDict(TypedDict): + runs: int + node_executions: int + offloads: int + app_logs: int + trigger_logs: int + pauses: int + pause_reasons: int + + class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): """ Protocol for service-layer WorkflowRun repository operations. @@ -333,7 +343,7 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): runs: Sequence[WorkflowRun], delete_node_executions: Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]] | None = None, delete_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None, - ) -> dict[str, int]: + ) -> RunsWithRelatedCountsDict: """ Delete workflow runs and their related records (node executions, offloads, app logs, trigger logs, pauses, pause reasons). @@ -400,7 +410,7 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): runs: Sequence[WorkflowRun], count_node_executions: Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]] | None = None, count_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None, - ) -> dict[str, int]: + ) -> RunsWithRelatedCountsDict: """ Count workflow runs and their related records (node executions, offloads, app logs, trigger logs, pauses, pause reasons) without deleting data. diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index 413936b542..9267be2636 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -45,7 +45,7 @@ from libs.uuid_utils import uuidv7 from models.enums import WorkflowRunTriggeredFrom from models.human_input import HumanInputForm from models.workflow import WorkflowAppLog, WorkflowArchiveLog, WorkflowPause, WorkflowPauseReason, WorkflowRun -from repositories.api_workflow_run_repository import APIWorkflowRunRepository +from repositories.api_workflow_run_repository import APIWorkflowRunRepository, RunsWithRelatedCountsDict from repositories.entities.workflow_pause import WorkflowPauseEntity from repositories.types import ( AverageInteractionStats, @@ -463,7 +463,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): runs: Sequence[WorkflowRun], delete_node_executions: Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]] | None = None, delete_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None, - ) -> dict[str, int]: + ) -> RunsWithRelatedCountsDict: if not runs: return { "runs": 0, @@ -638,7 +638,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): runs: Sequence[WorkflowRun], count_node_executions: Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]] | None = None, count_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None, - ) -> dict[str, int]: + ) -> RunsWithRelatedCountsDict: if not runs: return { "runs": 0, diff --git a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py index 62bc9f5f10..021fa61d96 100644 --- a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py +++ b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py @@ -173,6 +173,9 @@ class WorkflowRunCleanupMetrics: self._record(self._job_duration_seconds, job_duration_seconds, attributes) +_RELATED_RECORD_KEYS = ("node_executions", "offloads", "app_logs", "trigger_logs", "pauses", "pause_reasons") + + class WorkflowRunCleanup: def __init__( self, @@ -312,8 +315,8 @@ class WorkflowRunCleanup: int((time.monotonic() - count_start) * 1000), ) if related_totals is not None: - for key in related_totals: - related_totals[key] += batch_counts.get(key, 0) + for k in _RELATED_RECORD_KEYS: + related_totals[k] += batch_counts.get(k, 0) # type: ignore[literal-required,operator] sample_ids = ", ".join(run.id for run in free_runs[:5]) click.echo( click.style( @@ -332,7 +335,10 @@ class WorkflowRunCleanup: targeted_runs=len(free_runs), skipped_runs=paid_or_skipped, deleted_runs=0, - related_counts={key: batch_counts.get(key, 0) for key in self._empty_related_counts()}, + related_counts={ + k: batch_counts[k] # type: ignore[literal-required] + for k in _RELATED_RECORD_KEYS + }, related_action="would_delete", batch_duration_seconds=time.monotonic() - batch_start, ) @@ -372,7 +378,10 @@ class WorkflowRunCleanup: targeted_runs=len(free_runs), skipped_runs=paid_or_skipped, deleted_runs=counts["runs"], - related_counts={key: counts.get(key, 0) for key in self._empty_related_counts()}, + related_counts={ + k: counts[k] # type: ignore[literal-required] + for k in _RELATED_RECORD_KEYS + }, related_action="deleted", batch_duration_seconds=time.monotonic() - batch_start, ) diff --git a/api/services/retention/workflow_run/delete_archived_workflow_run.py b/api/services/retention/workflow_run/delete_archived_workflow_run.py index 11873bf1b9..937a106710 100644 --- a/api/services/retention/workflow_run/delete_archived_workflow_run.py +++ b/api/services/retention/workflow_run/delete_archived_workflow_run.py @@ -14,7 +14,7 @@ from sqlalchemy.orm import Session, sessionmaker from extensions.ext_database import db from models.workflow import WorkflowRun -from repositories.api_workflow_run_repository import APIWorkflowRunRepository +from repositories.api_workflow_run_repository import APIWorkflowRunRepository, RunsWithRelatedCountsDict from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository @@ -23,7 +23,17 @@ class DeleteResult: run_id: str tenant_id: str success: bool - deleted_counts: dict[str, int] = field(default_factory=dict) + deleted_counts: RunsWithRelatedCountsDict = field( + default_factory=lambda: { # type: ignore[assignment] + "runs": 0, + "node_executions": 0, + "offloads": 0, + "app_logs": 0, + "trigger_logs": 0, + "pauses": 0, + "pause_reasons": 0, + } + ) error: str | None = None elapsed_time: float = 0.0