refactor(api): replace Any with precise types in db_migration_lock (#34891)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
corevibe555 2026-04-10 14:09:33 +03:00 committed by GitHub
parent 6612ba69b1
commit 130ad295d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -14,9 +14,15 @@ from __future__ import annotations
import logging
import threading
from typing import Any
from typing import TYPE_CHECKING, Any
import redis
from redis.cluster import RedisCluster
from redis.exceptions import LockNotOwnedError, RedisError
from redis.lock import Lock
if TYPE_CHECKING:
from extensions.ext_redis import RedisClientWrapper
logger = logging.getLogger(__name__)
@ -38,21 +44,21 @@ class DbMigrationAutoRenewLock:
primary error/exit code.
"""
_redis_client: Any
_redis_client: redis.Redis | RedisCluster | RedisClientWrapper
_name: str
_ttl_seconds: float
_renew_interval_seconds: float
_log_context: str | None
_logger: logging.Logger
_lock: Any
_lock: Lock | None
_stop_event: threading.Event | None
_thread: threading.Thread | None
_acquired: bool
def __init__(
self,
redis_client: Any,
redis_client: redis.Redis | RedisCluster | RedisClientWrapper,
name: str,
ttl_seconds: float = 60,
renew_interval_seconds: float | None = None,
@ -127,7 +133,7 @@ class DbMigrationAutoRenewLock:
)
self._thread.start()
def _heartbeat_loop(self, lock: Any, stop_event: threading.Event) -> None:
def _heartbeat_loop(self, lock: Lock, stop_event: threading.Event) -> None:
while not stop_event.wait(self._renew_interval_seconds):
try:
lock.reacquire()