From 6857bb4406ae47121e903a9c413e594e970adf34 Mon Sep 17 00:00:00 2001 From: Harry Date: Mon, 15 Sep 2025 15:49:07 +0800 Subject: [PATCH] feat(trigger): implement plugin trigger synchronization and subscription management in workflow - Added a new event handler for syncing plugin trigger relationships when a draft workflow is synced, ensuring that the database reflects the current state of plugin triggers. - Introduced subscription management features in the frontend, allowing users to select, add, and remove subscriptions for trigger plugins. - Updated various components to support subscription handling, including the addition of new UI elements for subscription selection and removal. - Enhanced internationalization support by adding new translation keys related to subscription management. These changes improve the overall functionality and user experience of trigger plugins within workflows. --- api/core/app/entities/queue_entities.py | 1 + api/events/event_handlers/__init__.py | 2 + .../sync_plugin_trigger_when_app_created.py | 22 +++ api/models/workflow.py | 2 +- api/services/trigger_debug_service.py | 1 + .../workflow_plugin_trigger_service.py | 167 +++++++++++++++++- .../_base/components/workflow-panel/index.tsx | 11 ++ .../workflow-panel/node-auth-factory.tsx | 76 ++++---- .../components/authentication-menu.tsx | 145 ++++++++++----- web/app/components/workflow/types.ts | 1 + web/i18n/en-US/workflow.ts | 6 + 11 files changed, 352 insertions(+), 82 deletions(-) create mode 100644 api/events/event_handlers/sync_plugin_trigger_when_app_created.py diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 4cb3d219e4..db0297c352 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -52,6 +52,7 @@ class QueueEvent(StrEnum): STOP = "stop" RETRY = "retry" + class AppQueueEvent(BaseModel): """ QueueEvent abstract entity diff --git a/api/events/event_handlers/__init__.py b/api/events/event_handlers/__init__.py index 76b08ccc79..daa3dabeff 100644 --- a/api/events/event_handlers/__init__.py +++ b/api/events/event_handlers/__init__.py @@ -4,6 +4,8 @@ from .create_document_index import handle from .create_installed_app_when_app_created import handle from .create_site_record_when_app_created import handle from .delete_tool_parameters_cache_when_sync_draft_workflow import handle +from .sync_plugin_trigger_when_app_created import handle +from .sync_webhook_when_app_created import handle from .sync_workflow_schedule_when_app_published import handle from .update_app_dataset_join_when_app_model_config_updated import handle from .update_app_dataset_join_when_app_published_workflow_updated import handle diff --git a/api/events/event_handlers/sync_plugin_trigger_when_app_created.py b/api/events/event_handlers/sync_plugin_trigger_when_app_created.py new file mode 100644 index 0000000000..c20cf22f32 --- /dev/null +++ b/api/events/event_handlers/sync_plugin_trigger_when_app_created.py @@ -0,0 +1,22 @@ +import logging + +from events.app_event import app_draft_workflow_was_synced +from models.model import App, AppMode +from models.workflow import Workflow +from services.workflow_plugin_trigger_service import WorkflowPluginTriggerService + +logger = logging.getLogger(__name__) + + +@app_draft_workflow_was_synced.connect +def handle(sender, synced_draft_workflow: Workflow, **kwargs): + """ + While creating a workflow or updating a workflow, we may need to sync + its plugin trigger relationships in DB. + """ + app: App = sender + if app.mode != AppMode.WORKFLOW.value: + # only handle workflow app, chatflow is not supported yet + return + + WorkflowPluginTriggerService.sync_plugin_trigger_relationships(app, synced_draft_workflow) diff --git a/api/models/workflow.py b/api/models/workflow.py index e5589f51d6..b00eed976b 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -1484,7 +1484,7 @@ class WorkflowPluginTrigger(Base): - node_id (varchar) Node ID which node in the workflow - tenant_id (uuid) Workspace ID - provider_id (varchar) Plugin provider ID - - trigger_name (varchar) trigger name (github_issues_trigger) + - trigger_name (varchar) trigger name - subscription_id (varchar) Subscription ID - created_at (timestamp) Creation time - updated_at (timestamp) Last update time diff --git a/api/services/trigger_debug_service.py b/api/services/trigger_debug_service.py index 707e1a78e9..956ce7132f 100644 --- a/api/services/trigger_debug_service.py +++ b/api/services/trigger_debug_service.py @@ -13,6 +13,7 @@ logger = logging.getLogger(__name__) TRIGGER_DEBUG_EVENT_TTL = 300 + class TriggerDebugEvent(BaseModel): subscription_id: str request_id: str diff --git a/api/services/workflow_plugin_trigger_service.py b/api/services/workflow_plugin_trigger_service.py index 89429c3850..3303bda817 100644 --- a/api/services/workflow_plugin_trigger_service.py +++ b/api/services/workflow_plugin_trigger_service.py @@ -1,17 +1,24 @@ from typing import Optional +from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.orm import Session from werkzeug.exceptions import NotFound +from core.workflow.nodes.enums import NodeType from extensions.ext_database import db +from extensions.ext_redis import redis_client +from models.model import App from models.trigger import TriggerSubscription -from models.workflow import WorkflowPluginTrigger +from models.workflow import Workflow, WorkflowPluginTrigger class WorkflowPluginTriggerService: """Service for managing workflow plugin triggers""" + __PLUGIN_TRIGGER_NODE_CACHE_KEY__ = "plugin_trigger_nodes" + MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW = 5 # Maximum allowed plugin trigger nodes per workflow + @classmethod def create_plugin_trigger( cls, @@ -379,3 +386,161 @@ class WorkflowPluginTriggerService: session.commit() return count + + @classmethod + def sync_plugin_trigger_relationships(cls, app: App, workflow: Workflow): + """ + Sync plugin trigger relationships in DB. + + 1. Check if the workflow has any plugin trigger nodes + 2. Fetch the nodes from DB, see if there were any plugin trigger records already + 3. Diff the nodes and the plugin trigger records, create/update/delete the records as needed + + Approach: + Frequent DB operations may cause performance issues, using Redis to cache it instead. + If any record exists, cache it. + + Limits: + - Maximum 5 plugin trigger nodes per workflow + """ + + class Cache(BaseModel): + """ + Cache model for plugin trigger nodes + """ + + record_id: str + node_id: str + provider_id: str + trigger_name: str + subscription_id: str + + # Walk nodes to find plugin triggers + nodes_in_graph = [] + for node_id, node_config in workflow.walk_nodes(NodeType.TRIGGER_PLUGIN): + # Extract plugin trigger configuration from node + plugin_id = node_config.get("plugin_id", "") + provider_id = node_config.get("provider_id", "") + trigger_name = node_config.get("trigger_name", "") + subscription_id = node_config.get("subscription_id", "") + + if not subscription_id: + continue + + nodes_in_graph.append( + { + "node_id": node_id, + "plugin_id": plugin_id, + "provider_id": provider_id, + "trigger_name": trigger_name, + "subscription_id": subscription_id, + } + ) + + # Check plugin trigger node limit + if len(nodes_in_graph) > cls.MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW: + raise ValueError( + f"Workflow exceeds maximum plugin trigger node limit. " + f"Found {len(nodes_in_graph)} plugin trigger nodes, " + f"maximum allowed is {cls.MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW}" + ) + + not_found_in_cache: list[dict] = [] + for node_info in nodes_in_graph: + node_id = node_info["node_id"] + # firstly check if the node exists in cache + if not redis_client.get(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_id}"): + not_found_in_cache.append(node_info) + continue + + with Session(db.engine) as session: + try: + # lock the concurrent plugin trigger creation + redis_client.lock(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:apps:{app.id}:lock", timeout=10) + # fetch the non-cached nodes from DB + all_records = session.scalars( + select(WorkflowPluginTrigger).where( + WorkflowPluginTrigger.app_id == app.id, + WorkflowPluginTrigger.tenant_id == app.tenant_id, + ) + ).all() + + nodes_id_in_db = {node.node_id: node for node in all_records} + nodes_id_in_graph = {node["node_id"] for node in nodes_in_graph} + + # get the nodes not found both in cache and DB + nodes_not_found = [ + node_info for node_info in not_found_in_cache if node_info["node_id"] not in nodes_id_in_db + ] + + # create new plugin trigger records + for node_info in nodes_not_found: + plugin_trigger = WorkflowPluginTrigger( + app_id=app.id, + tenant_id=app.tenant_id, + node_id=node_info["node_id"], + provider_id=node_info["provider_id"], + trigger_name=node_info["trigger_name"], + subscription_id=node_info["subscription_id"], + ) + session.add(plugin_trigger) + session.flush() # Get the ID for caching + + cache = Cache( + record_id=plugin_trigger.id, + node_id=node_info["node_id"], + provider_id=node_info["provider_id"], + trigger_name=node_info["trigger_name"], + subscription_id=node_info["subscription_id"], + ) + redis_client.set( + f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_info['node_id']}", + cache.model_dump_json(), + ex=60 * 60, + ) + session.commit() + + # Update existing records if subscription_id changed + for node_info in nodes_in_graph: + node_id = node_info["node_id"] + if node_id in nodes_id_in_db: + existing_record = nodes_id_in_db[node_id] + if ( + existing_record.subscription_id != node_info["subscription_id"] + or existing_record.provider_id != node_info["provider_id"] + or existing_record.trigger_name != node_info["trigger_name"] + ): + existing_record.subscription_id = node_info["subscription_id"] + existing_record.provider_id = node_info["provider_id"] + existing_record.trigger_name = node_info["trigger_name"] + session.add(existing_record) + + # Update cache + cache = Cache( + record_id=existing_record.id, + node_id=node_id, + provider_id=node_info["provider_id"], + trigger_name=node_info["trigger_name"], + subscription_id=node_info["subscription_id"], + ) + redis_client.set( + f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_id}", + cache.model_dump_json(), + ex=60 * 60, + ) + session.commit() + + # delete the nodes not found in the graph + for node_id in nodes_id_in_db: + if node_id not in nodes_id_in_graph: + session.delete(nodes_id_in_db[node_id]) + redis_client.delete(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_id}") + session.commit() + except Exception: + import logging + + logger = logging.getLogger(__name__) + logger.exception("Failed to sync plugin trigger relationships for app %s", app.id) + raise + finally: + redis_client.delete(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:apps:{app.id}:lock") diff --git a/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx b/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx index 8056797260..8a5634f7d4 100644 --- a/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx +++ b/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx @@ -280,6 +280,15 @@ const BasePanel: FC = ({ }) }, [handleNodeDataUpdateWithSyncDraft, id]) + const handleSubscriptionChange = useCallback((subscription_id: string) => { + handleNodeDataUpdateWithSyncDraft({ + id, + data: { + subscription_id, + }, + }) + }, [handleNodeDataUpdateWithSyncDraft, id]) + if(logParams.showSpecialResultPanel) { return (
= ({
@@ -451,6 +461,7 @@ const BasePanel: FC = ({ ) diff --git a/web/app/components/workflow/nodes/_base/components/workflow-panel/node-auth-factory.tsx b/web/app/components/workflow/nodes/_base/components/workflow-panel/node-auth-factory.tsx index 7ffc016b50..0a534f431c 100644 --- a/web/app/components/workflow/nodes/_base/components/workflow-panel/node-auth-factory.tsx +++ b/web/app/components/workflow/nodes/_base/components/workflow-panel/node-auth-factory.tsx @@ -7,7 +7,6 @@ import { BlockEnum, type Node } from '@/app/components/workflow/types' import { canFindTool } from '@/utils' import { useStore } from '@/app/components/workflow/store' import AuthenticationMenu from '@/app/components/workflow/nodes/trigger-plugin/components/authentication-menu' -import type { AuthSubscription } from '@/app/components/workflow/nodes/trigger-plugin/components/authentication-menu' import { useDeleteTriggerSubscription, useInitiateTriggerOAuth, @@ -20,9 +19,10 @@ import { openOAuthPopup } from '@/hooks/use-oauth' type NodeAuthProps = { data: Node['data'] onAuthorizationChange: (credential_id: string) => void + onSubscriptionChange?: (subscription_id: string) => void } -const NodeAuth: FC = ({ data, onAuthorizationChange }) => { +const NodeAuth: FC = ({ data, onAuthorizationChange, onSubscriptionChange }) => { const { t } = useTranslation() const buildInTools = useStore(s => s.buildInTools) const { notify } = useToastContext() @@ -51,39 +51,8 @@ const NodeAuth: FC = ({ data, onAuthorizationChange }) => { return buildInTools.find(item => canFindTool(item.id, data.provider_id)) }, [buildInTools, data.provider_id]) - // Convert TriggerSubscription to AuthSubscription format - const authSubscription: AuthSubscription = useMemo(() => { - if (data.type !== BlockEnum.TriggerPlugin) { - return { - id: '', - name: '', - status: 'not_configured', - credentials: {}, - } - } - - const subscription = subscriptions[0] // Use first subscription if available - - if (!subscription) { - return { - id: '', - name: '', - status: 'not_configured', - credentials: {}, - } - } - - const status = subscription.credential_type === 'unauthorized' - ? 'not_configured' - : 'authorized' - - return { - id: subscription.id, - name: subscription.name, - status, - credentials: subscription.credentials, - } - }, [data.type, subscriptions]) + // Get selected subscription ID from node data + const selectedSubscriptionId = data.subscription_id const handleConfigure = useCallback(async () => { if (!provider) return @@ -117,10 +86,35 @@ const NodeAuth: FC = ({ data, onAuthorizationChange }) => { } }, [provider, initiateTriggerOAuth, invalidateSubscriptions, notify]) - const handleRemove = useCallback(() => { - if (authSubscription.id) - deleteSubscription.mutate(authSubscription.id) - }, [authSubscription.id, deleteSubscription]) + const handleRemove = useCallback(async (subscriptionId: string) => { + if (!subscriptionId) return + + try { + await deleteSubscription.mutateAsync(subscriptionId) + // Clear subscription_id from node data + if (onSubscriptionChange) + onSubscriptionChange('') + + // Refresh subscriptions list + invalidateSubscriptions(provider) + + notify({ + type: 'success', + message: t('workflow.nodes.triggerPlugin.subscriptionRemoved'), + }) + } + catch (error: any) { + notify({ + type: 'error', + message: `Failed to remove subscription: ${error.message}`, + }) + } + }, [deleteSubscription, invalidateSubscriptions, notify, onSubscriptionChange, provider, t]) + + const handleSubscriptionSelect = useCallback((subscriptionId: string) => { + if (onSubscriptionChange) + onSubscriptionChange(subscriptionId) + }, [onSubscriptionChange]) // Tool authentication if (data.type === BlockEnum.Tool && currCollection?.allow_delete) { @@ -140,7 +134,9 @@ const NodeAuth: FC = ({ data, onAuthorizationChange }) => { if (data.type === BlockEnum.TriggerPlugin) { return ( diff --git a/web/app/components/workflow/nodes/trigger-plugin/components/authentication-menu.tsx b/web/app/components/workflow/nodes/trigger-plugin/components/authentication-menu.tsx index 78de87010f..71b8e6fa06 100644 --- a/web/app/components/workflow/nodes/trigger-plugin/components/authentication-menu.tsx +++ b/web/app/components/workflow/nodes/trigger-plugin/components/authentication-menu.tsx @@ -1,12 +1,13 @@ 'use client' import type { FC } from 'react' -import { memo, useCallback, useState } from 'react' +import { memo, useCallback, useMemo, useState } from 'react' import { useTranslation } from 'react-i18next' -import { RiArrowDownSLine } from '@remixicon/react' +import { RiAddLine, RiArrowDownSLine, RiCheckLine } from '@remixicon/react' import Button from '@/app/components/base/button' import Indicator from '@/app/components/header/indicator' import cn from '@/utils/classnames' +import type { TriggerSubscription } from '@/app/components/workflow/block-selector/types' export type AuthenticationStatus = 'authorized' | 'not_configured' | 'error' @@ -18,14 +19,18 @@ export type AuthSubscription = { } type AuthenticationMenuProps = { - subscription?: AuthSubscription + subscriptions: TriggerSubscription[] + selectedSubscriptionId?: string + onSubscriptionSelect: (subscriptionId: string) => void onConfigure: () => void - onRemove: () => void + onRemove: (subscriptionId: string) => void className?: string } const AuthenticationMenu: FC = ({ - subscription, + subscriptions, + selectedSubscriptionId, + onSubscriptionSelect, onConfigure, onRemove, className, @@ -33,32 +38,40 @@ const AuthenticationMenu: FC = ({ const { t } = useTranslation() const [isOpen, setIsOpen] = useState(false) + const selectedSubscription = useMemo(() => { + return subscriptions.find(sub => sub.id === selectedSubscriptionId) + }, [subscriptions, selectedSubscriptionId]) + const getStatusConfig = useCallback(() => { - if (!subscription) { + if (!selectedSubscription) { + if (subscriptions.length > 0) { + return { + label: t('workflow.nodes.triggerPlugin.selectSubscription'), + color: 'yellow' as const, + } + } return { label: t('workflow.nodes.triggerPlugin.notConfigured'), color: 'red' as const, } } - switch (subscription.status) { - case 'authorized': - return { - label: t('workflow.nodes.triggerPlugin.authorized'), - color: 'green' as const, - } - case 'error': - return { - label: t('workflow.nodes.triggerPlugin.error'), - color: 'red' as const, - } - default: - return { - label: t('workflow.nodes.triggerPlugin.notConfigured'), - color: 'red' as const, - } + // Check if subscription is authorized based on credential_type + const isAuthorized = selectedSubscription.credential_type !== 'unauthorized' + + if (isAuthorized) { + return { + label: selectedSubscription.name || t('workflow.nodes.triggerPlugin.authorized'), + color: 'green' as const, + } } - }, [subscription, t]) + else { + return { + label: t('workflow.nodes.triggerPlugin.notAuthorized'), + color: 'red' as const, + } + } + }, [selectedSubscription, subscriptions.length, t]) const statusConfig = getStatusConfig() @@ -67,11 +80,16 @@ const AuthenticationMenu: FC = ({ setIsOpen(false) }, [onConfigure]) - const handleRemove = useCallback(() => { - onRemove() + const handleRemove = useCallback((subscriptionId: string) => { + onRemove(subscriptionId) setIsOpen(false) }, [onRemove]) + const handleSelectSubscription = useCallback((subscriptionId: string) => { + onSubscriptionSelect(subscriptionId) + setIsOpen(false) + }, [onSubscriptionSelect]) + return (
+ ) + })} +
+
+ + )} + + {/* Add new subscription */} - {subscription && subscription.status === 'authorized' && ( - + + {/* Remove subscription */} + {selectedSubscription && ( + <> +
+ + )}
diff --git a/web/app/components/workflow/types.ts b/web/app/components/workflow/types.ts index 1f5ec739cf..7041acc701 100644 --- a/web/app/components/workflow/types.ts +++ b/web/app/components/workflow/types.ts @@ -97,6 +97,7 @@ export type CommonNodeType = { retry_config?: WorkflowRetryConfig default_value?: DefaultValueForm[] credential_id?: string + subscription_id?: string _dimmed?: boolean } & T & Partial> diff --git a/web/i18n/en-US/workflow.ts b/web/i18n/en-US/workflow.ts index 0bd35f9daa..98d8fa165c 100644 --- a/web/i18n/en-US/workflow.ts +++ b/web/i18n/en-US/workflow.ts @@ -727,6 +727,12 @@ const translation = { triggerPlugin: { authorized: 'Authorized', notConfigured: 'Not Configured', + notAuthorized: 'Not Authorized', + selectSubscription: 'Select Subscription', + availableSubscriptions: 'Available Subscriptions', + addSubscription: 'Add New Subscription', + removeSubscription: 'Remove Subscription', + subscriptionRemoved: 'Subscription removed successfully', error: 'Error', configuration: 'Configuration', remove: 'Remove',