feat: add app trigger list api (#24693)

This commit is contained in:
非法操作 2025-08-28 15:23:08 +08:00 committed by GitHub
parent 4f73bc9693
commit 89ad6ad902
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 381 additions and 39 deletions

View File

@ -1,7 +1,8 @@
import logging
import secrets
from flask_restful import Resource, reqparse
from flask_restx import Resource, marshal_with, reqparse
from sqlalchemy import select
from sqlalchemy.orm import Session
from werkzeug.exceptions import BadRequest, Forbidden, NotFound
@ -10,19 +11,26 @@ from controllers.console import api
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from extensions.ext_database import db
from fields.workflow_trigger_fields import trigger_fields, triggers_list_fields, webhook_trigger_fields
from libs.login import current_user, login_required
from models.workflow import WorkflowWebhookTrigger
from models.model import AppMode
from models.workflow import AppTrigger, AppTriggerStatus, WorkflowWebhookTrigger
logger = logging.getLogger(__name__)
class WebhookTriggerApi(Resource):
"""Webhook Trigger API"""
@setup_required
@login_required
@account_initialization_required
@get_app_model
@get_app_model(mode=AppMode.WORKFLOW)
@marshal_with(webhook_trigger_fields)
def post(self, app_model):
"""Create webhook trigger"""
parser = reqparse.RequestParser()
@ -37,9 +45,6 @@ class WebhookTriggerApi(Resource):
)
args = parser.parse_args()
if app_model.mode != "workflow":
raise BadRequest("Invalid app mode, only workflow can add webhook node")
# The role of the current user in the ta table must be admin, owner, or editor
if not current_user.is_editor:
raise Forbidden()
@ -78,19 +83,17 @@ class WebhookTriggerApi(Resource):
session.commit()
session.refresh(webhook_trigger)
return {
"id": webhook_trigger.id,
"webhook_id": webhook_trigger.webhook_id,
"webhook_url": f"{dify_config.SERVICE_API_URL}/triggers/webhook/{webhook_trigger.webhook_id}",
"node_id": webhook_trigger.node_id,
"triggered_by": webhook_trigger.triggered_by,
"created_at": webhook_trigger.created_at.isoformat(),
}
# Add computed fields for marshal_with
base_url = dify_config.SERVICE_API_URL
webhook_trigger.webhook_url = f"{base_url}/triggers/webhook/{webhook_trigger.webhook_id}"
webhook_trigger.webhook_debug_url = f"{base_url}/triggers/webhook-debug/{webhook_trigger.webhook_id}"
return webhook_trigger
@setup_required
@login_required
@account_initialization_required
@get_app_model
@get_app_model(mode=AppMode.WORKFLOW)
def delete(self, app_model):
"""Delete webhook trigger"""
parser = reqparse.RequestParser()
@ -148,4 +151,90 @@ class WebhookTriggerApi(Resource):
return webhook_id
class AppTriggersApi(Resource):
"""App Triggers list API"""
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=AppMode.WORKFLOW)
@marshal_with(triggers_list_fields)
def get(self, app_model):
"""Get app triggers list"""
with Session(db.engine) as session:
# Get all triggers for this app using select API
triggers = (
session.execute(
select(AppTrigger)
.where(
AppTrigger.tenant_id == current_user.current_tenant_id,
AppTrigger.app_id == app_model.id,
)
.order_by(AppTrigger.created_at.desc())
)
.scalars()
.all()
)
# Add computed icon field for each trigger
url_prefix = dify_config.CONSOLE_API_URL + "/console/api/workspaces/current/tool-provider/builtin/"
for trigger in triggers:
if trigger.trigger_type == "trigger-plugin":
trigger.icon = url_prefix + trigger.provider_name + "/icon"
else:
trigger.icon = ""
return {"data": triggers}
class AppTriggerEnableApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=AppMode.WORKFLOW)
@marshal_with(trigger_fields)
def post(self, app_model):
"""Update app trigger (enable/disable)"""
parser = reqparse.RequestParser()
parser.add_argument("trigger_id", type=str, required=True, nullable=False, location="json")
parser.add_argument("enable_trigger", type=bool, required=True, nullable=False, location="json")
args = parser.parse_args()
# The role of the current user must be admin, owner, or editor
if not current_user.is_editor:
raise Forbidden()
trigger_id = args["trigger_id"]
with Session(db.engine) as session:
# Find the trigger using select
trigger = session.execute(
select(AppTrigger).where(
AppTrigger.id == trigger_id,
AppTrigger.tenant_id == current_user.current_tenant_id,
AppTrigger.app_id == app_model.id,
)
).scalar_one_or_none()
if not trigger:
raise NotFound("Trigger not found")
# Update status based on enable_trigger boolean
trigger.status = AppTriggerStatus.ENABLED if args["enable_trigger"] else AppTriggerStatus.DISABLED
session.commit()
session.refresh(trigger)
# Add computed icon field
url_prefix = dify_config.CONSOLE_API_URL + "/console/api/workspaces/current/tool-provider/builtin/"
if trigger.trigger_type == "trigger-plugin":
trigger.icon = url_prefix + trigger.provider_name + "/icon"
else:
trigger.icon = ""
return trigger
api.add_resource(WebhookTriggerApi, "/apps/<uuid:app_id>/workflows/triggers/webhook")
api.add_resource(AppTriggersApi, "/apps/<uuid:app_id>/triggers")
api.add_resource(AppTriggerEnableApi, "/apps/<uuid:app_id>/trigger-enable")

