diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 3b2c769960..a6e2d64b19 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -1072,7 +1072,9 @@ class DraftWorkflowTriggerNodeApi(Resource): return jsonable_encoder(node_execution) except Exception as e: logger.exception("Error running draft workflow trigger node") - return jsonable_encoder({"status": "error", "error": "An unexpected error occurred while running the node."}), 500 + return jsonable_encoder( + {"status": "error", "error": "An unexpected error occurred while running the node."} + ), 500 @console_ns.route("/apps//workflows/draft/trigger/run-all") diff --git a/api/controllers/console/workspace/trigger_providers.py b/api/controllers/console/workspace/trigger_providers.py index 309dbde1b6..bbbbe12fb0 100644 --- a/api/controllers/console/workspace/trigger_providers.py +++ b/api/controllers/console/workspace/trigger_providers.py @@ -20,8 +20,8 @@ from models.account import Account from models.provider_ids import TriggerProviderID from services.plugin.oauth_service import OAuthProxyService from services.trigger.trigger_provider_service import TriggerProviderService -from services.trigger.trigger_service import TriggerService from services.trigger.trigger_subscription_builder_service import TriggerSubscriptionBuilderService +from services.trigger.trigger_subscription_operator_service import TriggerSubscriptionOperatorService logger = logging.getLogger(__name__) @@ -265,7 +265,7 @@ class TriggerSubscriptionDeleteApi(Resource): @setup_required @login_required @account_initialization_required - def post(self, subscription_id): + def post(self, subscription_id: str): """Delete a subscription instance""" user = current_user assert isinstance(user, Account) @@ -282,7 +282,7 @@ class TriggerSubscriptionDeleteApi(Resource): subscription_id=subscription_id, ) # Delete plugin triggers - TriggerService.delete_plugin_trigger_by_subscription( + TriggerSubscriptionOperatorService.delete_plugin_trigger_by_subscription( session=session, tenant_id=user.current_tenant_id, subscription_id=subscription_id, diff --git a/api/services/trigger/schedule_service.py b/api/services/trigger/schedule_service.py index b0f39bf929..97f2def0a1 100644 --- a/api/services/trigger/schedule_service.py +++ b/api/services/trigger/schedule_service.py @@ -14,6 +14,7 @@ from libs.schedule_utils import calculate_next_run_at, convert_12h_to_24h from models.account import Account, TenantAccountJoin from models.trigger import WorkflowSchedulePlan from models.workflow import Workflow +from services.errors.account import AccountNotFoundError logger = logging.getLogger(__name__) @@ -124,7 +125,7 @@ class ScheduleService: session.flush() @staticmethod - def get_tenant_owner(session: Session, tenant_id: str) -> Optional[Account]: + def get_tenant_owner(session: Session, tenant_id: str) -> Account: """ Returns an account to execute scheduled workflows on behalf of the tenant. Prioritizes owner over admin to ensure proper authorization hierarchy. @@ -144,7 +145,12 @@ class ScheduleService: ).scalar_one_or_none() if result: - return session.get(Account, result.account_id) + account = session.get(Account, result.account_id) + if not account: + raise AccountNotFoundError(f"Account not found: {result.account_id}") + return account + else: + raise AccountNotFoundError(f"Account not found for tenant: {tenant_id}") @staticmethod def update_next_run_at( diff --git a/api/services/trigger/trigger_service.py b/api/services/trigger/trigger_service.py index 8a04a3c642..0255e42546 100644 --- a/api/services/trigger/trigger_service.py +++ b/api/services/trigger/trigger_service.py @@ -6,7 +6,7 @@ from typing import Any from flask import Request, Response from pydantic import BaseModel -from sqlalchemy import and_, select +from sqlalchemy import select from sqlalchemy.orm import Session from core.plugin.entities.plugin_daemon import CredentialType @@ -22,7 +22,7 @@ from extensions.ext_database import db from extensions.ext_redis import redis_client from models.model import App from models.provider_ids import TriggerProviderID -from models.trigger import AppTrigger, AppTriggerStatus, TriggerSubscription, WorkflowPluginTrigger +from models.trigger import TriggerSubscription, WorkflowPluginTrigger from models.workflow import Workflow from services.trigger.trigger_provider_service import TriggerProviderService from services.trigger.trigger_request_service import TriggerHttpRequestCachingService @@ -148,68 +148,6 @@ class TriggerService: ) return dispatch_response.response - @classmethod - def get_subscriber_triggers( - cls, tenant_id: str, subscription_id: str, event_name: str - ) -> list[WorkflowPluginTrigger]: - """ - Get WorkflowPluginTriggers for a subscription and trigger. - - Args: - tenant_id: Tenant ID - subscription_id: Subscription ID - event_name: Event name - """ - with Session(db.engine, expire_on_commit=False) as session: - subscribers = session.scalars( - select(WorkflowPluginTrigger) - .join( - AppTrigger, - and_( - AppTrigger.tenant_id == WorkflowPluginTrigger.tenant_id, - AppTrigger.app_id == WorkflowPluginTrigger.app_id, - AppTrigger.node_id == WorkflowPluginTrigger.node_id, - ), - ) - .where( - WorkflowPluginTrigger.tenant_id == tenant_id, - WorkflowPluginTrigger.subscription_id == subscription_id, - WorkflowPluginTrigger.event_name == event_name, - AppTrigger.status == AppTriggerStatus.ENABLED, - ) - ).all() - return list(subscribers) - - @classmethod - def delete_plugin_trigger_by_subscription( - cls, - session: Session, - tenant_id: str, - subscription_id: str, - ) -> None: - """Delete a plugin trigger by tenant_id and subscription_id within an existing session - - Args: - session: Database session - tenant_id: The tenant ID - subscription_id: The subscription ID - - Raises: - NotFound: If plugin trigger not found - """ - # Find plugin trigger using indexed columns - plugin_trigger = session.scalar( - select(WorkflowPluginTrigger).where( - WorkflowPluginTrigger.tenant_id == tenant_id, - WorkflowPluginTrigger.subscription_id == subscription_id, - ) - ) - - if not plugin_trigger: - return - - session.delete(plugin_trigger) - @classmethod def sync_plugin_trigger_relationships(cls, app: App, workflow: Workflow): """ diff --git a/api/services/trigger/trigger_subscription_operator_service.py b/api/services/trigger/trigger_subscription_operator_service.py new file mode 100644 index 0000000000..5d7785549e --- /dev/null +++ b/api/services/trigger/trigger_subscription_operator_service.py @@ -0,0 +1,70 @@ +from sqlalchemy import and_, select +from sqlalchemy.orm import Session + +from extensions.ext_database import db +from models.enums import AppTriggerStatus +from models.trigger import AppTrigger, WorkflowPluginTrigger + + +class TriggerSubscriptionOperatorService: + @classmethod + def get_subscriber_triggers( + cls, tenant_id: str, subscription_id: str, event_name: str + ) -> list[WorkflowPluginTrigger]: + """ + Get WorkflowPluginTriggers for a subscription and trigger. + + Args: + tenant_id: Tenant ID + subscription_id: Subscription ID + event_name: Event name + """ + with Session(db.engine, expire_on_commit=False) as session: + subscribers = session.scalars( + select(WorkflowPluginTrigger) + .join( + AppTrigger, + and_( + AppTrigger.tenant_id == WorkflowPluginTrigger.tenant_id, + AppTrigger.app_id == WorkflowPluginTrigger.app_id, + AppTrigger.node_id == WorkflowPluginTrigger.node_id, + ), + ) + .where( + WorkflowPluginTrigger.tenant_id == tenant_id, + WorkflowPluginTrigger.subscription_id == subscription_id, + WorkflowPluginTrigger.event_name == event_name, + AppTrigger.status == AppTriggerStatus.ENABLED, + ) + ).all() + return list(subscribers) + + @classmethod + def delete_plugin_trigger_by_subscription( + cls, + session: Session, + tenant_id: str, + subscription_id: str, + ) -> None: + """Delete a plugin trigger by tenant_id and subscription_id within an existing session + + Args: + session: Database session + tenant_id: The tenant ID + subscription_id: The subscription ID + + Raises: + NotFound: If plugin trigger not found + """ + # Find plugin trigger using indexed columns + plugin_trigger = session.scalar( + select(WorkflowPluginTrigger).where( + WorkflowPluginTrigger.tenant_id == tenant_id, + WorkflowPluginTrigger.subscription_id == subscription_id, + ) + ) + + if not plugin_trigger: + return + + session.delete(plugin_trigger) diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index fca7e47c95..75c36ef677 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -32,6 +32,7 @@ from services.async_workflow_service import AsyncWorkflowService from services.end_user_service import EndUserService from services.trigger.trigger_provider_service import TriggerProviderService from services.trigger.trigger_request_service import TriggerHttpRequestCachingService +from services.trigger.trigger_subscription_operator_service import TriggerSubscriptionOperatorService from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData, PluginTriggerMetadata logger = logging.getLogger(__name__) @@ -121,10 +122,7 @@ def dispatch_triggered_workflow( request = TriggerHttpRequestCachingService.get_request(request_id) payload = TriggerHttpRequestCachingService.get_payload(request_id) - from services.trigger.trigger_service import TriggerService - # FIXME: we should avoid import modules inside methods - - subscribers: list[WorkflowPluginTrigger] = TriggerService.get_subscriber_triggers( + subscribers: list[WorkflowPluginTrigger] = TriggerSubscriptionOperatorService.get_subscriber_triggers( tenant_id=subscription.tenant_id, subscription_id=subscription.id, event_name=event_name ) if not subscribers: