feat(trigger): plugin trigger workflow

This commit is contained in:
Harry 2025-09-02 20:16:11 +08:00
parent dd929dbf0e
commit 4b253e1f73
10 changed files with 375 additions and 110 deletions

View File

@ -18,6 +18,209 @@ from models.workflow import AppTrigger, AppTriggerStatus, WorkflowWebhookTrigger
logger = logging.getLogger(__name__)
from models.workflow import WorkflowPluginTrigger
class PluginTriggerApi(Resource):
"""Workflow Plugin Trigger API"""
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=AppMode.WORKFLOW)
def post(self, app_model):
"""Create plugin trigger"""
parser = reqparse.RequestParser()
parser.add_argument("node_id", type=str, required=True, help="Node ID is required")
parser.add_argument("provider_id", type=str, required=True, help="Provider ID is required")
parser.add_argument("trigger_name", type=str, required=True, help="Trigger name is required")
parser.add_argument(
"triggered_by",
type=str,
required=False,
default="production",
choices=["debugger", "production"],
help="triggered_by must be debugger or production",
)
args = parser.parse_args()
# The role of the current user in the ta table must be admin, owner, or editor
if not current_user.is_editor:
raise Forbidden()
node_id = args["node_id"]
provider_id = args["provider_id"]
trigger_name = args["trigger_name"]
triggered_by = args["triggered_by"]
# Create trigger_id from provider_id and trigger_name
trigger_id = f"{provider_id}:{trigger_name}"
with Session(db.engine) as session:
# Check if plugin trigger already exists for this app, node, and environment
existing_trigger = session.scalar(
select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.app_id == app_model.id,
WorkflowPluginTrigger.node_id == node_id,
WorkflowPluginTrigger.triggered_by == triggered_by,
)
)
if existing_trigger:
raise BadRequest("Plugin trigger already exists for this node and environment")
# Create new plugin trigger
plugin_trigger = WorkflowPluginTrigger(
app_id=app_model.id,
node_id=node_id,
tenant_id=current_user.current_tenant_id,
provider_id=provider_id,
trigger_id=trigger_id,
triggered_by=triggered_by,
)
session.add(plugin_trigger)
session.commit()
session.refresh(plugin_trigger)
return plugin_trigger
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=AppMode.WORKFLOW)
def get(self, app_model):
"""Get plugin trigger"""
parser = reqparse.RequestParser()
parser.add_argument("node_id", type=str, required=True, help="Node ID is required")
parser.add_argument(
"triggered_by",
type=str,
required=False,
default="production",
choices=["debugger", "production"],
help="triggered_by must be debugger or production",
)
args = parser.parse_args()
node_id = args["node_id"]
triggered_by = args["triggered_by"]
with Session(db.engine) as session:
# Find plugin trigger
plugin_trigger = session.scalar(
select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.app_id == app_model.id,
WorkflowPluginTrigger.node_id == node_id,
WorkflowPluginTrigger.triggered_by == triggered_by,
WorkflowPluginTrigger.tenant_id == current_user.current_tenant_id,
)
)
if not plugin_trigger:
raise NotFound("Plugin trigger not found")
return plugin_trigger
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=AppMode.WORKFLOW)
def put(self, app_model):
"""Update plugin trigger"""
parser = reqparse.RequestParser()
parser.add_argument("node_id", type=str, required=True, help="Node ID is required")
parser.add_argument("provider_id", type=str, required=False, help="Provider ID")
parser.add_argument("trigger_name", type=str, required=False, help="Trigger name")
parser.add_argument(
"triggered_by",
type=str,
required=False,
default="production",
choices=["debugger", "production"],
help="triggered_by must be debugger or production",
)
args = parser.parse_args()
# The role of the current user in the ta table must be admin, owner, or editor
if not current_user.is_editor:
raise Forbidden()
node_id = args["node_id"]
triggered_by = args["triggered_by"]
with Session(db.engine) as session:
# Find plugin trigger
plugin_trigger = session.scalar(
select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.app_id == app_model.id,
WorkflowPluginTrigger.node_id == node_id,
WorkflowPluginTrigger.triggered_by == triggered_by,
WorkflowPluginTrigger.tenant_id == current_user.current_tenant_id,
)
)
if not plugin_trigger:
raise NotFound("Plugin trigger not found")
# Update fields if provided
if args.get("provider_id"):
plugin_trigger.provider_id = args["provider_id"]
if args.get("trigger_name"):
# Update trigger_id if provider_id or trigger_name changed
provider_id = args.get("provider_id") or plugin_trigger.provider_id
trigger_name = args["trigger_name"]
plugin_trigger.trigger_id = f"{provider_id}:{trigger_name}"
session.commit()
session.refresh(plugin_trigger)
return plugin_trigger
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=AppMode.WORKFLOW)
def delete(self, app_model):
"""Delete plugin trigger"""
parser = reqparse.RequestParser()
parser.add_argument("node_id", type=str, required=True, help="Node ID is required")
parser.add_argument(
"triggered_by",
type=str,
required=False,
default="production",
choices=["debugger", "production"],
help="triggered_by must be debugger or production",
)
args = parser.parse_args()
# The role of the current user in the ta table must be admin, owner, or editor
if not current_user.is_editor:
raise Forbidden()
node_id = args["node_id"]
triggered_by = args["triggered_by"]
with Session(db.engine) as session:
# Find plugin trigger
plugin_trigger = session.scalar(
select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.app_id == app_model.id,
WorkflowPluginTrigger.node_id == node_id,
WorkflowPluginTrigger.triggered_by == triggered_by,
WorkflowPluginTrigger.tenant_id == current_user.current_tenant_id,
)
)
if not plugin_trigger:
raise NotFound("Plugin trigger not found")
session.delete(plugin_trigger)
session.commit()
return {"result": "success"}, 204
class WebhookTriggerApi(Resource):
"""Webhook Trigger API"""
@ -232,5 +435,6 @@ class AppTriggerEnableApi(Resource):
api.add_resource(WebhookTriggerApi, "/apps/<uuid:app_id>/workflows/triggers/webhook")
api.add_resource(PluginTriggerApi, "/apps/<uuid:app_id>/workflows/triggers/plugin")
api.add_resource(AppTriggersApi, "/apps/<uuid:app_id>/triggers")
api.add_resource(AppTriggerEnableApi, "/apps/<uuid:app_id>/trigger-enable")

