diff --git a/api/controllers/console/workspace/trigger_providers.py b/api/controllers/console/workspace/trigger_providers.py index d6d94719a1..6fab876c1c 100644 --- a/api/controllers/console/workspace/trigger_providers.py +++ b/api/controllers/console/workspace/trigger_providers.py @@ -31,12 +31,12 @@ class TriggerProviderListApi(Resource): return jsonable_encoder(TriggerProviderService.list_trigger_providers(user.current_tenant_id)) -class TriggerProviderCredentialListApi(Resource): +class TriggerProviderSubscriptionListApi(Resource): @setup_required @login_required @account_initialization_required def get(self, provider): - """List all trigger providers for the current tenant""" + """List all trigger subscriptions for the current tenant's provider""" user = current_user assert isinstance(user, Account) assert user.current_tenant_id is not None @@ -45,7 +45,7 @@ class TriggerProviderCredentialListApi(Resource): try: return jsonable_encoder( - TriggerProviderService.list_trigger_provider_credentials( + TriggerProviderService.list_trigger_provider_subscriptions( tenant_id=user.current_tenant_id, provider_id=TriggerProviderID(provider) ) ) @@ -54,12 +54,12 @@ class TriggerProviderCredentialListApi(Resource): raise -class TriggerProviderCredentialsAddApi(Resource): +class TriggerProviderSubscriptionsAddApi(Resource): @setup_required @login_required @account_initialization_required def post(self, provider): - """Add a new credential instance for a trigger provider""" + """Add a new subscription instance for a trigger provider""" user = current_user assert isinstance(user, Account) assert user.current_tenant_id is not None @@ -99,46 +99,12 @@ class TriggerProviderCredentialsAddApi(Resource): raise -class TriggerProviderCredentialsUpdateApi(Resource): +class TriggerProviderSubscriptionsDeleteApi(Resource): @setup_required @login_required @account_initialization_required - def post(self, credential_id): - """Update an existing credential instance""" - user = current_user - assert isinstance(user, Account) - assert user.current_tenant_id is not None - if not user.is_admin_or_owner: - raise Forbidden() - - parser = reqparse.RequestParser() - parser.add_argument("credentials", type=dict, required=False, nullable=True, location="json") - parser.add_argument("name", type=str, required=False, nullable=True, location="json") - args = parser.parse_args() - - try: - result = TriggerProviderService.update_trigger_provider( - tenant_id=user.current_tenant_id, - credential_id=credential_id, - credentials=args.get("credentials"), - name=args.get("name"), - ) - - return result - - except ValueError as e: - raise BadRequest(str(e)) - except Exception as e: - logger.exception("Error updating provider credential", exc_info=e) - raise - - -class TriggerProviderCredentialsDeleteApi(Resource): - @setup_required - @login_required - @account_initialization_required - def post(self, credential_id): - """Delete a credential instance""" + def post(self, subscription_id): + """Delete a subscription instance""" user = current_user assert isinstance(user, Account) assert user.current_tenant_id is not None @@ -148,7 +114,7 @@ class TriggerProviderCredentialsDeleteApi(Resource): try: result = TriggerProviderService.delete_trigger_provider( tenant_id=user.current_tenant_id, - credential_id=credential_id, + subscription_id=subscription_id, ) return result @@ -290,8 +256,8 @@ class TriggerProviderOAuthRefreshTokenApi(Resource): @setup_required @login_required @account_initialization_required - def post(self, credential_id): - """Refresh OAuth token for a trigger provider credential""" + def post(self, subscription_id): + """Refresh OAuth token for a trigger provider subscription""" user = current_user assert isinstance(user, Account) assert user.current_tenant_id is not None @@ -301,7 +267,7 @@ class TriggerProviderOAuthRefreshTokenApi(Resource): try: result = TriggerProviderService.refresh_oauth_token( tenant_id=user.current_tenant_id, - credential_id=credential_id, + subscription_id=subscription_id, ) return result @@ -413,17 +379,16 @@ class TriggerProviderOAuthClientManageApi(Resource): # Trigger provider endpoints +api.add_resource(TriggerProviderListApi, "/workspaces/current/trigger-providers") api.add_resource( - TriggerProviderCredentialListApi, "/workspaces/current/trigger-provider/credentials//list" + TriggerProviderSubscriptionListApi, "/workspaces/current/trigger-provider/subscriptions//list" ) api.add_resource( - TriggerProviderCredentialsAddApi, "/workspaces/current/trigger-provider/credentials//add" + TriggerProviderSubscriptionsAddApi, "/workspaces/current/trigger-provider/subscriptions//add" ) api.add_resource( - TriggerProviderCredentialsUpdateApi, "/workspaces/current/trigger-provider/credentials//update" -) -api.add_resource( - TriggerProviderCredentialsDeleteApi, "/workspaces/current/trigger-provider/credentials//delete" + TriggerProviderSubscriptionsDeleteApi, + "/workspaces/current/trigger-provider/subscriptions//delete", ) # OAuth @@ -433,7 +398,7 @@ api.add_resource( api.add_resource(TriggerProviderOAuthCallbackApi, "/oauth/plugin//trigger/callback") api.add_resource( TriggerProviderOAuthRefreshTokenApi, - "/workspaces/current/trigger-provider/credentials//oauth/refresh", + "/workspaces/current/trigger-provider/subscriptions//oauth/refresh", ) api.add_resource( TriggerProviderOAuthClientManageApi, "/workspaces/current/trigger-provider//oauth/client" diff --git a/api/core/helper/provider_cache.py b/api/core/helper/provider_cache.py index ea5f4f0e4b..def3c897e0 100644 --- a/api/core/helper/provider_cache.py +++ b/api/core/helper/provider_cache.py @@ -68,7 +68,7 @@ class ToolProviderCredentialsCache(ProviderCredentialsCache): return f"tool_credentials:tenant_id:{tenant_id}:provider:{provider}:credential_id:{credential_id}" -class TriggerProviderCredentialCache(ProviderCredentialsCache): +class TriggerProviderCredentialsCache(ProviderCredentialsCache): """Cache for trigger provider credentials""" def __init__(self, tenant_id: str, provider_id: str, credential_id: str): @@ -81,7 +81,7 @@ class TriggerProviderCredentialCache(ProviderCredentialsCache): return f"trigger_credentials:tenant_id:{tenant_id}:provider_id:{provider_id}:credential_id:{credential_id}" -class TriggerProviderOAuthClientCache(ProviderCredentialsCache): +class TriggerProviderOAuthClientParamsCache(ProviderCredentialsCache): """Cache for trigger provider OAuth client""" def __init__(self, tenant_id: str, provider_id: str): diff --git a/api/core/trigger/entities/api_entities.py b/api/core/trigger/entities/api_entities.py index 9d489f37e2..54f297b4b5 100644 --- a/api/core/trigger/entities/api_entities.py +++ b/api/core/trigger/entities/api_entities.py @@ -15,12 +15,12 @@ from core.trigger.entities.entities import ( ) -class TriggerProviderCredentialApiEntity(BaseModel): - id: str = Field(description="The unique id of the credential") - name: str = Field(description="The name of the credential") - provider: str = Field(description="The provider id of the credential") +class TriggerProviderSubscriptionApiEntity(BaseModel): + id: str = Field(description="The unique id of the subscription") + name: str = Field(description="The name of the subscription") + provider: str = Field(description="The provider id of the subscription") credential_type: CredentialType = Field(description="The type of the credential") - credentials: dict = Field(description="The credentials of the credential") + credentials: dict = Field(description="The credentials of the subscription") class TriggerProviderApiEntity(BaseModel): @@ -40,4 +40,4 @@ class TriggerApiEntity(BaseModel): output_schema: Optional[Mapping[str, Any]] = Field(description="The output schema of the trigger") -__all__ = ["TriggerApiEntity", "TriggerProviderApiEntity", "TriggerProviderCredentialApiEntity"] +__all__ = ["TriggerApiEntity", "TriggerProviderApiEntity", "TriggerProviderSubscriptionApiEntity"] diff --git a/api/core/trigger/entities/entities.py b/api/core/trigger/entities/entities.py index 2f4ef4b9af..b0d8109f75 100644 --- a/api/core/trigger/entities/entities.py +++ b/api/core/trigger/entities/entities.py @@ -99,6 +99,7 @@ class OAuthSchema(BaseModel): default_factory=list, description="The schema of the OAuth credentials" ) + class SubscriptionSchema(BaseModel): """ The subscription schema of the trigger provider @@ -114,6 +115,7 @@ class SubscriptionSchema(BaseModel): description="The configuration schema stored in the subscription entity", ) + class TriggerProviderEntity(BaseModel): """ The configuration of a trigger provider diff --git a/api/core/trigger/utils/encryption.py b/api/core/trigger/utils/encryption.py index 2abfa604c2..663807ff7b 100644 --- a/api/core/trigger/utils/encryption.py +++ b/api/core/trigger/utils/encryption.py @@ -1,26 +1,26 @@ from typing import Union -from core.helper.provider_cache import TriggerProviderCredentialCache, TriggerProviderOAuthClientCache +from core.helper.provider_cache import TriggerProviderCredentialsCache, TriggerProviderOAuthClientParamsCache from core.helper.provider_encryption import ProviderConfigCache, ProviderConfigEncrypter, create_provider_encrypter from core.plugin.entities.plugin_daemon import CredentialType -from core.trigger.entities.api_entities import TriggerProviderCredentialApiEntity +from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity from core.trigger.provider import PluginTriggerProviderController -from models.trigger import TriggerProvider +from models.trigger import TriggerSubscription -def create_trigger_provider_encrypter_for_credential( +def create_trigger_provider_encrypter_for_subscription( tenant_id: str, controller: PluginTriggerProviderController, - credential: Union[TriggerProvider, TriggerProviderCredentialApiEntity], + subscription: Union[TriggerSubscription, TriggerProviderSubscriptionApiEntity], ) -> tuple[ProviderConfigEncrypter, ProviderConfigCache]: - cache = TriggerProviderCredentialCache( + cache = TriggerProviderCredentialsCache( tenant_id=tenant_id, provider_id=str(controller.get_provider_id()), - credential_id=credential.id, + credential_id=subscription.id, ) encrypter, _ = create_provider_encrypter( tenant_id=tenant_id, - config=controller.get_credential_schema_config(credential.credential_type), + config=controller.get_credential_schema_config(subscription.credential_type), cache=cache, ) return encrypter, cache @@ -29,7 +29,7 @@ def create_trigger_provider_encrypter_for_credential( def create_trigger_provider_encrypter( tenant_id: str, controller: PluginTriggerProviderController, credential_id: str, credential_type: CredentialType ) -> tuple[ProviderConfigEncrypter, ProviderConfigCache]: - cache = TriggerProviderCredentialCache( + cache = TriggerProviderCredentialsCache( tenant_id=tenant_id, provider_id=str(controller.get_provider_id()), credential_id=credential_id, @@ -45,7 +45,7 @@ def create_trigger_provider_encrypter( def create_trigger_provider_oauth_encrypter( tenant_id: str, controller: PluginTriggerProviderController ) -> tuple[ProviderConfigEncrypter, ProviderConfigCache]: - cache = TriggerProviderOAuthClientCache( + cache = TriggerProviderOAuthClientParamsCache( tenant_id=tenant_id, provider_id=str(controller.get_provider_id()), ) diff --git a/api/models/trigger.py b/api/models/trigger.py index 2db3a627dd..25c08f2ea3 100644 --- a/api/models/trigger.py +++ b/api/models/trigger.py @@ -4,60 +4,61 @@ from datetime import datetime from typing import cast import sqlalchemy as sa -from sqlalchemy import DateTime, Index, Integer, String, Text, func +from sqlalchemy import DateTime, Index, Integer, String, UniqueConstraint, func from sqlalchemy.orm import Mapped, mapped_column from core.plugin.entities.plugin_daemon import CredentialType -from core.trigger.entities.api_entities import TriggerProviderCredentialApiEntity +from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity from models.base import Base from models.types import StringUUID -class TriggerProvider(Base): +class TriggerSubscription(Base): """ Trigger provider model for managing credentials Supports multiple credential instances per provider """ - __tablename__ = "trigger_providers" - __table_args__ = (Index("idx_trigger_providers_tenant_provider", "tenant_id", "provider_id"),) + __tablename__ = "trigger_subscriptions" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="trigger_subscription_pkey"), + Index("idx_trigger_subscriptions_tenant_provider", "tenant_id", "provider_id"), + UniqueConstraint("tenant_id", "provider_id", "name", name="unique_trigger_subscription"), + ) id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()")) + name: Mapped[str] = mapped_column(String(255), nullable=False, comment="Subscription instance name") tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) user_id: Mapped[str] = mapped_column(StringUUID, nullable=False) provider_id: Mapped[str] = mapped_column( String(255), nullable=False, comment="Provider identifier (e.g., plugin_id/provider_name)" ) + parameters: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription parameters JSON") + configuration: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription configuration JSON") + + credentials: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription credentials JSON") credential_type: Mapped[str] = mapped_column(String(50), nullable=False, comment="oauth or api_key") - encrypted_credentials: Mapped[str] = mapped_column(Text, nullable=False, comment="Encrypted credentials JSON") - name: Mapped[str] = mapped_column(String(255), nullable=False, comment="Credential instance name") - expires_at: Mapped[int] = mapped_column( + credential_expires_at: Mapped[int] = mapped_column( Integer, default=-1, comment="OAuth token expiration timestamp, -1 for never" ) + expires_at: Mapped[int] = mapped_column( + Integer, default=-1, comment="Subscription instance expiration timestamp, -1 for never" + ) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.now()) updated_at: Mapped[datetime] = mapped_column( DateTime, nullable=False, server_default=func.now(), onupdate=func.now() ) - @property - def credentials(self) -> dict: - """Get credentials as dict (still encrypted)""" - try: - return json.loads(self.encrypted_credentials) if self.encrypted_credentials else {} - except (json.JSONDecodeError, TypeError): - return {} - - def is_oauth_expired(self) -> bool: - """Check if OAuth token is expired""" - if self.credential_type != CredentialType.OAUTH2.value: - return False - if self.expires_at == -1: + def is_credential_expired(self) -> bool: + """Check if credential is expired""" + if self.credential_expires_at == -1: return False # Check if token expires in next 3 minutes - return (self.expires_at - 180) < int(time.time()) + return (self.credential_expires_at - 180) < int(time.time()) - def to_api_entity(self) -> TriggerProviderCredentialApiEntity: - return TriggerProviderCredentialApiEntity( + def to_api_entity(self) -> TriggerProviderSubscriptionApiEntity: + return TriggerProviderSubscriptionApiEntity( id=self.id, name=self.name, provider=self.provider_id, diff --git a/api/services/trigger/trigger_provider_service.py b/api/services/trigger/trigger_provider_service.py index 4628ec9c18..7b4cd5852d 100644 --- a/api/services/trigger/trigger_provider_service.py +++ b/api/services/trigger/trigger_provider_service.py @@ -15,15 +15,15 @@ from core.plugin.entities.plugin import TriggerProviderID from core.plugin.entities.plugin_daemon import CredentialType from core.plugin.impl.oauth import OAuthHandler from core.tools.utils.system_oauth_encryption import decrypt_system_oauth_params -from core.trigger.entities.api_entities import TriggerProviderApiEntity, TriggerProviderCredentialApiEntity +from core.trigger.entities.api_entities import TriggerProviderApiEntity, TriggerProviderSubscriptionApiEntity from core.trigger.trigger_manager import TriggerManager from core.trigger.utils.encryption import ( - create_trigger_provider_encrypter_for_credential, + create_trigger_provider_encrypter_for_subscription, create_trigger_provider_oauth_encrypter, ) from extensions.ext_database import db from extensions.ext_redis import redis_client -from models.trigger import TriggerOAuthSystemClient, TriggerOAuthTenantClient, TriggerProvider +from models.trigger import TriggerOAuthSystemClient, TriggerOAuthTenantClient, TriggerSubscription from services.plugin.plugin_service import PluginService logger = logging.getLogger(__name__) @@ -40,29 +40,29 @@ class TriggerProviderService: return [provider.to_api_entity() for provider in TriggerManager.list_all_trigger_providers(tenant_id)] @classmethod - def list_trigger_provider_credentials( + def list_trigger_provider_subscriptions( cls, tenant_id: str, provider_id: TriggerProviderID - ) -> list[TriggerProviderCredentialApiEntity]: - """List all trigger providers for the current tenant""" - credentials: list[TriggerProviderCredentialApiEntity] = [] + ) -> list[TriggerProviderSubscriptionApiEntity]: + """List all trigger subscriptions for the current tenant""" + subscriptions: list[TriggerProviderSubscriptionApiEntity] = [] with Session(db.engine, autoflush=False) as session: - credentials_db = ( - session.query(TriggerProvider) + subscriptions_db = ( + session.query(TriggerSubscription) .filter_by(tenant_id=tenant_id, provider_id=str(provider_id)) - .order_by(desc(TriggerProvider.created_at)) + .order_by(desc(TriggerSubscription.created_at)) .all() ) - credentials = [credential.to_api_entity() for credential in credentials_db] + subscriptions = [subscription.to_api_entity() for subscription in subscriptions_db] provider_controller = TriggerManager.get_trigger_provider(tenant_id, provider_id) - for credential in credentials: - encrypter, _ = create_trigger_provider_encrypter_for_credential( + for subscription in subscriptions: + encrypter, _ = create_trigger_provider_encrypter_for_subscription( tenant_id=tenant_id, controller=provider_controller, - credential=credential, + subscription=subscription, ) - credential.credentials = encrypter.decrypt(credential.credentials) - return credentials + subscription.credentials = encrypter.decrypt(subscription.credentials) + return subscriptions @classmethod def add_trigger_provider( @@ -95,7 +95,9 @@ class TriggerProviderService: with redis_client.lock(lock_key, timeout=20): # Check provider count limit provider_count = ( - session.query(TriggerProvider).filter_by(tenant_id=tenant_id, provider_id=provider_id).count() + session.query(TriggerSubscription) + .filter_by(tenant_id=tenant_id, provider_id=provider_id) + .count() ) if provider_count >= cls.__MAX_TRIGGER_PROVIDER_COUNT__: @@ -115,7 +117,7 @@ class TriggerProviderService: else: # Check if name already exists existing = ( - session.query(TriggerProvider) + session.query(TriggerSubscription) .filter_by(tenant_id=tenant_id, provider_id=provider_id, name=name) .first() ) @@ -129,12 +131,12 @@ class TriggerProviderService: ) # Create provider record - db_provider = TriggerProvider( + db_provider = TriggerSubscription( tenant_id=tenant_id, user_id=user_id, provider_id=provider_id, credential_type=credential_type.value, - encrypted_credentials=json.dumps(encrypter.encrypt(credentials)), + credentials=encrypter.encrypt(credentials), name=name, expires_at=expires_at, ) @@ -152,7 +154,7 @@ class TriggerProviderService: def update_trigger_provider( cls, tenant_id: str, - credential_id: str, + subscription_id: str, credentials: Optional[dict] = None, name: Optional[str] = None, ) -> dict: @@ -160,15 +162,15 @@ class TriggerProviderService: Update an existing trigger provider's credentials or name. :param tenant_id: Tenant ID - :param credential_id: Credential instance ID + :param subscription_id: Subscription instance ID :param credentials: New credentials (optional) :param name: New name (optional) :return: Success response """ with Session(db.engine) as session: - db_provider = session.query(TriggerProvider).filter_by(tenant_id=tenant_id, id=credential_id).first() + db_provider = session.query(TriggerSubscription).filter_by(tenant_id=tenant_id, id=subscription_id).first() if not db_provider: - raise ValueError(f"Trigger provider credential {credential_id} not found") + raise ValueError(f"Trigger provider subscription {subscription_id} not found") try: provider_controller = TriggerManager.get_trigger_provider( @@ -176,10 +178,10 @@ class TriggerProviderService: ) if credentials: - encrypter, cache = create_trigger_provider_encrypter_for_credential( + encrypter, cache = create_trigger_provider_encrypter_for_subscription( tenant_id=tenant_id, controller=provider_controller, - credential=db_provider, + subscription=db_provider, ) # Handle hidden values original_credentials = encrypter.decrypt(db_provider.credentials) @@ -188,16 +190,16 @@ class TriggerProviderService: for key, value in credentials.items() } - db_provider.encrypted_credentials = json.dumps(encrypter.encrypt(new_credentials)) + db_provider.credentials = encrypter.encrypt(new_credentials) cache.delete() # Update name if provided if name and name != db_provider.name: # Check if name already exists existing = ( - session.query(TriggerProvider) + session.query(TriggerSubscription) .filter_by(tenant_id=tenant_id, provider_id=db_provider.provider_id, name=name) - .filter(TriggerProvider.id != credential_id) + .filter(TriggerSubscription.id != subscription_id) .first() ) if existing: @@ -213,27 +215,27 @@ class TriggerProviderService: raise ValueError(str(e)) @classmethod - def delete_trigger_provider(cls, tenant_id: str, credential_id: str) -> dict: + def delete_trigger_provider(cls, tenant_id: str, subscription_id: str) -> dict: """ - Delete a trigger provider credential. + Delete a trigger provider subscription. :param tenant_id: Tenant ID - :param credential_id: Credential instance ID + :param subscription_id: Subscription instance ID :return: Success response """ with Session(db.engine) as session: - db_provider = session.query(TriggerProvider).filter_by(tenant_id=tenant_id, id=credential_id).first() + db_provider = session.query(TriggerSubscription).filter_by(tenant_id=tenant_id, id=subscription_id).first() if not db_provider: - raise ValueError(f"Trigger provider credential {credential_id} not found") + raise ValueError(f"Trigger provider subscription {subscription_id} not found") provider_controller = TriggerManager.get_trigger_provider( tenant_id, TriggerProviderID(db_provider.provider_id) ) # Clear cache - _, cache = create_trigger_provider_encrypter_for_credential( + _, cache = create_trigger_provider_encrypter_for_subscription( tenant_id=tenant_id, controller=provider_controller, - credential=db_provider, + subscription=db_provider, ) session.delete(db_provider) @@ -246,20 +248,20 @@ class TriggerProviderService: def refresh_oauth_token( cls, tenant_id: str, - credential_id: str, + subscription_id: str, ) -> dict: """ Refresh OAuth token for a trigger provider. :param tenant_id: Tenant ID - :param credential_id: Credential instance ID + :param subscription_id: Subscription instance ID :return: New token info """ with Session(db.engine) as session: - db_provider = session.query(TriggerProvider).filter_by(tenant_id=tenant_id, id=credential_id).first() + db_provider = session.query(TriggerSubscription).filter_by(tenant_id=tenant_id, id=subscription_id).first() if not db_provider: - raise ValueError(f"Trigger provider credential {credential_id} not found") + raise ValueError(f"Trigger provider subscription {subscription_id} not found") if db_provider.credential_type != CredentialType.OAUTH2.value: raise ValueError("Only OAuth credentials can be refreshed") @@ -267,10 +269,10 @@ class TriggerProviderService: provider_id = TriggerProviderID(db_provider.provider_id) provider_controller = TriggerManager.get_trigger_provider(tenant_id, provider_id) # Create encrypter - encrypter, cache = create_trigger_provider_encrypter_for_credential( + encrypter, cache = create_trigger_provider_encrypter_for_subscription( tenant_id=tenant_id, controller=provider_controller, - credential=db_provider, + subscription=db_provider, ) # Decrypt current credentials @@ -295,7 +297,7 @@ class TriggerProviderService: ) # Update credentials - db_provider.encrypted_credentials = json.dumps(encrypter.encrypt(dict(refreshed_credentials.credentials))) + db_provider.credentials = encrypter.encrypt(dict(refreshed_credentials.credentials)) db_provider.expires_at = refreshed_credentials.expires_at session.commit() @@ -518,13 +520,13 @@ class TriggerProviderService: """ try: db_providers = ( - session.query(TriggerProvider) + session.query(TriggerSubscription) .filter_by( tenant_id=tenant_id, provider_id=provider_id, credential_type=credential_type.value, ) - .order_by(desc(TriggerProvider.created_at)) + .order_by(desc(TriggerSubscription.created_at)) .all() )