Yeuoly 2025-10-30 16:33:28 +08:00
parent e1e95f7ccd
commit b25d379ef4
2 changed files with 3 additions and 211 deletions

View File

@@ -4,14 +4,14 @@ SQLAlchemy implementation of WorkflowTriggerLogRepository.
 
 from collections.abc import Sequence
 from datetime import UTC, datetime, timedelta
-from typing import Any, Optional
+from typing import Optional
 
-from sqlalchemy import and_, delete, func, select, update
+from sqlalchemy import and_, select
 from sqlalchemy.orm import Session
 
 from models.enums import WorkflowTriggerStatus
 from models.trigger import WorkflowTriggerLog
-from repositories.workflow_trigger_log_repository import TriggerLogOrderBy, WorkflowTriggerLogRepository
+from repositories.workflow_trigger_log_repository import WorkflowTriggerLogRepository
 
 
 class SQLAlchemyWorkflowTriggerLogRepository(WorkflowTriggerLogRepository):
@@ -45,37 +45,6 @@ class SQLAlchemyWorkflowTriggerLogRepository(WorkflowTriggerLogRepository):
         return self.session.scalar(query)
 
-    def get_by_status(
-        self,
-        tenant_id: str,
-        app_id: str,
-        status: WorkflowTriggerStatus,
-        limit: int = 100,
-        offset: int = 0,
-        order_by: TriggerLogOrderBy = TriggerLogOrderBy.CREATED_AT,
-        order_desc: bool = True,
-    ) -> Sequence[WorkflowTriggerLog]:
-        """Get trigger logs by status with pagination."""
-        query = select(WorkflowTriggerLog).where(
-            and_(
-                WorkflowTriggerLog.tenant_id == tenant_id,
-                WorkflowTriggerLog.app_id == app_id,
-                WorkflowTriggerLog.status == status,
-            )
-        )
-
-        # Apply ordering
-        order_column = getattr(WorkflowTriggerLog, order_by.value)
-        if order_desc:
-            query = query.order_by(order_column.desc())
-        else:
-            query = query.order_by(order_column.asc())
-
-        # Apply pagination
-        query = query.limit(limit).offset(offset)
-
-        return list(self.session.scalars(query).all())
-
     def get_failed_for_retry(
         self, tenant_id: str, max_retry_count: int = 3, limit: int = 100
     ) -> Sequence[WorkflowTriggerLog]:
@@ -116,84 +85,3 @@ class SQLAlchemyWorkflowTriggerLogRepository(WorkflowTriggerLogRepository):
         )
 
         return list(self.session.scalars(query).all())
-
-    def count_by_status(
-        self,
-        tenant_id: str,
-        app_id: str,
-        status: Optional[WorkflowTriggerStatus] = None,
-        since: Optional[datetime] = None,
-    ) -> int:
-        """Count trigger logs by status."""
-        query = select(func.count(WorkflowTriggerLog.id)).where(
-            and_(WorkflowTriggerLog.tenant_id == tenant_id, WorkflowTriggerLog.app_id == app_id)
-        )
-
-        if status:
-            query = query.where(WorkflowTriggerLog.status == status)
-
-        if since:
-            query = query.where(WorkflowTriggerLog.created_at >= since)
-
-        return self.session.scalar(query) or 0
-
-    def delete_expired_logs(self, tenant_id: str, before_date: datetime, batch_size: int = 1000) -> int:
-        """Delete expired trigger logs in batches."""
-        total_deleted = 0
-
-        while True:
-            # Get batch of IDs to delete
-            subquery = (
-                select(WorkflowTriggerLog.id)
-                .where(and_(WorkflowTriggerLog.tenant_id == tenant_id, WorkflowTriggerLog.created_at < before_date))
-                .limit(batch_size)
-            )
-
-            # Delete the batch
-            result = self.session.execute(delete(WorkflowTriggerLog).where(WorkflowTriggerLog.id.in_(subquery)))
-
-            deleted = result.rowcount
-            total_deleted += deleted
-
-            if deleted < batch_size:
-                break
-
-        self.session.commit()
-
-        return total_deleted
-
-    def archive_completed_logs(
-        self, tenant_id: str, before_date: datetime, batch_size: int = 1000
-    ) -> Sequence[WorkflowTriggerLog]:
-        """Get completed logs for archival."""
-        query = (
-            select(WorkflowTriggerLog)
-            .where(
-                and_(
-                    WorkflowTriggerLog.tenant_id == tenant_id,
-                    WorkflowTriggerLog.status == WorkflowTriggerStatus.SUCCEEDED,
-                    WorkflowTriggerLog.finished_at < before_date,
-                )
-            )
-            .limit(batch_size)
-        )
-
-        return list(self.session.scalars(query).all())
-
-    def update_status_batch(
-        self, trigger_log_ids: Sequence[str], new_status: WorkflowTriggerStatus, error_message: Optional[str] = None
-    ) -> int:
-        """Update status for multiple trigger logs."""
-        update_data: dict[str, Any] = {"status": new_status}
-
-        if error_message is not None:
-            update_data["error"] = error_message
-
-        if new_status in [WorkflowTriggerStatus.SUCCEEDED, WorkflowTriggerStatus.FAILED]:
-            update_data["finished_at"] = datetime.now(UTC)
-
-        result = self.session.execute(
-            update(WorkflowTriggerLog).where(WorkflowTriggerLog.id.in_(trigger_log_ids)).values(**update_data)
-        )
-
-        return result.rowcount
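
For context, a minimal usage sketch of this repository as it stands after the commit. Only the class name and the get_failed_for_retry signature come from the context lines above; the module path, the connection URL, and the constructor taking a SQLAlchemy Session are assumptions.

# Sketch only: module path, DSN, and constructor signature are assumed.
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from repositories.sqlalchemy_workflow_trigger_log_repository import (  # assumed path
    SQLAlchemyWorkflowTriggerLogRepository,
)

engine = create_engine("postgresql+psycopg2://user:pass@localhost/app")  # placeholder DSN

with Session(engine) as session:
    # The implementation above queries through self.session, so a Session is
    # presumably injected at construction time.
    repo = SQLAlchemyWorkflowTriggerLogRepository(session)
    failed = repo.get_failed_for_retry(tenant_id="tenant-123", max_retry_count=3, limit=100)
    for trigger_log in failed:
        ...  # inspect or re-dispatch each failed trigger log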

View File

@@ -7,11 +7,9 @@ proper indexing and batch operations.
"""
from collections.abc import Sequence
from datetime import datetime
from enum import StrEnum
from typing import Optional, Protocol
from models.enums import WorkflowTriggerStatus
from models.trigger import WorkflowTriggerLog
@@ -78,33 +76,6 @@ class WorkflowTriggerLogRepository(Protocol):
         """
         ...
 
-    def get_by_status(
-        self,
-        tenant_id: str,
-        app_id: str,
-        status: WorkflowTriggerStatus,
-        limit: int = 100,
-        offset: int = 0,
-        order_by: TriggerLogOrderBy = TriggerLogOrderBy.CREATED_AT,
-        order_desc: bool = True,
-    ) -> Sequence[WorkflowTriggerLog]:
-        """
-        Get trigger logs by status with pagination.
-
-        Args:
-            tenant_id: The tenant identifier
-            app_id: The application identifier
-            status: The workflow trigger status to filter by
-            limit: Maximum number of results
-            offset: Number of results to skip
-            order_by: Field to order results by
-            order_desc: Whether to order descending (True) or ascending (False)
-
-        Returns:
-            A sequence of WorkflowTriggerLog instances
-        """
-        ...
-
     def get_failed_for_retry(
         self, tenant_id: str, max_retry_count: int = 3, limit: int = 100
     ) -> Sequence[WorkflowTriggerLog]:
@@ -138,70 +109,3 @@ class WorkflowTriggerLogRepository(Protocol):
             A sequence of recent WorkflowTriggerLog instances
         """
         ...
-
-    def count_by_status(
-        self,
-        tenant_id: str,
-        app_id: str,
-        status: Optional[WorkflowTriggerStatus] = None,
-        since: Optional[datetime] = None,
-    ) -> int:
-        """
-        Count trigger logs by status.
-
-        Args:
-            tenant_id: The tenant identifier
-            app_id: The application identifier
-            status: Optional status filter
-            since: Optional datetime to count from
-
-        Returns:
-            Count of matching trigger logs
-        """
-        ...
-
-    def delete_expired_logs(self, tenant_id: str, before_date: datetime, batch_size: int = 1000) -> int:
-        """
-        Delete expired trigger logs in batches.
-
-        Args:
-            tenant_id: The tenant identifier
-            before_date: Delete logs created before this date
-            batch_size: Number of logs to delete per batch
-
-        Returns:
-            Total number of logs deleted
-        """
-        ...
-
-    def archive_completed_logs(
-        self, tenant_id: str, before_date: datetime, batch_size: int = 1000
-    ) -> Sequence[WorkflowTriggerLog]:
-        """
-        Get completed logs for archival before deletion.
-
-        Args:
-            tenant_id: The tenant identifier
-            before_date: Get logs completed before this date
-            batch_size: Number of logs to retrieve
-
-        Returns:
-            A sequence of WorkflowTriggerLog instances for archival
-        """
-        ...
-
-    def update_status_batch(
-        self, trigger_log_ids: Sequence[str], new_status: WorkflowTriggerStatus, error_message: Optional[str] = None
-    ) -> int:
-        """
-        Update status for multiple trigger logs at once.
-
-        Args:
-            trigger_log_ids: List of trigger log IDs to update
-            new_status: The new status to set
-            error_message: Optional error message to set
-
-        Returns:
-            Number of logs updated
-        """
-        ...
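
Because WorkflowTriggerLogRepository is a typing.Protocol, callers can type against the interface and accept any structurally conforming implementation, including in-memory fakes in tests. A minimal sketch, assuming only the get_failed_for_retry signature shown in the context lines above; the helper function itself is hypothetical.

from collections.abc import Sequence

from models.trigger import WorkflowTriggerLog
from repositories.workflow_trigger_log_repository import WorkflowTriggerLogRepository


def collect_retry_candidates(
    repo: WorkflowTriggerLogRepository, tenant_id: str
) -> Sequence[WorkflowTriggerLog]:
    # Any object that satisfies the Protocol works here: the SQLAlchemy
    # implementation in the first file, or a test double that never touches
    # the database.
    return repo.get_failed_for_retry(tenant_id=tenant_id, max_retry_count=3, limit=100)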