use the workflow trigger log repo

This commit is contained in:
hjlarry 2025-12-12 09:22:31 +08:00
parent ccb55da5e1
commit 60288d1fd7
4 changed files with 79 additions and 6 deletions

View File

@ -40,7 +40,6 @@ from libs.infinite_scroll_pagination import InfiniteScrollPagination
from libs.time_parser import get_time_threshold
from libs.uuid_utils import uuidv7
from models.enums import WorkflowRunTriggeredFrom
from models.trigger import WorkflowTriggerLog
from models.workflow import (
WorkflowAppLog,
WorkflowNodeExecutionModel,
@ -53,6 +52,7 @@ from models.workflow import (
)
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
from repositories.entities.workflow_pause import WorkflowPauseEntity
from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository
from repositories.types import (
AverageInteractionStats,
DailyRunsStats,
@ -413,10 +413,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
pauses_result = session.execute(delete(WorkflowPauseModel).where(WorkflowPauseModel.id.in_(pause_ids)))
pauses_deleted = pauses_result.rowcount or 0
trigger_logs_result = session.execute(
delete(WorkflowTriggerLog).where(WorkflowTriggerLog.workflow_run_id.in_(run_ids))
)
trigger_logs_deleted = trigger_logs_result.rowcount or 0
trigger_logs_deleted = SQLAlchemyWorkflowTriggerLogRepository(session).delete_by_run_ids(run_ids)
runs_result = session.execute(delete(WorkflowRun).where(WorkflowRun.id.in_(run_ids)))
runs_deleted = runs_result.rowcount or 0

View File

@ -5,7 +5,7 @@ SQLAlchemy implementation of WorkflowTriggerLogRepository.
from collections.abc import Sequence
from datetime import UTC, datetime, timedelta
from sqlalchemy import and_, select
from sqlalchemy import and_, delete, select
from sqlalchemy.orm import Session
from models.enums import WorkflowTriggerStatus
@ -84,3 +84,21 @@ class SQLAlchemyWorkflowTriggerLogRepository(WorkflowTriggerLogRepository):
)
return list(self.session.scalars(query).all())
def delete_by_run_ids(self, run_ids: Sequence[str]) -> int:
"""
Delete trigger logs associated with the given workflow run ids.
Args:
run_ids: Collection of workflow run identifiers.
Returns:
Number of rows deleted.
"""
if not run_ids:
return 0
result = self.session.execute(
delete(WorkflowTriggerLog).where(WorkflowTriggerLog.workflow_run_id.in_(run_ids))
)
return result.rowcount or 0

View File

@ -218,6 +218,33 @@ class TestCreateWorkflowPause(TestDifyAPISQLAlchemyWorkflowRunRepository):
)
class TestDeleteRunsWithRelated(TestDifyAPISQLAlchemyWorkflowRunRepository):
def test_uses_trigger_log_repository(
self, repository: DifyAPISQLAlchemyWorkflowRunRepository, mock_session: Mock
):
node_ids_result = Mock()
node_ids_result.all.return_value = []
pause_ids_result = Mock()
pause_ids_result.all.return_value = []
mock_session.scalars.side_effect = [node_ids_result, pause_ids_result]
# app_logs delete, runs delete
mock_session.execute.side_effect = [Mock(rowcount=0), Mock(rowcount=1)]
fake_trigger_repo = Mock()
fake_trigger_repo.delete_by_run_ids.return_value = 3
with patch(
"repositories.sqlalchemy_api_workflow_run_repository.SQLAlchemyWorkflowTriggerLogRepository",
return_value=fake_trigger_repo,
):
counts = repository.delete_runs_with_related(["run-1"])
fake_trigger_repo.delete_by_run_ids.assert_called_once_with(["run-1"])
assert counts["trigger_logs"] == 3
assert counts["runs"] == 1
class TestResumeWorkflowPause(TestDifyAPISQLAlchemyWorkflowRunRepository):
"""Test resume_workflow_pause method."""

View File

@ -0,0 +1,31 @@
from unittest.mock import Mock
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import Session
from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository
def test_delete_by_run_ids_executes_delete():
session = Mock(spec=Session)
session.execute.return_value = Mock(rowcount=2)
repo = SQLAlchemyWorkflowTriggerLogRepository(session)
deleted = repo.delete_by_run_ids(["run-1", "run-2"])
stmt = session.execute.call_args[0][0]
compiled_sql = str(stmt.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))
assert "workflow_trigger_logs" in compiled_sql
assert "'run-1'" in compiled_sql
assert "'run-2'" in compiled_sql
assert deleted == 2
def test_delete_by_run_ids_empty_short_circuits():
session = Mock(spec=Session)
repo = SQLAlchemyWorkflowTriggerLogRepository(session)
deleted = repo.delete_by_run_ids([])
session.execute.assert_not_called()
assert deleted == 0