fix: typing

This commit is contained in:
Yeuoly 2025-10-18 19:32:08 +08:00
parent 30a341331f
commit 5df9afa91a
2 changed files with 11 additions and 8 deletions

View File

@ -1,7 +1,8 @@
import json
import time
from collections.abc import Mapping
from datetime import datetime
from typing import cast
from typing import Any, cast
import sqlalchemy as sa
from sqlalchemy import DateTime, Index, Integer, String, UniqueConstraint, func
@ -40,10 +41,12 @@ class TriggerSubscription(Base):
String(255), nullable=False, comment="Provider identifier (e.g., plugin_id/provider_name)"
)
endpoint_id: Mapped[str] = mapped_column(String(255), nullable=False, comment="Subscription endpoint")
parameters: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription parameters JSON")
properties: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription properties JSON")
parameters: Mapped[dict[str, Any]] = mapped_column(sa.JSON, nullable=False, comment="Subscription parameters JSON")
properties: Mapped[dict[str, Any]] = mapped_column(sa.JSON, nullable=False, comment="Subscription properties JSON")
credentials: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription credentials JSON")
credentials: Mapped[dict[str, Any]] = mapped_column(
sa.JSON, nullable=False, comment="Subscription credentials JSON"
)
credential_type: Mapped[str] = mapped_column(String(50), nullable=False, comment="oauth or api_key")
credential_expires_at: Mapped[int] = mapped_column(
Integer, default=-1, comment="OAuth token expiration timestamp, -1 for never"
@ -136,5 +139,5 @@ class TriggerOAuthTenantClient(Base):
)
@property
def oauth_params(self) -> dict:
return cast(dict, json.loads(self.encrypted_oauth_params or "{}"))
def oauth_params(self) -> Mapping[str, Any]:
return cast(Mapping[str, Any], json.loads(self.encrypted_oauth_params or "{}"))

View File

@ -239,7 +239,7 @@ class TriggerService:
subscription_id: str
# Walk nodes to find plugin triggers
nodes_in_graph = []
nodes_in_graph: list[Mapping[str, Any]] = []
for node_id, node_config in workflow.walk_nodes(NodeType.TRIGGER_PLUGIN):
# Extract plugin trigger configuration from node
plugin_id = node_config.get("plugin_id", "")
@ -268,7 +268,7 @@ class TriggerService:
f"maximum allowed is {cls.MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW}"
)
not_found_in_cache: list[dict] = []
not_found_in_cache: list[Mapping[str, Any]] = []
for node_info in nodes_in_graph:
node_id = node_info["node_id"]
# firstly check if the node exists in cache