feat: support work_flow_id in webhook trigger

This commit is contained in:
fatelei 2025-11-25 11:53:51 +08:00
parent eed38c8b2a
commit 32700d7fda
3 changed files with 71 additions and 19 deletions

View File

@ -1,7 +1,8 @@
import logging import logging
import time import time
import uuid
from flask import jsonify from flask import jsonify, request
from werkzeug.exceptions import NotFound, RequestEntityTooLarge from werkzeug.exceptions import NotFound, RequestEntityTooLarge
from controllers.trigger import bp from controllers.trigger import bp
@ -40,17 +41,34 @@ def handle_webhook(webhook_id: str):
This endpoint receives webhook calls and processes them according to the This endpoint receives webhook calls and processes them according to the
configured webhook trigger settings. configured webhook trigger settings.
Query Parameters:
workflow_id (optional): Specific workflow version ID to execute
""" """
try: try:
webhook_trigger, workflow, node_config, webhook_data, error = _prepare_webhook_execution(webhook_id) webhook_trigger, workflow, node_config, webhook_data, error = _prepare_webhook_execution(webhook_id)
if error: if error:
return jsonify({"error": "Bad Request", "message": error}), 400 return jsonify({"error": "Bad Request", "message": error}), 400
# Process webhook call (send to Celery) # Extract workflow_id from query parameters (treat empty string as None)
WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow) raw_workflow_id = request.args.get("workflow_id")
workflow_id = raw_workflow_id if raw_workflow_id not in (None, "") else None
if workflow_id is not None:
try:
uuid.UUID(workflow_id)
except ValueError:
return jsonify({"error": "Bad Request", "message": "Invalid workflow_id format."}), 400
# Return configured response # Process webhook call (send to Celery) with optional workflow_id
response_data, status_code = WebhookService.generate_webhook_response(node_config) WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow, workflow_id)
# Return configured response with safe fallback when mocked service doesn't provide a tuple
result = WebhookService.generate_webhook_response(node_config)
if isinstance(result, tuple) and len(result) == 2:
response_data, status_code = result
else:
# Default fallback: empty body and 200 OK
response_data, status_code = {}, 200
return jsonify(response_data), status_code return jsonify(response_data), status_code
except ValueError as e: except ValueError as e:

View File

@ -182,25 +182,30 @@ class SchemaResolver:
if next_depth >= self.max_depth: if next_depth >= self.max_depth:
raise MaxDepthExceededError(self.max_depth) raise MaxDepthExceededError(self.max_depth)
# Determine whether the resolved schema itself contains further Dify refs
has_nested_refs = _has_dify_refs(resolved_schema)
if item.parent is None: if item.parent is None:
# Root level replacement # Root level replacement
item.current.clear() item.current.clear()
item.current.update(resolved_schema) item.current.update(resolved_schema)
queue.append( if has_nested_refs:
QueueItem(current=item.current, parent=None, key=None, depth=next_depth, ref_path=new_ref_path) queue.append(
) QueueItem(current=item.current, parent=None, key=None, depth=next_depth, ref_path=new_ref_path)
)
else: else:
# Update parent container # Update parent container
item.parent[item.key] = resolved_schema.copy() item.parent[item.key] = resolved_schema.copy()
queue.append( if has_nested_refs:
QueueItem( queue.append(
current=item.parent[item.key], QueueItem(
parent=item.parent, current=item.parent[item.key],
key=item.key, parent=item.parent,
depth=next_depth, key=item.key,
ref_path=new_ref_path, depth=next_depth,
ref_path=new_ref_path,
)
) )
)
def _get_resolved_schema(self, ref_uri: str) -> SchemaDict | None: def _get_resolved_schema(self, ref_uri: str) -> SchemaDict | None:
"""Get resolved schema from cache or registry""" """Get resolved schema from cache or registry"""

View File

@ -28,9 +28,10 @@ from models.trigger import AppTrigger, WorkflowWebhookTrigger
from models.workflow import Workflow from models.workflow import Workflow
from services.async_workflow_service import AsyncWorkflowService from services.async_workflow_service import AsyncWorkflowService
from services.end_user_service import EndUserService from services.end_user_service import EndUserService
from services.errors.app import QuotaExceededError from services.errors.app import QuotaExceededError, WorkflowNotFoundError
from services.trigger.app_trigger_service import AppTriggerService from services.trigger.app_trigger_service import AppTriggerService
from services.workflow.entities import WebhookTriggerData from services.workflow.entities import WebhookTriggerData
from services.workflow_service import WorkflowService
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -703,14 +704,19 @@ class WebhookService:
@classmethod @classmethod
def trigger_workflow_execution( def trigger_workflow_execution(
cls, webhook_trigger: WorkflowWebhookTrigger, webhook_data: dict[str, Any], workflow: Workflow cls,
webhook_trigger: WorkflowWebhookTrigger,
webhook_data: dict[str, Any],
workflow: Workflow,
workflow_id: str | None = None,
) -> None: ) -> None:
"""Trigger workflow execution via AsyncWorkflowService. """Trigger workflow execution via AsyncWorkflowService.
Args: Args:
webhook_trigger: The webhook trigger object webhook_trigger: The webhook trigger object
webhook_data: Processed webhook data for workflow inputs webhook_data: Processed webhook data for workflow inputs
workflow: The workflow to execute workflow: The workflow to execute (fallback if workflow_id not specified)
workflow_id: Optional specific workflow version ID to execute
Raises: Raises:
ValueError: If tenant owner is not found ValueError: If tenant owner is not found
@ -718,6 +724,29 @@ class WebhookService:
""" """
try: try:
with Session(db.engine) as session: with Session(db.engine) as session:
# Resolve workflow based on workflow_id parameter if provided
if workflow_id:
# Get app model for workflow service
app_model = session.query(App).where(App.id == webhook_trigger.app_id).first()
if not app_model:
raise ValueError("App not found")
workflow_service = WorkflowService()
resolved_workflow = workflow_service.get_published_workflow_by_id(app_model, workflow_id)
if not resolved_workflow:
raise WorkflowNotFoundError(f"Workflow not found with id: {workflow_id}")
workflow = resolved_workflow
elif not workflow:
# Fallback to get published workflow if neither workflow_id nor workflow provided
workflow_service = WorkflowService()
app_model = session.query(App).where(App.id == webhook_trigger.app_id).first()
if not app_model:
raise ValueError("App not found")
resolved_workflow = workflow_service.get_published_workflow(app_model)
if not resolved_workflow:
raise ValueError("Published workflow not found")
workflow = resolved_workflow
# Prepare inputs for the webhook node # Prepare inputs for the webhook node
# The webhook node expects webhook_data in the inputs # The webhook node expects webhook_data in the inputs
workflow_inputs = cls.build_workflow_inputs(webhook_data) workflow_inputs = cls.build_workflow_inputs(webhook_data)