View File

@ -10,6 +10,7 @@ logger = logging.getLogger(__name__)
@bp.route("/webhook/<string:webhook_id>", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"])
@bp.route("/webhook-debug/<string:webhook_id>", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"])
def handle_webhook(webhook_id: str):
"""
Handle webhook trigger calls.

View File

@ -25,7 +25,8 @@ class NodeType(StrEnum):
DOCUMENT_EXTRACTOR = "document-extractor"
LIST_OPERATOR = "list-operator"
AGENT = "agent"
WEBHOOK = "webhook"
TRIGGER_WEBHOOK = "trigger-webhook"
TRIGGER_SCHEDULE = "trigger-schedule"
class ErrorStrategy(StrEnum):

View File

@ -19,10 +19,10 @@ from core.workflow.nodes.question_classifier import QuestionClassifierNode
from core.workflow.nodes.start import StartNode
from core.workflow.nodes.template_transform import TemplateTransformNode
from core.workflow.nodes.tool import ToolNode
from core.workflow.nodes.trigger_webhook import TriggerWebhookNode
from core.workflow.nodes.variable_aggregator import VariableAggregatorNode
from core.workflow.nodes.variable_assigner.v1 import VariableAssignerNode as VariableAssignerNodeV1
from core.workflow.nodes.variable_assigner.v2 import VariableAssignerNode as VariableAssignerNodeV2
from core.workflow.nodes.webhook import WebhookNode
LATEST_VERSION = "latest"
@ -133,8 +133,8 @@ NODE_TYPE_CLASSES_MAPPING: Mapping[NodeType, Mapping[str, type[BaseNode]]] = {
"2": AgentNode,
"1": AgentNode,
},
NodeType.WEBHOOK: {
LATEST_VERSION: WebhookNode,
"1": WebhookNode,
NodeType.TRIGGER_WEBHOOK: {
LATEST_VERSION: TriggerWebhookNode,
"1": TriggerWebhookNode,
},
}

View File

@ -0,0 +1,3 @@
from .node import TriggerWebhookNode
__all__ = ["TriggerWebhookNode"]

View File

@ -10,8 +10,8 @@ from core.workflow.nodes.enums import ErrorStrategy, NodeType
from .entities import WebhookData
class WebhookNode(BaseNode):
_node_type = NodeType.WEBHOOK
class TriggerWebhookNode(BaseNode):
_node_type = NodeType.TRIGGER_WEBHOOK
_node_data: WebhookData

View File

@ -1,3 +0,0 @@
from .node import WebhookNode
__all__ = ["WebhookNode"]

View File

@ -6,6 +6,7 @@ from .create_site_record_when_app_created import handle
from .delete_tool_parameters_cache_when_sync_draft_workflow import handle
from .update_app_dataset_join_when_app_model_config_updated import handle
from .update_app_dataset_join_when_app_published_workflow_updated import handle
from .update_app_triggers_when_app_published_workflow_updated import handle
# Consolidated handler replaces both deduct_quota_when_message_created and
# update_provider_last_used_at_when_message_created

View File

@ -0,0 +1,111 @@
from typing import cast
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.workflow.nodes import NodeType
from events.app_event import app_published_workflow_was_updated
from extensions.ext_database import db
from models import AppMode, AppTrigger, AppTriggerStatus, Workflow
@app_published_workflow_was_updated.connect
def handle(sender, **kwargs):
"""
Handle app published workflow update event to sync app_triggers table.
When a workflow is published, this handler will:
1. Extract trigger nodes from the workflow graph
2. Compare with existing app_triggers records
3. Add new triggers and remove obsolete ones
"""
app = sender
if app.mode != AppMode.WORKFLOW.value:
return
published_workflow = kwargs.get("published_workflow")
published_workflow = cast(Workflow, published_workflow)
# Extract trigger info from workflow
trigger_infos = get_trigger_infos_from_workflow(published_workflow)
with Session(db.engine) as session:
# Get existing app triggers
existing_triggers = (
session.execute(
select(AppTrigger).where(AppTrigger.tenant_id == app.tenant_id, AppTrigger.app_id == app.id)
)
.scalars()
.all()
)
# Convert existing triggers to dict for easy lookup
existing_triggers_map = {trigger.node_id: trigger for trigger in existing_triggers}
# Get current and new node IDs
existing_node_ids = set(existing_triggers_map.keys())
new_node_ids = {info["node_id"] for info in trigger_infos}
# Calculate changes
added_node_ids = new_node_ids - existing_node_ids
removed_node_ids = existing_node_ids - new_node_ids
# Remove obsolete triggers
for node_id in removed_node_ids:
session.delete(existing_triggers_map[node_id])
for trigger_info in trigger_infos:
node_id = trigger_info["node_id"]
if node_id in added_node_ids:
# Create new trigger
app_trigger = AppTrigger(
tenant_id=app.tenant_id,
app_id=app.id,
trigger_type=trigger_info["node_type"],
title=trigger_info["node_title"],
node_id=node_id,
provider_name=trigger_info.get("node_provider_name", ""),
status=AppTriggerStatus.DISABLED,
)
session.add(app_trigger)
elif node_id in existing_node_ids:
# Update existing trigger if needed
existing_trigger = existing_triggers_map[node_id]
new_title = trigger_info["node_title"]
if new_title and existing_trigger.title != new_title:
existing_trigger.title = new_title
session.add(existing_trigger)
session.commit()
def get_trigger_infos_from_workflow(published_workflow: Workflow) -> list[dict]:
"""
Extract trigger node information from the workflow graph.
Returns:
List of trigger info dictionaries containing:
- node_type: The type of the trigger node ('trigger-webhook', 'trigger-schedule', 'trigger-plugin')
- node_id: The node ID in the workflow
- node_title: The title of the node
- node_provider_name: The name of the node's provider, only for plugin
"""
graph = published_workflow.graph_dict
if not graph:
return []
nodes = graph.get("nodes", [])
trigger_types = {NodeType.TRIGGER_WEBHOOK.value, NodeType.TRIGGER_SCHEDULE.value}
trigger_infos = [
{
"node_type": node.get("data", {}).get("type"),
"node_id": node.get("id"),
"node_title": node.get("data", {}).get("title"),
"node_provider_name": node.get("data", {}).get("provider_name"),
}
for node in nodes
if node.get("data", {}).get("type") in trigger_types
]
return trigger_infos

View File

@ -0,0 +1,26 @@
from flask_restx import fields
trigger_fields = {
"id": fields.String,
"trigger_type": fields.String,
"title": fields.String,
"node_id": fields.String,
"provider_name": fields.String,
"icon": fields.String,
"status": fields.String,
"created_at": fields.DateTime(dt_format="iso8601"),
"updated_at": fields.DateTime(dt_format="iso8601"),
}
triggers_list_fields = {"triggers": fields.List(fields.Nested(trigger_fields))}
webhook_trigger_fields = {
"id": fields.String,
"webhook_id": fields.String,
"webhook_url": fields.String,
"webhook_debug_url": fields.String,
"node_id": fields.String,
"triggered_by": fields.String,
"created_at": fields.DateTime(dt_format="iso8601"),
}

View File

@ -1,7 +1,7 @@
"""empty message
Revision ID: 4558cfabe44e
Revises: fa8b0fa6f407
Revises: 0e154742a5fa
Create Date: 2025-08-23 20:38:20.059323
"""
@ -12,7 +12,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '4558cfabe44e'
down_revision = 'fa8b0fa6f407'
down_revision = '0e154742a5fa'
branch_labels = None
depends_on = None
@ -20,7 +20,7 @@ depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('workflow_trigger_logs',
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False),
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
sa.Column('app_id', models.types.StringUUID(), nullable=False),
sa.Column('workflow_id', models.types.StringUUID(), nullable=False),

View File

@ -20,7 +20,7 @@ depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('workflow_webhook_triggers',
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False),
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
sa.Column('app_id', models.types.StringUUID(), nullable=False),
sa.Column('node_id', sa.String(length=64), nullable=False),
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),

View File

@ -0,0 +1,47 @@
"""Add app triggers table
Revision ID: 9ee7d347f4c1
Revises: 5871f634954d
Create Date: 2025-08-27 17:33:30.082812
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9ee7d347f4c1'
down_revision = '5871f634954d'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('app_triggers',
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
sa.Column('app_id', models.types.StringUUID(), nullable=False),
sa.Column('node_id', sa.String(length=64), nullable=False),
sa.Column('trigger_type', sa.String(length=50), nullable=False),
sa.Column('title', sa.String(length=255), nullable=False),
sa.Column('provider_name', sa.String(length=255), server_default='', nullable=True),
sa.Column('status', sa.String(length=50), nullable=False),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id', name='app_trigger_pkey')
)
with op.batch_alter_table('app_triggers', schema=None) as batch_op:
batch_op.create_index('app_trigger_tenant_app_idx', ['tenant_id', 'app_id'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('app_triggers', schema=None) as batch_op:
batch_op.drop_index('app_trigger_tenant_app_idx')
op.drop_table('app_triggers')
# ### end Alembic commands ###

View File

@ -81,6 +81,9 @@ from .tools import (
)
from .web import PinnedConversation, SavedMessage
from .workflow import (
AppTrigger,
AppTriggerStatus,
AppTriggerType,
ConversationVariable,
Workflow,
WorkflowAppLog,
@ -104,9 +107,12 @@ __all__ = [
"AppAnnotationHitHistory",
"AppAnnotationSetting",
"AppDatasetJoin",
"AppMCPServer", # Added
"AppMCPServer",
"AppMode",
"AppModelConfig",
"AppTrigger",
"AppTriggerStatus",
"AppTriggerType",
"BuiltinToolProvider",
"CeleryTask",
"CeleryTaskSet",

View File

@ -1317,7 +1317,7 @@ class WorkflowTriggerLog(Base):
sa.Index("workflow_trigger_log_workflow_id_idx", "workflow_id"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
@ -1408,7 +1408,7 @@ class WorkflowWebhookTrigger(Base):
sa.UniqueConstraint("webhook_id", name="uniq_webhook_id"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
@ -1421,3 +1421,63 @@ class WorkflowWebhookTrigger(Base):
server_default=func.current_timestamp(),
server_onupdate=func.current_timestamp(),
)
class AppTriggerType(StrEnum):
"""App Trigger Type Enum"""
TRIGGER_WEBHOOK = "trigger-webhook"
TRIGGER_SCHEDULE = "trigger-schedule"
TRIGGER_PLUGIN = "trigger-plugin"
class AppTriggerStatus(StrEnum):
"""App Trigger Status Enum"""
ENABLED = "enabled"
DISABLED = "disabled"
UNAUTHORIZED = "unauthorized"
class AppTrigger(Base):
"""
App Trigger
Manages multiple triggers for an app with enable/disable and authorization states.
Attributes:
- id (uuid) Primary key
- tenant_id (uuid) Workspace ID
- app_id (uuid) App ID
- trigger_type (string) Type: webhook, schedule, plugin
- title (string) Trigger title
- status (string) Status: enabled, disabled, unauthorized, error
- node_id (string) Optional workflow node ID
- created_at (timestamp) Creation time
- updated_at (timestamp) Last update time
"""
__tablename__ = "app_triggers"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="app_trigger_pkey"),
sa.Index("app_trigger_tenant_app_idx", "tenant_id", "app_id"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=False)
trigger_type: Mapped[str] = mapped_column(EnumText(AppTriggerType, length=50), nullable=False)
title: Mapped[str] = mapped_column(String(255), nullable=False)
provider_name: Mapped[str] = mapped_column(String(255), server_default="", nullable=True)
status: Mapped[str] = mapped_column(
EnumText(AppTriggerStatus, length=50), nullable=False, default=AppTriggerStatus.DISABLED
)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
default=naive_utc_now(),
server_onupdate=func.current_timestamp(),
)

View File

@ -1,7 +1,7 @@
import pytest
from pydantic import ValidationError
from core.workflow.nodes.webhook.entities import (
from core.workflow.nodes.trigger_webhook.entities import (
ContentType,
Method,
WebhookBodyParameter,

View File

@ -1,7 +1,7 @@
import pytest
from core.workflow.nodes.base.exc import BaseNodeError
from core.workflow.nodes.webhook.exc import (
from core.workflow.nodes.trigger_webhook.exc import (
WebhookConfigError,
WebhookNodeError,
WebhookNotFoundError,

View File

@ -8,27 +8,27 @@ from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution
from core.workflow.graph_engine import Graph, GraphInitParams, GraphRuntimeState
from core.workflow.nodes.answer import AnswerStreamGenerateRoute
from core.workflow.nodes.end import EndStreamParam
from core.workflow.nodes.webhook import WebhookNode
from core.workflow.nodes.webhook.entities import (
from core.workflow.nodes.trigger_webhook.entities import (
ContentType,
Method,
WebhookBodyParameter,
WebhookData,
WebhookParameter,
)
from core.workflow.nodes.webhook import TriggerWebhookNode
from core.workflow.system_variable import SystemVariable
from models.enums import UserFrom
from models.workflow import WorkflowType
def create_webhook_node(webhook_data: WebhookData, variable_pool: VariablePool) -> WebhookNode:
def create_webhook_node(webhook_data: WebhookData, variable_pool: VariablePool) -> TriggerWebhookNode:
"""Helper function to create a webhook node with proper initialization."""
node_config = {
"id": "1",
"data": webhook_data.model_dump(),
}
node = WebhookNode(
node = TriggerWebhookNode(
id="1",
config=node_config,
graph_init_params=GraphInitParams(
@ -96,7 +96,7 @@ def test_webhook_node_basic_initialization():
def test_webhook_node_default_config():
"""Test webhook node default configuration."""
config = WebhookNode.get_default_config()
config = TriggerWebhookNode.get_default_config()
assert config["type"] == "webhook"
assert config["config"]["method"] == "get"