From 4b253e1f7310f177e1bde28afbff7b441e30b027 Mon Sep 17 00:00:00 2001 From: Harry Date: Tue, 2 Sep 2025 20:16:11 +0800 Subject: [PATCH] feat(trigger): plugin trigger workflow --- .../console/app/workflow_trigger.py | 204 ++++++++++++++++++ api/core/trigger/entities/api_entities.py | 3 + api/core/trigger/entities/entities.py | 42 ++-- ...ptimize_trigger_provider_endpoint_index.py | 42 ---- ..._09_02_1528-afa344924e5c_plugin_trigger.py | 68 ++++++ api/models/trigger.py | 24 ++- api/models/workflow.py | 86 ++++---- api/services/trigger_service.py | 1 - api/services/webhook_service.py | 4 +- api/tasks/remove_app_and_related_data_task.py | 11 + 10 files changed, 375 insertions(+), 110 deletions(-) delete mode 100644 api/migrations/versions/2025_09_01_1242-9d83760807c5_optimize_trigger_provider_endpoint_index.py create mode 100644 api/migrations/versions/2025_09_02_1528-afa344924e5c_plugin_trigger.py diff --git a/api/controllers/console/app/workflow_trigger.py b/api/controllers/console/app/workflow_trigger.py index 2a099935a8..30c8a8a8ab 100644 --- a/api/controllers/console/app/workflow_trigger.py +++ b/api/controllers/console/app/workflow_trigger.py @@ -18,6 +18,209 @@ from models.workflow import AppTrigger, AppTriggerStatus, WorkflowWebhookTrigger logger = logging.getLogger(__name__) +from models.workflow import WorkflowPluginTrigger + + +class PluginTriggerApi(Resource): + """Workflow Plugin Trigger API""" + + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=AppMode.WORKFLOW) + def post(self, app_model): + """Create plugin trigger""" + parser = reqparse.RequestParser() + parser.add_argument("node_id", type=str, required=True, help="Node ID is required") + parser.add_argument("provider_id", type=str, required=True, help="Provider ID is required") + parser.add_argument("trigger_name", type=str, required=True, help="Trigger name is required") + parser.add_argument( + "triggered_by", + type=str, + required=False, + default="production", + choices=["debugger", "production"], + help="triggered_by must be debugger or production", + ) + args = parser.parse_args() + + # The role of the current user in the ta table must be admin, owner, or editor + if not current_user.is_editor: + raise Forbidden() + + node_id = args["node_id"] + provider_id = args["provider_id"] + trigger_name = args["trigger_name"] + triggered_by = args["triggered_by"] + + # Create trigger_id from provider_id and trigger_name + trigger_id = f"{provider_id}:{trigger_name}" + + with Session(db.engine) as session: + # Check if plugin trigger already exists for this app, node, and environment + existing_trigger = session.scalar( + select(WorkflowPluginTrigger).where( + WorkflowPluginTrigger.app_id == app_model.id, + WorkflowPluginTrigger.node_id == node_id, + WorkflowPluginTrigger.triggered_by == triggered_by, + ) + ) + + if existing_trigger: + raise BadRequest("Plugin trigger already exists for this node and environment") + + # Create new plugin trigger + plugin_trigger = WorkflowPluginTrigger( + app_id=app_model.id, + node_id=node_id, + tenant_id=current_user.current_tenant_id, + provider_id=provider_id, + trigger_id=trigger_id, + triggered_by=triggered_by, + ) + + session.add(plugin_trigger) + session.commit() + session.refresh(plugin_trigger) + + return plugin_trigger + + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=AppMode.WORKFLOW) + def get(self, app_model): + """Get plugin trigger""" + parser = reqparse.RequestParser() + parser.add_argument("node_id", type=str, required=True, help="Node ID is required") + parser.add_argument( + "triggered_by", + type=str, + required=False, + default="production", + choices=["debugger", "production"], + help="triggered_by must be debugger or production", + ) + args = parser.parse_args() + + node_id = args["node_id"] + triggered_by = args["triggered_by"] + + with Session(db.engine) as session: + # Find plugin trigger + plugin_trigger = session.scalar( + select(WorkflowPluginTrigger).where( + WorkflowPluginTrigger.app_id == app_model.id, + WorkflowPluginTrigger.node_id == node_id, + WorkflowPluginTrigger.triggered_by == triggered_by, + WorkflowPluginTrigger.tenant_id == current_user.current_tenant_id, + ) + ) + + if not plugin_trigger: + raise NotFound("Plugin trigger not found") + return plugin_trigger + + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=AppMode.WORKFLOW) + def put(self, app_model): + """Update plugin trigger""" + parser = reqparse.RequestParser() + parser.add_argument("node_id", type=str, required=True, help="Node ID is required") + parser.add_argument("provider_id", type=str, required=False, help="Provider ID") + parser.add_argument("trigger_name", type=str, required=False, help="Trigger name") + parser.add_argument( + "triggered_by", + type=str, + required=False, + default="production", + choices=["debugger", "production"], + help="triggered_by must be debugger or production", + ) + args = parser.parse_args() + + # The role of the current user in the ta table must be admin, owner, or editor + if not current_user.is_editor: + raise Forbidden() + + node_id = args["node_id"] + triggered_by = args["triggered_by"] + + with Session(db.engine) as session: + # Find plugin trigger + plugin_trigger = session.scalar( + select(WorkflowPluginTrigger).where( + WorkflowPluginTrigger.app_id == app_model.id, + WorkflowPluginTrigger.node_id == node_id, + WorkflowPluginTrigger.triggered_by == triggered_by, + WorkflowPluginTrigger.tenant_id == current_user.current_tenant_id, + ) + ) + + if not plugin_trigger: + raise NotFound("Plugin trigger not found") + + # Update fields if provided + if args.get("provider_id"): + plugin_trigger.provider_id = args["provider_id"] + + if args.get("trigger_name"): + # Update trigger_id if provider_id or trigger_name changed + provider_id = args.get("provider_id") or plugin_trigger.provider_id + trigger_name = args["trigger_name"] + plugin_trigger.trigger_id = f"{provider_id}:{trigger_name}" + + session.commit() + session.refresh(plugin_trigger) + + return plugin_trigger + + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=AppMode.WORKFLOW) + def delete(self, app_model): + """Delete plugin trigger""" + parser = reqparse.RequestParser() + parser.add_argument("node_id", type=str, required=True, help="Node ID is required") + parser.add_argument( + "triggered_by", + type=str, + required=False, + default="production", + choices=["debugger", "production"], + help="triggered_by must be debugger or production", + ) + args = parser.parse_args() + + # The role of the current user in the ta table must be admin, owner, or editor + if not current_user.is_editor: + raise Forbidden() + + node_id = args["node_id"] + triggered_by = args["triggered_by"] + + with Session(db.engine) as session: + # Find plugin trigger + plugin_trigger = session.scalar( + select(WorkflowPluginTrigger).where( + WorkflowPluginTrigger.app_id == app_model.id, + WorkflowPluginTrigger.node_id == node_id, + WorkflowPluginTrigger.triggered_by == triggered_by, + WorkflowPluginTrigger.tenant_id == current_user.current_tenant_id, + ) + ) + + if not plugin_trigger: + raise NotFound("Plugin trigger not found") + + session.delete(plugin_trigger) + session.commit() + + return {"result": "success"}, 204 + class WebhookTriggerApi(Resource): """Webhook Trigger API""" @@ -232,5 +435,6 @@ class AppTriggerEnableApi(Resource): api.add_resource(WebhookTriggerApi, "/apps//workflows/triggers/webhook") +api.add_resource(PluginTriggerApi, "/apps//workflows/triggers/plugin") api.add_resource(AppTriggersApi, "/apps//triggers") api.add_resource(AppTriggerEnableApi, "/apps//trigger-enable") diff --git a/api/core/trigger/entities/api_entities.py b/api/core/trigger/entities/api_entities.py index 54f297b4b5..0820044d88 100644 --- a/api/core/trigger/entities/api_entities.py +++ b/api/core/trigger/entities/api_entities.py @@ -21,6 +21,9 @@ class TriggerProviderSubscriptionApiEntity(BaseModel): provider: str = Field(description="The provider id of the subscription") credential_type: CredentialType = Field(description="The type of the credential") credentials: dict = Field(description="The credentials of the subscription") + endpoint: str = Field(description="The endpoint of the subscription") + parameters: dict = Field(description="The parameters of the subscription") + properties: dict = Field(description="The properties of the subscription") class TriggerProviderApiEntity(BaseModel): diff --git a/api/core/trigger/entities/entities.py b/api/core/trigger/entities/entities.py index 359deec71c..5ec70943e3 100644 --- a/api/core/trigger/entities/entities.py +++ b/api/core/trigger/entities/entities.py @@ -1,4 +1,5 @@ from collections.abc import Mapping +from datetime import datetime from enum import StrEnum from typing import Any, Optional, Union @@ -39,7 +40,7 @@ class TriggerParameter(BaseModel): template: Optional[PluginParameterTemplate] = Field(default=None, description="The template of the parameter") scope: Optional[str] = None required: Optional[bool] = False - default: Union[int, float, str, None] = None + default: Union[int, float, str, list, None] = None min: Union[float, int, None] = None max: Union[float, int, None] = None precision: Optional[int] = None @@ -105,7 +106,7 @@ class SubscriptionSchema(BaseModel): The subscription schema of the trigger provider """ - parameters_schema: list[ProviderConfig] | None = Field( + parameters_schema: list[TriggerParameter] | None = Field( default_factory=list, description="The parameters schema required to create a subscription", ) @@ -184,32 +185,33 @@ class Unsubscription(BaseModel): class RequestLog(BaseModel): - id: str - endpoint: str - request: dict - response: dict - created_at: str + id: str = Field(..., description="The id of the request log") + endpoint: str = Field(..., description="The endpoint of the request log") + request: dict = Field(..., description="The request of the request log") + response: dict = Field(..., description="The response of the request log") + created_at: datetime = Field(..., description="The created at of the request log") class SubscriptionBuilder(BaseModel): - id: str - name: str | None = None - tenant_id: str - user_id: str - provider_id: str - endpoint_id: str - parameters: Mapping[str, Any] - properties: Mapping[str, Any] - credentials: Mapping[str, str] - credential_type: str | None = None - credential_expires_at: int | None = None - expires_at: int + id: str = Field(..., description="The id of the subscription builder") + name: str | None = Field(default=None, description="The name of the subscription builder") + tenant_id: str = Field(..., description="The tenant id of the subscription builder") + user_id: str = Field(..., description="The user id of the subscription builder") + provider_id: str = Field(..., description="The provider id of the subscription builder") + endpoint_id: str = Field(..., description="The endpoint id of the subscription builder") + parameters: Mapping[str, Any] = Field(..., description="The parameters of the subscription builder") + properties: Mapping[str, Any] = Field(..., description="The properties of the subscription builder") + credentials: Mapping[str, str] = Field(..., description="The credentials of the subscription builder") + credential_type: str | None = Field(default=None, description="The credential type of the subscription builder") + credential_expires_at: int | None = Field( + default=None, description="The credential expires at of the subscription builder" + ) + expires_at: int = Field(..., description="The expires at of the subscription builder") def to_subscription(self) -> Subscription: return Subscription( expires_at=self.expires_at, endpoint=self.endpoint_id, - parameters=self.parameters, properties=self.properties, ) diff --git a/api/migrations/versions/2025_09_01_1242-9d83760807c5_optimize_trigger_provider_endpoint_index.py b/api/migrations/versions/2025_09_01_1242-9d83760807c5_optimize_trigger_provider_endpoint_index.py deleted file mode 100644 index 2b6c4113ce..0000000000 --- a/api/migrations/versions/2025_09_01_1242-9d83760807c5_optimize_trigger_provider_endpoint_index.py +++ /dev/null @@ -1,42 +0,0 @@ -"""Optimize trigger provider endpoint index - -Revision ID: 9d83760807c5 -Revises: 9ee7d347f4c1 -Create Date: 2025-09-01 12:42:00.000000 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '9d83760807c5' -down_revision = '9ee7d347f4c1' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - - # Drop the old unique constraint on endpoint - with op.batch_alter_table('trigger_providers', schema=None) as batch_op: - batch_op.drop_constraint('unique_trigger_provider_endpoint', type_='unique') - - # Create a new unique index on endpoint for O(1) lookup - batch_op.create_index('idx_trigger_providers_endpoint', ['endpoint'], unique=True) - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - - # Drop the new index - with op.batch_alter_table('trigger_providers', schema=None) as batch_op: - batch_op.drop_index('idx_trigger_providers_endpoint') - - # Recreate the old unique constraint - batch_op.create_unique_constraint('unique_trigger_provider_endpoint', ['endpoint']) - - # ### end Alembic commands ### diff --git a/api/migrations/versions/2025_09_02_1528-afa344924e5c_plugin_trigger.py b/api/migrations/versions/2025_09_02_1528-afa344924e5c_plugin_trigger.py new file mode 100644 index 0000000000..d1b56f3efd --- /dev/null +++ b/api/migrations/versions/2025_09_02_1528-afa344924e5c_plugin_trigger.py @@ -0,0 +1,68 @@ +"""plugin_trigger + +Revision ID: afa344924e5c +Revises: 9ee7d347f4c1 +Create Date: 2025-09-02 15:28:31.426728 + +""" +from alembic import op +import models as models +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'afa344924e5c' +down_revision = '9ee7d347f4c1' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('oauth_provider_apps', + sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), + sa.Column('app_icon', sa.String(length=255), nullable=False), + sa.Column('app_label', sa.JSON(), server_default='{}', nullable=False), + sa.Column('client_id', sa.String(length=255), nullable=False), + sa.Column('client_secret', sa.String(length=255), nullable=False), + sa.Column('redirect_uris', sa.JSON(), server_default='[]', nullable=False), + sa.Column('scope', sa.String(length=255), server_default=sa.text("'read:name read:email read:avatar read:interface_language read:timezone'"), nullable=False), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP(0)'), nullable=False), + sa.PrimaryKeyConstraint('id', name='oauth_provider_app_pkey') + ) + with op.batch_alter_table('oauth_provider_apps', schema=None) as batch_op: + batch_op.create_index('oauth_provider_app_client_id_idx', ['client_id'], unique=False) + + op.create_table('workflow_plugin_triggers', + sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False), + sa.Column('app_id', models.types.StringUUID(), nullable=False), + sa.Column('node_id', sa.String(length=64), nullable=False), + sa.Column('tenant_id', models.types.StringUUID(), nullable=False), + sa.Column('provider_id', sa.String(length=255), nullable=False), + sa.Column('trigger_id', sa.String(length=510), nullable=False), + sa.Column('triggered_by', sa.String(length=16), nullable=False), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.PrimaryKeyConstraint('id', name='workflow_plugin_trigger_pkey'), + sa.UniqueConstraint('app_id', 'node_id', 'triggered_by', name='uniq_plugin_node'), + sa.UniqueConstraint('trigger_id', 'node_id', name='uniq_trigger_node') + ) + with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op: + batch_op.create_index('workflow_plugin_trigger_tenant_idx', ['tenant_id'], unique=False) + batch_op.create_index('workflow_plugin_trigger_trigger_idx', ['trigger_id'], unique=False) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op: + batch_op.drop_index('workflow_plugin_trigger_trigger_idx') + batch_op.drop_index('workflow_plugin_trigger_tenant_idx') + + op.drop_table('workflow_plugin_triggers') + with op.batch_alter_table('oauth_provider_apps', schema=None) as batch_op: + batch_op.drop_index('oauth_provider_app_client_id_idx') + + op.drop_table('oauth_provider_apps') + # ### end Alembic commands ### diff --git a/api/models/trigger.py b/api/models/trigger.py index bd107c541a..d03a8c91a7 100644 --- a/api/models/trigger.py +++ b/api/models/trigger.py @@ -52,9 +52,12 @@ class TriggerSubscription(Base): Integer, default=-1, comment="Subscription instance expiration timestamp, -1 for never" ) - created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.now()) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) updated_at: Mapped[datetime] = mapped_column( - DateTime, nullable=False, server_default=func.now(), onupdate=func.now() + DateTime, + nullable=False, + server_default=func.current_timestamp(), + server_onupdate=func.current_timestamp(), ) def is_credential_expired(self) -> bool: @@ -76,6 +79,9 @@ class TriggerSubscription(Base): id=self.id, name=self.name, provider=self.provider_id, + endpoint=parse_endpoint_id(self.endpoint_id), + parameters=self.parameters, + properties=self.properties, credential_type=CredentialType(self.credential_type), credentials=self.credentials, ) @@ -94,6 +100,13 @@ class TriggerOAuthSystemClient(Base): provider: Mapped[str] = mapped_column(String(255), nullable=False) # oauth params of the trigger provider encrypted_oauth_params: Mapped[str] = mapped_column(sa.Text, nullable=False) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + updated_at: Mapped[datetime] = mapped_column( + DateTime, + nullable=False, + server_default=func.current_timestamp(), + server_onupdate=func.current_timestamp(), + ) # tenant level trigger oauth client params (client_id, client_secret, etc.) @@ -112,6 +125,13 @@ class TriggerOAuthTenantClient(Base): enabled: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("true")) # oauth params of the trigger provider encrypted_oauth_params: Mapped[str] = mapped_column(sa.Text, nullable=False) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + updated_at: Mapped[datetime] = mapped_column( + DateTime, + nullable=False, + server_default=func.current_timestamp(), + server_onupdate=func.current_timestamp(), + ) @property def oauth_params(self) -> dict: diff --git a/api/models/workflow.py b/api/models/workflow.py index ac1289fe96..58ec685dd2 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -1424,6 +1424,49 @@ class WorkflowWebhookTrigger(Base): ) +class WorkflowPluginTrigger(Base): + """ + Workflow Plugin Trigger + + Maps plugin triggers to workflow nodes, similar to WorkflowWebhookTrigger + + Attributes: + - id (uuid) Primary key + - app_id (uuid) App ID to bind to a specific app + - node_id (varchar) Node ID which node in the workflow + - tenant_id (uuid) Workspace ID + - provider_id (varchar) Plugin provider ID + - trigger_id (varchar) Unique trigger identifier (provider_id + trigger_name) + - triggered_by (varchar) Environment: debugger or production + - created_at (timestamp) Creation time + - updated_at (timestamp) Last update time + """ + + __tablename__ = "workflow_plugin_triggers" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"), + sa.Index("workflow_plugin_trigger_tenant_idx", "tenant_id"), + sa.Index("workflow_plugin_trigger_trigger_idx", "trigger_id"), + sa.UniqueConstraint("app_id", "node_id", "triggered_by", name="uniq_plugin_node"), + sa.UniqueConstraint("trigger_id", "node_id", name="uniq_trigger_node"), + ) + + id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()")) + app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + node_id: Mapped[str] = mapped_column(String(64), nullable=False) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + provider_id: Mapped[str] = mapped_column(String(255), nullable=False) + trigger_id: Mapped[str] = mapped_column(String(510), nullable=False) # provider_id + trigger_name + triggered_by: Mapped[str] = mapped_column(String(16), nullable=False) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + updated_at: Mapped[datetime] = mapped_column( + DateTime, + nullable=False, + server_default=func.current_timestamp(), + server_onupdate=func.current_timestamp(), + ) + + class AppTriggerType(StrEnum): """App Trigger Type Enum""" @@ -1482,46 +1525,3 @@ class AppTrigger(Base): default=naive_utc_now(), server_onupdate=func.current_timestamp(), ) - - -class WorkflowPluginTrigger(Base): - """ - Workflow Plugin Trigger - - Maps plugin triggers to workflow nodes, similar to WorkflowWebhookTrigger - - Attributes: - - id (uuid) Primary key - - app_id (uuid) App ID to bind to a specific app - - node_id (varchar) Node ID which node in the workflow - - tenant_id (uuid) Workspace ID - - provider_id (varchar) Plugin provider ID - - trigger_id (varchar) Unique trigger identifier (provider_id + trigger_name) - - triggered_by (varchar) Environment: debugger or production - - created_at (timestamp) Creation time - - updated_at (timestamp) Last update time - """ - - __tablename__ = "workflow_plugin_triggers" - __table_args__ = ( - sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"), - sa.Index("workflow_plugin_trigger_tenant_idx", "tenant_id"), - sa.Index("workflow_plugin_trigger_trigger_idx", "trigger_id"), - sa.UniqueConstraint("app_id", "node_id", "triggered_by", name="uniq_plugin_node"), - sa.UniqueConstraint("trigger_id", "node_id", name="uniq_trigger_node"), - ) - - id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()")) - app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - node_id: Mapped[str] = mapped_column(String(64), nullable=False) - tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) - provider_id: Mapped[str] = mapped_column(String(255), nullable=False) - trigger_id: Mapped[str] = mapped_column(String(510), nullable=False) # provider_id + trigger_name - triggered_by: Mapped[str] = mapped_column(String(16), nullable=False) - created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) - updated_at: Mapped[datetime] = mapped_column( - DateTime, - nullable=False, - server_default=func.current_timestamp(), - server_onupdate=func.current_timestamp(), - ) diff --git a/api/services/trigger_service.py b/api/services/trigger_service.py index 0ab61efc8f..927154a149 100644 --- a/api/services/trigger_service.py +++ b/api/services/trigger_service.py @@ -142,7 +142,6 @@ class TriggerService: user_id=subscription.user_id, request=request, subscription=subscription.to_entity() ) - # TODO invoke triggers if dispatch_response.triggers: triggers = cls.select_triggers(controller, dispatch_response, provider_id, subscription) for trigger in triggers: diff --git a/api/services/webhook_service.py b/api/services/webhook_service.py index ff2cc83247..f0b0ab311e 100644 --- a/api/services/webhook_service.py +++ b/api/services/webhook_service.py @@ -45,10 +45,10 @@ class WebhookService: ) .first() ) - + if not app_trigger: raise ValueError(f"App trigger not found for webhook {webhook_id}") - + if app_trigger.status != AppTriggerStatus.ENABLED: raise ValueError(f"Webhook trigger is disabled for webhook {webhook_id}") diff --git a/api/tasks/remove_app_and_related_data_task.py b/api/tasks/remove_app_and_related_data_task.py index 7bfda3d740..576824671e 100644 --- a/api/tasks/remove_app_and_related_data_task.py +++ b/api/tasks/remove_app_and_related_data_task.py @@ -69,6 +69,7 @@ def remove_app_and_related_data_task(self, tenant_id: str, app_id: str): _delete_trace_app_configs(tenant_id, app_id) _delete_conversation_variables(app_id=app_id) _delete_draft_variables(app_id) + _delete_app_plugin_triggers(tenant_id, app_id) end_at = time.perf_counter() logger.info(click.style(f"App and related data deleted: {app_id} latency: {end_at - start_at}", fg="green")) @@ -412,3 +413,13 @@ def _delete_records(query_sql: str, params: dict, delete_func: Callable, name: s logger.exception("Error occurred while deleting %s %s", name, record_id) continue rs.close() + + +def _delete_app_plugin_triggers(tenant_id: str, app_id: str): + with db.engine.begin() as conn: + result = conn.execute( + sa.text("DELETE FROM workflow_plugin_triggers WHERE app_id = :app_id"), {"app_id": app_id} + ) + deleted_count = result.rowcount + if deleted_count > 0: + logger.info(click.style(f"Deleted {deleted_count} workflow plugin triggers for app {app_id}", fg="green"))