refactor(api): type Redis connection param builder functions with TypedDicts (#34875)

This commit is contained in:
dataCenter430 2026-04-10 04:36:39 -07:00 committed by GitHub
parent 04f5fe5e38
commit 674495680d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -14,6 +14,7 @@ from redis.cluster import ClusterNode, RedisCluster
from redis.connection import Connection, SSLConnection
from redis.retry import Retry
from redis.sentinel import Sentinel
from typing_extensions import TypedDict
from configs import dify_config
from dify_app import DifyApp
@ -126,6 +127,35 @@ redis_client: RedisClientWrapper = RedisClientWrapper()
_pubsub_redis_client: redis.Redis | RedisCluster | None = None
class RedisSSLParamsDict(TypedDict):
ssl_cert_reqs: int
ssl_ca_certs: str | None
ssl_certfile: str | None
ssl_keyfile: str | None
class RedisHealthParamsDict(TypedDict):
retry: Retry
socket_timeout: float | None
socket_connect_timeout: float | None
health_check_interval: int | None
class RedisBaseParamsDict(TypedDict):
username: str | None
password: str | None
db: int
encoding: str
encoding_errors: str
decode_responses: bool
protocol: int
cache_config: CacheConfig | None
retry: Retry
socket_timeout: float | None
socket_connect_timeout: float | None
health_check_interval: int | None
def _get_ssl_configuration() -> tuple[type[Union[Connection, SSLConnection]], dict[str, Any]]:
"""Get SSL configuration for Redis connection."""
if not dify_config.REDIS_USE_SSL:
@ -171,14 +201,14 @@ def _get_retry_policy() -> Retry:
)
def _get_connection_health_params() -> dict[str, Any]:
def _get_connection_health_params() -> RedisHealthParamsDict:
"""Get connection health and retry parameters for standalone and Sentinel Redis clients."""
return {
"retry": _get_retry_policy(),
"socket_timeout": dify_config.REDIS_SOCKET_TIMEOUT,
"socket_connect_timeout": dify_config.REDIS_SOCKET_CONNECT_TIMEOUT,
"health_check_interval": dify_config.REDIS_HEALTH_CHECK_INTERVAL,
}
return RedisHealthParamsDict(
retry=_get_retry_policy(),
socket_timeout=dify_config.REDIS_SOCKET_TIMEOUT,
socket_connect_timeout=dify_config.REDIS_SOCKET_CONNECT_TIMEOUT,
health_check_interval=dify_config.REDIS_HEALTH_CHECK_INTERVAL,
)
def _get_cluster_connection_health_params() -> dict[str, Any]:
@ -189,26 +219,26 @@ def _get_cluster_connection_health_params() -> dict[str, Any]:
here. Only ``retry``, ``socket_timeout``, and ``socket_connect_timeout``
are passed through.
"""
params = _get_connection_health_params()
params: dict[str, Any] = dict(_get_connection_health_params())
return {k: v for k, v in params.items() if k != "health_check_interval"}
def _get_base_redis_params() -> dict[str, Any]:
def _get_base_redis_params() -> RedisBaseParamsDict:
"""Get base Redis connection parameters including retry and health policy."""
return {
"username": dify_config.REDIS_USERNAME,
"password": dify_config.REDIS_PASSWORD or None,
"db": dify_config.REDIS_DB,
"encoding": "utf-8",
"encoding_errors": "strict",
"decode_responses": False,
"protocol": dify_config.REDIS_SERIALIZATION_PROTOCOL,
"cache_config": _get_cache_configuration(),
return RedisBaseParamsDict(
username=dify_config.REDIS_USERNAME,
password=dify_config.REDIS_PASSWORD or None,
db=dify_config.REDIS_DB,
encoding="utf-8",
encoding_errors="strict",
decode_responses=False,
protocol=dify_config.REDIS_SERIALIZATION_PROTOCOL,
cache_config=_get_cache_configuration(),
**_get_connection_health_params(),
}
)
def _create_sentinel_client(redis_params: dict[str, Any]) -> Union[redis.Redis, RedisCluster]:
def _create_sentinel_client(redis_params: RedisBaseParamsDict) -> Union[redis.Redis, RedisCluster]:
"""Create Redis client using Sentinel configuration."""
if not dify_config.REDIS_SENTINELS:
raise ValueError("REDIS_SENTINELS must be set when REDIS_USE_SENTINEL is True")
@ -232,7 +262,8 @@ def _create_sentinel_client(redis_params: dict[str, Any]) -> Union[redis.Redis,
sentinel_kwargs=sentinel_kwargs,
)
master: redis.Redis = sentinel.master_for(dify_config.REDIS_SENTINEL_SERVICE_NAME, **redis_params)
params: dict[str, Any] = {**redis_params}
master: redis.Redis = sentinel.master_for(dify_config.REDIS_SENTINEL_SERVICE_NAME, **params)
return master
@ -259,18 +290,16 @@ def _create_cluster_client() -> Union[redis.Redis, RedisCluster]:
return cluster
def _create_standalone_client(redis_params: dict[str, Any]) -> Union[redis.Redis, RedisCluster]:
def _create_standalone_client(redis_params: RedisBaseParamsDict) -> Union[redis.Redis, RedisCluster]:
"""Create standalone Redis client."""
connection_class, ssl_kwargs = _get_ssl_configuration()
params = {**redis_params}
params.update(
{
"host": dify_config.REDIS_HOST,
"port": dify_config.REDIS_PORT,
"connection_class": connection_class,
}
)
params: dict[str, Any] = {
**redis_params,
"host": dify_config.REDIS_HOST,
"port": dify_config.REDIS_PORT,
"connection_class": connection_class,
}
if dify_config.REDIS_MAX_CONNECTIONS:
params["max_connections"] = dify_config.REDIS_MAX_CONNECTIONS
@ -293,8 +322,8 @@ def _create_pubsub_client(pubsub_url: str, use_clusters: bool) -> redis.Redis |
kwargs["max_connections"] = max_conns
return RedisCluster.from_url(pubsub_url, **kwargs)
health_params = _get_connection_health_params()
kwargs = {**health_params}
standalone_health_params: dict[str, Any] = dict(_get_connection_health_params())
kwargs = {**standalone_health_params}
if max_conns:
kwargs["max_connections"] = max_conns
return redis.Redis.from_url(pubsub_url, **kwargs)