refactor(trigger): rename request logs API and enhance logging functionality

- Renamed `TriggerSubscriptionBuilderRequestLogsApi` to `TriggerSubscriptionBuilderLogsApi` for clarity.
- Updated the API endpoint to retrieve logs for subscription builders.
- Enhanced logging functionality in `TriggerSubscriptionBuilderService` to append and list logs more effectively.
- Refactored trigger processing tasks to improve naming consistency and clarity in logging.

🤖 Generated with [Claude Code](https://claude.ai/code)
This commit is contained in:
Harry 2025-09-04 21:11:03 +08:00
parent 461829274a
commit eab03e63d4
5 changed files with 77 additions and 54 deletions

View File

@ -154,7 +154,7 @@ class TriggerSubscriptionBuilderUpdateApi(Resource):
raise raise
class TriggerSubscriptionBuilderRequestLogsApi(Resource): class TriggerSubscriptionBuilderLogsApi(Resource):
@setup_required @setup_required
@login_required @login_required
@account_initialization_required @account_initialization_required
@ -165,9 +165,10 @@ class TriggerSubscriptionBuilderRequestLogsApi(Resource):
assert user.current_tenant_id is not None assert user.current_tenant_id is not None
try: try:
return jsonable_encoder(TriggerSubscriptionBuilderService.list_request_logs(subscription_builder_id)) logs = TriggerSubscriptionBuilderService.list_logs(subscription_builder_id)
return jsonable_encoder({"logs": [log.model_dump(mode="json") for log in logs]})
except Exception as e: except Exception as e:
logger.exception("Error getting request logs for provider credential", exc_info=e) logger.exception("Error getting request logs for subscription builder", exc_info=e)
raise raise
@ -494,8 +495,8 @@ api.add_resource(
"/workspaces/current/trigger-provider/<path:provider>/subscriptions/builder/build/<path:subscription_builder_id>", "/workspaces/current/trigger-provider/<path:provider>/subscriptions/builder/build/<path:subscription_builder_id>",
) )
api.add_resource( api.add_resource(
TriggerSubscriptionBuilderRequestLogsApi, TriggerSubscriptionBuilderLogsApi,
"/workspaces/current/trigger-provider/<path:provider>/subscriptions/builder/request-logs/<path:subscription_builder_id>", "/workspaces/current/trigger-provider/<path:provider>/subscriptions/builder/logs/<path:subscription_builder_id>",
) )

View File

@ -1447,7 +1447,7 @@ class WorkflowPluginTrigger(Base):
__tablename__ = "workflow_plugin_triggers" __tablename__ = "workflow_plugin_triggers"
__table_args__ = ( __table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"), sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"),
sa.Index("workflow_plugin_trigger_tenant_subscription_idx", "tenant_id", "subscription_id"), sa.Index("workflow_plugin_trigger_tenant_subscription_idx", "tenant_id", "subscription_id", "trigger_id"),
sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node_subscription"), sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node_subscription"),
) )

View File

@ -2,6 +2,7 @@ import json
import logging import logging
import uuid import uuid
from collections.abc import Mapping from collections.abc import Mapping
from datetime import datetime
from typing import Any from typing import Any
from flask import Request, Response from flask import Request, Response
@ -251,18 +252,41 @@ class TriggerSubscriptionBuilderService:
return None return None
@classmethod @classmethod
def append_request_log(cls, endpoint_id: str, request: Request, response: Response) -> None: def append_log(cls, endpoint_id: str, request: Request, response: Response) -> None:
""" """Append validation request log to Redis."""
Append the validation request log to Redis. log = RequestLog(
""" id=str(uuid.uuid4()),
pass endpoint=endpoint_id,
request={
"method": request.method,
"url": request.url,
"headers": dict(request.headers),
"data": request.get_data(as_text=True),
},
response={
"status_code": response.status_code,
"headers": dict(response.headers),
"data": response.get_data(as_text=True),
},
created_at=datetime.now(),
)
key = f"trigger:subscription:validation:logs:{endpoint_id}"
logs = json.loads(redis_client.get(key) or "[]")
logs.append(log.model_dump(mode="json"))
# Keep last N logs
logs = logs[-cls.__VALIDATION_REQUEST_CACHE_COUNT__ :]
redis_client.setex(key, cls.__VALIDATION_REQUEST_CACHE_EXPIRE_MS__, json.dumps(logs, default=str))
@classmethod @classmethod
def list_request_logs(cls, endpoint_id: str) -> list[RequestLog]: def list_logs(cls, endpoint_id: str) -> list[RequestLog]:
""" """List request logs for validation endpoint."""
List the request logs for a validation endpoint. key = f"trigger:subscription:validation:logs:{endpoint_id}"
""" logs_json = redis_client.get(key)
return [] if not logs_json:
return []
return [RequestLog.model_validate(log) for log in json.loads(logs_json)]
@classmethod @classmethod
def process_builder_validation_endpoint(cls, endpoint_id: str, request: Request) -> Response | None: def process_builder_validation_endpoint(cls, endpoint_id: str, request: Request) -> Response | None:
@ -288,5 +312,5 @@ class TriggerSubscriptionBuilderService:
subscription=subscription_builder.to_subscription(), subscription=subscription_builder.to_subscription(),
) )
# append the request log # append the request log
cls.append_request_log(endpoint_id, request, response.response) cls.append_log(endpoint_id, request, response.response)
return response.response return response.response

