from io import BytesIO from unittest.mock import MagicMock, patch import pytest from flask import Flask from werkzeug.datastructures import FileStorage from services.trigger.webhook_service import WebhookService class TestWebhookServiceUnit: """Unit tests for WebhookService focusing on business logic without database dependencies.""" def test_extract_webhook_data_json(self): """Test webhook data extraction from JSON request.""" app = Flask(__name__) with app.test_request_context( "/webhook", method="POST", headers={"Content-Type": "application/json", "Authorization": "Bearer token"}, query_string="version=1&format=json", json={"message": "hello", "count": 42}, ): webhook_trigger = MagicMock() webhook_data = WebhookService.extract_webhook_data(webhook_trigger) assert webhook_data["method"] == "POST" assert webhook_data["headers"]["Authorization"] == "Bearer token" # Query params are now extracted as raw strings assert webhook_data["query_params"]["version"] == "1" assert webhook_data["query_params"]["format"] == "json" assert webhook_data["body"]["message"] == "hello" assert webhook_data["body"]["count"] == 42 assert webhook_data["files"] == {} def test_extract_webhook_data_query_params_remain_strings(self): """Query parameters should be extracted as raw strings without automatic conversion.""" app = Flask(__name__) with app.test_request_context( "/webhook", method="GET", headers={"Content-Type": "application/json"}, query_string="count=42&threshold=3.14&enabled=true¬e=text", ): webhook_trigger = MagicMock() webhook_data = WebhookService.extract_webhook_data(webhook_trigger) # After refactoring, raw extraction keeps query params as strings assert webhook_data["query_params"]["count"] == "42" assert webhook_data["query_params"]["threshold"] == "3.14" assert webhook_data["query_params"]["enabled"] == "true" assert webhook_data["query_params"]["note"] == "text" def test_extract_webhook_data_form_urlencoded(self): """Test webhook data extraction from form URL encoded request.""" app = Flask(__name__) with app.test_request_context( "/webhook", method="POST", headers={"Content-Type": "application/x-www-form-urlencoded"}, data={"username": "test", "password": "secret"}, ): webhook_trigger = MagicMock() webhook_data = WebhookService.extract_webhook_data(webhook_trigger) assert webhook_data["method"] == "POST" assert webhook_data["body"]["username"] == "test" assert webhook_data["body"]["password"] == "secret" def test_extract_webhook_data_multipart_with_files(self): """Test webhook data extraction from multipart form with files.""" app = Flask(__name__) # Create a mock file file_content = b"test file content" file_storage = FileStorage(stream=BytesIO(file_content), filename="test.txt", content_type="text/plain") with app.test_request_context( "/webhook", method="POST", headers={"Content-Type": "multipart/form-data"}, data={"message": "test", "file": file_storage}, ): webhook_trigger = MagicMock() webhook_trigger.tenant_id = "test_tenant" with patch.object(WebhookService, "_process_file_uploads", autospec=True) as mock_process_files: mock_process_files.return_value = {"file": "mocked_file_obj"} webhook_data = WebhookService.extract_webhook_data(webhook_trigger) assert webhook_data["method"] == "POST" assert webhook_data["body"]["message"] == "test" assert webhook_data["files"]["file"] == "mocked_file_obj" mock_process_files.assert_called_once() def test_extract_webhook_data_raw_text(self): """Test webhook data extraction from raw text request.""" app = Flask(__name__) with app.test_request_context( "/webhook", method="POST", headers={"Content-Type": "text/plain"}, data="raw text content" ): webhook_trigger = MagicMock() webhook_data = WebhookService.extract_webhook_data(webhook_trigger) assert webhook_data["method"] == "POST" assert webhook_data["body"]["raw"] == "raw text content" def test_extract_octet_stream_body_uses_detected_mime(self): """Octet-stream uploads should rely on detected MIME type.""" app = Flask(__name__) binary_content = b"plain text data" with app.test_request_context( "/webhook", method="POST", headers={"Content-Type": "application/octet-stream"}, data=binary_content ): webhook_trigger = MagicMock() mock_file = MagicMock() mock_file.to_dict.return_value = {"file": "data"} with ( patch.object( WebhookService, "_detect_binary_mimetype", return_value="text/plain", autospec=True ) as mock_detect, patch.object(WebhookService, "_create_file_from_binary", autospec=True) as mock_create, ): mock_create.return_value = mock_file body, files = WebhookService._extract_octet_stream_body(webhook_trigger) assert body["raw"] == {"file": "data"} assert files == {} mock_detect.assert_called_once_with(binary_content) mock_create.assert_called_once() args = mock_create.call_args[0] assert args[0] == binary_content assert args[1] == "text/plain" assert args[2] is webhook_trigger def test_detect_binary_mimetype_uses_magic(self, monkeypatch): """python-magic output should be used when available.""" fake_magic = MagicMock() fake_magic.from_buffer.return_value = "image/png" monkeypatch.setattr("services.trigger.webhook_service.magic", fake_magic) result = WebhookService._detect_binary_mimetype(b"binary data") assert result == "image/png" fake_magic.from_buffer.assert_called_once() def test_detect_binary_mimetype_fallback_without_magic(self, monkeypatch): """Fallback MIME type should be used when python-magic is unavailable.""" monkeypatch.setattr("services.trigger.webhook_service.magic", None) result = WebhookService._detect_binary_mimetype(b"binary data") assert result == "application/octet-stream" def test_detect_binary_mimetype_handles_magic_exception(self, monkeypatch): """Fallback MIME type should be used when python-magic raises an exception.""" try: import magic as real_magic except ImportError: pytest.skip("python-magic is not installed") fake_magic = MagicMock() fake_magic.from_buffer.side_effect = real_magic.MagicException("magic error") monkeypatch.setattr("services.trigger.webhook_service.magic", fake_magic) with patch("services.trigger.webhook_service.logger", autospec=True) as mock_logger: result = WebhookService._detect_binary_mimetype(b"binary data") assert result == "application/octet-stream" mock_logger.debug.assert_called_once() def test_extract_webhook_data_invalid_json(self): """Test webhook data extraction with invalid JSON.""" app = Flask(__name__) with app.test_request_context( "/webhook", method="POST", headers={"Content-Type": "application/json"}, data="invalid json" ): webhook_trigger = MagicMock() with pytest.raises(ValueError, match="Invalid JSON body"): WebhookService.extract_webhook_data(webhook_trigger) def test_generate_webhook_response_default(self): """Test webhook response generation with default values.""" node_config = {"data": {}} response_data, status_code = WebhookService.generate_webhook_response(node_config) assert status_code == 200 assert response_data["status"] == "success" assert "Webhook processed successfully" in response_data["message"] def test_generate_webhook_response_custom_json(self): """Test webhook response generation with custom JSON response.""" node_config = {"data": {"status_code": 201, "response_body": '{"result": "created", "id": 123}'}} response_data, status_code = WebhookService.generate_webhook_response(node_config) assert status_code == 201 assert response_data["result"] == "created" assert response_data["id"] == 123 def test_generate_webhook_response_custom_text(self): """Test webhook response generation with custom text response.""" node_config = {"data": {"status_code": 202, "response_body": "Request accepted for processing"}} response_data, status_code = WebhookService.generate_webhook_response(node_config) assert status_code == 202 assert response_data["message"] == "Request accepted for processing" def test_generate_webhook_response_invalid_json(self): """Test webhook response generation with invalid JSON response.""" node_config = {"data": {"status_code": 400, "response_body": '{"invalid": json}'}} response_data, status_code = WebhookService.generate_webhook_response(node_config) assert status_code == 400 assert response_data["message"] == '{"invalid": json}' def test_generate_webhook_response_empty_response_body(self): """Test webhook response generation with empty response body.""" node_config = {"data": {"status_code": 204, "response_body": ""}} response_data, status_code = WebhookService.generate_webhook_response(node_config) assert status_code == 204 assert response_data["status"] == "success" assert "Webhook processed successfully" in response_data["message"] def test_generate_webhook_response_array_json(self): """Test webhook response generation with JSON array response.""" node_config = {"data": {"status_code": 200, "response_body": '[{"id": 1}, {"id": 2}]'}} response_data, status_code = WebhookService.generate_webhook_response(node_config) assert status_code == 200 assert isinstance(response_data, list) assert len(response_data) == 2 assert response_data[0]["id"] == 1 assert response_data[1]["id"] == 2 @patch("services.trigger.webhook_service.ToolFileManager", autospec=True) @patch("services.trigger.webhook_service.file_factory", autospec=True) def test_process_file_uploads_success(self, mock_file_factory, mock_tool_file_manager): """Test successful file upload processing.""" # Mock ToolFileManager mock_tool_file_instance = mock_tool_file_manager.return_value # Mock file creation mock_tool_file = MagicMock() mock_tool_file.id = "test_file_id" mock_tool_file_instance.create_file_by_raw.return_value = mock_tool_file # Mock file factory mock_file_obj = MagicMock() mock_file_factory.build_from_mapping.return_value = mock_file_obj # Create mock files files = { "file1": MagicMock(filename="test1.txt", content_type="text/plain"), "file2": MagicMock(filename="test2.jpg", content_type="image/jpeg"), } # Mock file reads files["file1"].read.return_value = b"content1" files["file2"].read.return_value = b"content2" webhook_trigger = MagicMock() webhook_trigger.tenant_id = "test_tenant" result = WebhookService._process_file_uploads(files, webhook_trigger) assert len(result) == 2 assert "file1" in result assert "file2" in result # Verify file processing was called for each file assert mock_tool_file_manager.call_count == 2 assert mock_file_factory.build_from_mapping.call_count == 2 @patch("services.trigger.webhook_service.ToolFileManager", autospec=True) @patch("services.trigger.webhook_service.file_factory", autospec=True) def test_process_file_uploads_with_errors(self, mock_file_factory, mock_tool_file_manager): """Test file upload processing with errors.""" # Mock ToolFileManager mock_tool_file_instance = mock_tool_file_manager.return_value # Mock file creation mock_tool_file = MagicMock() mock_tool_file.id = "test_file_id" mock_tool_file_instance.create_file_by_raw.return_value = mock_tool_file # Mock file factory mock_file_obj = MagicMock() mock_file_factory.build_from_mapping.return_value = mock_file_obj # Create mock files, one will fail files = { "good_file": MagicMock(filename="test.txt", content_type="text/plain"), "bad_file": MagicMock(filename="test.bad", content_type="text/plain"), } files["good_file"].read.return_value = b"content" files["bad_file"].read.side_effect = Exception("Read error") webhook_trigger = MagicMock() webhook_trigger.tenant_id = "test_tenant" result = WebhookService._process_file_uploads(files, webhook_trigger) # Should process the good file and skip the bad one assert len(result) == 1 assert "good_file" in result assert "bad_file" not in result def test_process_file_uploads_empty_filename(self): """Test file upload processing with empty filename.""" files = { "no_filename": MagicMock(filename="", content_type="text/plain"), "none_filename": MagicMock(filename=None, content_type="text/plain"), } webhook_trigger = MagicMock() webhook_trigger.tenant_id = "test_tenant" result = WebhookService._process_file_uploads(files, webhook_trigger) # Should skip files without filenames assert len(result) == 0 def test_validate_json_value_string(self): """Test JSON value validation for string type.""" # Valid string result = WebhookService._validate_json_value("name", "hello", "string") assert result == "hello" # Invalid string (number) - should raise ValueError with pytest.raises(ValueError, match="Expected string, got int"): WebhookService._validate_json_value("name", 123, "string") def test_validate_json_value_number(self): """Test JSON value validation for number type.""" # Valid integer result = WebhookService._validate_json_value("count", 42, "number") assert result == 42 # Valid float result = WebhookService._validate_json_value("price", 19.99, "number") assert result == 19.99 # Invalid number (string) - should raise ValueError with pytest.raises(ValueError, match="Expected number, got str"): WebhookService._validate_json_value("count", "42", "number") def test_validate_json_value_bool(self): """Test JSON value validation for boolean type.""" # Valid boolean result = WebhookService._validate_json_value("enabled", True, "boolean") assert result is True result = WebhookService._validate_json_value("enabled", False, "boolean") assert result is False # Invalid boolean (string) - should raise ValueError with pytest.raises(ValueError, match="Expected boolean, got str"): WebhookService._validate_json_value("enabled", "true", "boolean") def test_validate_json_value_object(self): """Test JSON value validation for object type.""" # Valid object result = WebhookService._validate_json_value("user", {"name": "John", "age": 30}, "object") assert result == {"name": "John", "age": 30} # Invalid object (string) - should raise ValueError with pytest.raises(ValueError, match="Expected object, got str"): WebhookService._validate_json_value("user", "not_an_object", "object") def test_validate_json_value_array_string(self): """Test JSON value validation for array[string] type.""" # Valid array of strings result = WebhookService._validate_json_value("tags", ["tag1", "tag2", "tag3"], "array[string]") assert result == ["tag1", "tag2", "tag3"] # Invalid - not an array with pytest.raises(ValueError, match="Expected array of strings, got str"): WebhookService._validate_json_value("tags", "not_an_array", "array[string]") # Invalid - array with non-strings with pytest.raises(ValueError, match="Expected array of strings, got list"): WebhookService._validate_json_value("tags", ["tag1", 123, "tag3"], "array[string]") def test_validate_json_value_array_number(self): """Test JSON value validation for array[number] type.""" # Valid array of numbers result = WebhookService._validate_json_value("scores", [1, 2.5, 3, 4.7], "array[number]") assert result == [1, 2.5, 3, 4.7] # Invalid - array with non-numbers with pytest.raises(ValueError, match="Expected array of numbers, got list"): WebhookService._validate_json_value("scores", [1, "2", 3], "array[number]") def test_validate_json_value_array_bool(self): """Test JSON value validation for array[boolean] type.""" # Valid array of booleans result = WebhookService._validate_json_value("flags", [True, False, True], "array[boolean]") assert result == [True, False, True] # Invalid - array with non-booleans with pytest.raises(ValueError, match="Expected array of booleans, got list"): WebhookService._validate_json_value("flags", [True, "false", True], "array[boolean]") def test_validate_json_value_array_object(self): """Test JSON value validation for array[object] type.""" # Valid array of objects result = WebhookService._validate_json_value("users", [{"name": "John"}, {"name": "Jane"}], "array[object]") assert result == [{"name": "John"}, {"name": "Jane"}] # Invalid - array with non-objects with pytest.raises(ValueError, match="Expected array of objects, got list"): WebhookService._validate_json_value("users", [{"name": "John"}, "not_object"], "array[object]") def test_convert_form_value_string(self): """Test form value conversion for string type.""" result = WebhookService._convert_form_value("test", "hello", "string") assert result == "hello" def test_convert_form_value_number(self): """Test form value conversion for number type.""" # Integer result = WebhookService._convert_form_value("count", "42", "number") assert result == 42 # Float result = WebhookService._convert_form_value("price", "19.99", "number") assert result == 19.99 # Invalid number with pytest.raises(ValueError, match="Cannot convert 'not_a_number' to number"): WebhookService._convert_form_value("count", "not_a_number", "number") def test_convert_form_value_boolean(self): """Test form value conversion for boolean type.""" # True values assert WebhookService._convert_form_value("flag", "true", "boolean") is True assert WebhookService._convert_form_value("flag", "1", "boolean") is True assert WebhookService._convert_form_value("flag", "yes", "boolean") is True # False values assert WebhookService._convert_form_value("flag", "false", "boolean") is False assert WebhookService._convert_form_value("flag", "0", "boolean") is False assert WebhookService._convert_form_value("flag", "no", "boolean") is False # Invalid boolean with pytest.raises(ValueError, match="Cannot convert 'maybe' to boolean"): WebhookService._convert_form_value("flag", "maybe", "boolean") def test_extract_and_validate_webhook_data_success(self): """Test successful unified data extraction and validation.""" app = Flask(__name__) with app.test_request_context( "/webhook", method="POST", headers={"Content-Type": "application/json"}, query_string="count=42&enabled=true", json={"message": "hello", "age": 25}, ): webhook_trigger = MagicMock() node_config = { "data": { "method": "post", "content_type": "application/json", "params": [ {"name": "count", "type": "number", "required": True}, {"name": "enabled", "type": "boolean", "required": True}, ], "body": [ {"name": "message", "type": "string", "required": True}, {"name": "age", "type": "number", "required": True}, ], } } result = WebhookService.extract_and_validate_webhook_data(webhook_trigger, node_config) # Check that types are correctly converted assert result["query_params"]["count"] == 42 # Converted to int assert result["query_params"]["enabled"] is True # Converted to bool assert result["body"]["message"] == "hello" # Already string assert result["body"]["age"] == 25 # Already number def test_extract_and_validate_webhook_data_invalid_json_error(self): """Invalid JSON should bubble up as a ValueError with details.""" app = Flask(__name__) with app.test_request_context( "/webhook", method="POST", headers={"Content-Type": "application/json"}, data='{"invalid": }', ): webhook_trigger = MagicMock() node_config = { "data": { "method": "post", "content_type": "application/json", } } with pytest.raises(ValueError, match="Invalid JSON body"): WebhookService.extract_and_validate_webhook_data(webhook_trigger, node_config) def test_extract_and_validate_webhook_data_validation_error(self): """Test unified data extraction with validation error.""" app = Flask(__name__) with app.test_request_context( "/webhook", method="GET", # Wrong method headers={"Content-Type": "application/json"}, ): webhook_trigger = MagicMock() node_config = { "data": { "method": "post", # Expects POST "content_type": "application/json", } } with pytest.raises(ValueError, match="HTTP method mismatch"): WebhookService.extract_and_validate_webhook_data(webhook_trigger, node_config) def test_debug_mode_parameter_handling(self): """Test that the debug mode parameter is properly handled in _prepare_webhook_execution.""" from controllers.trigger.webhook import _prepare_webhook_execution # Mock the WebhookService methods with ( patch.object(WebhookService, "get_webhook_trigger_and_workflow", autospec=True) as mock_get_trigger, patch.object(WebhookService, "extract_and_validate_webhook_data", autospec=True) as mock_extract, ): mock_trigger = MagicMock() mock_workflow = MagicMock() mock_config = {"data": {"test": "config"}} mock_data = {"test": "data"} mock_get_trigger.return_value = (mock_trigger, mock_workflow, mock_config) mock_extract.return_value = mock_data result = _prepare_webhook_execution("test_webhook", is_debug=False) assert result == (mock_trigger, mock_workflow, mock_config, mock_data, None) # Reset mock mock_get_trigger.reset_mock() result = _prepare_webhook_execution("test_webhook", is_debug=True) assert result == (mock_trigger, mock_workflow, mock_config, mock_data, None) # === Merged from test_webhook_service_additional.py === from types import SimpleNamespace from typing import Any, cast from werkzeug.exceptions import RequestEntityTooLarge from core.workflow.nodes.trigger_webhook.entities import ( ContentType, WebhookBodyParameter, WebhookData, WebhookParameter, ) from graphon.variables.types import SegmentType from models.enums import AppTriggerStatus from models.model import App from models.trigger import WorkflowWebhookTrigger from models.workflow import Workflow from services.errors.app import QuotaExceededError from services.trigger import webhook_service as service_module class _FakeQuery: def __init__(self, result: Any) -> None: self._result = result def where(self, *args: Any, **kwargs: Any) -> "_FakeQuery": return self def filter(self, *args: Any, **kwargs: Any) -> "_FakeQuery": return self def order_by(self, *args: Any, **kwargs: Any) -> "_FakeQuery": return self def first(self) -> Any: return self._result class _SessionContext: def __init__(self, session: Any) -> None: self._session = session def __enter__(self) -> Any: return self._session def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> bool: return False class _SessionmakerContext: def __init__(self, session: Any) -> None: self._session = session def begin(self) -> "_SessionmakerContext": return self def __enter__(self) -> Any: return self._session def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> bool: return False @pytest.fixture def flask_app() -> Flask: return Flask(__name__) def _patch_session(monkeypatch: pytest.MonkeyPatch, session: Any) -> None: monkeypatch.setattr(service_module, "db", SimpleNamespace(engine=MagicMock(), session=MagicMock())) monkeypatch.setattr(service_module, "Session", lambda *args, **kwargs: _SessionContext(session)) monkeypatch.setattr(service_module, "sessionmaker", lambda *args, **kwargs: _SessionmakerContext(session)) def _workflow_trigger(**kwargs: Any) -> WorkflowWebhookTrigger: return cast(WorkflowWebhookTrigger, SimpleNamespace(**kwargs)) def _workflow(**kwargs: Any) -> Workflow: return cast(Workflow, SimpleNamespace(**kwargs)) def _app(**kwargs: Any) -> App: return cast(App, SimpleNamespace(**kwargs)) def test_get_webhook_trigger_and_workflow_should_raise_when_webhook_not_found(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange fake_session = MagicMock() fake_session.scalar.return_value = None _patch_session(monkeypatch, fake_session) # Act / Assert with pytest.raises(ValueError, match="Webhook not found"): WebhookService.get_webhook_trigger_and_workflow("webhook-1") def test_get_webhook_trigger_and_workflow_should_raise_when_app_trigger_not_found( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange webhook_trigger = SimpleNamespace(app_id="app-1", node_id="node-1") fake_session = MagicMock() fake_session.scalar.side_effect = [webhook_trigger, None] _patch_session(monkeypatch, fake_session) # Act / Assert with pytest.raises(ValueError, match="App trigger not found"): WebhookService.get_webhook_trigger_and_workflow("webhook-1") def test_get_webhook_trigger_and_workflow_should_raise_when_app_trigger_rate_limited( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange webhook_trigger = SimpleNamespace(app_id="app-1", node_id="node-1") app_trigger = SimpleNamespace(status=AppTriggerStatus.RATE_LIMITED) fake_session = MagicMock() fake_session.scalar.side_effect = [webhook_trigger, app_trigger] _patch_session(monkeypatch, fake_session) # Act / Assert with pytest.raises(ValueError, match="rate limited"): WebhookService.get_webhook_trigger_and_workflow("webhook-1") def test_get_webhook_trigger_and_workflow_should_raise_when_app_trigger_disabled( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange webhook_trigger = SimpleNamespace(app_id="app-1", node_id="node-1") app_trigger = SimpleNamespace(status=AppTriggerStatus.DISABLED) fake_session = MagicMock() fake_session.scalar.side_effect = [webhook_trigger, app_trigger] _patch_session(monkeypatch, fake_session) # Act / Assert with pytest.raises(ValueError, match="disabled"): WebhookService.get_webhook_trigger_and_workflow("webhook-1") def test_get_webhook_trigger_and_workflow_should_raise_when_workflow_not_found(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange webhook_trigger = SimpleNamespace(app_id="app-1", node_id="node-1") app_trigger = SimpleNamespace(status=AppTriggerStatus.ENABLED) fake_session = MagicMock() fake_session.scalar.side_effect = [webhook_trigger, app_trigger, None] _patch_session(monkeypatch, fake_session) # Act / Assert with pytest.raises(ValueError, match="Workflow not found"): WebhookService.get_webhook_trigger_and_workflow("webhook-1") def test_get_webhook_trigger_and_workflow_should_return_values_for_non_debug_mode( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange webhook_trigger = SimpleNamespace(app_id="app-1", node_id="node-1") app_trigger = SimpleNamespace(status=AppTriggerStatus.ENABLED) workflow = MagicMock() workflow.get_node_config_by_id.return_value = {"data": {"key": "value"}} fake_session = MagicMock() fake_session.scalar.side_effect = [webhook_trigger, app_trigger, workflow] _patch_session(monkeypatch, fake_session) # Act got_trigger, got_workflow, got_node_config = WebhookService.get_webhook_trigger_and_workflow("webhook-1") # Assert assert got_trigger is webhook_trigger assert got_workflow is workflow assert got_node_config == {"data": {"key": "value"}} def test_get_webhook_trigger_and_workflow_should_return_values_for_debug_mode(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange webhook_trigger = SimpleNamespace(app_id="app-1", node_id="node-1") workflow = MagicMock() workflow.get_node_config_by_id.return_value = {"data": {"mode": "debug"}} fake_session = MagicMock() fake_session.scalar.side_effect = [webhook_trigger, workflow] _patch_session(monkeypatch, fake_session) # Act got_trigger, got_workflow, got_node_config = WebhookService.get_webhook_trigger_and_workflow( "webhook-1", is_debug=True ) # Assert assert got_trigger is webhook_trigger assert got_workflow is workflow assert got_node_config == {"data": {"mode": "debug"}} def test_extract_webhook_data_should_use_text_fallback_for_unknown_content_type( flask_app: Flask, monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange warning_mock = MagicMock() monkeypatch.setattr(service_module.logger, "warning", warning_mock) webhook_trigger = MagicMock() # Act with flask_app.test_request_context( "/webhook", method="POST", headers={"Content-Type": "application/vnd.custom"}, data="plain content", ): result = WebhookService.extract_webhook_data(webhook_trigger) # Assert assert result["body"] == {"raw": "plain content"} warning_mock.assert_called_once() def test_extract_webhook_data_should_raise_for_request_too_large( flask_app: Flask, monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange monkeypatch.setattr(service_module.dify_config, "WEBHOOK_REQUEST_BODY_MAX_SIZE", 1) # Act / Assert with flask_app.test_request_context("/webhook", method="POST", data="ab"): with pytest.raises(RequestEntityTooLarge): WebhookService.extract_webhook_data(MagicMock()) def test_extract_octet_stream_body_should_return_none_when_empty_payload(flask_app: Flask) -> None: # Arrange webhook_trigger = MagicMock() # Act with flask_app.test_request_context("/webhook", method="POST", data=b""): body, files = WebhookService._extract_octet_stream_body(webhook_trigger) # Assert assert body == {"raw": None} assert files == {} def test_extract_octet_stream_body_should_return_none_when_processing_raises( flask_app: Flask, monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange webhook_trigger = MagicMock() monkeypatch.setattr(WebhookService, "_detect_binary_mimetype", MagicMock(return_value="application/octet-stream")) monkeypatch.setattr(WebhookService, "_create_file_from_binary", MagicMock(side_effect=RuntimeError("boom"))) # Act with flask_app.test_request_context("/webhook", method="POST", data=b"abc"): body, files = WebhookService._extract_octet_stream_body(webhook_trigger) # Assert assert body == {"raw": None} assert files == {} def test_extract_text_body_should_return_empty_string_when_request_read_fails( flask_app: Flask, monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange monkeypatch.setattr("flask.wrappers.Request.get_data", MagicMock(side_effect=RuntimeError("read error"))) # Act with flask_app.test_request_context("/webhook", method="POST", data="abc"): body, files = WebhookService._extract_text_body() # Assert assert body == {"raw": ""} assert files == {} def test_detect_binary_mimetype_should_fallback_when_magic_raises(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange fake_magic = MagicMock() fake_magic.from_buffer.side_effect = RuntimeError("magic failed") monkeypatch.setattr(service_module, "magic", fake_magic) # Act result = WebhookService._detect_binary_mimetype(b"binary") # Assert assert result == "application/octet-stream" def test_process_file_uploads_should_use_octet_stream_fallback_when_mimetype_unknown( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange webhook_trigger = _workflow_trigger(created_by="user-1", tenant_id="tenant-1") file_obj = MagicMock() file_obj.to_dict.return_value = {"id": "f-1"} monkeypatch.setattr(WebhookService, "_create_file_from_binary", MagicMock(return_value=file_obj)) monkeypatch.setattr(service_module.mimetypes, "guess_type", MagicMock(return_value=(None, None))) uploaded = MagicMock() uploaded.filename = "file.unknown" uploaded.content_type = None uploaded.read.return_value = b"content" # Act result = WebhookService._process_file_uploads({"f": uploaded}, webhook_trigger) # Assert assert result == {"f": {"id": "f-1"}} def test_create_file_from_binary_should_call_tool_file_manager_and_file_factory( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange webhook_trigger = _workflow_trigger(created_by="user-1", tenant_id="tenant-1") manager = MagicMock() manager.create_file_by_raw.return_value = SimpleNamespace(id="tool-file-1") monkeypatch.setattr(service_module, "ToolFileManager", MagicMock(return_value=manager)) expected_file = MagicMock() monkeypatch.setattr(service_module.file_factory, "build_from_mapping", MagicMock(return_value=expected_file)) # Act result = WebhookService._create_file_from_binary(b"abc", "text/plain", webhook_trigger) # Assert assert result is expected_file manager.create_file_by_raw.assert_called_once() @pytest.mark.parametrize( ("raw_value", "param_type", "expected"), [ ("42", SegmentType.NUMBER, 42), ("3.14", SegmentType.NUMBER, 3.14), ("yes", SegmentType.BOOLEAN, True), ("no", SegmentType.BOOLEAN, False), ], ) def test_convert_form_value_should_convert_supported_types( raw_value: str, param_type: str, expected: Any, ) -> None: # Arrange # Act result = WebhookService._convert_form_value("param", raw_value, param_type) # Assert assert result == expected def test_convert_form_value_should_raise_for_unsupported_type() -> None: # Arrange # Act / Assert with pytest.raises(ValueError, match="Unsupported type"): WebhookService._convert_form_value("p", "x", SegmentType.FILE) def test_validate_json_value_should_return_original_for_unmapped_supported_segment_type( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange warning_mock = MagicMock() monkeypatch.setattr(service_module.logger, "warning", warning_mock) # Act result = WebhookService._validate_json_value("param", {"x": 1}, "unsupported-type") # Assert assert result == {"x": 1} warning_mock.assert_called_once() def test_validate_and_convert_value_should_wrap_conversion_errors() -> None: # Arrange # Act / Assert with pytest.raises(ValueError, match="validation failed"): WebhookService._validate_and_convert_value("param", "bad", SegmentType.NUMBER, is_form_data=True) def test_process_parameters_should_raise_when_required_parameter_missing() -> None: # Arrange raw_params = {"optional": "x"} config = [WebhookParameter(name="required_param", type=SegmentType.STRING, required=True)] # Act / Assert with pytest.raises(ValueError, match="Required parameter missing"): WebhookService._process_parameters(raw_params, config, is_form_data=True) def test_process_parameters_should_include_unconfigured_parameters() -> None: # Arrange raw_params = {"known": "1", "unknown": "x"} config = [WebhookParameter(name="known", type=SegmentType.NUMBER, required=False)] # Act result = WebhookService._process_parameters(raw_params, config, is_form_data=True) # Assert assert result == {"known": 1, "unknown": "x"} def test_process_body_parameters_should_raise_when_required_text_raw_is_missing() -> None: # Arrange # Act / Assert with pytest.raises(ValueError, match="Required body content missing"): WebhookService._process_body_parameters( raw_body={"raw": ""}, body_configs=[WebhookBodyParameter(name="raw", required=True)], content_type=ContentType.TEXT, ) def test_process_body_parameters_should_skip_file_config_for_multipart_form_data() -> None: # Arrange raw_body = {"message": "hello", "extra": "x"} body_configs = [ WebhookBodyParameter(name="upload", type=SegmentType.FILE, required=True), WebhookBodyParameter(name="message", type=SegmentType.STRING, required=True), ] # Act result = WebhookService._process_body_parameters(raw_body, body_configs, ContentType.FORM_DATA) # Assert assert result == {"message": "hello", "extra": "x"} def test_validate_required_headers_should_accept_sanitized_header_names() -> None: # Arrange headers = {"x_api_key": "123"} configs = [WebhookParameter(name="x-api-key", required=True)] # Act WebhookService._validate_required_headers(headers, configs) # Assert assert True def test_validate_required_headers_should_raise_when_required_header_missing() -> None: # Arrange headers = {"x-other": "123"} configs = [WebhookParameter(name="x-api-key", required=True)] # Act / Assert with pytest.raises(ValueError, match="Required header missing"): WebhookService._validate_required_headers(headers, configs) def test_validate_http_metadata_should_return_content_type_mismatch_error() -> None: # Arrange webhook_data = {"method": "POST", "headers": {"Content-Type": "application/json"}} node_data = WebhookData(method="post", content_type=ContentType.TEXT) # Act result = WebhookService._validate_http_metadata(webhook_data, node_data) # Assert assert result["valid"] is False assert "Content-type mismatch" in result["error"] def test_extract_content_type_should_fallback_to_lowercase_header_key() -> None: # Arrange headers = {"content-type": "application/json; charset=utf-8"} # Act result = WebhookService._extract_content_type(headers) # Assert assert result == "application/json" def test_build_workflow_inputs_should_include_expected_keys() -> None: # Arrange webhook_data = {"headers": {"h": "v"}, "query_params": {"q": 1}, "body": {"b": 2}} # Act result = WebhookService.build_workflow_inputs(webhook_data) # Assert assert result["webhook_data"] == webhook_data assert result["webhook_headers"] == {"h": "v"} assert result["webhook_query_params"] == {"q": 1} assert result["webhook_body"] == {"b": 2} def test_trigger_workflow_execution_should_trigger_async_workflow_successfully(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange webhook_trigger = _workflow_trigger( app_id="app-1", node_id="node-1", tenant_id="tenant-1", webhook_id="webhook-1", ) workflow = _workflow(id="wf-1") webhook_data = {"body": {"x": 1}} session = MagicMock() _patch_session(monkeypatch, session) end_user = SimpleNamespace(id="end-user-1") monkeypatch.setattr( service_module.EndUserService, "get_or_create_end_user_by_type", MagicMock(return_value=end_user) ) quota_type = SimpleNamespace(TRIGGER=SimpleNamespace(consume=MagicMock())) monkeypatch.setattr(service_module, "QuotaType", quota_type) trigger_async_mock = MagicMock() monkeypatch.setattr(service_module.AsyncWorkflowService, "trigger_workflow_async", trigger_async_mock) # Act WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow) # Assert trigger_async_mock.assert_called_once() def test_trigger_workflow_execution_should_mark_tenant_rate_limited_when_quota_exceeded( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange webhook_trigger = _workflow_trigger( app_id="app-1", node_id="node-1", tenant_id="tenant-1", webhook_id="webhook-1", ) workflow = _workflow(id="wf-1") session = MagicMock() _patch_session(monkeypatch, session) monkeypatch.setattr( service_module.EndUserService, "get_or_create_end_user_by_type", MagicMock(return_value=SimpleNamespace(id="end-user-1")), ) monkeypatch.setattr( service_module.QuotaService, "reserve", MagicMock(side_effect=QuotaExceededError(feature="trigger", tenant_id="tenant-1", required=1)), ) mark_rate_limited_mock = MagicMock() monkeypatch.setattr(service_module.AppTriggerService, "mark_tenant_triggers_rate_limited", mark_rate_limited_mock) # Act / Assert with pytest.raises(QuotaExceededError): WebhookService.trigger_workflow_execution(webhook_trigger, {"body": {}}, workflow) mark_rate_limited_mock.assert_called_once_with("tenant-1") def test_trigger_workflow_execution_should_log_and_reraise_unexpected_errors(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange webhook_trigger = _workflow_trigger( app_id="app-1", node_id="node-1", tenant_id="tenant-1", webhook_id="webhook-1", ) workflow = _workflow(id="wf-1") session = MagicMock() _patch_session(monkeypatch, session) monkeypatch.setattr( service_module.EndUserService, "get_or_create_end_user_by_type", MagicMock(side_effect=RuntimeError("boom")) ) logger_exception_mock = MagicMock() monkeypatch.setattr(service_module.logger, "exception", logger_exception_mock) # Act / Assert with pytest.raises(RuntimeError, match="boom"): WebhookService.trigger_workflow_execution(webhook_trigger, {"body": {}}, workflow) logger_exception_mock.assert_called_once() def test_sync_webhook_relationships_should_raise_when_workflow_exceeds_node_limit() -> None: # Arrange app = _app(id="app-1", tenant_id="tenant-1", created_by="user-1") workflow = _workflow( walk_nodes=lambda _node_type: [ (f"node-{i}", {}) for i in range(WebhookService.MAX_WEBHOOK_NODES_PER_WORKFLOW + 1) ] ) # Act / Assert with pytest.raises(ValueError, match="maximum webhook node limit"): WebhookService.sync_webhook_relationships(app, workflow) def test_sync_webhook_relationships_should_raise_when_lock_not_acquired(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange app = _app(id="app-1", tenant_id="tenant-1", created_by="user-1") workflow = _workflow(walk_nodes=lambda _node_type: [("node-1", {})]) lock = MagicMock() lock.acquire.return_value = False monkeypatch.setattr(service_module.redis_client, "get", MagicMock(return_value=None)) monkeypatch.setattr(service_module.redis_client, "lock", MagicMock(return_value=lock)) # Act / Assert with pytest.raises(RuntimeError, match="Failed to acquire lock"): WebhookService.sync_webhook_relationships(app, workflow) def test_sync_webhook_relationships_should_create_missing_records_and_delete_stale_records( monkeypatch: pytest.MonkeyPatch, ) -> None: # Arrange app = _app(id="app-1", tenant_id="tenant-1", created_by="user-1") workflow = _workflow(walk_nodes=lambda _node_type: [("node-new", {})]) class _WorkflowWebhookTrigger: app_id = "app_id" tenant_id = "tenant_id" webhook_id = "webhook_id" node_id = "node_id" def __init__(self, app_id: str, tenant_id: str, node_id: str, webhook_id: str, created_by: str) -> None: self.id = None self.app_id = app_id self.tenant_id = tenant_id self.node_id = node_id self.webhook_id = webhook_id self.created_by = created_by class _Select: def where(self, *args: Any, **kwargs: Any) -> "_Select": return self class _Session: def __init__(self) -> None: self.added: list[Any] = [] self.deleted: list[Any] = [] self.commit_count = 0 self.existing_records = [SimpleNamespace(node_id="node-stale")] def scalars(self, _stmt: Any) -> Any: return SimpleNamespace(all=lambda: self.existing_records) def add(self, obj: Any) -> None: self.added.append(obj) def flush(self) -> None: for idx, obj in enumerate(self.added, start=1): if obj.id is None: obj.id = f"rec-{idx}" def commit(self) -> None: self.commit_count += 1 def delete(self, obj: Any) -> None: self.deleted.append(obj) lock = MagicMock() lock.acquire.return_value = True lock.release.return_value = None fake_session = _Session() monkeypatch.setattr(service_module, "WorkflowWebhookTrigger", _WorkflowWebhookTrigger) monkeypatch.setattr(service_module, "select", MagicMock(return_value=_Select())) monkeypatch.setattr(service_module.redis_client, "get", MagicMock(return_value=None)) monkeypatch.setattr(service_module.redis_client, "lock", MagicMock(return_value=lock)) redis_set_mock = MagicMock() redis_delete_mock = MagicMock() monkeypatch.setattr(service_module.redis_client, "set", redis_set_mock) monkeypatch.setattr(service_module.redis_client, "delete", redis_delete_mock) monkeypatch.setattr(WebhookService, "generate_webhook_id", MagicMock(return_value="generated-webhook-id")) _patch_session(monkeypatch, fake_session) # Act WebhookService.sync_webhook_relationships(app, workflow) # Assert assert len(fake_session.added) == 1 assert len(fake_session.deleted) == 1 redis_set_mock.assert_called_once() redis_delete_mock.assert_called_once() lock.release.assert_called_once() def test_sync_webhook_relationships_should_log_when_lock_release_fails(monkeypatch: pytest.MonkeyPatch) -> None: # Arrange app = _app(id="app-1", tenant_id="tenant-1", created_by="user-1") workflow = _workflow(walk_nodes=lambda _node_type: []) class _Select: def where(self, *args: Any, **kwargs: Any) -> "_Select": return self class _Session: def scalars(self, _stmt: Any) -> Any: return SimpleNamespace(all=lambda: []) def commit(self) -> None: return None lock = MagicMock() lock.acquire.return_value = True lock.release.side_effect = RuntimeError("release failed") logger_exception_mock = MagicMock() monkeypatch.setattr(service_module, "select", MagicMock(return_value=_Select())) monkeypatch.setattr(service_module.redis_client, "get", MagicMock(return_value=None)) monkeypatch.setattr(service_module.redis_client, "lock", MagicMock(return_value=lock)) monkeypatch.setattr(service_module.logger, "exception", logger_exception_mock) _patch_session(monkeypatch, _Session()) # Act WebhookService.sync_webhook_relationships(app, workflow) # Assert assert logger_exception_mock.call_count == 1 def test_generate_webhook_response_should_fallback_when_response_body_is_not_json() -> None: # Arrange node_config = {"data": {"status_code": 200, "response_body": "{bad-json"}} # Act body, status = WebhookService.generate_webhook_response(node_config) # Assert assert status == 200 assert "message" in body def test_generate_webhook_id_should_return_24_character_identifier() -> None: # Arrange # Act webhook_id = WebhookService.generate_webhook_id() # Assert assert isinstance(webhook_id, str) assert len(webhook_id) == 24 def test_sanitize_key_should_return_original_value_for_non_string_input() -> None: # Arrange # Act result = WebhookService._sanitize_key(123) # type: ignore[arg-type] # Assert assert result == 123