diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 09fd22bf57..2f7668566f 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -1114,7 +1114,10 @@ class DraftWorkflowTriggerRunAllApi(Resource): try: trigger_debug_event: TriggerDebugEvent | None = select_trigger_debug_events( - draft_workflow=draft_workflow, app_model=app_model, user_id=current_user.id, node_ids=node_ids + draft_workflow=draft_workflow, + app_model=app_model, + user_id=current_user.id, + node_ids=node_ids, ) except ValueError as e: raise e diff --git a/api/controllers/console/workspace/trigger_providers.py b/api/controllers/console/workspace/trigger_providers.py index 6e8e6c97f3..30ab31443a 100644 --- a/api/controllers/console/workspace/trigger_providers.py +++ b/api/controllers/console/workspace/trigger_providers.py @@ -81,6 +81,8 @@ class TriggerSubscriptionListApi(Resource): tenant_id=user.current_tenant_id, provider_id=TriggerProviderID(provider) ) ) + except ValueError as e: + return jsonable_encoder({"error": str(e)}), 404 except Exception as e: logger.exception("Error listing trigger providers", exc_info=e) raise diff --git a/api/controllers/service_api/wraps.py b/api/controllers/service_api/wraps.py index 638ab528f3..dd58fef1e5 100644 --- a/api/controllers/service_api/wraps.py +++ b/api/controllers/service_api/wraps.py @@ -1,5 +1,5 @@ import time -from collections.abc import Callable +from collections.abc import Callable, Mapping from datetime import timedelta from enum import StrEnum, auto from functools import wraps @@ -13,6 +13,7 @@ from sqlalchemy import select, update from sqlalchemy.orm import Session from werkzeug.exceptions import Forbidden, NotFound, Unauthorized +from core.app.entities.app_invoke_entities import InvokeFrom from extensions.ext_database import db from extensions.ext_redis import redis_client from libs.datetime_utils import naive_utc_now @@ -83,7 +84,7 @@ def validate_app_token(view: Callable[P, R] | None = None, *, fetch_user_arg: Fe if user_id: user_id = str(user_id) - end_user = create_or_update_end_user_for_user_id(app_model, user_id) + end_user = get_or_create_end_user(app_model, user_id) kwargs["end_user"] = end_user # Set EndUser as current logged-in user for flask_login.current_user @@ -308,10 +309,13 @@ def validate_and_get_api_token(scope: str | None = None): return api_token -def create_or_update_end_user_for_user_id(app_model: App, user_id: str | None = None) -> EndUser: - """ - Create or update session terminal based on user ID. - """ +def get_or_create_end_user(app_model: App, user_id: str | None = None) -> EndUser: + return get_or_create_end_user_by_type(InvokeFrom.SERVICE_API, app_model.tenant_id, app_model.id, user_id) + + +def get_or_create_end_user_by_type( + type: InvokeFrom, tenant_id: str, app_id: str, user_id: str | None = None +) -> EndUser: if not user_id: user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID @@ -319,21 +323,22 @@ def create_or_update_end_user_for_user_id(app_model: App, user_id: str | None = end_user = ( session.query(EndUser) .where( - EndUser.tenant_id == app_model.tenant_id, - EndUser.app_id == app_model.id, + EndUser.tenant_id == tenant_id, + EndUser.app_id == app_id, EndUser.session_id == user_id, - EndUser.type == "service_api", + EndUser.type == type, ) .first() ) if end_user is None: end_user = EndUser( - tenant_id=app_model.tenant_id, - app_id=app_model.id, - type="service_api", + tenant_id=tenant_id, + app_id=app_id, + type=type, is_anonymous=user_id == DefaultEndUserSessionID.DEFAULT_SESSION_ID, session_id=user_id, + external_user_id=user_id, ) session.add(end_user) session.commit() @@ -341,6 +346,87 @@ def create_or_update_end_user_for_user_id(app_model: App, user_id: str | None = return end_user +def create_end_user_batch(type: InvokeFrom, tenant_id: str, app_ids: list[str], user_id: str) -> Mapping[str, EndUser]: + """Create end users in batch. + + Creates end users in batch for the specified tenant and application IDs in O(1) time. + + This batch creation is necessary because trigger subscriptions can span multiple applications, + and trigger events may be dispatched to multiple applications simultaneously. + + For each app_id in app_ids, check if an `EndUser` with the given + `user_id` (as session_id/external_user_id) already exists for the + tenant/app and type `type`. If it exists, return it; otherwise, + create it. Operates with minimal DB I/O by querying and inserting in + batches. + + Returns a mapping of `app_id -> EndUser`. + """ + + # Normalize user_id to default if empty + if not user_id: + user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID + + # Deduplicate app_ids while preserving input order + seen: set[str] = set() + unique_app_ids: list[str] = [] + for app_id in app_ids: + if app_id not in seen: + seen.add(app_id) + unique_app_ids.append(app_id) + + # Result is a simple app_id -> EndUser mapping + result: dict[str, EndUser] = {} + if not unique_app_ids: + return result + + with Session(db.engine, expire_on_commit=False) as session: + # Fetch existing end users for all target apps in a single query + existing_end_users: list[EndUser] = ( + session.query(EndUser) + .where( + EndUser.tenant_id == tenant_id, + EndUser.app_id.in_(unique_app_ids), + EndUser.session_id == user_id, + EndUser.type == type, + ) + .all() + ) + + found_app_ids: set[str] = set() + for eu in existing_end_users: + # If duplicates exist due to weak DB constraints, prefer the first + if eu.app_id not in result: + result[eu.app_id] = eu + found_app_ids.add(eu.app_id) + + # Determine which apps still need an EndUser created + missing_app_ids = [app_id for app_id in unique_app_ids if app_id not in found_app_ids] + + if missing_app_ids: + new_end_users: list[EndUser] = [] + is_anonymous = user_id == DefaultEndUserSessionID.DEFAULT_SESSION_ID + for app_id in missing_app_ids: + new_end_users.append( + EndUser( + tenant_id=tenant_id, + app_id=app_id, + type=type, + is_anonymous=is_anonymous, + session_id=user_id, + external_user_id=user_id, + ) + ) + + session.add_all(new_end_users) + session.commit() + + for eu in new_end_users: + result[eu.app_id] = eu + + return result + + class DatasetApiResource(Resource): method_decorators = [validate_dataset_token] diff --git a/api/core/app/entities/app_invoke_entities.py b/api/core/app/entities/app_invoke_entities.py index a5ed0f8fa3..1ce86f1f9a 100644 --- a/api/core/app/entities/app_invoke_entities.py +++ b/api/core/app/entities/app_invoke_entities.py @@ -32,6 +32,10 @@ class InvokeFrom(StrEnum): # https://docs.dify.ai/en/guides/application-publishing/launch-your-webapp-quickly/README WEB_APP = "web-app" + # TRIGGER indicates that this invocation is from a trigger. + # this is used for plugin trigger and webhook trigger. + TRIGGER = "trigger" + # EXPLORE indicates that this invocation is from # the workflow (or chatflow) explore page. EXPLORE = "explore" @@ -65,6 +69,8 @@ class InvokeFrom(StrEnum): return "dev" elif self == InvokeFrom.EXPLORE: return "explore_app" + elif self == InvokeFrom.TRIGGER: + return "trigger" elif self == InvokeFrom.SERVICE_API: return "api" diff --git a/api/core/plugin/backwards_invocation/app.py b/api/core/plugin/backwards_invocation/app.py index 32ac132e1e..256538b973 100644 --- a/api/core/plugin/backwards_invocation/app.py +++ b/api/core/plugin/backwards_invocation/app.py @@ -4,7 +4,7 @@ from typing import Union from sqlalchemy import select from sqlalchemy.orm import Session -from controllers.service_api.wraps import create_or_update_end_user_for_user_id +from controllers.service_api.wraps import get_or_create_end_user from core.app.app_config.common.parameters_mapping import get_parameters_from_feature_dict from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator @@ -64,7 +64,7 @@ class PluginAppBackwardsInvocation(BaseBackwardsInvocation): """ app = cls._get_app(app_id, tenant_id) if not user_id: - user = create_or_update_end_user_for_user_id(app) + user = get_or_create_end_user(app) else: user = cls._get_user(user_id) diff --git a/api/core/plugin/entities/request.py b/api/core/plugin/entities/request.py index 8c5adf5ad6..532c25504c 100644 --- a/api/core/plugin/entities/request.py +++ b/api/core/plugin/entities/request.py @@ -247,6 +247,7 @@ class TriggerInvokeEventResponse(BaseModel): class PluginTriggerDispatchResponse(BaseModel): + user_id: str events: list[str] raw_http_response: str @@ -260,9 +261,11 @@ class TriggerValidateProviderCredentialsResponse(BaseModel): class TriggerDispatchResponse: + user_id: str events: list[str] response: Response - def __init__(self, events: list[str], response: Response): + def __init__(self, user_id: str, events: list[str], response: Response): + self.user_id = user_id self.events = events self.response = response diff --git a/api/core/plugin/impl/trigger.py b/api/core/plugin/impl/trigger.py index 9122ba7072..bdbb444e67 100644 --- a/api/core/plugin/impl/trigger.py +++ b/api/core/plugin/impl/trigger.py @@ -156,7 +156,6 @@ class PluginTriggerManager(BasePluginClient): def dispatch_event( self, tenant_id: str, - user_id: str, provider: str, subscription: Mapping[str, Any], request: Request, @@ -173,7 +172,6 @@ class PluginTriggerManager(BasePluginClient): path=f"plugin/{tenant_id}/dispatch/trigger/dispatch_event", type_=PluginTriggerDispatchResponse, data={ - "user_id": user_id, "data": { "provider": provider_id.provider_name, "subscription": subscription, @@ -191,6 +189,7 @@ class PluginTriggerManager(BasePluginClient): for resp in response: return TriggerDispatchResponse( + user_id=resp.user_id or "", events=resp.events, response=deserialize_response(binascii.unhexlify(resp.raw_http_response.encode())), ) diff --git a/api/core/trigger/debug/event_bus.py b/api/core/trigger/debug/event_bus.py index ffb9dabc92..05b0b315bc 100644 --- a/api/core/trigger/debug/event_bus.py +++ b/api/core/trigger/debug/event_bus.py @@ -118,6 +118,7 @@ class TriggerDebugEventBus: pool_key, address_id, ) + logger.info("event_data: %s", event_data) return event_type.model_validate_json(json_data=event_data) if event_data else None except RedisError: logger.exception("Failed to poll event from pool: %s", pool_key) diff --git a/api/core/trigger/debug/event_selectors.py b/api/core/trigger/debug/event_selectors.py index ffe109988f..f5faa51f12 100644 --- a/api/core/trigger/debug/event_selectors.py +++ b/api/core/trigger/debug/event_selectors.py @@ -15,8 +15,6 @@ from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData from models.model import App from models.provider_ids import TriggerProviderID from models.workflow import Workflow -from services.trigger.trigger_service import TriggerService -from services.trigger.webhook_service import WebhookService logger = logging.getLogger(__name__) @@ -47,6 +45,8 @@ class TriggerDebugEventPoller(ABC): class PluginTriggerDebugEventPoller(TriggerDebugEventPoller): def poll(self) -> TriggerDebugEvent | None: + from services.trigger.trigger_service import TriggerService + plugin_trigger_data = TriggerEventNodeData.model_validate(self.node_config.get("data", {})) provider_id = TriggerProviderID(plugin_trigger_data.provider_id) pool_key: str = PluginTriggerDebugEvent.build_pool_key( @@ -67,7 +67,7 @@ class PluginTriggerDebugEventPoller(TriggerDebugEventPoller): return None trigger_event_response: TriggerInvokeEventResponse = TriggerService.invoke_trigger_event( event=plugin_trigger_event, - user_id=self.user_id, + user_id=plugin_trigger_event.user_id, tenant_id=self.tenant_id, node_config=self.node_config, ) @@ -103,6 +103,8 @@ class WebhookTriggerDebugEventPoller(TriggerDebugEventPoller): if not webhook_event: return None + from services.trigger.webhook_service import WebhookService + payload = webhook_event.payload or {} workflow_inputs = payload.get("inputs") if workflow_inputs is None: diff --git a/api/core/trigger/debug/events.py b/api/core/trigger/debug/events.py index 06ce534eb8..3d461b506b 100644 --- a/api/core/trigger/debug/events.py +++ b/api/core/trigger/debug/events.py @@ -71,6 +71,7 @@ class PluginTriggerDebugEvent(BaseDebugEvent): """Debug event for plugin triggers.""" name: str + user_id: str = Field(description="This is end user id, only for trigger the event. no related with account user id") request_id: str subscription_id: str provider_id: str diff --git a/api/core/trigger/provider.py b/api/core/trigger/provider.py index 24517f56e7..7adf815333 100644 --- a/api/core/trigger/provider.py +++ b/api/core/trigger/provider.py @@ -267,7 +267,6 @@ class PluginTriggerProviderController: def dispatch( self, - user_id: str, request: Request, subscription: Subscription, credentials: Mapping[str, str], @@ -288,7 +287,6 @@ class PluginTriggerProviderController: response: TriggerDispatchResponse = manager.dispatch_event( tenant_id=self.tenant_id, - user_id=user_id, provider=str(provider_id), subscription=subscription.model_dump(), request=request, diff --git a/api/core/trigger/trigger_manager.py b/api/core/trigger/trigger_manager.py index 8da7ac0959..74b3bb0b92 100644 --- a/api/core/trigger/trigger_manager.py +++ b/api/core/trigger/trigger_manager.py @@ -13,7 +13,7 @@ import contexts from configs import dify_config from core.plugin.entities.plugin_daemon import CredentialType, PluginTriggerProviderEntity from core.plugin.entities.request import TriggerInvokeEventResponse -from core.plugin.impl.exc import PluginDaemonError, PluginInvokeError +from core.plugin.impl.exc import PluginDaemonError, PluginInvokeError, PluginNotFoundError from core.plugin.impl.trigger import PluginTriggerManager from core.trigger.entities.entities import ( EventEntity, @@ -100,13 +100,13 @@ class TriggerManager: if provider_id_str in plugin_trigger_providers: return plugin_trigger_providers[provider_id_str] - manager = PluginTriggerManager() - provider = manager.fetch_trigger_provider(tenant_id, provider_id) - - if not provider: - raise ValueError(f"Trigger provider {provider_id} not found") - try: + manager = PluginTriggerManager() + provider = manager.fetch_trigger_provider(tenant_id, provider_id) + + if not provider: + raise ValueError(f"Trigger provider {provider_id} not found") + controller = PluginTriggerProviderController( entity=provider.declaration, plugin_id=provider.plugin_id, @@ -116,6 +116,8 @@ class TriggerManager: ) plugin_trigger_providers[provider_id_str] = controller return controller + except PluginNotFoundError as e: + raise ValueError(f"Trigger provider {provider_id} not found") from e except PluginDaemonError as e: raise e except Exception as e: diff --git a/api/services/trigger/trigger_service.py b/api/services/trigger/trigger_service.py index fca9c25c37..f196108820 100644 --- a/api/services/trigger/trigger_service.py +++ b/api/services/trigger/trigger_service.py @@ -1,12 +1,12 @@ import logging import time import uuid -from collections.abc import Mapping, Sequence +from collections.abc import Mapping from typing import Any from flask import Request, Response from pydantic import BaseModel -from sqlalchemy import and_, func, select +from sqlalchemy import and_, select from sqlalchemy.orm import Session from core.plugin.entities.plugin_daemon import CredentialType @@ -14,25 +14,21 @@ from core.plugin.entities.request import TriggerDispatchResponse, TriggerInvokeE from core.plugin.impl.exc import PluginNotFoundError from core.plugin.utils.http_parser import deserialize_request, serialize_request from core.trigger.debug.events import PluginTriggerDebugEvent -from core.trigger.entities.entities import EventEntity from core.trigger.provider import PluginTriggerProviderController from core.trigger.trigger_manager import TriggerManager from core.trigger.utils.encryption import create_trigger_provider_encrypter_for_subscription from core.workflow.enums import NodeType from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData -from core.workflow.nodes.trigger_schedule.exc import TenantOwnerNotFoundError from extensions.ext_database import db from extensions.ext_redis import redis_client from extensions.ext_storage import storage -from models.account import Account, TenantAccountJoin, TenantAccountRole -from models.enums import WorkflowRunTriggeredFrom from models.model import App from models.provider_ids import TriggerProviderID from models.trigger import TriggerSubscription from models.workflow import AppTrigger, AppTriggerStatus, Workflow, WorkflowPluginTrigger -from services.async_workflow_service import AsyncWorkflowService from services.trigger.trigger_provider_service import TriggerProviderService -from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData +from services.workflow.entities import PluginTriggerDispatchData +from tasks.trigger_processing_tasks import dispatch_triggered_workflows_async logger = logging.getLogger(__name__) @@ -77,147 +73,6 @@ class TriggerService: request=request, ) - @classmethod - def _get_latest_workflows_by_app_ids( - cls, session: Session, subscribers: Sequence[WorkflowPluginTrigger] - ) -> Mapping[str, Workflow]: - """Get the latest workflows by app_ids""" - workflow_query = ( - select(Workflow.app_id, func.max(Workflow.created_at).label("max_created_at")) - .where( - Workflow.app_id.in_({t.app_id for t in subscribers}), - Workflow.version != Workflow.VERSION_DRAFT, - ) - .group_by(Workflow.app_id) - .subquery() - ) - workflows = session.scalars( - select(Workflow).join( - workflow_query, - (Workflow.app_id == workflow_query.c.app_id) & (Workflow.created_at == workflow_query.c.max_created_at), - ) - ).all() - return {w.app_id: w for w in workflows} - - @classmethod - def _get_tenant_owner(cls, session: Session, tenant_id: str) -> Account: - """Get the tenant owner account for workflow execution.""" - owner = session.scalar( - select(Account) - .join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id) - .where(TenantAccountJoin.tenant_id == tenant_id, TenantAccountJoin.role == TenantAccountRole.OWNER) - ) - if not owner: - raise TenantOwnerNotFoundError(f"Tenant owner not found for tenant {tenant_id}") - return owner - - @classmethod - def dispatch_triggered_workflows( - cls, subscription: TriggerSubscription, event: EventEntity, request_id: str - ) -> int: - """Process triggered workflows. - - Args: - subscription: The trigger subscription - event: The trigger entity that was activated - request_id: The ID of the stored request in storage system - """ - request = deserialize_request(storage.load_once(f"triggers/{request_id}")) - if not request: - logger.error("Request not found for request_id %s", request_id) - return 0 - - subscribers: list[WorkflowPluginTrigger] = cls.get_subscriber_triggers( - tenant_id=subscription.tenant_id, subscription_id=subscription.id, event_name=event.identity.name - ) - if not subscribers: - logger.warning( - "No workflows found for trigger event '%s' in subscription '%s'", - event.identity.name, - subscription.id, - ) - return 0 - - dispatched_count = 0 - provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider( - tenant_id=subscription.tenant_id, provider_id=TriggerProviderID(subscription.provider_id) - ) - with Session(db.engine) as session: - tenant_owner = cls._get_tenant_owner(session, subscription.tenant_id) - workflows = cls._get_latest_workflows_by_app_ids(session, subscribers) - for plugin_trigger in subscribers: - # Get workflow from mapping - workflow = workflows.get(plugin_trigger.app_id) - if not workflow: - logger.error( - "Workflow not found for app %s", - plugin_trigger.app_id, - ) - continue - - # Find the trigger node in the workflow - event_node = None - for node_id, node_config in workflow.walk_nodes(NodeType.TRIGGER_PLUGIN): - if node_id == plugin_trigger.node_id: - event_node = node_config - break - - if not event_node: - logger.error("Trigger event node not found for app %s", plugin_trigger.app_id) - continue - - # invoke triger - node_data: TriggerEventNodeData = TriggerEventNodeData.model_validate(event_node) - invoke_response: TriggerInvokeEventResponse = TriggerManager.invoke_trigger_event( - tenant_id=subscription.tenant_id, - user_id=subscription.user_id, - provider_id=TriggerProviderID(subscription.provider_id), - event_name=event.identity.name, - parameters=node_data.resolve_parameters( - parameter_schemas=provider_controller.get_event_parameters(event_name=event.identity.name) - ), - credentials=subscription.credentials, - credential_type=CredentialType.of(subscription.credential_type), - subscription=subscription.to_entity(), - request=request, - ) - if invoke_response.cancelled: - logger.info( - "Trigger ignored for app %s with trigger event %s", - plugin_trigger.app_id, - event.identity.name, - ) - continue - - # Create trigger data for async execution - trigger_data = PluginTriggerData( - app_id=plugin_trigger.app_id, - tenant_id=subscription.tenant_id, - workflow_id=workflow.id, - root_node_id=plugin_trigger.node_id, - trigger_type=WorkflowRunTriggeredFrom.PLUGIN, - plugin_id=subscription.provider_id, - endpoint_id=subscription.endpoint_id, - inputs=invoke_response.variables, - ) - - # Trigger async workflow - try: - AsyncWorkflowService.trigger_workflow_async(session, tenant_owner, trigger_data) - dispatched_count += 1 - logger.info( - "Triggered workflow for app %s with trigger event %s", - plugin_trigger.app_id, - event.identity.name, - ) - except Exception: - logger.exception( - "Failed to trigger workflow for app %s", - plugin_trigger.app_id, - ) - - return dispatched_count - @classmethod def process_endpoint(cls, endpoint_id: str, request: Request) -> Response | None: """ @@ -249,7 +104,6 @@ class TriggerService: subscription=subscription, ) dispatch_response: TriggerDispatchResponse = controller.dispatch( - user_id=subscription.user_id, request=request, subscription=subscription.to_entity(), credentials=encrypter.decrypt(subscription.credentials), @@ -261,10 +115,20 @@ class TriggerService: serialized_request = serialize_request(request) storage.save(f"triggers/{request_id}", serialized_request) - # Production dispatch - from tasks.trigger_processing_tasks import dispatch_triggered_workflows_async + # Validate event names + for event_name in dispatch_response.events: + if controller.get_event(event_name) is None: + logger.error( + "Event name %s not found in provider %s for endpoint %s", + event_name, + subscription.provider_id, + endpoint_id, + ) + raise ValueError(f"Event name {event_name} not found in provider {subscription.provider_id}") plugin_trigger_dispatch_data = PluginTriggerDispatchData( + user_id=dispatch_response.user_id, + tenant_id=subscription.tenant_id, endpoint_id=endpoint_id, provider_id=subscription.provider_id, subscription_id=subscription.id, diff --git a/api/services/trigger/trigger_subscription_builder_service.py b/api/services/trigger/trigger_subscription_builder_service.py index 5f5cd3e8ee..daa595bd2c 100644 --- a/api/services/trigger/trigger_subscription_builder_service.py +++ b/api/services/trigger/trigger_subscription_builder_service.py @@ -459,7 +459,6 @@ class TriggerSubscriptionBuilderService: ) try: dispatch_response: TriggerDispatchResponse = controller.dispatch( - user_id=subscription_builder.user_id, request=request, subscription=subscription_builder.to_subscription(), credentials={}, diff --git a/api/services/trigger/webhook_service.py b/api/services/trigger/webhook_service.py index e69886f38c..f2a15cb9d8 100644 --- a/api/services/trigger/webhook_service.py +++ b/api/services/trigger/webhook_service.py @@ -12,6 +12,8 @@ from sqlalchemy.orm import Session from werkzeug.exceptions import RequestEntityTooLarge from configs import dify_config +from controllers.service_api.wraps import get_or_create_end_user_by_type +from core.app.entities.app_invoke_entities import InvokeFrom from core.file.models import FileTransferMethod from core.tools.tool_file_manager import ToolFileManager from core.variables.types import SegmentType @@ -19,7 +21,6 @@ from core.workflow.enums import NodeType from extensions.ext_database import db from extensions.ext_redis import redis_client from factories import file_factory -from models.account import Account, TenantAccountJoin, TenantAccountRole from models.enums import WorkflowRunTriggeredFrom from models.model import App from models.workflow import AppTrigger, AppTriggerStatus, AppTriggerType, Workflow, WorkflowWebhookTrigger @@ -704,20 +705,6 @@ class WebhookService: """ try: with Session(db.engine) as session: - # Get tenant owner as the user for webhook execution - tenant_owner = session.scalar( - select(Account) - .join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id) - .where( - TenantAccountJoin.tenant_id == webhook_trigger.tenant_id, - TenantAccountJoin.role == TenantAccountRole.OWNER, - ) - ) - - if not tenant_owner: - logger.error("Tenant owner not found for tenant %s", webhook_trigger.tenant_id) - raise ValueError("Tenant owner not found") - # Prepare inputs for the webhook node # The webhook node expects webhook_data in the inputs workflow_inputs = cls.build_workflow_inputs(webhook_data) @@ -732,10 +719,17 @@ class WebhookService: tenant_id=webhook_trigger.tenant_id, ) + end_user = get_or_create_end_user_by_type( + type=InvokeFrom.TRIGGER, + tenant_id=webhook_trigger.tenant_id, + app_id=webhook_trigger.app_id, + user_id=None, + ) + # Trigger workflow execution asynchronously AsyncWorkflowService.trigger_workflow_async( session, - tenant_owner, + end_user, trigger_data, ) diff --git a/api/services/workflow/entities.py b/api/services/workflow/entities.py index fc1a5d91ee..01c7f3df17 100644 --- a/api/services/workflow/entities.py +++ b/api/services/workflow/entities.py @@ -61,6 +61,8 @@ class PluginTriggerData(TriggerData): class PluginTriggerDispatchData(BaseModel): """Plugin trigger dispatch data for Celery tasks""" + user_id: str + tenant_id: str endpoint_id: str provider_id: str subscription_id: str diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index 345a566b35..d3fa387bd4 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -6,25 +6,32 @@ to avoid blocking the main request thread. """ import logging +from collections.abc import Mapping, Sequence from celery import shared_task +from sqlalchemy import func, select from sqlalchemy.orm import Session +from core.app.entities.app_invoke_entities import InvokeFrom +from core.plugin.entities.plugin_daemon import CredentialType +from core.plugin.entities.request import TriggerInvokeEventResponse +from core.plugin.utils.http_parser import deserialize_request from core.trigger.debug.event_bus import TriggerDebugEventBus from core.trigger.debug.events import PluginTriggerDebugEvent -from core.trigger.entities.entities import EventEntity from core.trigger.provider import PluginTriggerProviderController from core.trigger.trigger_manager import TriggerManager -from core.trigger.utils.encryption import ( - create_trigger_provider_encrypter_for_properties, - create_trigger_provider_encrypter_for_subscription, -) +from core.workflow.enums import NodeType +from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData from extensions.ext_database import db from extensions.ext_storage import storage +from models.enums import WorkflowRunTriggeredFrom +from models.model import EndUser from models.provider_ids import TriggerProviderID from models.trigger import TriggerSubscription -from services.trigger.trigger_service import TriggerService -from services.workflow.entities import PluginTriggerDispatchData +from models.workflow import Workflow, WorkflowPluginTrigger +from services.async_workflow_service import AsyncWorkflowService +from services.trigger.trigger_provider_service import TriggerProviderService +from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData logger = logging.getLogger(__name__) @@ -32,6 +39,230 @@ logger = logging.getLogger(__name__) TRIGGER_QUEUE = "triggered_workflow_dispatcher" +def dispatch_trigger_debug_event( + events: list[str], + user_id: str, + timestamp: int, + request_id: str, + subscription: TriggerSubscription, +) -> int: + debug_dispatched = 0 + try: + for event_name in events: + pool_key: str = PluginTriggerDebugEvent.build_pool_key( + name=event_name, + tenant_id=subscription.tenant_id, + subscription_id=subscription.id, + provider_id=subscription.provider_id, + ) + trigger_debug_event: PluginTriggerDebugEvent = PluginTriggerDebugEvent( + timestamp=timestamp, + user_id=user_id, + name=event_name, + request_id=request_id, + subscription_id=subscription.id, + provider_id=subscription.provider_id, + ) + debug_dispatched += TriggerDebugEventBus.dispatch( + tenant_id=subscription.tenant_id, + event=trigger_debug_event, + pool_key=pool_key, + ) + logger.debug( + "Trigger debug dispatched %d sessions to pool %s for event %s for subscription %s provider %s", + debug_dispatched, + pool_key, + event_name, + subscription.id, + subscription.provider_id, + ) + return debug_dispatched + except Exception: + logger.exception("Failed to dispatch to debug sessions") + return 0 + + +def _get_latest_workflows_by_app_ids( + session: Session, subscribers: Sequence[WorkflowPluginTrigger] +) -> Mapping[str, Workflow]: + """Get the latest workflows by app_ids""" + workflow_query = ( + select(Workflow.app_id, func.max(Workflow.created_at).label("max_created_at")) + .where( + Workflow.app_id.in_({t.app_id for t in subscribers}), + Workflow.version != Workflow.VERSION_DRAFT, + ) + .group_by(Workflow.app_id) + .subquery() + ) + workflows = session.scalars( + select(Workflow).join( + workflow_query, + (Workflow.app_id == workflow_query.c.app_id) & (Workflow.created_at == workflow_query.c.max_created_at), + ) + ).all() + return {w.app_id: w for w in workflows} + + +def dispatch_triggered_workflow( + user_id: str, + subscription: TriggerSubscription, + event_name: str, + request_id: str, +) -> int: + """Process triggered workflows. + + Args: + subscription: The trigger subscription + event: The trigger entity that was activated + request_id: The ID of the stored request in storage system + """ + request = deserialize_request(storage.load_once(f"triggers/{request_id}")) + if not request: + logger.error("Request not found for request_id %s", request_id) + return 0 + + from services.trigger.trigger_service import TriggerService + + subscribers: list[WorkflowPluginTrigger] = TriggerService.get_subscriber_triggers( + tenant_id=subscription.tenant_id, subscription_id=subscription.id, event_name=event_name + ) + if not subscribers: + logger.warning( + "No workflows found for trigger event '%s' in subscription '%s'", + event_name, + subscription.id, + ) + return 0 + + dispatched_count = 0 + provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider( + tenant_id=subscription.tenant_id, provider_id=TriggerProviderID(subscription.provider_id) + ) + with Session(db.engine) as session: + workflows: Mapping[str, Workflow] = _get_latest_workflows_by_app_ids(session, subscribers) + # Lazy import to avoid circular import during app initialization + from controllers.service_api.wraps import create_end_user_batch + + end_users: Mapping[str, EndUser] = create_end_user_batch( + type=InvokeFrom.TRIGGER, + tenant_id=subscription.tenant_id, + app_ids=[plugin_trigger.app_id for plugin_trigger in subscribers], + user_id=user_id, + ) + for plugin_trigger in subscribers: + # Get workflow from mapping + workflow: Workflow | None = workflows.get(plugin_trigger.app_id) + if not workflow: + logger.error( + "Workflow not found for app %s", + plugin_trigger.app_id, + ) + continue + + # Find the trigger node in the workflow + event_node = None + for node_id, node_config in workflow.walk_nodes(NodeType.TRIGGER_PLUGIN): + if node_id == plugin_trigger.node_id: + event_node = node_config + break + + if not event_node: + logger.error("Trigger event node not found for app %s", plugin_trigger.app_id) + continue + + # invoke triger + node_data: TriggerEventNodeData = TriggerEventNodeData.model_validate(event_node) + invoke_response: TriggerInvokeEventResponse = TriggerManager.invoke_trigger_event( + tenant_id=subscription.tenant_id, + user_id=user_id, + provider_id=TriggerProviderID(subscription.provider_id), + event_name=event_name, + parameters=node_data.resolve_parameters( + parameter_schemas=provider_controller.get_event_parameters(event_name=event_name) + ), + credentials=subscription.credentials, + credential_type=CredentialType.of(subscription.credential_type), + subscription=subscription.to_entity(), + request=request, + ) + if invoke_response.cancelled: + logger.info( + "Trigger ignored for app %s with trigger event %s", + plugin_trigger.app_id, + event_name, + ) + continue + + # Create trigger data for async execution + trigger_data = PluginTriggerData( + app_id=plugin_trigger.app_id, + tenant_id=subscription.tenant_id, + workflow_id=workflow.id, + root_node_id=plugin_trigger.node_id, + trigger_type=WorkflowRunTriggeredFrom.PLUGIN, + plugin_id=subscription.provider_id, + endpoint_id=subscription.endpoint_id, + inputs=invoke_response.variables, + ) + + # Trigger async workflow + try: + end_user = end_users.get(plugin_trigger.app_id) + if not end_user: + raise ValueError(f"End user not found for app {plugin_trigger.app_id}") + + AsyncWorkflowService.trigger_workflow_async(session=session, user=end_user, trigger_data=trigger_data) + dispatched_count += 1 + logger.info( + "Triggered workflow for app %s with trigger event %s", + plugin_trigger.app_id, + event_name, + ) + except Exception: + logger.exception( + "Failed to trigger workflow for app %s", + plugin_trigger.app_id, + ) + + return dispatched_count + + +def dispatch_triggered_workflows( + user_id: str, + events: list[str], + subscription: TriggerSubscription, + request_id: str, +) -> int: + dispatched_count = 0 + for event_name in events: + try: + dispatched_count += dispatch_triggered_workflow( + user_id=user_id, + subscription=subscription, + event_name=event_name, + request_id=request_id, + ) + except Exception: + logger.exception( + "Failed to dispatch trigger '%s' for subscription %s and provider %s. Continuing...", + event_name, + subscription.id, + subscription.provider_id, + ) + # Continue processing other triggers even if one fails + continue + + logger.info( + "Completed async trigger dispatching: processed %d/%d triggers for subscription %s and provider %s", + dispatched_count, + len(events), + subscription.id, + subscription.provider_id, + ) + return dispatched_count + + @shared_task(queue=TRIGGER_QUEUE) def dispatch_triggered_workflows_async( dispatch_data: dict, @@ -51,6 +282,8 @@ def dispatch_triggered_workflows_async( dict: Execution result with status and dispatched trigger count """ dispatch_params: PluginTriggerDispatchData = PluginTriggerDispatchData.model_validate(dispatch_data) + user_id = dispatch_params.user_id + tenant_id = dispatch_params.tenant_id endpoint_id = dispatch_params.endpoint_id provider_id = dispatch_params.provider_id subscription_id = dispatch_params.subscription_id @@ -60,7 +293,8 @@ def dispatch_triggered_workflows_async( try: logger.info( - "Starting trigger dispatching endpoint=%s, events=%s, request_id=%s, subscription_id=%s, provider_id=%s", + "Starting trigger dispatching uid=%s, endpoint=%s, events=%s, req_id=%s, sub_id=%s, provider_id=%s", + user_id, endpoint_id, events, request_id, @@ -68,125 +302,35 @@ def dispatch_triggered_workflows_async( provider_id, ) - # Verify request exists in storage - try: - serialized_request = storage.load_once(f"triggers/{request_id}") - # Just verify it exists, we don't need to deserialize it here - if not serialized_request: - raise ValueError("Request not found in storage") - except Exception as e: - logger.exception("Failed to load request %s", request_id, exc_info=e) - return {"status": "failed", "error": f"Failed to load request: {str(e)}"} + subscription: TriggerSubscription | None = TriggerProviderService.get_subscription_by_id( + tenant_id=tenant_id, + subscription_id=subscription_id, + ) + if not subscription: + logger.error("Subscription not found: %s", subscription_id) + return {"status": "failed", "error": "Subscription not found"} - with Session(db.engine) as session: - # Get subscription - subscription: TriggerSubscription | None = ( - session.query(TriggerSubscription).filter_by(id=subscription_id).first() - ) - if not subscription: - logger.error("Subscription not found: %s", subscription_id) - return {"status": "failed", "error": "Subscription not found"} + workflow_dispatched = dispatch_triggered_workflows( + user_id=user_id, + events=events, + subscription=subscription, + request_id=request_id, + ) - # Get controller - controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider( - subscription.tenant_id, TriggerProviderID(provider_id) - ) - if not controller: - logger.error("Controller not found for provider: %s", provider_id) - return {"status": "failed", "error": "Controller not found"} + debug_dispatched = dispatch_trigger_debug_event( + events=events, + user_id=user_id, + timestamp=timestamp, + request_id=request_id, + subscription=subscription, + ) - credential_encrypter, _ = create_trigger_provider_encrypter_for_subscription( - tenant_id=subscription.tenant_id, - controller=controller, - subscription=subscription, - ) - subscription.credentials = credential_encrypter.decrypt(subscription.credentials) - - properties_encrypter, _ = create_trigger_provider_encrypter_for_properties( - tenant_id=subscription.tenant_id, - controller=controller, - subscription=subscription, - ) - subscription.properties = properties_encrypter.decrypt(subscription.properties) - - # Dispatch each trigger - dispatched_count = 0 - for event_name in events: - try: - event: EventEntity | None = controller.get_event(event_name) - if event is None: - logger.error( - "Trigger '%s' not found in provider '%s'", - event_name, - provider_id, - ) - continue - - dispatched_count += TriggerService.dispatch_triggered_workflows( - subscription=subscription, - event=event, - request_id=request_id, - ) - - except Exception: - logger.exception( - "Failed to dispatch trigger '%s' for subscription %s and provider %s. Continuing...", - event_name, - subscription_id, - provider_id, - ) - # Continue processing other triggers even if one fails - continue - - # Dispatch to debug sessions after processing all triggers - debug_dispatched = 0 - try: - for event_name in events: - pool_key: str = PluginTriggerDebugEvent.build_pool_key( - name=event_name, - tenant_id=subscription.tenant_id, - subscription_id=subscription_id, - provider_id=provider_id, - ) - event = PluginTriggerDebugEvent( - provider_id=provider_id, - subscription_id=subscription_id, - request_id=request_id, - timestamp=timestamp, - name=event_name, - ) - debug_dispatched += TriggerDebugEventBus.dispatch( - tenant_id=subscription.tenant_id, - event=event, - pool_key=pool_key, - ) - logger.debug( - "Trigger debug dispatched %d sessions to pool %s for event %s for subscription %s provider %s", - debug_dispatched, - pool_key, - event_name, - subscription_id, - provider_id, - ) - - except Exception: - # Silent failure for debug dispatch - logger.exception("Failed to dispatch to debug sessions") - - logger.info( - "Completed async trigger dispatching: processed %d/%d triggers for subscription %s and provider %s", - dispatched_count, - len(events), - subscription_id, - provider_id, - ) - - return { - "status": "completed", - "total_count": len(events), - "dispatched_count": dispatched_count, - "debug_dispatched_count": debug_dispatched, - } + return { + "status": "completed", + "total_count": len(events), + "workflows": workflow_dispatched, + "debug_events": debug_dispatched, + } except Exception as e: logger.exception(