mirror of
https://github.com/langgenius/dify.git
synced 2026-06-24 21:11:16 +08:00
chore: rm all mock_logger (#37786)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
parent
eb19140986
commit
347b02d318
@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
from uuid import uuid4
|
||||
@ -355,7 +356,10 @@ class TestWebhookServiceTriggerExecutionWithContainers:
|
||||
mock_mark_rate_limited.assert_called_once_with(tenant.id)
|
||||
|
||||
def test_trigger_workflow_execution_logs_and_reraises_unexpected_errors(
|
||||
self, db_session_with_containers: Session, flask_app_with_containers: Flask
|
||||
self,
|
||||
db_session_with_containers: Session,
|
||||
flask_app_with_containers: Flask,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
):
|
||||
del flask_app_with_containers
|
||||
factory = WebhookServiceRelationshipFactory
|
||||
@ -367,13 +371,11 @@ class TestWebhookServiceTriggerExecutionWithContainers:
|
||||
webhook_trigger = factory.create_webhook_trigger(
|
||||
db_session_with_containers, app=app, account=account, node_id="node-1"
|
||||
)
|
||||
caplog.set_level(logging.ERROR, logger="services.trigger.webhook_service")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"services.trigger.webhook_service.EndUserService.get_or_create_end_user_by_type",
|
||||
side_effect=RuntimeError("boom"),
|
||||
),
|
||||
patch("services.trigger.webhook_service.logger.exception") as mock_logger_exception,
|
||||
with patch(
|
||||
"services.trigger.webhook_service.EndUserService.get_or_create_end_user_by_type",
|
||||
side_effect=RuntimeError("boom"),
|
||||
):
|
||||
with pytest.raises(RuntimeError, match="boom"):
|
||||
WebhookService.trigger_workflow_execution(
|
||||
@ -382,7 +384,7 @@ class TestWebhookServiceTriggerExecutionWithContainers:
|
||||
workflow,
|
||||
)
|
||||
|
||||
mock_logger_exception.assert_called_once()
|
||||
assert caplog.messages.count(f"Failed to trigger workflow for webhook {webhook_trigger.webhook_id}") == 1
|
||||
|
||||
|
||||
class TestWebhookServiceRelationshipSyncWithContainers:
|
||||
@ -482,7 +484,10 @@ class TestWebhookServiceRelationshipSyncWithContainers:
|
||||
assert cached_payload["webhook_id"] == "cache-webhook-id-00001"
|
||||
|
||||
def test_sync_webhook_relationships_logs_when_lock_release_fails(
|
||||
self, db_session_with_containers: Session, flask_app_with_containers: Flask
|
||||
self,
|
||||
db_session_with_containers: Session,
|
||||
flask_app_with_containers: Flask,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
):
|
||||
del flask_app_with_containers
|
||||
factory = WebhookServiceRelationshipFactory
|
||||
@ -494,14 +499,12 @@ class TestWebhookServiceRelationshipSyncWithContainers:
|
||||
lock = MagicMock()
|
||||
lock.acquire.return_value = True
|
||||
lock.release.side_effect = RuntimeError("release failed")
|
||||
caplog.set_level(logging.ERROR, logger="services.trigger.webhook_service")
|
||||
|
||||
with (
|
||||
patch("services.trigger.webhook_service.redis_client.lock", return_value=lock),
|
||||
patch("services.trigger.webhook_service.logger.exception") as mock_logger_exception,
|
||||
):
|
||||
with patch("services.trigger.webhook_service.redis_client.lock", return_value=lock):
|
||||
WebhookService.sync_webhook_relationships(app, workflow)
|
||||
|
||||
mock_logger_exception.assert_called_once()
|
||||
assert caplog.messages.count(f"Failed to release lock for webhook sync, app {app.id}") == 1
|
||||
|
||||
|
||||
def _read_cache(cache_key: str) -> dict[str, str] | None:
|
||||
|
||||
@ -6,9 +6,11 @@ using TestContainers to ensure realistic database interactions and proper isolat
|
||||
The task is responsible for removing document segments from the search index when they are disabled.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from unittest.mock import MagicMock, patch
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from faker import Faker
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
@ -533,7 +535,9 @@ class TestDisableSegmentsFromIndexTask:
|
||||
assert result is None # Task should complete without returning a value
|
||||
mock_factory.assert_called_with(doc_form)
|
||||
|
||||
def test_disable_segments_performance_timing(self, db_session_with_containers: Session):
|
||||
def test_disable_segments_performance_timing(
|
||||
self, db_session_with_containers: Session, caplog: pytest.LogCaptureFixture
|
||||
):
|
||||
"""
|
||||
Test that the task properly measures and logs performance timing.
|
||||
|
||||
@ -562,21 +566,18 @@ class TestDisableSegmentsFromIndexTask:
|
||||
# Mock time.perf_counter to control timing
|
||||
with patch("tasks.disable_segments_from_index_task.time.perf_counter") as mock_perf_counter:
|
||||
mock_perf_counter.side_effect = [1000.0, 1000.5] # 0.5 seconds execution time
|
||||
caplog.set_level(logging.INFO, logger="tasks.disable_segments_from_index_task")
|
||||
|
||||
# Mock logger to capture log messages
|
||||
with patch("tasks.disable_segments_from_index_task.logger") as mock_logger:
|
||||
# Act
|
||||
result = disable_segments_from_index_task(segment_ids, dataset.id, document.id)
|
||||
# Act
|
||||
result = disable_segments_from_index_task(segment_ids, dataset.id, document.id)
|
||||
|
||||
# Assert
|
||||
assert result is None # Task should complete without returning a value
|
||||
# Assert
|
||||
assert result is None # Task should complete without returning a value
|
||||
|
||||
# Verify performance logging
|
||||
mock_logger.info.assert_called()
|
||||
log_calls = [call[0][0] for call in mock_logger.info.call_args_list]
|
||||
performance_log = next((call for call in log_calls if "latency" in call), None)
|
||||
assert performance_log is not None
|
||||
assert "0.5" in performance_log # Should log the execution time
|
||||
# Verify performance logging
|
||||
performance_log = next((message for message in caplog.messages if "latency" in message), None)
|
||||
assert performance_log is not None
|
||||
assert "0.5" in performance_log # Should log the execution time
|
||||
|
||||
def test_disable_segments_redis_cache_cleanup(self, db_session_with_containers: Session):
|
||||
"""
|
||||
|
||||
@ -10,6 +10,7 @@ All tests use the testcontainers infrastructure to ensure proper database isolat
|
||||
and realistic testing scenarios with actual PostgreSQL and Redis instances.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
@ -543,7 +544,10 @@ class TestSendEmailCodeLoginMailTask:
|
||||
redis_client.delete(cache_key)
|
||||
|
||||
def test_send_email_code_login_mail_task_error_handling_comprehensive(
|
||||
self, db_session_with_containers: Session, mock_external_service_dependencies
|
||||
self,
|
||||
db_session_with_containers: Session,
|
||||
mock_external_service_dependencies,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
):
|
||||
"""
|
||||
Test comprehensive error handling for email code login mail task.
|
||||
@ -559,6 +563,7 @@ class TestSendEmailCodeLoginMailTask:
|
||||
test_email = fake.email()
|
||||
test_code = "123456"
|
||||
test_language = "en-US"
|
||||
caplog.set_level(logging.ERROR, logger="tasks.mail_email_code_login")
|
||||
|
||||
# Test different exception types
|
||||
exception_types = [
|
||||
@ -571,31 +576,26 @@ class TestSendEmailCodeLoginMailTask:
|
||||
|
||||
for error_name, exception in exception_types:
|
||||
# Reset mocks for each test case
|
||||
caplog.clear()
|
||||
mock_email_service_instance = mock_external_service_dependencies["email_service_instance"]
|
||||
mock_email_service_instance.reset_mock()
|
||||
mock_email_service_instance.send_email.side_effect = exception
|
||||
|
||||
# Mock logging to capture error messages
|
||||
with patch("tasks.mail_email_code_login.logger", autospec=True) as mock_logger:
|
||||
# Act: Execute the task - it should handle the exception gracefully
|
||||
send_email_code_login_mail_task(
|
||||
language=test_language,
|
||||
to=test_email,
|
||||
code=test_code,
|
||||
)
|
||||
# Act: Execute the task - it should handle the exception gracefully
|
||||
send_email_code_login_mail_task(
|
||||
language=test_language,
|
||||
to=test_email,
|
||||
code=test_code,
|
||||
)
|
||||
|
||||
# Assert: Verify error handling
|
||||
# Verify email service was called (and failed)
|
||||
mock_email_service_instance.send_email.assert_called_once()
|
||||
# Assert: Verify error handling
|
||||
# Verify email service was called (and failed)
|
||||
mock_email_service_instance.send_email.assert_called_once()
|
||||
|
||||
# Verify error was logged
|
||||
error_calls = [
|
||||
call
|
||||
for call in mock_logger.exception.call_args_list
|
||||
if f"Send email code login mail to {test_email} failed" in str(call)
|
||||
]
|
||||
# Check if any exception call was made (the exact message format may vary)
|
||||
assert mock_logger.exception.call_count >= 1, f"Error should be logged for {error_name}"
|
||||
# Verify error was logged
|
||||
assert f"Send email code login mail to {test_email} failed" in caplog.messages, (
|
||||
f"Error should be logged for {error_name}"
|
||||
)
|
||||
|
||||
# Reset side effect for next iteration
|
||||
mock_email_service_instance.send_email.side_effect = None
|
||||
|
||||
@ -11,6 +11,7 @@ and realistic testing scenarios with actual PostgreSQL and Redis instances.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import UTC, datetime
|
||||
from unittest.mock import MagicMock, patch
|
||||
@ -295,7 +296,10 @@ class TestMailInviteMemberTask:
|
||||
mock_email_service.send_email.assert_not_called()
|
||||
|
||||
def test_send_invite_member_mail_email_service_exception(
|
||||
self, db_session_with_containers: Session, mock_external_service_dependencies
|
||||
self,
|
||||
db_session_with_containers: Session,
|
||||
mock_external_service_dependencies,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
):
|
||||
"""
|
||||
Test error handling when email service raises an exception.
|
||||
@ -308,21 +312,19 @@ class TestMailInviteMemberTask:
|
||||
# Arrange: Setup email service to raise exception
|
||||
mock_email_service = mock_external_service_dependencies["email_service"]
|
||||
mock_email_service.send_email.side_effect = Exception("Email service failed")
|
||||
caplog.set_level(logging.ERROR, logger="tasks.mail_invite_member_task")
|
||||
|
||||
# Act & Assert: Execute task and verify exception is handled
|
||||
with patch("tasks.mail_invite_member_task.logger", autospec=True) as mock_logger:
|
||||
send_invite_member_mail_task(
|
||||
language="en-US",
|
||||
to="test@example.com",
|
||||
token="test-token",
|
||||
inviter_name="Test User",
|
||||
workspace_name="Test Workspace",
|
||||
)
|
||||
send_invite_member_mail_task(
|
||||
language="en-US",
|
||||
to="test@example.com",
|
||||
token="test-token",
|
||||
inviter_name="Test User",
|
||||
workspace_name="Test Workspace",
|
||||
)
|
||||
|
||||
# Verify error was logged
|
||||
mock_logger.exception.assert_called_once()
|
||||
error_call = mock_logger.exception.call_args[0][0]
|
||||
assert "Send invite member mail to %s failed" in error_call
|
||||
# Verify error was logged
|
||||
assert caplog.messages.count("Send invite member mail to test@example.com failed") == 1
|
||||
|
||||
def test_send_invite_member_mail_template_context_validation(
|
||||
self, db_session_with_containers: Session, mock_external_service_dependencies
|
||||
|
||||
@ -5,6 +5,7 @@ This module provides integration tests for email registration tasks
|
||||
using TestContainers to ensure real database and service interactions.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
@ -68,7 +69,10 @@ class TestMailRegisterTask:
|
||||
mock_mail_dependencies["email_service"].send_email.assert_not_called()
|
||||
|
||||
def test_send_email_register_mail_task_exception_handling(
|
||||
self, db_session_with_containers: Session, mock_mail_dependencies
|
||||
self,
|
||||
db_session_with_containers: Session,
|
||||
mock_mail_dependencies,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
):
|
||||
"""Test email registration task exception handling."""
|
||||
mock_mail_dependencies["email_service"].send_email.side_effect = Exception("Email service error")
|
||||
@ -76,10 +80,11 @@ class TestMailRegisterTask:
|
||||
fake = Faker()
|
||||
to_email = fake.email()
|
||||
code = fake.numerify("######")
|
||||
caplog.set_level(logging.ERROR, logger="tasks.mail_register_task")
|
||||
|
||||
with patch("tasks.mail_register_task.logger", autospec=True) as mock_logger:
|
||||
send_email_register_mail_task(language="en-US", to=to_email, code=code)
|
||||
mock_logger.exception.assert_called_once_with("Send email register mail to %s failed", to_email)
|
||||
send_email_register_mail_task(language="en-US", to=to_email, code=code)
|
||||
|
||||
assert caplog.messages.count(f"Send email register mail to {to_email} failed") == 1
|
||||
|
||||
def test_send_email_register_mail_task_when_account_exist_success(
|
||||
self, db_session_with_containers: Session, mock_mail_dependencies
|
||||
@ -121,7 +126,10 @@ class TestMailRegisterTask:
|
||||
mock_mail_dependencies["email_service"].send_email.assert_not_called()
|
||||
|
||||
def test_send_email_register_mail_task_when_account_exist_exception_handling(
|
||||
self, db_session_with_containers: Session, mock_mail_dependencies
|
||||
self,
|
||||
db_session_with_containers: Session,
|
||||
mock_mail_dependencies,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
):
|
||||
"""Test account exist email task exception handling."""
|
||||
mock_mail_dependencies["email_service"].send_email.side_effect = Exception("Email service error")
|
||||
@ -129,7 +137,8 @@ class TestMailRegisterTask:
|
||||
fake = Faker()
|
||||
to_email = fake.email()
|
||||
account_name = fake.name()
|
||||
caplog.set_level(logging.ERROR, logger="tasks.mail_register_task")
|
||||
|
||||
with patch("tasks.mail_register_task.logger", autospec=True) as mock_logger:
|
||||
send_email_register_mail_task_when_account_exist(language="en-US", to=to_email, account_name=account_name)
|
||||
mock_logger.exception.assert_called_once_with("Send email register mail to %s failed", to_email)
|
||||
send_email_register_mail_task_when_account_exist(language="en-US", to=to_email, account_name=account_name)
|
||||
|
||||
assert caplog.messages.count(f"Send email register mail to {to_email} failed") == 1
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from contextlib import contextmanager
|
||||
from types import SimpleNamespace
|
||||
from typing import Any
|
||||
@ -1704,13 +1705,14 @@ def test_get_specific_provider_credential_decrypts_and_obfuscates_credentials()
|
||||
assert credentials == {"openai_api_key": "raw-secret", "region": "us"}
|
||||
|
||||
|
||||
def test_get_specific_provider_credential_logs_when_decrypt_fails() -> None:
|
||||
def test_get_specific_provider_credential_logs_when_decrypt_fails(caplog: pytest.LogCaptureFixture) -> None:
|
||||
configuration = _build_provider_configuration()
|
||||
configuration.provider.provider_credential_schema = _build_secret_provider_schema()
|
||||
session = Mock()
|
||||
session.execute.return_value.scalar_one_or_none.return_value = SimpleNamespace(
|
||||
encrypted_config='{"openai_api_key":"enc-secret"}'
|
||||
)
|
||||
caplog.set_level(logging.ERROR, logger="core.entities.provider_configuration")
|
||||
|
||||
with _patched_session(session):
|
||||
with patch.object(ProviderConfiguration, "_get_provider_record", return_value=None):
|
||||
@ -1718,16 +1720,15 @@ def test_get_specific_provider_credential_logs_when_decrypt_fails() -> None:
|
||||
"core.entities.provider_configuration.encrypter.decrypt_token",
|
||||
side_effect=RuntimeError("boom"),
|
||||
):
|
||||
with patch("core.entities.provider_configuration.logger.exception") as mock_logger:
|
||||
with patch.object(
|
||||
ProviderConfiguration,
|
||||
"obfuscated_credentials",
|
||||
side_effect=lambda credentials, credential_form_schemas: credentials,
|
||||
):
|
||||
credentials = configuration._get_specific_provider_credential("cred-1")
|
||||
with patch.object(
|
||||
ProviderConfiguration,
|
||||
"obfuscated_credentials",
|
||||
side_effect=lambda credentials, credential_form_schemas: credentials,
|
||||
):
|
||||
credentials = configuration._get_specific_provider_credential("cred-1")
|
||||
|
||||
assert credentials == {"openai_api_key": "enc-secret"}
|
||||
mock_logger.assert_called_once()
|
||||
assert caplog.messages.count("Failed to decrypt credential secret variable openai_api_key") == 1
|
||||
|
||||
|
||||
def test_validate_provider_credentials_uses_empty_original_when_record_missing() -> None:
|
||||
@ -1831,7 +1832,7 @@ def test_switch_active_provider_credential_rolls_back_on_error() -> None:
|
||||
session.rollback.assert_called_once()
|
||||
|
||||
|
||||
def test_get_specific_custom_model_credential_logs_when_decrypt_fails() -> None:
|
||||
def test_get_specific_custom_model_credential_logs_when_decrypt_fails(caplog: pytest.LogCaptureFixture) -> None:
|
||||
configuration = _build_provider_configuration()
|
||||
configuration.provider.model_credential_schema = _build_secret_model_schema()
|
||||
session = Mock()
|
||||
@ -1840,19 +1841,19 @@ def test_get_specific_custom_model_credential_logs_when_decrypt_fails() -> None:
|
||||
credential_name="Main",
|
||||
encrypted_config='{"openai_api_key":"enc-secret"}',
|
||||
)
|
||||
caplog.set_level(logging.ERROR, logger="core.entities.provider_configuration")
|
||||
|
||||
with _patched_session(session):
|
||||
with patch("core.entities.provider_configuration.encrypter.decrypt_token", side_effect=RuntimeError("boom")):
|
||||
with patch("core.entities.provider_configuration.logger.exception") as mock_logger:
|
||||
with patch.object(
|
||||
ProviderConfiguration,
|
||||
"obfuscated_credentials",
|
||||
side_effect=lambda credentials, credential_form_schemas: credentials,
|
||||
):
|
||||
result = configuration._get_specific_custom_model_credential(ModelType.LLM, "gpt-4o", "cred-1")
|
||||
with patch.object(
|
||||
ProviderConfiguration,
|
||||
"obfuscated_credentials",
|
||||
side_effect=lambda credentials, credential_form_schemas: credentials,
|
||||
):
|
||||
result = configuration._get_specific_custom_model_credential(ModelType.LLM, "gpt-4o", "cred-1")
|
||||
|
||||
assert result["credentials"] == {"openai_api_key": "enc-secret"}
|
||||
mock_logger.assert_called_once()
|
||||
assert caplog.messages.count("Failed to decrypt model credential secret variable openai_api_key") == 1
|
||||
|
||||
|
||||
def test_validate_custom_model_credentials_handles_invalid_original_json() -> None:
|
||||
|
||||
@ -407,7 +407,7 @@ class TestCacheEmbeddingDocuments:
|
||||
assert len(calls[1].kwargs["texts"]) == 10
|
||||
assert len(calls[2].kwargs["texts"]) == 5
|
||||
|
||||
def test_embed_documents_nan_handling(self, mock_model_instance, caplog):
|
||||
def test_embed_documents_nan_handling(self, mock_model_instance, caplog: pytest.LogCaptureFixture):
|
||||
"""Test handling of NaN values in embeddings.
|
||||
|
||||
Verifies:
|
||||
|
||||
@ -385,7 +385,7 @@ class TestParagraphIndexProcessor:
|
||||
with pytest.raises(ValueError, match="model_name and model_provider_name"):
|
||||
ParagraphIndexProcessor.generate_summary("tenant-1", "text", {"enable": True})
|
||||
|
||||
def test_generate_summary_text_only_flow(self, caplog) -> None:
|
||||
def test_generate_summary_text_only_flow(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
model_instance = Mock()
|
||||
model_instance.credentials = {"k": "v"}
|
||||
model_instance.model_type_instance.get_model_schema.return_value = SimpleNamespace(features=[])
|
||||
@ -459,7 +459,7 @@ class TestParagraphIndexProcessor:
|
||||
assert summary == "vision summary"
|
||||
mock_extract_text.assert_not_called()
|
||||
|
||||
def test_generate_summary_fallbacks_for_prompt_and_result_types(self, caplog) -> None:
|
||||
def test_generate_summary_fallbacks_for_prompt_and_result_types(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
model_instance = Mock()
|
||||
model_instance.credentials = {"k": "v"}
|
||||
model_instance.model_type_instance.get_model_schema.return_value = SimpleNamespace(
|
||||
@ -503,7 +503,7 @@ class TestParagraphIndexProcessor:
|
||||
"Failed to convert image file to prompt message content" in record.message for record in caplog.records
|
||||
)
|
||||
|
||||
def test_extract_images_from_text_handles_patterns_and_build_errors(self, caplog) -> None:
|
||||
def test_extract_images_from_text_handles_patterns_and_build_errors(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
text = (
|
||||
" "
|
||||
" "
|
||||
@ -554,7 +554,7 @@ class TestParagraphIndexProcessor:
|
||||
session.scalars.return_value = scalars_result
|
||||
assert ParagraphIndexProcessor._extract_images_from_text("tenant-1", "no images here", session) == []
|
||||
|
||||
def test_extract_images_from_text_logs_when_build_fails(self, caplog) -> None:
|
||||
def test_extract_images_from_text_logs_when_build_fails(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
text = ""
|
||||
image_upload = SimpleNamespace(
|
||||
id="11111111-1111-1111-1111-111111111111",
|
||||
@ -583,7 +583,7 @@ class TestParagraphIndexProcessor:
|
||||
assert files == []
|
||||
assert sum(1 for r in caplog.records if r.levelno == logging.WARNING) == 1
|
||||
|
||||
def test_extract_images_from_segment_attachments(self, caplog) -> None:
|
||||
def test_extract_images_from_segment_attachments(self, caplog: pytest.LogCaptureFixture) -> None:
|
||||
image_upload = SimpleNamespace(
|
||||
id="file-1",
|
||||
name="image",
|
||||
|
||||
@ -351,7 +351,9 @@ class TestQAIndexProcessor:
|
||||
assert all_qa_documents[0].metadata["answer"] == "A test."
|
||||
assert all_qa_documents[1].metadata["answer"] == "Coverage."
|
||||
|
||||
def test_format_qa_document_logs_errors(self, processor: QAIndexProcessor, fake_flask_app, caplog) -> None:
|
||||
def test_format_qa_document_logs_errors(
|
||||
self, processor: QAIndexProcessor, fake_flask_app, caplog: pytest.LogCaptureFixture
|
||||
) -> None:
|
||||
all_qa_documents: list[Document] = []
|
||||
source_document = Document(page_content="source text", metadata={"origin": "doc-1"})
|
||||
|
||||
|
||||
@ -4,6 +4,8 @@ from datetime import datetime
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from core.app.workflow.layers.llm_quota import LLMQuotaLayer
|
||||
from core.errors.error import QuotaExceededError
|
||||
from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionStatus
|
||||
@ -122,7 +124,7 @@ def test_precheck_ignores_non_quota_node() -> None:
|
||||
mock_check.assert_not_called()
|
||||
|
||||
|
||||
def test_quota_error_is_handled_in_layer(caplog) -> None:
|
||||
def test_quota_error_is_handled_in_layer(caplog: pytest.LogCaptureFixture) -> None:
|
||||
layer = LLMQuotaLayer(tenant_id="tenant-id")
|
||||
stop_event = threading.Event()
|
||||
layer.command_channel = MagicMock()
|
||||
|
||||
@ -7,6 +7,8 @@ from datetime import UTC, datetime
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from configs.enterprise import EnterpriseTelemetryConfig
|
||||
from enterprise.telemetry.entities import EnterpriseTelemetryCounter, EnterpriseTelemetryHistogram
|
||||
from enterprise.telemetry.exporter import EnterpriseExporter, _datetime_to_ns, _parse_otlp_headers
|
||||
@ -533,7 +535,7 @@ def test_export_span_cross_workflow_parent_context() -> None:
|
||||
assert kwargs["context"] is not None
|
||||
|
||||
|
||||
def test_export_span_logs_exception_on_error(caplog) -> None:
|
||||
def test_export_span_logs_exception_on_error(caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""If the span block raises, the exception is logged and context is still cleared."""
|
||||
exporter, mock_tracer, mock_span = _make_exporter_with_mock_tracer()
|
||||
|
||||
@ -546,7 +548,7 @@ def test_export_span_logs_exception_on_error(caplog) -> None:
|
||||
assert "bad.span" in caplog.text
|
||||
|
||||
|
||||
def test_export_span_invalid_trace_correlation_logs_warning(caplog) -> None:
|
||||
def test_export_span_invalid_trace_correlation_logs_warning(caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Invalid UUID for trace_correlation_override triggers a warning log."""
|
||||
exporter, mock_tracer, mock_span = _make_exporter_with_mock_tracer()
|
||||
|
||||
|
||||
@ -4,6 +4,7 @@ from types import SimpleNamespace
|
||||
from unittest.mock import patch
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine, select
|
||||
from sqlalchemy.engine import Engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
@ -122,7 +123,9 @@ def test_message_created_paid_credit_accounting_uses_paid_pool() -> None:
|
||||
)
|
||||
|
||||
|
||||
def test_capped_credit_pool_accounting_skips_exhaustion_warning_when_full_amount_is_deducted(caplog) -> None:
|
||||
def test_capped_credit_pool_accounting_skips_exhaustion_warning_when_full_amount_is_deducted(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
with patch(
|
||||
"services.credit_pool_service.CreditPoolService.deduct_credits_capped",
|
||||
return_value=3,
|
||||
|
||||
@ -56,18 +56,19 @@ def mock_response_receiver(monkeypatch: pytest.MonkeyPatch) -> mock.Mock:
|
||||
return mock_log_request_finished
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_logger(monkeypatch: pytest.MonkeyPatch) -> logging.Logger:
|
||||
_logger = mock.MagicMock(spec=logging.Logger)
|
||||
monkeypatch.setattr(ext_request_logging, "logger", _logger)
|
||||
return _logger
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def enable_request_logging(monkeypatch: pytest.MonkeyPatch):
|
||||
monkeypatch.setattr(dify_config, "ENABLE_REQUEST_LOGGING", True)
|
||||
|
||||
|
||||
def _captured_records(caplog: pytest.LogCaptureFixture, level: int) -> list[logging.LogRecord]:
|
||||
return [
|
||||
record
|
||||
for record in caplog.records
|
||||
if record.name == ext_request_logging.logger.name and record.levelno == level
|
||||
]
|
||||
|
||||
|
||||
class TestRequestLoggingExtension:
|
||||
def test_receiver_should_not_be_invoked_if_configuration_is_disabled(
|
||||
self,
|
||||
@ -108,67 +109,74 @@ class TestRequestLoggingExtension:
|
||||
|
||||
class TestLoggingLevel:
|
||||
@pytest.mark.usefixtures("enable_request_logging")
|
||||
def test_logging_should_be_skipped_if_level_is_above_debug(self, enable_request_logging, mock_logger):
|
||||
mock_logger.isEnabledFor.return_value = False
|
||||
def test_logging_should_be_skipped_if_level_is_above_debug(
|
||||
self, enable_request_logging, caplog: pytest.LogCaptureFixture
|
||||
):
|
||||
caplog.set_level(logging.INFO, logger=ext_request_logging.logger.name)
|
||||
app = _get_test_app()
|
||||
init_app(app)
|
||||
|
||||
with app.test_client() as client:
|
||||
client.post("/", json={_KEY_NEEDLE: _VALUE_NEEDLE})
|
||||
mock_logger.debug.assert_not_called()
|
||||
assert not _captured_records(caplog, logging.DEBUG)
|
||||
|
||||
|
||||
class TestRequestReceiverLogging:
|
||||
@pytest.mark.usefixtures("enable_request_logging")
|
||||
def test_non_json_request(self, enable_request_logging, mock_logger, mock_response_receiver):
|
||||
mock_logger.isEnabledFor.return_value = True
|
||||
def test_non_json_request(self, enable_request_logging, caplog: pytest.LogCaptureFixture, mock_response_receiver):
|
||||
caplog.set_level(logging.DEBUG, logger=ext_request_logging.logger.name)
|
||||
app = _get_test_app()
|
||||
init_app(app)
|
||||
|
||||
with app.test_client() as client:
|
||||
client.post("/", data="plain text")
|
||||
assert mock_logger.debug.call_count == 1
|
||||
call_args = mock_logger.debug.call_args[0]
|
||||
assert "Received Request" in call_args[0]
|
||||
assert call_args[1] == "POST"
|
||||
assert call_args[2] == "/"
|
||||
assert "Request Body" not in call_args[0]
|
||||
debug_records = _captured_records(caplog, logging.DEBUG)
|
||||
assert len(debug_records) == 1
|
||||
record = debug_records[0]
|
||||
assert "Received Request" in record.msg
|
||||
assert record.args == ("POST", "/")
|
||||
assert "Request Body" not in record.msg
|
||||
|
||||
@pytest.mark.usefixtures("enable_request_logging")
|
||||
def test_json_request(self, enable_request_logging, mock_logger, mock_response_receiver):
|
||||
mock_logger.isEnabledFor.return_value = True
|
||||
def test_json_request(self, enable_request_logging, caplog: pytest.LogCaptureFixture, mock_response_receiver):
|
||||
caplog.set_level(logging.DEBUG, logger=ext_request_logging.logger.name)
|
||||
app = _get_test_app()
|
||||
init_app(app)
|
||||
|
||||
with app.test_client() as client:
|
||||
client.post("/", json={_KEY_NEEDLE: _VALUE_NEEDLE})
|
||||
assert mock_logger.debug.call_count == 1
|
||||
call_args = mock_logger.debug.call_args[0]
|
||||
assert "Received Request" in call_args[0]
|
||||
assert "Request Body" in call_args[0]
|
||||
assert call_args[1] == "POST"
|
||||
assert call_args[2] == "/"
|
||||
assert _KEY_NEEDLE in call_args[3]
|
||||
debug_records = _captured_records(caplog, logging.DEBUG)
|
||||
assert len(debug_records) == 1
|
||||
record = debug_records[0]
|
||||
assert "Received Request" in record.msg
|
||||
assert "Request Body" in record.msg
|
||||
assert record.args[0] == "POST"
|
||||
assert record.args[1] == "/"
|
||||
assert _KEY_NEEDLE in record.args[2]
|
||||
|
||||
@pytest.mark.usefixtures("enable_request_logging")
|
||||
def test_json_request_with_empty_body(self, enable_request_logging, mock_logger, mock_response_receiver):
|
||||
mock_logger.isEnabledFor.return_value = True
|
||||
def test_json_request_with_empty_body(
|
||||
self, enable_request_logging, caplog: pytest.LogCaptureFixture, mock_response_receiver
|
||||
):
|
||||
caplog.set_level(logging.DEBUG, logger=ext_request_logging.logger.name)
|
||||
app = _get_test_app()
|
||||
init_app(app)
|
||||
|
||||
with app.test_client() as client:
|
||||
client.post("/", headers={"Content-Type": "application/json"})
|
||||
|
||||
assert mock_logger.debug.call_count == 1
|
||||
call_args = mock_logger.debug.call_args[0]
|
||||
assert "Received Request" in call_args[0]
|
||||
assert "Request Body" not in call_args[0]
|
||||
assert call_args[1] == "POST"
|
||||
assert call_args[2] == "/"
|
||||
debug_records = _captured_records(caplog, logging.DEBUG)
|
||||
assert len(debug_records) == 1
|
||||
record = debug_records[0]
|
||||
assert "Received Request" in record.msg
|
||||
assert "Request Body" not in record.msg
|
||||
assert record.args == ("POST", "/")
|
||||
|
||||
@pytest.mark.usefixtures("enable_request_logging")
|
||||
def test_json_request_with_invalid_json_as_body(self, enable_request_logging, mock_logger, mock_response_receiver):
|
||||
mock_logger.isEnabledFor.return_value = True
|
||||
def test_json_request_with_invalid_json_as_body(
|
||||
self, enable_request_logging, caplog: pytest.LogCaptureFixture, mock_response_receiver
|
||||
):
|
||||
caplog.set_level(logging.DEBUG, logger=ext_request_logging.logger.name)
|
||||
app = _get_test_app()
|
||||
init_app(app)
|
||||
|
||||
@ -178,50 +186,53 @@ class TestRequestReceiverLogging:
|
||||
headers={"Content-Type": "application/json"},
|
||||
data="{",
|
||||
)
|
||||
assert mock_logger.debug.call_count == 0
|
||||
assert mock_logger.exception.call_count == 1
|
||||
|
||||
exception_call_args = mock_logger.exception.call_args[0]
|
||||
assert exception_call_args[0] == "Failed to parse JSON request"
|
||||
assert not _captured_records(caplog, logging.DEBUG)
|
||||
error_records = _captured_records(caplog, logging.ERROR)
|
||||
assert len(error_records) == 1
|
||||
assert error_records[0].message == "Failed to parse JSON request"
|
||||
|
||||
|
||||
class TestResponseReceiverLogging:
|
||||
@pytest.mark.usefixtures("enable_request_logging")
|
||||
def test_non_json_response(self, enable_request_logging, mock_logger):
|
||||
mock_logger.isEnabledFor.return_value = True
|
||||
def test_non_json_response(self, enable_request_logging, caplog: pytest.LogCaptureFixture):
|
||||
caplog.set_level(logging.DEBUG, logger=ext_request_logging.logger.name)
|
||||
app = _get_test_app()
|
||||
response = Response(
|
||||
"OK",
|
||||
headers={"Content-Type": "text/plain"},
|
||||
)
|
||||
_log_request_finished(app, response)
|
||||
assert mock_logger.debug.call_count == 1
|
||||
call_args = mock_logger.debug.call_args[0]
|
||||
assert "Response" in call_args[0]
|
||||
assert "200" in call_args[1]
|
||||
assert call_args[2] == "text/plain"
|
||||
assert "Response Body" not in call_args[0]
|
||||
debug_records = _captured_records(caplog, logging.DEBUG)
|
||||
assert len(debug_records) == 1
|
||||
record = debug_records[0]
|
||||
assert "Response" in record.msg
|
||||
assert "200" in record.args[0]
|
||||
assert record.args[1] == "text/plain"
|
||||
assert "Response Body" not in record.msg
|
||||
|
||||
@pytest.mark.usefixtures("enable_request_logging")
|
||||
def test_json_response(self, enable_request_logging, mock_logger, mock_response_receiver):
|
||||
mock_logger.isEnabledFor.return_value = True
|
||||
def test_json_response(self, enable_request_logging, caplog: pytest.LogCaptureFixture, mock_response_receiver):
|
||||
caplog.set_level(logging.DEBUG, logger=ext_request_logging.logger.name)
|
||||
app = _get_test_app()
|
||||
response = Response(
|
||||
json.dumps({_KEY_NEEDLE: _VALUE_NEEDLE}),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
_log_request_finished(app, response)
|
||||
assert mock_logger.debug.call_count == 1
|
||||
call_args = mock_logger.debug.call_args[0]
|
||||
assert "Response" in call_args[0]
|
||||
assert "Response Body" in call_args[0]
|
||||
assert "200" in call_args[1]
|
||||
assert call_args[2] == "application/json"
|
||||
assert _KEY_NEEDLE in call_args[3]
|
||||
debug_records = _captured_records(caplog, logging.DEBUG)
|
||||
assert len(debug_records) == 1
|
||||
record = debug_records[0]
|
||||
assert "Response" in record.msg
|
||||
assert "Response Body" in record.msg
|
||||
assert "200" in record.args[0]
|
||||
assert record.args[1] == "application/json"
|
||||
assert _KEY_NEEDLE in record.args[2]
|
||||
|
||||
@pytest.mark.usefixtures("enable_request_logging")
|
||||
def test_json_request_with_invalid_json_as_body(self, enable_request_logging, mock_logger, mock_response_receiver):
|
||||
mock_logger.isEnabledFor.return_value = True
|
||||
def test_json_request_with_invalid_json_as_body(
|
||||
self, enable_request_logging, caplog: pytest.LogCaptureFixture, mock_response_receiver
|
||||
):
|
||||
caplog.set_level(logging.DEBUG, logger=ext_request_logging.logger.name)
|
||||
app = _get_test_app()
|
||||
|
||||
response = Response(
|
||||
@ -229,11 +240,10 @@ class TestResponseReceiverLogging:
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
_log_request_finished(app, response)
|
||||
assert mock_logger.debug.call_count == 0
|
||||
assert mock_logger.exception.call_count == 1
|
||||
|
||||
exception_call_args = mock_logger.exception.call_args[0]
|
||||
assert exception_call_args[0] == "Failed to parse JSON response"
|
||||
assert not _captured_records(caplog, logging.DEBUG)
|
||||
error_records = _captured_records(caplog, logging.ERROR)
|
||||
assert len(error_records) == 1
|
||||
assert error_records[0].message == "Failed to parse JSON response"
|
||||
|
||||
|
||||
class TestResponseUnmodified:
|
||||
@ -267,7 +277,7 @@ class TestResponseUnmodified:
|
||||
|
||||
class TestRequestFinishedInfoAccessLine:
|
||||
def test_info_access_log_includes_method_path_status_duration_trace_id(
|
||||
self, monkeypatch: pytest.MonkeyPatch, caplog
|
||||
self, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
|
||||
):
|
||||
"""Ensure INFO access line contains expected fields with computed duration and trace id."""
|
||||
app = _get_test_app()
|
||||
|
||||
@ -5,6 +5,7 @@ This module covers the pre-uninstall plugin hook behavior:
|
||||
- API failure: soft-fail (logs and does not re-raise)
|
||||
"""
|
||||
|
||||
import logging
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
@ -43,18 +44,16 @@ class TestTryPreUninstallPlugin:
|
||||
timeout=dify_config.ENTERPRISE_REQUEST_TIMEOUT,
|
||||
)
|
||||
|
||||
def test_try_pre_uninstall_plugin_http_error_soft_fails(self):
|
||||
def test_try_pre_uninstall_plugin_http_error_soft_fails(self, caplog: pytest.LogCaptureFixture):
|
||||
body = PreUninstallPluginRequest(
|
||||
tenant_id="tenant-456",
|
||||
plugin_unique_identifier="com.example.other_plugin",
|
||||
)
|
||||
caplog.set_level(logging.ERROR, logger="services.enterprise.plugin_manager_service")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"services.enterprise.plugin_manager_service.EnterprisePluginManagerRequest.send_request"
|
||||
) as mock_send_request,
|
||||
patch("services.enterprise.plugin_manager_service.logger") as mock_logger,
|
||||
):
|
||||
with patch(
|
||||
"services.enterprise.plugin_manager_service.EnterprisePluginManagerRequest.send_request"
|
||||
) as mock_send_request:
|
||||
mock_send_request.side_effect = HTTPStatusError(
|
||||
"502 Bad Gateway",
|
||||
request=None,
|
||||
@ -69,20 +68,22 @@ class TestTryPreUninstallPlugin:
|
||||
json={"tenant_id": "tenant-456", "plugin_unique_identifier": "com.example.other_plugin"},
|
||||
timeout=dify_config.ENTERPRISE_REQUEST_TIMEOUT,
|
||||
)
|
||||
mock_logger.exception.assert_called_once()
|
||||
assert len(caplog.records) == 1
|
||||
assert caplog.messages[0] == (
|
||||
"failed to perform pre uninstall plugin hook. tenant_id: tenant-456, "
|
||||
"plugin_unique_identifier: com.example.other_plugin"
|
||||
)
|
||||
|
||||
def test_try_pre_uninstall_plugin_generic_exception_soft_fails(self):
|
||||
def test_try_pre_uninstall_plugin_generic_exception_soft_fails(self, caplog: pytest.LogCaptureFixture):
|
||||
body = PreUninstallPluginRequest(
|
||||
tenant_id="tenant-789",
|
||||
plugin_unique_identifier="com.example.failing_plugin",
|
||||
)
|
||||
caplog.set_level(logging.ERROR, logger="services.enterprise.plugin_manager_service")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"services.enterprise.plugin_manager_service.EnterprisePluginManagerRequest.send_request"
|
||||
) as mock_send_request,
|
||||
patch("services.enterprise.plugin_manager_service.logger") as mock_logger,
|
||||
):
|
||||
with patch(
|
||||
"services.enterprise.plugin_manager_service.EnterprisePluginManagerRequest.send_request"
|
||||
) as mock_send_request:
|
||||
mock_send_request.side_effect = ConnectionError("network unreachable")
|
||||
|
||||
PluginManagerService.try_pre_uninstall_plugin(body)
|
||||
@ -93,7 +94,11 @@ class TestTryPreUninstallPlugin:
|
||||
json={"tenant_id": "tenant-789", "plugin_unique_identifier": "com.example.failing_plugin"},
|
||||
timeout=dify_config.ENTERPRISE_REQUEST_TIMEOUT,
|
||||
)
|
||||
mock_logger.exception.assert_called_once()
|
||||
assert len(caplog.records) == 1
|
||||
assert caplog.messages[0] == (
|
||||
"failed to perform pre uninstall plugin hook. tenant_id: tenant-789, "
|
||||
"plugin_unique_identifier: com.example.failing_plugin"
|
||||
)
|
||||
|
||||
|
||||
class TestCheckCredentialPolicyCompliance:
|
||||
|
||||
@ -8,6 +8,7 @@ and both positive and negative test scenarios.
|
||||
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import zipfile
|
||||
from datetime import datetime
|
||||
from unittest.mock import Mock, create_autospec, patch
|
||||
@ -312,16 +313,16 @@ class TestGetSchemaVersion:
|
||||
result = restore._get_schema_version(manifest)
|
||||
assert result == "1.0"
|
||||
|
||||
def test_missing_schema_version_defaults_to_1_0(self):
|
||||
def test_missing_schema_version_defaults_to_1_0(self, caplog: pytest.LogCaptureFixture):
|
||||
"""Should default to 1.0 when schema_version is missing."""
|
||||
restore = WorkflowRunRestore()
|
||||
manifest = {"tables": {}}
|
||||
caplog.set_level(logging.WARNING, logger="services.retention.workflow_run.restore_archived_workflow_run")
|
||||
|
||||
with patch("services.retention.workflow_run.restore_archived_workflow_run.logger") as mock_logger:
|
||||
result = restore._get_schema_version(manifest)
|
||||
result = restore._get_schema_version(manifest)
|
||||
|
||||
assert result == "1.0"
|
||||
mock_logger.warning.assert_called_once_with("Manifest missing schema_version; defaulting to 1.0")
|
||||
assert "Manifest missing schema_version; defaulting to 1.0" in caplog.messages
|
||||
|
||||
def test_unsupported_schema_version_raises_error(self):
|
||||
"""Should raise ValueError for unsupported schema version."""
|
||||
@ -492,19 +493,19 @@ class TestRestoreTableRecords:
|
||||
"""Tests for WorkflowRunRestore._restore_table_records method."""
|
||||
|
||||
@patch("services.retention.workflow_run.restore_archived_workflow_run.TABLE_MODELS")
|
||||
def test_unknown_table_returns_zero(self, mock_table_models):
|
||||
def test_unknown_table_returns_zero(self, mock_table_models, caplog: pytest.LogCaptureFixture):
|
||||
"""Should return 0 for unknown table."""
|
||||
restore = WorkflowRunRestore()
|
||||
mock_table_models.get.return_value = None
|
||||
|
||||
mock_session = Mock()
|
||||
records = [{"id": "test"}]
|
||||
caplog.set_level(logging.WARNING, logger="services.retention.workflow_run.restore_archived_workflow_run")
|
||||
|
||||
with patch("services.retention.workflow_run.restore_archived_workflow_run.logger") as mock_logger:
|
||||
result = restore._restore_table_records(mock_session, "unknown_table", records, schema_version="1.0")
|
||||
result = restore._restore_table_records(mock_session, "unknown_table", records, schema_version="1.0")
|
||||
|
||||
assert result == 0
|
||||
mock_logger.warning.assert_called_once_with("Unknown table: %s", "unknown_table")
|
||||
assert "Unknown table: unknown_table" in caplog.messages
|
||||
|
||||
def test_empty_records_returns_zero(self):
|
||||
"""Should return 0 for empty records list."""
|
||||
|
||||
@ -673,7 +673,7 @@ def test_enqueue_resume_workflow_not_found(mocker: MockerFixture, mock_session_f
|
||||
assert "WorkflowRun not found" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_enqueue_resume_app_not_found(mocker, mock_session_factory, caplog):
|
||||
def test_enqueue_resume_app_not_found(mocker, mock_session_factory, caplog: pytest.LogCaptureFixture):
|
||||
session_factory, session = mock_session_factory
|
||||
service = HumanInputService(session_factory)
|
||||
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import logging
|
||||
from io import BytesIO
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
@ -159,7 +160,9 @@ class TestWebhookServiceUnit:
|
||||
|
||||
assert result == "application/octet-stream"
|
||||
|
||||
def test_detect_binary_mimetype_handles_magic_exception(self, monkeypatch: pytest.MonkeyPatch):
|
||||
def test_detect_binary_mimetype_handles_magic_exception(
|
||||
self, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
|
||||
):
|
||||
"""Fallback MIME type should be used when python-magic raises an exception."""
|
||||
try:
|
||||
import magic as real_magic
|
||||
@ -169,12 +172,12 @@ class TestWebhookServiceUnit:
|
||||
fake_magic = MagicMock()
|
||||
fake_magic.from_buffer.side_effect = real_magic.MagicException("magic error")
|
||||
monkeypatch.setattr("services.trigger.webhook_service.magic", fake_magic)
|
||||
caplog.set_level(logging.DEBUG, logger="services.trigger.webhook_service")
|
||||
|
||||
with patch("services.trigger.webhook_service.logger", autospec=True) as mock_logger:
|
||||
result = WebhookService._detect_binary_mimetype(b"binary data")
|
||||
result = WebhookService._detect_binary_mimetype(b"binary data")
|
||||
|
||||
assert result == "application/octet-stream"
|
||||
mock_logger.debug.assert_called_once()
|
||||
assert result == "application/octet-stream"
|
||||
assert "python-magic detection failed for octet-stream payload" in caplog.messages
|
||||
|
||||
def test_extract_webhook_data_invalid_json(self):
|
||||
"""Test webhook data extraction with invalid JSON."""
|
||||
|
||||
Loading…
Reference in New Issue
Block a user