From f1e513830cdfee425ff6d1f5c9024e8496a6ac3d Mon Sep 17 00:00:00 2001 From: Harry Date: Wed, 5 Nov 2025 16:35:03 +0800 Subject: [PATCH] feat: implement logging for failed trigger invocations in workflow processing --- api/tasks/trigger_processing_tasks.py | 191 ++++++++++++++++++++++---- 1 file changed, 166 insertions(+), 25 deletions(-) diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index 75c36ef677..1b2d36ca8f 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -5,8 +5,10 @@ These tasks handle trigger workflow execution asynchronously to avoid blocking the main request thread. """ +import json import logging from collections.abc import Mapping, Sequence +from datetime import UTC, datetime from typing import Any from celery import shared_task @@ -16,24 +18,27 @@ from sqlalchemy.orm import Session from core.app.entities.app_invoke_entities import InvokeFrom from core.plugin.entities.plugin_daemon import CredentialType from core.plugin.entities.request import TriggerInvokeEventResponse +from core.plugin.impl.exc import PluginInvokeError from core.trigger.debug.event_bus import TriggerDebugEventBus from core.trigger.debug.events import PluginTriggerDebugEvent, build_plugin_pool_key from core.trigger.entities.entities import TriggerProviderEntity from core.trigger.provider import PluginTriggerProviderController from core.trigger.trigger_manager import TriggerManager -from core.workflow.enums import NodeType +from core.workflow.enums import NodeType, WorkflowExecutionStatus from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData from extensions.ext_database import db +from models.enums import AppTriggerType, CreatorUserRole, WorkflowRunTriggeredFrom, WorkflowTriggerStatus from models.model import EndUser from models.provider_ids import TriggerProviderID -from models.trigger import TriggerSubscription, WorkflowPluginTrigger -from models.workflow import Workflow +from models.trigger import TriggerSubscription, WorkflowPluginTrigger, WorkflowTriggerLog +from models.workflow import Workflow, WorkflowAppLog, WorkflowAppLogCreatedFrom, WorkflowRun from services.async_workflow_service import AsyncWorkflowService from services.end_user_service import EndUserService from services.trigger.trigger_provider_service import TriggerProviderService from services.trigger.trigger_request_service import TriggerHttpRequestCachingService from services.trigger.trigger_subscription_operator_service import TriggerSubscriptionOperatorService from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData, PluginTriggerMetadata +from services.workflow.queue_dispatcher import QueueDispatcherManager logger = logging.getLogger(__name__) @@ -106,6 +111,110 @@ def _get_latest_workflows_by_app_ids( return {w.app_id: w for w in workflows} +def _record_trigger_failure_log( + *, + session: Session, + workflow: Workflow, + plugin_trigger: WorkflowPluginTrigger, + subscription: TriggerSubscription, + trigger_metadata: PluginTriggerMetadata, + end_user: EndUser | None, + error_message: str, + event_name: str, + request_id: str, +) -> None: + """ + Persist a workflow run, workflow app log, and trigger log entry for failed trigger invocations. + """ + now = datetime.now(UTC) + if end_user: + created_by_role = CreatorUserRole.END_USER + created_by = end_user.id + else: + created_by_role = CreatorUserRole.ACCOUNT + created_by = subscription.user_id + + failure_inputs = { + "event_name": event_name, + "subscription_id": subscription.id, + "request_id": request_id, + "plugin_trigger_id": plugin_trigger.id, + } + + workflow_run = WorkflowRun( + tenant_id=workflow.tenant_id, + app_id=workflow.app_id, + workflow_id=workflow.id, + type=workflow.type, + triggered_from=WorkflowRunTriggeredFrom.PLUGIN.value, + version=workflow.version, + graph=workflow.graph, + inputs=json.dumps(failure_inputs), + status=WorkflowExecutionStatus.FAILED.value, + outputs="{}", + error=error_message, + elapsed_time=0.0, + total_tokens=0, + total_steps=0, + created_by_role=created_by_role.value, + created_by=created_by, + created_at=now, + finished_at=now, + exceptions_count=0, + ) + session.add(workflow_run) + session.flush() + + workflow_app_log = WorkflowAppLog( + tenant_id=workflow.tenant_id, + app_id=workflow.app_id, + workflow_id=workflow.id, + workflow_run_id=workflow_run.id, + created_from=WorkflowAppLogCreatedFrom.SERVICE_API.value, + created_by_role=created_by_role.value, + created_by=created_by, + ) + session.add(workflow_app_log) + + dispatcher = QueueDispatcherManager.get_dispatcher(subscription.tenant_id) + queue_name = dispatcher.get_queue_name() + + trigger_data = PluginTriggerData( + app_id=plugin_trigger.app_id, + tenant_id=subscription.tenant_id, + workflow_id=workflow.id, + root_node_id=plugin_trigger.node_id, + inputs={}, + trigger_metadata=trigger_metadata, + plugin_id=subscription.provider_id, + endpoint_id=subscription.endpoint_id, + ) + + trigger_log = WorkflowTriggerLog( + tenant_id=workflow.tenant_id, + app_id=workflow.app_id, + workflow_id=workflow.id, + workflow_run_id=workflow_run.id, + root_node_id=plugin_trigger.node_id, + trigger_metadata=trigger_metadata.model_dump_json(), + trigger_type=AppTriggerType.TRIGGER_PLUGIN, + trigger_data=trigger_data.model_dump_json(), + inputs=json.dumps({}), + status=WorkflowTriggerStatus.FAILED, + error=error_message, + queue_name=queue_name, + retry_count=0, + created_by_role=created_by_role.value, + created_by=created_by, + triggered_at=now, + finished_at=now, + elapsed_time=0.0, + total_tokens=0, + ) + session.add(trigger_log) + session.commit() + + def dispatch_triggered_workflow( user_id: str, subscription: TriggerSubscription, @@ -169,22 +278,61 @@ def dispatch_triggered_workflow( continue # invoke triger - node_data: TriggerEventNodeData = TriggerEventNodeData.model_validate(event_node) - invoke_response: TriggerInvokeEventResponse = TriggerManager.invoke_trigger_event( - tenant_id=subscription.tenant_id, - user_id=user_id, - provider_id=TriggerProviderID(subscription.provider_id), + trigger_metadata = PluginTriggerMetadata( + plugin_unique_identifier=provider_controller.plugin_unique_identifier or "", + endpoint_id=subscription.endpoint_id, + provider_id=subscription.provider_id, event_name=event_name, - parameters=node_data.resolve_parameters( - parameter_schemas=provider_controller.get_event_parameters(event_name=event_name) - ), - credentials=subscription.credentials, - credential_type=CredentialType.of(subscription.credential_type), - subscription=subscription.to_entity(), - request=request, - payload=payload, + icon_filename=trigger_entity.identity.icon or "", + icon_dark_filename=trigger_entity.identity.icon_dark or "", ) - if invoke_response.cancelled: + + node_data: TriggerEventNodeData = TriggerEventNodeData.model_validate(event_node) + invoke_response: TriggerInvokeEventResponse | None = None + try: + invoke_response = TriggerManager.invoke_trigger_event( + tenant_id=subscription.tenant_id, + user_id=user_id, + provider_id=TriggerProviderID(subscription.provider_id), + event_name=event_name, + parameters=node_data.resolve_parameters( + parameter_schemas=provider_controller.get_event_parameters(event_name=event_name) + ), + credentials=subscription.credentials, + credential_type=CredentialType.of(subscription.credential_type), + subscription=subscription.to_entity(), + request=request, + payload=payload, + ) + except PluginInvokeError as e: + error_message = e.to_user_friendly_error(plugin_name=trigger_entity.identity.name) + try: + end_user = end_users.get(plugin_trigger.app_id) + _record_trigger_failure_log( + session=session, + workflow=workflow, + plugin_trigger=plugin_trigger, + subscription=subscription, + trigger_metadata=trigger_metadata, + end_user=end_user, + error_message=error_message, + event_name=event_name, + request_id=request_id, + ) + except Exception: + logger.exception( + "Failed to record trigger failure log for app %s", + plugin_trigger.app_id, + ) + continue + except Exception: + logger.exception( + "Failed to invoke trigger event for app %s", + plugin_trigger.app_id, + ) + continue + + if invoke_response is not None and invoke_response.cancelled: logger.info( "Trigger ignored for app %s with trigger event %s", plugin_trigger.app_id, @@ -201,14 +349,7 @@ def dispatch_triggered_workflow( plugin_id=subscription.provider_id, endpoint_id=subscription.endpoint_id, inputs=invoke_response.variables, - trigger_metadata=PluginTriggerMetadata( - plugin_unique_identifier=provider_controller.plugin_unique_identifier or "", - endpoint_id=subscription.endpoint_id, - provider_id=subscription.provider_id, - event_name=event_name, - icon_filename=trigger_entity.identity.icon or "", - icon_dark_filename=trigger_entity.identity.icon_dark or "", - ), + trigger_metadata=trigger_metadata, ) # Trigger async workflow