diff --git a/api/controllers/console/__init__.py b/api/controllers/console/__init__.py index 57dbc8da64..575e5b0c42 100644 --- a/api/controllers/console/__init__.py +++ b/api/controllers/console/__init__.py @@ -67,6 +67,7 @@ from .app import ( workflow_draft_variable, workflow_run, workflow_statistic, + workflow_trigger, ) # Import auth controllers diff --git a/api/controllers/console/app/workflow_trigger.py b/api/controllers/console/app/workflow_trigger.py new file mode 100644 index 0000000000..4189637582 --- /dev/null +++ b/api/controllers/console/app/workflow_trigger.py @@ -0,0 +1,151 @@ +import logging +import secrets + +from flask_restful import Resource, reqparse +from sqlalchemy.orm import Session +from werkzeug.exceptions import BadRequest, Forbidden, NotFound + +from configs import dify_config +from controllers.console import api +from controllers.console.app.wraps import get_app_model +from controllers.console.wraps import account_initialization_required, setup_required +from extensions.ext_database import db +from libs.login import current_user, login_required +from models.workflow import WorkflowWebhookTrigger + +logger = logging.getLogger(__name__) + + +class WebhookTriggerApi(Resource): + """Webhook Trigger API""" + + @setup_required + @login_required + @account_initialization_required + @get_app_model + def post(self, app_model): + """Create webhook trigger""" + parser = reqparse.RequestParser() + parser.add_argument("node_id", type=str, required=True, help="Node ID is required") + parser.add_argument( + "triggered_by", + type=str, + required=False, + default="production", + choices=["debugger", "production"], + help="triggered_by must be debugger or production", + ) + args = parser.parse_args() + + if app_model.mode != "workflow": + raise BadRequest("Invalid app mode, only workflow can add webhook node") + + # The role of the current user in the ta table must be admin, owner, or editor + if not current_user.is_editor: + raise Forbidden() + + node_id = args["node_id"] + triggered_by = args["triggered_by"] + + with Session(db.engine) as session: + # Check if webhook trigger already exists for this app, node, and environment + existing_trigger = ( + session.query(WorkflowWebhookTrigger) + .filter( + WorkflowWebhookTrigger.app_id == app_model.id, + WorkflowWebhookTrigger.node_id == node_id, + WorkflowWebhookTrigger.triggered_by == triggered_by, + ) + .first() + ) + + if existing_trigger: + raise BadRequest("Webhook trigger already exists for this node and environment") + + # Generate unique webhook_id + webhook_id = self._generate_webhook_id(session) + + # Create new webhook trigger + webhook_trigger = WorkflowWebhookTrigger( + app_id=app_model.id, + node_id=node_id, + tenant_id=current_user.current_tenant_id, + webhook_id=webhook_id, + triggered_by=triggered_by, + ) + + session.add(webhook_trigger) + session.commit() + session.refresh(webhook_trigger) + + return { + "id": webhook_trigger.id, + "webhook_id": webhook_trigger.webhook_id, + "webhook_url": f"{dify_config.SERVICE_API_URL}/triggers/webhook/{webhook_trigger.webhook_id}", + "node_id": webhook_trigger.node_id, + "triggered_by": webhook_trigger.triggered_by, + "created_at": webhook_trigger.created_at.isoformat(), + } + + @setup_required + @login_required + @account_initialization_required + @get_app_model + def delete(self, app_model): + """Delete webhook trigger""" + parser = reqparse.RequestParser() + parser.add_argument("node_id", type=str, required=True, help="Node ID is required") + parser.add_argument( + "triggered_by", + type=str, + required=False, + default="production", + choices=["debugger", "production"], + help="triggered_by must be debugger or production", + ) + args = parser.parse_args() + + # The role of the current user in the ta table must be admin, owner, or editor + if not current_user.is_editor: + raise Forbidden() + + node_id = args["node_id"] + triggered_by = args["triggered_by"] + + with Session(db.engine) as session: + # Find webhook trigger + webhook_trigger = ( + session.query(WorkflowWebhookTrigger) + .filter( + WorkflowWebhookTrigger.app_id == app_model.id, + WorkflowWebhookTrigger.node_id == node_id, + WorkflowWebhookTrigger.triggered_by == triggered_by, + WorkflowWebhookTrigger.tenant_id == current_user.current_tenant_id, + ) + .first() + ) + + if not webhook_trigger: + raise NotFound("Webhook trigger not found") + + session.delete(webhook_trigger) + session.commit() + + return {"result": "success"}, 204 + + def _generate_webhook_id(self, session: Session) -> str: + """Generate unique 24-character webhook ID""" + while True: + # Generate 24-character random string + webhook_id = secrets.token_urlsafe(18)[:24] # token_urlsafe gives base64url, take first 24 chars + + # Check if it already exists + existing = ( + session.query(WorkflowWebhookTrigger).filter(WorkflowWebhookTrigger.webhook_id == webhook_id).first() + ) + + if not existing: + return webhook_id + + +api.add_resource(WebhookTriggerApi, "/apps//workflows/triggers/webhook") diff --git a/api/controllers/trigger/__init__.py b/api/controllers/trigger/__init__.py new file mode 100644 index 0000000000..9132c0179e --- /dev/null +++ b/api/controllers/trigger/__init__.py @@ -0,0 +1,7 @@ +from flask import Blueprint + +# Create trigger blueprint +bp = Blueprint("trigger", __name__, url_prefix="/triggers") + +# Import routes after blueprint creation to avoid circular imports +from . import webhook diff --git a/api/controllers/trigger/webhook.py b/api/controllers/trigger/webhook.py new file mode 100644 index 0000000000..b3e1770779 --- /dev/null +++ b/api/controllers/trigger/webhook.py @@ -0,0 +1,43 @@ +import logging + +from flask import jsonify +from werkzeug.exceptions import BadRequest, NotFound + +from controllers.trigger import bp +from services.webhook_service import WebhookService + +logger = logging.getLogger(__name__) + + +@bp.route("/webhook/", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"]) +def handle_webhook(webhook_id: str): + """ + Handle webhook trigger calls. + + This endpoint receives webhook calls and processes them according to the + configured webhook trigger settings. + """ + try: + # Get webhook trigger, workflow, and node configuration + webhook_trigger, workflow, node_config = WebhookService.get_webhook_trigger_and_workflow(webhook_id) + + # Extract request data + webhook_data = WebhookService.extract_webhook_data(webhook_trigger) + + # Validate request against node configuration + validation_result = WebhookService.validate_webhook_request(webhook_data, node_config) + if not validation_result["valid"]: + raise BadRequest(validation_result["error"]) + + # Process webhook call (send to Celery) + WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow) + + # Return configured response + response_data, status_code = WebhookService.generate_webhook_response(node_config) + return jsonify(response_data), status_code + + except ValueError as e: + raise NotFound(str(e)) + except Exception as e: + logger.exception(f"Webhook processing failed for {webhook_id}: {str(e)}") + return jsonify({"error": "Internal server error", "message": str(e)}), 500 diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index 9e9f896da1..8fe195add4 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -138,17 +138,20 @@ class WorkflowAppGenerator(BaseAppGenerator): **extract_external_trace_id_from_args(args), } workflow_run_id = str(uuid.uuid4()) + if triggered_from in (WorkflowRunTriggeredFrom.DEBUGGING, WorkflowRunTriggeredFrom.APP_RUN): + # start node get inputs + inputs = self._prepare_user_inputs( + user_inputs=inputs, + variables=app_config.variables, + tenant_id=app_model.tenant_id, + strict_type_validation=True if invoke_from == InvokeFrom.SERVICE_API else False, + ) # init application generate entity application_generate_entity = WorkflowAppGenerateEntity( task_id=str(uuid.uuid4()), app_config=app_config, file_upload_config=file_extra_config, - inputs=self._prepare_user_inputs( - user_inputs=inputs, - variables=app_config.variables, - tenant_id=app_model.tenant_id, - strict_type_validation=True if invoke_from == InvokeFrom.SERVICE_API else False, - ), + inputs=inputs, files=list(system_files), user_id=user.id, stream=streaming, diff --git a/api/core/workflow/nodes/enums.py b/api/core/workflow/nodes/enums.py index 7cf9ab9107..e4a4a487c4 100644 --- a/api/core/workflow/nodes/enums.py +++ b/api/core/workflow/nodes/enums.py @@ -25,6 +25,7 @@ class NodeType(StrEnum): DOCUMENT_EXTRACTOR = "document-extractor" LIST_OPERATOR = "list-operator" AGENT = "agent" + WEBHOOK = "webhook" class ErrorStrategy(StrEnum): diff --git a/api/core/workflow/nodes/node_mapping.py b/api/core/workflow/nodes/node_mapping.py index 294b47670b..b3758be9d9 100644 --- a/api/core/workflow/nodes/node_mapping.py +++ b/api/core/workflow/nodes/node_mapping.py @@ -22,6 +22,7 @@ from core.workflow.nodes.tool import ToolNode from core.workflow.nodes.variable_aggregator import VariableAggregatorNode from core.workflow.nodes.variable_assigner.v1 import VariableAssignerNode as VariableAssignerNodeV1 from core.workflow.nodes.variable_assigner.v2 import VariableAssignerNode as VariableAssignerNodeV2 +from core.workflow.nodes.webhook import WebhookNode LATEST_VERSION = "latest" @@ -132,4 +133,8 @@ NODE_TYPE_CLASSES_MAPPING: Mapping[NodeType, Mapping[str, type[BaseNode]]] = { "2": AgentNode, "1": AgentNode, }, + NodeType.WEBHOOK: { + LATEST_VERSION: WebhookNode, + "1": WebhookNode, + }, } diff --git a/api/core/workflow/nodes/webhook/__init__.py b/api/core/workflow/nodes/webhook/__init__.py new file mode 100644 index 0000000000..f77a165c42 --- /dev/null +++ b/api/core/workflow/nodes/webhook/__init__.py @@ -0,0 +1,3 @@ +from .node import WebhookNode + +__all__ = ["WebhookNode"] diff --git a/api/core/workflow/nodes/webhook/entities.py b/api/core/workflow/nodes/webhook/entities.py new file mode 100644 index 0000000000..d615fbf938 --- /dev/null +++ b/api/core/workflow/nodes/webhook/entities.py @@ -0,0 +1,61 @@ +from collections.abc import Sequence +from enum import StrEnum +from typing import Literal, Optional + +from pydantic import BaseModel, Field + +from core.workflow.nodes.base import BaseNodeData + + +class Method(StrEnum): + GET = "get" + POST = "post" + HEAD = "head" + PATCH = "patch" + PUT = "put" + DELETE = "delete" + + +class ContentType(StrEnum): + JSON = "application/json" + FORM_DATA = "multipart/form-data" + FORM_URLENCODED = "application/x-www-form-urlencoded" + TEXT = "text/plain" + FORM = "form" + + +class WebhookParameter(BaseModel): + """Parameter definition for headers, query params, or body.""" + + name: str + required: bool = False + + +class WebhookBodyParameter(BaseModel): + """Body parameter with type information.""" + + name: str + type: Literal["string", "number", "boolean", "object", "array", "file"] = "string" + required: bool = False + + +class WebhookData(BaseNodeData): + """ + Webhook Node Data. + """ + + class SyncMode(StrEnum): + SYNC = "async" # only support + + method: Method = Method.GET + content_type: ContentType = Field(alias="content-type", default=ContentType.JSON) + headers: Sequence[WebhookParameter] = Field(default_factory=list) + params: Sequence[WebhookParameter] = Field(default_factory=list) # query parameters + body: Sequence[WebhookBodyParameter] = Field(default_factory=list) + + status_code: int = 200 # Expected status code for response + response_body: str = "" # Template for response body + + # Webhook specific fields (not from client data, set internally) + webhook_id: Optional[str] = None # Set when webhook trigger is created + timeout: int = 30 # Timeout in seconds to wait for webhook response diff --git a/api/core/workflow/nodes/webhook/exc.py b/api/core/workflow/nodes/webhook/exc.py new file mode 100644 index 0000000000..dc2239c287 --- /dev/null +++ b/api/core/workflow/nodes/webhook/exc.py @@ -0,0 +1,25 @@ +from core.workflow.nodes.base.exc import BaseNodeError + + +class WebhookNodeError(BaseNodeError): + """Base webhook node error.""" + + pass + + +class WebhookTimeoutError(WebhookNodeError): + """Webhook timeout error.""" + + pass + + +class WebhookNotFoundError(WebhookNodeError): + """Webhook not found error.""" + + pass + + +class WebhookConfigError(WebhookNodeError): + """Webhook configuration error.""" + + pass diff --git a/api/core/workflow/nodes/webhook/node.py b/api/core/workflow/nodes/webhook/node.py new file mode 100644 index 0000000000..1480f9b16a --- /dev/null +++ b/api/core/workflow/nodes/webhook/node.py @@ -0,0 +1,118 @@ +from collections.abc import Mapping +from typing import Any, Optional + +from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus +from core.workflow.nodes.base import BaseNode +from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig +from core.workflow.nodes.enums import ErrorStrategy, NodeType + +from .entities import WebhookData + + +class WebhookNode(BaseNode): + _node_type = NodeType.WEBHOOK + + _node_data: WebhookData + + def init_node_data(self, data: Mapping[str, Any]) -> None: + self._node_data = WebhookData.model_validate(data) + + def _get_error_strategy(self) -> Optional[ErrorStrategy]: + return self._node_data.error_strategy + + def _get_retry_config(self) -> RetryConfig: + return self._node_data.retry_config + + def _get_title(self) -> str: + return self._node_data.title + + def _get_description(self) -> Optional[str]: + return self._node_data.desc + + def _get_default_value_dict(self) -> dict[str, Any]: + return self._node_data.default_value_dict + + def get_base_node_data(self) -> BaseNodeData: + return self._node_data + + @classmethod + def get_default_config(cls, filters: Optional[dict[str, Any]] = None) -> dict: + return { + "type": "webhook", + "config": { + "method": "get", + "content-type": "application/json", + "headers": [], + "params": [], + "body": [], + "async_mode": True, + "status_code": 200, + "response_body": "", + "timeout": 30, + }, + } + + @classmethod + def version(cls) -> str: + return "1" + + def _run(self) -> NodeRunResult: + """ + Run the webhook node. + + Like the start node, this simply takes the webhook data from the variable pool + and makes it available to downstream nodes. The actual webhook handling + happens in the trigger controller. + """ + # Get webhook data from variable pool (injected by Celery task) + webhook_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs) + + # Extract webhook-specific outputs based on node configuration + outputs = self._extract_configured_outputs(webhook_inputs) + + return NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + inputs=webhook_inputs, + outputs=outputs, + ) + + def _extract_configured_outputs(self, webhook_inputs: dict[str, Any]) -> dict[str, Any]: + """Extract outputs based on node configuration from webhook inputs.""" + outputs = {} + + # Get the raw webhook data (should be injected by Celery task) + webhook_data = webhook_inputs.get("webhook_data", {}) + + # Extract configured headers (case-insensitive) + webhook_headers = webhook_data.get("headers", {}) + webhook_headers_lower = {k.lower(): v for k, v in webhook_headers.items()} + + for header in self._node_data.headers: + header_name = header.name + # Try exact match first, then case-insensitive match + value = webhook_headers.get(header_name) or webhook_headers_lower.get(header_name.lower()) + outputs[header_name] = value + + # Extract configured query parameters + for param in self._node_data.params: + param_name = param.name + outputs[param_name] = webhook_data.get("query_params", {}).get(param_name) + + # Extract configured body parameters + for body_param in self._node_data.body: + param_name = body_param.name + param_type = body_param.type + + if param_type == "file": + # Get File object (already processed by webhook controller) + file_obj = webhook_data.get("files", {}).get(param_name) + outputs[param_name] = file_obj + else: + # Get regular body parameter + outputs[param_name] = webhook_data.get("body", {}).get(param_name) + + # Include raw webhook data for debugging/advanced use + outputs["_webhook_raw"] = webhook_data + + return outputs diff --git a/api/extensions/ext_blueprints.py b/api/extensions/ext_blueprints.py index a4d013ffc0..367c8a0dd0 100644 --- a/api/extensions/ext_blueprints.py +++ b/api/extensions/ext_blueprints.py @@ -12,6 +12,7 @@ def init_app(app: DifyApp): from controllers.inner_api import bp as inner_api_bp from controllers.mcp import bp as mcp_bp from controllers.service_api import bp as service_api_bp + from controllers.trigger import bp as trigger_bp from controllers.web import bp as web_bp CORS( @@ -48,3 +49,11 @@ def init_app(app: DifyApp): app.register_blueprint(inner_api_bp) app.register_blueprint(mcp_bp) + + # Register trigger blueprint with CORS for webhook calls + CORS( + trigger_bp, + allow_headers=["Content-Type", "Authorization", "X-App-Code"], + methods=["GET", "PUT", "POST", "DELETE", "OPTIONS", "PATCH", "HEAD"], + ) + app.register_blueprint(trigger_bp) diff --git a/api/migrations/versions/2025_08_23_2039-5871f634954d_add_workflow_webhook_table.py b/api/migrations/versions/2025_08_23_2039-5871f634954d_add_workflow_webhook_table.py new file mode 100644 index 0000000000..fe37696e5c --- /dev/null +++ b/api/migrations/versions/2025_08_23_2039-5871f634954d_add_workflow_webhook_table.py @@ -0,0 +1,47 @@ +"""Add workflow webhook table + +Revision ID: 5871f634954d +Revises: fa8b0fa6f407 +Create Date: 2025-08-23 20:39:20.704501 + +""" +from alembic import op +import models as models +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '5871f634954d' +down_revision = '4558cfabe44e' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('workflow_webhook_triggers', + sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False), + sa.Column('app_id', models.types.StringUUID(), nullable=False), + sa.Column('node_id', sa.String(length=64), nullable=False), + sa.Column('tenant_id', models.types.StringUUID(), nullable=False), + sa.Column('webhook_id', sa.String(length=24), nullable=False), + sa.Column('triggered_by', sa.String(length=16), nullable=False), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.PrimaryKeyConstraint('id', name='workflow_webhook_trigger_pkey'), + sa.UniqueConstraint('app_id', 'node_id', 'triggered_by', name='uniq_node'), + sa.UniqueConstraint('webhook_id', name='uniq_webhook_id') + ) + with op.batch_alter_table('workflow_webhook_triggers', schema=None) as batch_op: + batch_op.create_index('workflow_webhook_trigger_tenant_idx', ['tenant_id'], unique=False) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('workflow_webhook_triggers', schema=None) as batch_op: + batch_op.drop_index('workflow_webhook_trigger_tenant_idx') + + op.drop_table('workflow_webhook_triggers') + # ### end Alembic commands ### diff --git a/api/models/workflow.py b/api/models/workflow.py index 38eda31c5e..79eeaa3bfe 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -1383,3 +1383,41 @@ class WorkflowTriggerLog(Base): "triggered_at": self.triggered_at.isoformat() if self.triggered_at else None, "finished_at": self.finished_at.isoformat() if self.finished_at else None, } + + +class WorkflowWebhookTrigger(Base): + """ + Workflow Webhook Trigger + + Attributes: + - id (uuid) Primary key + - app_id (uuid) App ID to bind to a specific app + - node_id (varchar) Node ID which node in the workflow + - tenant_id (uuid) Workspace ID + - webhook_id (varchar) Webhook ID for URL: https://api.dify.ai/triggers/webhook/:webhook_id + - triggered_by (varchar) Environment: debugger or production + - created_at (timestamp) Creation time + - updated_at (timestamp) Last update time + """ + + __tablename__ = "workflow_webhook_triggers" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="workflow_webhook_trigger_pkey"), + sa.Index("workflow_webhook_trigger_tenant_idx", "tenant_id"), + sa.UniqueConstraint("app_id", "node_id", "triggered_by", name="uniq_node"), + sa.UniqueConstraint("webhook_id", name="uniq_webhook_id"), + ) + + id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()")) + app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + node_id: Mapped[str] = mapped_column(String(64), nullable=False) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + webhook_id: Mapped[str] = mapped_column(String(24), nullable=False) + triggered_by: Mapped[str] = mapped_column(String(16), nullable=False) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + updated_at: Mapped[datetime] = mapped_column( + DateTime, + nullable=False, + server_default=func.current_timestamp(), + server_onupdate=func.current_timestamp(), + ) diff --git a/api/services/webhook_service.py b/api/services/webhook_service.py new file mode 100644 index 0000000000..9042545426 --- /dev/null +++ b/api/services/webhook_service.py @@ -0,0 +1,262 @@ +import logging +from typing import Any + +from flask import request +from sqlalchemy import select +from sqlalchemy.orm import Session + +from core.file.models import FileTransferMethod +from core.tools.tool_file_manager import ToolFileManager +from extensions.ext_database import db +from factories import file_factory +from models.account import Account, TenantAccountJoin, TenantAccountRole +from models.enums import WorkflowRunTriggeredFrom +from models.workflow import Workflow, WorkflowWebhookTrigger +from services.async_workflow_service import AsyncWorkflowService +from services.workflow.entities import TriggerData + +logger = logging.getLogger(__name__) + + +class WebhookService: + """Service for handling webhook operations.""" + + @classmethod + def get_webhook_trigger_and_workflow( + cls, webhook_id: str + ) -> tuple[WorkflowWebhookTrigger, Workflow, dict[str, Any]]: + """Get webhook trigger, workflow, and node configuration.""" + with Session(db.engine) as session: + # Get webhook trigger + webhook_trigger = ( + session.query(WorkflowWebhookTrigger).filter(WorkflowWebhookTrigger.webhook_id == webhook_id).first() + ) + if not webhook_trigger: + raise ValueError(f"Webhook not found: {webhook_id}") + + # Get workflow + workflow = ( + session.query(Workflow) + .filter( + Workflow.app_id == webhook_trigger.app_id, + Workflow.version != Workflow.VERSION_DRAFT, + ) + .order_by(Workflow.created_at.desc()) + .first() + ) + if not workflow: + raise ValueError(f"Workflow not found for app {webhook_trigger.app_id}") + + node_config = workflow.get_node_config_by_id(webhook_trigger.node_id) + + return webhook_trigger, workflow, node_config + + @classmethod + def extract_webhook_data(cls, webhook_trigger: WorkflowWebhookTrigger) -> dict[str, Any]: + """Extract and process data from incoming webhook request.""" + data = { + "method": request.method, + "headers": dict(request.headers), + "query_params": dict(request.args), + "body": {}, + "files": {}, + } + + content_type = request.headers.get("Content-Type", "").lower() + + # Extract body data based on content type + if "application/json" in content_type: + try: + data["body"] = request.get_json() or {} + except Exception: + data["body"] = {} + elif "application/x-www-form-urlencoded" in content_type: + data["body"] = dict(request.form) + elif "multipart/form-data" in content_type: + data["body"] = dict(request.form) + # Handle file uploads + if request.files: + data["files"] = cls._process_file_uploads(request.files, webhook_trigger) + else: + # Raw text data + try: + data["body"] = {"raw": request.get_data(as_text=True)} + except Exception: + data["body"] = {"raw": ""} + + return data + + @classmethod + def _process_file_uploads(cls, files, webhook_trigger: WorkflowWebhookTrigger) -> dict[str, Any]: + """Process file uploads using ToolFileManager.""" + processed_files = {} + + for name, file in files.items(): + if file and file.filename: + try: + tool_file_manager = ToolFileManager() + file_content = file.read() + + # Create file using ToolFileManager + tool_file = tool_file_manager.create_file_by_raw( + user_id="webhook_user", + tenant_id=webhook_trigger.tenant_id, + conversation_id=None, + file_binary=file_content, + mimetype=file.content_type or "application/octet-stream", + ) + + # Build File object + mapping = { + "tool_file_id": tool_file.id, + "transfer_method": FileTransferMethod.TOOL_FILE.value, + } + file_obj = file_factory.build_from_mapping( + mapping=mapping, + tenant_id=webhook_trigger.tenant_id, + ) + + processed_files[name] = file_obj + + except Exception as e: + logger.exception(f"Failed to process file upload {name}: {str(e)}") + # Continue processing other files + + return processed_files + + @classmethod + def validate_webhook_request(cls, webhook_data: dict[str, Any], node_config: dict[str, Any]) -> dict[str, Any]: + """Validate webhook request against node configuration.""" + try: + node_data = node_config.get("data", {}) + + # Validate HTTP method + configured_method = node_data.get("method", "get").upper() + request_method = webhook_data["method"].upper() + if configured_method != request_method: + return { + "valid": False, + "error": f"HTTP method mismatch. Expected {configured_method}, got {request_method}", + } + + # Validate required headers (case-insensitive) + headers = node_data.get("headers", []) + # Create case-insensitive header lookup + webhook_headers_lower = {k.lower(): v for k, v in webhook_data["headers"].items()} + + for header in headers: + if header.get("required", False): + header_name = header.get("name", "") + if header_name.lower() not in webhook_headers_lower: + return {"valid": False, "error": f"Required header missing: {header_name}"} + + # Validate required query parameters + params = node_data.get("params", []) + for param in params: + if param.get("required", False): + param_name = param.get("name", "") + if param_name not in webhook_data["query_params"]: + return {"valid": False, "error": f"Required query parameter missing: {param_name}"} + + # Validate required body parameters + body_params = node_data.get("body", []) + for body_param in body_params: + if body_param.get("required", False): + param_name = body_param.get("name", "") + param_type = body_param.get("type", "string") + + # Check if parameter exists + if param_type == "file": + file_obj = webhook_data.get("files", {}).get(param_name) + if not file_obj: + return {"valid": False, "error": f"Required file parameter missing: {param_name}"} + else: + if param_name not in webhook_data.get("body", {}): + return {"valid": False, "error": f"Required body parameter missing: {param_name}"} + + return {"valid": True} + + except Exception as e: + logger.exception(f"Validation error: {str(e)}") + return {"valid": False, "error": f"Validation failed: {str(e)}"} + + @classmethod + def trigger_workflow_execution( + cls, webhook_trigger: WorkflowWebhookTrigger, webhook_data: dict[str, Any], workflow: Workflow + ) -> None: + """Trigger workflow execution via AsyncWorkflowService.""" + try: + with Session(db.engine) as session: + # Get tenant owner as the user for webhook execution + tenant_owner = session.scalar( + select(Account) + .join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id) + .where( + TenantAccountJoin.tenant_id == webhook_trigger.tenant_id, + TenantAccountJoin.role == TenantAccountRole.OWNER, + ) + ) + + if not tenant_owner: + logger.error(f"Tenant owner not found for tenant {webhook_trigger.tenant_id}") + raise ValueError("Tenant owner not found") + + # Prepare inputs for the webhook node + # The webhook node expects webhook_data in the inputs + workflow_inputs = { + "webhook_data": webhook_data, + "webhook_headers": webhook_data.get("headers", {}), + "webhook_query_params": webhook_data.get("query_params", {}), + "webhook_body": webhook_data.get("body", {}), + "webhook_files": webhook_data.get("files", {}), + } + + # Create trigger data + trigger_data = TriggerData( + app_id=webhook_trigger.app_id, + workflow_id=workflow.id, + root_node_id=webhook_trigger.node_id, # Start from the webhook node + trigger_type=WorkflowRunTriggeredFrom.WEBHOOK, + inputs=workflow_inputs, + tenant_id=webhook_trigger.tenant_id, + ) + + # Trigger workflow execution asynchronously + AsyncWorkflowService.trigger_workflow_async( + session, + tenant_owner, + trigger_data, + ) + + except Exception as e: + logger.exception(f"Failed to trigger workflow for webhook {webhook_trigger.webhook_id}: {str(e)}") + raise + + @classmethod + def generate_webhook_response(cls, node_config: dict[str, Any]) -> tuple[dict[str, Any], int]: + """Generate HTTP response based on node configuration.""" + import json + + node_data = node_config.get("data", {}) + + # Get configured status code and response body + status_code = node_data.get("status_code", 200) + response_body = node_data.get("response_body", "") + + # Parse response body as JSON if it's valid JSON, otherwise return as text + try: + if response_body: + try: + response_data = ( + json.loads(response_body) + if response_body.strip().startswith(("{", "[")) + else {"message": response_body} + ) + except json.JSONDecodeError: + response_data = {"message": response_body} + else: + response_data = {"status": "success", "message": "Webhook processed successfully"} + except: + response_data = {"message": response_body or "Webhook processed successfully"} + + return response_data, status_code diff --git a/api/tests/test_containers_integration_tests/services/test_webhook_service.py b/api/tests/test_containers_integration_tests/services/test_webhook_service.py new file mode 100644 index 0000000000..f671a1e29d --- /dev/null +++ b/api/tests/test_containers_integration_tests/services/test_webhook_service.py @@ -0,0 +1,497 @@ +import json +from io import BytesIO +from unittest.mock import MagicMock, patch + +import pytest +from faker import Faker +from flask import Flask +from werkzeug.datastructures import FileStorage + +from models.model import App +from models.workflow import Workflow, WorkflowWebhookTrigger +from services.account_service import AccountService, TenantService +from services.webhook_service import WebhookService + + +class TestWebhookService: + """Integration tests for WebhookService using testcontainers.""" + + @pytest.fixture + def mock_external_dependencies(self): + """Mock external service dependencies.""" + with ( + patch("services.webhook_service.AsyncWorkflowService") as mock_async_service, + patch("services.webhook_service.ToolFileManager") as mock_tool_file_manager, + patch("services.webhook_service.file_factory") as mock_file_factory, + patch("services.account_service.FeatureService") as mock_feature_service, + ): + # Mock ToolFileManager + mock_tool_file_instance = MagicMock() + mock_tool_file_manager.return_value = mock_tool_file_instance + + # Mock file creation + mock_tool_file = MagicMock() + mock_tool_file.id = "test_file_id" + mock_tool_file_instance.create_file_by_raw.return_value = mock_tool_file + + # Mock file factory + mock_file_obj = MagicMock() + mock_file_factory.build_from_mapping.return_value = mock_file_obj + + # Mock feature service + mock_feature_service.get_system_features.return_value.is_allow_register = True + mock_feature_service.get_system_features.return_value.is_allow_create_workspace = True + + yield { + "async_service": mock_async_service, + "tool_file_manager": mock_tool_file_manager, + "file_factory": mock_file_factory, + "tool_file": mock_tool_file, + "file_obj": mock_file_obj, + "feature_service": mock_feature_service, + } + + @pytest.fixture + def test_data(self, db_session_with_containers, mock_external_dependencies): + """Create test data for webhook service tests.""" + fake = Faker() + + # Create account and tenant + account = AccountService.create_account( + email=fake.email(), + name=fake.name(), + interface_language="en-US", + password=fake.password(length=12), + ) + TenantService.create_owner_tenant_if_not_exist(account, name=fake.company()) + tenant = account.current_tenant + + # Create app + app = App( + tenant_id=tenant.id, + name=fake.company(), + description=fake.text(), + mode="workflow", + icon="", + icon_background="", + enable_site=True, + enable_api=True, + ) + db_session_with_containers.add(app) + db_session_with_containers.flush() + + # Create workflow + workflow_data = { + "nodes": [ + { + "id": "webhook_node", + "type": "webhook", + "data": { + "title": "Test Webhook", + "method": "post", + "content-type": "application/json", + "headers": [ + {"name": "Authorization", "required": True}, + {"name": "Content-Type", "required": False}, + ], + "params": [{"name": "version", "required": True}, {"name": "format", "required": False}], + "body": [ + {"name": "message", "type": "string", "required": True}, + {"name": "count", "type": "number", "required": False}, + {"name": "upload", "type": "file", "required": False}, + ], + "status_code": 200, + "response_body": '{"status": "success"}', + "timeout": 30, + }, + } + ], + "edges": [], + } + + workflow = Workflow( + tenant_id=tenant.id, + app_id=app.id, + type="workflow", + graph=json.dumps(workflow_data), + features=json.dumps({}), + created_by=account.id, + environment_variables=[], + conversation_variables=[], + version="1.0", + ) + db_session_with_containers.add(workflow) + db_session_with_containers.flush() + + # Create webhook trigger + webhook_id = fake.uuid4()[:16] + webhook_trigger = WorkflowWebhookTrigger( + app_id=app.id, + node_id="webhook_node", + tenant_id=tenant.id, + webhook_id=webhook_id, + triggered_by="production", + ) + db_session_with_containers.add(webhook_trigger) + db_session_with_containers.commit() + + return { + "tenant": tenant, + "account": account, + "app": app, + "workflow": workflow, + "webhook_trigger": webhook_trigger, + "webhook_id": webhook_id, + } + + def test_get_webhook_trigger_and_workflow_success(self, test_data, flask_app_with_containers): + """Test successful retrieval of webhook trigger and workflow.""" + webhook_id = test_data["webhook_id"] + + with flask_app_with_containers.app_context(): + webhook_trigger, workflow, node_config = WebhookService.get_webhook_trigger_and_workflow(webhook_id) + + assert webhook_trigger is not None + assert webhook_trigger.webhook_id == webhook_id + assert workflow is not None + assert workflow.app_id == test_data["app"].id + assert node_config is not None + assert node_config["id"] == "webhook_node" + assert node_config["data"]["title"] == "Test Webhook" + + def test_get_webhook_trigger_and_workflow_not_found(self, flask_app_with_containers): + """Test webhook trigger not found scenario.""" + with flask_app_with_containers.app_context(): + with pytest.raises(ValueError, match="Webhook not found"): + WebhookService.get_webhook_trigger_and_workflow("nonexistent_webhook") + + def test_extract_webhook_data_json(self): + """Test webhook data extraction from JSON request.""" + app = Flask(__name__) + + with app.test_request_context( + "/webhook", + method="POST", + headers={"Content-Type": "application/json", "Authorization": "Bearer token"}, + query_string="version=1&format=json", + json={"message": "hello", "count": 42}, + ): + webhook_trigger = MagicMock() + webhook_data = WebhookService.extract_webhook_data(webhook_trigger) + + assert webhook_data["method"] == "POST" + assert webhook_data["headers"]["Authorization"] == "Bearer token" + assert webhook_data["query_params"]["version"] == "1" + assert webhook_data["query_params"]["format"] == "json" + assert webhook_data["body"]["message"] == "hello" + assert webhook_data["body"]["count"] == 42 + assert webhook_data["files"] == {} + + def test_extract_webhook_data_form_urlencoded(self): + """Test webhook data extraction from form URL encoded request.""" + app = Flask(__name__) + + with app.test_request_context( + "/webhook", + method="POST", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data={"username": "test", "password": "secret"}, + ): + webhook_trigger = MagicMock() + webhook_data = WebhookService.extract_webhook_data(webhook_trigger) + + assert webhook_data["method"] == "POST" + assert webhook_data["body"]["username"] == "test" + assert webhook_data["body"]["password"] == "secret" + + def test_extract_webhook_data_multipart_with_files(self, mock_external_dependencies): + """Test webhook data extraction from multipart form with files.""" + app = Flask(__name__) + + # Create a mock file + file_content = b"test file content" + file_storage = FileStorage(stream=BytesIO(file_content), filename="test.txt", content_type="text/plain") + + with app.test_request_context( + "/webhook", + method="POST", + headers={"Content-Type": "multipart/form-data"}, + data={"message": "test", "upload": file_storage}, + ): + webhook_trigger = MagicMock() + webhook_trigger.tenant_id = "test_tenant" + + webhook_data = WebhookService.extract_webhook_data(webhook_trigger) + + assert webhook_data["method"] == "POST" + assert webhook_data["body"]["message"] == "test" + assert "upload" in webhook_data["files"] + + # Verify file processing was called + mock_external_dependencies["tool_file_manager"].assert_called_once() + mock_external_dependencies["file_factory"].build_from_mapping.assert_called_once() + + def test_extract_webhook_data_raw_text(self): + """Test webhook data extraction from raw text request.""" + app = Flask(__name__) + + with app.test_request_context( + "/webhook", method="POST", headers={"Content-Type": "text/plain"}, data="raw text content" + ): + webhook_trigger = MagicMock() + webhook_data = WebhookService.extract_webhook_data(webhook_trigger) + + assert webhook_data["method"] == "POST" + assert webhook_data["body"]["raw"] == "raw text content" + + def test_validate_webhook_request_success(self): + """Test successful webhook request validation.""" + webhook_data = { + "method": "POST", + "headers": {"Authorization": "Bearer token", "Content-Type": "application/json"}, + "query_params": {"version": "1"}, + "body": {"message": "hello"}, + "files": {}, + } + + node_config = { + "data": { + "method": "post", + "headers": [{"name": "Authorization", "required": True}, {"name": "Content-Type", "required": False}], + "params": [{"name": "version", "required": True}], + "body": [{"name": "message", "type": "string", "required": True}], + } + } + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is True + + def test_validate_webhook_request_method_mismatch(self): + """Test webhook validation with HTTP method mismatch.""" + webhook_data = {"method": "GET", "headers": {}, "query_params": {}, "body": {}, "files": {}} + + node_config = {"data": {"method": "post"}} + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is False + assert "HTTP method mismatch" in result["error"] + + def test_validate_webhook_request_missing_required_header(self): + """Test webhook validation with missing required header.""" + webhook_data = {"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}} + + node_config = {"data": {"method": "post", "headers": [{"name": "Authorization", "required": True}]}} + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is False + assert "Required header missing: Authorization" in result["error"] + + def test_validate_webhook_request_case_insensitive_headers(self): + """Test webhook validation with case-insensitive header matching.""" + webhook_data = { + "method": "POST", + "headers": {"authorization": "Bearer token"}, # lowercase + "query_params": {}, + "body": {}, + "files": {}, + } + + node_config = { + "data": { + "method": "post", + "headers": [ + {"name": "Authorization", "required": True} # Pascal case + ], + } + } + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is True + + def test_validate_webhook_request_missing_required_param(self): + """Test webhook validation with missing required query parameter.""" + webhook_data = {"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}} + + node_config = {"data": {"method": "post", "params": [{"name": "version", "required": True}]}} + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is False + assert "Required query parameter missing: version" in result["error"] + + def test_validate_webhook_request_missing_required_body_param(self): + """Test webhook validation with missing required body parameter.""" + webhook_data = {"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}} + + node_config = {"data": {"method": "post", "body": [{"name": "message", "type": "string", "required": True}]}} + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is False + assert "Required body parameter missing: message" in result["error"] + + def test_validate_webhook_request_missing_required_file(self): + """Test webhook validation with missing required file parameter.""" + webhook_data = {"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}} + + node_config = {"data": {"method": "post", "body": [{"name": "upload", "type": "file", "required": True}]}} + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is False + assert "Required file parameter missing: upload" in result["error"] + + def test_trigger_workflow_execution_success(self, test_data, mock_external_dependencies, flask_app_with_containers): + """Test successful workflow execution trigger.""" + webhook_data = { + "method": "POST", + "headers": {"Authorization": "Bearer token"}, + "query_params": {"version": "1"}, + "body": {"message": "hello"}, + "files": {}, + } + + with flask_app_with_containers.app_context(): + # Mock tenant owner lookup to return the test account + with patch("services.webhook_service.select") as mock_select: + mock_query = MagicMock() + mock_select.return_value.join.return_value.where.return_value = mock_query + + # Mock the session to return our test account + with patch("services.webhook_service.Session") as mock_session: + mock_session_instance = MagicMock() + mock_session.return_value.__enter__.return_value = mock_session_instance + mock_session_instance.scalar.return_value = test_data["account"] + + # Should not raise any exceptions + WebhookService.trigger_workflow_execution( + test_data["webhook_trigger"], webhook_data, test_data["workflow"] + ) + + # Verify AsyncWorkflowService was called + mock_external_dependencies["async_service"].trigger_workflow_async.assert_called_once() + + def test_trigger_workflow_execution_no_tenant_owner( + self, test_data, mock_external_dependencies, flask_app_with_containers + ): + """Test workflow execution trigger when tenant owner not found.""" + webhook_data = {"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}} + + with flask_app_with_containers.app_context(): + # Mock tenant owner lookup to return None + with ( + patch("services.webhook_service.select") as mock_select, + patch("services.webhook_service.Session") as mock_session, + ): + mock_session_instance = MagicMock() + mock_session.return_value.__enter__.return_value = mock_session_instance + mock_session_instance.scalar.return_value = None + + with pytest.raises(ValueError, match="Tenant owner not found"): + WebhookService.trigger_workflow_execution( + test_data["webhook_trigger"], webhook_data, test_data["workflow"] + ) + + def test_generate_webhook_response_default(self): + """Test webhook response generation with default values.""" + node_config = {"data": {}} + + response_data, status_code = WebhookService.generate_webhook_response(node_config) + + assert status_code == 200 + assert response_data["status"] == "success" + assert "Webhook processed successfully" in response_data["message"] + + def test_generate_webhook_response_custom_json(self): + """Test webhook response generation with custom JSON response.""" + node_config = {"data": {"status_code": 201, "response_body": '{"result": "created", "id": 123}'}} + + response_data, status_code = WebhookService.generate_webhook_response(node_config) + + assert status_code == 201 + assert response_data["result"] == "created" + assert response_data["id"] == 123 + + def test_generate_webhook_response_custom_text(self): + """Test webhook response generation with custom text response.""" + node_config = {"data": {"status_code": 202, "response_body": "Request accepted for processing"}} + + response_data, status_code = WebhookService.generate_webhook_response(node_config) + + assert status_code == 202 + assert response_data["message"] == "Request accepted for processing" + + def test_generate_webhook_response_invalid_json(self): + """Test webhook response generation with invalid JSON response.""" + node_config = {"data": {"status_code": 400, "response_body": '{"invalid": json}'}} + + response_data, status_code = WebhookService.generate_webhook_response(node_config) + + assert status_code == 400 + assert response_data["message"] == '{"invalid": json}' + + def test_process_file_uploads_success(self, mock_external_dependencies): + """Test successful file upload processing.""" + # Create mock files + files = { + "file1": MagicMock(filename="test1.txt", content_type="text/plain"), + "file2": MagicMock(filename="test2.jpg", content_type="image/jpeg"), + } + + # Mock file reads + files["file1"].read.return_value = b"content1" + files["file2"].read.return_value = b"content2" + + webhook_trigger = MagicMock() + webhook_trigger.tenant_id = "test_tenant" + + result = WebhookService._process_file_uploads(files, webhook_trigger) + + assert len(result) == 2 + assert "file1" in result + assert "file2" in result + + # Verify file processing was called for each file + assert mock_external_dependencies["tool_file_manager"].call_count == 2 + assert mock_external_dependencies["file_factory"].build_from_mapping.call_count == 2 + + def test_process_file_uploads_with_errors(self, mock_external_dependencies): + """Test file upload processing with errors.""" + # Create mock files, one will fail + files = { + "good_file": MagicMock(filename="test.txt", content_type="text/plain"), + "bad_file": MagicMock(filename="test.bad", content_type="text/plain"), + } + + files["good_file"].read.return_value = b"content" + files["bad_file"].read.side_effect = Exception("Read error") + + webhook_trigger = MagicMock() + webhook_trigger.tenant_id = "test_tenant" + + result = WebhookService._process_file_uploads(files, webhook_trigger) + + # Should process the good file and skip the bad one + assert len(result) == 1 + assert "good_file" in result + assert "bad_file" not in result + + def test_process_file_uploads_empty_filename(self, mock_external_dependencies): + """Test file upload processing with empty filename.""" + files = { + "no_filename": MagicMock(filename="", content_type="text/plain"), + "none_filename": MagicMock(filename=None, content_type="text/plain"), + } + + webhook_trigger = MagicMock() + webhook_trigger.tenant_id = "test_tenant" + + result = WebhookService._process_file_uploads(files, webhook_trigger) + + # Should skip files without filenames + assert len(result) == 0 + mock_external_dependencies["tool_file_manager"].assert_not_called() diff --git a/api/tests/unit_tests/core/workflow/nodes/webhook/__init__.py b/api/tests/unit_tests/core/workflow/nodes/webhook/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/tests/unit_tests/core/workflow/nodes/webhook/test_entities.py b/api/tests/unit_tests/core/workflow/nodes/webhook/test_entities.py new file mode 100644 index 0000000000..3806efd488 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/webhook/test_entities.py @@ -0,0 +1,294 @@ +import pytest +from pydantic import ValidationError + +from core.workflow.nodes.webhook.entities import ( + ContentType, + Method, + WebhookBodyParameter, + WebhookData, + WebhookParameter, +) + + +def test_method_enum(): + """Test Method enum values.""" + assert Method.GET == "get" + assert Method.POST == "post" + assert Method.HEAD == "head" + assert Method.PATCH == "patch" + assert Method.PUT == "put" + assert Method.DELETE == "delete" + + # Test all enum values are strings + for method in Method: + assert isinstance(method.value, str) + + +def test_content_type_enum(): + """Test ContentType enum values.""" + assert ContentType.JSON == "application/json" + assert ContentType.FORM_DATA == "multipart/form-data" + assert ContentType.FORM_URLENCODED == "application/x-www-form-urlencoded" + assert ContentType.TEXT == "text/plain" + assert ContentType.FORM == "form" + + # Test all enum values are strings + for content_type in ContentType: + assert isinstance(content_type.value, str) + + +def test_webhook_parameter_creation(): + """Test WebhookParameter model creation and validation.""" + # Test with all fields + param = WebhookParameter(name="api_key", required=True) + assert param.name == "api_key" + assert param.required is True + + # Test with defaults + param_default = WebhookParameter(name="optional_param") + assert param_default.name == "optional_param" + assert param_default.required is False + + # Test validation - name is required + with pytest.raises(ValidationError): + WebhookParameter() + + +def test_webhook_body_parameter_creation(): + """Test WebhookBodyParameter model creation and validation.""" + # Test with all fields + body_param = WebhookBodyParameter( + name="user_data", + type="object", + required=True, + ) + assert body_param.name == "user_data" + assert body_param.type == "object" + assert body_param.required is True + + # Test with defaults + body_param_default = WebhookBodyParameter(name="message") + assert body_param_default.name == "message" + assert body_param_default.type == "string" # Default type + assert body_param_default.required is False + + # Test validation - name is required + with pytest.raises(ValidationError): + WebhookBodyParameter() + + +def test_webhook_body_parameter_types(): + """Test WebhookBodyParameter type validation.""" + valid_types = ["string", "number", "boolean", "object", "array", "file"] + + for param_type in valid_types: + param = WebhookBodyParameter(name="test", type=param_type) + assert param.type == param_type + + # Test invalid type + with pytest.raises(ValidationError): + WebhookBodyParameter(name="test", type="invalid_type") + + +def test_webhook_data_creation_minimal(): + """Test WebhookData creation with minimal required fields.""" + data = WebhookData(title="Test Webhook") + + assert data.title == "Test Webhook" + assert data.method == Method.GET # Default + assert data.content_type == ContentType.JSON # Default + assert data.headers == [] # Default + assert data.params == [] # Default + assert data.body == [] # Default + assert data.status_code == 200 # Default + assert data.response_body == "" # Default + assert data.webhook_id is None # Default + assert data.timeout == 30 # Default + + +def test_webhook_data_creation_full(): + """Test WebhookData creation with all fields.""" + headers = [ + WebhookParameter(name="Authorization", required=True), + WebhookParameter(name="Content-Type", required=False), + ] + params = [ + WebhookParameter(name="version", required=True), + WebhookParameter(name="format", required=False), + ] + body = [ + WebhookBodyParameter(name="message", type="string", required=True), + WebhookBodyParameter(name="count", type="number", required=False), + WebhookBodyParameter(name="upload", type="file", required=True), + ] + + # Use the alias for content_type to test it properly + data = WebhookData( + title="Full Webhook Test", + desc="A comprehensive webhook test", + method=Method.POST, + **{"content-type": ContentType.FORM_DATA}, + headers=headers, + params=params, + body=body, + status_code=201, + response_body='{"success": true}', + webhook_id="webhook_123", + timeout=60, + ) + + assert data.title == "Full Webhook Test" + assert data.desc == "A comprehensive webhook test" + assert data.method == Method.POST + assert data.content_type == ContentType.FORM_DATA + assert len(data.headers) == 2 + assert len(data.params) == 2 + assert len(data.body) == 3 + assert data.status_code == 201 + assert data.response_body == '{"success": true}' + assert data.webhook_id == "webhook_123" + assert data.timeout == 60 + + +def test_webhook_data_content_type_alias(): + """Test WebhookData content_type field alias.""" + # Test using the alias "content-type" + data1 = WebhookData(title="Test", **{"content-type": "application/json"}) + assert data1.content_type == ContentType.JSON + + # Test using the alias with enum value + data2 = WebhookData(title="Test", **{"content-type": ContentType.FORM_DATA}) + assert data2.content_type == ContentType.FORM_DATA + + # Test both approaches result in same field + assert hasattr(data1, "content_type") + assert hasattr(data2, "content_type") + + +def test_webhook_data_model_dump(): + """Test WebhookData model serialization.""" + data = WebhookData( + title="Test Webhook", + method=Method.POST, + content_type=ContentType.JSON, + headers=[WebhookParameter(name="Authorization", required=True)], + params=[WebhookParameter(name="version", required=False)], + body=[WebhookBodyParameter(name="message", type="string", required=True)], + status_code=200, + response_body="OK", + timeout=30, + ) + + dumped = data.model_dump() + + assert dumped["title"] == "Test Webhook" + assert dumped["method"] == "post" + assert dumped["content_type"] == "application/json" + assert len(dumped["headers"]) == 1 + assert dumped["headers"][0]["name"] == "Authorization" + assert dumped["headers"][0]["required"] is True + assert len(dumped["params"]) == 1 + assert len(dumped["body"]) == 1 + assert dumped["body"][0]["type"] == "string" + + +def test_webhook_data_model_dump_with_alias(): + """Test WebhookData model serialization includes alias.""" + data = WebhookData( + title="Test Webhook", + **{"content-type": ContentType.FORM_DATA}, + ) + + dumped = data.model_dump(by_alias=True) + assert "content-type" in dumped + assert dumped["content-type"] == "multipart/form-data" + + +def test_webhook_data_validation_errors(): + """Test WebhookData validation errors.""" + # Title is required (inherited from BaseNodeData) + with pytest.raises(ValidationError): + WebhookData() + + # Invalid method + with pytest.raises(ValidationError): + WebhookData(title="Test", method="invalid_method") + + # Invalid content_type via alias + with pytest.raises(ValidationError): + WebhookData(title="Test", **{"content-type": "invalid/type"}) + + # Invalid status_code (should be int) - use non-numeric string + with pytest.raises(ValidationError): + WebhookData(title="Test", status_code="invalid") + + # Invalid timeout (should be int) - use non-numeric string + with pytest.raises(ValidationError): + WebhookData(title="Test", timeout="invalid") + + # Valid cases that should NOT raise errors + # These should work fine (pydantic converts string numbers to int) + valid_data = WebhookData(title="Test", status_code="200", timeout="30") + assert valid_data.status_code == 200 + assert valid_data.timeout == 30 + + +def test_webhook_data_sequence_fields(): + """Test WebhookData sequence field behavior.""" + # Test empty sequences + data = WebhookData(title="Test") + assert data.headers == [] + assert data.params == [] + assert data.body == [] + + # Test immutable sequences + headers = [WebhookParameter(name="test")] + data = WebhookData(title="Test", headers=headers) + + # Original list shouldn't affect the model + headers.append(WebhookParameter(name="test2")) + assert len(data.headers) == 1 # Should still be 1 + + +def test_webhook_data_sync_mode(): + """Test WebhookData SyncMode nested enum.""" + # Test that SyncMode enum exists and has expected value + assert hasattr(WebhookData, "SyncMode") + assert WebhookData.SyncMode.SYNC == "async" # Note: confusingly named but correct + + +def test_webhook_parameter_edge_cases(): + """Test WebhookParameter edge cases.""" + # Test with special characters in name + param = WebhookParameter(name="X-Custom-Header-123", required=True) + assert param.name == "X-Custom-Header-123" + + # Test with empty string name (should be valid if pydantic allows it) + param_empty = WebhookParameter(name="", required=False) + assert param_empty.name == "" + + +def test_webhook_body_parameter_edge_cases(): + """Test WebhookBodyParameter edge cases.""" + # Test file type parameter + file_param = WebhookBodyParameter(name="upload", type="file", required=True) + assert file_param.type == "file" + assert file_param.required is True + + # Test all valid types + for param_type in ["string", "number", "boolean", "object", "array", "file"]: + param = WebhookBodyParameter(name=f"test_{param_type}", type=param_type) + assert param.type == param_type + + +def test_webhook_data_inheritance(): + """Test WebhookData inherits from BaseNodeData correctly.""" + from core.workflow.nodes.base import BaseNodeData + + # Test that WebhookData is a subclass of BaseNodeData + assert issubclass(WebhookData, BaseNodeData) + + # Test that instances have BaseNodeData properties + data = WebhookData(title="Test") + assert hasattr(data, "title") + assert hasattr(data, "desc") # Inherited from BaseNodeData diff --git a/api/tests/unit_tests/core/workflow/nodes/webhook/test_exceptions.py b/api/tests/unit_tests/core/workflow/nodes/webhook/test_exceptions.py new file mode 100644 index 0000000000..3d747210ff --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/webhook/test_exceptions.py @@ -0,0 +1,195 @@ +import pytest + +from core.workflow.nodes.base.exc import BaseNodeError +from core.workflow.nodes.webhook.exc import ( + WebhookConfigError, + WebhookNodeError, + WebhookNotFoundError, + WebhookTimeoutError, +) + + +def test_webhook_node_error_inheritance(): + """Test WebhookNodeError inherits from BaseNodeError.""" + assert issubclass(WebhookNodeError, BaseNodeError) + + # Test instantiation + error = WebhookNodeError("Test error message") + assert str(error) == "Test error message" + assert isinstance(error, BaseNodeError) + + +def test_webhook_timeout_error(): + """Test WebhookTimeoutError functionality.""" + # Test inheritance + assert issubclass(WebhookTimeoutError, WebhookNodeError) + assert issubclass(WebhookTimeoutError, BaseNodeError) + + # Test instantiation with message + error = WebhookTimeoutError("Webhook request timed out") + assert str(error) == "Webhook request timed out" + + # Test instantiation without message + error_no_msg = WebhookTimeoutError() + assert isinstance(error_no_msg, WebhookTimeoutError) + + +def test_webhook_not_found_error(): + """Test WebhookNotFoundError functionality.""" + # Test inheritance + assert issubclass(WebhookNotFoundError, WebhookNodeError) + assert issubclass(WebhookNotFoundError, BaseNodeError) + + # Test instantiation with message + error = WebhookNotFoundError("Webhook trigger not found") + assert str(error) == "Webhook trigger not found" + + # Test instantiation without message + error_no_msg = WebhookNotFoundError() + assert isinstance(error_no_msg, WebhookNotFoundError) + + +def test_webhook_config_error(): + """Test WebhookConfigError functionality.""" + # Test inheritance + assert issubclass(WebhookConfigError, WebhookNodeError) + assert issubclass(WebhookConfigError, BaseNodeError) + + # Test instantiation with message + error = WebhookConfigError("Invalid webhook configuration") + assert str(error) == "Invalid webhook configuration" + + # Test instantiation without message + error_no_msg = WebhookConfigError() + assert isinstance(error_no_msg, WebhookConfigError) + + +def test_webhook_error_hierarchy(): + """Test the complete webhook error hierarchy.""" + # All webhook errors should inherit from WebhookNodeError + webhook_errors = [ + WebhookTimeoutError, + WebhookNotFoundError, + WebhookConfigError, + ] + + for error_class in webhook_errors: + assert issubclass(error_class, WebhookNodeError) + assert issubclass(error_class, BaseNodeError) + + +def test_webhook_error_instantiation_with_args(): + """Test webhook error instantiation with various arguments.""" + # Test with single string argument + error1 = WebhookNodeError("Simple error message") + assert str(error1) == "Simple error message" + + # Test with multiple arguments + error2 = WebhookTimeoutError("Timeout after", 30, "seconds") + # Note: The exact string representation depends on Exception.__str__ implementation + assert "Timeout after" in str(error2) + + # Test with keyword arguments (if supported by base Exception) + error3 = WebhookConfigError("Config error in field: timeout") + assert "Config error in field: timeout" in str(error3) + + +def test_webhook_error_as_exceptions(): + """Test that webhook errors can be raised and caught properly.""" + # Test raising and catching WebhookNodeError + with pytest.raises(WebhookNodeError) as exc_info: + raise WebhookNodeError("Base webhook error") + assert str(exc_info.value) == "Base webhook error" + + # Test raising and catching specific errors + with pytest.raises(WebhookTimeoutError) as exc_info: + raise WebhookTimeoutError("Request timeout") + assert str(exc_info.value) == "Request timeout" + + with pytest.raises(WebhookNotFoundError) as exc_info: + raise WebhookNotFoundError("Webhook not found") + assert str(exc_info.value) == "Webhook not found" + + with pytest.raises(WebhookConfigError) as exc_info: + raise WebhookConfigError("Invalid config") + assert str(exc_info.value) == "Invalid config" + + +def test_webhook_error_catching_hierarchy(): + """Test that webhook errors can be caught by their parent classes.""" + # WebhookTimeoutError should be catchable as WebhookNodeError + with pytest.raises(WebhookNodeError): + raise WebhookTimeoutError("Timeout error") + + # WebhookNotFoundError should be catchable as WebhookNodeError + with pytest.raises(WebhookNodeError): + raise WebhookNotFoundError("Not found error") + + # WebhookConfigError should be catchable as WebhookNodeError + with pytest.raises(WebhookNodeError): + raise WebhookConfigError("Config error") + + # All webhook errors should be catchable as BaseNodeError + with pytest.raises(BaseNodeError): + raise WebhookTimeoutError("Timeout as base error") + + with pytest.raises(BaseNodeError): + raise WebhookNotFoundError("Not found as base error") + + with pytest.raises(BaseNodeError): + raise WebhookConfigError("Config as base error") + + +def test_webhook_error_attributes(): + """Test webhook error class attributes.""" + # Test that all error classes have proper __name__ + assert WebhookNodeError.__name__ == "WebhookNodeError" + assert WebhookTimeoutError.__name__ == "WebhookTimeoutError" + assert WebhookNotFoundError.__name__ == "WebhookNotFoundError" + assert WebhookConfigError.__name__ == "WebhookConfigError" + + # Test that all error classes have proper __module__ + expected_module = "core.workflow.nodes.webhook.exc" + assert WebhookNodeError.__module__ == expected_module + assert WebhookTimeoutError.__module__ == expected_module + assert WebhookNotFoundError.__module__ == expected_module + assert WebhookConfigError.__module__ == expected_module + + +def test_webhook_error_docstrings(): + """Test webhook error class docstrings.""" + assert WebhookNodeError.__doc__ == "Base webhook node error." + assert WebhookTimeoutError.__doc__ == "Webhook timeout error." + assert WebhookNotFoundError.__doc__ == "Webhook not found error." + assert WebhookConfigError.__doc__ == "Webhook configuration error." + + +def test_webhook_error_repr_and_str(): + """Test webhook error string representations.""" + error = WebhookNodeError("Test message") + + # Test __str__ method + assert str(error) == "Test message" + + # Test __repr__ method (should include class name) + repr_str = repr(error) + assert "WebhookNodeError" in repr_str + assert "Test message" in repr_str + + +def test_webhook_error_with_no_message(): + """Test webhook errors with no message.""" + # Test that errors can be instantiated without messages + errors = [ + WebhookNodeError(), + WebhookTimeoutError(), + WebhookNotFoundError(), + WebhookConfigError(), + ] + + for error in errors: + # Should be instances of their respective classes + assert isinstance(error, type(error)) + # Should be able to be raised + with pytest.raises(type(error)): + raise error diff --git a/api/tests/unit_tests/core/workflow/nodes/webhook/test_webhook_node.py b/api/tests/unit_tests/core/workflow/nodes/webhook/test_webhook_node.py new file mode 100644 index 0000000000..a8fa2b8637 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/webhook/test_webhook_node.py @@ -0,0 +1,481 @@ +import pytest + +from core.app.entities.app_invoke_entities import InvokeFrom +from core.file import File, FileTransferMethod, FileType +from core.variables import StringVariable +from core.workflow.entities.variable_pool import VariablePool +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus +from core.workflow.graph_engine import Graph, GraphInitParams, GraphRuntimeState +from core.workflow.nodes.answer import AnswerStreamGenerateRoute +from core.workflow.nodes.end import EndStreamParam +from core.workflow.nodes.webhook import WebhookNode +from core.workflow.nodes.webhook.entities import ( + ContentType, + Method, + WebhookBodyParameter, + WebhookData, + WebhookParameter, +) +from core.workflow.system_variable import SystemVariable +from models.enums import UserFrom +from models.workflow import WorkflowType + + +def create_webhook_node(webhook_data: WebhookData, variable_pool: VariablePool) -> WebhookNode: + """Helper function to create a webhook node with proper initialization.""" + node_config = { + "id": "1", + "data": webhook_data.model_dump(), + } + + node = WebhookNode( + id="1", + config=node_config, + graph_init_params=GraphInitParams( + tenant_id="1", + app_id="1", + workflow_type=WorkflowType.WORKFLOW, + workflow_id="1", + graph_config={}, + user_id="1", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.SERVICE_API, + call_depth=0, + ), + graph=Graph( + root_node_id="1", + answer_stream_generate_routes=AnswerStreamGenerateRoute( + answer_dependencies={}, + answer_generate_route={}, + ), + end_stream_param=EndStreamParam( + end_dependencies={}, + end_stream_variable_selector_mapping={}, + ), + ), + graph_runtime_state=GraphRuntimeState( + variable_pool=variable_pool, + start_at=0, + ), + ) + + node.init_node_data(node_config["data"]) + return node + + +def test_webhook_node_basic_initialization(): + """Test basic webhook node initialization and configuration.""" + data = WebhookData( + title="Test Webhook", + method=Method.POST, + content_type=ContentType.JSON, + headers=[WebhookParameter(name="X-API-Key", required=True)], + params=[WebhookParameter(name="version", required=False)], + body=[WebhookBodyParameter(name="message", type="string", required=True)], + status_code=200, + response_body="OK", + timeout=30, + ) + + variable_pool = VariablePool( + system_variables=SystemVariable.empty(), + user_inputs={}, + ) + + node = create_webhook_node(data, variable_pool) + + assert node._node_type.value == "webhook" + assert node.version() == "1" + assert node._get_title() == "Test Webhook" + assert node._node_data.method == Method.POST + assert node._node_data.content_type == ContentType.JSON + assert len(node._node_data.headers) == 1 + assert len(node._node_data.params) == 1 + assert len(node._node_data.body) == 1 + + +def test_webhook_node_default_config(): + """Test webhook node default configuration.""" + config = WebhookNode.get_default_config() + + assert config["type"] == "webhook" + assert config["config"]["method"] == "get" + assert config["config"]["content-type"] == "application/json" + assert config["config"]["headers"] == [] + assert config["config"]["params"] == [] + assert config["config"]["body"] == [] + assert config["config"]["async_mode"] is True + assert config["config"]["status_code"] == 200 + assert config["config"]["response_body"] == "" + assert config["config"]["timeout"] == 30 + + +def test_webhook_node_run_with_headers(): + """Test webhook node execution with header extraction.""" + data = WebhookData( + title="Test Webhook", + headers=[ + WebhookParameter(name="Authorization", required=True), + WebhookParameter(name="Content-Type", required=False), + ], + ) + + variable_pool = VariablePool( + system_variables=SystemVariable.empty(), + user_inputs={ + "webhook_data": { + "headers": { + "Authorization": "Bearer token123", + "content-type": "application/json", # Different case + "X-Custom": "custom-value", + }, + "query_params": {}, + "body": {}, + "files": {}, + } + }, + ) + + node = create_webhook_node(data, variable_pool) + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs["Authorization"] == "Bearer token123" + assert result.outputs["Content-Type"] == "application/json" # Case-insensitive match + assert "_webhook_raw" in result.outputs + + +def test_webhook_node_run_with_query_params(): + """Test webhook node execution with query parameter extraction.""" + data = WebhookData( + title="Test Webhook", + params=[ + WebhookParameter(name="page", required=True), + WebhookParameter(name="limit", required=False), + WebhookParameter(name="missing", required=False), + ], + ) + + variable_pool = VariablePool( + system_variables=SystemVariable.empty(), + user_inputs={ + "webhook_data": { + "headers": {}, + "query_params": { + "page": "1", + "limit": "10", + }, + "body": {}, + "files": {}, + } + }, + ) + + node = create_webhook_node(data, variable_pool) + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs["page"] == "1" + assert result.outputs["limit"] == "10" + assert result.outputs["missing"] is None # Missing parameter should be None + + +def test_webhook_node_run_with_body_params(): + """Test webhook node execution with body parameter extraction.""" + data = WebhookData( + title="Test Webhook", + body=[ + WebhookBodyParameter(name="message", type="string", required=True), + WebhookBodyParameter(name="count", type="number", required=False), + WebhookBodyParameter(name="active", type="boolean", required=False), + WebhookBodyParameter(name="metadata", type="object", required=False), + ], + ) + + variable_pool = VariablePool( + system_variables=SystemVariable.empty(), + user_inputs={ + "webhook_data": { + "headers": {}, + "query_params": {}, + "body": { + "message": "Hello World", + "count": 42, + "active": True, + "metadata": {"key": "value"}, + }, + "files": {}, + } + }, + ) + + node = create_webhook_node(data, variable_pool) + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs["message"] == "Hello World" + assert result.outputs["count"] == 42 + assert result.outputs["active"] is True + assert result.outputs["metadata"] == {"key": "value"} + + +def test_webhook_node_run_with_file_params(): + """Test webhook node execution with file parameter extraction.""" + # Create mock file objects + file1 = File( + tenant_id="1", + type=FileType.IMAGE, + transfer_method=FileTransferMethod.LOCAL_FILE, + related_id="file1", + filename="image.jpg", + mime_type="image/jpeg", + storage_key="", + ) + + file2 = File( + tenant_id="1", + type=FileType.DOCUMENT, + transfer_method=FileTransferMethod.LOCAL_FILE, + related_id="file2", + filename="document.pdf", + mime_type="application/pdf", + storage_key="", + ) + + data = WebhookData( + title="Test Webhook", + body=[ + WebhookBodyParameter(name="upload", type="file", required=True), + WebhookBodyParameter(name="document", type="file", required=False), + WebhookBodyParameter(name="missing_file", type="file", required=False), + ], + ) + + variable_pool = VariablePool( + system_variables=SystemVariable.empty(), + user_inputs={ + "webhook_data": { + "headers": {}, + "query_params": {}, + "body": {}, + "files": { + "upload": file1, + "document": file2, + }, + } + }, + ) + + node = create_webhook_node(data, variable_pool) + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs["upload"] == file1 + assert result.outputs["document"] == file2 + assert result.outputs["missing_file"] is None + + +def test_webhook_node_run_mixed_parameters(): + """Test webhook node execution with mixed parameter types.""" + file_obj = File( + tenant_id="1", + type=FileType.IMAGE, + transfer_method=FileTransferMethod.LOCAL_FILE, + related_id="file1", + filename="test.jpg", + mime_type="image/jpeg", + storage_key="", + ) + + data = WebhookData( + title="Test Webhook", + headers=[WebhookParameter(name="Authorization", required=True)], + params=[WebhookParameter(name="version", required=False)], + body=[ + WebhookBodyParameter(name="message", type="string", required=True), + WebhookBodyParameter(name="upload", type="file", required=False), + ], + ) + + variable_pool = VariablePool( + system_variables=SystemVariable.empty(), + user_inputs={ + "webhook_data": { + "headers": {"Authorization": "Bearer token"}, + "query_params": {"version": "v1"}, + "body": {"message": "Test message"}, + "files": {"upload": file_obj}, + } + }, + ) + + node = create_webhook_node(data, variable_pool) + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs["Authorization"] == "Bearer token" + assert result.outputs["version"] == "v1" + assert result.outputs["message"] == "Test message" + assert result.outputs["upload"] == file_obj + assert "_webhook_raw" in result.outputs + + +def test_webhook_node_run_empty_webhook_data(): + """Test webhook node execution with empty webhook data.""" + data = WebhookData( + title="Test Webhook", + headers=[WebhookParameter(name="Authorization", required=False)], + params=[WebhookParameter(name="page", required=False)], + body=[WebhookBodyParameter(name="message", type="string", required=False)], + ) + + variable_pool = VariablePool( + system_variables=SystemVariable.empty(), + user_inputs={}, # No webhook_data + ) + + node = create_webhook_node(data, variable_pool) + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs["Authorization"] is None + assert result.outputs["page"] is None + assert result.outputs["message"] is None + assert result.outputs["_webhook_raw"] == {} + + +def test_webhook_node_run_case_insensitive_headers(): + """Test webhook node header extraction is case-insensitive.""" + data = WebhookData( + title="Test Webhook", + headers=[ + WebhookParameter(name="Content-Type", required=True), + WebhookParameter(name="X-API-KEY", required=True), + WebhookParameter(name="authorization", required=True), + ], + ) + + variable_pool = VariablePool( + system_variables=SystemVariable.empty(), + user_inputs={ + "webhook_data": { + "headers": { + "content-type": "application/json", # lowercase + "x-api-key": "key123", # lowercase + "Authorization": "Bearer token", # different case + }, + "query_params": {}, + "body": {}, + "files": {}, + } + }, + ) + + node = create_webhook_node(data, variable_pool) + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs["Content-Type"] == "application/json" + assert result.outputs["X-API-KEY"] == "key123" + assert result.outputs["authorization"] == "Bearer token" + + +def test_webhook_node_variable_pool_user_inputs(): + """Test that webhook node uses user_inputs from variable pool correctly.""" + data = WebhookData(title="Test Webhook") + + # Add some additional variables to the pool + variable_pool = VariablePool( + system_variables=SystemVariable.empty(), + user_inputs={ + "webhook_data": {"headers": {}, "query_params": {}, "body": {}, "files": {}}, + "other_var": "should_be_included", + }, + ) + variable_pool.add(["node1", "extra"], StringVariable(name="extra", value="extra_value")) + + node = create_webhook_node(data, variable_pool) + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + # Check that all user_inputs are included in the inputs (they get converted to dict) + inputs_dict = dict(result.inputs) + assert "webhook_data" in inputs_dict + assert "other_var" in inputs_dict + assert inputs_dict["other_var"] == "should_be_included" + + +@pytest.mark.parametrize( + "method", + [Method.GET, Method.POST, Method.PUT, Method.DELETE, Method.PATCH, Method.HEAD], +) +def test_webhook_node_different_methods(method): + """Test webhook node with different HTTP methods.""" + data = WebhookData( + title="Test Webhook", + method=method, + ) + + variable_pool = VariablePool( + system_variables=SystemVariable.empty(), + user_inputs={ + "webhook_data": { + "headers": {}, + "query_params": {}, + "body": {}, + "files": {}, + } + }, + ) + + node = create_webhook_node(data, variable_pool) + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert node._node_data.method == method + + +def test_webhook_data_alias_content_type(): + """Test that content-type field alias works correctly.""" + # Test both ways of setting content_type + data1 = WebhookData(title="Test", **{"content-type": "application/json"}) + assert data1.content_type == ContentType.JSON + + data2 = WebhookData(title="Test", **{"content-type": ContentType.FORM_DATA}) + assert data2.content_type == ContentType.FORM_DATA + + +def test_webhook_parameter_models(): + """Test webhook parameter model validation.""" + # Test WebhookParameter + param = WebhookParameter(name="test_param", required=True) + assert param.name == "test_param" + assert param.required is True + + param_default = WebhookParameter(name="test_param") + assert param_default.required is False + + # Test WebhookBodyParameter + body_param = WebhookBodyParameter(name="test_body", type="string", required=True) + assert body_param.name == "test_body" + assert body_param.type == "string" + assert body_param.required is True + + body_param_default = WebhookBodyParameter(name="test_body") + assert body_param_default.type == "string" # Default type + assert body_param_default.required is False + + +def test_webhook_data_field_defaults(): + """Test webhook data model field defaults.""" + data = WebhookData(title="Minimal Webhook") + + assert data.method == Method.GET + assert data.content_type == ContentType.JSON + assert data.headers == [] + assert data.params == [] + assert data.body == [] + assert data.status_code == 200 + assert data.response_body == "" + assert data.webhook_id is None + assert data.timeout == 30 diff --git a/api/tests/unit_tests/services/test_webhook_service.py b/api/tests/unit_tests/services/test_webhook_service.py new file mode 100644 index 0000000000..8b58e5d76f --- /dev/null +++ b/api/tests/unit_tests/services/test_webhook_service.py @@ -0,0 +1,368 @@ +from io import BytesIO +from unittest.mock import MagicMock, patch + +from flask import Flask +from werkzeug.datastructures import FileStorage + +from services.webhook_service import WebhookService + + +class TestWebhookServiceUnit: + """Unit tests for WebhookService focusing on business logic without database dependencies.""" + + def test_extract_webhook_data_json(self): + """Test webhook data extraction from JSON request.""" + app = Flask(__name__) + + with app.test_request_context( + "/webhook", + method="POST", + headers={"Content-Type": "application/json", "Authorization": "Bearer token"}, + query_string="version=1&format=json", + json={"message": "hello", "count": 42}, + ): + webhook_trigger = MagicMock() + webhook_data = WebhookService.extract_webhook_data(webhook_trigger) + + assert webhook_data["method"] == "POST" + assert webhook_data["headers"]["Authorization"] == "Bearer token" + assert webhook_data["query_params"]["version"] == "1" + assert webhook_data["query_params"]["format"] == "json" + assert webhook_data["body"]["message"] == "hello" + assert webhook_data["body"]["count"] == 42 + assert webhook_data["files"] == {} + + def test_extract_webhook_data_form_urlencoded(self): + """Test webhook data extraction from form URL encoded request.""" + app = Flask(__name__) + + with app.test_request_context( + "/webhook", + method="POST", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data={"username": "test", "password": "secret"}, + ): + webhook_trigger = MagicMock() + webhook_data = WebhookService.extract_webhook_data(webhook_trigger) + + assert webhook_data["method"] == "POST" + assert webhook_data["body"]["username"] == "test" + assert webhook_data["body"]["password"] == "secret" + + def test_extract_webhook_data_multipart_with_files(self): + """Test webhook data extraction from multipart form with files.""" + app = Flask(__name__) + + # Create a mock file + file_content = b"test file content" + file_storage = FileStorage(stream=BytesIO(file_content), filename="test.txt", content_type="text/plain") + + with app.test_request_context( + "/webhook", + method="POST", + headers={"Content-Type": "multipart/form-data"}, + data={"message": "test", "upload": file_storage}, + ): + webhook_trigger = MagicMock() + webhook_trigger.tenant_id = "test_tenant" + + with patch.object(WebhookService, "_process_file_uploads") as mock_process_files: + mock_process_files.return_value = {"upload": "mocked_file_obj"} + + webhook_data = WebhookService.extract_webhook_data(webhook_trigger) + + assert webhook_data["method"] == "POST" + assert webhook_data["body"]["message"] == "test" + assert webhook_data["files"]["upload"] == "mocked_file_obj" + mock_process_files.assert_called_once() + + def test_extract_webhook_data_raw_text(self): + """Test webhook data extraction from raw text request.""" + app = Flask(__name__) + + with app.test_request_context( + "/webhook", method="POST", headers={"Content-Type": "text/plain"}, data="raw text content" + ): + webhook_trigger = MagicMock() + webhook_data = WebhookService.extract_webhook_data(webhook_trigger) + + assert webhook_data["method"] == "POST" + assert webhook_data["body"]["raw"] == "raw text content" + + def test_extract_webhook_data_invalid_json(self): + """Test webhook data extraction with invalid JSON.""" + app = Flask(__name__) + + with app.test_request_context( + "/webhook", method="POST", headers={"Content-Type": "application/json"}, data="invalid json" + ): + webhook_trigger = MagicMock() + webhook_data = WebhookService.extract_webhook_data(webhook_trigger) + + assert webhook_data["method"] == "POST" + assert webhook_data["body"] == {} # Should default to empty dict + + def test_validate_webhook_request_success(self): + """Test successful webhook request validation.""" + webhook_data = { + "method": "POST", + "headers": {"Authorization": "Bearer token", "Content-Type": "application/json"}, + "query_params": {"version": "1"}, + "body": {"message": "hello"}, + "files": {}, + } + + node_config = { + "data": { + "method": "post", + "headers": [{"name": "Authorization", "required": True}, {"name": "Content-Type", "required": False}], + "params": [{"name": "version", "required": True}], + "body": [{"name": "message", "type": "string", "required": True}], + } + } + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is True + + def test_validate_webhook_request_method_mismatch(self): + """Test webhook validation with HTTP method mismatch.""" + webhook_data = {"method": "GET", "headers": {}, "query_params": {}, "body": {}, "files": {}} + + node_config = {"data": {"method": "post"}} + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is False + assert "HTTP method mismatch" in result["error"] + assert "Expected POST, got GET" in result["error"] + + def test_validate_webhook_request_missing_required_header(self): + """Test webhook validation with missing required header.""" + webhook_data = {"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}} + + node_config = {"data": {"method": "post", "headers": [{"name": "Authorization", "required": True}]}} + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is False + assert "Required header missing: Authorization" in result["error"] + + def test_validate_webhook_request_case_insensitive_headers(self): + """Test webhook validation with case-insensitive header matching.""" + webhook_data = { + "method": "POST", + "headers": {"authorization": "Bearer token"}, # lowercase + "query_params": {}, + "body": {}, + "files": {}, + } + + node_config = { + "data": { + "method": "post", + "headers": [ + {"name": "Authorization", "required": True} # Pascal case + ], + } + } + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is True + + def test_validate_webhook_request_missing_required_param(self): + """Test webhook validation with missing required query parameter.""" + webhook_data = {"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}} + + node_config = {"data": {"method": "post", "params": [{"name": "version", "required": True}]}} + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is False + assert "Required query parameter missing: version" in result["error"] + + def test_validate_webhook_request_missing_required_body_param(self): + """Test webhook validation with missing required body parameter.""" + webhook_data = {"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}} + + node_config = {"data": {"method": "post", "body": [{"name": "message", "type": "string", "required": True}]}} + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is False + assert "Required body parameter missing: message" in result["error"] + + def test_validate_webhook_request_missing_required_file(self): + """Test webhook validation with missing required file parameter.""" + webhook_data = {"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}} + + node_config = {"data": {"method": "post", "body": [{"name": "upload", "type": "file", "required": True}]}} + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is False + assert "Required file parameter missing: upload" in result["error"] + + def test_validate_webhook_request_validation_exception(self): + """Test webhook validation with exception handling.""" + webhook_data = {"method": "POST", "headers": {}, "query_params": {}, "body": {}, "files": {}} + + # Invalid node config that will cause an exception + node_config = None + + result = WebhookService.validate_webhook_request(webhook_data, node_config) + + assert result["valid"] is False + assert "Validation failed:" in result["error"] + + def test_generate_webhook_response_default(self): + """Test webhook response generation with default values.""" + node_config = {"data": {}} + + response_data, status_code = WebhookService.generate_webhook_response(node_config) + + assert status_code == 200 + assert response_data["status"] == "success" + assert "Webhook processed successfully" in response_data["message"] + + def test_generate_webhook_response_custom_json(self): + """Test webhook response generation with custom JSON response.""" + node_config = {"data": {"status_code": 201, "response_body": '{"result": "created", "id": 123}'}} + + response_data, status_code = WebhookService.generate_webhook_response(node_config) + + assert status_code == 201 + assert response_data["result"] == "created" + assert response_data["id"] == 123 + + def test_generate_webhook_response_custom_text(self): + """Test webhook response generation with custom text response.""" + node_config = {"data": {"status_code": 202, "response_body": "Request accepted for processing"}} + + response_data, status_code = WebhookService.generate_webhook_response(node_config) + + assert status_code == 202 + assert response_data["message"] == "Request accepted for processing" + + def test_generate_webhook_response_invalid_json(self): + """Test webhook response generation with invalid JSON response.""" + node_config = {"data": {"status_code": 400, "response_body": '{"invalid": json}'}} + + response_data, status_code = WebhookService.generate_webhook_response(node_config) + + assert status_code == 400 + assert response_data["message"] == '{"invalid": json}' + + def test_generate_webhook_response_empty_response_body(self): + """Test webhook response generation with empty response body.""" + node_config = {"data": {"status_code": 204, "response_body": ""}} + + response_data, status_code = WebhookService.generate_webhook_response(node_config) + + assert status_code == 204 + assert response_data["status"] == "success" + assert "Webhook processed successfully" in response_data["message"] + + def test_generate_webhook_response_array_json(self): + """Test webhook response generation with JSON array response.""" + node_config = {"data": {"status_code": 200, "response_body": '[{"id": 1}, {"id": 2}]'}} + + response_data, status_code = WebhookService.generate_webhook_response(node_config) + + assert status_code == 200 + assert isinstance(response_data, list) + assert len(response_data) == 2 + assert response_data[0]["id"] == 1 + assert response_data[1]["id"] == 2 + + @patch("services.webhook_service.ToolFileManager") + @patch("services.webhook_service.file_factory") + def test_process_file_uploads_success(self, mock_file_factory, mock_tool_file_manager): + """Test successful file upload processing.""" + # Mock ToolFileManager + mock_tool_file_instance = MagicMock() + mock_tool_file_manager.return_value = mock_tool_file_instance + + # Mock file creation + mock_tool_file = MagicMock() + mock_tool_file.id = "test_file_id" + mock_tool_file_instance.create_file_by_raw.return_value = mock_tool_file + + # Mock file factory + mock_file_obj = MagicMock() + mock_file_factory.build_from_mapping.return_value = mock_file_obj + + # Create mock files + files = { + "file1": MagicMock(filename="test1.txt", content_type="text/plain"), + "file2": MagicMock(filename="test2.jpg", content_type="image/jpeg"), + } + + # Mock file reads + files["file1"].read.return_value = b"content1" + files["file2"].read.return_value = b"content2" + + webhook_trigger = MagicMock() + webhook_trigger.tenant_id = "test_tenant" + + result = WebhookService._process_file_uploads(files, webhook_trigger) + + assert len(result) == 2 + assert "file1" in result + assert "file2" in result + + # Verify file processing was called for each file + assert mock_tool_file_manager.call_count == 2 + assert mock_file_factory.build_from_mapping.call_count == 2 + + @patch("services.webhook_service.ToolFileManager") + @patch("services.webhook_service.file_factory") + def test_process_file_uploads_with_errors(self, mock_file_factory, mock_tool_file_manager): + """Test file upload processing with errors.""" + # Mock ToolFileManager + mock_tool_file_instance = MagicMock() + mock_tool_file_manager.return_value = mock_tool_file_instance + + # Mock file creation + mock_tool_file = MagicMock() + mock_tool_file.id = "test_file_id" + mock_tool_file_instance.create_file_by_raw.return_value = mock_tool_file + + # Mock file factory + mock_file_obj = MagicMock() + mock_file_factory.build_from_mapping.return_value = mock_file_obj + + # Create mock files, one will fail + files = { + "good_file": MagicMock(filename="test.txt", content_type="text/plain"), + "bad_file": MagicMock(filename="test.bad", content_type="text/plain"), + } + + files["good_file"].read.return_value = b"content" + files["bad_file"].read.side_effect = Exception("Read error") + + webhook_trigger = MagicMock() + webhook_trigger.tenant_id = "test_tenant" + + result = WebhookService._process_file_uploads(files, webhook_trigger) + + # Should process the good file and skip the bad one + assert len(result) == 1 + assert "good_file" in result + assert "bad_file" not in result + + def test_process_file_uploads_empty_filename(self): + """Test file upload processing with empty filename.""" + files = { + "no_filename": MagicMock(filename="", content_type="text/plain"), + "none_filename": MagicMock(filename=None, content_type="text/plain"), + } + + webhook_trigger = MagicMock() + webhook_trigger.tenant_id = "test_tenant" + + result = WebhookService._process_file_uploads(files, webhook_trigger) + + # Should skip files without filenames + assert len(result) == 0