dify/api/tests/unit_tests/services/test_webhook_service_additional.py
James 98897a5379
test: migrate webhook service additional mock tests to testcontainers (#35199)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-15 08:14:36 +00:00

293 lines
12 KiB
Python

from types import SimpleNamespace
from typing import Any
from unittest.mock import MagicMock
import pytest
from flask import Flask
from graphon.variables.types import SegmentType
from werkzeug.exceptions import RequestEntityTooLarge
from core.workflow.nodes.trigger_webhook.entities import (
ContentType,
WebhookBodyParameter,
WebhookData,
WebhookParameter,
)
from services.trigger import webhook_service as service_module
from services.trigger.webhook_service import WebhookService
class _FakeQuery:
def __init__(self, result: Any) -> None:
self._result = result
def where(self, *args: Any, **kwargs: Any) -> "_FakeQuery":
return self
def filter(self, *args: Any, **kwargs: Any) -> "_FakeQuery":
return self
def order_by(self, *args: Any, **kwargs: Any) -> "_FakeQuery":
return self
def first(self) -> Any:
return self._result
@pytest.fixture
def flask_app() -> Flask:
return Flask(__name__)
def _workflow_trigger(**kwargs: Any) -> Any:
return SimpleNamespace(**kwargs)
class TestWebhookServiceExtractionFallbacks:
def test_extract_webhook_data_should_use_text_fallback_for_unknown_content_type(
self,
flask_app: Flask,
monkeypatch: pytest.MonkeyPatch,
) -> None:
warning_mock = MagicMock()
monkeypatch.setattr(service_module.logger, "warning", warning_mock)
webhook_trigger = MagicMock()
with flask_app.test_request_context(
"/webhook",
method="POST",
headers={"Content-Type": "application/vnd.custom"},
data="plain content",
):
result = WebhookService.extract_webhook_data(webhook_trigger)
assert result["body"] == {"raw": "plain content"}
warning_mock.assert_called_once()
def test_extract_webhook_data_should_raise_for_request_too_large(
self,
flask_app: Flask,
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(service_module.dify_config, "WEBHOOK_REQUEST_BODY_MAX_SIZE", 1)
with flask_app.test_request_context("/webhook", method="POST", data="ab"):
with pytest.raises(RequestEntityTooLarge):
WebhookService.extract_webhook_data(MagicMock())
def test_extract_octet_stream_body_should_return_none_when_empty_payload(self, flask_app: Flask) -> None:
webhook_trigger = MagicMock()
with flask_app.test_request_context("/webhook", method="POST", data=b""):
body, files = WebhookService._extract_octet_stream_body(webhook_trigger)
assert body == {"raw": None}
assert files == {}
def test_extract_octet_stream_body_should_return_none_when_processing_raises(
self,
flask_app: Flask,
monkeypatch: pytest.MonkeyPatch,
) -> None:
webhook_trigger = MagicMock()
monkeypatch.setattr(
WebhookService, "_detect_binary_mimetype", MagicMock(return_value="application/octet-stream")
)
monkeypatch.setattr(WebhookService, "_create_file_from_binary", MagicMock(side_effect=RuntimeError("boom")))
with flask_app.test_request_context("/webhook", method="POST", data=b"abc"):
body, files = WebhookService._extract_octet_stream_body(webhook_trigger)
assert body == {"raw": None}
assert files == {}
def test_extract_text_body_should_return_empty_string_when_request_read_fails(
self,
flask_app: Flask,
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr("flask.wrappers.Request.get_data", MagicMock(side_effect=RuntimeError("read error")))
with flask_app.test_request_context("/webhook", method="POST", data="abc"):
body, files = WebhookService._extract_text_body()
assert body == {"raw": ""}
assert files == {}
def test_detect_binary_mimetype_should_fallback_when_magic_raises(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
fake_magic = MagicMock()
fake_magic.from_buffer.side_effect = RuntimeError("magic failed")
monkeypatch.setattr(service_module, "magic", fake_magic)
result = WebhookService._detect_binary_mimetype(b"binary")
assert result == "application/octet-stream"
def test_process_file_uploads_should_use_octet_stream_fallback_when_mimetype_unknown(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
webhook_trigger = _workflow_trigger(created_by="user-1", tenant_id="tenant-1")
file_obj = MagicMock()
file_obj.to_dict.return_value = {"id": "f-1"}
monkeypatch.setattr(WebhookService, "_create_file_from_binary", MagicMock(return_value=file_obj))
monkeypatch.setattr(service_module.mimetypes, "guess_type", MagicMock(return_value=(None, None)))
uploaded = MagicMock()
uploaded.filename = "file.unknown"
uploaded.content_type = None
uploaded.read.return_value = b"content"
result = WebhookService._process_file_uploads({"f": uploaded}, webhook_trigger)
assert result == {"f": {"id": "f-1"}}
def test_create_file_from_binary_should_call_tool_file_manager_and_file_factory(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
webhook_trigger = _workflow_trigger(created_by="user-1", tenant_id="tenant-1")
manager = MagicMock()
manager.create_file_by_raw.return_value = SimpleNamespace(id="tool-file-1")
monkeypatch.setattr(service_module, "ToolFileManager", MagicMock(return_value=manager))
expected_file = MagicMock()
monkeypatch.setattr(service_module.file_factory, "build_from_mapping", MagicMock(return_value=expected_file))
result = WebhookService._create_file_from_binary(b"abc", "text/plain", webhook_trigger)
assert result is expected_file
manager.create_file_by_raw.assert_called_once()
class TestWebhookServiceValidationAndConversion:
@pytest.mark.parametrize(
("raw_value", "param_type", "expected"),
[
("42", SegmentType.NUMBER, 42),
("3.14", SegmentType.NUMBER, 3.14),
("yes", SegmentType.BOOLEAN, True),
("no", SegmentType.BOOLEAN, False),
],
)
def test_convert_form_value_should_convert_supported_types(
self,
raw_value: str,
param_type: str,
expected: Any,
) -> None:
result = WebhookService._convert_form_value("param", raw_value, param_type)
assert result == expected
def test_convert_form_value_should_raise_for_unsupported_type(self) -> None:
with pytest.raises(ValueError, match="Unsupported type"):
WebhookService._convert_form_value("p", "x", SegmentType.FILE)
def test_validate_json_value_should_return_original_for_unmapped_supported_segment_type(
self,
monkeypatch: pytest.MonkeyPatch,
) -> None:
warning_mock = MagicMock()
monkeypatch.setattr(service_module.logger, "warning", warning_mock)
result = WebhookService._validate_json_value("param", {"x": 1}, "unsupported-type")
assert result == {"x": 1}
warning_mock.assert_called_once()
def test_validate_and_convert_value_should_wrap_conversion_errors(self) -> None:
with pytest.raises(ValueError, match="validation failed"):
WebhookService._validate_and_convert_value("param", "bad", SegmentType.NUMBER, is_form_data=True)
def test_process_parameters_should_raise_when_required_parameter_missing(self) -> None:
raw_params = {"optional": "x"}
config = [WebhookParameter(name="required_param", type=SegmentType.STRING, required=True)]
with pytest.raises(ValueError, match="Required parameter missing"):
WebhookService._process_parameters(raw_params, config, is_form_data=True)
def test_process_parameters_should_include_unconfigured_parameters(self) -> None:
raw_params = {"known": "1", "unknown": "x"}
config = [WebhookParameter(name="known", type=SegmentType.NUMBER, required=False)]
result = WebhookService._process_parameters(raw_params, config, is_form_data=True)
assert result == {"known": 1, "unknown": "x"}
def test_process_body_parameters_should_raise_when_required_text_raw_is_missing(self) -> None:
with pytest.raises(ValueError, match="Required body content missing"):
WebhookService._process_body_parameters(
raw_body={"raw": ""},
body_configs=[WebhookBodyParameter(name="raw", required=True)],
content_type=ContentType.TEXT,
)
def test_process_body_parameters_should_skip_file_config_for_multipart_form_data(self) -> None:
raw_body = {"message": "hello", "extra": "x"}
body_configs = [
WebhookBodyParameter(name="upload", type=SegmentType.FILE, required=True),
WebhookBodyParameter(name="message", type=SegmentType.STRING, required=True),
]
result = WebhookService._process_body_parameters(raw_body, body_configs, ContentType.FORM_DATA)
assert result == {"message": "hello", "extra": "x"}
def test_validate_required_headers_should_accept_sanitized_header_names(self) -> None:
headers = {"x_api_key": "123"}
configs = [WebhookParameter(name="x-api-key", required=True)]
WebhookService._validate_required_headers(headers, configs)
def test_validate_required_headers_should_raise_when_required_header_missing(self) -> None:
headers = {"x-other": "123"}
configs = [WebhookParameter(name="x-api-key", required=True)]
with pytest.raises(ValueError, match="Required header missing"):
WebhookService._validate_required_headers(headers, configs)
def test_validate_http_metadata_should_return_content_type_mismatch_error(self) -> None:
webhook_data = {"method": "POST", "headers": {"Content-Type": "application/json"}}
node_data = WebhookData(method="post", content_type=ContentType.TEXT)
result = WebhookService._validate_http_metadata(webhook_data, node_data)
assert result["valid"] is False
assert "Content-type mismatch" in result["error"]
def test_extract_content_type_should_fallback_to_lowercase_header_key(self) -> None:
headers = {"content-type": "application/json; charset=utf-8"}
assert WebhookService._extract_content_type(headers) == "application/json"
def test_build_workflow_inputs_should_include_expected_keys(self) -> None:
webhook_data = {"headers": {"h": "v"}, "query_params": {"q": 1}, "body": {"b": 2}}
result = WebhookService.build_workflow_inputs(webhook_data)
assert result["webhook_data"] == webhook_data
assert result["webhook_headers"] == {"h": "v"}
assert result["webhook_query_params"] == {"q": 1}
assert result["webhook_body"] == {"b": 2}
class TestWebhookServiceUtilities:
def test_generate_webhook_response_should_fallback_when_response_body_is_not_json(self) -> None:
node_config = {"data": {"status_code": 200, "response_body": "{bad-json"}}
body, status = WebhookService.generate_webhook_response(node_config)
assert status == 200
assert "message" in body
def test_generate_webhook_id_should_return_24_character_identifier(self) -> None:
webhook_id = WebhookService.generate_webhook_id()
assert isinstance(webhook_id, str)
assert len(webhook_id) == 24
def test_sanitize_key_should_return_original_value_for_non_string_input(self) -> None:
result = WebhookService._sanitize_key(123) # type: ignore[arg-type]
assert result == 123