feat: implement logging for failed trigger invocations in workflow processing

This commit is contained in:
Harry 2025-11-05 16:35:03 +08:00
parent 7de533a643
commit f1e513830c
1 changed files with 166 additions and 25 deletions

View File

@ -5,8 +5,10 @@ These tasks handle trigger workflow execution asynchronously
to avoid blocking the main request thread.
"""
import json
import logging
from collections.abc import Mapping, Sequence
from datetime import UTC, datetime
from typing import Any
from celery import shared_task
@ -16,24 +18,27 @@ from sqlalchemy.orm import Session
from core.app.entities.app_invoke_entities import InvokeFrom
from core.plugin.entities.plugin_daemon import CredentialType
from core.plugin.entities.request import TriggerInvokeEventResponse
from core.plugin.impl.exc import PluginInvokeError
from core.trigger.debug.event_bus import TriggerDebugEventBus
from core.trigger.debug.events import PluginTriggerDebugEvent, build_plugin_pool_key
from core.trigger.entities.entities import TriggerProviderEntity
from core.trigger.provider import PluginTriggerProviderController
from core.trigger.trigger_manager import TriggerManager
from core.workflow.enums import NodeType
from core.workflow.enums import NodeType, WorkflowExecutionStatus
from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData
from extensions.ext_database import db
from models.enums import AppTriggerType, CreatorUserRole, WorkflowRunTriggeredFrom, WorkflowTriggerStatus
from models.model import EndUser
from models.provider_ids import TriggerProviderID
from models.trigger import TriggerSubscription, WorkflowPluginTrigger
from models.workflow import Workflow
from models.trigger import TriggerSubscription, WorkflowPluginTrigger, WorkflowTriggerLog
from models.workflow import Workflow, WorkflowAppLog, WorkflowAppLogCreatedFrom, WorkflowRun
from services.async_workflow_service import AsyncWorkflowService
from services.end_user_service import EndUserService
from services.trigger.trigger_provider_service import TriggerProviderService
from services.trigger.trigger_request_service import TriggerHttpRequestCachingService
from services.trigger.trigger_subscription_operator_service import TriggerSubscriptionOperatorService
from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData, PluginTriggerMetadata
from services.workflow.queue_dispatcher import QueueDispatcherManager
logger = logging.getLogger(__name__)
@ -106,6 +111,110 @@ def _get_latest_workflows_by_app_ids(
return {w.app_id: w for w in workflows}
def _record_trigger_failure_log(
*,
session: Session,
workflow: Workflow,
plugin_trigger: WorkflowPluginTrigger,
subscription: TriggerSubscription,
trigger_metadata: PluginTriggerMetadata,
end_user: EndUser | None,
error_message: str,
event_name: str,
request_id: str,
) -> None:
"""
Persist a workflow run, workflow app log, and trigger log entry for failed trigger invocations.
"""
now = datetime.now(UTC)
if end_user:
created_by_role = CreatorUserRole.END_USER
created_by = end_user.id
else:
created_by_role = CreatorUserRole.ACCOUNT
created_by = subscription.user_id
failure_inputs = {
"event_name": event_name,
"subscription_id": subscription.id,
"request_id": request_id,
"plugin_trigger_id": plugin_trigger.id,
}
workflow_run = WorkflowRun(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
workflow_id=workflow.id,
type=workflow.type,
triggered_from=WorkflowRunTriggeredFrom.PLUGIN.value,
version=workflow.version,
graph=workflow.graph,
inputs=json.dumps(failure_inputs),
status=WorkflowExecutionStatus.FAILED.value,
outputs="{}",
error=error_message,
elapsed_time=0.0,
total_tokens=0,
total_steps=0,
created_by_role=created_by_role.value,
created_by=created_by,
created_at=now,
finished_at=now,
exceptions_count=0,
)
session.add(workflow_run)
session.flush()
workflow_app_log = WorkflowAppLog(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
workflow_id=workflow.id,
workflow_run_id=workflow_run.id,
created_from=WorkflowAppLogCreatedFrom.SERVICE_API.value,
created_by_role=created_by_role.value,
created_by=created_by,
)
session.add(workflow_app_log)
dispatcher = QueueDispatcherManager.get_dispatcher(subscription.tenant_id)
queue_name = dispatcher.get_queue_name()
trigger_data = PluginTriggerData(
app_id=plugin_trigger.app_id,
tenant_id=subscription.tenant_id,
workflow_id=workflow.id,
root_node_id=plugin_trigger.node_id,
inputs={},
trigger_metadata=trigger_metadata,
plugin_id=subscription.provider_id,
endpoint_id=subscription.endpoint_id,
)
trigger_log = WorkflowTriggerLog(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
workflow_id=workflow.id,
workflow_run_id=workflow_run.id,
root_node_id=plugin_trigger.node_id,
trigger_metadata=trigger_metadata.model_dump_json(),
trigger_type=AppTriggerType.TRIGGER_PLUGIN,
trigger_data=trigger_data.model_dump_json(),
inputs=json.dumps({}),
status=WorkflowTriggerStatus.FAILED,
error=error_message,
queue_name=queue_name,
retry_count=0,
created_by_role=created_by_role.value,
created_by=created_by,
triggered_at=now,
finished_at=now,
elapsed_time=0.0,
total_tokens=0,
)
session.add(trigger_log)
session.commit()
def dispatch_triggered_workflow(
user_id: str,
subscription: TriggerSubscription,
@ -169,22 +278,61 @@ def dispatch_triggered_workflow(
continue
# invoke triger
node_data: TriggerEventNodeData = TriggerEventNodeData.model_validate(event_node)
invoke_response: TriggerInvokeEventResponse = TriggerManager.invoke_trigger_event(
tenant_id=subscription.tenant_id,
user_id=user_id,
provider_id=TriggerProviderID(subscription.provider_id),
trigger_metadata = PluginTriggerMetadata(
plugin_unique_identifier=provider_controller.plugin_unique_identifier or "",
endpoint_id=subscription.endpoint_id,
provider_id=subscription.provider_id,
event_name=event_name,
parameters=node_data.resolve_parameters(
parameter_schemas=provider_controller.get_event_parameters(event_name=event_name)
),
credentials=subscription.credentials,
credential_type=CredentialType.of(subscription.credential_type),
subscription=subscription.to_entity(),
request=request,
payload=payload,
icon_filename=trigger_entity.identity.icon or "",
icon_dark_filename=trigger_entity.identity.icon_dark or "",
)
if invoke_response.cancelled:
node_data: TriggerEventNodeData = TriggerEventNodeData.model_validate(event_node)
invoke_response: TriggerInvokeEventResponse | None = None
try:
invoke_response = TriggerManager.invoke_trigger_event(
tenant_id=subscription.tenant_id,
user_id=user_id,
provider_id=TriggerProviderID(subscription.provider_id),
event_name=event_name,
parameters=node_data.resolve_parameters(
parameter_schemas=provider_controller.get_event_parameters(event_name=event_name)
),
credentials=subscription.credentials,
credential_type=CredentialType.of(subscription.credential_type),
subscription=subscription.to_entity(),
request=request,
payload=payload,
)
except PluginInvokeError as e:
error_message = e.to_user_friendly_error(plugin_name=trigger_entity.identity.name)
try:
end_user = end_users.get(plugin_trigger.app_id)
_record_trigger_failure_log(
session=session,
workflow=workflow,
plugin_trigger=plugin_trigger,
subscription=subscription,
trigger_metadata=trigger_metadata,
end_user=end_user,
error_message=error_message,
event_name=event_name,
request_id=request_id,
)
except Exception:
logger.exception(
"Failed to record trigger failure log for app %s",
plugin_trigger.app_id,
)
continue
except Exception:
logger.exception(
"Failed to invoke trigger event for app %s",
plugin_trigger.app_id,
)
continue
if invoke_response is not None and invoke_response.cancelled:
logger.info(
"Trigger ignored for app %s with trigger event %s",
plugin_trigger.app_id,
@ -201,14 +349,7 @@ def dispatch_triggered_workflow(
plugin_id=subscription.provider_id,
endpoint_id=subscription.endpoint_id,
inputs=invoke_response.variables,
trigger_metadata=PluginTriggerMetadata(
plugin_unique_identifier=provider_controller.plugin_unique_identifier or "",
endpoint_id=subscription.endpoint_id,
provider_id=subscription.provider_id,
event_name=event_name,
icon_filename=trigger_entity.identity.icon or "",
icon_dark_filename=trigger_entity.identity.icon_dark or "",
),
trigger_metadata=trigger_metadata,
)
# Trigger async workflow