From 0371d71409dd1ff8aafd216e011b9e486d55a58c Mon Sep 17 00:00:00 2001 From: Harry Date: Fri, 5 Sep 2025 14:22:20 +0800 Subject: [PATCH] feat(trigger): enhance trigger subscription management and cache handling - Added `name` parameter to `TriggerSubscriptionBuilderCreateApi` for better subscription identification. - Implemented `delete_cache_for_subscription` function to clear cache associated with trigger subscriptions. - Updated `WorkflowPluginTriggerService` to check for existing subscriptions before creating new plugin triggers, improving error handling. - Refactored `TriggerProviderService` to utilize the new cache deletion method during provider deletion. This improves the overall management of trigger subscriptions and enhances cache efficiency. --- .../console/workspace/trigger_providers.py | 1 + api/core/trigger/utils/encryption.py | 7 +++++++ .../trigger/trigger_provider_service.py | 19 ++++++++++--------- .../trigger_subscription_builder_service.py | 5 ++++- .../workflow_plugin_trigger_service.py | 13 ++++++++++++- 5 files changed, 34 insertions(+), 11 deletions(-) diff --git a/api/controllers/console/workspace/trigger_providers.py b/api/controllers/console/workspace/trigger_providers.py index eb940ca21b..aa64461d08 100644 --- a/api/controllers/console/workspace/trigger_providers.py +++ b/api/controllers/console/workspace/trigger_providers.py @@ -82,6 +82,7 @@ class TriggerSubscriptionBuilderCreateApi(Resource): tenant_id=user.current_tenant_id, user_id=user.id, provider_id=TriggerProviderID(provider), + name=args.get("name", None), credentials=credentials, credential_type=credential_type, credential_expires_at=-1, diff --git a/api/core/trigger/utils/encryption.py b/api/core/trigger/utils/encryption.py index 0a081b1bdb..463844a495 100644 --- a/api/core/trigger/utils/encryption.py +++ b/api/core/trigger/utils/encryption.py @@ -65,6 +65,13 @@ def create_trigger_provider_encrypter_for_subscription( ) return encrypter, cache +def delete_cache_for_subscription(tenant_id: str, provider_id: str, subscription_id: str): + cache = TriggerProviderCredentialsCache( + tenant_id=tenant_id, + provider_id=provider_id, + credential_id=subscription_id, + ) + cache.delete() def create_trigger_provider_encrypter_for_properties( tenant_id: str, diff --git a/api/services/trigger/trigger_provider_service.py b/api/services/trigger/trigger_provider_service.py index 95794e0b8e..8713443bcd 100644 --- a/api/services/trigger/trigger_provider_service.py +++ b/api/services/trigger/trigger_provider_service.py @@ -2,6 +2,7 @@ import json import logging from collections.abc import Mapping from typing import Any, Optional +import uuid from sqlalchemy import desc from sqlalchemy.orm import Session @@ -23,6 +24,7 @@ from core.trigger.utils.encryption import ( create_trigger_provider_encrypter_for_properties, create_trigger_provider_encrypter_for_subscription, create_trigger_provider_oauth_encrypter, + delete_cache_for_subscription, ) from extensions.ext_database import db from extensions.ext_redis import redis_client @@ -82,6 +84,7 @@ class TriggerProviderService: parameters: Mapping[str, Any], properties: Mapping[str, Any], credentials: Mapping[str, str], + subscription_id: Optional[str] = None, credential_expires_at: int = -1, expires_at: int = -1, ) -> dict: @@ -139,6 +142,7 @@ class TriggerProviderService: # Create provider record db_provider = TriggerSubscription( + id=subscription_id or str(uuid.uuid4()), tenant_id=tenant_id, user_id=user_id, name=name, @@ -175,17 +179,14 @@ class TriggerProviderService: if not db_provider: raise ValueError(f"Trigger provider subscription {subscription_id} not found") - provider_controller = TriggerManager.get_trigger_provider(tenant_id, TriggerProviderID(db_provider.provider_id)) # Clear cache - _, cache = create_trigger_provider_encrypter_for_subscription( - tenant_id=tenant_id, - controller=provider_controller, - subscription=db_provider, - ) - session.delete(db_provider) - cache.delete() - + delete_cache_for_subscription( + tenant_id=tenant_id, + provider_id=db_provider.provider_id, + subscription_id=db_provider.id, + ) + @classmethod def refresh_oauth_token( cls, diff --git a/api/services/trigger/trigger_subscription_builder_service.py b/api/services/trigger/trigger_subscription_builder_service.py index 4e1a26a28c..bdf4d37253 100644 --- a/api/services/trigger/trigger_subscription_builder_service.py +++ b/api/services/trigger/trigger_subscription_builder_service.py @@ -96,6 +96,7 @@ class TriggerSubscriptionBuilderService: if credential_type == CredentialType.UNAUTHORIZED: # manually create TriggerProviderService.add_trigger_provider( + subscription_id=subscription_builder.id, tenant_id=tenant_id, user_id=user_id, name=subscription_builder.name, @@ -120,6 +121,7 @@ class TriggerSubscriptionBuilderService: ) TriggerProviderService.add_trigger_provider( + subscription_id=subscription_builder.id, tenant_id=tenant_id, user_id=user_id, name=subscription_builder.name, @@ -145,6 +147,7 @@ class TriggerSubscriptionBuilderService: credential_type: CredentialType, credential_expires_at: int, expires_at: int, + name: str | None, ) -> SubscriptionBuilder: """ Add a new trigger subscription validation. @@ -157,7 +160,7 @@ class TriggerSubscriptionBuilderService: subscription_id = str(uuid.uuid4()) subscription_builder = SubscriptionBuilder( id=subscription_id, - name="", + name=name or "", endpoint_id=subscription_id, tenant_id=tenant_id, user_id=user_id, diff --git a/api/services/workflow_plugin_trigger_service.py b/api/services/workflow_plugin_trigger_service.py index b171e73709..9124e3653f 100644 --- a/api/services/workflow_plugin_trigger_service.py +++ b/api/services/workflow_plugin_trigger_service.py @@ -5,6 +5,7 @@ from sqlalchemy.orm import Session from werkzeug.exceptions import BadRequest, NotFound from extensions.ext_database import db +from models.trigger import TriggerSubscription from models.workflow import WorkflowPluginTrigger @@ -53,6 +54,16 @@ class WorkflowPluginTriggerService: if existing_trigger: raise BadRequest("Plugin trigger already exists for this app and node") + # Check if subscription exists + subscription = session.scalar( + select(TriggerSubscription).where( + TriggerSubscription.id == subscription_id, + ) + ) + + if not subscription: + raise BadRequest("Subscription not found") + # Create new plugin trigger plugin_trigger = WorkflowPluginTrigger( app_id=app_id, @@ -340,7 +351,7 @@ class WorkflowPluginTriggerService: ) if not plugin_trigger: - raise NotFound("Plugin trigger not found") + return session.delete(plugin_trigger)