View File

@ -21,6 +21,9 @@ class TriggerProviderSubscriptionApiEntity(BaseModel):
provider: str = Field(description="The provider id of the subscription")
credential_type: CredentialType = Field(description="The type of the credential")
credentials: dict = Field(description="The credentials of the subscription")
endpoint: str = Field(description="The endpoint of the subscription")
parameters: dict = Field(description="The parameters of the subscription")
properties: dict = Field(description="The properties of the subscription")
class TriggerProviderApiEntity(BaseModel):

View File

@ -1,4 +1,5 @@
from collections.abc import Mapping
from datetime import datetime
from enum import StrEnum
from typing import Any, Optional, Union
@ -39,7 +40,7 @@ class TriggerParameter(BaseModel):
template: Optional[PluginParameterTemplate] = Field(default=None, description="The template of the parameter")
scope: Optional[str] = None
required: Optional[bool] = False
default: Union[int, float, str, None] = None
default: Union[int, float, str, list, None] = None
min: Union[float, int, None] = None
max: Union[float, int, None] = None
precision: Optional[int] = None
@ -105,7 +106,7 @@ class SubscriptionSchema(BaseModel):
The subscription schema of the trigger provider
"""
parameters_schema: list[ProviderConfig] | None = Field(
parameters_schema: list[TriggerParameter] | None = Field(
default_factory=list,
description="The parameters schema required to create a subscription",
)
@ -184,32 +185,33 @@ class Unsubscription(BaseModel):
class RequestLog(BaseModel):
id: str
endpoint: str
request: dict
response: dict
created_at: str
id: str = Field(..., description="The id of the request log")
endpoint: str = Field(..., description="The endpoint of the request log")
request: dict = Field(..., description="The request of the request log")
response: dict = Field(..., description="The response of the request log")
created_at: datetime = Field(..., description="The created at of the request log")
class SubscriptionBuilder(BaseModel):
id: str
name: str | None = None
tenant_id: str
user_id: str
provider_id: str
endpoint_id: str
parameters: Mapping[str, Any]
properties: Mapping[str, Any]
credentials: Mapping[str, str]
credential_type: str | None = None
credential_expires_at: int | None = None
expires_at: int
id: str = Field(..., description="The id of the subscription builder")
name: str | None = Field(default=None, description="The name of the subscription builder")
tenant_id: str = Field(..., description="The tenant id of the subscription builder")
user_id: str = Field(..., description="The user id of the subscription builder")
provider_id: str = Field(..., description="The provider id of the subscription builder")
endpoint_id: str = Field(..., description="The endpoint id of the subscription builder")
parameters: Mapping[str, Any] = Field(..., description="The parameters of the subscription builder")
properties: Mapping[str, Any] = Field(..., description="The properties of the subscription builder")
credentials: Mapping[str, str] = Field(..., description="The credentials of the subscription builder")
credential_type: str | None = Field(default=None, description="The credential type of the subscription builder")
credential_expires_at: int | None = Field(
default=None, description="The credential expires at of the subscription builder"
)
expires_at: int = Field(..., description="The expires at of the subscription builder")
def to_subscription(self) -> Subscription:
return Subscription(
expires_at=self.expires_at,
endpoint=self.endpoint_id,
parameters=self.parameters,
properties=self.properties,
)

View File

@ -1,42 +0,0 @@
"""Optimize trigger provider endpoint index
Revision ID: 9d83760807c5
Revises: 9ee7d347f4c1
Create Date: 2025-09-01 12:42:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9d83760807c5'
down_revision = '9ee7d347f4c1'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
# Drop the old unique constraint on endpoint
with op.batch_alter_table('trigger_providers', schema=None) as batch_op:
batch_op.drop_constraint('unique_trigger_provider_endpoint', type_='unique')
# Create a new unique index on endpoint for O(1) lookup
batch_op.create_index('idx_trigger_providers_endpoint', ['endpoint'], unique=True)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
# Drop the new index
with op.batch_alter_table('trigger_providers', schema=None) as batch_op:
batch_op.drop_index('idx_trigger_providers_endpoint')
# Recreate the old unique constraint
batch_op.create_unique_constraint('unique_trigger_provider_endpoint', ['endpoint'])
# ### end Alembic commands ###

