From c5365f89bfb70ab715f7f5c8f41a1dc45340e3c9 Mon Sep 17 00:00:00 2001 From: hjlarry Date: Thu, 11 Dec 2025 17:27:23 +0800 Subject: [PATCH] add status filter --- .../sqlalchemy_api_workflow_run_repository.py | 12 +++++- ..._sqlalchemy_api_workflow_run_repository.py | 37 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index 8a0356ec60..8d5623617d 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -333,7 +333,17 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): with self._session_maker() as session: stmt = ( select(WorkflowRun) - .where(WorkflowRun.created_at < end_before) + .where( + WorkflowRun.created_at < end_before, + WorkflowRun.status.in_( + [ + WorkflowExecutionStatus.SUCCEEDED.value, + WorkflowExecutionStatus.FAILED.value, + WorkflowExecutionStatus.STOPPED.value, + WorkflowExecutionStatus.PARTIAL_SUCCEEDED.value, + ] + ), + ) .order_by(WorkflowRun.created_at.asc(), WorkflowRun.id.asc()) .limit(batch_size) ) diff --git a/api/tests/unit_tests/repositories/test_sqlalchemy_api_workflow_run_repository.py b/api/tests/unit_tests/repositories/test_sqlalchemy_api_workflow_run_repository.py index 0c34676252..f3df3b5483 100644 --- a/api/tests/unit_tests/repositories/test_sqlalchemy_api_workflow_run_repository.py +++ b/api/tests/unit_tests/repositories/test_sqlalchemy_api_workflow_run_repository.py @@ -4,6 +4,7 @@ from datetime import UTC, datetime from unittest.mock import Mock, patch import pytest +from sqlalchemy.dialects import postgresql from sqlalchemy.orm import Session, sessionmaker from core.workflow.enums import WorkflowExecutionStatus @@ -104,6 +105,42 @@ class TestDifyAPISQLAlchemyWorkflowRunRepository: return pause +class TestGetRunsBatchForCleanup(TestDifyAPISQLAlchemyWorkflowRunRepository): + def test_get_runs_batch_for_cleanup_filters_terminal_statuses( + self, repository: DifyAPISQLAlchemyWorkflowRunRepository, mock_session: Mock + ): + scalar_result = Mock() + scalar_result.all.return_value = [] + mock_session.scalars.return_value = scalar_result + + repository.get_runs_batch_for_cleanup( + start_after=None, + end_before=datetime(2024, 1, 1), + last_seen=None, + batch_size=50, + ) + + stmt = mock_session.scalars.call_args[0][0] + compiled_sql = str( + stmt.compile( + dialect=postgresql.dialect(), + compile_kwargs={"literal_binds": True}, + ) + ) + + assert "workflow_runs.status" in compiled_sql + for status in ( + WorkflowExecutionStatus.SUCCEEDED, + WorkflowExecutionStatus.FAILED, + WorkflowExecutionStatus.STOPPED, + WorkflowExecutionStatus.PARTIAL_SUCCEEDED, + ): + assert f"'{status.value}'" in compiled_sql + + assert "'running'" not in compiled_sql + assert "'paused'" not in compiled_sql + + class TestCreateWorkflowPause(TestDifyAPISQLAlchemyWorkflowRunRepository): """Test create_workflow_pause method."""