diff --git a/api/models/trigger.py b/api/models/trigger.py index 092fd84935..6eb54cc77f 100644 --- a/api/models/trigger.py +++ b/api/models/trigger.py @@ -1,7 +1,8 @@ import json import time +from collections.abc import Mapping from datetime import datetime -from typing import cast +from typing import Any, cast import sqlalchemy as sa from sqlalchemy import DateTime, Index, Integer, String, UniqueConstraint, func @@ -40,10 +41,12 @@ class TriggerSubscription(Base): String(255), nullable=False, comment="Provider identifier (e.g., plugin_id/provider_name)" ) endpoint_id: Mapped[str] = mapped_column(String(255), nullable=False, comment="Subscription endpoint") - parameters: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription parameters JSON") - properties: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription properties JSON") + parameters: Mapped[dict[str, Any]] = mapped_column(sa.JSON, nullable=False, comment="Subscription parameters JSON") + properties: Mapped[dict[str, Any]] = mapped_column(sa.JSON, nullable=False, comment="Subscription properties JSON") - credentials: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription credentials JSON") + credentials: Mapped[dict[str, Any]] = mapped_column( + sa.JSON, nullable=False, comment="Subscription credentials JSON" + ) credential_type: Mapped[str] = mapped_column(String(50), nullable=False, comment="oauth or api_key") credential_expires_at: Mapped[int] = mapped_column( Integer, default=-1, comment="OAuth token expiration timestamp, -1 for never" @@ -136,5 +139,5 @@ class TriggerOAuthTenantClient(Base): ) @property - def oauth_params(self) -> dict: - return cast(dict, json.loads(self.encrypted_oauth_params or "{}")) + def oauth_params(self) -> Mapping[str, Any]: + return cast(Mapping[str, Any], json.loads(self.encrypted_oauth_params or "{}")) diff --git a/api/services/trigger/trigger_service.py b/api/services/trigger/trigger_service.py index 5bfd4a13d4..c4f00aabbf 100644 --- a/api/services/trigger/trigger_service.py +++ b/api/services/trigger/trigger_service.py @@ -239,7 +239,7 @@ class TriggerService: subscription_id: str # Walk nodes to find plugin triggers - nodes_in_graph = [] + nodes_in_graph: list[Mapping[str, Any]] = [] for node_id, node_config in workflow.walk_nodes(NodeType.TRIGGER_PLUGIN): # Extract plugin trigger configuration from node plugin_id = node_config.get("plugin_id", "") @@ -268,7 +268,7 @@ class TriggerService: f"maximum allowed is {cls.MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW}" ) - not_found_in_cache: list[dict] = [] + not_found_in_cache: list[Mapping[str, Any]] = [] for node_info in nodes_in_graph: node_id = node_info["node_id"] # firstly check if the node exists in cache