def get(self, app_model):
    """
    Look up the webhook trigger bound to one node of the given workflow app.

    The ``node_id`` request argument is mandatory. The matching
    ``WorkflowWebhookTrigger`` row (matched on app ID and node ID) is returned
    with two extra attributes, ``webhook_url`` and ``webhook_debug_url``,
    built from ``dify_config.SERVICE_API_URL`` so the response marshaller can
    serialize them.

    Raises:
        NotFound: no webhook trigger exists for this app/node pair.
    """
    arg_parser = reqparse.RequestParser()
    arg_parser.add_argument("node_id", type=str, required=True, help="Node ID is required")
    node_id = arg_parser.parse_args()["node_id"]

    with Session(db.engine) as session:
        # One row at most is expected per (app, node) pair
        lookup = session.query(WorkflowWebhookTrigger).filter(
            WorkflowWebhookTrigger.app_id == app_model.id,
            WorkflowWebhookTrigger.node_id == node_id,
        )
        webhook_trigger = lookup.first()

        if not webhook_trigger:
            raise NotFound("Webhook trigger not found for this node")

        # These are not mapped columns; they exist only for the marshaller
        base_url = dify_config.SERVICE_API_URL
        webhook_trigger.webhook_url = f"{base_url}/triggers/webhook/{webhook_trigger.webhook_id}"  # type: ignore
        webhook_trigger.webhook_debug_url = f"{base_url}/triggers/webhook-debug/{webhook_trigger.webhook_id}"  # type: ignore

        return webhook_trigger
@app_draft_workflow_was_synced.connect
def handle(sender, synced_draft_workflow: Workflow, **kwargs):
    """
    React to a draft workflow being synced by reconciling its webhook records.

    The signal sender is the app that owns the workflow. Only apps in
    workflow mode are processed; any other mode (chatflow is not supported
    yet) is ignored.
    """
    synced_app: App = sender
    is_workflow_app = synced_app.mode == AppMode.WORKFLOW.value
    if is_workflow_app:
        # Bring the stored webhook triggers in line with the synced graph
        WebhookService.sync_webhook_relationships(synced_app, synced_draft_workflow)
def walk_nodes(
    self, specific_node_type: NodeType | None = None
) -> Generator[tuple[str, Mapping[str, Any]], None, None]:
    """
    Iterate over the nodes of the workflow graph.

    Every item produced is a ``(node_id, node_data)`` pair, where ``node_id``
    is the node's ``"id"`` entry and ``node_data`` is its ``"data"`` mapping
    (for example ``{"type": "llm", "title": "LLM", ...}``).

    When ``specific_node_type`` is given, only nodes whose ``data["type"]``
    equals ``specific_node_type.value`` are produced; otherwise every node is.

    This is a generator, so nothing (including the error below) happens until
    it is iterated.

    Raises:
        WorkflowDataError: the graph has no ``"nodes"`` entry.
    """
    graph = self.graph_dict
    if "nodes" not in graph:
        raise WorkflowDataError("nodes not found in workflow graph")

    if not specific_node_type:
        # No filter requested: hand back every node as-is
        for node in graph["nodes"]:
            yield node["id"], node["data"]
        return

    for node in graph["nodes"]:
        if node["data"]["type"] == specific_node_type.value:
            yield node["id"], node["data"]
