refactor(api): type workflow run delete/count results with RunsWithRelatedCountsDict TypedDict (#34531)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
YBoy 2026-04-05 18:11:41 +02:00 committed by GitHub
parent ee87289917
commit b71b9f80b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 41 additions and 12 deletions

View File

@ -36,7 +36,7 @@ Example:
from collections.abc import Callable, Sequence
from datetime import datetime
from typing import Protocol
from typing import Protocol, TypedDict
from graphon.entities.pause_reason import PauseReason
from graphon.enums import WorkflowType
@ -55,6 +55,16 @@ from repositories.types import (
)
class RunsWithRelatedCountsDict(TypedDict):
runs: int
node_executions: int
offloads: int
app_logs: int
trigger_logs: int
pauses: int
pause_reasons: int
class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
"""
Protocol for service-layer WorkflowRun repository operations.
@ -333,7 +343,7 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
runs: Sequence[WorkflowRun],
delete_node_executions: Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]] | None = None,
delete_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None,
) -> dict[str, int]:
) -> RunsWithRelatedCountsDict:
"""
Delete workflow runs and their related records (node executions, offloads, app logs,
trigger logs, pauses, pause reasons).
@ -400,7 +410,7 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
runs: Sequence[WorkflowRun],
count_node_executions: Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]] | None = None,
count_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None,
) -> dict[str, int]:
) -> RunsWithRelatedCountsDict:
"""
Count workflow runs and their related records (node executions, offloads, app logs,
trigger logs, pauses, pause reasons) without deleting data.

View File

@ -45,7 +45,7 @@ from libs.uuid_utils import uuidv7
from models.enums import WorkflowRunTriggeredFrom
from models.human_input import HumanInputForm
from models.workflow import WorkflowAppLog, WorkflowArchiveLog, WorkflowPause, WorkflowPauseReason, WorkflowRun
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
from repositories.api_workflow_run_repository import APIWorkflowRunRepository, RunsWithRelatedCountsDict
from repositories.entities.workflow_pause import WorkflowPauseEntity
from repositories.types import (
AverageInteractionStats,
@ -463,7 +463,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
runs: Sequence[WorkflowRun],
delete_node_executions: Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]] | None = None,
delete_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None,
) -> dict[str, int]:
) -> RunsWithRelatedCountsDict:
if not runs:
return {
"runs": 0,
@ -638,7 +638,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
runs: Sequence[WorkflowRun],
count_node_executions: Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]] | None = None,
count_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None,
) -> dict[str, int]:
) -> RunsWithRelatedCountsDict:
if not runs:
return {
"runs": 0,

View File

@ -173,6 +173,9 @@ class WorkflowRunCleanupMetrics:
self._record(self._job_duration_seconds, job_duration_seconds, attributes)
_RELATED_RECORD_KEYS = ("node_executions", "offloads", "app_logs", "trigger_logs", "pauses", "pause_reasons")
class WorkflowRunCleanup:
def __init__(
self,
@ -312,8 +315,8 @@ class WorkflowRunCleanup:
int((time.monotonic() - count_start) * 1000),
)
if related_totals is not None:
for key in related_totals:
related_totals[key] += batch_counts.get(key, 0)
for k in _RELATED_RECORD_KEYS:
related_totals[k] += batch_counts.get(k, 0) # type: ignore[literal-required,operator]
sample_ids = ", ".join(run.id for run in free_runs[:5])
click.echo(
click.style(
@ -332,7 +335,10 @@ class WorkflowRunCleanup:
targeted_runs=len(free_runs),
skipped_runs=paid_or_skipped,
deleted_runs=0,
related_counts={key: batch_counts.get(key, 0) for key in self._empty_related_counts()},
related_counts={
k: batch_counts[k] # type: ignore[literal-required]
for k in _RELATED_RECORD_KEYS
},
related_action="would_delete",
batch_duration_seconds=time.monotonic() - batch_start,
)
@ -372,7 +378,10 @@ class WorkflowRunCleanup:
targeted_runs=len(free_runs),
skipped_runs=paid_or_skipped,
deleted_runs=counts["runs"],
related_counts={key: counts.get(key, 0) for key in self._empty_related_counts()},
related_counts={
k: counts[k] # type: ignore[literal-required]
for k in _RELATED_RECORD_KEYS
},
related_action="deleted",
batch_duration_seconds=time.monotonic() - batch_start,
)

View File

@ -14,7 +14,7 @@ from sqlalchemy.orm import Session, sessionmaker
from extensions.ext_database import db
from models.workflow import WorkflowRun
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
from repositories.api_workflow_run_repository import APIWorkflowRunRepository, RunsWithRelatedCountsDict
from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository
@ -23,7 +23,17 @@ class DeleteResult:
run_id: str
tenant_id: str
success: bool
deleted_counts: dict[str, int] = field(default_factory=dict)
deleted_counts: RunsWithRelatedCountsDict = field(
default_factory=lambda: { # type: ignore[assignment]
"runs": 0,
"node_executions": 0,
"offloads": 0,
"app_logs": 0,
"trigger_logs": 0,
"pauses": 0,
"pause_reasons": 0,
}
)
error: str | None = None
elapsed_time: float = 0.0