test: migrate RagPipelineService DB operation SQL tests to Testcontainer (#34959)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
wdeveloper16 2026-04-11 18:32:52 +02:00 committed by GitHub
parent 12814b55d2
commit 34ce3cac70
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 255 additions and 75 deletions

View File

@ -0,0 +1,255 @@
"""
Integration tests for RagPipelineService methods that interact with the database.
Migrated from unit_tests/services/rag_pipeline/test_rag_pipeline_service.py, replacing
db.session.scalar/commit/delete mocker patches with real PostgreSQL operations.
Covers:
- get_pipeline: Dataset and Pipeline lookups
- update_customized_pipeline_template: find + unique-name check + commit
- delete_customized_pipeline_template: find + delete + commit
"""
from collections.abc import Generator
from types import SimpleNamespace
from unittest.mock import patch
from uuid import uuid4
import pytest
from sqlalchemy.orm import Session, sessionmaker
from models.dataset import Dataset, Pipeline, PipelineCustomizedTemplate
from models.enums import DataSourceType
from services.entities.knowledge_entities.rag_pipeline_entities import IconInfo, PipelineTemplateInfoEntity
from services.rag_pipeline.rag_pipeline import RagPipelineService
class TestRagPipelineServiceGetPipeline:
"""Integration tests for RagPipelineService.get_pipeline."""
@pytest.fixture(autouse=True)
def _auto_rollback(self, db_session_with_containers: Session) -> Generator[None, None, None]:
yield
db_session_with_containers.rollback()
def _make_service(self, flask_app_with_containers) -> RagPipelineService:
with (
patch(
"services.rag_pipeline.rag_pipeline.DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository",
return_value=None,
),
patch(
"services.rag_pipeline.rag_pipeline.DifyAPIRepositoryFactory.create_api_workflow_run_repository",
return_value=None,
),
):
session_factory = sessionmaker(bind=flask_app_with_containers.extensions["sqlalchemy"].engine)
return RagPipelineService(session_maker=session_factory)
def _create_pipeline(self, db_session: Session, tenant_id: str, created_by: str) -> Pipeline:
pipeline = Pipeline(
tenant_id=tenant_id,
name=f"Pipeline {uuid4()}",
description="",
created_by=created_by,
)
db_session.add(pipeline)
db_session.flush()
return pipeline
def _create_dataset(
self, db_session: Session, tenant_id: str, created_by: str, pipeline_id: str | None = None
) -> Dataset:
dataset = Dataset(
tenant_id=tenant_id,
name=f"Dataset {uuid4()}",
data_source_type=DataSourceType.UPLOAD_FILE,
created_by=created_by,
pipeline_id=pipeline_id,
)
db_session.add(dataset)
db_session.flush()
return dataset
def test_get_pipeline_raises_when_dataset_not_found(
self, db_session_with_containers: Session, flask_app_with_containers
) -> None:
"""get_pipeline raises ValueError when dataset does not exist."""
service = self._make_service(flask_app_with_containers)
with pytest.raises(ValueError, match="Dataset not found"):
service.get_pipeline(tenant_id=str(uuid4()), dataset_id=str(uuid4()))
def test_get_pipeline_raises_when_pipeline_not_found(
self, db_session_with_containers: Session, flask_app_with_containers
) -> None:
"""get_pipeline raises ValueError when dataset exists but has no linked pipeline."""
tenant_id = str(uuid4())
created_by = str(uuid4())
dataset = self._create_dataset(db_session_with_containers, tenant_id, created_by, pipeline_id=None)
db_session_with_containers.flush()
service = self._make_service(flask_app_with_containers)
with pytest.raises(ValueError, match="(Dataset not found|Pipeline not found)"):
service.get_pipeline(tenant_id=tenant_id, dataset_id=dataset.id)
def test_get_pipeline_returns_pipeline_when_found(
self, db_session_with_containers: Session, flask_app_with_containers
) -> None:
"""get_pipeline returns the Pipeline when both Dataset and Pipeline exist."""
tenant_id = str(uuid4())
created_by = str(uuid4())
pipeline = self._create_pipeline(db_session_with_containers, tenant_id, created_by)
dataset = self._create_dataset(db_session_with_containers, tenant_id, created_by, pipeline_id=pipeline.id)
db_session_with_containers.flush()
service = self._make_service(flask_app_with_containers)
result = service.get_pipeline(tenant_id=tenant_id, dataset_id=dataset.id)
assert result.id == pipeline.id
class TestUpdateCustomizedPipelineTemplate:
"""Integration tests for RagPipelineService.update_customized_pipeline_template."""
@pytest.fixture(autouse=True)
def _auto_rollback(self, db_session_with_containers: Session) -> Generator[None, None, None]:
yield
db_session_with_containers.rollback()
def _create_template(
self, db_session: Session, tenant_id: str, created_by: str, name: str = "Template"
) -> PipelineCustomizedTemplate:
template = PipelineCustomizedTemplate(
tenant_id=tenant_id,
name=name,
description="Original description",
chunk_structure="fixed_size",
icon={"type": "emoji", "value": "📄"},
position=1,
yaml_content="{}",
install_count=0,
language="en-US",
created_by=created_by,
)
db_session.add(template)
db_session.flush()
return template
def test_update_template_succeeds(self, db_session_with_containers: Session, flask_app_with_containers) -> None:
"""update_customized_pipeline_template updates name and description."""
tenant_id = str(uuid4())
created_by = str(uuid4())
template = self._create_template(db_session_with_containers, tenant_id, created_by)
db_session_with_containers.flush()
fake_user = SimpleNamespace(id=created_by, current_tenant_id=tenant_id)
with patch("services.rag_pipeline.rag_pipeline.current_user", fake_user):
info = PipelineTemplateInfoEntity(
name="Updated Name",
description="Updated description",
icon_info=IconInfo(icon="🔥"),
)
result = RagPipelineService.update_customized_pipeline_template(template.id, info)
assert result.name == "Updated Name"
assert result.description == "Updated description"
def test_update_template_raises_when_not_found(
self, db_session_with_containers: Session, flask_app_with_containers
) -> None:
"""update_customized_pipeline_template raises ValueError when template doesn't exist."""
fake_user = SimpleNamespace(id=str(uuid4()), current_tenant_id=str(uuid4()))
with patch("services.rag_pipeline.rag_pipeline.current_user", fake_user):
info = PipelineTemplateInfoEntity(
name="New Name",
description="desc",
icon_info=IconInfo(icon="📄"),
)
with pytest.raises(ValueError, match="Customized pipeline template not found"):
RagPipelineService.update_customized_pipeline_template(str(uuid4()), info)
def test_update_template_raises_on_duplicate_name(
self, db_session_with_containers: Session, flask_app_with_containers
) -> None:
"""update_customized_pipeline_template raises ValueError when new name already exists."""
tenant_id = str(uuid4())
created_by = str(uuid4())
template1 = self._create_template(db_session_with_containers, tenant_id, created_by, name="Original")
self._create_template(db_session_with_containers, tenant_id, created_by, name="Duplicate")
db_session_with_containers.flush()
fake_user = SimpleNamespace(id=created_by, current_tenant_id=tenant_id)
with patch("services.rag_pipeline.rag_pipeline.current_user", fake_user):
info = PipelineTemplateInfoEntity(
name="Duplicate",
description="desc",
icon_info=IconInfo(icon="📄"),
)
with pytest.raises(ValueError, match="Template name is already exists"):
RagPipelineService.update_customized_pipeline_template(template1.id, info)
class TestDeleteCustomizedPipelineTemplate:
"""Integration tests for RagPipelineService.delete_customized_pipeline_template."""
@pytest.fixture(autouse=True)
def _auto_rollback(self, db_session_with_containers: Session) -> Generator[None, None, None]:
yield
db_session_with_containers.rollback()
def _create_template(self, db_session: Session, tenant_id: str, created_by: str) -> PipelineCustomizedTemplate:
template = PipelineCustomizedTemplate(
tenant_id=tenant_id,
name=f"Template {uuid4()}",
description="Description",
chunk_structure="fixed_size",
icon={"type": "emoji", "value": "📄"},
position=1,
yaml_content="{}",
install_count=0,
language="en-US",
created_by=created_by,
)
db_session.add(template)
db_session.flush()
return template
def test_delete_template_succeeds(self, db_session_with_containers: Session, flask_app_with_containers) -> None:
"""delete_customized_pipeline_template removes the template from the DB."""
tenant_id = str(uuid4())
created_by = str(uuid4())
template = self._create_template(db_session_with_containers, tenant_id, created_by)
template_id = template.id
db_session_with_containers.flush()
fake_user = SimpleNamespace(id=created_by, current_tenant_id=tenant_id)
with patch("services.rag_pipeline.rag_pipeline.current_user", fake_user):
RagPipelineService.delete_customized_pipeline_template(template_id)
# Verify the record is deleted within the same context
from sqlalchemy import select
from extensions.ext_database import db as ext_db
remaining = ext_db.session.scalar(
select(PipelineCustomizedTemplate).where(PipelineCustomizedTemplate.id == template_id)
)
assert remaining is None
def test_delete_template_raises_when_not_found(
self, db_session_with_containers: Session, flask_app_with_containers
) -> None:
"""delete_customized_pipeline_template raises ValueError when template doesn't exist."""
fake_user = SimpleNamespace(id=str(uuid4()), current_tenant_id=str(uuid4()))
with patch("services.rag_pipeline.rag_pipeline.current_user", fake_user):
with pytest.raises(ValueError, match="Customized pipeline template not found"):
RagPipelineService.delete_customized_pipeline_template(str(uuid4()))

View File

@ -116,81 +116,6 @@ def test_get_all_published_workflow_applies_limit_and_has_more(rag_pipeline_serv
assert has_more is True
def test_get_pipeline_raises_when_dataset_not_found(mocker, rag_pipeline_service) -> None:
mocker.patch("services.rag_pipeline.rag_pipeline.db.session.scalar", return_value=None)
with pytest.raises(ValueError, match="Dataset not found"):
rag_pipeline_service.get_pipeline("tenant-1", "dataset-1")
# --- update_customized_pipeline_template ---
def test_update_customized_pipeline_template_success(mocker) -> None:
template = SimpleNamespace(name="old", description="old", icon={}, updated_by=None)
# First scalar finds the template, second scalar (duplicate check) returns None
mocker.patch("services.rag_pipeline.rag_pipeline.db.session.scalar", side_effect=[template, None])
mocker.patch("services.rag_pipeline.rag_pipeline.db.session.commit")
mocker.patch("services.rag_pipeline.rag_pipeline.current_user", SimpleNamespace(id="u1", current_tenant_id="t1"))
info = PipelineTemplateInfoEntity(
name="new",
description="new desc",
icon_info=IconInfo(icon="🔥"),
)
result = RagPipelineService.update_customized_pipeline_template("tpl-1", info)
assert result.name == "new"
assert result.description == "new desc"
def test_update_customized_pipeline_template_not_found(mocker) -> None:
mocker.patch("services.rag_pipeline.rag_pipeline.db.session.scalar", return_value=None)
mocker.patch("services.rag_pipeline.rag_pipeline.current_user", SimpleNamespace(id="u1", current_tenant_id="t1"))
info = PipelineTemplateInfoEntity(name="x", description="d", icon_info=IconInfo(icon="i"))
with pytest.raises(ValueError, match="Customized pipeline template not found"):
RagPipelineService.update_customized_pipeline_template("tpl-missing", info)
def test_update_customized_pipeline_template_duplicate_name(mocker) -> None:
template = SimpleNamespace(name="old", description="old", icon={}, updated_by=None)
duplicate = SimpleNamespace(name="dup")
mocker.patch("services.rag_pipeline.rag_pipeline.db.session.scalar", side_effect=[template, duplicate])
mocker.patch("services.rag_pipeline.rag_pipeline.current_user", SimpleNamespace(id="u1", current_tenant_id="t1"))
info = PipelineTemplateInfoEntity(name="dup", description="d", icon_info=IconInfo(icon="i"))
with pytest.raises(ValueError, match="Template name is already exists"):
RagPipelineService.update_customized_pipeline_template("tpl-1", info)
# --- delete_customized_pipeline_template ---
def test_delete_customized_pipeline_template_success(mocker) -> None:
template = SimpleNamespace(id="tpl-1")
mocker.patch("services.rag_pipeline.rag_pipeline.db.session.scalar", return_value=template)
delete_mock = mocker.patch("services.rag_pipeline.rag_pipeline.db.session.delete")
commit_mock = mocker.patch("services.rag_pipeline.rag_pipeline.db.session.commit")
mocker.patch("services.rag_pipeline.rag_pipeline.current_user", SimpleNamespace(id="u1", current_tenant_id="t1"))
RagPipelineService.delete_customized_pipeline_template("tpl-1")
delete_mock.assert_called_once_with(template)
commit_mock.assert_called_once()
def test_delete_customized_pipeline_template_not_found(mocker) -> None:
mocker.patch("services.rag_pipeline.rag_pipeline.db.session.scalar", return_value=None)
mocker.patch("services.rag_pipeline.rag_pipeline.current_user", SimpleNamespace(id="u1", current_tenant_id="t1"))
with pytest.raises(ValueError, match="Customized pipeline template not found"):
RagPipelineService.delete_customized_pipeline_template("tpl-missing")
# --- sync_draft_workflow ---