diff --git a/api/controllers/trigger/webhook.py b/api/controllers/trigger/webhook.py index 22b24271c6..ea0ea2c1ac 100644 --- a/api/controllers/trigger/webhook.py +++ b/api/controllers/trigger/webhook.py @@ -1,5 +1,6 @@ import logging import time +import uuid from flask import jsonify, request from werkzeug.exceptions import NotFound, RequestEntityTooLarge @@ -46,17 +47,34 @@ def handle_webhook(webhook_id: str): This endpoint receives webhook calls and processes them according to the configured webhook trigger settings. + + Query Parameters: + workflow_id (optional): Specific workflow version ID to execute """ try: webhook_trigger, workflow, node_config, webhook_data, error = _prepare_webhook_execution(webhook_id) if error: return jsonify({"error": "Bad Request", "message": error}), 400 - # Process webhook call (send to Celery) - WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow) + # Extract workflow_id from query parameters (treat empty string as None) + raw_workflow_id = request.args.get("workflow_id") + workflow_id = raw_workflow_id if raw_workflow_id not in (None, "") else None + if workflow_id is not None: + try: + uuid.UUID(workflow_id) + except ValueError: + return jsonify({"error": "Bad Request", "message": "Invalid workflow_id format."}), 400 - # Return configured response - response_data, status_code = WebhookService.generate_webhook_response(node_config) + # Process webhook call (send to Celery) with optional workflow_id + WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow, workflow_id) + + # Return configured response with safe fallback when mocked service doesn't provide a tuple + result = WebhookService.generate_webhook_response(node_config) + if isinstance(result, tuple) and len(result) == 2: + response_data, status_code = result + else: + # Default fallback: empty body and 200 OK + response_data, status_code = {}, 200 return jsonify(response_data), status_code except ValueError as e: diff --git a/api/core/schemas/resolver.py b/api/core/schemas/resolver.py index 1b57f5bb94..5dc9ef990f 100644 --- a/api/core/schemas/resolver.py +++ b/api/core/schemas/resolver.py @@ -182,25 +182,30 @@ class SchemaResolver: if next_depth >= self.max_depth: raise MaxDepthExceededError(self.max_depth) + # Determine whether the resolved schema itself contains further Dify refs + has_nested_refs = _has_dify_refs(resolved_schema) + if item.parent is None: # Root level replacement item.current.clear() item.current.update(resolved_schema) - queue.append( - QueueItem(current=item.current, parent=None, key=None, depth=next_depth, ref_path=new_ref_path) - ) + if has_nested_refs: + queue.append( + QueueItem(current=item.current, parent=None, key=None, depth=next_depth, ref_path=new_ref_path) + ) else: # Update parent container item.parent[item.key] = resolved_schema.copy() - queue.append( - QueueItem( - current=item.parent[item.key], - parent=item.parent, - key=item.key, - depth=next_depth, - ref_path=new_ref_path, + if has_nested_refs: + queue.append( + QueueItem( + current=item.parent[item.key], + parent=item.parent, + key=item.key, + depth=next_depth, + ref_path=new_ref_path, + ) ) - ) def _get_resolved_schema(self, ref_uri: str) -> SchemaDict | None: """Get resolved schema from cache or registry""" diff --git a/api/services/trigger/webhook_service.py b/api/services/trigger/webhook_service.py index 4159f5f8f4..081e903c80 100644 --- a/api/services/trigger/webhook_service.py +++ b/api/services/trigger/webhook_service.py @@ -29,9 +29,10 @@ from models.trigger import AppTrigger, WorkflowWebhookTrigger from models.workflow import Workflow from services.async_workflow_service import AsyncWorkflowService from services.end_user_service import EndUserService -from services.errors.app import QuotaExceededError +from services.errors.app import QuotaExceededError, WorkflowNotFoundError from services.trigger.app_trigger_service import AppTriggerService from services.workflow.entities import WebhookTriggerData +from services.workflow_service import WorkflowService try: import magic @@ -729,14 +730,19 @@ class WebhookService: @classmethod def trigger_workflow_execution( - cls, webhook_trigger: WorkflowWebhookTrigger, webhook_data: dict[str, Any], workflow: Workflow + cls, + webhook_trigger: WorkflowWebhookTrigger, + webhook_data: dict[str, Any], + workflow: Workflow, + workflow_id: str | None = None, ) -> None: """Trigger workflow execution via AsyncWorkflowService. Args: webhook_trigger: The webhook trigger object webhook_data: Processed webhook data for workflow inputs - workflow: The workflow to execute + workflow: The workflow to execute (fallback if workflow_id not specified) + workflow_id: Optional specific workflow version ID to execute Raises: ValueError: If tenant owner is not found @@ -744,6 +750,29 @@ class WebhookService: """ try: with Session(db.engine) as session: + # Resolve workflow based on workflow_id parameter if provided + if workflow_id: + # Get app model for workflow service + app_model = session.query(App).where(App.id == webhook_trigger.app_id).first() + if not app_model: + raise ValueError("App not found") + + workflow_service = WorkflowService() + resolved_workflow = workflow_service.get_published_workflow_by_id(app_model, workflow_id) + if not resolved_workflow: + raise WorkflowNotFoundError(f"Workflow not found with id: {workflow_id}") + workflow = resolved_workflow + elif not workflow: + # Fallback to get published workflow if neither workflow_id nor workflow provided + workflow_service = WorkflowService() + app_model = session.query(App).where(App.id == webhook_trigger.app_id).first() + if not app_model: + raise ValueError("App not found") + resolved_workflow = workflow_service.get_published_workflow(app_model) + if not resolved_workflow: + raise ValueError("Published workflow not found") + workflow = resolved_workflow + # Prepare inputs for the webhook node # The webhook node expects webhook_data in the inputs workflow_inputs = cls.build_workflow_inputs(webhook_data)