feat(trigger): enhance trigger subscription management and cache handling

- Added `name` parameter to `TriggerSubscriptionBuilderCreateApi` for better subscription identification.
- Implemented `delete_cache_for_subscription` function to clear cache associated with trigger subscriptions.
- Updated `WorkflowPluginTriggerService` to check for existing subscriptions before creating new plugin triggers, improving error handling.
- Refactored `TriggerProviderService` to utilize the new cache deletion method during provider deletion.

This improves the overall management of trigger subscriptions and enhances cache efficiency.
This commit is contained in:
Harry 2025-09-05 14:22:20 +08:00
parent 81ef7343d4
commit 0371d71409
5 changed files with 34 additions and 11 deletions

View File

@ -82,6 +82,7 @@ class TriggerSubscriptionBuilderCreateApi(Resource):
tenant_id=user.current_tenant_id,
user_id=user.id,
provider_id=TriggerProviderID(provider),
name=args.get("name", None),
credentials=credentials,
credential_type=credential_type,
credential_expires_at=-1,

View File

@ -65,6 +65,13 @@ def create_trigger_provider_encrypter_for_subscription(
)
return encrypter, cache
def delete_cache_for_subscription(tenant_id: str, provider_id: str, subscription_id: str):
cache = TriggerProviderCredentialsCache(
tenant_id=tenant_id,
provider_id=provider_id,
credential_id=subscription_id,
)
cache.delete()
def create_trigger_provider_encrypter_for_properties(
tenant_id: str,

View File

@ -2,6 +2,7 @@ import json
import logging
from collections.abc import Mapping
from typing import Any, Optional
import uuid
from sqlalchemy import desc
from sqlalchemy.orm import Session
@ -23,6 +24,7 @@ from core.trigger.utils.encryption import (
create_trigger_provider_encrypter_for_properties,
create_trigger_provider_encrypter_for_subscription,
create_trigger_provider_oauth_encrypter,
delete_cache_for_subscription,
)
from extensions.ext_database import db
from extensions.ext_redis import redis_client
@ -82,6 +84,7 @@ class TriggerProviderService:
parameters: Mapping[str, Any],
properties: Mapping[str, Any],
credentials: Mapping[str, str],
subscription_id: Optional[str] = None,
credential_expires_at: int = -1,
expires_at: int = -1,
) -> dict:
@ -139,6 +142,7 @@ class TriggerProviderService:
# Create provider record
db_provider = TriggerSubscription(
id=subscription_id or str(uuid.uuid4()),
tenant_id=tenant_id,
user_id=user_id,
name=name,
@ -175,17 +179,14 @@ class TriggerProviderService:
if not db_provider:
raise ValueError(f"Trigger provider subscription {subscription_id} not found")
provider_controller = TriggerManager.get_trigger_provider(tenant_id, TriggerProviderID(db_provider.provider_id))
# Clear cache
_, cache = create_trigger_provider_encrypter_for_subscription(
tenant_id=tenant_id,
controller=provider_controller,
subscription=db_provider,
)
session.delete(db_provider)
cache.delete()
delete_cache_for_subscription(
tenant_id=tenant_id,
provider_id=db_provider.provider_id,
subscription_id=db_provider.id,
)
@classmethod
def refresh_oauth_token(
cls,

View File

@ -96,6 +96,7 @@ class TriggerSubscriptionBuilderService:
if credential_type == CredentialType.UNAUTHORIZED:
# manually create
TriggerProviderService.add_trigger_provider(
subscription_id=subscription_builder.id,
tenant_id=tenant_id,
user_id=user_id,
name=subscription_builder.name,
@ -120,6 +121,7 @@ class TriggerSubscriptionBuilderService:
)
TriggerProviderService.add_trigger_provider(
subscription_id=subscription_builder.id,
tenant_id=tenant_id,
user_id=user_id,
name=subscription_builder.name,
@ -145,6 +147,7 @@ class TriggerSubscriptionBuilderService:
credential_type: CredentialType,
credential_expires_at: int,
expires_at: int,
name: str | None,
) -> SubscriptionBuilder:
"""
Add a new trigger subscription validation.
@ -157,7 +160,7 @@ class TriggerSubscriptionBuilderService:
subscription_id = str(uuid.uuid4())
subscription_builder = SubscriptionBuilder(
id=subscription_id,
name="",
name=name or "",
endpoint_id=subscription_id,
tenant_id=tenant_id,
user_id=user_id,

View File

@ -5,6 +5,7 @@ from sqlalchemy.orm import Session
from werkzeug.exceptions import BadRequest, NotFound
from extensions.ext_database import db
from models.trigger import TriggerSubscription
from models.workflow import WorkflowPluginTrigger
@ -53,6 +54,16 @@ class WorkflowPluginTriggerService:
if existing_trigger:
raise BadRequest("Plugin trigger already exists for this app and node")
# Check if subscription exists
subscription = session.scalar(
select(TriggerSubscription).where(
TriggerSubscription.id == subscription_id,
)
)
if not subscription:
raise BadRequest("Subscription not found")
# Create new plugin trigger
plugin_trigger = WorkflowPluginTrigger(
app_id=app_id,
@ -340,7 +351,7 @@ class WorkflowPluginTriggerService:
)
if not plugin_trigger:
raise NotFound("Plugin trigger not found")
return
session.delete(plugin_trigger)