add status filter

This commit is contained in:
hjlarry 2025-12-11 17:27:23 +08:00
parent 9ecc584a1c
commit c5365f89bf
2 changed files with 48 additions and 1 deletions

View File

@ -333,7 +333,17 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
with self._session_maker() as session:
stmt = (
select(WorkflowRun)
.where(WorkflowRun.created_at < end_before)
.where(
WorkflowRun.created_at < end_before,
WorkflowRun.status.in_(
[
WorkflowExecutionStatus.SUCCEEDED.value,
WorkflowExecutionStatus.FAILED.value,
WorkflowExecutionStatus.STOPPED.value,
WorkflowExecutionStatus.PARTIAL_SUCCEEDED.value,
]
),
)
.order_by(WorkflowRun.created_at.asc(), WorkflowRun.id.asc())
.limit(batch_size)
)

View File

@ -4,6 +4,7 @@ from datetime import UTC, datetime
from unittest.mock import Mock, patch
import pytest
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import Session, sessionmaker
from core.workflow.enums import WorkflowExecutionStatus
@ -104,6 +105,42 @@ class TestDifyAPISQLAlchemyWorkflowRunRepository:
return pause
class TestGetRunsBatchForCleanup(TestDifyAPISQLAlchemyWorkflowRunRepository):
def test_get_runs_batch_for_cleanup_filters_terminal_statuses(
self, repository: DifyAPISQLAlchemyWorkflowRunRepository, mock_session: Mock
):
scalar_result = Mock()
scalar_result.all.return_value = []
mock_session.scalars.return_value = scalar_result
repository.get_runs_batch_for_cleanup(
start_after=None,
end_before=datetime(2024, 1, 1),
last_seen=None,
batch_size=50,
)
stmt = mock_session.scalars.call_args[0][0]
compiled_sql = str(
stmt.compile(
dialect=postgresql.dialect(),
compile_kwargs={"literal_binds": True},
)
)
assert "workflow_runs.status" in compiled_sql
for status in (
WorkflowExecutionStatus.SUCCEEDED,
WorkflowExecutionStatus.FAILED,
WorkflowExecutionStatus.STOPPED,
WorkflowExecutionStatus.PARTIAL_SUCCEEDED,
):
assert f"'{status.value}'" in compiled_sql
assert "'running'" not in compiled_sql
assert "'paused'" not in compiled_sql
class TestCreateWorkflowPause(TestDifyAPISQLAlchemyWorkflowRunRepository):
"""Test create_workflow_pause method."""