Feature add test containers mail account deletion task 1858 (#26555)

Signed-off-by: NeatGuyCoding <15627489+NeatGuyCoding@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
NeatGuyCoding 2025-10-05 12:47:17 +08:00 committed by GitHub
parent 00fb468f2e
commit b1d189324a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 242 additions and 182 deletions

View File

@ -784,133 +784,6 @@ class TestCleanDatasetTask:
print(f"Total cleanup time: {cleanup_duration:.3f} seconds")
print(f"Average time per document: {cleanup_duration / len(documents):.3f} seconds")
def test_clean_dataset_task_concurrent_cleanup_scenarios(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test dataset cleanup with concurrent cleanup scenarios and race conditions.
This test verifies that the task can properly:
1. Handle multiple cleanup operations on the same dataset
2. Prevent data corruption during concurrent access
3. Maintain data consistency across multiple cleanup attempts
4. Handle race conditions gracefully
5. Ensure idempotent cleanup operations
"""
# Create test data
account, tenant = self._create_test_account_and_tenant(db_session_with_containers)
dataset = self._create_test_dataset(db_session_with_containers, account, tenant)
document = self._create_test_document(db_session_with_containers, account, tenant, dataset)
segment = self._create_test_segment(db_session_with_containers, account, tenant, dataset, document)
upload_file = self._create_test_upload_file(db_session_with_containers, account, tenant)
# Update document with file reference
import json
document.data_source_info = json.dumps({"upload_file_id": upload_file.id})
from extensions.ext_database import db
db.session.commit()
# Save IDs for verification
dataset_id = dataset.id
tenant_id = tenant.id
upload_file_id = upload_file.id
# Mock storage to simulate slow operations
mock_storage = mock_external_service_dependencies["storage"]
original_delete = mock_storage.delete
def slow_delete(key):
import time
time.sleep(0.1) # Simulate slow storage operation
return original_delete(key)
mock_storage.delete.side_effect = slow_delete
# Execute multiple cleanup operations concurrently
import threading
cleanup_results = []
cleanup_errors = []
def run_cleanup():
try:
clean_dataset_task(
dataset_id=dataset_id,
tenant_id=tenant_id,
indexing_technique="high_quality",
index_struct='{"type": "paragraph"}',
collection_binding_id=str(uuid.uuid4()),
doc_form="paragraph_index",
)
cleanup_results.append("success")
except Exception as e:
cleanup_errors.append(str(e))
# Start multiple cleanup threads
threads = []
for i in range(3):
thread = threading.Thread(target=run_cleanup)
threads.append(thread)
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
# Verify results
# Check that all documents were deleted (only once)
remaining_documents = db.session.query(Document).filter_by(dataset_id=dataset_id).all()
assert len(remaining_documents) == 0
# Check that all segments were deleted (only once)
remaining_segments = db.session.query(DocumentSegment).filter_by(dataset_id=dataset_id).all()
assert len(remaining_segments) == 0
# Check that upload file was deleted (only once)
# Note: In concurrent scenarios, the first thread deletes documents and segments,
# subsequent threads may not find the related data to clean up upload files
# This demonstrates the idempotent nature of the cleanup process
remaining_files = db.session.query(UploadFile).filter_by(id=upload_file_id).all()
# The upload file should be deleted by the first successful cleanup operation
# However, in concurrent scenarios, this may not always happen due to race conditions
# This test demonstrates the idempotent nature of the cleanup process
if len(remaining_files) > 0:
print(f"Warning: Upload file {upload_file_id} was not deleted in concurrent scenario")
print("This is expected behavior demonstrating the idempotent nature of cleanup")
# We don't assert here as the behavior depends on timing and race conditions
# Verify that storage.delete was called (may be called multiple times in concurrent scenarios)
# In concurrent scenarios, storage operations may be called multiple times due to race conditions
assert mock_storage.delete.call_count > 0
# Verify that index processor was called (may be called multiple times in concurrent scenarios)
mock_index_processor = mock_external_service_dependencies["index_processor"]
assert mock_index_processor.clean.call_count > 0
# Check cleanup results
assert len(cleanup_results) == 3, "All cleanup operations should complete"
assert len(cleanup_errors) == 0, "No cleanup errors should occur"
# Verify idempotency by running cleanup again on the same dataset
# This should not perform any additional operations since data is already cleaned
clean_dataset_task(
dataset_id=dataset_id,
tenant_id=tenant_id,
indexing_technique="high_quality",
index_struct='{"type": "paragraph"}',
collection_binding_id=str(uuid.uuid4()),
doc_form="paragraph_index",
)
# Verify that no additional storage operations were performed
# Note: In concurrent scenarios, the exact count may vary due to race conditions
print(f"Final storage delete calls: {mock_storage.delete.call_count}")
print(f"Final index processor calls: {mock_index_processor.clean.call_count}")
print("Note: Multiple calls in concurrent scenarios are expected due to race conditions")
def test_clean_dataset_task_storage_exception_handling(
self, db_session_with_containers, mock_external_service_dependencies
):

View File

@ -148,61 +148,6 @@ class TestEnableSegmentsToIndexTask:
db.session.commit()
return segments
def test_enable_segments_to_index_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful segments indexing with paragraph index type.
This test verifies:
- Proper dataset and document retrieval from database
- Correct segment processing and document creation
- Index processor integration
- Database state updates
- Redis cache key deletion
"""
# Arrange: Create test data
dataset, document = self._create_test_dataset_and_document(
db_session_with_containers, mock_external_service_dependencies
)
segments = self._create_test_segments(db_session_with_containers, document, dataset)
# Set up Redis cache keys to simulate indexing in progress
segment_ids = [segment.id for segment in segments]
for segment in segments:
indexing_cache_key = f"segment_{segment.id}_indexing"
redis_client.set(indexing_cache_key, "processing", ex=300) # 5 minutes expiry
# Verify cache keys exist
for segment in segments:
indexing_cache_key = f"segment_{segment.id}_indexing"
assert redis_client.exists(indexing_cache_key) == 1
# Act: Execute the task
enable_segments_to_index_task(segment_ids, dataset.id, document.id)
# Assert: Verify the expected outcomes
# Verify index processor was called correctly
mock_external_service_dependencies["index_processor_factory"].assert_called_once_with(IndexType.PARAGRAPH_INDEX)
mock_external_service_dependencies["index_processor"].load.assert_called_once()
# Verify the load method was called with correct parameters
call_args = mock_external_service_dependencies["index_processor"].load.call_args
assert call_args is not None
documents = call_args[0][1] # Second argument should be documents list
assert len(documents) == 3
# Verify document structure
for i, doc in enumerate(documents):
assert doc.page_content == segments[i].content
assert doc.metadata["doc_id"] == segments[i].index_node_id
assert doc.metadata["doc_hash"] == segments[i].index_node_hash
assert doc.metadata["document_id"] == document.id
assert doc.metadata["dataset_id"] == dataset.id
# Verify Redis cache keys were deleted
for segment in segments:
indexing_cache_key = f"segment_{segment.id}_indexing"
assert redis_client.exists(indexing_cache_key) == 0
def test_enable_segments_to_index_with_different_index_type(
self, db_session_with_containers, mock_external_service_dependencies
):

View File

@ -0,0 +1,242 @@
from unittest.mock import MagicMock, patch
import pytest
from faker import Faker
from extensions.ext_database import db
from libs.email_i18n import EmailType
from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole
from tasks.mail_account_deletion_task import send_account_deletion_verification_code, send_deletion_success_task
class TestMailAccountDeletionTask:
"""Integration tests for mail account deletion tasks using testcontainers."""
@pytest.fixture
def mock_external_service_dependencies(self):
"""Mock setup for external service dependencies."""
with (
patch("tasks.mail_account_deletion_task.mail") as mock_mail,
patch("tasks.mail_account_deletion_task.get_email_i18n_service") as mock_get_email_service,
):
# Setup mock mail service
mock_mail.is_inited.return_value = True
# Setup mock email service
mock_email_service = MagicMock()
mock_get_email_service.return_value = mock_email_service
yield {
"mail": mock_mail,
"get_email_service": mock_get_email_service,
"email_service": mock_email_service,
}
def _create_test_account(self, db_session_with_containers):
"""
Helper method to create a test account for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
Returns:
Account: Created account instance
"""
fake = Faker()
# Create account
account = Account(
email=fake.email(),
name=fake.name(),
interface_language="en-US",
status="active",
)
db.session.add(account)
db.session.commit()
# Create tenant
tenant = Tenant(
name=fake.company(),
status="normal",
)
db.session.add(tenant)
db.session.commit()
# Create tenant-account join
join = TenantAccountJoin(
tenant_id=tenant.id,
account_id=account.id,
role=TenantAccountRole.OWNER.value,
current=True,
)
db.session.add(join)
db.session.commit()
return account
def test_send_deletion_success_task_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful account deletion success email sending.
This test verifies:
- Proper email service initialization check
- Correct email service method calls
- Template context is properly formatted
- Email type is correctly specified
"""
# Arrange: Create test data
account = self._create_test_account(db_session_with_containers)
test_email = account.email
test_language = "en-US"
# Act: Execute the task
send_deletion_success_task(test_email, test_language)
# Assert: Verify the expected outcomes
# Verify mail service was checked
mock_external_service_dependencies["mail"].is_inited.assert_called_once()
# Verify email service was retrieved
mock_external_service_dependencies["get_email_service"].assert_called_once()
# Verify email was sent with correct parameters
mock_external_service_dependencies["email_service"].send_email.assert_called_once_with(
email_type=EmailType.ACCOUNT_DELETION_SUCCESS,
language_code=test_language,
to=test_email,
template_context={
"to": test_email,
"email": test_email,
},
)
def test_send_deletion_success_task_mail_not_initialized(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test account deletion success email when mail service is not initialized.
This test verifies:
- Early return when mail service is not initialized
- No email service calls are made
- No exceptions are raised
"""
# Arrange: Setup mail service to return not initialized
mock_external_service_dependencies["mail"].is_inited.return_value = False
account = self._create_test_account(db_session_with_containers)
test_email = account.email
# Act: Execute the task
send_deletion_success_task(test_email)
# Assert: Verify no email service calls were made
mock_external_service_dependencies["get_email_service"].assert_not_called()
mock_external_service_dependencies["email_service"].send_email.assert_not_called()
def test_send_deletion_success_task_email_service_exception(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test account deletion success email when email service raises exception.
This test verifies:
- Exception is properly caught and logged
- Task completes without raising exception
- Error logging is recorded
"""
# Arrange: Setup email service to raise exception
mock_external_service_dependencies["email_service"].send_email.side_effect = Exception("Email service failed")
account = self._create_test_account(db_session_with_containers)
test_email = account.email
# Act: Execute the task (should not raise exception)
send_deletion_success_task(test_email)
# Assert: Verify email service was called but exception was handled
mock_external_service_dependencies["email_service"].send_email.assert_called_once()
def test_send_account_deletion_verification_code_success(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test successful account deletion verification code email sending.
This test verifies:
- Proper email service initialization check
- Correct email service method calls
- Template context includes verification code
- Email type is correctly specified
"""
# Arrange: Create test data
account = self._create_test_account(db_session_with_containers)
test_email = account.email
test_code = "123456"
test_language = "en-US"
# Act: Execute the task
send_account_deletion_verification_code(test_email, test_code, test_language)
# Assert: Verify the expected outcomes
# Verify mail service was checked
mock_external_service_dependencies["mail"].is_inited.assert_called_once()
# Verify email service was retrieved
mock_external_service_dependencies["get_email_service"].assert_called_once()
# Verify email was sent with correct parameters
mock_external_service_dependencies["email_service"].send_email.assert_called_once_with(
email_type=EmailType.ACCOUNT_DELETION_VERIFICATION,
language_code=test_language,
to=test_email,
template_context={
"to": test_email,
"code": test_code,
},
)
def test_send_account_deletion_verification_code_mail_not_initialized(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test account deletion verification code email when mail service is not initialized.
This test verifies:
- Early return when mail service is not initialized
- No email service calls are made
- No exceptions are raised
"""
# Arrange: Setup mail service to return not initialized
mock_external_service_dependencies["mail"].is_inited.return_value = False
account = self._create_test_account(db_session_with_containers)
test_email = account.email
test_code = "123456"
# Act: Execute the task
send_account_deletion_verification_code(test_email, test_code)
# Assert: Verify no email service calls were made
mock_external_service_dependencies["get_email_service"].assert_not_called()
mock_external_service_dependencies["email_service"].send_email.assert_not_called()
def test_send_account_deletion_verification_code_email_service_exception(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test account deletion verification code email when email service raises exception.
This test verifies:
- Exception is properly caught and logged
- Task completes without raising exception
- Error logging is recorded
"""
# Arrange: Setup email service to raise exception
mock_external_service_dependencies["email_service"].send_email.side_effect = Exception("Email service failed")
account = self._create_test_account(db_session_with_containers)
test_email = account.email
test_code = "123456"
# Act: Execute the task (should not raise exception)
send_account_deletion_verification_code(test_email, test_code)
# Assert: Verify email service was called but exception was handled
mock_external_service_dependencies["email_service"].send_email.assert_called_once()