From b25d379ef444bf0c851a1e08f9840dc6bcf30e85 Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Thu, 30 Oct 2025 16:33:28 +0800 Subject: [PATCH] cleanup --- ...alchemy_workflow_trigger_log_repository.py | 118 +----------------- .../workflow_trigger_log_repository.py | 96 -------------- 2 files changed, 3 insertions(+), 211 deletions(-) diff --git a/api/repositories/sqlalchemy_workflow_trigger_log_repository.py b/api/repositories/sqlalchemy_workflow_trigger_log_repository.py index f0ccc0a2ba..8637f46b06 100644 --- a/api/repositories/sqlalchemy_workflow_trigger_log_repository.py +++ b/api/repositories/sqlalchemy_workflow_trigger_log_repository.py @@ -4,14 +4,14 @@ SQLAlchemy implementation of WorkflowTriggerLogRepository. from collections.abc import Sequence from datetime import UTC, datetime, timedelta -from typing import Any, Optional +from typing import Optional -from sqlalchemy import and_, delete, func, select, update +from sqlalchemy import and_, select from sqlalchemy.orm import Session from models.enums import WorkflowTriggerStatus from models.trigger import WorkflowTriggerLog -from repositories.workflow_trigger_log_repository import TriggerLogOrderBy, WorkflowTriggerLogRepository +from repositories.workflow_trigger_log_repository import WorkflowTriggerLogRepository class SQLAlchemyWorkflowTriggerLogRepository(WorkflowTriggerLogRepository): @@ -45,37 +45,6 @@ class SQLAlchemyWorkflowTriggerLogRepository(WorkflowTriggerLogRepository): return self.session.scalar(query) - def get_by_status( - self, - tenant_id: str, - app_id: str, - status: WorkflowTriggerStatus, - limit: int = 100, - offset: int = 0, - order_by: TriggerLogOrderBy = TriggerLogOrderBy.CREATED_AT, - order_desc: bool = True, - ) -> Sequence[WorkflowTriggerLog]: - """Get trigger logs by status with pagination.""" - query = select(WorkflowTriggerLog).where( - and_( - WorkflowTriggerLog.tenant_id == tenant_id, - WorkflowTriggerLog.app_id == app_id, - WorkflowTriggerLog.status == status, - ) - ) - - # Apply ordering - order_column = getattr(WorkflowTriggerLog, order_by.value) - if order_desc: - query = query.order_by(order_column.desc()) - else: - query = query.order_by(order_column.asc()) - - # Apply pagination - query = query.limit(limit).offset(offset) - - return list(self.session.scalars(query).all()) - def get_failed_for_retry( self, tenant_id: str, max_retry_count: int = 3, limit: int = 100 ) -> Sequence[WorkflowTriggerLog]: @@ -116,84 +85,3 @@ class SQLAlchemyWorkflowTriggerLogRepository(WorkflowTriggerLogRepository): ) return list(self.session.scalars(query).all()) - - def count_by_status( - self, - tenant_id: str, - app_id: str, - status: Optional[WorkflowTriggerStatus] = None, - since: Optional[datetime] = None, - ) -> int: - """Count trigger logs by status.""" - query = select(func.count(WorkflowTriggerLog.id)).where( - and_(WorkflowTriggerLog.tenant_id == tenant_id, WorkflowTriggerLog.app_id == app_id) - ) - - if status: - query = query.where(WorkflowTriggerLog.status == status) - - if since: - query = query.where(WorkflowTriggerLog.created_at >= since) - - return self.session.scalar(query) or 0 - - def delete_expired_logs(self, tenant_id: str, before_date: datetime, batch_size: int = 1000) -> int: - """Delete expired trigger logs in batches.""" - total_deleted = 0 - - while True: - # Get batch of IDs to delete - subquery = ( - select(WorkflowTriggerLog.id) - .where(and_(WorkflowTriggerLog.tenant_id == tenant_id, WorkflowTriggerLog.created_at < before_date)) - .limit(batch_size) - ) - - # Delete the batch - result = self.session.execute(delete(WorkflowTriggerLog).where(WorkflowTriggerLog.id.in_(subquery))) - - deleted = result.rowcount - total_deleted += deleted - - if deleted < batch_size: - break - - self.session.commit() - - return total_deleted - - def archive_completed_logs( - self, tenant_id: str, before_date: datetime, batch_size: int = 1000 - ) -> Sequence[WorkflowTriggerLog]: - """Get completed logs for archival.""" - query = ( - select(WorkflowTriggerLog) - .where( - and_( - WorkflowTriggerLog.tenant_id == tenant_id, - WorkflowTriggerLog.status == WorkflowTriggerStatus.SUCCEEDED, - WorkflowTriggerLog.finished_at < before_date, - ) - ) - .limit(batch_size) - ) - - return list(self.session.scalars(query).all()) - - def update_status_batch( - self, trigger_log_ids: Sequence[str], new_status: WorkflowTriggerStatus, error_message: Optional[str] = None - ) -> int: - """Update status for multiple trigger logs.""" - update_data: dict[str, Any] = {"status": new_status} - - if error_message is not None: - update_data["error"] = error_message - - if new_status in [WorkflowTriggerStatus.SUCCEEDED, WorkflowTriggerStatus.FAILED]: - update_data["finished_at"] = datetime.now(UTC) - - result = self.session.execute( - update(WorkflowTriggerLog).where(WorkflowTriggerLog.id.in_(trigger_log_ids)).values(**update_data) - ) - - return result.rowcount diff --git a/api/repositories/workflow_trigger_log_repository.py b/api/repositories/workflow_trigger_log_repository.py index 1c026c2f5d..495f8cb941 100644 --- a/api/repositories/workflow_trigger_log_repository.py +++ b/api/repositories/workflow_trigger_log_repository.py @@ -7,11 +7,9 @@ proper indexing and batch operations. """ from collections.abc import Sequence -from datetime import datetime from enum import StrEnum from typing import Optional, Protocol -from models.enums import WorkflowTriggerStatus from models.trigger import WorkflowTriggerLog @@ -78,33 +76,6 @@ class WorkflowTriggerLogRepository(Protocol): """ ... - def get_by_status( - self, - tenant_id: str, - app_id: str, - status: WorkflowTriggerStatus, - limit: int = 100, - offset: int = 0, - order_by: TriggerLogOrderBy = TriggerLogOrderBy.CREATED_AT, - order_desc: bool = True, - ) -> Sequence[WorkflowTriggerLog]: - """ - Get trigger logs by status with pagination. - - Args: - tenant_id: The tenant identifier - app_id: The application identifier - status: The workflow trigger status to filter by - limit: Maximum number of results - offset: Number of results to skip - order_by: Field to order results by - order_desc: Whether to order descending (True) or ascending (False) - - Returns: - A sequence of WorkflowTriggerLog instances - """ - ... - def get_failed_for_retry( self, tenant_id: str, max_retry_count: int = 3, limit: int = 100 ) -> Sequence[WorkflowTriggerLog]: @@ -138,70 +109,3 @@ class WorkflowTriggerLogRepository(Protocol): A sequence of recent WorkflowTriggerLog instances """ ... - - def count_by_status( - self, - tenant_id: str, - app_id: str, - status: Optional[WorkflowTriggerStatus] = None, - since: Optional[datetime] = None, - ) -> int: - """ - Count trigger logs by status. - - Args: - tenant_id: The tenant identifier - app_id: The application identifier - status: Optional status filter - since: Optional datetime to count from - - Returns: - Count of matching trigger logs - """ - ... - - def delete_expired_logs(self, tenant_id: str, before_date: datetime, batch_size: int = 1000) -> int: - """ - Delete expired trigger logs in batches. - - Args: - tenant_id: The tenant identifier - before_date: Delete logs created before this date - batch_size: Number of logs to delete per batch - - Returns: - Total number of logs deleted - """ - ... - - def archive_completed_logs( - self, tenant_id: str, before_date: datetime, batch_size: int = 1000 - ) -> Sequence[WorkflowTriggerLog]: - """ - Get completed logs for archival before deletion. - - Args: - tenant_id: The tenant identifier - before_date: Get logs completed before this date - batch_size: Number of logs to retrieve - - Returns: - A sequence of WorkflowTriggerLog instances for archival - """ - ... - - def update_status_batch( - self, trigger_log_ids: Sequence[str], new_status: WorkflowTriggerStatus, error_message: Optional[str] = None - ) -> int: - """ - Update status for multiple trigger logs at once. - - Args: - trigger_log_ids: List of trigger log IDs to update - new_status: The new status to set - error_message: Optional error message to set - - Returns: - Number of logs updated - """ - ...