chore: Caplog type (#37603)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Asuka Minato 2026-06-19 03:53:13 +09:00 committed by GitHub
parent 33edf97f81
commit e4500d2b9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 121 additions and 52 deletions

View File

@ -709,7 +709,9 @@ def test_langfuse_trace_entity_with_list_dict_input():
assert data.input[0]["content"] == "hello"
def test_workflow_trace_handles_usage_extraction_error(trace_instance, monkeypatch: pytest.MonkeyPatch, caplog):
def test_workflow_trace_handles_usage_extraction_error(
trace_instance, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
):
# Setup trace info to trigger LLM node usage extraction
trace_info = WorkflowTraceInfo(
workflow_id="wf-1",

View File

@ -521,7 +521,9 @@ def test_update_run_error(trace_instance):
trace_instance.update_run(update_data)
def test_workflow_trace_usage_extraction_error(trace_instance, monkeypatch: pytest.MonkeyPatch, caplog):
def test_workflow_trace_usage_extraction_error(
trace_instance, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
):
workflow_data = MagicMock()
workflow_data.created_at = _dt()
workflow_data.finished_at = _dt() + timedelta(seconds=1)

View File

@ -615,7 +615,9 @@ def test_get_project_url_error(trace_instance):
trace_instance.get_project_url()
def test_workflow_trace_usage_extraction_error_fixed(trace_instance, monkeypatch: pytest.MonkeyPatch, caplog):
def test_workflow_trace_usage_extraction_error_fixed(
trace_instance, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
):
trace_info = WorkflowTraceInfo(
workflow_id="86a52565-4a6b-4a1b-9bfd-98e4595e70de",
tenant_id="66e8e918-472e-4b69-8051-12502c34fc07",

View File

@ -45,7 +45,7 @@ class TestQueueTask:
assert "task_id" in task_data
assert "created_at" in task_data
def test_queue_task_redis_error(self, caplog):
def test_queue_task_redis_error(self, caplog: pytest.LogCaptureFixture):
with patch("services.enterprise.account_deletion_sync.redis_client") as mock_redis:
mock_redis.lpush.side_effect = RedisError("Connection failed")
@ -54,7 +54,7 @@ class TestQueueTask:
assert result is False
assert "Failed to queue account deletion sync" in caplog.text
def test_queue_task_type_error(self, caplog):
def test_queue_task_type_error(self, caplog: pytest.LogCaptureFixture):
with patch("services.enterprise.account_deletion_sync.redis_client") as mock_redis:
mock_redis.lpush.side_effect = TypeError("Cannot serialize")

View File

@ -147,7 +147,9 @@ class TestDeleteDraftVariablesBatch:
assert db_session_with_containers.scalar(select(func.count()).select_from(WorkflowDraftVariable)) == 0
@patch("tasks.remove_app_and_related_data_task._delete_draft_variable_offload_data")
def test_delete_draft_variables_batch_logs_progress(self, mock_offload_cleanup, db_session_with_containers, caplog):
def test_delete_draft_variables_batch_logs_progress(
self, mock_offload_cleanup, db_session_with_containers, caplog: pytest.LogCaptureFixture
):
"""Test that batch deletion logs progress correctly."""
tenant, app = _create_tenant_and_app(db_session_with_containers)
offload_data = _create_offload_data(db_session_with_containers, tenant_id=tenant.id, app_id=app.id, count=10)
@ -203,7 +205,9 @@ class TestDeleteDraftVariableOffloadData:
assert remaining_upload_files_count == 0
@patch("extensions.ext_storage.storage")
def test_delete_draft_variable_offload_data_storage_failure(self, mock_storage, db_session_with_containers, caplog):
def test_delete_draft_variable_offload_data_storage_failure(
self, mock_storage, db_session_with_containers, caplog: pytest.LogCaptureFixture
):
"""Test handling of storage deletion failures."""
tenant, app = _create_tenant_and_app(db_session_with_containers)
offload_data = _create_offload_data(db_session_with_containers, tenant_id=tenant.id, app_id=app.id, count=2)

View File

@ -1,6 +1,8 @@
import logging
from unittest.mock import MagicMock, patch
import pytest
import controllers.console.version as version_module
@ -19,7 +21,7 @@ class TestHasNewVersion:
)
assert result is False
def test_has_new_version_invalid_version(self, caplog):
def test_has_new_version_invalid_version(self, caplog: pytest.LogCaptureFixture):
with caplog.at_level(logging.WARNING, logger="controllers.console.version"):
result = version_module._has_new_version(
latest_version="invalid",

View File

@ -1,13 +1,26 @@
import logging
from typing import Protocol, cast
import pytest
from controllers.openapi._audit import EVENT_APP_RUN_OPENAPI, emit_app_run
class _AuditLogRecord(Protocol):
audit: bool
event: str
app_id: str
tenant_id: str
caller_kind: str
mode: str
surface: str
def test_event_constant():
assert EVENT_APP_RUN_OPENAPI == "app.run.openapi"
def test_emit_app_run_logs_with_audit_extra(caplog):
def test_emit_app_run_logs_with_audit_extra(caplog: pytest.LogCaptureFixture):
with caplog.at_level(logging.INFO, logger="controllers.openapi._audit"):
emit_app_run(
app_id="app1",
@ -16,7 +29,7 @@ def test_emit_app_run_logs_with_audit_extra(caplog):
mode="chat",
surface="apps",
)
record = next(r for r in caplog.records if r.message and "app.run.openapi" in r.message)
record = cast(_AuditLogRecord, next(r for r in caplog.records if r.message and "app.run.openapi" in r.message))
assert record.audit is True
assert record.event == EVENT_APP_RUN_OPENAPI
assert record.app_id == "app1"

View File

@ -4,6 +4,7 @@ Unit tests for Service API File Preview endpoint
import logging
import uuid
from typing import Protocol, cast
from unittest.mock import Mock, patch
import pytest
@ -13,6 +14,12 @@ from controllers.service_api.app.file_preview import FilePreviewApi
from models.model import App, EndUser, Message, MessageFile, UploadFile
class _FilePreviewLogRecord(Protocol):
file_id: str
app_id: str
error: str
class TestFilePreviewApi:
"""Test suite for FilePreviewApi"""
@ -349,7 +356,9 @@ class TestFilePreviewApi:
assert "Storage error" in str(exc_info.value)
def test_validate_file_ownership_unexpected_error_logging(self, file_preview_api: FilePreviewApi, caplog):
def test_validate_file_ownership_unexpected_error_logging(
self, file_preview_api: FilePreviewApi, caplog: pytest.LogCaptureFixture
):
"""Test that unexpected errors are logged properly"""
file_id = str(uuid.uuid4())
app_id = str(uuid.uuid4())
@ -369,8 +378,9 @@ class TestFilePreviewApi:
# Verify logging was called with the structured context fields. The ``extra`` keys
# are attached to the LogRecord as attributes, so they are not in ``caplog.text``.
assert len(caplog.records) == 1
record = caplog.records[0]
assert record.getMessage() == "Unexpected error during file ownership validation"
log_record = caplog.records[0]
assert log_record.getMessage() == "Unexpected error during file ownership validation"
record = cast(_FilePreviewLogRecord, log_record)
assert record.file_id == file_id
assert record.app_id == app_id
assert record.error == "Unexpected database error"

View File

@ -2,6 +2,8 @@ import logging
from types import SimpleNamespace
from unittest.mock import Mock, patch
import pytest
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.features.annotation_reply.annotation_reply import AnnotationReplyFeature
@ -132,7 +134,7 @@ class TestAnnotationReplyFeature:
_, _, _, _, _, _, _, from_source, _ = mock_annotation_service.add_annotation_history.call_args[0]
assert from_source == "console"
def test_query_logs_and_returns_none_on_exception(self, caplog):
def test_query_logs_and_returns_none_on_exception(self, caplog: pytest.LogCaptureFixture):
feature = AnnotationReplyFeature()
annotation_setting = SimpleNamespace(
score_threshold=None,

View File

@ -1,6 +1,8 @@
import logging
from unittest.mock import Mock, patch
import pytest
from core.app.layers.timeslice_layer import TimeSliceLayer
from graphon.graph_engine.entities.commands import CommandType, GraphEngineCommand
from services.workflow.entities import WorkflowScheduleCFSPlanEntity
@ -65,7 +67,7 @@ class TestTimeSliceLayer:
scheduler.remove_job.assert_called_once_with("job-1")
def test_checker_job_handles_resource_limit_without_command_channel(self, caplog):
def test_checker_job_handles_resource_limit_without_command_channel(self, caplog: pytest.LogCaptureFixture):
scheduler = Mock()
scheduler.running = True
cfs_plan_scheduler = Mock(plan=Mock())

View File

@ -3,6 +3,8 @@ from datetime import UTC, datetime, timedelta
from types import SimpleNamespace
from unittest.mock import Mock, patch
import pytest
from core.app.layers.trigger_post_layer import TriggerPostLayer
from core.workflow.system_variables import build_system_variables
from graphon.graph_events import (
@ -115,7 +117,7 @@ class TestTriggerPostLayer:
repo.update.assert_called_once_with(trigger_log)
session.commit.assert_called_once()
def test_on_event_handles_missing_trigger_log(self, caplog):
def test_on_event_handles_missing_trigger_log(self, caplog: pytest.LogCaptureFixture):
runtime_state = SimpleNamespace(
outputs={},
variable_pool=VariablePool.from_bootstrap(

View File

@ -345,7 +345,9 @@ class TestMessageCycleManagerOptimization:
db_session.close.assert_called_once()
mock_redis.setex.assert_called_once()
def test_generate_conversation_name_worker_falls_back_when_generation_fails(self, message_cycle_manager, caplog):
def test_generate_conversation_name_worker_falls_back_when_generation_fails(
self, message_cycle_manager, caplog: pytest.LogCaptureFixture
):
"""Fallback to truncated query when LLM generation fails."""
flask_app = Flask(__name__)
conversation = SimpleNamespace(

View File

@ -20,7 +20,9 @@ class TestObservabilityLayerExtras:
assert layer._is_disabled is False
assert layer._tracer is tracer
def test_init_tracer_disables_when_get_tracer_fails(self, monkeypatch: pytest.MonkeyPatch, caplog):
def test_init_tracer_disables_when_get_tracer_fails(
self, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
):
monkeypatch.setattr("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", True)
monkeypatch.setattr("core.app.workflow.layers.observability.is_instrument_flag_enabled", lambda: False)
@ -70,7 +72,7 @@ class TestObservabilityLayerExtras:
layer.on_event(object())
def test_on_graph_end_clears_unfinished_contexts(self, caplog):
def test_on_graph_end_clears_unfinished_contexts(self, caplog: pytest.LogCaptureFixture):
layer = ObservabilityLayer()
layer._node_contexts["exec"] = SimpleNamespace(span=object(), token="token")
@ -107,7 +109,7 @@ class TestObservabilityLayerExtras:
assert calls == []
def test_on_node_run_start_logs_warning_when_span_creation_fails(self, caplog):
def test_on_node_run_start_logs_warning_when_span_creation_fails(self, caplog: pytest.LogCaptureFixture):
layer = ObservabilityLayer()
layer._is_disabled = False
@ -166,7 +168,9 @@ class TestObservabilityLayerExtras:
assert ended == ["ended"]
assert "exec" not in layer._node_contexts
def test_on_node_run_end_logs_detach_failure(self, monkeypatch: pytest.MonkeyPatch, caplog):
def test_on_node_run_end_logs_detach_failure(
self, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
):
layer = ObservabilityLayer()
layer._is_disabled = False

View File

@ -512,7 +512,7 @@ def test_receive_loop_http_error_unknown_id(streams):
@pytest.mark.timeout(10)
def test_receive_loop_validation_error_notification(streams, caplog):
def test_receive_loop_validation_error_notification(streams, caplog: pytest.LogCaptureFixture):
with caplog.at_level(logging.WARNING, logger="core.mcp.session.base_session"):
read_stream, write_stream = streams
session = MockSession(read_stream, write_stream, ReceiveRequest, RootModel[MockNotification])
@ -570,7 +570,7 @@ def test_session_exit_timeout(streams):
@pytest.mark.timeout(10)
def test_receive_loop_fatal_exception(streams, caplog):
def test_receive_loop_fatal_exception(streams, caplog: pytest.LogCaptureFixture):
read_stream, write_stream = streams
session = MockSession(read_stream, write_stream, ReceiveRequest, ReceiveNotification)

View File

@ -189,7 +189,7 @@ class TestCacheEmbeddingMultimodalDocuments:
assert len(result) == 3
assert result[0] == normalized_cached
def test_embed_multimodal_documents_nan_handling(self, mock_model_instance, caplog):
def test_embed_multimodal_documents_nan_handling(self, mock_model_instance, caplog: pytest.LogCaptureFixture):
"""Test handling of NaN values in multimodal embeddings."""
cache_embedding = CacheEmbedding(mock_model_instance)
documents = [{"file_id": "valid"}, {"file_id": "nan"}]
@ -464,7 +464,7 @@ class TestCacheEmbeddingQueryErrors:
model_instance.credentials = {"api_key": "test-key"}
return model_instance
def test_embed_query_api_error_debug_mode(self, mock_model_instance, caplog):
def test_embed_query_api_error_debug_mode(self, mock_model_instance, caplog: pytest.LogCaptureFixture):
"""Test handling of API errors in debug mode."""
cache_embedding = CacheEmbedding(mock_model_instance)
query = "test query"
@ -483,7 +483,7 @@ class TestCacheEmbeddingQueryErrors:
assert "API Error" in str(exc_info.value)
assert any(record.levelno == logging.ERROR for record in caplog.records)
def test_embed_query_redis_set_error_debug_mode(self, mock_model_instance, caplog):
def test_embed_query_redis_set_error_debug_mode(self, mock_model_instance, caplog: pytest.LogCaptureFixture):
"""Test handling of Redis set errors in debug mode."""
cache_embedding = CacheEmbedding(mock_model_instance)
query = "test query"

View File

@ -549,7 +549,9 @@ def test_parse_docx_reads_real_paragraph_table_order(monkeypatch: pytest.MonkeyP
os.remove(tmp_path)
def test_parse_docx_covers_drawing_shapes_hyperlink_error_and_table_branch(monkeypatch: pytest.MonkeyPatch, caplog):
def test_parse_docx_covers_drawing_shapes_hyperlink_error_and_table_branch(
monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
):
extractor = object.__new__(WordExtractor)
ext_image_id = "ext-image"

View File

@ -645,7 +645,7 @@ class TestTextSplitterBasePaths:
with pytest.raises(NotImplementedError):
asyncio.run(splitter.atransform_documents([Document(page_content="x", metadata={})]))
def test_merge_splits_logs_warning_for_oversized_total(self, caplog):
def test_merge_splits_logs_warning_for_oversized_total(self, caplog: pytest.LogCaptureFixture):
"""Cover logger.warning path in _merge_splits."""
splitter = RecursiveCharacterTextSplitter(chunk_size=5, chunk_overlap=1)
with caplog.at_level(logging.WARNING, logger="core.rag.splitter.text_splitter"):

View File

@ -2,6 +2,8 @@ import json
from pathlib import Path
from unittest.mock import patch
import pytest
from core.schemas.registry import SchemaRegistry
@ -71,7 +73,7 @@ class TestSchemaRegistry:
assert registry.metadata[uri]["title"] == "Test Schema"
assert registry.metadata[uri]["version"] == "v1"
def test_load_schema_invalid_json(self, tmp_path, caplog):
def test_load_schema_invalid_json(self, tmp_path: Path, caplog: pytest.LogCaptureFixture):
schema_path = tmp_path / "invalid.json"
schema_path.write_text("invalid json")
@ -81,7 +83,7 @@ class TestSchemaRegistry:
assert "Failed to load schema v1/invalid" in caplog.text
def test_load_schema_os_error(self, tmp_path, caplog):
def test_load_schema_os_error(self, tmp_path: Path, caplog: pytest.LogCaptureFixture):
schema_path = tmp_path / "error.json"
schema_path.write_text("{}")

View File

@ -315,7 +315,7 @@ def test_cleanup_layer_fans_out_to_every_active_session():
assert {entry[1] for entry in session_store.cleaned} == {"cleanup-run-many"}
def test_cleanup_layer_warns_when_http_enabled_but_client_missing(caplog):
def test_cleanup_layer_warns_when_http_enabled_but_client_missing(caplog: pytest.LogCaptureFixture):
"""The HTTP cleanup branch must defensively skip when no client was wired.
This is the deployment-misconfig path: ``_HTTP_CLEANUP_SUPPORTED`` was
@ -347,7 +347,7 @@ def test_cleanup_layer_warns_when_http_enabled_but_client_missing(caplog):
assert any("no agent backend client is wired in" in record.message for record in caplog.records)
def test_cleanup_layer_skips_workflow_terminal_when_workflow_run_id_missing(caplog):
def test_cleanup_layer_skips_workflow_terminal_when_workflow_run_id_missing(caplog: pytest.LogCaptureFixture):
"""``workflow_run_id`` is the keying field; without it the fanout cannot
target a row, so the layer logs a warning and bails."""
import logging

View File

@ -1396,7 +1396,7 @@ class TestSaveMultimodalOutputAndConvertResultToMarkdown:
mock_file_saver.save_binary_string.assert_not_called()
mock_file_saver.save_remote_url.assert_not_called()
def test_unknown_item_type(self, llm_node_for_multimodal, caplog):
def test_unknown_item_type(self, llm_node_for_multimodal, caplog: pytest.LogCaptureFixture):
llm_node, mock_file_saver = llm_node_for_multimodal
unknown_item = self._UnknownItem()

View File

@ -222,7 +222,7 @@ def test_idempotency_first_seen(sample_envelope, mock_redis):
)
def test_idempotency_redis_failure_fails_open(sample_envelope, mock_redis, caplog):
def test_idempotency_redis_failure_fails_open(sample_envelope, mock_redis, caplog: pytest.LogCaptureFixture):
mock_redis.set.side_effect = Exception("Redis unavailable")
handler = EnterpriseMetricHandler()

View File

@ -1,11 +1,13 @@
import logging
from types import SimpleNamespace
import pytest
from core.tools.errors import ToolProviderNotFoundError
from events.event_handlers import delete_tool_parameters_cache_when_sync_draft_workflow as handler_module
def test_missing_tool_provider_does_not_log_error_traceback(monkeypatch, caplog):
def test_missing_tool_provider_does_not_log_error_traceback(monkeypatch, caplog: pytest.LogCaptureFixture):
app = SimpleNamespace(id="workflow-id", tenant_id="tenant-id")
workflow = SimpleNamespace(
graph_dict={

View File

@ -301,7 +301,9 @@ class TestRequestFinishedInfoAccessLine:
assert "123.456" in msg # rounded to 3 decimals
assert "trace-xyz" in msg
def test_info_access_log_uses_dash_without_start_timestamp(self, monkeypatch: pytest.MonkeyPatch, caplog):
def test_info_access_log_uses_dash_without_start_timestamp(
self, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
):
app = _get_test_app()
with app.test_request_context("/bar", method="POST"):
# No g.__request_started_ts set -> duration should be '-'

View File

@ -127,7 +127,9 @@ class TestWorkspacePermissionHelper:
@patch("libs.workspace_permission.EnterpriseService")
@patch("libs.workspace_permission.dify_config")
def test_enterprise_service_error_fails_open(self, mock_config, mock_enterprise_service, caplog):
def test_enterprise_service_error_fails_open(
self, mock_config, mock_enterprise_service, caplog: pytest.LogCaptureFixture
):
"""On enterprise service error, should fail-open (allow) and log error."""
mock_config.ENTERPRISE_ENABLED = True

View File

@ -1,6 +1,8 @@
from datetime import datetime
from unittest.mock import Mock, patch
import pytest
from services.hit_testing_service import HitTestingService
@ -80,7 +82,7 @@ class TestHitTestingServiceDumpRecords:
assert result[0]["segment"]["created_at"] == datetime(2024, 1, 1, 0, 0, 0)
def test_dump_retrieval_records_skips_records_with_missing_documents(self, caplog):
def test_dump_retrieval_records_skips_records_with_missing_documents(self, caplog: pytest.LogCaptureFixture):
record = _retrieval_record(
{
"segment": {

View File

@ -469,7 +469,7 @@ class TestRagPipelineTaskProxy:
# Assert
proxy._dispatch.assert_called_once()
def test_delay_method_with_empty_entities(self, caplog):
def test_delay_method_with_empty_entities(self, caplog: pytest.LogCaptureFixture):
"""Test delay method with empty rag_pipeline_invoke_entities."""
# Arrange
proxy = RagPipelineTaskProxy("tenant-123", "user-456", [])

View File

@ -35,7 +35,7 @@ def test_process_enterprise_telemetry_success(sample_envelope_json):
assert call_args.event_id == "test-event-123"
def test_process_enterprise_telemetry_invalid_json(caplog):
def test_process_enterprise_telemetry_invalid_json(caplog: pytest.LogCaptureFixture):
invalid_json = "not valid json"
process_enterprise_telemetry(invalid_json)
@ -43,7 +43,7 @@ def test_process_enterprise_telemetry_invalid_json(caplog):
assert "Failed to process enterprise telemetry envelope" in caplog.text
def test_process_enterprise_telemetry_handler_exception(sample_envelope_json, caplog):
def test_process_enterprise_telemetry_handler_exception(sample_envelope_json, caplog: pytest.LogCaptureFixture):
with patch("tasks.enterprise_telemetry_task.EnterpriseMetricHandler") as mock_handler_class:
mock_handler = MagicMock()
mock_handler.handle.side_effect = Exception("Handler error")
@ -54,7 +54,7 @@ def test_process_enterprise_telemetry_handler_exception(sample_envelope_json, ca
assert "Failed to process enterprise telemetry envelope" in caplog.text
def test_process_enterprise_telemetry_validation_error(caplog):
def test_process_enterprise_telemetry_validation_error(caplog: pytest.LogCaptureFixture):
invalid_envelope = json.dumps(
{
"case": "INVALID_CASE",

View File

@ -372,7 +372,7 @@ class TestMailTaskRetryLogic:
@patch("tasks.mail_register_task.get_email_i18n_service")
@patch("tasks.mail_register_task.mail")
def test_mail_task_logs_success(self, mock_mail, mock_email_service, caplog):
def test_mail_task_logs_success(self, mock_mail, mock_email_service, caplog: pytest.LogCaptureFixture):
"""Test that successful mail sends are logged properly."""
# Arrange
mock_mail.is_inited.return_value = True
@ -398,7 +398,7 @@ class TestMailTaskRetryLogic:
@patch("tasks.mail_register_task.get_email_i18n_service")
@patch("tasks.mail_register_task.mail")
def test_mail_task_logs_failure(self, mock_mail, mock_email_service, caplog):
def test_mail_task_logs_failure(self, mock_mail, mock_email_service, caplog: pytest.LogCaptureFixture):
"""Test that failed mail sends are logged with exception details."""
# Arrange
mock_mail.is_inited.return_value = True
@ -580,7 +580,9 @@ class TestInnerEmailTask:
@patch("tasks.mail_inner_task.get_email_i18n_service")
@patch("tasks.mail_inner_task.mail")
@patch("tasks.mail_inner_task._render_template_with_strategy")
def test_inner_email_task_logs_failure(self, mock_render, mock_mail, mock_email_service, caplog):
def test_inner_email_task_logs_failure(
self, mock_render, mock_mail, mock_email_service, caplog: pytest.LogCaptureFixture
):
"""Test inner email task logs failures properly."""
# Arrange
mock_mail.is_inited.return_value = True
@ -899,7 +901,9 @@ class TestPerformanceAndTiming:
@patch("tasks.mail_register_task.get_email_i18n_service")
@patch("tasks.mail_register_task.mail")
@patch("tasks.mail_register_task.time")
def test_mail_task_tracks_execution_time(self, mock_time, mock_mail, mock_email_service, caplog):
def test_mail_task_tracks_execution_time(
self, mock_time, mock_mail, mock_email_service, caplog: pytest.LogCaptureFixture
):
"""Test that mail tasks track and log execution time."""
# Arrange
mock_mail.is_inited.return_value = True
@ -1465,7 +1469,9 @@ class TestLoggingAndMonitoring:
@patch("tasks.mail_register_task.get_email_i18n_service")
@patch("tasks.mail_register_task.mail")
def test_mail_task_logs_recipient_information(self, mock_mail, mock_email_service, caplog):
def test_mail_task_logs_recipient_information(
self, mock_mail, mock_email_service, caplog: pytest.LogCaptureFixture
):
"""
Test that mail tasks log recipient information for audit trails.
@ -1487,7 +1493,9 @@ class TestLoggingAndMonitoring:
@patch("tasks.mail_inner_task.get_email_i18n_service")
@patch("tasks.mail_inner_task.mail")
def test_inner_email_task_logs_subject_for_tracking(self, mock_mail, mock_email_service, caplog):
def test_inner_email_task_logs_subject_for_tracking(
self, mock_mail, mock_email_service, caplog: pytest.LogCaptureFixture
):
"""
Test that inner email task logs subject for tracking purposes.

View File

@ -50,7 +50,7 @@ class TestDeleteDraftVariableOffloadData:
assert result == 0
mock_conn.execute.assert_not_called()
def test_delete_draft_variable_offload_data_database_failure(self, caplog):
def test_delete_draft_variable_offload_data_database_failure(self, caplog: pytest.LogCaptureFixture):
"""Test handling of database operation failures."""
mock_conn = MagicMock()
file_ids = ["file-1"]
@ -114,7 +114,9 @@ class TestDeleteAppStars:
class TestDeleteArchivedWorkflowRunFiles:
@patch("tasks.remove_app_and_related_data_task.get_archive_storage")
def test_delete_archived_workflow_run_files_not_configured(self, mock_get_storage, caplog):
def test_delete_archived_workflow_run_files_not_configured(
self, mock_get_storage, caplog: pytest.LogCaptureFixture
):
mock_get_storage.side_effect = ArchiveStorageNotConfiguredError("missing config")
with caplog.at_level(logging.INFO, logger="tasks.remove_app_and_related_data_task"):
@ -123,7 +125,7 @@ class TestDeleteArchivedWorkflowRunFiles:
assert caplog.text.count("Archive storage not configured") == 1
@patch("tasks.remove_app_and_related_data_task.get_archive_storage")
def test_delete_archived_workflow_run_files_list_failure(self, mock_get_storage, caplog):
def test_delete_archived_workflow_run_files_list_failure(self, mock_get_storage, caplog: pytest.LogCaptureFixture):
storage = MagicMock()
storage.list_objects.side_effect = Exception("list failed")
mock_get_storage.return_value = storage
@ -136,7 +138,7 @@ class TestDeleteArchivedWorkflowRunFiles:
assert "Failed to list archive files for app app-1" in caplog.text
@patch("tasks.remove_app_and_related_data_task.get_archive_storage")
def test_delete_archived_workflow_run_files_success(self, mock_get_storage, caplog):
def test_delete_archived_workflow_run_files_success(self, mock_get_storage, caplog: pytest.LogCaptureFixture):
storage = MagicMock()
storage.list_objects.return_value = ["key-1", "key-2"]
mock_get_storage.return_value = storage