View File

@ -34,11 +34,9 @@ class TriggerService:
cls, subscription: TriggerSubscription, trigger: TriggerEntity, request: Request cls, subscription: TriggerSubscription, trigger: TriggerEntity, request: Request
) -> None: ) -> None:
"""Process triggered workflows.""" """Process triggered workflows."""
# 1. Find associated WorkflowPluginTriggers
trigger_id = f"{subscription.provider_id}:{trigger.identity.name}"
plugin_triggers = cls._get_plugin_triggers(trigger_id)
if not plugin_triggers: subscribers = cls._get_subscriber_triggers(subscription=subscription, trigger=trigger)
if not subscribers:
logger.warning( logger.warning(
"No workflows found for trigger '%s' in subscription '%s'", "No workflows found for trigger '%s' in subscription '%s'",
trigger.identity.name, trigger.identity.name,
@ -61,7 +59,7 @@ class TriggerService:
logger.error("Tenant owner not found for tenant %s", subscription.tenant_id) logger.error("Tenant owner not found for tenant %s", subscription.tenant_id)
return return
for plugin_trigger in plugin_triggers: for plugin_trigger in subscribers:
# 2. Get workflow # 2. Get workflow
workflow = session.scalar( workflow = session.scalar(
select(Workflow) select(Workflow)
@ -145,16 +143,13 @@ class TriggerService:
) )
if dispatch_response.triggers: if dispatch_response.triggers:
# Process triggers asynchronously to avoid blocking
from tasks.trigger_processing_tasks import process_triggers_async
# Serialize and store the request
request_id = f"trigger_request_{uuid.uuid4().hex}" request_id = f"trigger_request_{uuid.uuid4().hex}"
serialized_request = serialize_request(request) serialized_request = serialize_request(request)
storage.save(f"triggers/{request_id}", serialized_request) storage.save(f"triggers/{request_id}", serialized_request)
# Queue async task with just the request ID from tasks.trigger_processing_tasks import dispatch_triggered_workflows_async
process_triggers_async.delay(
dispatch_triggered_workflows_async(
endpoint_id=endpoint_id, endpoint_id=endpoint_id,
provider_id=subscription.provider_id, provider_id=subscription.provider_id,
subscription_id=subscription.id, subscription_id=subscription.id,
@ -163,7 +158,7 @@ class TriggerService:
) )
logger.info( logger.info(
"Queued async processing for %d triggers on endpoint %s with request_id %s", "Queued async dispatching for %d triggers on endpoint %s with request_id %s",
len(dispatch_response.triggers), len(dispatch_response.triggers),
endpoint_id, endpoint_id,
request_id, request_id,
@ -172,15 +167,19 @@ class TriggerService:
return dispatch_response.response return dispatch_response.response
@classmethod @classmethod
def _get_plugin_triggers(cls, trigger_id: str) -> list[WorkflowPluginTrigger]: def _get_subscriber_triggers(
"""Get WorkflowPluginTriggers for a trigger_id.""" cls, subscription: TriggerSubscription, trigger: TriggerEntity
with Session(db.engine) as session: ) -> list[WorkflowPluginTrigger]:
triggers = session.scalars( """Get WorkflowPluginTriggers for a subscription and trigger."""
with Session(db.engine, expire_on_commit=False) as session:
subscribers = session.scalars(
select(WorkflowPluginTrigger).where( select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.trigger_id == trigger_id, WorkflowPluginTrigger.tenant_id == subscription.tenant_id,
WorkflowPluginTrigger.subscription_id == subscription.id,
WorkflowPluginTrigger.trigger_id == trigger.identity.name,
) )
).all() ).all()
return list(triggers) return list(subscribers)
@classmethod @classmethod
def _store_trigger_data( def _store_trigger_data(

View File

@ -25,7 +25,7 @@ TRIGGER_QUEUE = "triggered_workflow_dispatcher"
@shared_task(queue=TRIGGER_QUEUE, bind=True, max_retries=3) @shared_task(queue=TRIGGER_QUEUE, bind=True, max_retries=3)
def process_triggers_async( def dispatch_triggered_workflows_async(
self, self,
endpoint_id: str, endpoint_id: str,
provider_id: str, provider_id: str,
@ -34,21 +34,21 @@ def process_triggers_async(
request_id: str, request_id: str,
) -> dict: ) -> dict:
""" """
Process triggers asynchronously. Dispatch triggers asynchronously.
Args: Args:
endpoint_id: Endpoint ID endpoint_id: Endpoint ID
provider_id: Provider ID provider_id: Provider ID
subscription_id: Subscription ID subscription_id: Subscription ID
triggers: List of triggers to process triggers: List of triggers to dispatch
request_id: Unique ID of the stored request request_id: Unique ID of the stored request
Returns: Returns:
dict: Execution result with status and processed trigger count dict: Execution result with status and dispatched trigger count
""" """
try: try:
logger.info( logger.info(
"Starting async trigger processing for endpoint=%s, triggers=%s, request_id=%s", "Starting async trigger dispatching for endpoint=%s, triggers=%s, request_id=%s",
endpoint_id, endpoint_id,
triggers, triggers,
request_id, request_id,
@ -70,21 +70,20 @@ def process_triggers_async(
return {"status": "failed", "error": "Subscription not found"} return {"status": "failed", "error": "Subscription not found"}
# Get controller # Get controller
provider_id_obj = TriggerProviderID(provider_id) controller = TriggerManager.get_trigger_provider(subscription.tenant_id, TriggerProviderID(provider_id))
controller = TriggerManager.get_trigger_provider(subscription.tenant_id, provider_id_obj)
if not controller: if not controller:
logger.error("Controller not found for provider: %s", provider_id) logger.error("Controller not found for provider: %s", provider_id)
return {"status": "failed", "error": "Controller not found"} return {"status": "failed", "error": "Controller not found"}
# Process each trigger # Dispatch each trigger
processed_count = 0 dispatched_count = 0
for trigger_name in triggers: for trigger in triggers:
try: try:
trigger = controller.get_trigger(trigger_name) trigger = controller.get_trigger(trigger)
if trigger is None: if trigger is None:
logger.error( logger.error(
"Trigger '%s' not found in provider '%s'", "Trigger '%s' not found in provider '%s'",
trigger_name, trigger,
provider_id, provider_id,
) )
continue continue
@ -94,20 +93,20 @@ def process_triggers_async(
trigger=trigger, trigger=trigger,
request=request, request=request,
) )
processed_count += 1 dispatched_count += 1
except Exception: except Exception:
logger.exception( logger.exception(
"Failed to process trigger '%s' for subscription %s", "Failed to dispatch trigger '%s' for subscription %s",
trigger_name, trigger,
subscription_id, subscription_id,
) )
# Continue processing other triggers even if one fails # Continue processing other triggers even if one fails
continue continue
logger.info( logger.info(
"Completed async trigger processing: processed %d/%d triggers", "Completed async trigger dispatching: processed %d/%d triggers",
processed_count, dispatched_count,
len(triggers), len(triggers),
) )
@ -118,13 +117,13 @@ def process_triggers_async(
return { return {
"status": "completed", "status": "completed",
"processed_count": processed_count, "dispatched_count": dispatched_count,
"total_count": len(triggers), "total_count": len(triggers),
} }
except Exception as e: except Exception as e:
logger.exception( logger.exception(
"Error in async trigger processing for endpoint %s", "Error in async trigger dispatching for endpoint %s",
endpoint_id, endpoint_id,
) )
# Retry the task if not exceeded max retries # Retry the task if not exceeded max retries