View File

@ -0,0 +1,68 @@
"""plugin_trigger
Revision ID: afa344924e5c
Revises: 9ee7d347f4c1
Create Date: 2025-09-02 15:28:31.426728
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'afa344924e5c'
down_revision = '9ee7d347f4c1'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('oauth_provider_apps',
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
sa.Column('app_icon', sa.String(length=255), nullable=False),
sa.Column('app_label', sa.JSON(), server_default='{}', nullable=False),
sa.Column('client_id', sa.String(length=255), nullable=False),
sa.Column('client_secret', sa.String(length=255), nullable=False),
sa.Column('redirect_uris', sa.JSON(), server_default='[]', nullable=False),
sa.Column('scope', sa.String(length=255), server_default=sa.text("'read:name read:email read:avatar read:interface_language read:timezone'"), nullable=False),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP(0)'), nullable=False),
sa.PrimaryKeyConstraint('id', name='oauth_provider_app_pkey')
)
with op.batch_alter_table('oauth_provider_apps', schema=None) as batch_op:
batch_op.create_index('oauth_provider_app_client_id_idx', ['client_id'], unique=False)
op.create_table('workflow_plugin_triggers',
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False),
sa.Column('app_id', models.types.StringUUID(), nullable=False),
sa.Column('node_id', sa.String(length=64), nullable=False),
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
sa.Column('provider_id', sa.String(length=255), nullable=False),
sa.Column('trigger_id', sa.String(length=510), nullable=False),
sa.Column('triggered_by', sa.String(length=16), nullable=False),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.PrimaryKeyConstraint('id', name='workflow_plugin_trigger_pkey'),
sa.UniqueConstraint('app_id', 'node_id', 'triggered_by', name='uniq_plugin_node'),
sa.UniqueConstraint('trigger_id', 'node_id', name='uniq_trigger_node')
)
with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op:
batch_op.create_index('workflow_plugin_trigger_tenant_idx', ['tenant_id'], unique=False)
batch_op.create_index('workflow_plugin_trigger_trigger_idx', ['trigger_id'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op:
batch_op.drop_index('workflow_plugin_trigger_trigger_idx')
batch_op.drop_index('workflow_plugin_trigger_tenant_idx')
op.drop_table('workflow_plugin_triggers')
with op.batch_alter_table('oauth_provider_apps', schema=None) as batch_op:
batch_op.drop_index('oauth_provider_app_client_id_idx')
op.drop_table('oauth_provider_apps')
# ### end Alembic commands ###

View File

@ -52,9 +52,12 @@ class TriggerSubscription(Base):
Integer, default=-1, comment="Subscription instance expiration timestamp, -1 for never"
)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.now())
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.now(), onupdate=func.now()
DateTime,
nullable=False,
server_default=func.current_timestamp(),
server_onupdate=func.current_timestamp(),
)
def is_credential_expired(self) -> bool:
@ -76,6 +79,9 @@ class TriggerSubscription(Base):
id=self.id,
name=self.name,
provider=self.provider_id,
endpoint=parse_endpoint_id(self.endpoint_id),
parameters=self.parameters,
properties=self.properties,
credential_type=CredentialType(self.credential_type),
credentials=self.credentials,
)
@ -94,6 +100,13 @@ class TriggerOAuthSystemClient(Base):
provider: Mapped[str] = mapped_column(String(255), nullable=False)
# oauth params of the trigger provider
encrypted_oauth_params: Mapped[str] = mapped_column(sa.Text, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.current_timestamp(),
server_onupdate=func.current_timestamp(),
)
# tenant level trigger oauth client params (client_id, client_secret, etc.)
@ -112,6 +125,13 @@ class TriggerOAuthTenantClient(Base):
enabled: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("true"))
# oauth params of the trigger provider
encrypted_oauth_params: Mapped[str] = mapped_column(sa.Text, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.current_timestamp(),
server_onupdate=func.current_timestamp(),
)
@property
def oauth_params(self) -> dict:

View File

@ -1424,6 +1424,49 @@ class WorkflowWebhookTrigger(Base):
)
class WorkflowPluginTrigger(Base):
"""
Workflow Plugin Trigger
Maps plugin triggers to workflow nodes, similar to WorkflowWebhookTrigger
Attributes:
- id (uuid) Primary key
- app_id (uuid) App ID to bind to a specific app
- node_id (varchar) Node ID which node in the workflow
- tenant_id (uuid) Workspace ID
- provider_id (varchar) Plugin provider ID
- trigger_id (varchar) Unique trigger identifier (provider_id + trigger_name)
- triggered_by (varchar) Environment: debugger or production
- created_at (timestamp) Creation time
- updated_at (timestamp) Last update time
"""
__tablename__ = "workflow_plugin_triggers"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"),
sa.Index("workflow_plugin_trigger_tenant_idx", "tenant_id"),
sa.Index("workflow_plugin_trigger_trigger_idx", "trigger_id"),
sa.UniqueConstraint("app_id", "node_id", "triggered_by", name="uniq_plugin_node"),
sa.UniqueConstraint("trigger_id", "node_id", name="uniq_trigger_node"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
provider_id: Mapped[str] = mapped_column(String(255), nullable=False)
trigger_id: Mapped[str] = mapped_column(String(510), nullable=False) # provider_id + trigger_name
triggered_by: Mapped[str] = mapped_column(String(16), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.current_timestamp(),
server_onupdate=func.current_timestamp(),
)
class AppTriggerType(StrEnum):
"""App Trigger Type Enum"""
@ -1482,46 +1525,3 @@ class AppTrigger(Base):
default=naive_utc_now(),
server_onupdate=func.current_timestamp(),
)
class WorkflowPluginTrigger(Base):
"""
Workflow Plugin Trigger
Maps plugin triggers to workflow nodes, similar to WorkflowWebhookTrigger
Attributes:
- id (uuid) Primary key
- app_id (uuid) App ID to bind to a specific app
- node_id (varchar) Node ID which node in the workflow
- tenant_id (uuid) Workspace ID
- provider_id (varchar) Plugin provider ID
- trigger_id (varchar) Unique trigger identifier (provider_id + trigger_name)
- triggered_by (varchar) Environment: debugger or production
- created_at (timestamp) Creation time
- updated_at (timestamp) Last update time
"""
__tablename__ = "workflow_plugin_triggers"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"),
sa.Index("workflow_plugin_trigger_tenant_idx", "tenant_id"),
sa.Index("workflow_plugin_trigger_trigger_idx", "trigger_id"),
sa.UniqueConstraint("app_id", "node_id", "triggered_by", name="uniq_plugin_node"),
sa.UniqueConstraint("trigger_id", "node_id", name="uniq_trigger_node"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
provider_id: Mapped[str] = mapped_column(String(255), nullable=False)
trigger_id: Mapped[str] = mapped_column(String(510), nullable=False) # provider_id + trigger_name
triggered_by: Mapped[str] = mapped_column(String(16), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.current_timestamp(),
server_onupdate=func.current_timestamp(),
)

View File

@ -142,7 +142,6 @@ class TriggerService:
user_id=subscription.user_id, request=request, subscription=subscription.to_entity()
)
# TODO invoke triggers
if dispatch_response.triggers:
triggers = cls.select_triggers(controller, dispatch_response, provider_id, subscription)
for trigger in triggers:

View File

@ -45,10 +45,10 @@ class WebhookService:
)
.first()
)
if not app_trigger:
raise ValueError(f"App trigger not found for webhook {webhook_id}")
if app_trigger.status != AppTriggerStatus.ENABLED:
raise ValueError(f"Webhook trigger is disabled for webhook {webhook_id}")

View File

@ -69,6 +69,7 @@ def remove_app_and_related_data_task(self, tenant_id: str, app_id: str):
_delete_trace_app_configs(tenant_id, app_id)
_delete_conversation_variables(app_id=app_id)
_delete_draft_variables(app_id)
_delete_app_plugin_triggers(tenant_id, app_id)
end_at = time.perf_counter()
logger.info(click.style(f"App and related data deleted: {app_id} latency: {end_at - start_at}", fg="green"))
@ -412,3 +413,13 @@ def _delete_records(query_sql: str, params: dict, delete_func: Callable, name: s
logger.exception("Error occurred while deleting %s %s", name, record_id)
continue
rs.close()
def _delete_app_plugin_triggers(tenant_id: str, app_id: str):
with db.engine.begin() as conn:
result = conn.execute(
sa.text("DELETE FROM workflow_plugin_triggers WHERE app_id = :app_id"), {"app_id": app_id}
)
deleted_count = result.rowcount
if deleted_count > 0:
logger.info(click.style(f"Deleted {deleted_count} workflow plugin triggers for app {app_id}", fg="green"))