diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index 2b0b908114..fc48c8b3b0 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -10,7 +10,12 @@ import logging from celery import shared_task from sqlalchemy.orm import Session +from core.trigger.provider import PluginTriggerProviderController from core.trigger.trigger_manager import TriggerManager +from core.trigger.utils.encryption import ( + create_trigger_provider_encrypter_for_properties, + create_trigger_provider_encrypter_for_subscription, +) from extensions.ext_database import db from extensions.ext_storage import storage from models.provider_ids import TriggerProviderID @@ -72,17 +77,35 @@ def dispatch_triggered_workflows_async( with Session(db.engine) as session: # Get subscription - subscription = session.query(TriggerSubscription).filter_by(id=subscription_id).first() + subscription: TriggerSubscription | None = ( + session.query(TriggerSubscription).filter_by(id=subscription_id).first() + ) if not subscription: logger.error("Subscription not found: %s", subscription_id) return {"status": "failed", "error": "Subscription not found"} # Get controller - controller = TriggerManager.get_trigger_provider(subscription.tenant_id, TriggerProviderID(provider_id)) + controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider( + subscription.tenant_id, TriggerProviderID(provider_id) + ) if not controller: logger.error("Controller not found for provider: %s", provider_id) return {"status": "failed", "error": "Controller not found"} + credential_encrypter, _ = create_trigger_provider_encrypter_for_subscription( + tenant_id=subscription.tenant_id, + controller=controller, + subscription=subscription, + ) + subscription.credentials = credential_encrypter.decrypt(subscription.credentials) + + properties_encrypter, _ = create_trigger_provider_encrypter_for_properties( + tenant_id=subscription.tenant_id, + controller=controller, + subscription=subscription, + ) + subscription.properties = properties_encrypter.decrypt(subscription.properties) + # Dispatch each trigger dispatched_count = 0 for event_name in events: