feat(trigger): refactor trigger provider to subscription model

- Rename classes and methods to reflect the transition from credentials to subscriptions
- Update API endpoints for managing trigger subscriptions
- Modify data models and entities to support subscription attributes
- Enhance service methods for listing, adding, updating, and deleting subscriptions
- Adjust encryption utilities to handle subscription data

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Harry 2025-08-29 17:07:06 +08:00
parent 5ddd5e49ee
commit 6acc77d86d
7 changed files with 110 additions and 140 deletions

View File

@ -31,12 +31,12 @@ class TriggerProviderListApi(Resource):
return jsonable_encoder(TriggerProviderService.list_trigger_providers(user.current_tenant_id))
class TriggerProviderCredentialListApi(Resource):
class TriggerProviderSubscriptionListApi(Resource):
@setup_required
@login_required
@account_initialization_required
def get(self, provider):
"""List all trigger providers for the current tenant"""
"""List all trigger subscriptions for the current tenant's provider"""
user = current_user
assert isinstance(user, Account)
assert user.current_tenant_id is not None
@ -45,7 +45,7 @@ class TriggerProviderCredentialListApi(Resource):
try:
return jsonable_encoder(
TriggerProviderService.list_trigger_provider_credentials(
TriggerProviderService.list_trigger_provider_subscriptions(
tenant_id=user.current_tenant_id, provider_id=TriggerProviderID(provider)
)
)
@ -54,12 +54,12 @@ class TriggerProviderCredentialListApi(Resource):
raise
class TriggerProviderCredentialsAddApi(Resource):
class TriggerProviderSubscriptionsAddApi(Resource):
@setup_required
@login_required
@account_initialization_required
def post(self, provider):
"""Add a new credential instance for a trigger provider"""
"""Add a new subscription instance for a trigger provider"""
user = current_user
assert isinstance(user, Account)
assert user.current_tenant_id is not None
@ -99,46 +99,12 @@ class TriggerProviderCredentialsAddApi(Resource):
raise
class TriggerProviderCredentialsUpdateApi(Resource):
class TriggerProviderSubscriptionsDeleteApi(Resource):
@setup_required
@login_required
@account_initialization_required
def post(self, credential_id):
"""Update an existing credential instance"""
user = current_user
assert isinstance(user, Account)
assert user.current_tenant_id is not None
if not user.is_admin_or_owner:
raise Forbidden()
parser = reqparse.RequestParser()
parser.add_argument("credentials", type=dict, required=False, nullable=True, location="json")
parser.add_argument("name", type=str, required=False, nullable=True, location="json")
args = parser.parse_args()
try:
result = TriggerProviderService.update_trigger_provider(
tenant_id=user.current_tenant_id,
credential_id=credential_id,
credentials=args.get("credentials"),
name=args.get("name"),
)
return result
except ValueError as e:
raise BadRequest(str(e))
except Exception as e:
logger.exception("Error updating provider credential", exc_info=e)
raise
class TriggerProviderCredentialsDeleteApi(Resource):
@setup_required
@login_required
@account_initialization_required
def post(self, credential_id):
"""Delete a credential instance"""
def post(self, subscription_id):
"""Delete a subscription instance"""
user = current_user
assert isinstance(user, Account)
assert user.current_tenant_id is not None
@ -148,7 +114,7 @@ class TriggerProviderCredentialsDeleteApi(Resource):
try:
result = TriggerProviderService.delete_trigger_provider(
tenant_id=user.current_tenant_id,
credential_id=credential_id,
subscription_id=subscription_id,
)
return result
@ -290,8 +256,8 @@ class TriggerProviderOAuthRefreshTokenApi(Resource):
@setup_required
@login_required
@account_initialization_required
def post(self, credential_id):
"""Refresh OAuth token for a trigger provider credential"""
def post(self, subscription_id):
"""Refresh OAuth token for a trigger provider subscription"""
user = current_user
assert isinstance(user, Account)
assert user.current_tenant_id is not None
@ -301,7 +267,7 @@ class TriggerProviderOAuthRefreshTokenApi(Resource):
try:
result = TriggerProviderService.refresh_oauth_token(
tenant_id=user.current_tenant_id,
credential_id=credential_id,
subscription_id=subscription_id,
)
return result
@ -413,17 +379,16 @@ class TriggerProviderOAuthClientManageApi(Resource):
# Trigger provider endpoints
api.add_resource(TriggerProviderListApi, "/workspaces/current/trigger-providers")
api.add_resource(
TriggerProviderCredentialListApi, "/workspaces/current/trigger-provider/credentials/<path:provider>/list"
TriggerProviderSubscriptionListApi, "/workspaces/current/trigger-provider/subscriptions/<path:provider>/list"
)
api.add_resource(
TriggerProviderCredentialsAddApi, "/workspaces/current/trigger-provider/credentials/<path:provider>/add"
TriggerProviderSubscriptionsAddApi, "/workspaces/current/trigger-provider/subscriptions/<path:provider>/add"
)
api.add_resource(
TriggerProviderCredentialsUpdateApi, "/workspaces/current/trigger-provider/credentials/<path:credential_id>/update"
)
api.add_resource(
TriggerProviderCredentialsDeleteApi, "/workspaces/current/trigger-provider/credentials/<path:credential_id>/delete"
TriggerProviderSubscriptionsDeleteApi,
"/workspaces/current/trigger-provider/subscriptions/<path:subscription_id>/delete",
)
# OAuth
@ -433,7 +398,7 @@ api.add_resource(
api.add_resource(TriggerProviderOAuthCallbackApi, "/oauth/plugin/<path:provider>/trigger/callback")
api.add_resource(
TriggerProviderOAuthRefreshTokenApi,
"/workspaces/current/trigger-provider/credentials/<path:credential_id>/oauth/refresh",
"/workspaces/current/trigger-provider/subscriptions/<path:subscription_id>/oauth/refresh",
)
api.add_resource(
TriggerProviderOAuthClientManageApi, "/workspaces/current/trigger-provider/<path:provider>/oauth/client"

View File

@ -68,7 +68,7 @@ class ToolProviderCredentialsCache(ProviderCredentialsCache):
return f"tool_credentials:tenant_id:{tenant_id}:provider:{provider}:credential_id:{credential_id}"
class TriggerProviderCredentialCache(ProviderCredentialsCache):
class TriggerProviderCredentialsCache(ProviderCredentialsCache):
"""Cache for trigger provider credentials"""
def __init__(self, tenant_id: str, provider_id: str, credential_id: str):
@ -81,7 +81,7 @@ class TriggerProviderCredentialCache(ProviderCredentialsCache):
return f"trigger_credentials:tenant_id:{tenant_id}:provider_id:{provider_id}:credential_id:{credential_id}"
class TriggerProviderOAuthClientCache(ProviderCredentialsCache):
class TriggerProviderOAuthClientParamsCache(ProviderCredentialsCache):
"""Cache for trigger provider OAuth client"""
def __init__(self, tenant_id: str, provider_id: str):

View File

@ -15,12 +15,12 @@ from core.trigger.entities.entities import (
)
class TriggerProviderCredentialApiEntity(BaseModel):
id: str = Field(description="The unique id of the credential")
name: str = Field(description="The name of the credential")
provider: str = Field(description="The provider id of the credential")
class TriggerProviderSubscriptionApiEntity(BaseModel):
id: str = Field(description="The unique id of the subscription")
name: str = Field(description="The name of the subscription")
provider: str = Field(description="The provider id of the subscription")
credential_type: CredentialType = Field(description="The type of the credential")
credentials: dict = Field(description="The credentials of the credential")
credentials: dict = Field(description="The credentials of the subscription")
class TriggerProviderApiEntity(BaseModel):
@ -40,4 +40,4 @@ class TriggerApiEntity(BaseModel):
output_schema: Optional[Mapping[str, Any]] = Field(description="The output schema of the trigger")
__all__ = ["TriggerApiEntity", "TriggerProviderApiEntity", "TriggerProviderCredentialApiEntity"]
__all__ = ["TriggerApiEntity", "TriggerProviderApiEntity", "TriggerProviderSubscriptionApiEntity"]

View File

@ -99,6 +99,7 @@ class OAuthSchema(BaseModel):
default_factory=list, description="The schema of the OAuth credentials"
)
class SubscriptionSchema(BaseModel):
"""
The subscription schema of the trigger provider
@ -114,6 +115,7 @@ class SubscriptionSchema(BaseModel):
description="The configuration schema stored in the subscription entity",
)
class TriggerProviderEntity(BaseModel):
"""
The configuration of a trigger provider

View File

@ -1,26 +1,26 @@
from typing import Union
from core.helper.provider_cache import TriggerProviderCredentialCache, TriggerProviderOAuthClientCache
from core.helper.provider_cache import TriggerProviderCredentialsCache, TriggerProviderOAuthClientParamsCache
from core.helper.provider_encryption import ProviderConfigCache, ProviderConfigEncrypter, create_provider_encrypter
from core.plugin.entities.plugin_daemon import CredentialType
from core.trigger.entities.api_entities import TriggerProviderCredentialApiEntity
from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity
from core.trigger.provider import PluginTriggerProviderController
from models.trigger import TriggerProvider
from models.trigger import TriggerSubscription
def create_trigger_provider_encrypter_for_credential(
def create_trigger_provider_encrypter_for_subscription(
tenant_id: str,
controller: PluginTriggerProviderController,
credential: Union[TriggerProvider, TriggerProviderCredentialApiEntity],
subscription: Union[TriggerSubscription, TriggerProviderSubscriptionApiEntity],
) -> tuple[ProviderConfigEncrypter, ProviderConfigCache]:
cache = TriggerProviderCredentialCache(
cache = TriggerProviderCredentialsCache(
tenant_id=tenant_id,
provider_id=str(controller.get_provider_id()),
credential_id=credential.id,
credential_id=subscription.id,
)
encrypter, _ = create_provider_encrypter(
tenant_id=tenant_id,
config=controller.get_credential_schema_config(credential.credential_type),
config=controller.get_credential_schema_config(subscription.credential_type),
cache=cache,
)
return encrypter, cache
@ -29,7 +29,7 @@ def create_trigger_provider_encrypter_for_credential(
def create_trigger_provider_encrypter(
tenant_id: str, controller: PluginTriggerProviderController, credential_id: str, credential_type: CredentialType
) -> tuple[ProviderConfigEncrypter, ProviderConfigCache]:
cache = TriggerProviderCredentialCache(
cache = TriggerProviderCredentialsCache(
tenant_id=tenant_id,
provider_id=str(controller.get_provider_id()),
credential_id=credential_id,
@ -45,7 +45,7 @@ def create_trigger_provider_encrypter(
def create_trigger_provider_oauth_encrypter(
tenant_id: str, controller: PluginTriggerProviderController
) -> tuple[ProviderConfigEncrypter, ProviderConfigCache]:
cache = TriggerProviderOAuthClientCache(
cache = TriggerProviderOAuthClientParamsCache(
tenant_id=tenant_id,
provider_id=str(controller.get_provider_id()),
)

View File

@ -4,60 +4,61 @@ from datetime import datetime
from typing import cast
import sqlalchemy as sa
from sqlalchemy import DateTime, Index, Integer, String, Text, func
from sqlalchemy import DateTime, Index, Integer, String, UniqueConstraint, func
from sqlalchemy.orm import Mapped, mapped_column
from core.plugin.entities.plugin_daemon import CredentialType
from core.trigger.entities.api_entities import TriggerProviderCredentialApiEntity
from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity
from models.base import Base
from models.types import StringUUID
class TriggerProvider(Base):
class TriggerSubscription(Base):
"""
Trigger provider model for managing credentials
Supports multiple credential instances per provider
"""
__tablename__ = "trigger_providers"
__table_args__ = (Index("idx_trigger_providers_tenant_provider", "tenant_id", "provider_id"),)
__tablename__ = "trigger_subscriptions"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="trigger_subscription_pkey"),
Index("idx_trigger_subscriptions_tenant_provider", "tenant_id", "provider_id"),
UniqueConstraint("tenant_id", "provider_id", "name", name="unique_trigger_subscription"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
name: Mapped[str] = mapped_column(String(255), nullable=False, comment="Subscription instance name")
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
user_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
provider_id: Mapped[str] = mapped_column(
String(255), nullable=False, comment="Provider identifier (e.g., plugin_id/provider_name)"
)
parameters: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription parameters JSON")
configuration: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription configuration JSON")
credentials: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription credentials JSON")
credential_type: Mapped[str] = mapped_column(String(50), nullable=False, comment="oauth or api_key")
encrypted_credentials: Mapped[str] = mapped_column(Text, nullable=False, comment="Encrypted credentials JSON")
name: Mapped[str] = mapped_column(String(255), nullable=False, comment="Credential instance name")
expires_at: Mapped[int] = mapped_column(
credential_expires_at: Mapped[int] = mapped_column(
Integer, default=-1, comment="OAuth token expiration timestamp, -1 for never"
)
expires_at: Mapped[int] = mapped_column(
Integer, default=-1, comment="Subscription instance expiration timestamp, -1 for never"
)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.now(), onupdate=func.now()
)
@property
def credentials(self) -> dict:
"""Get credentials as dict (still encrypted)"""
try:
return json.loads(self.encrypted_credentials) if self.encrypted_credentials else {}
except (json.JSONDecodeError, TypeError):
return {}
def is_oauth_expired(self) -> bool:
"""Check if OAuth token is expired"""
if self.credential_type != CredentialType.OAUTH2.value:
return False
if self.expires_at == -1:
def is_credential_expired(self) -> bool:
"""Check if credential is expired"""
if self.credential_expires_at == -1:
return False
# Check if token expires in next 3 minutes
return (self.expires_at - 180) < int(time.time())
return (self.credential_expires_at - 180) < int(time.time())
def to_api_entity(self) -> TriggerProviderCredentialApiEntity:
return TriggerProviderCredentialApiEntity(
def to_api_entity(self) -> TriggerProviderSubscriptionApiEntity:
return TriggerProviderSubscriptionApiEntity(
id=self.id,
name=self.name,
provider=self.provider_id,

View File

@ -15,15 +15,15 @@ from core.plugin.entities.plugin import TriggerProviderID
from core.plugin.entities.plugin_daemon import CredentialType
from core.plugin.impl.oauth import OAuthHandler
from core.tools.utils.system_oauth_encryption import decrypt_system_oauth_params
from core.trigger.entities.api_entities import TriggerProviderApiEntity, TriggerProviderCredentialApiEntity
from core.trigger.entities.api_entities import TriggerProviderApiEntity, TriggerProviderSubscriptionApiEntity
from core.trigger.trigger_manager import TriggerManager
from core.trigger.utils.encryption import (
create_trigger_provider_encrypter_for_credential,
create_trigger_provider_encrypter_for_subscription,
create_trigger_provider_oauth_encrypter,
)
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.trigger import TriggerOAuthSystemClient, TriggerOAuthTenantClient, TriggerProvider
from models.trigger import TriggerOAuthSystemClient, TriggerOAuthTenantClient, TriggerSubscription
from services.plugin.plugin_service import PluginService
logger = logging.getLogger(__name__)
@ -40,29 +40,29 @@ class TriggerProviderService:
return [provider.to_api_entity() for provider in TriggerManager.list_all_trigger_providers(tenant_id)]
@classmethod
def list_trigger_provider_credentials(
def list_trigger_provider_subscriptions(
cls, tenant_id: str, provider_id: TriggerProviderID
) -> list[TriggerProviderCredentialApiEntity]:
"""List all trigger providers for the current tenant"""
credentials: list[TriggerProviderCredentialApiEntity] = []
) -> list[TriggerProviderSubscriptionApiEntity]:
"""List all trigger subscriptions for the current tenant"""
subscriptions: list[TriggerProviderSubscriptionApiEntity] = []
with Session(db.engine, autoflush=False) as session:
credentials_db = (
session.query(TriggerProvider)
subscriptions_db = (
session.query(TriggerSubscription)
.filter_by(tenant_id=tenant_id, provider_id=str(provider_id))
.order_by(desc(TriggerProvider.created_at))
.order_by(desc(TriggerSubscription.created_at))
.all()
)
credentials = [credential.to_api_entity() for credential in credentials_db]
subscriptions = [subscription.to_api_entity() for subscription in subscriptions_db]
provider_controller = TriggerManager.get_trigger_provider(tenant_id, provider_id)
for credential in credentials:
encrypter, _ = create_trigger_provider_encrypter_for_credential(
for subscription in subscriptions:
encrypter, _ = create_trigger_provider_encrypter_for_subscription(
tenant_id=tenant_id,
controller=provider_controller,
credential=credential,
subscription=subscription,
)
credential.credentials = encrypter.decrypt(credential.credentials)
return credentials
subscription.credentials = encrypter.decrypt(subscription.credentials)
return subscriptions
@classmethod
def add_trigger_provider(
@ -95,7 +95,9 @@ class TriggerProviderService:
with redis_client.lock(lock_key, timeout=20):
# Check provider count limit
provider_count = (
session.query(TriggerProvider).filter_by(tenant_id=tenant_id, provider_id=provider_id).count()
session.query(TriggerSubscription)
.filter_by(tenant_id=tenant_id, provider_id=provider_id)
.count()
)
if provider_count >= cls.__MAX_TRIGGER_PROVIDER_COUNT__:
@ -115,7 +117,7 @@ class TriggerProviderService:
else:
# Check if name already exists
existing = (
session.query(TriggerProvider)
session.query(TriggerSubscription)
.filter_by(tenant_id=tenant_id, provider_id=provider_id, name=name)
.first()
)
@ -129,12 +131,12 @@ class TriggerProviderService:
)
# Create provider record
db_provider = TriggerProvider(
db_provider = TriggerSubscription(
tenant_id=tenant_id,
user_id=user_id,
provider_id=provider_id,
credential_type=credential_type.value,
encrypted_credentials=json.dumps(encrypter.encrypt(credentials)),
credentials=encrypter.encrypt(credentials),
name=name,
expires_at=expires_at,
)
@ -152,7 +154,7 @@ class TriggerProviderService:
def update_trigger_provider(
cls,
tenant_id: str,
credential_id: str,
subscription_id: str,
credentials: Optional[dict] = None,
name: Optional[str] = None,
) -> dict:
@ -160,15 +162,15 @@ class TriggerProviderService:
Update an existing trigger provider's credentials or name.
:param tenant_id: Tenant ID
:param credential_id: Credential instance ID
:param subscription_id: Subscription instance ID
:param credentials: New credentials (optional)
:param name: New name (optional)
:return: Success response
"""
with Session(db.engine) as session:
db_provider = session.query(TriggerProvider).filter_by(tenant_id=tenant_id, id=credential_id).first()
db_provider = session.query(TriggerSubscription).filter_by(tenant_id=tenant_id, id=subscription_id).first()
if not db_provider:
raise ValueError(f"Trigger provider credential {credential_id} not found")
raise ValueError(f"Trigger provider subscription {subscription_id} not found")
try:
provider_controller = TriggerManager.get_trigger_provider(
@ -176,10 +178,10 @@ class TriggerProviderService:
)
if credentials:
encrypter, cache = create_trigger_provider_encrypter_for_credential(
encrypter, cache = create_trigger_provider_encrypter_for_subscription(
tenant_id=tenant_id,
controller=provider_controller,
credential=db_provider,
subscription=db_provider,
)
# Handle hidden values
original_credentials = encrypter.decrypt(db_provider.credentials)
@ -188,16 +190,16 @@ class TriggerProviderService:
for key, value in credentials.items()
}
db_provider.encrypted_credentials = json.dumps(encrypter.encrypt(new_credentials))
db_provider.credentials = encrypter.encrypt(new_credentials)
cache.delete()
# Update name if provided
if name and name != db_provider.name:
# Check if name already exists
existing = (
session.query(TriggerProvider)
session.query(TriggerSubscription)
.filter_by(tenant_id=tenant_id, provider_id=db_provider.provider_id, name=name)
.filter(TriggerProvider.id != credential_id)
.filter(TriggerSubscription.id != subscription_id)
.first()
)
if existing:
@ -213,27 +215,27 @@ class TriggerProviderService:
raise ValueError(str(e))
@classmethod
def delete_trigger_provider(cls, tenant_id: str, credential_id: str) -> dict:
def delete_trigger_provider(cls, tenant_id: str, subscription_id: str) -> dict:
"""
Delete a trigger provider credential.
Delete a trigger provider subscription.
:param tenant_id: Tenant ID
:param credential_id: Credential instance ID
:param subscription_id: Subscription instance ID
:return: Success response
"""
with Session(db.engine) as session:
db_provider = session.query(TriggerProvider).filter_by(tenant_id=tenant_id, id=credential_id).first()
db_provider = session.query(TriggerSubscription).filter_by(tenant_id=tenant_id, id=subscription_id).first()
if not db_provider:
raise ValueError(f"Trigger provider credential {credential_id} not found")
raise ValueError(f"Trigger provider subscription {subscription_id} not found")
provider_controller = TriggerManager.get_trigger_provider(
tenant_id, TriggerProviderID(db_provider.provider_id)
)
# Clear cache
_, cache = create_trigger_provider_encrypter_for_credential(
_, cache = create_trigger_provider_encrypter_for_subscription(
tenant_id=tenant_id,
controller=provider_controller,
credential=db_provider,
subscription=db_provider,
)
session.delete(db_provider)
@ -246,20 +248,20 @@ class TriggerProviderService:
def refresh_oauth_token(
cls,
tenant_id: str,
credential_id: str,
subscription_id: str,
) -> dict:
"""
Refresh OAuth token for a trigger provider.
:param tenant_id: Tenant ID
:param credential_id: Credential instance ID
:param subscription_id: Subscription instance ID
:return: New token info
"""
with Session(db.engine) as session:
db_provider = session.query(TriggerProvider).filter_by(tenant_id=tenant_id, id=credential_id).first()
db_provider = session.query(TriggerSubscription).filter_by(tenant_id=tenant_id, id=subscription_id).first()
if not db_provider:
raise ValueError(f"Trigger provider credential {credential_id} not found")
raise ValueError(f"Trigger provider subscription {subscription_id} not found")
if db_provider.credential_type != CredentialType.OAUTH2.value:
raise ValueError("Only OAuth credentials can be refreshed")
@ -267,10 +269,10 @@ class TriggerProviderService:
provider_id = TriggerProviderID(db_provider.provider_id)
provider_controller = TriggerManager.get_trigger_provider(tenant_id, provider_id)
# Create encrypter
encrypter, cache = create_trigger_provider_encrypter_for_credential(
encrypter, cache = create_trigger_provider_encrypter_for_subscription(
tenant_id=tenant_id,
controller=provider_controller,
credential=db_provider,
subscription=db_provider,
)
# Decrypt current credentials
@ -295,7 +297,7 @@ class TriggerProviderService:
)
# Update credentials
db_provider.encrypted_credentials = json.dumps(encrypter.encrypt(dict(refreshed_credentials.credentials)))
db_provider.credentials = encrypter.encrypt(dict(refreshed_credentials.credentials))
db_provider.expires_at = refreshed_credentials.expires_at
session.commit()
@ -518,13 +520,13 @@ class TriggerProviderService:
"""
try:
db_providers = (
session.query(TriggerProvider)
session.query(TriggerSubscription)
.filter_by(
tenant_id=tenant_id,
provider_id=provider_id,
credential_type=credential_type.value,
)
.order_by(desc(TriggerProvider.created_at))
.order_by(desc(TriggerSubscription.created_at))
.all()
)