diff --git a/api/controllers/console/workspace/trigger_providers.py b/api/controllers/console/workspace/trigger_providers.py index d17f22fd95..d585b66964 100644 --- a/api/controllers/console/workspace/trigger_providers.py +++ b/api/controllers/console/workspace/trigger_providers.py @@ -154,7 +154,7 @@ class TriggerSubscriptionBuilderUpdateApi(Resource): raise -class TriggerSubscriptionBuilderRequestLogsApi(Resource): +class TriggerSubscriptionBuilderLogsApi(Resource): @setup_required @login_required @account_initialization_required @@ -165,9 +165,10 @@ class TriggerSubscriptionBuilderRequestLogsApi(Resource): assert user.current_tenant_id is not None try: - return jsonable_encoder(TriggerSubscriptionBuilderService.list_request_logs(subscription_builder_id)) + logs = TriggerSubscriptionBuilderService.list_logs(subscription_builder_id) + return jsonable_encoder({"logs": [log.model_dump(mode="json") for log in logs]}) except Exception as e: - logger.exception("Error getting request logs for provider credential", exc_info=e) + logger.exception("Error getting request logs for subscription builder", exc_info=e) raise @@ -494,8 +495,8 @@ api.add_resource( "/workspaces/current/trigger-provider//subscriptions/builder/build/", ) api.add_resource( - TriggerSubscriptionBuilderRequestLogsApi, - "/workspaces/current/trigger-provider//subscriptions/builder/request-logs/", + TriggerSubscriptionBuilderLogsApi, + "/workspaces/current/trigger-provider//subscriptions/builder/logs/", ) diff --git a/api/models/workflow.py b/api/models/workflow.py index 949393d049..bdcea54506 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -1447,7 +1447,7 @@ class WorkflowPluginTrigger(Base): __tablename__ = "workflow_plugin_triggers" __table_args__ = ( sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"), - sa.Index("workflow_plugin_trigger_tenant_subscription_idx", "tenant_id", "subscription_id"), + sa.Index("workflow_plugin_trigger_tenant_subscription_idx", "tenant_id", "subscription_id", "trigger_id"), sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node_subscription"), ) diff --git a/api/services/trigger/trigger_subscription_builder_service.py b/api/services/trigger/trigger_subscription_builder_service.py index 745c39370a..4e1a26a28c 100644 --- a/api/services/trigger/trigger_subscription_builder_service.py +++ b/api/services/trigger/trigger_subscription_builder_service.py @@ -2,6 +2,7 @@ import json import logging import uuid from collections.abc import Mapping +from datetime import datetime from typing import Any from flask import Request, Response @@ -251,18 +252,41 @@ class TriggerSubscriptionBuilderService: return None @classmethod - def append_request_log(cls, endpoint_id: str, request: Request, response: Response) -> None: - """ - Append the validation request log to Redis. - """ - pass + def append_log(cls, endpoint_id: str, request: Request, response: Response) -> None: + """Append validation request log to Redis.""" + log = RequestLog( + id=str(uuid.uuid4()), + endpoint=endpoint_id, + request={ + "method": request.method, + "url": request.url, + "headers": dict(request.headers), + "data": request.get_data(as_text=True), + }, + response={ + "status_code": response.status_code, + "headers": dict(response.headers), + "data": response.get_data(as_text=True), + }, + created_at=datetime.now(), + ) + + key = f"trigger:subscription:validation:logs:{endpoint_id}" + logs = json.loads(redis_client.get(key) or "[]") + logs.append(log.model_dump(mode="json")) + + # Keep last N logs + logs = logs[-cls.__VALIDATION_REQUEST_CACHE_COUNT__ :] + redis_client.setex(key, cls.__VALIDATION_REQUEST_CACHE_EXPIRE_MS__, json.dumps(logs, default=str)) @classmethod - def list_request_logs(cls, endpoint_id: str) -> list[RequestLog]: - """ - List the request logs for a validation endpoint. - """ - return [] + def list_logs(cls, endpoint_id: str) -> list[RequestLog]: + """List request logs for validation endpoint.""" + key = f"trigger:subscription:validation:logs:{endpoint_id}" + logs_json = redis_client.get(key) + if not logs_json: + return [] + return [RequestLog.model_validate(log) for log in json.loads(logs_json)] @classmethod def process_builder_validation_endpoint(cls, endpoint_id: str, request: Request) -> Response | None: @@ -288,5 +312,5 @@ class TriggerSubscriptionBuilderService: subscription=subscription_builder.to_subscription(), ) # append the request log - cls.append_request_log(endpoint_id, request, response.response) + cls.append_log(endpoint_id, request, response.response) return response.response diff --git a/api/services/trigger_service.py b/api/services/trigger_service.py index 06855dd3fc..b24ff66e39 100644 --- a/api/services/trigger_service.py +++ b/api/services/trigger_service.py @@ -34,11 +34,9 @@ class TriggerService: cls, subscription: TriggerSubscription, trigger: TriggerEntity, request: Request ) -> None: """Process triggered workflows.""" - # 1. Find associated WorkflowPluginTriggers - trigger_id = f"{subscription.provider_id}:{trigger.identity.name}" - plugin_triggers = cls._get_plugin_triggers(trigger_id) - if not plugin_triggers: + subscribers = cls._get_subscriber_triggers(subscription=subscription, trigger=trigger) + if not subscribers: logger.warning( "No workflows found for trigger '%s' in subscription '%s'", trigger.identity.name, @@ -61,7 +59,7 @@ class TriggerService: logger.error("Tenant owner not found for tenant %s", subscription.tenant_id) return - for plugin_trigger in plugin_triggers: + for plugin_trigger in subscribers: # 2. Get workflow workflow = session.scalar( select(Workflow) @@ -145,16 +143,13 @@ class TriggerService: ) if dispatch_response.triggers: - # Process triggers asynchronously to avoid blocking - from tasks.trigger_processing_tasks import process_triggers_async - - # Serialize and store the request request_id = f"trigger_request_{uuid.uuid4().hex}" serialized_request = serialize_request(request) storage.save(f"triggers/{request_id}", serialized_request) - # Queue async task with just the request ID - process_triggers_async.delay( + from tasks.trigger_processing_tasks import dispatch_triggered_workflows_async + + dispatch_triggered_workflows_async( endpoint_id=endpoint_id, provider_id=subscription.provider_id, subscription_id=subscription.id, @@ -163,7 +158,7 @@ class TriggerService: ) logger.info( - "Queued async processing for %d triggers on endpoint %s with request_id %s", + "Queued async dispatching for %d triggers on endpoint %s with request_id %s", len(dispatch_response.triggers), endpoint_id, request_id, @@ -172,15 +167,19 @@ class TriggerService: return dispatch_response.response @classmethod - def _get_plugin_triggers(cls, trigger_id: str) -> list[WorkflowPluginTrigger]: - """Get WorkflowPluginTriggers for a trigger_id.""" - with Session(db.engine) as session: - triggers = session.scalars( + def _get_subscriber_triggers( + cls, subscription: TriggerSubscription, trigger: TriggerEntity + ) -> list[WorkflowPluginTrigger]: + """Get WorkflowPluginTriggers for a subscription and trigger.""" + with Session(db.engine, expire_on_commit=False) as session: + subscribers = session.scalars( select(WorkflowPluginTrigger).where( - WorkflowPluginTrigger.trigger_id == trigger_id, + WorkflowPluginTrigger.tenant_id == subscription.tenant_id, + WorkflowPluginTrigger.subscription_id == subscription.id, + WorkflowPluginTrigger.trigger_id == trigger.identity.name, ) ).all() - return list(triggers) + return list(subscribers) @classmethod def _store_trigger_data( diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index ca5cad8251..4b5add56e0 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -25,7 +25,7 @@ TRIGGER_QUEUE = "triggered_workflow_dispatcher" @shared_task(queue=TRIGGER_QUEUE, bind=True, max_retries=3) -def process_triggers_async( +def dispatch_triggered_workflows_async( self, endpoint_id: str, provider_id: str, @@ -34,21 +34,21 @@ def process_triggers_async( request_id: str, ) -> dict: """ - Process triggers asynchronously. + Dispatch triggers asynchronously. Args: endpoint_id: Endpoint ID provider_id: Provider ID subscription_id: Subscription ID - triggers: List of triggers to process + triggers: List of triggers to dispatch request_id: Unique ID of the stored request Returns: - dict: Execution result with status and processed trigger count + dict: Execution result with status and dispatched trigger count """ try: logger.info( - "Starting async trigger processing for endpoint=%s, triggers=%s, request_id=%s", + "Starting async trigger dispatching for endpoint=%s, triggers=%s, request_id=%s", endpoint_id, triggers, request_id, @@ -70,21 +70,20 @@ def process_triggers_async( return {"status": "failed", "error": "Subscription not found"} # Get controller - provider_id_obj = TriggerProviderID(provider_id) - controller = TriggerManager.get_trigger_provider(subscription.tenant_id, provider_id_obj) + controller = TriggerManager.get_trigger_provider(subscription.tenant_id, TriggerProviderID(provider_id)) if not controller: logger.error("Controller not found for provider: %s", provider_id) return {"status": "failed", "error": "Controller not found"} - # Process each trigger - processed_count = 0 - for trigger_name in triggers: + # Dispatch each trigger + dispatched_count = 0 + for trigger in triggers: try: - trigger = controller.get_trigger(trigger_name) + trigger = controller.get_trigger(trigger) if trigger is None: logger.error( "Trigger '%s' not found in provider '%s'", - trigger_name, + trigger, provider_id, ) continue @@ -94,20 +93,20 @@ def process_triggers_async( trigger=trigger, request=request, ) - processed_count += 1 + dispatched_count += 1 except Exception: logger.exception( - "Failed to process trigger '%s' for subscription %s", - trigger_name, + "Failed to dispatch trigger '%s' for subscription %s", + trigger, subscription_id, ) # Continue processing other triggers even if one fails continue logger.info( - "Completed async trigger processing: processed %d/%d triggers", - processed_count, + "Completed async trigger dispatching: processed %d/%d triggers", + dispatched_count, len(triggers), ) @@ -118,13 +117,13 @@ def process_triggers_async( return { "status": "completed", - "processed_count": processed_count, + "dispatched_count": dispatched_count, "total_count": len(triggers), } except Exception as e: logger.exception( - "Error in async trigger processing for endpoint %s", + "Error in async trigger dispatching for endpoint %s", endpoint_id, ) # Retry the task if not exceeded max retries