From 452067db19d9eedc161c8cea8d6a8f316eba234c Mon Sep 17 00:00:00 2001 From: wdeveloper16 Date: Sat, 11 Apr 2026 19:56:44 +0200 Subject: [PATCH] test: migrate Conversation.status_count and Site.generate_code SQL tests to Testcontainers (#34955) --- .../models/test_conversation_status_count.py | 314 ++++++++++++++++ .../unit_tests/models/test_app_models.py | 340 ------------------ 2 files changed, 314 insertions(+), 340 deletions(-) create mode 100644 api/tests/test_containers_integration_tests/models/test_conversation_status_count.py diff --git a/api/tests/test_containers_integration_tests/models/test_conversation_status_count.py b/api/tests/test_containers_integration_tests/models/test_conversation_status_count.py new file mode 100644 index 0000000000..4ca87de52d --- /dev/null +++ b/api/tests/test_containers_integration_tests/models/test_conversation_status_count.py @@ -0,0 +1,314 @@ +""" +Integration tests for Conversation.status_count and Site.generate_code model properties. + +Migrated from unit_tests/models/test_app_models.py TestConversationStatusCount and +test_site_generate_code, replacing db.session.scalars mocks with real PostgreSQL queries. +""" + +from collections.abc import Generator +from uuid import uuid4 + +import pytest +from graphon.enums import WorkflowExecutionStatus +from sqlalchemy.orm import Session + +from models.enums import ConversationFromSource, InvokeFrom +from models.model import App, AppMode, Conversation, Message, Site +from models.workflow import Workflow, WorkflowRun, WorkflowRunTriggeredFrom, WorkflowType + + +class TestConversationStatusCount: + """Integration tests for Conversation.status_count property.""" + + @pytest.fixture(autouse=True) + def _auto_rollback(self, db_session_with_containers: Session) -> Generator[None, None, None]: + """Automatically rollback session changes after each test.""" + yield + db_session_with_containers.rollback() + + def _create_app(self, db_session: Session, tenant_id: str, created_by: str) -> App: + app = App( + tenant_id=tenant_id, + name=f"App {uuid4()}", + mode=AppMode.ADVANCED_CHAT, + enable_site=False, + enable_api=True, + is_demo=False, + is_public=False, + is_universal=False, + created_by=created_by, + updated_by=created_by, + ) + db_session.add(app) + db_session.flush() + return app + + def _create_conversation(self, db_session: Session, app: App) -> Conversation: + conversation = Conversation( + app_id=app.id, + mode=app.mode, + name=f"Conversation {uuid4()}", + summary="", + inputs={}, + introduction="", + system_instruction="", + system_instruction_tokens=0, + status="normal", + invoke_from=InvokeFrom.WEB_APP, + from_source=ConversationFromSource.API, + dialogue_count=0, + is_deleted=False, + ) + conversation.inputs = {} + db_session.add(conversation) + db_session.flush() + return conversation + + def _create_workflow(self, db_session: Session, app: App, created_by: str) -> Workflow: + workflow = Workflow( + tenant_id=app.tenant_id, + app_id=app.id, + type=WorkflowType.CHAT, + version="draft", + graph="{}", + created_by=created_by, + ) + workflow._features = "{}" + db_session.add(workflow) + db_session.flush() + return workflow + + def _create_workflow_run( + self, db_session: Session, app: App, workflow: Workflow, status: WorkflowExecutionStatus, created_by: str + ) -> WorkflowRun: + run = WorkflowRun( + tenant_id=app.tenant_id, + app_id=app.id, + workflow_id=workflow.id, + type=WorkflowType.CHAT, + triggered_from=WorkflowRunTriggeredFrom.APP_RUN, + version="draft", + status=status, + created_by_role="account", + created_by=created_by, + ) + db_session.add(run) + db_session.flush() + return run + + def _create_message( + self, db_session: Session, app: App, conversation: Conversation, workflow_run_id: str | None = None + ) -> Message: + message = Message( + app_id=app.id, + conversation_id=conversation.id, + _inputs={}, + query="Test query", + message={"role": "user", "content": "Test query"}, + answer="Test answer", + model_provider="openai", + model_id="gpt-4", + message_tokens=10, + message_unit_price=0, + answer_tokens=10, + answer_unit_price=0, + total_price=0, + currency="USD", + from_source=ConversationFromSource.API, + invoke_from=InvokeFrom.WEB_APP, + workflow_run_id=workflow_run_id, + ) + db_session.add(message) + db_session.flush() + return message + + def test_status_count_returns_none_when_no_messages(self, db_session_with_containers: Session) -> None: + """status_count returns None when conversation has no messages with workflow_run_id.""" + tenant_id = str(uuid4()) + created_by = str(uuid4()) + + app = self._create_app(db_session_with_containers, tenant_id, created_by) + conversation = self._create_conversation(db_session_with_containers, app) + + result = conversation.status_count + + assert result is None + + def test_status_count_returns_none_when_messages_have_no_workflow_run_id( + self, db_session_with_containers: Session + ) -> None: + """status_count returns None when messages exist but none have workflow_run_id.""" + tenant_id = str(uuid4()) + created_by = str(uuid4()) + + app = self._create_app(db_session_with_containers, tenant_id, created_by) + conversation = self._create_conversation(db_session_with_containers, app) + self._create_message(db_session_with_containers, app, conversation, workflow_run_id=None) + + result = conversation.status_count + + assert result is None + + def test_status_count_counts_succeeded_workflow_run(self, db_session_with_containers: Session) -> None: + """status_count correctly counts succeeded workflow runs.""" + tenant_id = str(uuid4()) + created_by = str(uuid4()) + + app = self._create_app(db_session_with_containers, tenant_id, created_by) + conversation = self._create_conversation(db_session_with_containers, app) + workflow = self._create_workflow(db_session_with_containers, app, created_by) + run = self._create_workflow_run( + db_session_with_containers, app, workflow, WorkflowExecutionStatus.SUCCEEDED, created_by + ) + self._create_message(db_session_with_containers, app, conversation, workflow_run_id=run.id) + + result = conversation.status_count + + assert result is not None + assert result["success"] == 1 + assert result["failed"] == 0 + assert result["partial_success"] == 0 + assert result["paused"] == 0 + + def test_status_count_counts_failed_workflow_run(self, db_session_with_containers: Session) -> None: + """status_count correctly counts failed workflow runs.""" + tenant_id = str(uuid4()) + created_by = str(uuid4()) + + app = self._create_app(db_session_with_containers, tenant_id, created_by) + conversation = self._create_conversation(db_session_with_containers, app) + workflow = self._create_workflow(db_session_with_containers, app, created_by) + run = self._create_workflow_run( + db_session_with_containers, app, workflow, WorkflowExecutionStatus.FAILED, created_by + ) + self._create_message(db_session_with_containers, app, conversation, workflow_run_id=run.id) + + result = conversation.status_count + + assert result is not None + assert result["success"] == 0 + assert result["failed"] == 1 + assert result["partial_success"] == 0 + assert result["paused"] == 0 + + def test_status_count_counts_paused_workflow_run(self, db_session_with_containers: Session) -> None: + """status_count correctly counts paused workflow runs.""" + tenant_id = str(uuid4()) + created_by = str(uuid4()) + + app = self._create_app(db_session_with_containers, tenant_id, created_by) + conversation = self._create_conversation(db_session_with_containers, app) + workflow = self._create_workflow(db_session_with_containers, app, created_by) + run = self._create_workflow_run( + db_session_with_containers, app, workflow, WorkflowExecutionStatus.PAUSED, created_by + ) + self._create_message(db_session_with_containers, app, conversation, workflow_run_id=run.id) + + result = conversation.status_count + + assert result is not None + assert result["success"] == 0 + assert result["failed"] == 0 + assert result["partial_success"] == 0 + assert result["paused"] == 1 + + def test_status_count_multiple_statuses(self, db_session_with_containers: Session) -> None: + """status_count counts multiple workflow runs with different statuses.""" + tenant_id = str(uuid4()) + created_by = str(uuid4()) + + app = self._create_app(db_session_with_containers, tenant_id, created_by) + conversation = self._create_conversation(db_session_with_containers, app) + workflow = self._create_workflow(db_session_with_containers, app, created_by) + + for status in [ + WorkflowExecutionStatus.SUCCEEDED, + WorkflowExecutionStatus.FAILED, + WorkflowExecutionStatus.PARTIAL_SUCCEEDED, + WorkflowExecutionStatus.PAUSED, + ]: + run = self._create_workflow_run(db_session_with_containers, app, workflow, status, created_by) + self._create_message(db_session_with_containers, app, conversation, workflow_run_id=run.id) + + result = conversation.status_count + + assert result is not None + assert result["success"] == 1 + assert result["failed"] == 1 + assert result["partial_success"] == 1 + assert result["paused"] == 1 + + def test_status_count_filters_workflow_runs_by_app_id(self, db_session_with_containers: Session) -> None: + """status_count excludes workflow runs belonging to a different app.""" + tenant_id = str(uuid4()) + created_by = str(uuid4()) + + app = self._create_app(db_session_with_containers, tenant_id, created_by) + other_app = self._create_app(db_session_with_containers, tenant_id, created_by) + conversation = self._create_conversation(db_session_with_containers, app) + workflow = self._create_workflow(db_session_with_containers, other_app, created_by) + + # Workflow run belongs to other_app, not app + other_run = self._create_workflow_run( + db_session_with_containers, other_app, workflow, WorkflowExecutionStatus.SUCCEEDED, created_by + ) + # Message references that run but is in a conversation under app + self._create_message(db_session_with_containers, app, conversation, workflow_run_id=other_run.id) + + result = conversation.status_count + + # The run should be excluded because app_id filter doesn't match + assert result is not None + assert result["success"] == 0 + + +class TestSiteGenerateCode: + """Integration tests for Site.generate_code static method.""" + + @pytest.fixture(autouse=True) + def _auto_rollback(self, db_session_with_containers: Session) -> Generator[None, None, None]: + """Automatically rollback session changes after each test.""" + yield + db_session_with_containers.rollback() + + def test_generate_code_returns_string_of_correct_length(self, db_session_with_containers: Session) -> None: + """Site.generate_code returns a code string of the requested length.""" + code = Site.generate_code(8) + + assert isinstance(code, str) + assert len(code) == 8 + + def test_generate_code_avoids_duplicates(self, db_session_with_containers: Session) -> None: + """Site.generate_code returns a code not already in use.""" + tenant_id = str(uuid4()) + app = App( + tenant_id=tenant_id, + name="Test App", + mode=AppMode.CHAT, + enable_site=True, + enable_api=False, + is_demo=False, + is_public=False, + is_universal=False, + created_by=str(uuid4()), + updated_by=str(uuid4()), + ) + db_session_with_containers.add(app) + db_session_with_containers.flush() + + site = Site( + app_id=app.id, + title="Test Site", + default_language="en-US", + customize_token_strategy="not_allow", + ) + # Set an explicit code so generate_code must avoid it + site.code = "AAAAAAAA" + db_session_with_containers.add(site) + db_session_with_containers.flush() + + code = Site.generate_code(8) + + assert isinstance(code, str) + assert len(code) == 8 + assert code != site.code diff --git a/api/tests/unit_tests/models/test_app_models.py b/api/tests/unit_tests/models/test_app_models.py index 59597fb8cd..4e46cf9654 100644 --- a/api/tests/unit_tests/models/test_app_models.py +++ b/api/tests/unit_tests/models/test_app_models.py @@ -291,24 +291,6 @@ class TestAppModelConfig: # Assert assert result == questions - def test_app_model_config_annotation_reply_dict_disabled(self): - """Test annotation_reply_dict when annotation is disabled.""" - # Arrange - config = AppModelConfig( - app_id=str(uuid4()), - provider="openai", - model_id="gpt-4", - created_by=str(uuid4()), - ) - - # Mock database scalar to return None (no annotation setting found) - with patch("models.model.db.session.scalar", return_value=None): - # Act - result = config.annotation_reply_dict - - # Assert - assert result == {"enabled": False} - class TestConversationModel: """Test suite for Conversation model integrity.""" @@ -948,17 +930,6 @@ class TestSiteModel: with pytest.raises(ValueError, match="Custom disclaimer cannot exceed 512 characters"): site.custom_disclaimer = long_disclaimer - def test_site_generate_code(self): - """Test Site.generate_code static method.""" - # Mock database scalar to return 0 (no existing codes) - with patch("models.model.db.session.scalar", return_value=0): - # Act - code = Site.generate_code(8) - - # Assert - assert isinstance(code, str) - assert len(code) == 8 - class TestModelIntegration: """Test suite for model integration scenarios.""" @@ -1146,314 +1117,3 @@ class TestModelIntegration: # Assert assert site.app_id == app.id assert app.enable_site is True - - -class TestConversationStatusCount: - """Test suite for Conversation.status_count property N+1 query fix.""" - - def test_status_count_no_messages(self): - """Test status_count returns None when conversation has no messages.""" - # Arrange - conversation = Conversation( - app_id=str(uuid4()), - mode=AppMode.CHAT, - name="Test Conversation", - status="normal", - from_source=ConversationFromSource.API, - ) - conversation.id = str(uuid4()) - - # Mock the database query to return no messages - with patch("models.model.db.session.scalars") as mock_scalars: - mock_scalars.return_value.all.return_value = [] - - # Act - result = conversation.status_count - - # Assert - assert result is None - - def test_status_count_messages_without_workflow_runs(self): - """Test status_count when messages have no workflow_run_id.""" - # Arrange - app_id = str(uuid4()) - conversation_id = str(uuid4()) - - conversation = Conversation( - app_id=app_id, - mode=AppMode.CHAT, - name="Test Conversation", - status="normal", - from_source=ConversationFromSource.API, - ) - conversation.id = conversation_id - - # Mock the database query to return no messages with workflow_run_id - with patch("models.model.db.session.scalars") as mock_scalars: - mock_scalars.return_value.all.return_value = [] - - # Act - result = conversation.status_count - - # Assert - assert result is None - - def test_status_count_batch_loading_implementation(self): - """Test that status_count uses batch loading instead of N+1 queries.""" - # Arrange - from graphon.enums import WorkflowExecutionStatus - - app_id = str(uuid4()) - conversation_id = str(uuid4()) - - # Create workflow run IDs - workflow_run_id_1 = str(uuid4()) - workflow_run_id_2 = str(uuid4()) - workflow_run_id_3 = str(uuid4()) - - conversation = Conversation( - app_id=app_id, - mode=AppMode.CHAT, - name="Test Conversation", - status="normal", - from_source=ConversationFromSource.API, - ) - conversation.id = conversation_id - - # Mock messages with workflow_run_id - mock_messages = [ - MagicMock( - conversation_id=conversation_id, - workflow_run_id=workflow_run_id_1, - ), - MagicMock( - conversation_id=conversation_id, - workflow_run_id=workflow_run_id_2, - ), - MagicMock( - conversation_id=conversation_id, - workflow_run_id=workflow_run_id_3, - ), - ] - - # Mock workflow runs with different statuses - mock_workflow_runs = [ - MagicMock( - id=workflow_run_id_1, - status=WorkflowExecutionStatus.SUCCEEDED.value, - app_id=app_id, - ), - MagicMock( - id=workflow_run_id_2, - status=WorkflowExecutionStatus.FAILED.value, - app_id=app_id, - ), - MagicMock( - id=workflow_run_id_3, - status=WorkflowExecutionStatus.PARTIAL_SUCCEEDED.value, - app_id=app_id, - ), - ] - - # Track database calls - calls_made = [] - - def mock_scalars(query): - calls_made.append(str(query)) - mock_result = MagicMock() - - # Return messages for the first query (messages with workflow_run_id) - if "messages" in str(query) and "conversation_id" in str(query): - mock_result.all.return_value = mock_messages - # Return workflow runs for the batch query - elif "workflow_runs" in str(query): - mock_result.all.return_value = mock_workflow_runs - else: - mock_result.all.return_value = [] - - return mock_result - - # Act & Assert - with patch("models.model.db.session.scalars", side_effect=mock_scalars): - result = conversation.status_count - - # Verify only 2 database queries were made (not N+1) - assert len(calls_made) == 2, f"Expected 2 queries, got {len(calls_made)}: {calls_made}" - - # Verify the first query gets messages - assert "messages" in calls_made[0] - assert "conversation_id" in calls_made[0] - - # Verify the second query batch loads workflow runs with proper filtering - assert "workflow_runs" in calls_made[1] - assert "app_id" in calls_made[1] # Security filter applied - assert "IN" in calls_made[1] # Batch loading with IN clause - - # Verify correct status counts - assert result["success"] == 1 # One SUCCEEDED - assert result["failed"] == 1 # One FAILED - assert result["partial_success"] == 1 # One PARTIAL_SUCCEEDED - assert result["paused"] == 0 - - def test_status_count_app_id_filtering(self): - """Test that status_count filters workflow runs by app_id for security.""" - # Arrange - app_id = str(uuid4()) - other_app_id = str(uuid4()) - conversation_id = str(uuid4()) - workflow_run_id = str(uuid4()) - - conversation = Conversation( - app_id=app_id, - mode=AppMode.CHAT, - name="Test Conversation", - status="normal", - from_source=ConversationFromSource.API, - ) - conversation.id = conversation_id - - # Mock message with workflow_run_id - mock_messages = [ - MagicMock( - conversation_id=conversation_id, - workflow_run_id=workflow_run_id, - ), - ] - - calls_made = [] - - def mock_scalars(query): - calls_made.append(str(query)) - mock_result = MagicMock() - - if "messages" in str(query): - mock_result.all.return_value = mock_messages - elif "workflow_runs" in str(query): - # Return empty list because no workflow run matches the correct app_id - mock_result.all.return_value = [] # Workflow run filtered out by app_id - else: - mock_result.all.return_value = [] - - return mock_result - - # Act - with patch("models.model.db.session.scalars", side_effect=mock_scalars): - result = conversation.status_count - - # Assert - query should include app_id filter - workflow_query = calls_made[1] - assert "app_id" in workflow_query - - # Since workflow run has wrong app_id, it shouldn't be included in counts - assert result["success"] == 0 - assert result["failed"] == 0 - assert result["partial_success"] == 0 - assert result["paused"] == 0 - - def test_status_count_handles_invalid_workflow_status(self): - """Test that status_count gracefully handles invalid workflow status values.""" - # Arrange - app_id = str(uuid4()) - conversation_id = str(uuid4()) - workflow_run_id = str(uuid4()) - - conversation = Conversation( - app_id=app_id, - mode=AppMode.CHAT, - name="Test Conversation", - status="normal", - from_source=ConversationFromSource.API, - ) - conversation.id = conversation_id - - mock_messages = [ - MagicMock( - conversation_id=conversation_id, - workflow_run_id=workflow_run_id, - ), - ] - - # Mock workflow run with invalid status - mock_workflow_runs = [ - MagicMock( - id=workflow_run_id, - status="invalid_status", # Invalid status that should raise ValueError - app_id=app_id, - ), - ] - - with patch("models.model.db.session.scalars") as mock_scalars: - # Mock the messages query - def mock_scalars_side_effect(query): - mock_result = MagicMock() - if "messages" in str(query): - mock_result.all.return_value = mock_messages - elif "workflow_runs" in str(query): - mock_result.all.return_value = mock_workflow_runs - else: - mock_result.all.return_value = [] - return mock_result - - mock_scalars.side_effect = mock_scalars_side_effect - - # Act - should not raise exception - result = conversation.status_count - - # Assert - should handle invalid status gracefully - assert result["success"] == 0 - assert result["failed"] == 0 - assert result["partial_success"] == 0 - assert result["paused"] == 0 - - def test_status_count_paused(self): - """Test status_count includes paused workflow runs.""" - # Arrange - from graphon.enums import WorkflowExecutionStatus - - app_id = str(uuid4()) - conversation_id = str(uuid4()) - workflow_run_id = str(uuid4()) - - conversation = Conversation( - app_id=app_id, - mode=AppMode.CHAT, - name="Test Conversation", - status="normal", - from_source=ConversationFromSource.API, - ) - conversation.id = conversation_id - - mock_messages = [ - MagicMock( - conversation_id=conversation_id, - workflow_run_id=workflow_run_id, - ), - ] - - mock_workflow_runs = [ - MagicMock( - id=workflow_run_id, - status=WorkflowExecutionStatus.PAUSED.value, - app_id=app_id, - ), - ] - - with patch("models.model.db.session.scalars") as mock_scalars: - - def mock_scalars_side_effect(query): - mock_result = MagicMock() - if "messages" in str(query): - mock_result.all.return_value = mock_messages - elif "workflow_runs" in str(query): - mock_result.all.return_value = mock_workflow_runs - else: - mock_result.all.return_value = [] - return mock_result - - mock_scalars.side_effect = mock_scalars_side_effect - - # Act - result = conversation.status_count - - # Assert - assert result["paused"] == 1