From db17119a96a32924f31ec0f712f14a70b04b5127 Mon Sep 17 00:00:00 2001 From: L1nSn0w Date: Sat, 14 Feb 2026 14:55:05 +0800 Subject: [PATCH] fix(api): make DB migration Redis lock TTL configurable and prevent LockNotOwnedError from masking failures (#32299) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- api/commands.py | 16 +- api/libs/db_migration_lock.py | 213 ++++++++++++++++++ .../test_auto_renew_redis_lock_integration.py | 38 ++++ .../unit_tests/commands/test_upgrade_db.py | 146 ++++++++++++ 4 files changed, 411 insertions(+), 2 deletions(-) create mode 100644 api/libs/db_migration_lock.py create mode 100644 api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py create mode 100644 api/tests/unit_tests/commands/test_upgrade_db.py diff --git a/api/commands.py b/api/commands.py index 93855bc3b8..75b17df78e 100644 --- a/api/commands.py +++ b/api/commands.py @@ -30,6 +30,7 @@ from extensions.ext_redis import redis_client from extensions.ext_storage import storage from extensions.storage.opendal_storage import OpenDALStorage from extensions.storage.storage_type import StorageType +from libs.db_migration_lock import DbMigrationAutoRenewLock from libs.helper import email as email_validate from libs.password import hash_password, password_pattern, valid_password from libs.rsa import generate_key_pair @@ -54,6 +55,8 @@ from tasks.remove_app_and_related_data_task import delete_draft_variables_batch logger = logging.getLogger(__name__) +DB_UPGRADE_LOCK_TTL_SECONDS = 60 + @click.command("reset-password", help="Reset the account password.") @click.option("--email", prompt=True, help="Account email to reset password for") @@ -727,8 +730,15 @@ def create_tenant(email: str, language: str | None = None, name: str | None = No @click.command("upgrade-db", help="Upgrade the database") def upgrade_db(): click.echo("Preparing database migration...") - lock = redis_client.lock(name="db_upgrade_lock", timeout=60) + lock = DbMigrationAutoRenewLock( + redis_client=redis_client, + name="db_upgrade_lock", + ttl_seconds=DB_UPGRADE_LOCK_TTL_SECONDS, + logger=logger, + log_context="db_migration", + ) if lock.acquire(blocking=False): + migration_succeeded = False try: click.echo(click.style("Starting database migration.", fg="green")) @@ -737,6 +747,7 @@ def upgrade_db(): flask_migrate.upgrade() + migration_succeeded = True click.echo(click.style("Database migration successful!", fg="green")) except Exception as e: @@ -744,7 +755,8 @@ def upgrade_db(): click.echo(click.style(f"Database migration failed: {e}", fg="red")) raise SystemExit(1) finally: - lock.release() + status = "successful" if migration_succeeded else "failed" + lock.release_safely(status=status) else: click.echo("Database migration skipped") diff --git a/api/libs/db_migration_lock.py b/api/libs/db_migration_lock.py new file mode 100644 index 0000000000..1d3a81e0a2 --- /dev/null +++ b/api/libs/db_migration_lock.py @@ -0,0 +1,213 @@ +""" +DB migration Redis lock with heartbeat renewal. + +This is intentionally migration-specific. Background renewal is a trade-off that makes sense +for unbounded, blocking operations like DB migrations (DDL/DML) where the main thread cannot +periodically refresh the lock TTL. + +Do NOT use this as a general-purpose lock primitive for normal application code. Prefer explicit +lock lifecycle management (e.g. redis-py Lock context manager + `extend()` / `reacquire()` from +the same thread) when execution flow is under control. +""" + +from __future__ import annotations + +import logging +import threading +from typing import Any + +from redis.exceptions import LockNotOwnedError, RedisError + +logger = logging.getLogger(__name__) + +MIN_RENEW_INTERVAL_SECONDS = 0.1 +DEFAULT_RENEW_INTERVAL_DIVISOR = 3 +MIN_JOIN_TIMEOUT_SECONDS = 0.5 +MAX_JOIN_TIMEOUT_SECONDS = 5.0 +JOIN_TIMEOUT_MULTIPLIER = 2.0 + + +class DbMigrationAutoRenewLock: + """ + Redis lock wrapper that automatically renews TTL while held (migration-only). + + Notes: + - We force `thread_local=False` when creating the underlying redis-py lock, because the + lock token must be accessible from the heartbeat thread for `reacquire()` to work. + - `release_safely()` is best-effort: it never raises, so it won't mask the caller's + primary error/exit code. + """ + + _redis_client: Any + _name: str + _ttl_seconds: float + _renew_interval_seconds: float + _log_context: str | None + _logger: logging.Logger + + _lock: Any + _stop_event: threading.Event | None + _thread: threading.Thread | None + _acquired: bool + + def __init__( + self, + redis_client: Any, + name: str, + ttl_seconds: float = 60, + renew_interval_seconds: float | None = None, + *, + logger: logging.Logger | None = None, + log_context: str | None = None, + ) -> None: + self._redis_client = redis_client + self._name = name + self._ttl_seconds = float(ttl_seconds) + self._renew_interval_seconds = ( + float(renew_interval_seconds) + if renew_interval_seconds is not None + else max(MIN_RENEW_INTERVAL_SECONDS, self._ttl_seconds / DEFAULT_RENEW_INTERVAL_DIVISOR) + ) + self._logger = logger or logging.getLogger(__name__) + self._log_context = log_context + + self._lock = None + self._stop_event = None + self._thread = None + self._acquired = False + + @property + def name(self) -> str: + return self._name + + def acquire(self, *args: Any, **kwargs: Any) -> bool: + """ + Acquire the lock and start heartbeat renewal on success. + + Accepts the same args/kwargs as redis-py `Lock.acquire()`. + """ + # Prevent accidental double-acquire which could leave the previous heartbeat thread running. + if self._acquired: + raise RuntimeError("DB migration lock is already acquired; call release_safely() before acquiring again.") + + # Reuse the lock object if we already created one. + if self._lock is None: + self._lock = self._redis_client.lock( + name=self._name, + timeout=self._ttl_seconds, + thread_local=False, + ) + acquired = bool(self._lock.acquire(*args, **kwargs)) + self._acquired = acquired + if acquired: + self._start_heartbeat() + return acquired + + def owned(self) -> bool: + if self._lock is None: + return False + try: + return bool(self._lock.owned()) + except Exception: + # Ownership checks are best-effort and must not break callers. + return False + + def _start_heartbeat(self) -> None: + if self._lock is None: + return + if self._stop_event is not None: + return + + self._stop_event = threading.Event() + self._thread = threading.Thread( + target=self._heartbeat_loop, + args=(self._lock, self._stop_event), + daemon=True, + name=f"DbMigrationAutoRenewLock({self._name})", + ) + self._thread.start() + + def _heartbeat_loop(self, lock: Any, stop_event: threading.Event) -> None: + while not stop_event.wait(self._renew_interval_seconds): + try: + lock.reacquire() + except LockNotOwnedError: + self._logger.warning( + "DB migration lock is no longer owned during heartbeat; stop renewing. log_context=%s", + self._log_context, + exc_info=True, + ) + return + except RedisError: + self._logger.warning( + "Failed to renew DB migration lock due to Redis error; will retry. log_context=%s", + self._log_context, + exc_info=True, + ) + except Exception: + self._logger.warning( + "Unexpected error while renewing DB migration lock; will retry. log_context=%s", + self._log_context, + exc_info=True, + ) + + def release_safely(self, *, status: str | None = None) -> None: + """ + Stop heartbeat and release lock. Never raises. + + Args: + status: Optional caller-provided status (e.g. 'successful'/'failed') to add context to logs. + """ + lock = self._lock + if lock is None: + return + + self._stop_heartbeat() + + # Lock release errors should never mask the real error/exit code. + try: + lock.release() + except LockNotOwnedError: + self._logger.warning( + "DB migration lock not owned on release; ignoring. status=%s log_context=%s", + status, + self._log_context, + exc_info=True, + ) + except RedisError: + self._logger.warning( + "Failed to release DB migration lock due to Redis error; ignoring. status=%s log_context=%s", + status, + self._log_context, + exc_info=True, + ) + except Exception: + self._logger.warning( + "Unexpected error while releasing DB migration lock; ignoring. status=%s log_context=%s", + status, + self._log_context, + exc_info=True, + ) + finally: + self._acquired = False + self._lock = None + + def _stop_heartbeat(self) -> None: + if self._stop_event is None: + return + self._stop_event.set() + if self._thread is not None: + # Best-effort join: if Redis calls are blocked, the daemon thread may remain alive. + join_timeout_seconds = max( + MIN_JOIN_TIMEOUT_SECONDS, + min(MAX_JOIN_TIMEOUT_SECONDS, self._renew_interval_seconds * JOIN_TIMEOUT_MULTIPLIER), + ) + self._thread.join(timeout=join_timeout_seconds) + if self._thread.is_alive(): + self._logger.warning( + "DB migration lock heartbeat thread did not stop within %.2fs; ignoring. log_context=%s", + join_timeout_seconds, + self._log_context, + ) + self._stop_event = None + self._thread = None diff --git a/api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py b/api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py new file mode 100644 index 0000000000..eb055ca332 --- /dev/null +++ b/api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py @@ -0,0 +1,38 @@ +""" +Integration tests for DbMigrationAutoRenewLock using real Redis via TestContainers. +""" + +import time +import uuid + +import pytest + +from extensions.ext_redis import redis_client +from libs.db_migration_lock import DbMigrationAutoRenewLock + + +@pytest.mark.usefixtures("flask_app_with_containers") +def test_db_migration_lock_renews_ttl_and_releases(): + lock_name = f"test:db_migration_auto_renew_lock:{uuid.uuid4().hex}" + + # Keep base TTL very small, and renew frequently so the test is stable even on slower CI. + lock = DbMigrationAutoRenewLock( + redis_client=redis_client, + name=lock_name, + ttl_seconds=1.0, + renew_interval_seconds=0.2, + log_context="test_db_migration_lock", + ) + + acquired = lock.acquire(blocking=True, blocking_timeout=5) + assert acquired is True + + # Wait beyond the base TTL; key should still exist due to renewal. + time.sleep(1.5) + ttl = redis_client.ttl(lock_name) + assert ttl > 0 + + lock.release_safely(status="successful") + + # After release, the key should not exist. + assert redis_client.exists(lock_name) == 0 diff --git a/api/tests/unit_tests/commands/test_upgrade_db.py b/api/tests/unit_tests/commands/test_upgrade_db.py new file mode 100644 index 0000000000..80173f5d46 --- /dev/null +++ b/api/tests/unit_tests/commands/test_upgrade_db.py @@ -0,0 +1,146 @@ +import sys +import threading +import types +from unittest.mock import MagicMock + +import commands +from libs.db_migration_lock import LockNotOwnedError, RedisError + +HEARTBEAT_WAIT_TIMEOUT_SECONDS = 5.0 + + +def _install_fake_flask_migrate(monkeypatch, upgrade_impl) -> None: + module = types.ModuleType("flask_migrate") + module.upgrade = upgrade_impl + monkeypatch.setitem(sys.modules, "flask_migrate", module) + + +def _invoke_upgrade_db() -> int: + try: + commands.upgrade_db.callback() + except SystemExit as e: + return int(e.code or 0) + return 0 + + +def test_upgrade_db_skips_when_lock_not_acquired(monkeypatch, capsys): + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 1234) + + lock = MagicMock() + lock.acquire.return_value = False + commands.redis_client.lock.return_value = lock + + exit_code = _invoke_upgrade_db() + captured = capsys.readouterr() + + assert exit_code == 0 + assert "Database migration skipped" in captured.out + + commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=1234, thread_local=False) + lock.acquire.assert_called_once_with(blocking=False) + lock.release.assert_not_called() + + +def test_upgrade_db_failure_not_masked_by_lock_release(monkeypatch, capsys): + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 321) + + lock = MagicMock() + lock.acquire.return_value = True + lock.release.side_effect = LockNotOwnedError("simulated") + commands.redis_client.lock.return_value = lock + + def _upgrade(): + raise RuntimeError("boom") + + _install_fake_flask_migrate(monkeypatch, _upgrade) + + exit_code = _invoke_upgrade_db() + captured = capsys.readouterr() + + assert exit_code == 1 + assert "Database migration failed: boom" in captured.out + + commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=321, thread_local=False) + lock.acquire.assert_called_once_with(blocking=False) + lock.release.assert_called_once() + + +def test_upgrade_db_success_ignores_lock_not_owned_on_release(monkeypatch, capsys): + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 999) + + lock = MagicMock() + lock.acquire.return_value = True + lock.release.side_effect = LockNotOwnedError("simulated") + commands.redis_client.lock.return_value = lock + + _install_fake_flask_migrate(monkeypatch, lambda: None) + + exit_code = _invoke_upgrade_db() + captured = capsys.readouterr() + + assert exit_code == 0 + assert "Database migration successful!" in captured.out + + commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=999, thread_local=False) + lock.acquire.assert_called_once_with(blocking=False) + lock.release.assert_called_once() + + +def test_upgrade_db_renews_lock_during_migration(monkeypatch, capsys): + """ + Ensure the lock is renewed while migrations are running, so the base TTL can stay short. + """ + + # Use a small TTL so the heartbeat interval triggers quickly. + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 0.3) + + lock = MagicMock() + lock.acquire.return_value = True + commands.redis_client.lock.return_value = lock + + renewed = threading.Event() + + def _reacquire(): + renewed.set() + return True + + lock.reacquire.side_effect = _reacquire + + def _upgrade(): + assert renewed.wait(HEARTBEAT_WAIT_TIMEOUT_SECONDS) + + _install_fake_flask_migrate(monkeypatch, _upgrade) + + exit_code = _invoke_upgrade_db() + _ = capsys.readouterr() + + assert exit_code == 0 + assert lock.reacquire.call_count >= 1 + + +def test_upgrade_db_ignores_reacquire_errors(monkeypatch, capsys): + # Use a small TTL so heartbeat runs during the upgrade call. + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 0.3) + + lock = MagicMock() + lock.acquire.return_value = True + commands.redis_client.lock.return_value = lock + + attempted = threading.Event() + + def _reacquire(): + attempted.set() + raise RedisError("simulated") + + lock.reacquire.side_effect = _reacquire + + def _upgrade(): + assert attempted.wait(HEARTBEAT_WAIT_TIMEOUT_SECONDS) + + _install_fake_flask_migrate(monkeypatch, _upgrade) + + exit_code = _invoke_upgrade_db() + _ = capsys.readouterr() + + assert exit_code == 0 + assert lock.reacquire.call_count >= 1