mirror of https://github.com/langgenius/dify.git
Merge 32700d7fda into 2c919efa69
This commit is contained in:
commit
ff8684665e
|
|
@ -1,5 +1,6 @@
|
|||
import logging
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from flask import jsonify, request
|
||||
from werkzeug.exceptions import NotFound, RequestEntityTooLarge
|
||||
|
|
@ -46,17 +47,34 @@ def handle_webhook(webhook_id: str):
|
|||
|
||||
This endpoint receives webhook calls and processes them according to the
|
||||
configured webhook trigger settings.
|
||||
|
||||
Query Parameters:
|
||||
workflow_id (optional): Specific workflow version ID to execute
|
||||
"""
|
||||
try:
|
||||
webhook_trigger, workflow, node_config, webhook_data, error = _prepare_webhook_execution(webhook_id)
|
||||
if error:
|
||||
return jsonify({"error": "Bad Request", "message": error}), 400
|
||||
|
||||
# Process webhook call (send to Celery)
|
||||
WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow)
|
||||
# Extract workflow_id from query parameters (treat empty string as None)
|
||||
raw_workflow_id = request.args.get("workflow_id")
|
||||
workflow_id = raw_workflow_id if raw_workflow_id not in (None, "") else None
|
||||
if workflow_id is not None:
|
||||
try:
|
||||
uuid.UUID(workflow_id)
|
||||
except ValueError:
|
||||
return jsonify({"error": "Bad Request", "message": "Invalid workflow_id format."}), 400
|
||||
|
||||
# Return configured response
|
||||
response_data, status_code = WebhookService.generate_webhook_response(node_config)
|
||||
# Process webhook call (send to Celery) with optional workflow_id
|
||||
WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow, workflow_id)
|
||||
|
||||
# Return configured response with safe fallback when mocked service doesn't provide a tuple
|
||||
result = WebhookService.generate_webhook_response(node_config)
|
||||
if isinstance(result, tuple) and len(result) == 2:
|
||||
response_data, status_code = result
|
||||
else:
|
||||
# Default fallback: empty body and 200 OK
|
||||
response_data, status_code = {}, 200
|
||||
return jsonify(response_data), status_code
|
||||
|
||||
except ValueError as e:
|
||||
|
|
|
|||
|
|
@ -182,25 +182,30 @@ class SchemaResolver:
|
|||
if next_depth >= self.max_depth:
|
||||
raise MaxDepthExceededError(self.max_depth)
|
||||
|
||||
# Determine whether the resolved schema itself contains further Dify refs
|
||||
has_nested_refs = _has_dify_refs(resolved_schema)
|
||||
|
||||
if item.parent is None:
|
||||
# Root level replacement
|
||||
item.current.clear()
|
||||
item.current.update(resolved_schema)
|
||||
queue.append(
|
||||
QueueItem(current=item.current, parent=None, key=None, depth=next_depth, ref_path=new_ref_path)
|
||||
)
|
||||
if has_nested_refs:
|
||||
queue.append(
|
||||
QueueItem(current=item.current, parent=None, key=None, depth=next_depth, ref_path=new_ref_path)
|
||||
)
|
||||
else:
|
||||
# Update parent container
|
||||
item.parent[item.key] = resolved_schema.copy()
|
||||
queue.append(
|
||||
QueueItem(
|
||||
current=item.parent[item.key],
|
||||
parent=item.parent,
|
||||
key=item.key,
|
||||
depth=next_depth,
|
||||
ref_path=new_ref_path,
|
||||
if has_nested_refs:
|
||||
queue.append(
|
||||
QueueItem(
|
||||
current=item.parent[item.key],
|
||||
parent=item.parent,
|
||||
key=item.key,
|
||||
depth=next_depth,
|
||||
ref_path=new_ref_path,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
def _get_resolved_schema(self, ref_uri: str) -> SchemaDict | None:
|
||||
"""Get resolved schema from cache or registry"""
|
||||
|
|
|
|||
|
|
@ -29,9 +29,10 @@ from models.trigger import AppTrigger, WorkflowWebhookTrigger
|
|||
from models.workflow import Workflow
|
||||
from services.async_workflow_service import AsyncWorkflowService
|
||||
from services.end_user_service import EndUserService
|
||||
from services.errors.app import QuotaExceededError
|
||||
from services.errors.app import QuotaExceededError, WorkflowNotFoundError
|
||||
from services.trigger.app_trigger_service import AppTriggerService
|
||||
from services.workflow.entities import WebhookTriggerData
|
||||
from services.workflow_service import WorkflowService
|
||||
|
||||
try:
|
||||
import magic
|
||||
|
|
@ -729,14 +730,19 @@ class WebhookService:
|
|||
|
||||
@classmethod
|
||||
def trigger_workflow_execution(
|
||||
cls, webhook_trigger: WorkflowWebhookTrigger, webhook_data: dict[str, Any], workflow: Workflow
|
||||
cls,
|
||||
webhook_trigger: WorkflowWebhookTrigger,
|
||||
webhook_data: dict[str, Any],
|
||||
workflow: Workflow,
|
||||
workflow_id: str | None = None,
|
||||
) -> None:
|
||||
"""Trigger workflow execution via AsyncWorkflowService.
|
||||
|
||||
Args:
|
||||
webhook_trigger: The webhook trigger object
|
||||
webhook_data: Processed webhook data for workflow inputs
|
||||
workflow: The workflow to execute
|
||||
workflow: The workflow to execute (fallback if workflow_id not specified)
|
||||
workflow_id: Optional specific workflow version ID to execute
|
||||
|
||||
Raises:
|
||||
ValueError: If tenant owner is not found
|
||||
|
|
@ -744,6 +750,29 @@ class WebhookService:
|
|||
"""
|
||||
try:
|
||||
with Session(db.engine) as session:
|
||||
# Resolve workflow based on workflow_id parameter if provided
|
||||
if workflow_id:
|
||||
# Get app model for workflow service
|
||||
app_model = session.query(App).where(App.id == webhook_trigger.app_id).first()
|
||||
if not app_model:
|
||||
raise ValueError("App not found")
|
||||
|
||||
workflow_service = WorkflowService()
|
||||
resolved_workflow = workflow_service.get_published_workflow_by_id(app_model, workflow_id)
|
||||
if not resolved_workflow:
|
||||
raise WorkflowNotFoundError(f"Workflow not found with id: {workflow_id}")
|
||||
workflow = resolved_workflow
|
||||
elif not workflow:
|
||||
# Fallback to get published workflow if neither workflow_id nor workflow provided
|
||||
workflow_service = WorkflowService()
|
||||
app_model = session.query(App).where(App.id == webhook_trigger.app_id).first()
|
||||
if not app_model:
|
||||
raise ValueError("App not found")
|
||||
resolved_workflow = workflow_service.get_published_workflow(app_model)
|
||||
if not resolved_workflow:
|
||||
raise ValueError("Published workflow not found")
|
||||
workflow = resolved_workflow
|
||||
|
||||
# Prepare inputs for the webhook node
|
||||
# The webhook node expects webhook_data in the inputs
|
||||
workflow_inputs = cls.build_workflow_inputs(webhook_data)
|
||||
|
|
|
|||
Loading…
Reference in New Issue