self.graph: @@ -1396,7 +1444,6 @@ class WorkflowWebhookTrigger(Base): - node_id (varchar) Node ID which node in the workflow - tenant_id (uuid) Workspace ID - webhook_id (varchar) Webhook ID for URL: https://api.dify.ai/triggers/webhook/:webhook_id - - triggered_by (varchar) Environment: debugger or production - created_by (varchar) User ID of the creator - created_at (timestamp) Creation time - updated_at (timestamp) Last update time @@ -1406,7 +1453,7 @@ class WorkflowWebhookTrigger(Base): __table_args__ = ( sa.PrimaryKeyConstraint("id", name="workflow_webhook_trigger_pkey"), sa.Index("workflow_webhook_trigger_tenant_idx", "tenant_id"), - sa.UniqueConstraint("app_id", "node_id", "triggered_by", name="uniq_node"), + sa.UniqueConstraint("app_id", "node_id", name="uniq_node"), sa.UniqueConstraint("webhook_id", name="uniq_webhook_id"), ) @@ -1415,7 +1462,6 @@ class WorkflowWebhookTrigger(Base): node_id: Mapped[str] = mapped_column(String(64), nullable=False) tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) webhook_id: Mapped[str] = mapped_column(String(24), nullable=False) - triggered_by: Mapped[str] = mapped_column(String(16), nullable=False) created_by: Mapped[str] = mapped_column(StringUUID, nullable=False) created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) updated_at: Mapped[datetime] = mapped_column( diff --git a/api/services/trigger_debug_service.py b/api/services/trigger_debug_service.py index 6dfb95d488..35993ebf68 100644 --- a/api/services/trigger_debug_service.py +++ b/api/services/trigger_debug_service.py @@ -197,4 +197,4 @@ class TriggerDebugService: return active_sessions except Exception as e: logger.exception("Failed to dispatch to debug sessions", exc_info=e) - return 0 \ No newline at end of file + return 0 diff --git a/api/services/trigger_service.py b/api/services/trigger_service.py index 3e6e6b9248..13fc7340f0 100644 --- a/api/services/trigger_service.py +++ 
b/api/services/trigger_service.py @@ -29,6 +29,8 @@ class TriggerService: __ENDPOINT_REQUEST_CACHE_COUNT__ = 10 __ENDPOINT_REQUEST_CACHE_EXPIRE_MS__ = 5 * 60 * 1000 + __WEBHOOK_NODE_CACHE_KEY__ = "webhook_nodes" + @classmethod def dispatch_triggered_workflows( cls, subscription: TriggerSubscription, trigger: TriggerEntity, request_id: str @@ -150,7 +152,13 @@ class TriggerService: @classmethod def process_endpoint(cls, endpoint_id: str, request: Request) -> Response | None: - """Extract and process data from incoming endpoint request.""" + """ + Extract and process data from incoming endpoint request. + + Args: + endpoint_id: Endpoint ID + request: Request + """ subscription = TriggerProviderService.get_subscription_by_endpoint(endpoint_id) if not subscription: return None @@ -192,7 +200,14 @@ class TriggerService: def get_subscriber_triggers( cls, tenant_id: str, subscription_id: str, trigger_name: str ) -> list[WorkflowPluginTrigger]: - """Get WorkflowPluginTriggers for a subscription and trigger.""" + """ + Get WorkflowPluginTriggers for a subscription and trigger. 
+ + Args: + tenant_id: Tenant ID + subscription_id: Subscription ID + trigger_name: Trigger name + """ with Session(db.engine, expire_on_commit=False) as session: subscribers = session.scalars( select(WorkflowPluginTrigger).where( diff --git a/api/services/webhook_service.py b/api/services/webhook_service.py index 38d47be742..5b35b36ad1 100644 --- a/api/services/webhook_service.py +++ b/api/services/webhook_service.py @@ -1,10 +1,12 @@ import json import logging import mimetypes +import secrets from collections.abc import Mapping from typing import Any from flask import request +from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.orm import Session from werkzeug.exceptions import RequestEntityTooLarge @@ -13,10 +15,13 @@ from configs import dify_config from core.file.models import FileTransferMethod from core.tools.tool_file_manager import ToolFileManager from core.variables.types import SegmentType +from core.workflow.nodes.enums import NodeType from extensions.ext_database import db +from extensions.ext_redis import redis_client from factories import file_factory from models.account import Account, TenantAccountJoin, TenantAccountRole from models.enums import WorkflowRunTriggeredFrom +from models.model import App from models.workflow import AppTrigger, AppTriggerStatus, AppTriggerType, Workflow, WorkflowWebhookTrigger from services.async_workflow_service import AsyncWorkflowService from services.workflow.entities import TriggerData @@ -27,6 +32,8 @@ logger = logging.getLogger(__name__) class WebhookService: """Service for handling webhook operations.""" + __WEBHOOK_NODE_CACHE_KEY__ = "webhook_nodes" + @classmethod def get_webhook_trigger_and_workflow( cls, webhook_id: str @@ -88,7 +95,7 @@ class WebhookService: } # Extract and normalize content type - content_type = cls._extract_content_type(request.headers) + content_type = cls._extract_content_type(dict(request.headers)) # Route to appropriate extractor based on content type extractors 
@classmethod
def sync_webhook_relationships(cls, app: App, workflow: Workflow):
    """
    Reconcile the app's webhook trigger records with the webhook nodes of a workflow.

    Steps:
        1. Collect the IDs of all webhook trigger nodes in ``workflow`` and
           check which of them have no Redis cache entry yet.
        2. Under a per-app Redis lock, load the app's existing
           ``WorkflowWebhookTrigger`` rows.
        3. Create a row for every uncached graph node that has none, delete
           rows whose node is no longer in the graph, and commit once.
        4. Only after the commit succeeds, cache the created/found records
           (1 hour TTL) and drop the cache entries of deleted records.

    Args:
        app: The app owning the workflow; supplies app ID, tenant ID and creator.
        workflow: The workflow whose graph is the source of truth.

    Raises:
        Exception: Any error from locking, the database or Redis is logged
            and re-raised. Errors raised by ``workflow.walk_nodes`` propagate
            unlogged.
    """

    class Cache(BaseModel):
        """Cached summary of one persisted webhook trigger record."""

        record_id: str
        node_id: str
        webhook_id: str

    def cache_key(node_id: str) -> str:
        # The table is unique on (app_id, node_id), not on node_id alone,
        # so the cache entry has to be scoped by app as well.
        return f"{cls.__WEBHOOK_NODE_CACHE_KEY__}:{app.id}:{node_id}"

    # dict.fromkeys de-duplicates while keeping graph order, so a node ID is
    # never inserted twice; the set gives O(1) membership tests below.
    graph_node_ids = list(dict.fromkeys(node_id for node_id, _ in workflow.walk_nodes(NodeType.TRIGGER_WEBHOOK)))
    graph_node_id_set = set(graph_node_ids)
    uncached_node_ids = [node_id for node_id in graph_node_ids if not redis_client.get(cache_key(node_id))]

    lock_key = f"{cls.__WEBHOOK_NODE_CACHE_KEY__}:apps:{app.id}:lock"
    try:
        # Merely constructing the lock object does not take the lock; the
        # context manager acquires it and releases only the lock it owns.
        with redis_client.lock(lock_key, timeout=10):
            cache_entries: list[Cache] = []
            removed_node_ids: list[str] = []

            with Session(db.engine) as session:
                existing_records = session.scalars(
                    select(WorkflowWebhookTrigger).where(
                        WorkflowWebhookTrigger.app_id == app.id,
                        WorkflowWebhookTrigger.tenant_id == app.tenant_id,
                    )
                ).all()
                records_by_node_id = {record.node_id: record for record in existing_records}

                # Create rows for nodes found neither in the cache nor in the DB
                records_to_cache = []
                for node_id in uncached_node_ids:
                    record = records_by_node_id.get(node_id)
                    if record is None:
                        record = WorkflowWebhookTrigger(
                            app_id=app.id,
                            tenant_id=app.tenant_id,
                            node_id=node_id,
                            webhook_id=cls.generate_webhook_id(),
                            created_by=app.created_by,
                        )
                        session.add(record)
                    records_to_cache.append(record)

                # Delete rows whose node has been removed from the graph
                for node_id, record in records_by_node_id.items():
                    if node_id not in graph_node_id_set:
                        session.delete(record)
                        removed_node_ids.append(node_id)

                # Flush first so new rows carry their primary key even if it is
                # generated by the database (NOTE(review): id default is not
                # visible here; flushing is safe either way).
                session.flush()
                # Snapshot before commit, which expires loaded attributes
                cache_entries = [
                    Cache(record_id=record.id, node_id=record.node_id, webhook_id=record.webhook_id)
                    for record in records_to_cache
                ]
                session.commit()

            # Touch Redis only after the commit, so a failed transaction can
            # never leave the cache claiming rows that do not exist.
            for entry in cache_entries:
                redis_client.set(cache_key(entry.node_id), entry.model_dump_json(), ex=60 * 60)
            for node_id in removed_node_ids:
                redis_client.delete(cache_key(node_id))
    except Exception:
        logger.exception("Failed to sync webhook relationships for app %s", app.id)
        raise


@classmethod
def generate_webhook_id(cls) -> str:
    """
    Generate a random 24-character, URL-safe webhook ID.

    No lookup is done to prove the ID is unused: 18 random bytes encode to
    exactly 24 base64url characters, and the ``uniq_webhook_id`` constraint
    on the table rejects a duplicate on insert.
    """
    return secrets.token_urlsafe(18)[:24]
ScheduleService.visual_to_cron("monthly", visual_config) - expected_days = ','.join([str(i) for i in range(1, 32)]) + ',L' + expected_days = ",".join([str(i) for i in range(1, 32)]) + ",L" assert result == f"1 0 {expected_days} * *" def test_visual_to_cron_monthly_no_days(self):