From 34793e0d92ee918b68e279954aed46bf714b58a9 Mon Sep 17 00:00:00 2001 From: echooffx Date: Thu, 30 Apr 2026 16:04:02 +0800 Subject: [PATCH 1/3] feat(api): add scheduled cleanup task for dataset_queries The dataset_queries table grows without bound because every RAG retrieval and hit-test inserts a row. This adds a configurable Celery Beat task (clean_dataset_queries_task) that deletes rows older than a retention period (default 60 days) in batches, gated by ENABLE_CLEAN_DATASET_QUERIES_TASK. Retention is clamped to max(config, PLAN_SANDBOX_CLEAN_DAY_SETTING) to avoid breaking clean_unused_datasets_task which reads DatasetQuery.created_at. Also adds a created_at index on dataset_queries via alembic migration to keep the delete scan performant as the table grows. Co-Authored-By: Claude Opus 4.7 --- api/configs/feature/__init__.py | 19 +++ api/extensions/ext_celery.py | 6 + ...7d0a_add_dataset_queries_created_at_idx.py | 25 ++++ api/models/dataset.py | 1 + api/schedule/clean_dataset_queries_task.py | 108 ++++++++++++++++++ api/tests/unit_tests/schedule/__init__.py | 0 .../test_clean_dataset_queries_task.py | 102 +++++++++++++++++ 7 files changed, 261 insertions(+) create mode 100644 api/migrations/versions/2026_04_30_1600-67b5709d7d0a_add_dataset_queries_created_at_idx.py create mode 100644 api/schedule/clean_dataset_queries_task.py create mode 100644 api/tests/unit_tests/schedule/__init__.py create mode 100644 api/tests/unit_tests/schedule/test_clean_dataset_queries_task.py diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index 52e33c1789..892a389f11 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -1238,6 +1238,25 @@ class CeleryScheduleTasksConfig(BaseSettings): default=60 * 60, ) + # Clean dataset_queries scheduled task + ENABLE_CLEAN_DATASET_QUERIES_TASK: bool = Field( + description="Enable scheduled cleanup for the dataset_queries table", + default=False, + ) + 
CLEAN_DATASET_QUERIES_RETENTION_DAYS: PositiveInt = Field( + description="Retention days for dataset_queries rows; " + "must be >= PLAN_SANDBOX_CLEAN_DAY_SETTING to avoid breaking clean_unused_datasets_task", + default=60, + ) + CLEAN_DATASET_QUERIES_BATCH_SIZE: PositiveInt = Field( + description="Batch size for dataset_queries cleanup", + default=500, + ) + CLEAN_DATASET_QUERIES_LOCK_TTL: PositiveInt = Field( + description="Redis lock TTL in seconds for the dataset_queries cleanup task", + default=3600, + ) + class PositionConfig(BaseSettings): POSITION_PROVIDER_PINS: str = Field( diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py index 340f514fcc..1fa1b78f5e 100644 --- a/api/extensions/ext_celery.py +++ b/api/extensions/ext_celery.py @@ -165,6 +165,12 @@ def init_app(app: DifyApp) -> Celery: "task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task", "schedule": crontab(minute="0", hour="3", day_of_month=f"*/{day}"), } + if dify_config.ENABLE_CLEAN_DATASET_QUERIES_TASK: + imports.append("schedule.clean_dataset_queries_task") + beat_schedule["clean_dataset_queries_task"] = { + "task": "schedule.clean_dataset_queries_task.clean_dataset_queries_task", + "schedule": crontab(minute="0", hour="5", day_of_month=f"*/{day}"), + } if dify_config.ENABLE_CREATE_TIDB_SERVERLESS_TASK: imports.append("schedule.create_tidb_serverless_task") beat_schedule["create_tidb_serverless_task"] = { diff --git a/api/migrations/versions/2026_04_30_1600-67b5709d7d0a_add_dataset_queries_created_at_idx.py b/api/migrations/versions/2026_04_30_1600-67b5709d7d0a_add_dataset_queries_created_at_idx.py new file mode 100644 index 0000000000..61978e40dd --- /dev/null +++ b/api/migrations/versions/2026_04_30_1600-67b5709d7d0a_add_dataset_queries_created_at_idx.py @@ -0,0 +1,25 @@ +"""add dataset_queries created_at index + +Revision ID: 67b5709d7d0a +Revises: 227822d22895 +Create Date: 2026-04-30 16:00:00.000000 + +""" +from alembic import op +import models as models 
+ +# revision identifiers, used by Alembic. +revision = '67b5709d7d0a' +down_revision = '227822d22895' +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table('dataset_queries', schema=None) as batch_op: + batch_op.create_index('dataset_query_created_at_idx', ['created_at'], unique=False) + + +def downgrade(): + with op.batch_alter_table('dataset_queries', schema=None) as batch_op: + batch_op.drop_index('dataset_query_created_at_idx') diff --git a/api/models/dataset.py b/api/models/dataset.py index a00e9f7640..0db04a95a3 100644 --- a/api/models/dataset.py +++ b/api/models/dataset.py @@ -1127,6 +1127,7 @@ class DatasetQuery(TypeBase): __table_args__ = ( sa.PrimaryKeyConstraint("id", name="dataset_query_pkey"), sa.Index("dataset_query_dataset_id_idx", "dataset_id"), + sa.Index("dataset_query_created_at_idx", "created_at"), ) id: Mapped[str] = mapped_column( diff --git a/api/schedule/clean_dataset_queries_task.py b/api/schedule/clean_dataset_queries_task.py new file mode 100644 index 0000000000..86511b5958 --- /dev/null +++ b/api/schedule/clean_dataset_queries_task.py @@ -0,0 +1,108 @@ +"""Periodic cleanup for the ``dataset_queries`` table. + +Every RAG retrieval and hit-testing operation inserts a row, and this table +grows without bound unless we actively prune it. + +Important invariant: ``clean_unused_datasets_task`` reads +``DatasetQuery.created_at`` to decide whether a dataset has been queried +recently (window = ``PLAN_SANDBOX_CLEAN_DAY_SETTING``). Deleting rows younger +than that window would cause datasets to be incorrectly marked unused and have +their documents disabled. We therefore clamp the effective retention to +``max(CLEAN_DATASET_QUERIES_RETENTION_DAYS, PLAN_SANDBOX_CLEAN_DAY_SETTING)``. 
+""" + +import datetime +import logging +import time + +import click +from redis.exceptions import LockError +from sqlalchemy import delete, select + +import app +from configs import dify_config +from extensions.ext_database import db +from extensions.ext_redis import redis_client +from models.dataset import DatasetQuery + +logger = logging.getLogger(__name__) + + +def _effective_retention_days() -> int: + """Return the retention days after clamping to the minimum safe threshold.""" + requested = dify_config.CLEAN_DATASET_QUERIES_RETENTION_DAYS + minimum = dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING + if requested < minimum: + logger.warning( + "CLEAN_DATASET_QUERIES_RETENTION_DAYS (%d) < PLAN_SANDBOX_CLEAN_DAY_SETTING (%d); " + "clamping to %d to avoid breaking clean_unused_datasets_task", + requested, + minimum, + minimum, + ) + return minimum + return requested + + +@app.celery.task(queue="dataset") +def clean_dataset_queries_task() -> None: + """Delete ``dataset_queries`` rows older than the effective retention window.""" + click.echo(click.style("Start clean dataset_queries.", fg="green")) + start_at = time.perf_counter() + + retention_days = _effective_retention_days() + cutoff_date = datetime.datetime.now() - datetime.timedelta(days=retention_days) + batch_size = dify_config.CLEAN_DATASET_QUERIES_BATCH_SIZE + + try: + with redis_client.lock( + "retention:clean_dataset_queries_task", + timeout=dify_config.CLEAN_DATASET_QUERIES_LOCK_TTL, + blocking=False, + ): + total_deleted = 0 + batch_count = 0 + + while True: + batch_count += 1 + ids = db.session.scalars( + select(DatasetQuery.id).where(DatasetQuery.created_at < cutoff_date).limit(batch_size) + ).all() + + if not ids: + break + + db.session.execute(delete(DatasetQuery).where(DatasetQuery.id.in_(ids))) + db.session.commit() + total_deleted += len(ids) + + end_at = time.perf_counter() + click.echo( + click.style( + f"Cleaned {total_deleted} dataset_queries rows " + f"older than {retention_days} days " + f"in 
{batch_count} batches, latency: {end_at - start_at:.2f}s", + fg="green", + ) + ) + + except LockError: + end_at = time.perf_counter() + logger.exception("clean_dataset_queries_task: acquire task lock failed, skip current execution") + click.echo( + click.style( + f"clean_dataset_queries_task: skipped (lock already held), latency: {end_at - start_at:.2f}s", + fg="yellow", + ) + ) + raise + except Exception: + end_at = time.perf_counter() + logger.exception("clean_dataset_queries_task failed") + click.echo( + click.style( + f"clean_dataset_queries_task: failed after {end_at - start_at:.2f}s", + fg="red", + ) + ) + raise diff --git a/api/tests/unit_tests/schedule/__init__.py b/api/tests/unit_tests/schedule/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/tests/unit_tests/schedule/test_clean_dataset_queries_task.py b/api/tests/unit_tests/schedule/test_clean_dataset_queries_task.py new file mode 100644 index 0000000000..241e0199d5 --- /dev/null +++ b/api/tests/unit_tests/schedule/test_clean_dataset_queries_task.py @@ -0,0 +1,102 @@ +from unittest.mock import MagicMock, patch + +import pytest +from redis.exceptions import LockError + +from schedule.clean_dataset_queries_task import _effective_retention_days, clean_dataset_queries_task + + +class TestEffectiveRetentionDays: + def test_returns_requested_when_above_minimum(self): + """Retention is returned as-is when it exceeds the safe minimum.""" + with ( + patch("schedule.clean_dataset_queries_task.dify_config") as mock_cfg, + ): + mock_cfg.CLEAN_DATASET_QUERIES_RETENTION_DAYS = 60 + mock_cfg.PLAN_SANDBOX_CLEAN_DAY_SETTING = 30 + assert _effective_retention_days() == 60 + + def test_clamps_when_below_minimum(self): + """Retention is clamped to PLAN_SANDBOX_CLEAN_DAY_SETTING when too low.""" + with ( + patch("schedule.clean_dataset_queries_task.dify_config") as mock_cfg, + ): + mock_cfg.CLEAN_DATASET_QUERIES_RETENTION_DAYS = 10 + mock_cfg.PLAN_SANDBOX_CLEAN_DAY_SETTING = 30 + assert 
_effective_retention_days() == 30 + + +class TestCleanDatasetQueriesTask: + @patch("schedule.clean_dataset_queries_task.redis_client") + @patch("schedule.clean_dataset_queries_task.db") + @patch("schedule.clean_dataset_queries_task.dify_config") + def test_deletes_rows_older_than_retention(self, mock_cfg, mock_db, mock_redis): + """Rows older than the cutoff are deleted in batches until none remain.""" + mock_cfg.CLEAN_DATASET_QUERIES_RETENTION_DAYS = 60 + mock_cfg.PLAN_SANDBOX_CLEAN_DAY_SETTING = 30 + mock_cfg.CLEAN_DATASET_QUERIES_BATCH_SIZE = 2 + mock_cfg.CLEAN_DATASET_QUERIES_LOCK_TTL = 3600 + + mock_lock = MagicMock() + mock_lock.__enter__ = MagicMock(return_value=mock_lock) + mock_lock.__exit__ = MagicMock(return_value=False) + mock_redis.lock.return_value = mock_lock + + session = MagicMock() + mock_db.session = session + + batch_1 = ["id1", "id2"] + batch_2 = ["id3"] + session.scalars.return_value.all.side_effect = [batch_1, batch_2, []] + + clean_dataset_queries_task() + + assert session.execute.call_count == 2 + assert session.commit.call_count == 2 + + @patch("schedule.clean_dataset_queries_task.redis_client") + @patch("schedule.clean_dataset_queries_task.db") + @patch("schedule.clean_dataset_queries_task.dify_config") + def test_lock_held_skips(self, mock_cfg, mock_db, mock_redis): + """When the Redis lock is already held, the task raises LockError and + makes no database calls.""" + mock_cfg.CLEAN_DATASET_QUERIES_RETENTION_DAYS = 60 + mock_cfg.PLAN_SANDBOX_CLEAN_DAY_SETTING = 30 + + mock_lock = MagicMock() + mock_lock.__enter__ = MagicMock(side_effect=LockError) + mock_lock.__exit__ = MagicMock(return_value=False) + mock_redis.lock.return_value = mock_lock + + session = MagicMock() + mock_db.session = session + + with pytest.raises(LockError): + clean_dataset_queries_task() + + session.scalars.assert_not_called() + session.execute.assert_not_called() + + @patch("schedule.clean_dataset_queries_task.redis_client") + 
@patch("schedule.clean_dataset_queries_task.db") + @patch("schedule.clean_dataset_queries_task.dify_config") + def test_retention_clamped_below_minimum(self, mock_cfg, mock_db, mock_redis): + """When configured retention < PLAN_SANDBOX_CLEAN_DAY_SETTING, the + effective cutoff uses the larger value.""" + mock_cfg.CLEAN_DATASET_QUERIES_RETENTION_DAYS = 10 + mock_cfg.PLAN_SANDBOX_CLEAN_DAY_SETTING = 30 + mock_cfg.CLEAN_DATASET_QUERIES_BATCH_SIZE = 500 + mock_cfg.CLEAN_DATASET_QUERIES_LOCK_TTL = 3600 + + mock_lock = MagicMock() + mock_lock.__enter__ = MagicMock(return_value=mock_lock) + mock_lock.__exit__ = MagicMock(return_value=False) + mock_redis.lock.return_value = mock_lock + + session = MagicMock() + mock_db.session = session + session.scalars.return_value.all.return_value = [] + + clean_dataset_queries_task() + + session.scalars.assert_called_once() From ca8a742853ecf261fae80e0d246a9f11f74b4e91 Mon Sep 17 00:00:00 2001 From: echooffx Date: Thu, 7 May 2026 14:52:37 +0800 Subject: [PATCH 2/3] fix(api): return cleanly on lock contention in clean_dataset_queries_task Re-raising LockError after printing a skip message caused false task failures for normal lock contention. Return instead to exit cleanly. 
Co-Authored-By: Claude Opus 4.7 --- api/schedule/clean_dataset_queries_task.py | 9 ++++++--- .../schedule/test_clean_dataset_queries_task.py | 7 +++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/api/schedule/clean_dataset_queries_task.py b/api/schedule/clean_dataset_queries_task.py index 86511b5958..aa0c72aa60 100644 --- a/api/schedule/clean_dataset_queries_task.py +++ b/api/schedule/clean_dataset_queries_task.py @@ -88,14 +88,17 @@ def clean_dataset_queries_task() -> None: except LockError: end_at = time.perf_counter() - logger.exception("clean_dataset_queries_task: acquire task lock failed, skip current execution") + logger.warning( + "clean_dataset_queries_task: lock already held, skip current execution" + ) click.echo( click.style( - f"clean_dataset_queries_task: skipped (lock already held), latency: {end_at - start_at:.2f}s", + f"clean_dataset_queries_task: skipped (lock already held), " + f"latency: {end_at - start_at:.2f}s", fg="yellow", ) ) - raise + return except Exception: end_at = time.perf_counter() logger.exception("clean_dataset_queries_task failed") diff --git a/api/tests/unit_tests/schedule/test_clean_dataset_queries_task.py b/api/tests/unit_tests/schedule/test_clean_dataset_queries_task.py index 241e0199d5..792bc070e0 100644 --- a/api/tests/unit_tests/schedule/test_clean_dataset_queries_task.py +++ b/api/tests/unit_tests/schedule/test_clean_dataset_queries_task.py @@ -58,8 +58,8 @@ class TestCleanDatasetQueriesTask: @patch("schedule.clean_dataset_queries_task.db") @patch("schedule.clean_dataset_queries_task.dify_config") def test_lock_held_skips(self, mock_cfg, mock_db, mock_redis): - """When the Redis lock is already held, the task raises LockError and - makes no database calls.""" + """When the Redis lock is already held, the task exits cleanly without + database calls or raising an error.""" mock_cfg.CLEAN_DATASET_QUERIES_RETENTION_DAYS = 60 mock_cfg.PLAN_SANDBOX_CLEAN_DAY_SETTING = 30 @@ -71,8 +71,7 @@ class 
TestCleanDatasetQueriesTask: session = MagicMock() mock_db.session = session - with pytest.raises(LockError): - clean_dataset_queries_task() + clean_dataset_queries_task() session.scalars.assert_not_called() session.execute.assert_not_called() From 05526ce5e1d07fae1463425e4362173e57c78f18 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 7 May 2026 06:54:56 +0000 Subject: [PATCH 3/3] [autofix.ci] apply automated fixes --- api/schedule/clean_dataset_queries_task.py | 7 ++----- .../unit_tests/schedule/test_clean_dataset_queries_task.py | 1 - 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/api/schedule/clean_dataset_queries_task.py b/api/schedule/clean_dataset_queries_task.py index aa0c72aa60..0760e89bc0 100644 --- a/api/schedule/clean_dataset_queries_task.py +++ b/api/schedule/clean_dataset_queries_task.py @@ -88,13 +88,10 @@ def clean_dataset_queries_task() -> None: except LockError: end_at = time.perf_counter() - logger.warning( - "clean_dataset_queries_task: lock already held, skip current execution" - ) + logger.warning("clean_dataset_queries_task: lock already held, skip current execution") click.echo( click.style( - f"clean_dataset_queries_task: skipped (lock already held), " - f"latency: {end_at - start_at:.2f}s", + f"clean_dataset_queries_task: skipped (lock already held), latency: {end_at - start_at:.2f}s", fg="yellow", ) ) diff --git a/api/tests/unit_tests/schedule/test_clean_dataset_queries_task.py b/api/tests/unit_tests/schedule/test_clean_dataset_queries_task.py index 792bc070e0..458110e041 100644 --- a/api/tests/unit_tests/schedule/test_clean_dataset_queries_task.py +++ b/api/tests/unit_tests/schedule/test_clean_dataset_queries_task.py @@ -1,6 +1,5 @@ from unittest.mock import MagicMock, patch -import pytest from redis.exceptions import LockError from schedule.clean_dataset_queries_task import _effective_retention_days, clean_dataset_queries_task