From 65c6203ad7df5ceb67fb61ea5f63b626bddb6326 Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Sat, 18 Oct 2025 19:54:06 +0800 Subject: [PATCH] fix: correct building reference --- api/controllers/trigger/webhook.py | 4 +- api/core/trigger/debug/event_bus.py | 4 +- api/core/trigger/debug/event_selectors.py | 17 +++--- api/core/trigger/debug/events.py | 64 +++++++++++++---------- api/tasks/trigger_processing_tasks.py | 4 +- api/tasks/workflow_schedule_tasks.py | 4 +- 6 files changed, 55 insertions(+), 42 deletions(-) diff --git a/api/controllers/trigger/webhook.py b/api/controllers/trigger/webhook.py index 35f97daa0f..0eb42e8f0d 100644 --- a/api/controllers/trigger/webhook.py +++ b/api/controllers/trigger/webhook.py @@ -6,7 +6,7 @@ from werkzeug.exceptions import NotFound, RequestEntityTooLarge from controllers.trigger import bp from core.trigger.debug.event_bus import TriggerDebugEventBus -from core.trigger.debug.events import WebhookDebugEvent +from core.trigger.debug.events import WebhookDebugEvent, build_webhook_pool_key from services.trigger.webhook_service import WebhookService logger = logging.getLogger(__name__) @@ -73,7 +73,7 @@ def handle_webhook_debug(webhook_id: str): workflow_inputs = WebhookService.build_workflow_inputs(webhook_data) # Generate pool key and dispatch debug event - pool_key: str = WebhookDebugEvent.build_pool_key( + pool_key: str = build_webhook_pool_key( tenant_id=webhook_trigger.tenant_id, app_id=webhook_trigger.app_id, node_id=webhook_trigger.node_id, diff --git a/api/core/trigger/debug/event_bus.py b/api/core/trigger/debug/event_bus.py index 05b0b315bc..1c4bba0009 100644 --- a/api/core/trigger/debug/event_bus.py +++ b/api/core/trigger/debug/event_bus.py @@ -61,7 +61,7 @@ class TriggerDebugEventBus: Args: tenant_id: Tenant ID for hash tag event: Event object to dispatch - pool_key: Pool key (generate using event_class.build_pool_key(...)) + pool_key: Pool key (generate using build_{?}_pool_key(...)) Returns: Number of addresses the event was dispatched to @@ -98,7 +98,7 @@ class TriggerDebugEventBus: Args: event_class: Event class for deserialization and type safety - pool_key: Pool key (generate using event_class.build_pool_key(...)) + pool_key: Pool key (generate using build_{?}_pool_key(...)) tenant_id: Tenant ID user_id: User ID for address calculation app_id: App ID for address calculation diff --git a/api/core/trigger/debug/event_selectors.py b/api/core/trigger/debug/event_selectors.py index da507735bd..f940d7f6b0 100644 --- a/api/core/trigger/debug/event_selectors.py +++ b/api/core/trigger/debug/event_selectors.py @@ -9,7 +9,14 @@ from pydantic import BaseModel from core.plugin.entities.request import TriggerInvokeEventResponse from core.trigger.debug.event_bus import TriggerDebugEventBus -from core.trigger.debug.events import PluginTriggerDebugEvent, ScheduleDebugEvent, WebhookDebugEvent +from core.trigger.debug.events import ( + PluginTriggerDebugEvent, + ScheduleDebugEvent, + WebhookDebugEvent, + build_plugin_pool_key, + build_schedule_pool_key, + build_webhook_pool_key, +) from core.workflow.enums import NodeType from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData from models.model import App @@ -49,7 +56,7 @@ class PluginTriggerDebugEventPoller(TriggerDebugEventPoller): plugin_trigger_data = TriggerEventNodeData.model_validate(self.node_config.get("data", {})) provider_id = TriggerProviderID(plugin_trigger_data.provider_id) - pool_key: str = PluginTriggerDebugEvent.build_pool_key( + pool_key: str = build_plugin_pool_key( name=plugin_trigger_data.event_name, provider_id=str(provider_id), tenant_id=self.tenant_id, @@ -86,7 +93,7 @@ class PluginTriggerDebugEventPoller(TriggerDebugEventPoller): class WebhookTriggerDebugEventPoller(TriggerDebugEventPoller): def poll(self) -> TriggerDebugEvent | None: - pool_key = WebhookDebugEvent.build_pool_key( + pool_key = build_webhook_pool_key( tenant_id=self.tenant_id, app_id=self.app_id, node_id=self.node_id, @@ -119,9 +126,7 @@ class WebhookTriggerDebugEventPoller(TriggerDebugEventPoller): class ScheduleTriggerDebugEventPoller(TriggerDebugEventPoller): def poll(self) -> TriggerDebugEvent | None: - pool_key: str = ScheduleDebugEvent.build_pool_key( - tenant_id=self.tenant_id, app_id=self.app_id, node_id=self.node_id - ) + pool_key: str = build_schedule_pool_key(tenant_id=self.tenant_id, app_id=self.app_id, node_id=self.node_id) schedule_event: ScheduleDebugEvent | None = TriggerDebugEventBus.poll( event_type=ScheduleDebugEvent, pool_key=pool_key, diff --git a/api/core/trigger/debug/events.py b/api/core/trigger/debug/events.py index ea7bc986b2..4766ec4c6a 100644 --- a/api/core/trigger/debug/events.py +++ b/api/core/trigger/debug/events.py @@ -1,9 +1,18 @@ from collections.abc import Mapping +from enum import StrEnum from typing import Any from pydantic import BaseModel, Field +class TriggerDebugPoolKey(StrEnum): + """Trigger debug pool key.""" + + SCHEDULE = "schedule_trigger_debug_waiting_pool" + WEBHOOK = "webhook_trigger_debug_waiting_pool" + PLUGIN = "plugin_trigger_debug_waiting_pool" + + class BaseDebugEvent(BaseModel): """Base class for all debug events.""" @@ -16,16 +25,15 @@ class ScheduleDebugEvent(BaseDebugEvent): node_id: str inputs: Mapping[str, Any] - @classmethod - def build_pool_key(cls, tenant_id: str, app_id: str, node_id: str) -> str: - """Generate pool key for schedule events. - Args: - tenant_id: Tenant ID - app_id: App ID - node_id: Node ID - """ - return f"schedule_trigger_debug_waiting_pool:{tenant_id}:{app_id}:{node_id}" +def build_schedule_pool_key(tenant_id: str, app_id: str, node_id: str) -> str: + """Generate pool key for schedule events. + Args: + tenant_id: Tenant ID + app_id: App ID + node_id: Node ID + """ + return f"{TriggerDebugPoolKey.SCHEDULE}:{tenant_id}:{app_id}:{node_id}" class WebhookDebugEvent(BaseDebugEvent): @@ -35,16 +43,16 @@ class WebhookDebugEvent(BaseDebugEvent): node_id: str payload: dict[str, Any] = Field(default_factory=dict) - @classmethod - def build_pool_key(cls, tenant_id: str, app_id: str, node_id: str) -> str: - """Generate pool key for webhook events. - Args: - tenant_id: Tenant ID - app_id: App ID - node_id: Node ID - """ - return f"webhook_trigger_debug_waiting_pool:{tenant_id}:{app_id}:{node_id}" +def build_webhook_pool_key(tenant_id: str, app_id: str, node_id: str) -> str: + """Generate pool key for webhook events. + + Args: + tenant_id: Tenant ID + app_id: App ID + node_id: Node ID + """ + return f"{TriggerDebugPoolKey.WEBHOOK}:{tenant_id}:{app_id}:{node_id}" class PluginTriggerDebugEvent(BaseDebugEvent): @@ -56,14 +64,14 @@ class PluginTriggerDebugEvent(BaseDebugEvent): subscription_id: str provider_id: str - @classmethod - def build_pool_key(cls, tenant_id: str, provider_id: str, subscription_id: str, name: str) -> str: - """Generate pool key for plugin trigger events. - Args: - name: Event name - tenant_id: Tenant ID - provider_id: Provider ID - subscription_id: Subscription ID - """ - return f"plugin_trigger_debug_waiting_pool:{tenant_id}:{str(provider_id)}:{subscription_id}:{name}" +def build_plugin_pool_key(tenant_id: str, provider_id: str, subscription_id: str, name: str) -> str: + """Generate pool key for plugin trigger events. + + Args: + name: Event name + tenant_id: Tenant ID + provider_id: Provider ID + subscription_id: Subscription ID + """ + return f"{TriggerDebugPoolKey.PLUGIN}:{tenant_id}:{str(provider_id)}:{subscription_id}:{name}" diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index 9667c1eed5..e4343bbde9 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -17,7 +17,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.plugin.entities.plugin_daemon import CredentialType from core.plugin.entities.request import TriggerInvokeEventResponse from core.trigger.debug.event_bus import TriggerDebugEventBus -from core.trigger.debug.events import PluginTriggerDebugEvent +from core.trigger.debug.events import PluginTriggerDebugEvent, build_plugin_pool_key from core.trigger.provider import PluginTriggerProviderController from core.trigger.trigger_manager import TriggerManager from core.workflow.enums import NodeType @@ -50,7 +50,7 @@ def dispatch_trigger_debug_event( debug_dispatched = 0 try: for event_name in events: - pool_key: str = PluginTriggerDebugEvent.build_pool_key( + pool_key: str = build_plugin_pool_key( name=event_name, tenant_id=subscription.tenant_id, subscription_id=subscription.id, diff --git a/api/tasks/workflow_schedule_tasks.py b/api/tasks/workflow_schedule_tasks.py index 95095a69bd..078039913e 100644 --- a/api/tasks/workflow_schedule_tasks.py +++ b/api/tasks/workflow_schedule_tasks.py @@ -7,7 +7,7 @@ from celery import shared_task from sqlalchemy.orm import sessionmaker from core.trigger.debug.event_bus import TriggerDebugEventBus -from core.trigger.debug.events import ScheduleDebugEvent +from core.trigger.debug.events import ScheduleDebugEvent, build_schedule_pool_key from core.workflow.nodes.trigger_schedule.exc import ( ScheduleExecutionError, ScheduleNotFoundError, @@ -74,7 +74,7 @@ def run_schedule_trigger(schedule_id: str) -> None: node_id=schedule.node_id, inputs=inputs, ) - pool_key = ScheduleDebugEvent.build_pool_key( + pool_key = build_schedule_pool_key( tenant_id=schedule.tenant_id, app_id=schedule.app_id, node_id=schedule.node_id,