diff --git a/api/controllers/trigger/trigger.py b/api/controllers/trigger/trigger.py index 9597e72b14..29ccd87812 100644 --- a/api/controllers/trigger/trigger.py +++ b/api/controllers/trigger/trigger.py @@ -17,9 +17,6 @@ UUID_MATCHER = re.compile(UUID_PATTERN) @bp.route( "/trigger/endpoint/", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"] ) -@bp.route( - "/trigger/endpoint-debug/", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"] -) def trigger_endpoint(endpoint_id: str): """ Handle endpoint trigger calls. diff --git a/api/core/trigger/entities/api_entities.py b/api/core/trigger/entities/api_entities.py index 5f551ff034..54f297b4b5 100644 --- a/api/core/trigger/entities/api_entities.py +++ b/api/core/trigger/entities/api_entities.py @@ -39,4 +39,5 @@ class TriggerApiEntity(BaseModel): parameters: list[TriggerParameter] = Field(description="The parameters of the trigger") output_schema: Optional[Mapping[str, Any]] = Field(description="The output schema of the trigger") + __all__ = ["TriggerApiEntity", "TriggerProviderApiEntity", "TriggerProviderSubscriptionApiEntity"] diff --git a/api/core/workflow/nodes/enums.py b/api/core/workflow/nodes/enums.py index ee13b6eea0..a09816f4fe 100644 --- a/api/core/workflow/nodes/enums.py +++ b/api/core/workflow/nodes/enums.py @@ -27,6 +27,7 @@ class NodeType(StrEnum): AGENT = "agent" TRIGGER_WEBHOOK = "trigger-webhook" TRIGGER_SCHEDULE = "trigger-schedule" + TRIGGER_PLUGIN = "trigger-plugin" class ErrorStrategy(StrEnum): diff --git a/api/core/workflow/nodes/node_mapping.py b/api/core/workflow/nodes/node_mapping.py index 83ad222591..372c935a54 100644 --- a/api/core/workflow/nodes/node_mapping.py +++ b/api/core/workflow/nodes/node_mapping.py @@ -19,6 +19,7 @@ from core.workflow.nodes.question_classifier import QuestionClassifierNode from core.workflow.nodes.start import StartNode from core.workflow.nodes.template_transform import TemplateTransformNode from core.workflow.nodes.tool import ToolNode +from core.workflow.nodes.trigger_plugin import TriggerPluginNode from core.workflow.nodes.trigger_webhook import TriggerWebhookNode from core.workflow.nodes.variable_aggregator import VariableAggregatorNode from core.workflow.nodes.variable_assigner.v1 import VariableAssignerNode as VariableAssignerNodeV1 @@ -137,4 +138,8 @@ NODE_TYPE_CLASSES_MAPPING: Mapping[NodeType, Mapping[str, type[BaseNode]]] = { LATEST_VERSION: TriggerWebhookNode, "1": TriggerWebhookNode, }, + NodeType.TRIGGER_PLUGIN: { + LATEST_VERSION: TriggerPluginNode, + "1": TriggerPluginNode, + }, } diff --git a/api/core/workflow/nodes/trigger_plugin/__init__.py b/api/core/workflow/nodes/trigger_plugin/__init__.py new file mode 100644 index 0000000000..d69ca1fbb2 --- /dev/null +++ b/api/core/workflow/nodes/trigger_plugin/__init__.py @@ -0,0 +1,3 @@ +from .node import TriggerPluginNode + +__all__ = ["TriggerPluginNode"] diff --git a/api/core/workflow/nodes/trigger_plugin/entities.py b/api/core/workflow/nodes/trigger_plugin/entities.py new file mode 100644 index 0000000000..0ecda92bbb --- /dev/null +++ b/api/core/workflow/nodes/trigger_plugin/entities.py @@ -0,0 +1,27 @@ +from typing import Any, Optional + +from pydantic import Field + +from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig +from core.workflow.nodes.enums import ErrorStrategy + + +class PluginTriggerData(BaseNodeData): + """Plugin trigger node data""" + + title: str + desc: Optional[str] = None + plugin_id: str = Field(..., description="Plugin ID") + provider_id: str = Field(..., description="Provider ID") + trigger_name: str = Field(..., description="Trigger name") + subscription_id: str = Field(..., description="Subscription ID") + parameters: dict[str, Any] = Field(default_factory=dict, description="Trigger parameters") + + # Error handling + error_strategy: Optional[ErrorStrategy] = Field( + default=ErrorStrategy.FAIL_BRANCH, description="Error handling strategy" + ) + retry_config: RetryConfig = Field(default_factory=lambda: RetryConfig(), description="Retry configuration") + default_value_dict: dict[str, Any] = Field( + default_factory=dict, description="Default values for outputs when error occurs" + ) diff --git a/api/core/workflow/nodes/trigger_plugin/node.py b/api/core/workflow/nodes/trigger_plugin/node.py new file mode 100644 index 0000000000..221f568d71 --- /dev/null +++ b/api/core/workflow/nodes/trigger_plugin/node.py @@ -0,0 +1,96 @@ +from collections.abc import Mapping +from typing import Any, Optional + +from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus +from core.workflow.nodes.base import BaseNode +from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig +from core.workflow.nodes.enums import ErrorStrategy, NodeType + +from .entities import PluginTriggerData + + +class TriggerPluginNode(BaseNode): + _node_type = NodeType.TRIGGER_PLUGIN + + _node_data: PluginTriggerData + + def init_node_data(self, data: Mapping[str, Any]) -> None: + self._node_data = PluginTriggerData.model_validate(data) + + def _get_error_strategy(self) -> Optional[ErrorStrategy]: + return self._node_data.error_strategy + + def _get_retry_config(self) -> RetryConfig: + return self._node_data.retry_config + + def _get_title(self) -> str: + return self._node_data.title + + def _get_description(self) -> Optional[str]: + return self._node_data.desc + + def _get_default_value_dict(self) -> dict[str, Any]: + return self._node_data.default_value_dict + + def get_base_node_data(self) -> BaseNodeData: + return self._node_data + + @classmethod + def get_default_config(cls, filters: Optional[dict[str, Any]] = None) -> dict: + return { + "type": "plugin", + "config": { + "plugin_id": "", + "provider_id": "", + "trigger_name": "", + "subscription_id": "", + "parameters": {}, + }, + } + + @classmethod + def version(cls) -> str: + return "1" + + def _run(self) -> NodeRunResult: + """ + Run the plugin trigger node. + + Like the webhook node, this takes the trigger data from the variable pool + and makes it available to downstream nodes. The actual trigger invocation + happens in the async task executor. + """ + # Get trigger data from variable pool (injected by async task) + trigger_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs) + + # Extract trigger-specific outputs + outputs = self._extract_trigger_outputs(trigger_inputs) + + return NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + inputs=trigger_inputs, + outputs=outputs, + ) + + def _extract_trigger_outputs(self, trigger_inputs: dict[str, Any]) -> dict[str, Any]: + """Extract outputs from trigger invocation response.""" + outputs = {} + + # Get the trigger data (should be injected by async task) + trigger_data = trigger_inputs.get("trigger_data", {}) + trigger_metadata = trigger_inputs.get("trigger_metadata", {}) + + # Make trigger data available as outputs + outputs["data"] = trigger_data + outputs["trigger_name"] = trigger_metadata.get("trigger_name", "") + outputs["provider_id"] = trigger_metadata.get("provider_id", "") + outputs["subscription_id"] = self._node_data.subscription_id + + # Include raw trigger data for debugging/advanced use + outputs["_trigger_raw"] = { + "data": trigger_data, + "metadata": trigger_metadata, + } + + return outputs diff --git a/api/models/workflow.py b/api/models/workflow.py index 8c4a8ac593..ac1289fe96 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -1482,3 +1482,46 @@ class AppTrigger(Base): default=naive_utc_now(), server_onupdate=func.current_timestamp(), ) + + +class WorkflowPluginTrigger(Base): + """ + Workflow Plugin Trigger + + Maps plugin triggers to workflow nodes, similar to WorkflowWebhookTrigger + + Attributes: + - id (uuid) Primary key + - app_id (uuid) App ID to bind to a specific app + - node_id (varchar) Node ID which node in the workflow + - tenant_id (uuid) Workspace ID + - provider_id (varchar) Plugin provider ID + - trigger_id (varchar) Unique trigger identifier (provider_id + trigger_name) + - triggered_by (varchar) Environment: debugger or production + - created_at (timestamp) Creation time + - updated_at (timestamp) Last update time + """ + + __tablename__ = "workflow_plugin_triggers" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"), + sa.Index("workflow_plugin_trigger_tenant_idx", "tenant_id"), + sa.Index("workflow_plugin_trigger_trigger_idx", "trigger_id"), + sa.UniqueConstraint("app_id", "node_id", "triggered_by", name="uniq_plugin_node"), + sa.UniqueConstraint("trigger_id", "node_id", name="uniq_trigger_node"), + ) + + id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()")) + app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + node_id: Mapped[str] = mapped_column(String(64), nullable=False) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + provider_id: Mapped[str] = mapped_column(String(255), nullable=False) + trigger_id: Mapped[str] = mapped_column(String(510), nullable=False) # provider_id + trigger_name + triggered_by: Mapped[str] = mapped_column(String(16), nullable=False) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + updated_at: Mapped[datetime] = mapped_column( + DateTime, + nullable=False, + server_default=func.current_timestamp(), + server_onupdate=func.current_timestamp(), + ) diff --git a/api/services/trigger_service.py b/api/services/trigger_service.py index 75140ad0ea..0ab61efc8f 100644 --- a/api/services/trigger_service.py +++ b/api/services/trigger_service.py @@ -1,12 +1,23 @@ +import json import logging +import uuid from flask import Request, Response +from sqlalchemy import select +from sqlalchemy.orm import Session from core.plugin.entities.plugin import TriggerProviderID from core.trigger.entities.entities import TriggerEntity from core.trigger.trigger_manager import TriggerManager +from extensions.ext_database import db +from extensions.ext_redis import redis_client +from models.account import Account, TenantAccountJoin, TenantAccountRole +from models.enums import WorkflowRunTriggeredFrom from models.trigger import TriggerSubscription +from models.workflow import Workflow, WorkflowPluginTrigger +from services.async_workflow_service import AsyncWorkflowService from services.trigger.trigger_provider_service import TriggerProviderService +from services.workflow.entities import PluginTriggerData logger = logging.getLogger(__name__) @@ -21,7 +32,83 @@ class TriggerService: cls, subscription: TriggerSubscription, trigger: TriggerEntity, request: Request ) -> None: """Process triggered workflows.""" - pass + # 1. Find associated WorkflowPluginTriggers + trigger_id = f"{subscription.provider_id}:{trigger.identity.name}" + plugin_triggers = cls._get_plugin_triggers(trigger_id) + + if not plugin_triggers: + logger.warning( + "No workflows found for trigger '%s' in subscription '%s'", + trigger.identity.name, + subscription.id, + ) + return + + with Session(db.engine) as session: + # Get tenant owner for workflow execution + tenant_owner = session.scalar( + select(Account) + .join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id) + .where( + TenantAccountJoin.tenant_id == subscription.tenant_id, + TenantAccountJoin.role == TenantAccountRole.OWNER, + ) + ) + + if not tenant_owner: + logger.error("Tenant owner not found for tenant %s", subscription.tenant_id) + return + + for plugin_trigger in plugin_triggers: + # 2. Get workflow + workflow = session.scalar( + select(Workflow) + .where( + Workflow.app_id == plugin_trigger.app_id, + Workflow.version != Workflow.VERSION_DRAFT, + ) + .order_by(Workflow.created_at.desc()) + ) + + if not workflow: + logger.error( + "Workflow not found for app %s", + plugin_trigger.app_id, + ) + continue + + # Get trigger parameters from node configuration + node_config = workflow.get_node_config_by_id(plugin_trigger.node_id) + parameters = node_config.get("data", {}).get("parameters", {}) if node_config else {} + + # 3. Store trigger data + storage_key = cls._store_trigger_data(request, subscription, trigger, parameters) + + # 4. Create trigger data for async execution + trigger_data = PluginTriggerData( + app_id=plugin_trigger.app_id, + tenant_id=subscription.tenant_id, + workflow_id=workflow.id, + root_node_id=plugin_trigger.node_id, + trigger_type=WorkflowRunTriggeredFrom.PLUGIN, + plugin_id=subscription.provider_id, + webhook_url=f"trigger/endpoint/{subscription.endpoint_id}", # For tracking + inputs={"storage_key": storage_key}, # Pass storage key to async task + ) + + # 5. Trigger async workflow + try: + AsyncWorkflowService.trigger_workflow_async(session, tenant_owner, trigger_data) + logger.info( + "Triggered workflow for app %s with trigger %s", + plugin_trigger.app_id, + trigger.identity.name, + ) + except Exception: + logger.exception( + "Failed to trigger workflow for app %s", + plugin_trigger.app_id, + ) @classmethod def select_triggers(cls, controller, dispatch_response, provider_id, subscription) -> list[TriggerEntity]: @@ -65,3 +152,52 @@ class TriggerService: request=request, ) return dispatch_response.response + + @classmethod + def _get_plugin_triggers(cls, trigger_id: str) -> list[WorkflowPluginTrigger]: + """Get WorkflowPluginTriggers for a trigger_id.""" + with Session(db.engine) as session: + triggers = session.scalars( + select(WorkflowPluginTrigger).where( + WorkflowPluginTrigger.trigger_id == trigger_id, + WorkflowPluginTrigger.triggered_by == "production", # Only production triggers for now + ) + ).all() + return list(triggers) + + @classmethod + def _store_trigger_data( + cls, + request: Request, + subscription: TriggerSubscription, + trigger: TriggerEntity, + parameters: dict, + ) -> str: + """Store trigger data in storage and return key.""" + storage_key = f"trigger_data_{uuid.uuid4().hex}" + + # Prepare data to store + trigger_data = { + "request": { + "method": request.method, + "headers": dict(request.headers), + "query_params": dict(request.args), + "body": request.get_data(as_text=True), + }, + "subscription": { + "id": subscription.id, + "provider_id": subscription.provider_id, + "credentials": subscription.credentials, + "credential_type": subscription.credential_type, + }, + "trigger": { + "name": trigger.identity.name, + "parameters": parameters, + }, + "user_id": subscription.user_id, + } + + # Store with 1 hour TTL using Redis + redis_client.setex(storage_key, 3600, json.dumps(trigger_data)) + + return storage_key