diff --git a/api/tests/test_containers_integration_tests/services/test_delete_archived_workflow_run.py b/api/tests/test_containers_integration_tests/services/test_delete_archived_workflow_run.py new file mode 100644 index 0000000000..546292109e --- /dev/null +++ b/api/tests/test_containers_integration_tests/services/test_delete_archived_workflow_run.py @@ -0,0 +1,143 @@ +""" +Testcontainers integration tests for archived workflow run deletion service. +""" + +from datetime import UTC, datetime, timedelta +from uuid import uuid4 + +from sqlalchemy import select + +from core.workflow.enums import WorkflowExecutionStatus +from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom +from models.workflow import WorkflowArchiveLog, WorkflowRun +from services.retention.workflow_run.delete_archived_workflow_run import ArchivedWorkflowRunDeletion + + +class TestArchivedWorkflowRunDeletion: + def _create_workflow_run( + self, + db_session_with_containers, + *, + tenant_id: str, + created_at: datetime, + ) -> WorkflowRun: + run = WorkflowRun( + id=str(uuid4()), + tenant_id=tenant_id, + app_id=str(uuid4()), + workflow_id=str(uuid4()), + type="workflow", + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + version="1.0.0", + graph="{}", + inputs="{}", + status=WorkflowExecutionStatus.SUCCEEDED, + outputs="{}", + elapsed_time=0.1, + total_tokens=1, + total_steps=1, + created_by_role=CreatorUserRole.ACCOUNT, + created_by=str(uuid4()), + created_at=created_at, + finished_at=created_at, + exceptions_count=0, + ) + db_session_with_containers.add(run) + db_session_with_containers.commit() + return run + + def _create_archive_log(self, db_session_with_containers, *, run: WorkflowRun) -> None: + archive_log = WorkflowArchiveLog( + tenant_id=run.tenant_id, + app_id=run.app_id, + workflow_id=run.workflow_id, + workflow_run_id=run.id, + created_by_role=run.created_by_role, + created_by=run.created_by, + log_id=None, + log_created_at=None, + log_created_from=None, + run_version=run.version, + run_status=run.status, + run_triggered_from=run.triggered_from, + run_error=run.error, + run_elapsed_time=run.elapsed_time, + run_total_tokens=run.total_tokens, + run_total_steps=run.total_steps, + run_created_at=run.created_at, + run_finished_at=run.finished_at, + run_exceptions_count=run.exceptions_count, + trigger_metadata=None, + ) + db_session_with_containers.add(archive_log) + db_session_with_containers.commit() + + def test_delete_by_run_id_returns_error_when_run_missing(self, db_session_with_containers): + deleter = ArchivedWorkflowRunDeletion() + missing_run_id = str(uuid4()) + + result = deleter.delete_by_run_id(missing_run_id) + + assert result.success is False + assert result.error == f"Workflow run {missing_run_id} not found" + + def test_delete_by_run_id_returns_error_when_not_archived(self, db_session_with_containers): + tenant_id = str(uuid4()) + run = self._create_workflow_run( + db_session_with_containers, + tenant_id=tenant_id, + created_at=datetime.now(UTC), + ) + deleter = ArchivedWorkflowRunDeletion() + + result = deleter.delete_by_run_id(run.id) + + assert result.success is False + assert result.error == f"Workflow run {run.id} is not archived" + + def test_delete_batch_uses_repo(self, db_session_with_containers): + tenant_id = str(uuid4()) + base_time = datetime.now(UTC) + run1 = self._create_workflow_run(db_session_with_containers, tenant_id=tenant_id, created_at=base_time) + run2 = self._create_workflow_run( + db_session_with_containers, + tenant_id=tenant_id, + created_at=base_time + timedelta(seconds=1), + ) + self._create_archive_log(db_session_with_containers, run=run1) + self._create_archive_log(db_session_with_containers, run=run2) + run_ids = [run1.id, run2.id] + + deleter = ArchivedWorkflowRunDeletion() + results = deleter.delete_batch( + tenant_ids=[tenant_id], + start_date=base_time - timedelta(minutes=1), + end_date=base_time + timedelta(minutes=1), + limit=2, + ) + + assert len(results) == 2 + assert all(result.success for result in results) + + remaining_runs = db_session_with_containers.scalars( + select(WorkflowRun).where(WorkflowRun.id.in_(run_ids)) + ).all() + assert remaining_runs == [] + + def test_delete_run_calls_repo(self, db_session_with_containers): + tenant_id = str(uuid4()) + run = self._create_workflow_run( + db_session_with_containers, + tenant_id=tenant_id, + created_at=datetime.now(UTC), + ) + run_id = run.id + deleter = ArchivedWorkflowRunDeletion() + + result = deleter._delete_run(run) + + assert result.success is True + assert result.deleted_counts["runs"] == 1 + db_session_with_containers.expunge_all() + deleted_run = db_session_with_containers.get(WorkflowRun, run_id) + assert deleted_run is None diff --git a/api/tests/unit_tests/services/test_delete_archived_workflow_run.py b/api/tests/unit_tests/services/test_delete_archived_workflow_run.py index 2c9d946ea6..babd620ab7 100644 --- a/api/tests/unit_tests/services/test_delete_archived_workflow_run.py +++ b/api/tests/unit_tests/services/test_delete_archived_workflow_run.py @@ -6,66 +6,6 @@ from unittest.mock import MagicMock, patch class TestArchivedWorkflowRunDeletion: - def test_delete_by_run_id_returns_error_when_run_missing(self): - from services.retention.workflow_run.delete_archived_workflow_run import ArchivedWorkflowRunDeletion - - deleter = ArchivedWorkflowRunDeletion() - repo = MagicMock() - session = MagicMock() - session.get.return_value = None - - session_maker = MagicMock() - session_maker.return_value.__enter__.return_value = session - session_maker.return_value.__exit__.return_value = None - mock_db = MagicMock() - mock_db.engine = MagicMock() - - with ( - patch("services.retention.workflow_run.delete_archived_workflow_run.db", mock_db), - patch( - "services.retention.workflow_run.delete_archived_workflow_run.sessionmaker", return_value=session_maker - ), - patch.object(deleter, "_get_workflow_run_repo", return_value=repo), - ): - result = deleter.delete_by_run_id("run-1") - - assert result.success is False - assert result.error == "Workflow run run-1 not found" - repo.get_archived_run_ids.assert_not_called() - - def test_delete_by_run_id_returns_error_when_not_archived(self): - from services.retention.workflow_run.delete_archived_workflow_run import ArchivedWorkflowRunDeletion - - deleter = ArchivedWorkflowRunDeletion() - repo = MagicMock() - repo.get_archived_run_ids.return_value = set() - run = MagicMock() - run.id = "run-1" - run.tenant_id = "tenant-1" - - session = MagicMock() - session.get.return_value = run - - session_maker = MagicMock() - session_maker.return_value.__enter__.return_value = session - session_maker.return_value.__exit__.return_value = None - mock_db = MagicMock() - mock_db.engine = MagicMock() - - with ( - patch("services.retention.workflow_run.delete_archived_workflow_run.db", mock_db), - patch( - "services.retention.workflow_run.delete_archived_workflow_run.sessionmaker", return_value=session_maker - ), - patch.object(deleter, "_get_workflow_run_repo", return_value=repo), - patch.object(deleter, "_delete_run") as mock_delete_run, - ): - result = deleter.delete_by_run_id("run-1") - - assert result.success is False - assert result.error == "Workflow run run-1 is not archived" - mock_delete_run.assert_not_called() - def test_delete_by_run_id_calls_delete_run(self): from services.retention.workflow_run.delete_archived_workflow_run import ArchivedWorkflowRunDeletion @@ -98,55 +38,6 @@ class TestArchivedWorkflowRunDeletion: assert result.success is True mock_delete_run.assert_called_once_with(run) - def test_delete_batch_uses_repo(self): - from services.retention.workflow_run.delete_archived_workflow_run import ArchivedWorkflowRunDeletion - - deleter = ArchivedWorkflowRunDeletion() - repo = MagicMock() - run1 = MagicMock() - run1.id = "run-1" - run1.tenant_id = "tenant-1" - run2 = MagicMock() - run2.id = "run-2" - run2.tenant_id = "tenant-1" - repo.get_archived_runs_by_time_range.return_value = [run1, run2] - - session = MagicMock() - session_maker = MagicMock() - session_maker.return_value.__enter__.return_value = session - session_maker.return_value.__exit__.return_value = None - start_date = MagicMock() - end_date = MagicMock() - mock_db = MagicMock() - mock_db.engine = MagicMock() - - with ( - patch("services.retention.workflow_run.delete_archived_workflow_run.db", mock_db), - patch( - "services.retention.workflow_run.delete_archived_workflow_run.sessionmaker", return_value=session_maker - ), - patch.object(deleter, "_get_workflow_run_repo", return_value=repo), - patch.object( - deleter, "_delete_run", side_effect=[MagicMock(success=True), MagicMock(success=True)] - ) as mock_delete_run, - ): - results = deleter.delete_batch( - tenant_ids=["tenant-1"], - start_date=start_date, - end_date=end_date, - limit=2, - ) - - assert len(results) == 2 - repo.get_archived_runs_by_time_range.assert_called_once_with( - session=session, - tenant_ids=["tenant-1"], - start_date=start_date, - end_date=end_date, - limit=2, - ) - assert mock_delete_run.call_count == 2 - def test_delete_run_dry_run(self): from services.retention.workflow_run.delete_archived_workflow_run import ArchivedWorkflowRunDeletion @@ -160,21 +51,3 @@ class TestArchivedWorkflowRunDeletion: assert result.success is True mock_get_repo.assert_not_called() - - def test_delete_run_calls_repo(self): - from services.retention.workflow_run.delete_archived_workflow_run import ArchivedWorkflowRunDeletion - - deleter = ArchivedWorkflowRunDeletion() - run = MagicMock() - run.id = "run-1" - run.tenant_id = "tenant-1" - - repo = MagicMock() - repo.delete_runs_with_related.return_value = {"runs": 1} - - with patch.object(deleter, "_get_workflow_run_repo", return_value=repo): - result = deleter._delete_run(run) - - assert result.success is True - assert result.deleted_counts == {"runs": 1} - repo.delete_runs_with_related.assert_called_once()