test: migrate Conversation.status_count and Site.generate_code SQL tests to Testcontainers (#34955)

This commit is contained in:
wdeveloper16 2026-04-11 19:56:44 +02:00 committed by GitHub
parent 859920a81f
commit 452067db19
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 314 additions and 340 deletions

View File

@ -0,0 +1,314 @@
"""
Integration tests for Conversation.status_count and Site.generate_code model properties.
Migrated from unit_tests/models/test_app_models.py TestConversationStatusCount and
test_site_generate_code, replacing db.session.scalars mocks with real PostgreSQL queries.
"""
from collections.abc import Generator
from uuid import uuid4
import pytest
from graphon.enums import WorkflowExecutionStatus
from sqlalchemy.orm import Session
from models.enums import ConversationFromSource, InvokeFrom
from models.model import App, AppMode, Conversation, Message, Site
from models.workflow import Workflow, WorkflowRun, WorkflowRunTriggeredFrom, WorkflowType
class TestConversationStatusCount:
"""Integration tests for Conversation.status_count property."""
@pytest.fixture(autouse=True)
def _auto_rollback(self, db_session_with_containers: Session) -> Generator[None, None, None]:
"""Automatically rollback session changes after each test."""
yield
db_session_with_containers.rollback()
def _create_app(self, db_session: Session, tenant_id: str, created_by: str) -> App:
app = App(
tenant_id=tenant_id,
name=f"App {uuid4()}",
mode=AppMode.ADVANCED_CHAT,
enable_site=False,
enable_api=True,
is_demo=False,
is_public=False,
is_universal=False,
created_by=created_by,
updated_by=created_by,
)
db_session.add(app)
db_session.flush()
return app
def _create_conversation(self, db_session: Session, app: App) -> Conversation:
conversation = Conversation(
app_id=app.id,
mode=app.mode,
name=f"Conversation {uuid4()}",
summary="",
inputs={},
introduction="",
system_instruction="",
system_instruction_tokens=0,
status="normal",
invoke_from=InvokeFrom.WEB_APP,
from_source=ConversationFromSource.API,
dialogue_count=0,
is_deleted=False,
)
conversation.inputs = {}
db_session.add(conversation)
db_session.flush()
return conversation
def _create_workflow(self, db_session: Session, app: App, created_by: str) -> Workflow:
workflow = Workflow(
tenant_id=app.tenant_id,
app_id=app.id,
type=WorkflowType.CHAT,
version="draft",
graph="{}",
created_by=created_by,
)
workflow._features = "{}"
db_session.add(workflow)
db_session.flush()
return workflow
def _create_workflow_run(
self, db_session: Session, app: App, workflow: Workflow, status: WorkflowExecutionStatus, created_by: str
) -> WorkflowRun:
run = WorkflowRun(
tenant_id=app.tenant_id,
app_id=app.id,
workflow_id=workflow.id,
type=WorkflowType.CHAT,
triggered_from=WorkflowRunTriggeredFrom.APP_RUN,
version="draft",
status=status,
created_by_role="account",
created_by=created_by,
)
db_session.add(run)
db_session.flush()
return run
def _create_message(
self, db_session: Session, app: App, conversation: Conversation, workflow_run_id: str | None = None
) -> Message:
message = Message(
app_id=app.id,
conversation_id=conversation.id,
_inputs={},
query="Test query",
message={"role": "user", "content": "Test query"},
answer="Test answer",
model_provider="openai",
model_id="gpt-4",
message_tokens=10,
message_unit_price=0,
answer_tokens=10,
answer_unit_price=0,
total_price=0,
currency="USD",
from_source=ConversationFromSource.API,
invoke_from=InvokeFrom.WEB_APP,
workflow_run_id=workflow_run_id,
)
db_session.add(message)
db_session.flush()
return message
def test_status_count_returns_none_when_no_messages(self, db_session_with_containers: Session) -> None:
"""status_count returns None when conversation has no messages with workflow_run_id."""
tenant_id = str(uuid4())
created_by = str(uuid4())
app = self._create_app(db_session_with_containers, tenant_id, created_by)
conversation = self._create_conversation(db_session_with_containers, app)
result = conversation.status_count
assert result is None
def test_status_count_returns_none_when_messages_have_no_workflow_run_id(
self, db_session_with_containers: Session
) -> None:
"""status_count returns None when messages exist but none have workflow_run_id."""
tenant_id = str(uuid4())
created_by = str(uuid4())
app = self._create_app(db_session_with_containers, tenant_id, created_by)
conversation = self._create_conversation(db_session_with_containers, app)
self._create_message(db_session_with_containers, app, conversation, workflow_run_id=None)
result = conversation.status_count
assert result is None
def test_status_count_counts_succeeded_workflow_run(self, db_session_with_containers: Session) -> None:
"""status_count correctly counts succeeded workflow runs."""
tenant_id = str(uuid4())
created_by = str(uuid4())
app = self._create_app(db_session_with_containers, tenant_id, created_by)
conversation = self._create_conversation(db_session_with_containers, app)
workflow = self._create_workflow(db_session_with_containers, app, created_by)
run = self._create_workflow_run(
db_session_with_containers, app, workflow, WorkflowExecutionStatus.SUCCEEDED, created_by
)
self._create_message(db_session_with_containers, app, conversation, workflow_run_id=run.id)
result = conversation.status_count
assert result is not None
assert result["success"] == 1
assert result["failed"] == 0
assert result["partial_success"] == 0
assert result["paused"] == 0
def test_status_count_counts_failed_workflow_run(self, db_session_with_containers: Session) -> None:
"""status_count correctly counts failed workflow runs."""
tenant_id = str(uuid4())
created_by = str(uuid4())
app = self._create_app(db_session_with_containers, tenant_id, created_by)
conversation = self._create_conversation(db_session_with_containers, app)
workflow = self._create_workflow(db_session_with_containers, app, created_by)
run = self._create_workflow_run(
db_session_with_containers, app, workflow, WorkflowExecutionStatus.FAILED, created_by
)
self._create_message(db_session_with_containers, app, conversation, workflow_run_id=run.id)
result = conversation.status_count
assert result is not None
assert result["success"] == 0
assert result["failed"] == 1
assert result["partial_success"] == 0
assert result["paused"] == 0
def test_status_count_counts_paused_workflow_run(self, db_session_with_containers: Session) -> None:
"""status_count correctly counts paused workflow runs."""
tenant_id = str(uuid4())
created_by = str(uuid4())
app = self._create_app(db_session_with_containers, tenant_id, created_by)
conversation = self._create_conversation(db_session_with_containers, app)
workflow = self._create_workflow(db_session_with_containers, app, created_by)
run = self._create_workflow_run(
db_session_with_containers, app, workflow, WorkflowExecutionStatus.PAUSED, created_by
)
self._create_message(db_session_with_containers, app, conversation, workflow_run_id=run.id)
result = conversation.status_count
assert result is not None
assert result["success"] == 0
assert result["failed"] == 0
assert result["partial_success"] == 0
assert result["paused"] == 1
def test_status_count_multiple_statuses(self, db_session_with_containers: Session) -> None:
"""status_count counts multiple workflow runs with different statuses."""
tenant_id = str(uuid4())
created_by = str(uuid4())
app = self._create_app(db_session_with_containers, tenant_id, created_by)
conversation = self._create_conversation(db_session_with_containers, app)
workflow = self._create_workflow(db_session_with_containers, app, created_by)
for status in [
WorkflowExecutionStatus.SUCCEEDED,
WorkflowExecutionStatus.FAILED,
WorkflowExecutionStatus.PARTIAL_SUCCEEDED,
WorkflowExecutionStatus.PAUSED,
]:
run = self._create_workflow_run(db_session_with_containers, app, workflow, status, created_by)
self._create_message(db_session_with_containers, app, conversation, workflow_run_id=run.id)
result = conversation.status_count
assert result is not None
assert result["success"] == 1
assert result["failed"] == 1
assert result["partial_success"] == 1
assert result["paused"] == 1
def test_status_count_filters_workflow_runs_by_app_id(self, db_session_with_containers: Session) -> None:
"""status_count excludes workflow runs belonging to a different app."""
tenant_id = str(uuid4())
created_by = str(uuid4())
app = self._create_app(db_session_with_containers, tenant_id, created_by)
other_app = self._create_app(db_session_with_containers, tenant_id, created_by)
conversation = self._create_conversation(db_session_with_containers, app)
workflow = self._create_workflow(db_session_with_containers, other_app, created_by)
# Workflow run belongs to other_app, not app
other_run = self._create_workflow_run(
db_session_with_containers, other_app, workflow, WorkflowExecutionStatus.SUCCEEDED, created_by
)
# Message references that run but is in a conversation under app
self._create_message(db_session_with_containers, app, conversation, workflow_run_id=other_run.id)
result = conversation.status_count
# The run should be excluded because app_id filter doesn't match
assert result is not None
assert result["success"] == 0
class TestSiteGenerateCode:
"""Integration tests for Site.generate_code static method."""
@pytest.fixture(autouse=True)
def _auto_rollback(self, db_session_with_containers: Session) -> Generator[None, None, None]:
"""Automatically rollback session changes after each test."""
yield
db_session_with_containers.rollback()
def test_generate_code_returns_string_of_correct_length(self, db_session_with_containers: Session) -> None:
"""Site.generate_code returns a code string of the requested length."""
code = Site.generate_code(8)
assert isinstance(code, str)
assert len(code) == 8
def test_generate_code_avoids_duplicates(self, db_session_with_containers: Session) -> None:
"""Site.generate_code returns a code not already in use."""
tenant_id = str(uuid4())
app = App(
tenant_id=tenant_id,
name="Test App",
mode=AppMode.CHAT,
enable_site=True,
enable_api=False,
is_demo=False,
is_public=False,
is_universal=False,
created_by=str(uuid4()),
updated_by=str(uuid4()),
)
db_session_with_containers.add(app)
db_session_with_containers.flush()
site = Site(
app_id=app.id,
title="Test Site",
default_language="en-US",
customize_token_strategy="not_allow",
)
# Set an explicit code so generate_code must avoid it
site.code = "AAAAAAAA"
db_session_with_containers.add(site)
db_session_with_containers.flush()
code = Site.generate_code(8)
assert isinstance(code, str)
assert len(code) == 8
assert code != site.code

View File

@ -291,24 +291,6 @@ class TestAppModelConfig:
# Assert
assert result == questions
def test_app_model_config_annotation_reply_dict_disabled(self):
"""Test annotation_reply_dict when annotation is disabled."""
# Arrange
config = AppModelConfig(
app_id=str(uuid4()),
provider="openai",
model_id="gpt-4",
created_by=str(uuid4()),
)
# Mock database scalar to return None (no annotation setting found)
with patch("models.model.db.session.scalar", return_value=None):
# Act
result = config.annotation_reply_dict
# Assert
assert result == {"enabled": False}
class TestConversationModel:
"""Test suite for Conversation model integrity."""
@ -948,17 +930,6 @@ class TestSiteModel:
with pytest.raises(ValueError, match="Custom disclaimer cannot exceed 512 characters"):
site.custom_disclaimer = long_disclaimer
def test_site_generate_code(self):
"""Test Site.generate_code static method."""
# Mock database scalar to return 0 (no existing codes)
with patch("models.model.db.session.scalar", return_value=0):
# Act
code = Site.generate_code(8)
# Assert
assert isinstance(code, str)
assert len(code) == 8
class TestModelIntegration:
"""Test suite for model integration scenarios."""
@ -1146,314 +1117,3 @@ class TestModelIntegration:
# Assert
assert site.app_id == app.id
assert app.enable_site is True
class TestConversationStatusCount:
"""Test suite for Conversation.status_count property N+1 query fix."""
def test_status_count_no_messages(self):
"""Test status_count returns None when conversation has no messages."""
# Arrange
conversation = Conversation(
app_id=str(uuid4()),
mode=AppMode.CHAT,
name="Test Conversation",
status="normal",
from_source=ConversationFromSource.API,
)
conversation.id = str(uuid4())
# Mock the database query to return no messages
with patch("models.model.db.session.scalars") as mock_scalars:
mock_scalars.return_value.all.return_value = []
# Act
result = conversation.status_count
# Assert
assert result is None
def test_status_count_messages_without_workflow_runs(self):
"""Test status_count when messages have no workflow_run_id."""
# Arrange
app_id = str(uuid4())
conversation_id = str(uuid4())
conversation = Conversation(
app_id=app_id,
mode=AppMode.CHAT,
name="Test Conversation",
status="normal",
from_source=ConversationFromSource.API,
)
conversation.id = conversation_id
# Mock the database query to return no messages with workflow_run_id
with patch("models.model.db.session.scalars") as mock_scalars:
mock_scalars.return_value.all.return_value = []
# Act
result = conversation.status_count
# Assert
assert result is None
def test_status_count_batch_loading_implementation(self):
"""Test that status_count uses batch loading instead of N+1 queries."""
# Arrange
from graphon.enums import WorkflowExecutionStatus
app_id = str(uuid4())
conversation_id = str(uuid4())
# Create workflow run IDs
workflow_run_id_1 = str(uuid4())
workflow_run_id_2 = str(uuid4())
workflow_run_id_3 = str(uuid4())
conversation = Conversation(
app_id=app_id,
mode=AppMode.CHAT,
name="Test Conversation",
status="normal",
from_source=ConversationFromSource.API,
)
conversation.id = conversation_id
# Mock messages with workflow_run_id
mock_messages = [
MagicMock(
conversation_id=conversation_id,
workflow_run_id=workflow_run_id_1,
),
MagicMock(
conversation_id=conversation_id,
workflow_run_id=workflow_run_id_2,
),
MagicMock(
conversation_id=conversation_id,
workflow_run_id=workflow_run_id_3,
),
]
# Mock workflow runs with different statuses
mock_workflow_runs = [
MagicMock(
id=workflow_run_id_1,
status=WorkflowExecutionStatus.SUCCEEDED.value,
app_id=app_id,
),
MagicMock(
id=workflow_run_id_2,
status=WorkflowExecutionStatus.FAILED.value,
app_id=app_id,
),
MagicMock(
id=workflow_run_id_3,
status=WorkflowExecutionStatus.PARTIAL_SUCCEEDED.value,
app_id=app_id,
),
]
# Track database calls
calls_made = []
def mock_scalars(query):
calls_made.append(str(query))
mock_result = MagicMock()
# Return messages for the first query (messages with workflow_run_id)
if "messages" in str(query) and "conversation_id" in str(query):
mock_result.all.return_value = mock_messages
# Return workflow runs for the batch query
elif "workflow_runs" in str(query):
mock_result.all.return_value = mock_workflow_runs
else:
mock_result.all.return_value = []
return mock_result
# Act & Assert
with patch("models.model.db.session.scalars", side_effect=mock_scalars):
result = conversation.status_count
# Verify only 2 database queries were made (not N+1)
assert len(calls_made) == 2, f"Expected 2 queries, got {len(calls_made)}: {calls_made}"
# Verify the first query gets messages
assert "messages" in calls_made[0]
assert "conversation_id" in calls_made[0]
# Verify the second query batch loads workflow runs with proper filtering
assert "workflow_runs" in calls_made[1]
assert "app_id" in calls_made[1] # Security filter applied
assert "IN" in calls_made[1] # Batch loading with IN clause
# Verify correct status counts
assert result["success"] == 1 # One SUCCEEDED
assert result["failed"] == 1 # One FAILED
assert result["partial_success"] == 1 # One PARTIAL_SUCCEEDED
assert result["paused"] == 0
def test_status_count_app_id_filtering(self):
"""Test that status_count filters workflow runs by app_id for security."""
# Arrange
app_id = str(uuid4())
other_app_id = str(uuid4())
conversation_id = str(uuid4())
workflow_run_id = str(uuid4())
conversation = Conversation(
app_id=app_id,
mode=AppMode.CHAT,
name="Test Conversation",
status="normal",
from_source=ConversationFromSource.API,
)
conversation.id = conversation_id
# Mock message with workflow_run_id
mock_messages = [
MagicMock(
conversation_id=conversation_id,
workflow_run_id=workflow_run_id,
),
]
calls_made = []
def mock_scalars(query):
calls_made.append(str(query))
mock_result = MagicMock()
if "messages" in str(query):
mock_result.all.return_value = mock_messages
elif "workflow_runs" in str(query):
# Return empty list because no workflow run matches the correct app_id
mock_result.all.return_value = [] # Workflow run filtered out by app_id
else:
mock_result.all.return_value = []
return mock_result
# Act
with patch("models.model.db.session.scalars", side_effect=mock_scalars):
result = conversation.status_count
# Assert - query should include app_id filter
workflow_query = calls_made[1]
assert "app_id" in workflow_query
# Since workflow run has wrong app_id, it shouldn't be included in counts
assert result["success"] == 0
assert result["failed"] == 0
assert result["partial_success"] == 0
assert result["paused"] == 0
def test_status_count_handles_invalid_workflow_status(self):
"""Test that status_count gracefully handles invalid workflow status values."""
# Arrange
app_id = str(uuid4())
conversation_id = str(uuid4())
workflow_run_id = str(uuid4())
conversation = Conversation(
app_id=app_id,
mode=AppMode.CHAT,
name="Test Conversation",
status="normal",
from_source=ConversationFromSource.API,
)
conversation.id = conversation_id
mock_messages = [
MagicMock(
conversation_id=conversation_id,
workflow_run_id=workflow_run_id,
),
]
# Mock workflow run with invalid status
mock_workflow_runs = [
MagicMock(
id=workflow_run_id,
status="invalid_status", # Invalid status that should raise ValueError
app_id=app_id,
),
]
with patch("models.model.db.session.scalars") as mock_scalars:
# Mock the messages query
def mock_scalars_side_effect(query):
mock_result = MagicMock()
if "messages" in str(query):
mock_result.all.return_value = mock_messages
elif "workflow_runs" in str(query):
mock_result.all.return_value = mock_workflow_runs
else:
mock_result.all.return_value = []
return mock_result
mock_scalars.side_effect = mock_scalars_side_effect
# Act - should not raise exception
result = conversation.status_count
# Assert - should handle invalid status gracefully
assert result["success"] == 0
assert result["failed"] == 0
assert result["partial_success"] == 0
assert result["paused"] == 0
def test_status_count_paused(self):
"""Test status_count includes paused workflow runs."""
# Arrange
from graphon.enums import WorkflowExecutionStatus
app_id = str(uuid4())
conversation_id = str(uuid4())
workflow_run_id = str(uuid4())
conversation = Conversation(
app_id=app_id,
mode=AppMode.CHAT,
name="Test Conversation",
status="normal",
from_source=ConversationFromSource.API,
)
conversation.id = conversation_id
mock_messages = [
MagicMock(
conversation_id=conversation_id,
workflow_run_id=workflow_run_id,
),
]
mock_workflow_runs = [
MagicMock(
id=workflow_run_id,
status=WorkflowExecutionStatus.PAUSED.value,
app_id=app_id,
),
]
with patch("models.model.db.session.scalars") as mock_scalars:
def mock_scalars_side_effect(query):
mock_result = MagicMock()
if "messages" in str(query):
mock_result.all.return_value = mock_messages
elif "workflow_runs" in str(query):
mock_result.all.return_value = mock_workflow_runs
else:
mock_result.all.return_value = []
return mock_result
mock_scalars.side_effect = mock_scalars_side_effect
# Act
result = conversation.status_count
# Assert
assert result["paused"] == 1