From e9a7e8f77fbf7f6bbc38c64543823bcd9e783461 Mon Sep 17 00:00:00 2001 From: Xiyuan Chen <52963600+GareArc@users.noreply.github.com> Date: Fri, 13 Feb 2026 23:40:37 -0800 Subject: [PATCH 01/13] fix: include sso_verified in access_mode validation (#32325) --- api/services/enterprise/enterprise_service.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/api/services/enterprise/enterprise_service.py b/api/services/enterprise/enterprise_service.py index a5133dfcb4..9930c6bf7c 100644 --- a/api/services/enterprise/enterprise_service.py +++ b/api/services/enterprise/enterprise_service.py @@ -4,6 +4,8 @@ from pydantic import BaseModel, Field from services.enterprise.base import EnterpriseRequest +ALLOWED_ACCESS_MODES = ["public", "private", "private_all", "sso_verified"] + class WebAppSettings(BaseModel): access_mode: str = Field( @@ -123,8 +125,8 @@ class EnterpriseService: def update_app_access_mode(cls, app_id: str, access_mode: str): if not app_id: raise ValueError("app_id must be provided.") - if access_mode not in ["public", "private", "private_all"]: - raise ValueError("access_mode must be either 'public', 'private', or 'private_all'") + if access_mode not in ALLOWED_ACCESS_MODES: + raise ValueError(f"access_mode must be one of: {', '.join(ALLOWED_ACCESS_MODES)}") data = {"appId": app_id, "accessMode": access_mode} From 1977e68b2d26948c9c049df3de6a02af09240cdf Mon Sep 17 00:00:00 2001 From: longbingljw Date: Fri, 6 Feb 2026 13:01:31 +0900 Subject: [PATCH 02/13] fix: make `flask upgrade-db` fail on error (#32024) (cherry picked from commit d9530f7bb75c324ec17fae870d752209e1e86194) --- api/commands.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/api/commands.py b/api/commands.py index c4f2c9edbb..93855bc3b8 100644 --- a/api/commands.py +++ b/api/commands.py @@ -739,8 +739,10 @@ def upgrade_db(): click.echo(click.style("Database migration successful!", fg="green")) - except Exception: + except Exception as e: logger.exception("Failed to execute database migration") + click.echo(click.style(f"Database migration failed: {e}", fg="red")) + raise SystemExit(1) finally: lock.release() else: From 9acdfbde2f15b19e90d48a9fed115361aebb5b24 Mon Sep 17 00:00:00 2001 From: L1nSn0w Date: Fri, 13 Feb 2026 12:15:55 +0800 Subject: [PATCH 03/13] feat(api): enhance database migration locking mechanism and configuration - Introduced a configurable Redis lock TTL for database migrations in DeploymentConfig. - Updated the upgrade_db command to handle lock release errors gracefully. - Added documentation for the new MIGRATION_LOCK_TTL environment variable in the .env.example file and docker-compose.yaml. (cherry picked from commit 4a05fb120622908bc109a3715686706aab3d3b59) --- api/commands.py | 11 ++- api/configs/deploy/__init__.py | 7 +- .../unit_tests/commands/test_upgrade_db.py | 84 +++++++++++++++++++ docker/.env.example | 4 + docker/docker-compose.yaml | 1 + 5 files changed, 104 insertions(+), 3 deletions(-) create mode 100644 api/tests/unit_tests/commands/test_upgrade_db.py diff --git a/api/commands.py b/api/commands.py index 93855bc3b8..fbf16de8be 100644 --- a/api/commands.py +++ b/api/commands.py @@ -10,6 +10,7 @@ import click import sqlalchemy as sa from flask import current_app from pydantic import TypeAdapter +from redis.exceptions import LockNotOwnedError, RedisError from sqlalchemy import select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import sessionmaker @@ -727,7 +728,7 @@ def create_tenant(email: str, language: str | None = None, name: str | None = No @click.command("upgrade-db", help="Upgrade the database") def upgrade_db(): click.echo("Preparing database migration...") - lock = redis_client.lock(name="db_upgrade_lock", timeout=60) + lock = redis_client.lock(name="db_upgrade_lock", timeout=dify_config.MIGRATION_LOCK_TTL) if lock.acquire(blocking=False): try: click.echo(click.style("Starting database migration.", fg="green")) @@ -744,7 +745,13 @@ def upgrade_db(): click.echo(click.style(f"Database migration failed: {e}", fg="red")) raise SystemExit(1) finally: - lock.release() + # Lock release errors should never mask the real migration failure. + try: + lock.release() + except LockNotOwnedError: + logger.warning("DB migration lock not owned on release (likely expired); ignoring.") + except RedisError: + logger.warning("Failed to release DB migration lock due to Redis error; ignoring.", exc_info=True) else: click.echo("Database migration skipped") diff --git a/api/configs/deploy/__init__.py b/api/configs/deploy/__init__.py index 63f4dfba63..4ac57f0370 100644 --- a/api/configs/deploy/__init__.py +++ b/api/configs/deploy/__init__.py @@ -1,4 +1,4 @@ -from pydantic import Field +from pydantic import Field, PositiveInt from pydantic_settings import BaseSettings @@ -32,3 +32,8 @@ class DeploymentConfig(BaseSettings): description="Deployment environment (e.g., 'PRODUCTION', 'DEVELOPMENT'), default to PRODUCTION", default="PRODUCTION", ) + + MIGRATION_LOCK_TTL: PositiveInt = Field( + description="Redis lock TTL for startup DB migration (seconds). Increase for large/slow databases.", + default=3600, + ) diff --git a/api/tests/unit_tests/commands/test_upgrade_db.py b/api/tests/unit_tests/commands/test_upgrade_db.py new file mode 100644 index 0000000000..c262ef71cc --- /dev/null +++ b/api/tests/unit_tests/commands/test_upgrade_db.py @@ -0,0 +1,84 @@ +import sys +import types +from unittest.mock import MagicMock + +import commands +from configs import dify_config + + +def _install_fake_flask_migrate(monkeypatch, upgrade_impl) -> None: + module = types.ModuleType("flask_migrate") + module.upgrade = upgrade_impl + monkeypatch.setitem(sys.modules, "flask_migrate", module) + + +def _invoke_upgrade_db() -> int: + try: + commands.upgrade_db.callback() + except SystemExit as e: + return int(e.code or 0) + return 0 + + +def test_upgrade_db_skips_when_lock_not_acquired(monkeypatch, capsys): + monkeypatch.setattr(dify_config, "MIGRATION_LOCK_TTL", 1234) + + lock = MagicMock() + lock.acquire.return_value = False + commands.redis_client.lock.return_value = lock + + exit_code = _invoke_upgrade_db() + captured = capsys.readouterr() + + assert exit_code == 0 + assert "Database migration skipped" in captured.out + + commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=1234) + lock.acquire.assert_called_once_with(blocking=False) + lock.release.assert_not_called() + + +def test_upgrade_db_failure_not_masked_by_lock_release(monkeypatch, capsys): + monkeypatch.setattr(dify_config, "MIGRATION_LOCK_TTL", 321) + + lock = MagicMock() + lock.acquire.return_value = True + lock.release.side_effect = commands.LockNotOwnedError("simulated") + commands.redis_client.lock.return_value = lock + + def _upgrade(): + raise RuntimeError("boom") + + _install_fake_flask_migrate(monkeypatch, _upgrade) + + exit_code = _invoke_upgrade_db() + captured = capsys.readouterr() + + assert exit_code == 1 + assert "Database migration failed: boom" in captured.out + + commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=321) + lock.acquire.assert_called_once_with(blocking=False) + lock.release.assert_called_once() + + +def test_upgrade_db_success_ignores_lock_not_owned_on_release(monkeypatch, capsys): + monkeypatch.setattr(dify_config, "MIGRATION_LOCK_TTL", 999) + + lock = MagicMock() + lock.acquire.return_value = True + lock.release.side_effect = commands.LockNotOwnedError("simulated") + commands.redis_client.lock.return_value = lock + + _install_fake_flask_migrate(monkeypatch, lambda: None) + + exit_code = _invoke_upgrade_db() + captured = capsys.readouterr() + + assert exit_code == 0 + assert "Database migration successful!" in captured.out + + commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=999) + lock.acquire.assert_called_once_with(blocking=False) + lock.release.assert_called_once() + diff --git a/docker/.env.example b/docker/.env.example index 41a0205bf5..9339404b58 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -125,6 +125,10 @@ OPENAI_API_BASE=https://api.openai.com/v1 # and the application will start after the migrations have completed. MIGRATION_ENABLED=true +# Redis lock TTL (in seconds) for startup database migrations. +# Increase this value for long-running migrations to avoid concurrent upgrades in multi-replica deployments. +MIGRATION_LOCK_TTL=3600 + # File Access Time specifies a time interval in seconds for the file to be accessed. # The default value is 300 seconds. FILES_ACCESS_TIMEOUT=300 diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 1886f848e0..57a0c089c8 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -32,6 +32,7 @@ x-shared-env: &shared-api-worker-env CHECK_UPDATE_URL: ${CHECK_UPDATE_URL:-https://updates.dify.ai} OPENAI_API_BASE: ${OPENAI_API_BASE:-https://api.openai.com/v1} MIGRATION_ENABLED: ${MIGRATION_ENABLED:-true} + MIGRATION_LOCK_TTL: ${MIGRATION_LOCK_TTL:-3600} FILES_ACCESS_TIMEOUT: ${FILES_ACCESS_TIMEOUT:-300} ACCESS_TOKEN_EXPIRE_MINUTES: ${ACCESS_TOKEN_EXPIRE_MINUTES:-60} REFRESH_TOKEN_EXPIRE_DAYS: ${REFRESH_TOKEN_EXPIRE_DAYS:-30} From afdd5b6c860b2d0cbc401f4b299fc46df9c5f8c5 Mon Sep 17 00:00:00 2001 From: L1nSn0w Date: Fri, 13 Feb 2026 12:48:10 +0800 Subject: [PATCH 04/13] feat(api): implement heartbeat mechanism for database migration lock - Added a heartbeat function to renew the Redis lock during database migrations, preventing long blockages from crashed processes. - Updated the upgrade_db command to utilize the new locking mechanism with a configurable TTL. - Removed the deprecated MIGRATION_LOCK_TTL from DeploymentConfig and related files. - Enhanced unit tests to cover the new lock renewal behavior and error handling during migrations. (cherry picked from commit a3331c622435f9f215b95f6b0261f43ae56a9d9c) --- api/commands.py | 49 +++++++++++- api/configs/deploy/__init__.py | 6 +- .../unit_tests/commands/test_upgrade_db.py | 74 +++++++++++++++++-- docker/.env.example | 4 - docker/docker-compose.yaml | 1 - 5 files changed, 115 insertions(+), 19 deletions(-) diff --git a/api/commands.py b/api/commands.py index fbf16de8be..4cc2f476f2 100644 --- a/api/commands.py +++ b/api/commands.py @@ -3,8 +3,9 @@ import datetime import json import logging import secrets +import threading import time -from typing import Any +from typing import TYPE_CHECKING, Any import click import sqlalchemy as sa @@ -55,6 +56,35 @@ from tasks.remove_app_and_related_data_task import delete_draft_variables_batch logger = logging.getLogger(__name__) +if TYPE_CHECKING: + from redis.lock import Lock + +DB_UPGRADE_LOCK_TTL_SECONDS = 60 + + +def _heartbeat_db_upgrade_lock(lock: "Lock", stop_event: threading.Event, ttl_seconds: float) -> None: + """ + Keep the DB upgrade lock alive while migrations are running. + + We intentionally keep the base TTL small (e.g. 60s) so that if the process is killed and can't + release the lock, the lock will naturally expire soon. While the process is alive, this + heartbeat periodically resets the TTL via `lock.reacquire()`. + """ + + interval_seconds = max(0.1, ttl_seconds / 3) + while not stop_event.wait(interval_seconds): + try: + lock.reacquire() + except LockNotOwnedError: + # Another process took over / TTL expired; continuing to retry won't help. + logger.warning("DB migration lock is no longer owned during heartbeat; stop renewing.") + return + except RedisError: + # Best-effort: keep trying while the process is alive. + logger.warning("Failed to renew DB migration lock due to Redis error; will retry.", exc_info=True) + except Exception: + logger.warning("Unexpected error while renewing DB migration lock; will retry.", exc_info=True) + @click.command("reset-password", help="Reset the account password.") @click.option("--email", prompt=True, help="Account email to reset password for") @@ -728,8 +758,21 @@ def create_tenant(email: str, language: str | None = None, name: str | None = No @click.command("upgrade-db", help="Upgrade the database") def upgrade_db(): click.echo("Preparing database migration...") - lock = redis_client.lock(name="db_upgrade_lock", timeout=dify_config.MIGRATION_LOCK_TTL) + # Use a short base TTL + heartbeat renewal, so a crashed process doesn't block migrations for long. + # thread_local=False is required because heartbeat runs in a separate thread. + lock = redis_client.lock( + name="db_upgrade_lock", + timeout=DB_UPGRADE_LOCK_TTL_SECONDS, + thread_local=False, + ) if lock.acquire(blocking=False): + stop_event = threading.Event() + heartbeat_thread = threading.Thread( + target=_heartbeat_db_upgrade_lock, + args=(lock, stop_event, float(DB_UPGRADE_LOCK_TTL_SECONDS)), + daemon=True, + ) + heartbeat_thread.start() try: click.echo(click.style("Starting database migration.", fg="green")) @@ -745,6 +788,8 @@ def upgrade_db(): click.echo(click.style(f"Database migration failed: {e}", fg="red")) raise SystemExit(1) finally: + stop_event.set() + heartbeat_thread.join(timeout=5) # Lock release errors should never mask the real migration failure. try: lock.release() diff --git a/api/configs/deploy/__init__.py b/api/configs/deploy/__init__.py index 4ac57f0370..7db212a3d8 100644 --- a/api/configs/deploy/__init__.py +++ b/api/configs/deploy/__init__.py @@ -1,4 +1,4 @@ -from pydantic import Field, PositiveInt +from pydantic import Field from pydantic_settings import BaseSettings @@ -33,7 +33,3 @@ class DeploymentConfig(BaseSettings): default="PRODUCTION", ) - MIGRATION_LOCK_TTL: PositiveInt = Field( - description="Redis lock TTL for startup DB migration (seconds). Increase for large/slow databases.", - default=3600, - ) diff --git a/api/tests/unit_tests/commands/test_upgrade_db.py b/api/tests/unit_tests/commands/test_upgrade_db.py index c262ef71cc..59d47de895 100644 --- a/api/tests/unit_tests/commands/test_upgrade_db.py +++ b/api/tests/unit_tests/commands/test_upgrade_db.py @@ -1,9 +1,9 @@ import sys +import threading import types from unittest.mock import MagicMock import commands -from configs import dify_config def _install_fake_flask_migrate(monkeypatch, upgrade_impl) -> None: @@ -21,7 +21,7 @@ def _invoke_upgrade_db() -> int: def test_upgrade_db_skips_when_lock_not_acquired(monkeypatch, capsys): - monkeypatch.setattr(dify_config, "MIGRATION_LOCK_TTL", 1234) + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 1234) lock = MagicMock() lock.acquire.return_value = False @@ -33,13 +33,13 @@ def test_upgrade_db_skips_when_lock_not_acquired(monkeypatch, capsys): assert exit_code == 0 assert "Database migration skipped" in captured.out - commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=1234) + commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=1234, thread_local=False) lock.acquire.assert_called_once_with(blocking=False) lock.release.assert_not_called() def test_upgrade_db_failure_not_masked_by_lock_release(monkeypatch, capsys): - monkeypatch.setattr(dify_config, "MIGRATION_LOCK_TTL", 321) + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 321) lock = MagicMock() lock.acquire.return_value = True @@ -57,13 +57,13 @@ def test_upgrade_db_failure_not_masked_by_lock_release(monkeypatch, capsys): assert exit_code == 1 assert "Database migration failed: boom" in captured.out - commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=321) + commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=321, thread_local=False) lock.acquire.assert_called_once_with(blocking=False) lock.release.assert_called_once() def test_upgrade_db_success_ignores_lock_not_owned_on_release(monkeypatch, capsys): - monkeypatch.setattr(dify_config, "MIGRATION_LOCK_TTL", 999) + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 999) lock = MagicMock() lock.acquire.return_value = True @@ -78,7 +78,67 @@ def test_upgrade_db_success_ignores_lock_not_owned_on_release(monkeypatch, capsy assert exit_code == 0 assert "Database migration successful!" in captured.out - commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=999) + commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=999, thread_local=False) lock.acquire.assert_called_once_with(blocking=False) lock.release.assert_called_once() + +def test_upgrade_db_renews_lock_during_migration(monkeypatch, capsys): + """ + Ensure the lock is renewed while migrations are running, so the base TTL can stay short. + """ + + # Use a small TTL so the heartbeat interval triggers quickly. + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 0.3) + + lock = MagicMock() + lock.acquire.return_value = True + commands.redis_client.lock.return_value = lock + + renewed = threading.Event() + + def _reacquire(): + renewed.set() + return True + + lock.reacquire.side_effect = _reacquire + + def _upgrade(): + assert renewed.wait(1.0) + + _install_fake_flask_migrate(monkeypatch, _upgrade) + + exit_code = _invoke_upgrade_db() + _ = capsys.readouterr() + + assert exit_code == 0 + assert lock.reacquire.call_count >= 1 + + +def test_upgrade_db_ignores_reacquire_errors(monkeypatch, capsys): + # Use a small TTL so heartbeat runs during the upgrade call. + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 0.3) + + lock = MagicMock() + lock.acquire.return_value = True + commands.redis_client.lock.return_value = lock + + attempted = threading.Event() + + def _reacquire(): + attempted.set() + raise commands.RedisError("simulated") + + lock.reacquire.side_effect = _reacquire + + def _upgrade(): + assert attempted.wait(1.0) + + _install_fake_flask_migrate(monkeypatch, _upgrade) + + exit_code = _invoke_upgrade_db() + _ = capsys.readouterr() + + assert exit_code == 0 + assert lock.reacquire.call_count >= 1 + diff --git a/docker/.env.example b/docker/.env.example index 9339404b58..41a0205bf5 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -125,10 +125,6 @@ OPENAI_API_BASE=https://api.openai.com/v1 # and the application will start after the migrations have completed. MIGRATION_ENABLED=true -# Redis lock TTL (in seconds) for startup database migrations. -# Increase this value for long-running migrations to avoid concurrent upgrades in multi-replica deployments. -MIGRATION_LOCK_TTL=3600 - # File Access Time specifies a time interval in seconds for the file to be accessed. # The default value is 300 seconds. FILES_ACCESS_TIMEOUT=300 diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 57a0c089c8..1886f848e0 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -32,7 +32,6 @@ x-shared-env: &shared-api-worker-env CHECK_UPDATE_URL: ${CHECK_UPDATE_URL:-https://updates.dify.ai} OPENAI_API_BASE: ${OPENAI_API_BASE:-https://api.openai.com/v1} MIGRATION_ENABLED: ${MIGRATION_ENABLED:-true} - MIGRATION_LOCK_TTL: ${MIGRATION_LOCK_TTL:-3600} FILES_ACCESS_TIMEOUT: ${FILES_ACCESS_TIMEOUT:-300} ACCESS_TOKEN_EXPIRE_MINUTES: ${ACCESS_TOKEN_EXPIRE_MINUTES:-60} REFRESH_TOKEN_EXPIRE_DAYS: ${REFRESH_TOKEN_EXPIRE_DAYS:-30} From 6032c598b0bef1ab8051834eaacc1ae2f8ff2312 Mon Sep 17 00:00:00 2001 From: L1nSn0w Date: Fri, 13 Feb 2026 12:57:14 +0800 Subject: [PATCH 05/13] fix(api): improve logging for database migration lock release - Added a migration_succeeded flag to track the success of database migrations. - Enhanced logging messages to indicate the status of the migration when releasing the lock, providing clearer context for potential issues. (cherry picked from commit e74be0392995d16d288eed2175c51148c9e5b9c0) --- api/commands.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/api/commands.py b/api/commands.py index 4cc2f476f2..ada2099058 100644 --- a/api/commands.py +++ b/api/commands.py @@ -773,6 +773,7 @@ def upgrade_db(): daemon=True, ) heartbeat_thread.start() + migration_succeeded = False try: click.echo(click.style("Starting database migration.", fg="green")) @@ -781,6 +782,7 @@ def upgrade_db(): flask_migrate.upgrade() + migration_succeeded = True click.echo(click.style("Database migration successful!", fg="green")) except Exception as e: @@ -794,9 +796,15 @@ def upgrade_db(): try: lock.release() except LockNotOwnedError: - logger.warning("DB migration lock not owned on release (likely expired); ignoring.") + status = "successful" if migration_succeeded else "failed" + logger.warning("DB migration lock not owned on release after %s migration (likely expired); ignoring.", status) except RedisError: - logger.warning("Failed to release DB migration lock due to Redis error; ignoring.", exc_info=True) + status = "successful" if migration_succeeded else "failed" + logger.warning( + "Failed to release DB migration lock due to Redis error after %s migration; ignoring.", + status, + exc_info=True, + ) else: click.echo("Database migration skipped") From ee0c4a8852e51f0d462c764c4b35cabaeffea506 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 13 Feb 2026 06:46:17 +0000 Subject: [PATCH 06/13] [autofix.ci] apply automated fixes (cherry picked from commit 326cffa553ffac1bcd39a051c899c35b0ebe997d) --- api/commands.py | 4 +++- api/configs/deploy/__init__.py | 1 - api/tests/unit_tests/commands/test_upgrade_db.py | 1 - 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/api/commands.py b/api/commands.py index ada2099058..d40cf58e2a 100644 --- a/api/commands.py +++ b/api/commands.py @@ -797,7 +797,9 @@ def upgrade_db(): lock.release() except LockNotOwnedError: status = "successful" if migration_succeeded else "failed" - logger.warning("DB migration lock not owned on release after %s migration (likely expired); ignoring.", status) + logger.warning( + "DB migration lock not owned on release after %s migration (likely expired); ignoring.", status + ) except RedisError: status = "successful" if migration_succeeded else "failed" logger.warning( diff --git a/api/configs/deploy/__init__.py b/api/configs/deploy/__init__.py index 7db212a3d8..63f4dfba63 100644 --- a/api/configs/deploy/__init__.py +++ b/api/configs/deploy/__init__.py @@ -32,4 +32,3 @@ class DeploymentConfig(BaseSettings): description="Deployment environment (e.g., 'PRODUCTION', 'DEVELOPMENT'), default to PRODUCTION", default="PRODUCTION", ) - diff --git a/api/tests/unit_tests/commands/test_upgrade_db.py b/api/tests/unit_tests/commands/test_upgrade_db.py index 59d47de895..7f5e0e00c7 100644 --- a/api/tests/unit_tests/commands/test_upgrade_db.py +++ b/api/tests/unit_tests/commands/test_upgrade_db.py @@ -141,4 +141,3 @@ def test_upgrade_db_ignores_reacquire_errors(monkeypatch, capsys): assert exit_code == 0 assert lock.reacquire.call_count >= 1 - From 8d4bd5636b3e33cc3b873abc15e5b4176dd06cb2 Mon Sep 17 00:00:00 2001 From: L1nSn0w Date: Fri, 13 Feb 2026 15:01:32 +0800 Subject: [PATCH 07/13] refactor(tests): replace hardcoded wait time with constant for clarity - Introduced HEARTBEAT_WAIT_TIMEOUT_SECONDS constant to improve readability and maintainability of test code. - Updated test assertions to use the new constant instead of a hardcoded value. (cherry picked from commit 0d53743d83b03ae0e68fad143711ffa5f6354093) --- api/tests/unit_tests/commands/test_upgrade_db.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/api/tests/unit_tests/commands/test_upgrade_db.py b/api/tests/unit_tests/commands/test_upgrade_db.py index 7f5e0e00c7..d884477143 100644 --- a/api/tests/unit_tests/commands/test_upgrade_db.py +++ b/api/tests/unit_tests/commands/test_upgrade_db.py @@ -5,6 +5,8 @@ from unittest.mock import MagicMock import commands +HEARTBEAT_WAIT_TIMEOUT_SECONDS = 1.0 + def _install_fake_flask_migrate(monkeypatch, upgrade_impl) -> None: module = types.ModuleType("flask_migrate") @@ -104,7 +106,7 @@ def test_upgrade_db_renews_lock_during_migration(monkeypatch, capsys): lock.reacquire.side_effect = _reacquire def _upgrade(): - assert renewed.wait(1.0) + assert renewed.wait(HEARTBEAT_WAIT_TIMEOUT_SECONDS) _install_fake_flask_migrate(monkeypatch, _upgrade) @@ -132,7 +134,7 @@ def test_upgrade_db_ignores_reacquire_errors(monkeypatch, capsys): lock.reacquire.side_effect = _reacquire def _upgrade(): - assert attempted.wait(1.0) + assert attempted.wait(HEARTBEAT_WAIT_TIMEOUT_SECONDS) _install_fake_flask_migrate(monkeypatch, _upgrade) From 94603b5408ea4daf9a6b9abc6ce9ea22b139efcb Mon Sep 17 00:00:00 2001 From: L1nSn0w Date: Sat, 14 Feb 2026 12:03:58 +0800 Subject: [PATCH 08/13] refactor(api): replace heartbeat mechanism with AutoRenewRedisLock for database migration - Removed the manual heartbeat function for renewing the Redis lock during database migrations. - Integrated AutoRenewRedisLock to handle lock renewal automatically, simplifying the upgrade_db command. - Updated unit tests to reflect changes in lock handling and error management during migrations. (cherry picked from commit 8814256eb5fa20b29e554264f3b659b027bc4c9a) --- api/commands.py | 68 +----- api/libs/auto_renew_redis_lock.py | 198 ++++++++++++++++++ .../test_auto_renew_redis_lock_integration.py | 39 ++++ .../unit_tests/commands/test_upgrade_db.py | 9 +- .../enterprise/test_enterprise_service.py | 125 +++++++++++ 5 files changed, 376 insertions(+), 63 deletions(-) create mode 100644 api/libs/auto_renew_redis_lock.py create mode 100644 api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py create mode 100644 api/tests/unit_tests/services/enterprise/test_enterprise_service.py diff --git a/api/commands.py b/api/commands.py index d40cf58e2a..f7af5a5df2 100644 --- a/api/commands.py +++ b/api/commands.py @@ -3,15 +3,13 @@ import datetime import json import logging import secrets -import threading import time -from typing import TYPE_CHECKING, Any +from typing import Any import click import sqlalchemy as sa from flask import current_app from pydantic import TypeAdapter -from redis.exceptions import LockNotOwnedError, RedisError from sqlalchemy import select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import sessionmaker @@ -32,6 +30,7 @@ from extensions.ext_redis import redis_client from extensions.ext_storage import storage from extensions.storage.opendal_storage import OpenDALStorage from extensions.storage.storage_type import StorageType +from libs.auto_renew_redis_lock import AutoRenewRedisLock from libs.helper import email as email_validate from libs.password import hash_password, password_pattern, valid_password from libs.rsa import generate_key_pair @@ -56,36 +55,9 @@ from tasks.remove_app_and_related_data_task import delete_draft_variables_batch logger = logging.getLogger(__name__) -if TYPE_CHECKING: - from redis.lock import Lock - DB_UPGRADE_LOCK_TTL_SECONDS = 60 -def _heartbeat_db_upgrade_lock(lock: "Lock", stop_event: threading.Event, ttl_seconds: float) -> None: - """ - Keep the DB upgrade lock alive while migrations are running. - - We intentionally keep the base TTL small (e.g. 60s) so that if the process is killed and can't - release the lock, the lock will naturally expire soon. While the process is alive, this - heartbeat periodically resets the TTL via `lock.reacquire()`. - """ - - interval_seconds = max(0.1, ttl_seconds / 3) - while not stop_event.wait(interval_seconds): - try: - lock.reacquire() - except LockNotOwnedError: - # Another process took over / TTL expired; continuing to retry won't help. - logger.warning("DB migration lock is no longer owned during heartbeat; stop renewing.") - return - except RedisError: - # Best-effort: keep trying while the process is alive. - logger.warning("Failed to renew DB migration lock due to Redis error; will retry.", exc_info=True) - except Exception: - logger.warning("Unexpected error while renewing DB migration lock; will retry.", exc_info=True) - - @click.command("reset-password", help="Reset the account password.") @click.option("--email", prompt=True, help="Account email to reset password for") @click.option("--new-password", prompt=True, help="New password") @@ -758,21 +730,14 @@ def create_tenant(email: str, language: str | None = None, name: str | None = No @click.command("upgrade-db", help="Upgrade the database") def upgrade_db(): click.echo("Preparing database migration...") - # Use a short base TTL + heartbeat renewal, so a crashed process doesn't block migrations for long. - # thread_local=False is required because heartbeat runs in a separate thread. - lock = redis_client.lock( + lock = AutoRenewRedisLock( + redis_client=redis_client, name="db_upgrade_lock", - timeout=DB_UPGRADE_LOCK_TTL_SECONDS, - thread_local=False, + ttl_seconds=DB_UPGRADE_LOCK_TTL_SECONDS, + logger=logger, + log_context="db_migration", ) if lock.acquire(blocking=False): - stop_event = threading.Event() - heartbeat_thread = threading.Thread( - target=_heartbeat_db_upgrade_lock, - args=(lock, stop_event, float(DB_UPGRADE_LOCK_TTL_SECONDS)), - daemon=True, - ) - heartbeat_thread.start() migration_succeeded = False try: click.echo(click.style("Starting database migration.", fg="green")) @@ -790,23 +755,8 @@ def upgrade_db(): click.echo(click.style(f"Database migration failed: {e}", fg="red")) raise SystemExit(1) finally: - stop_event.set() - heartbeat_thread.join(timeout=5) - # Lock release errors should never mask the real migration failure. - try: - lock.release() - except LockNotOwnedError: - status = "successful" if migration_succeeded else "failed" - logger.warning( - "DB migration lock not owned on release after %s migration (likely expired); ignoring.", status - ) - except RedisError: - status = "successful" if migration_succeeded else "failed" - logger.warning( - "Failed to release DB migration lock due to Redis error after %s migration; ignoring.", - status, - exc_info=True, - ) + status = "successful" if migration_succeeded else "failed" + lock.release_safely(status=status) else: click.echo("Database migration skipped") diff --git a/api/libs/auto_renew_redis_lock.py b/api/libs/auto_renew_redis_lock.py new file mode 100644 index 0000000000..2d45c6bf26 --- /dev/null +++ b/api/libs/auto_renew_redis_lock.py @@ -0,0 +1,198 @@ +""" +Auto-renewing Redis distributed lock (redis-py Lock). + +Why this exists: +- A fixed, long lock TTL can leave a stale lock for a long time if the process is killed + before releasing it. +- A fixed, short lock TTL can expire during long critical sections (e.g. DB migrations), + allowing another instance to acquire the same lock concurrently. + +This wrapper keeps a short base TTL and renews it in a daemon thread using `Lock.reacquire()` +while the process is alive. If the process is terminated, the renewal stops and the lock +expires soon. +""" + +from __future__ import annotations + +import logging +import threading +from typing import Any + +from redis.exceptions import LockNotOwnedError, RedisError + +logger = logging.getLogger(__name__) + + +class AutoRenewRedisLock: + """ + Redis lock wrapper that automatically renews TTL while held. + + Notes: + - We force `thread_local=False` when creating the underlying redis-py lock, because the + lock token must be accessible from the heartbeat thread for `reacquire()` to work. + - `release_safely()` is best-effort: it never raises, so it won't mask the caller's + primary error/exit code. + """ + + _redis_client: Any + _name: str + _ttl_seconds: float + _renew_interval_seconds: float + _log_context: str | None + _logger: logging.Logger + + _lock: Any + _stop_event: threading.Event | None + _thread: threading.Thread | None + _acquired: bool + + def __init__( + self, + redis_client: Any, + name: str, + ttl_seconds: float = 60, + renew_interval_seconds: float | None = None, + *, + logger: logging.Logger | None = None, + log_context: str | None = None, + ) -> None: + self._redis_client = redis_client + self._name = name + self._ttl_seconds = float(ttl_seconds) + self._renew_interval_seconds = ( + float(renew_interval_seconds) if renew_interval_seconds is not None else max(0.1, self._ttl_seconds / 3) + ) + self._logger = logger or logging.getLogger(__name__) + self._log_context = log_context + + self._lock = None + self._stop_event = None + self._thread = None + self._acquired = False + + @property + def name(self) -> str: + return self._name + + def acquire(self, *args: Any, **kwargs: Any) -> bool: + """ + Acquire the lock and start auto-renew heartbeat on success. + + Accepts the same args/kwargs as redis-py `Lock.acquire()`. + """ + self._lock = self._redis_client.lock( + name=self._name, + timeout=self._ttl_seconds, + thread_local=False, + ) + acquired = bool(self._lock.acquire(*args, **kwargs)) + self._acquired = acquired + if acquired: + self._start_heartbeat() + return acquired + + def owned(self) -> bool: + if self._lock is None: + return False + try: + return bool(self._lock.owned()) + except Exception: + # Ownership checks are best-effort and must not break callers. + return False + + def _start_heartbeat(self) -> None: + if self._lock is None: + return + if self._stop_event is not None: + return + + self._stop_event = threading.Event() + self._thread = threading.Thread( + target=self._heartbeat_loop, + args=(self._lock, self._stop_event), + daemon=True, + name=f"AutoRenewRedisLock({self._name})", + ) + self._thread.start() + + def _heartbeat_loop(self, lock: Any, stop_event: threading.Event) -> None: + while not stop_event.wait(self._renew_interval_seconds): + try: + lock.reacquire() + except LockNotOwnedError: + self._logger.warning( + "Auto-renew lock is no longer owned during heartbeat%s; stop renewing.", + f" ({self._log_context})" if self._log_context else "", + exc_info=True, + ) + return + except RedisError: + self._logger.warning( + "Failed to renew auto-renew lock due to Redis error%s; will retry.", + f" ({self._log_context})" if self._log_context else "", + exc_info=True, + ) + except Exception: + self._logger.warning( + "Unexpected error while renewing auto-renew lock%s; will retry.", + f" ({self._log_context})" if self._log_context else "", + exc_info=True, + ) + + def release_safely(self, *, status: str | None = None) -> None: + """ + Stop heartbeat and release lock. Never raises. + + Args: + status: Optional caller-provided status (e.g. 'successful'/'failed') to add context to logs. + """ + lock = self._lock + if lock is None: + return + + self._stop_heartbeat() + + # Lock release errors should never mask the real error/exit code. + try: + lock.release() + except LockNotOwnedError: + self._logger.warning( + "Auto-renew lock not owned on release%s%s; ignoring.", + f" after {status} operation" if status else "", + f" ({self._log_context})" if self._log_context else "", + exc_info=True, + ) + except RedisError: + self._logger.warning( + "Failed to release auto-renew lock due to Redis error%s%s; ignoring.", + f" after {status} operation" if status else "", + f" ({self._log_context})" if self._log_context else "", + exc_info=True, + ) + except Exception: + self._logger.warning( + "Unexpected error while releasing auto-renew lock%s%s; ignoring.", + f" after {status} operation" if status else "", + f" ({self._log_context})" if self._log_context else "", + exc_info=True, + ) + finally: + self._acquired = False + + def _stop_heartbeat(self) -> None: + if self._stop_event is None: + return + self._stop_event.set() + if self._thread is not None: + # Best-effort join: if Redis calls are blocked, the daemon thread may remain alive. + join_timeout_seconds = max(0.5, min(5.0, self._renew_interval_seconds * 2)) + self._thread.join(timeout=join_timeout_seconds) + if self._thread.is_alive(): + self._logger.warning( + "Auto-renew lock heartbeat thread did not stop within %.2fs%s; ignoring.", + join_timeout_seconds, + f" ({self._log_context})" if self._log_context else "", + ) + self._stop_event = None + self._thread = None + diff --git a/api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py b/api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py new file mode 100644 index 0000000000..072ba27d73 --- /dev/null +++ b/api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py @@ -0,0 +1,39 @@ +""" +Integration tests for AutoRenewRedisLock using real Redis via TestContainers. +""" + +import time +import uuid + +import pytest + +from extensions.ext_redis import redis_client +from libs.auto_renew_redis_lock import AutoRenewRedisLock + + +@pytest.mark.usefixtures("flask_app_with_containers") +def test_auto_renew_redis_lock_renews_ttl_and_releases(): + lock_name = f"test:auto_renew_lock:{uuid.uuid4().hex}" + + # Keep base TTL very small, and renew frequently so the test is stable even on slower CI. + lock = AutoRenewRedisLock( + redis_client=redis_client, + name=lock_name, + ttl_seconds=1.0, + renew_interval_seconds=0.2, + log_context="test_auto_renew_redis_lock", + ) + + acquired = lock.acquire(blocking=True, blocking_timeout=5) + assert acquired is True + + # Wait beyond the base TTL; key should still exist due to renewal. + time.sleep(1.5) + ttl = redis_client.ttl(lock_name) + assert ttl > 0 + + lock.release_safely(status="successful") + + # After release, the key should not exist. + assert redis_client.exists(lock_name) == 0 + diff --git a/api/tests/unit_tests/commands/test_upgrade_db.py b/api/tests/unit_tests/commands/test_upgrade_db.py index d884477143..c4c333f457 100644 --- a/api/tests/unit_tests/commands/test_upgrade_db.py +++ b/api/tests/unit_tests/commands/test_upgrade_db.py @@ -4,8 +4,9 @@ import types from unittest.mock import MagicMock import commands +from libs.auto_renew_redis_lock import LockNotOwnedError, RedisError -HEARTBEAT_WAIT_TIMEOUT_SECONDS = 1.0 +HEARTBEAT_WAIT_TIMEOUT_SECONDS = 5.0 def _install_fake_flask_migrate(monkeypatch, upgrade_impl) -> None: @@ -45,7 +46,7 @@ def test_upgrade_db_failure_not_masked_by_lock_release(monkeypatch, capsys): lock = MagicMock() lock.acquire.return_value = True - lock.release.side_effect = commands.LockNotOwnedError("simulated") + lock.release.side_effect = LockNotOwnedError("simulated") commands.redis_client.lock.return_value = lock def _upgrade(): @@ -69,7 +70,7 @@ def test_upgrade_db_success_ignores_lock_not_owned_on_release(monkeypatch, capsy lock = MagicMock() lock.acquire.return_value = True - lock.release.side_effect = commands.LockNotOwnedError("simulated") + lock.release.side_effect = LockNotOwnedError("simulated") commands.redis_client.lock.return_value = lock _install_fake_flask_migrate(monkeypatch, lambda: None) @@ -129,7 +130,7 @@ def test_upgrade_db_ignores_reacquire_errors(monkeypatch, capsys): def _reacquire(): attempted.set() - raise commands.RedisError("simulated") + raise RedisError("simulated") lock.reacquire.side_effect = _reacquire diff --git a/api/tests/unit_tests/services/enterprise/test_enterprise_service.py b/api/tests/unit_tests/services/enterprise/test_enterprise_service.py new file mode 100644 index 0000000000..b4201aa061 --- /dev/null +++ b/api/tests/unit_tests/services/enterprise/test_enterprise_service.py @@ -0,0 +1,125 @@ +"""Unit tests for enterprise service integrations. + +This module covers the enterprise-only default workspace auto-join behavior: +- Enterprise mode disabled: no external calls +- Successful join / skipped join: no errors +- Failures (network/invalid response/invalid UUID): soft-fail wrapper must not raise +""" + +from unittest.mock import patch + +import pytest + +from services.enterprise.enterprise_service import ( + DefaultWorkspaceJoinResult, + EnterpriseService, + try_join_default_workspace, +) + + +class TestJoinDefaultWorkspace: + def test_join_default_workspace_success(self): + account_id = "11111111-1111-1111-1111-111111111111" + response = {"workspace_id": "22222222-2222-2222-2222-222222222222", "joined": True, "message": "ok"} + + with patch("services.enterprise.enterprise_service.EnterpriseRequest.send_request") as mock_send_request: + mock_send_request.return_value = response + + result = EnterpriseService.join_default_workspace(account_id=account_id) + + assert isinstance(result, DefaultWorkspaceJoinResult) + assert result.workspace_id == response["workspace_id"] + assert result.joined is True + assert result.message == "ok" + + mock_send_request.assert_called_once_with( + "POST", + "/default-workspace/members", + json={"account_id": account_id}, + ) + + def test_join_default_workspace_invalid_response_format_raises(self): + account_id = "11111111-1111-1111-1111-111111111111" + + with patch("services.enterprise.enterprise_service.EnterpriseRequest.send_request") as mock_send_request: + mock_send_request.return_value = "not-a-dict" + + with pytest.raises(ValueError, match="Invalid response format"): + EnterpriseService.join_default_workspace(account_id=account_id) + + def test_join_default_workspace_invalid_account_id_raises(self): + with pytest.raises(ValueError): + EnterpriseService.join_default_workspace(account_id="not-a-uuid") + + +class TestTryJoinDefaultWorkspace: + def test_try_join_default_workspace_enterprise_disabled_noop(self): + with ( + patch("services.enterprise.enterprise_service.dify_config") as mock_config, + patch("services.enterprise.enterprise_service.EnterpriseService.join_default_workspace") as mock_join, + ): + mock_config.ENTERPRISE_ENABLED = False + + try_join_default_workspace("11111111-1111-1111-1111-111111111111") + + mock_join.assert_not_called() + + def test_try_join_default_workspace_successful_join_does_not_raise(self): + account_id = "11111111-1111-1111-1111-111111111111" + + with ( + patch("services.enterprise.enterprise_service.dify_config") as mock_config, + patch("services.enterprise.enterprise_service.EnterpriseService.join_default_workspace") as mock_join, + ): + mock_config.ENTERPRISE_ENABLED = True + mock_join.return_value = DefaultWorkspaceJoinResult( + workspace_id="22222222-2222-2222-2222-222222222222", + joined=True, + message="ok", + ) + + # Should not raise + try_join_default_workspace(account_id) + + mock_join.assert_called_once_with(account_id=account_id) + + def test_try_join_default_workspace_skipped_join_does_not_raise(self): + account_id = "11111111-1111-1111-1111-111111111111" + + with ( + patch("services.enterprise.enterprise_service.dify_config") as mock_config, + patch("services.enterprise.enterprise_service.EnterpriseService.join_default_workspace") as mock_join, + ): + mock_config.ENTERPRISE_ENABLED = True + mock_join.return_value = DefaultWorkspaceJoinResult( + workspace_id="", + joined=False, + message="no default workspace configured", + ) + + # Should not raise + try_join_default_workspace(account_id) + + mock_join.assert_called_once_with(account_id=account_id) + + def test_try_join_default_workspace_api_failure_soft_fails(self): + account_id = "11111111-1111-1111-1111-111111111111" + + with ( + patch("services.enterprise.enterprise_service.dify_config") as mock_config, + patch("services.enterprise.enterprise_service.EnterpriseService.join_default_workspace") as mock_join, + ): + mock_config.ENTERPRISE_ENABLED = True + mock_join.side_effect = Exception("network failure") + + # Should not raise + try_join_default_workspace(account_id) + + mock_join.assert_called_once_with(account_id=account_id) + + def test_try_join_default_workspace_invalid_account_id_soft_fails(self): + with patch("services.enterprise.enterprise_service.dify_config") as mock_config: + mock_config.ENTERPRISE_ENABLED = True + + # Should not raise even though UUID parsing fails inside join_default_workspace + try_join_default_workspace("not-a-uuid") From 5ccbc00eb95f3f9e3adfcadab1fe734ef1431117 Mon Sep 17 00:00:00 2001 From: L1nSn0w Date: Sat, 14 Feb 2026 12:11:52 +0800 Subject: [PATCH 09/13] refactor(api): replace AutoRenewRedisLock with DbMigrationAutoRenewLock - Updated the database migration locking mechanism to use DbMigrationAutoRenewLock for improved clarity and functionality. - Removed the AutoRenewRedisLock implementation and its associated tests. - Adjusted integration and unit tests to reflect the new locking class and its usage in the upgrade_db command. (cherry picked from commit c812ad9ff26bed3eb59862bd7a5179b7ee83f11f) --- api/commands.py | 4 +- ...new_redis_lock.py => db_migration_lock.py} | 38 +++++++++---------- .../test_auto_renew_redis_lock_integration.py | 12 +++--- .../unit_tests/commands/test_upgrade_db.py | 2 +- 4 files changed, 27 insertions(+), 29 deletions(-) rename api/libs/{auto_renew_redis_lock.py => db_migration_lock.py} (80%) diff --git a/api/commands.py b/api/commands.py index f7af5a5df2..75b17df78e 100644 --- a/api/commands.py +++ b/api/commands.py @@ -30,7 +30,7 @@ from extensions.ext_redis import redis_client from extensions.ext_storage import storage from extensions.storage.opendal_storage import OpenDALStorage from extensions.storage.storage_type import StorageType -from libs.auto_renew_redis_lock import AutoRenewRedisLock +from libs.db_migration_lock import DbMigrationAutoRenewLock from libs.helper import email as email_validate from libs.password import hash_password, password_pattern, valid_password from libs.rsa import generate_key_pair @@ -730,7 +730,7 @@ def create_tenant(email: str, language: str | None = None, name: str | None = No @click.command("upgrade-db", help="Upgrade the database") def upgrade_db(): click.echo("Preparing database migration...") - lock = AutoRenewRedisLock( + lock = DbMigrationAutoRenewLock( redis_client=redis_client, name="db_upgrade_lock", ttl_seconds=DB_UPGRADE_LOCK_TTL_SECONDS, diff --git a/api/libs/auto_renew_redis_lock.py b/api/libs/db_migration_lock.py similarity index 80% rename from api/libs/auto_renew_redis_lock.py rename to api/libs/db_migration_lock.py index 2d45c6bf26..799ac24b5f 100644 --- a/api/libs/auto_renew_redis_lock.py +++ b/api/libs/db_migration_lock.py @@ -1,15 +1,13 @@ """ -Auto-renewing Redis distributed lock (redis-py Lock). +DB migration Redis lock with heartbeat renewal. -Why this exists: -- A fixed, long lock TTL can leave a stale lock for a long time if the process is killed - before releasing it. -- A fixed, short lock TTL can expire during long critical sections (e.g. DB migrations), - allowing another instance to acquire the same lock concurrently. +This is intentionally migration-specific. Background renewal is a trade-off that makes sense +for unbounded, blocking operations like DB migrations (DDL/DML) where the main thread cannot +periodically refresh the lock TTL. -This wrapper keeps a short base TTL and renews it in a daemon thread using `Lock.reacquire()` -while the process is alive. If the process is terminated, the renewal stops and the lock -expires soon. +Do NOT use this as a general-purpose lock primitive for normal application code. Prefer explicit +lock lifecycle management (e.g. redis-py Lock context manager + `extend()` / `reacquire()` from +the same thread) when execution flow is under control. """ from __future__ import annotations @@ -23,9 +21,9 @@ from redis.exceptions import LockNotOwnedError, RedisError logger = logging.getLogger(__name__) -class AutoRenewRedisLock: +class DbMigrationAutoRenewLock: """ - Redis lock wrapper that automatically renews TTL while held. + Redis lock wrapper that automatically renews TTL while held (migration-only). Notes: - We force `thread_local=False` when creating the underlying redis-py lock, because the @@ -76,7 +74,7 @@ class AutoRenewRedisLock: def acquire(self, *args: Any, **kwargs: Any) -> bool: """ - Acquire the lock and start auto-renew heartbeat on success. + Acquire the lock and start heartbeat renewal on success. Accepts the same args/kwargs as redis-py `Lock.acquire()`. """ @@ -111,7 +109,7 @@ class AutoRenewRedisLock: target=self._heartbeat_loop, args=(self._lock, self._stop_event), daemon=True, - name=f"AutoRenewRedisLock({self._name})", + name=f"DbMigrationAutoRenewLock({self._name})", ) self._thread.start() @@ -121,20 +119,20 @@ class AutoRenewRedisLock: lock.reacquire() except LockNotOwnedError: self._logger.warning( - "Auto-renew lock is no longer owned during heartbeat%s; stop renewing.", + "DB migration lock is no longer owned during heartbeat%s; stop renewing.", f" ({self._log_context})" if self._log_context else "", exc_info=True, ) return except RedisError: self._logger.warning( - "Failed to renew auto-renew lock due to Redis error%s; will retry.", + "Failed to renew DB migration lock due to Redis error%s; will retry.", f" ({self._log_context})" if self._log_context else "", exc_info=True, ) except Exception: self._logger.warning( - "Unexpected error while renewing auto-renew lock%s; will retry.", + "Unexpected error while renewing DB migration lock%s; will retry.", f" ({self._log_context})" if self._log_context else "", exc_info=True, ) @@ -157,21 +155,21 @@ class AutoRenewRedisLock: lock.release() except LockNotOwnedError: self._logger.warning( - "Auto-renew lock not owned on release%s%s; ignoring.", + "DB migration lock not owned on release%s%s; ignoring.", f" after {status} operation" if status else "", f" ({self._log_context})" if self._log_context else "", exc_info=True, ) except RedisError: self._logger.warning( - "Failed to release auto-renew lock due to Redis error%s%s; ignoring.", + "Failed to release DB migration lock due to Redis error%s%s; ignoring.", f" after {status} operation" if status else "", f" ({self._log_context})" if self._log_context else "", exc_info=True, ) except Exception: self._logger.warning( - "Unexpected error while releasing auto-renew lock%s%s; ignoring.", + "Unexpected error while releasing DB migration lock%s%s; ignoring.", f" after {status} operation" if status else "", f" ({self._log_context})" if self._log_context else "", exc_info=True, @@ -189,7 +187,7 @@ class AutoRenewRedisLock: self._thread.join(timeout=join_timeout_seconds) if self._thread.is_alive(): self._logger.warning( - "Auto-renew lock heartbeat thread did not stop within %.2fs%s; ignoring.", + "DB migration lock heartbeat thread did not stop within %.2fs%s; ignoring.", join_timeout_seconds, f" ({self._log_context})" if self._log_context else "", ) diff --git a/api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py b/api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py index 072ba27d73..550aea14b4 100644 --- a/api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py +++ b/api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py @@ -1,5 +1,5 @@ """ -Integration tests for AutoRenewRedisLock using real Redis via TestContainers. +Integration tests for DbMigrationAutoRenewLock using real Redis via TestContainers. """ import time @@ -8,20 +8,20 @@ import uuid import pytest from extensions.ext_redis import redis_client -from libs.auto_renew_redis_lock import AutoRenewRedisLock +from libs.db_migration_lock import DbMigrationAutoRenewLock @pytest.mark.usefixtures("flask_app_with_containers") -def test_auto_renew_redis_lock_renews_ttl_and_releases(): - lock_name = f"test:auto_renew_lock:{uuid.uuid4().hex}" +def test_db_migration_lock_renews_ttl_and_releases(): + lock_name = f"test:db_migration_auto_renew_lock:{uuid.uuid4().hex}" # Keep base TTL very small, and renew frequently so the test is stable even on slower CI. - lock = AutoRenewRedisLock( + lock = DbMigrationAutoRenewLock( redis_client=redis_client, name=lock_name, ttl_seconds=1.0, renew_interval_seconds=0.2, - log_context="test_auto_renew_redis_lock", + log_context="test_db_migration_lock", ) acquired = lock.acquire(blocking=True, blocking_timeout=5) diff --git a/api/tests/unit_tests/commands/test_upgrade_db.py b/api/tests/unit_tests/commands/test_upgrade_db.py index c4c333f457..80173f5d46 100644 --- a/api/tests/unit_tests/commands/test_upgrade_db.py +++ b/api/tests/unit_tests/commands/test_upgrade_db.py @@ -4,7 +4,7 @@ import types from unittest.mock import MagicMock import commands -from libs.auto_renew_redis_lock import LockNotOwnedError, RedisError +from libs.db_migration_lock import LockNotOwnedError, RedisError HEARTBEAT_WAIT_TIMEOUT_SECONDS = 5.0 From d0bd74fccbd23f13ba45511aa137700a168e4955 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Sat, 14 Feb 2026 04:16:51 +0000 Subject: [PATCH 10/13] [autofix.ci] apply automated fixes (cherry picked from commit 907e63cdc57f8006017837a74c2da2fbe274dcfb) --- api/libs/db_migration_lock.py | 1 - .../libs/test_auto_renew_redis_lock_integration.py | 1 - 2 files changed, 2 deletions(-) diff --git a/api/libs/db_migration_lock.py b/api/libs/db_migration_lock.py index 799ac24b5f..69f20466e3 100644 --- a/api/libs/db_migration_lock.py +++ b/api/libs/db_migration_lock.py @@ -193,4 +193,3 @@ class DbMigrationAutoRenewLock: ) self._stop_event = None self._thread = None - diff --git a/api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py b/api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py index 550aea14b4..eb055ca332 100644 --- a/api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py +++ b/api/tests/test_containers_integration_tests/libs/test_auto_renew_redis_lock_integration.py @@ -36,4 +36,3 @@ def test_db_migration_lock_renews_ttl_and_releases(): # After release, the key should not exist. assert redis_client.exists(lock_name) == 0 - From 18ba367b11a9bc12a3a1a32ca2b2cf8411f58942 Mon Sep 17 00:00:00 2001 From: L1nSn0w Date: Sat, 14 Feb 2026 12:36:05 +0800 Subject: [PATCH 11/13] refactor(api): improve DbMigrationAutoRenewLock configuration and logging - Introduced constants for minimum and maximum join timeout values, enhancing clarity and maintainability. - Updated the renewal interval calculation to use defined constants for better readability. - Improved logging messages to include context information, making it easier to trace issues during lock operations. (cherry picked from commit 1471b77bf5156a95417bde148753702d44221929) --- api/libs/db_migration_lock.py | 49 +++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/api/libs/db_migration_lock.py b/api/libs/db_migration_lock.py index 69f20466e3..851119c9c0 100644 --- a/api/libs/db_migration_lock.py +++ b/api/libs/db_migration_lock.py @@ -20,6 +20,12 @@ from redis.exceptions import LockNotOwnedError, RedisError logger = logging.getLogger(__name__) +MIN_RENEW_INTERVAL_SECONDS = 0.1 +DEFAULT_RENEW_INTERVAL_DIVISOR = 3 +MIN_JOIN_TIMEOUT_SECONDS = 0.5 +MAX_JOIN_TIMEOUT_SECONDS = 5.0 +JOIN_TIMEOUT_MULTIPLIER = 2.0 + class DbMigrationAutoRenewLock: """ @@ -58,7 +64,9 @@ class DbMigrationAutoRenewLock: self._name = name self._ttl_seconds = float(ttl_seconds) self._renew_interval_seconds = ( - float(renew_interval_seconds) if renew_interval_seconds is not None else max(0.1, self._ttl_seconds / 3) + float(renew_interval_seconds) + if renew_interval_seconds is not None + else max(MIN_RENEW_INTERVAL_SECONDS, self._ttl_seconds / DEFAULT_RENEW_INTERVAL_DIVISOR) ) self._logger = logger or logging.getLogger(__name__) self._log_context = log_context @@ -119,21 +127,21 @@ class DbMigrationAutoRenewLock: lock.reacquire() except LockNotOwnedError: self._logger.warning( - "DB migration lock is no longer owned during heartbeat%s; stop renewing.", - f" ({self._log_context})" if self._log_context else "", + "DB migration lock is no longer owned during heartbeat; stop renewing. log_context=%s", + self._log_context, exc_info=True, ) return except RedisError: self._logger.warning( - "Failed to renew DB migration lock due to Redis error%s; will retry.", - f" ({self._log_context})" if self._log_context else "", + "Failed to renew DB migration lock due to Redis error; will retry. log_context=%s", + self._log_context, exc_info=True, ) except Exception: self._logger.warning( - "Unexpected error while renewing DB migration lock%s; will retry.", - f" ({self._log_context})" if self._log_context else "", + "Unexpected error while renewing DB migration lock; will retry. log_context=%s", + self._log_context, exc_info=True, ) @@ -155,23 +163,23 @@ class DbMigrationAutoRenewLock: lock.release() except LockNotOwnedError: self._logger.warning( - "DB migration lock not owned on release%s%s; ignoring.", - f" after {status} operation" if status else "", - f" ({self._log_context})" if self._log_context else "", + "DB migration lock not owned on release; ignoring. status=%s log_context=%s", + status, + self._log_context, exc_info=True, ) except RedisError: self._logger.warning( - "Failed to release DB migration lock due to Redis error%s%s; ignoring.", - f" after {status} operation" if status else "", - f" ({self._log_context})" if self._log_context else "", + "Failed to release DB migration lock due to Redis error; ignoring. status=%s log_context=%s", + status, + self._log_context, exc_info=True, ) except Exception: self._logger.warning( - "Unexpected error while releasing DB migration lock%s%s; ignoring.", - f" after {status} operation" if status else "", - f" ({self._log_context})" if self._log_context else "", + "Unexpected error while releasing DB migration lock; ignoring. status=%s log_context=%s", + status, + self._log_context, exc_info=True, ) finally: @@ -183,13 +191,16 @@ class DbMigrationAutoRenewLock: self._stop_event.set() if self._thread is not None: # Best-effort join: if Redis calls are blocked, the daemon thread may remain alive. - join_timeout_seconds = max(0.5, min(5.0, self._renew_interval_seconds * 2)) + join_timeout_seconds = max( + MIN_JOIN_TIMEOUT_SECONDS, + min(MAX_JOIN_TIMEOUT_SECONDS, self._renew_interval_seconds * JOIN_TIMEOUT_MULTIPLIER), + ) self._thread.join(timeout=join_timeout_seconds) if self._thread.is_alive(): self._logger.warning( - "DB migration lock heartbeat thread did not stop within %.2fs%s; ignoring.", + "DB migration lock heartbeat thread did not stop within %.2fs; ignoring. log_context=%s", join_timeout_seconds, - f" ({self._log_context})" if self._log_context else "", + self._log_context, ) self._stop_event = None self._thread = None From 3cdc9c119ee9cbf22fcdadd5d9dc8a29838cc810 Mon Sep 17 00:00:00 2001 From: L1nSn0w Date: Sat, 14 Feb 2026 13:03:07 +0800 Subject: [PATCH 12/13] refactor(api): enhance DbMigrationAutoRenewLock acquisition logic - Added a check to prevent double acquisition of the DB migration lock, raising an error if an attempt is made to acquire it while already held. - Implemented logic to reuse the lock object if it has already been created, improving efficiency and clarity in lock management. - Reset the lock object to None upon release to ensure proper state management. (cherry picked from commit d4b102d3c8a473c4fd6409dba7c198289bb5f921) --- api/libs/db_migration_lock.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/api/libs/db_migration_lock.py b/api/libs/db_migration_lock.py index 851119c9c0..1d3a81e0a2 100644 --- a/api/libs/db_migration_lock.py +++ b/api/libs/db_migration_lock.py @@ -86,11 +86,17 @@ class DbMigrationAutoRenewLock: Accepts the same args/kwargs as redis-py `Lock.acquire()`. """ - self._lock = self._redis_client.lock( - name=self._name, - timeout=self._ttl_seconds, - thread_local=False, - ) + # Prevent accidental double-acquire which could leave the previous heartbeat thread running. + if self._acquired: + raise RuntimeError("DB migration lock is already acquired; call release_safely() before acquiring again.") + + # Reuse the lock object if we already created one. + if self._lock is None: + self._lock = self._redis_client.lock( + name=self._name, + timeout=self._ttl_seconds, + thread_local=False, + ) acquired = bool(self._lock.acquire(*args, **kwargs)) self._acquired = acquired if acquired: @@ -184,6 +190,7 @@ class DbMigrationAutoRenewLock: ) finally: self._acquired = False + self._lock = None def _stop_heartbeat(self) -> None: if self._stop_event is None: From 5025e292206cc27099f76eefedb73b7a5ad32f9a Mon Sep 17 00:00:00 2001 From: L1nSn0w Date: Sat, 14 Feb 2026 16:34:49 +0800 Subject: [PATCH 13/13] test: remove unrelated enterprise service test Co-authored-by: Cursor --- .../enterprise/test_enterprise_service.py | 125 ------------------ 1 file changed, 125 deletions(-) delete mode 100644 api/tests/unit_tests/services/enterprise/test_enterprise_service.py diff --git a/api/tests/unit_tests/services/enterprise/test_enterprise_service.py b/api/tests/unit_tests/services/enterprise/test_enterprise_service.py deleted file mode 100644 index b4201aa061..0000000000 --- a/api/tests/unit_tests/services/enterprise/test_enterprise_service.py +++ /dev/null @@ -1,125 +0,0 @@ -"""Unit tests for enterprise service integrations. - -This module covers the enterprise-only default workspace auto-join behavior: -- Enterprise mode disabled: no external calls -- Successful join / skipped join: no errors -- Failures (network/invalid response/invalid UUID): soft-fail wrapper must not raise -""" - -from unittest.mock import patch - -import pytest - -from services.enterprise.enterprise_service import ( - DefaultWorkspaceJoinResult, - EnterpriseService, - try_join_default_workspace, -) - - -class TestJoinDefaultWorkspace: - def test_join_default_workspace_success(self): - account_id = "11111111-1111-1111-1111-111111111111" - response = {"workspace_id": "22222222-2222-2222-2222-222222222222", "joined": True, "message": "ok"} - - with patch("services.enterprise.enterprise_service.EnterpriseRequest.send_request") as mock_send_request: - mock_send_request.return_value = response - - result = EnterpriseService.join_default_workspace(account_id=account_id) - - assert isinstance(result, DefaultWorkspaceJoinResult) - assert result.workspace_id == response["workspace_id"] - assert result.joined is True - assert result.message == "ok" - - mock_send_request.assert_called_once_with( - "POST", - "/default-workspace/members", - json={"account_id": account_id}, - ) - - def test_join_default_workspace_invalid_response_format_raises(self): - account_id = "11111111-1111-1111-1111-111111111111" - - with patch("services.enterprise.enterprise_service.EnterpriseRequest.send_request") as mock_send_request: - mock_send_request.return_value = "not-a-dict" - - with pytest.raises(ValueError, match="Invalid response format"): - EnterpriseService.join_default_workspace(account_id=account_id) - - def test_join_default_workspace_invalid_account_id_raises(self): - with pytest.raises(ValueError): - EnterpriseService.join_default_workspace(account_id="not-a-uuid") - - -class TestTryJoinDefaultWorkspace: - def test_try_join_default_workspace_enterprise_disabled_noop(self): - with ( - patch("services.enterprise.enterprise_service.dify_config") as mock_config, - patch("services.enterprise.enterprise_service.EnterpriseService.join_default_workspace") as mock_join, - ): - mock_config.ENTERPRISE_ENABLED = False - - try_join_default_workspace("11111111-1111-1111-1111-111111111111") - - mock_join.assert_not_called() - - def test_try_join_default_workspace_successful_join_does_not_raise(self): - account_id = "11111111-1111-1111-1111-111111111111" - - with ( - patch("services.enterprise.enterprise_service.dify_config") as mock_config, - patch("services.enterprise.enterprise_service.EnterpriseService.join_default_workspace") as mock_join, - ): - mock_config.ENTERPRISE_ENABLED = True - mock_join.return_value = DefaultWorkspaceJoinResult( - workspace_id="22222222-2222-2222-2222-222222222222", - joined=True, - message="ok", - ) - - # Should not raise - try_join_default_workspace(account_id) - - mock_join.assert_called_once_with(account_id=account_id) - - def test_try_join_default_workspace_skipped_join_does_not_raise(self): - account_id = "11111111-1111-1111-1111-111111111111" - - with ( - patch("services.enterprise.enterprise_service.dify_config") as mock_config, - patch("services.enterprise.enterprise_service.EnterpriseService.join_default_workspace") as mock_join, - ): - mock_config.ENTERPRISE_ENABLED = True - mock_join.return_value = DefaultWorkspaceJoinResult( - workspace_id="", - joined=False, - message="no default workspace configured", - ) - - # Should not raise - try_join_default_workspace(account_id) - - mock_join.assert_called_once_with(account_id=account_id) - - def test_try_join_default_workspace_api_failure_soft_fails(self): - account_id = "11111111-1111-1111-1111-111111111111" - - with ( - patch("services.enterprise.enterprise_service.dify_config") as mock_config, - patch("services.enterprise.enterprise_service.EnterpriseService.join_default_workspace") as mock_join, - ): - mock_config.ENTERPRISE_ENABLED = True - mock_join.side_effect = Exception("network failure") - - # Should not raise - try_join_default_workspace(account_id) - - mock_join.assert_called_once_with(account_id=account_id) - - def test_try_join_default_workspace_invalid_account_id_soft_fails(self): - with patch("services.enterprise.enterprise_service.dify_config") as mock_config: - mock_config.ENTERPRISE_ENABLED = True - - # Should not raise even though UUID parsing fails inside join_default_workspace - try_join_default_workspace("not-a-uuid")