This commit is contained in:
ふるい 2026-05-09 09:25:13 +08:00 committed by GitHub
commit 1f2efd3026
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 259 additions and 0 deletions

View File

@ -1238,6 +1238,25 @@ class CeleryScheduleTasksConfig(BaseSettings):
default=60 * 60,
)
# Clean dataset_queries scheduled task.
# Every RAG retrieval / hit-test inserts a dataset_queries row, so the table
# grows without bound unless the cleanup task below is enabled.
ENABLE_CLEAN_DATASET_QUERIES_TASK: bool = Field(
    description="Enable scheduled cleanup for the dataset_queries table",
    # Opt-in: cleanup stays off unless explicitly enabled by the operator.
    default=False,
)
# Retention window in days. Must stay >= PLAN_SANDBOX_CLEAN_DAY_SETTING:
# clean_unused_datasets_task reads DatasetQuery.created_at to decide whether a
# dataset was queried recently, and deleting younger rows would make datasets
# look unused. The task clamps the value at runtime as a safety net.
CLEAN_DATASET_QUERIES_RETENTION_DAYS: PositiveInt = Field(
    description="Retention days for dataset_queries rows; "
    "must be >= PLAN_SANDBOX_CLEAN_DAY_SETTING to avoid breaking clean_unused_datasets_task",
    default=60,
)
# Rows deleted per DELETE statement; keeps individual transactions short.
CLEAN_DATASET_QUERIES_BATCH_SIZE: PositiveInt = Field(
    description="Batch size for dataset_queries cleanup",
    default=500,
)
# TTL (seconds) of the Redis lock that prevents overlapping cleanup runs.
CLEAN_DATASET_QUERIES_LOCK_TTL: PositiveInt = Field(
    description="Redis lock TTL in seconds for the dataset_queries cleanup task",
    default=3600,
)
class PositionConfig(BaseSettings):
POSITION_PROVIDER_PINS: str = Field(

View File

@ -165,6 +165,12 @@ def init_app(app: DifyApp) -> Celery:
"task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task",
"schedule": crontab(minute="0", hour="3", day_of_month=f"*/{day}"),
}
# Register the dataset_queries cleanup only when explicitly enabled via config.
if dify_config.ENABLE_CLEAN_DATASET_QUERIES_TASK:
    imports.append("schedule.clean_dataset_queries_task")
    beat_schedule["clean_dataset_queries_task"] = {
        "task": "schedule.clean_dataset_queries_task.clean_dataset_queries_task",
        # 05:00 every `day` days — two hours after clean_unused_datasets_task
        # (03:00 above); presumably so the unused-dataset check runs against
        # un-pruned query rows first — confirm intended ordering.
        "schedule": crontab(minute="0", hour="5", day_of_month=f"*/{day}"),
    }
if dify_config.ENABLE_CREATE_TIDB_SERVERLESS_TASK:
imports.append("schedule.create_tidb_serverless_task")
beat_schedule["create_tidb_serverless_task"] = {

View File

@ -0,0 +1,25 @@
"""add dataset_queries created_at index
Revision ID: 67b5709d7d0a
Revises: 227822d22895
Create Date: 2026-04-30 16:00:00.000000
"""
from alembic import op
import models as models
# revision identifiers, used by Alembic.
revision = '67b5709d7d0a'
down_revision = '227822d22895'
branch_labels = None
depends_on = None
def upgrade():
with op.batch_alter_table('dataset_queries', schema=None) as batch_op:
batch_op.create_index('dataset_query_created_at_idx', ['created_at'], unique=False)
def downgrade():
with op.batch_alter_table('dataset_queries', schema=None) as batch_op:
batch_op.drop_index('dataset_query_created_at_idx')

View File

@ -1127,6 +1127,7 @@ class DatasetQuery(TypeBase):
__table_args__ = (
    sa.PrimaryKeyConstraint("id", name="dataset_query_pkey"),
    sa.Index("dataset_query_dataset_id_idx", "dataset_id"),
    # created_at index supports the retention cleanup's `created_at < cutoff`
    # scan; kept in sync with migration 67b5709d7d0a.
    sa.Index("dataset_query_created_at_idx", "created_at"),
)
id: Mapped[str] = mapped_column(

View File

@ -0,0 +1,108 @@
"""Periodic cleanup for the ``dataset_queries`` table.
Every RAG retrieval and hit-testing operation inserts a row, and this table
grows without bound unless we actively prune it.
Important invariant: ``clean_unused_datasets_task`` reads
``DatasetQuery.created_at`` to decide whether a dataset has been queried
recently (window = ``PLAN_SANDBOX_CLEAN_DAY_SETTING``). Deleting rows younger
than that window would cause datasets to be incorrectly marked unused and have
their documents disabled. We therefore clamp the effective retention to
``max(CLEAN_DATASET_QUERIES_RETENTION_DAYS, PLAN_SANDBOX_CLEAN_DAY_SETTING)``.
"""
import datetime
import logging
import time
import click
from redis.exceptions import LockError
from sqlalchemy import delete, select
import app
from configs import dify_config
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import DatasetQuery
logger = logging.getLogger(__name__)
def _effective_retention_days() -> int:
    """Compute the retention window, never shorter than the safe floor.

    The floor is ``PLAN_SANDBOX_CLEAN_DAY_SETTING``: deleting rows younger
    than that window would break ``clean_unused_datasets_task`` (see module
    docstring).
    """
    configured = dify_config.CLEAN_DATASET_QUERIES_RETENTION_DAYS
    floor = dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING
    # Common case: configured retention is already safe — use it unchanged.
    if configured >= floor:
        return configured
    # Unsafely short retention: warn once per run and clamp to the floor.
    logger.warning(
        "CLEAN_DATASET_QUERIES_RETENTION_DAYS (%d) < PLAN_SANDBOX_CLEAN_DAY_SETTING (%d); "
        "clamping to %d to avoid breaking clean_unused_datasets_task",
        configured,
        floor,
        floor,
    )
    return floor
@app.celery.task(queue="dataset")
def clean_dataset_queries_task() -> None:
    """Delete ``dataset_queries`` rows older than the effective retention window.

    Acquires a non-blocking Redis lock so overlapping beat runs skip instead of
    stacking up, then deletes matching rows in batches of
    ``CLEAN_DATASET_QUERIES_BATCH_SIZE``, committing after each batch so each
    transaction stays short.
    """
    click.echo(click.style("Start clean dataset_queries.", fg="green"))
    start_at = time.perf_counter()

    # Retention is clamped so rows that clean_unused_datasets_task still needs
    # are never deleted (see module docstring).
    retention_days = _effective_retention_days()
    # NOTE(review): naive local time — assumes created_at is stored with the
    # same naive convention as the rest of the models; confirm.
    cutoff_date = datetime.datetime.now() - datetime.timedelta(days=retention_days)
    batch_size = dify_config.CLEAN_DATASET_QUERIES_BATCH_SIZE
    try:
        with redis_client.lock(
            "retention:clean_dataset_queries_task",
            timeout=dify_config.CLEAN_DATASET_QUERIES_LOCK_TTL,
            blocking=False,  # raise LockError immediately if another run holds it
        ):
            total_deleted = 0
            batch_count = 0
            while True:
                batch_count += 1
                # Select a bounded batch of ids first, then delete by id, so a
                # single DELETE never touches more than `batch_size` rows.
                ids = db.session.scalars(
                    select(DatasetQuery.id).where(DatasetQuery.created_at < cutoff_date).limit(batch_size)
                ).all()
                if not ids:
                    break
                db.session.execute(delete(DatasetQuery).where(DatasetQuery.id.in_(ids)))
                db.session.commit()
                total_deleted += len(ids)
            end_at = time.perf_counter()
            click.echo(
                click.style(
                    f"Cleaned {total_deleted} dataset_queries rows "
                    f"older than {retention_days} days "
                    f"in {batch_count} batches, latency: {end_at - start_at:.2f}s",
                    fg="green",
                )
            )
    except LockError:
        # Another worker holds the lock; skipping is the expected outcome, not
        # an error, so log at warning level and return cleanly.
        end_at = time.perf_counter()
        logger.warning("clean_dataset_queries_task: lock already held, skip current execution")
        click.echo(
            click.style(
                f"clean_dataset_queries_task: skipped (lock already held), latency: {end_at - start_at:.2f}s",
                fg="yellow",
            )
        )
        return
    except Exception:
        # Roll back so the scoped session is not left in a failed-transaction
        # state for whatever reuses it next in this worker process.
        db.session.rollback()
        end_at = time.perf_counter()
        logger.exception("clean_dataset_queries_task failed")
        click.echo(
            click.style(
                f"clean_dataset_queries_task: failed after {end_at - start_at:.2f}s",
                fg="red",
            )
        )
        raise

View File

@ -0,0 +1,100 @@
from unittest.mock import MagicMock, patch
from redis.exceptions import LockError
from schedule.clean_dataset_queries_task import _effective_retention_days, clean_dataset_queries_task
class TestEffectiveRetentionDays:
    def test_returns_requested_when_above_minimum(self):
        """A retention value above the safe minimum is passed through untouched."""
        with patch("schedule.clean_dataset_queries_task.dify_config") as cfg:
            cfg.CLEAN_DATASET_QUERIES_RETENTION_DAYS = 60
            cfg.PLAN_SANDBOX_CLEAN_DAY_SETTING = 30
            assert _effective_retention_days() == 60

    def test_clamps_when_below_minimum(self):
        """A retention value below PLAN_SANDBOX_CLEAN_DAY_SETTING is clamped up."""
        with patch("schedule.clean_dataset_queries_task.dify_config") as cfg:
            cfg.CLEAN_DATASET_QUERIES_RETENTION_DAYS = 10
            cfg.PLAN_SANDBOX_CLEAN_DAY_SETTING = 30
            assert _effective_retention_days() == 30
class TestCleanDatasetQueriesTask:
    # NOTE: @patch decorators apply bottom-up, so the argument order is
    # (mock_cfg, mock_db, mock_redis) in every test below.
    @patch("schedule.clean_dataset_queries_task.redis_client")
    @patch("schedule.clean_dataset_queries_task.db")
    @patch("schedule.clean_dataset_queries_task.dify_config")
    def test_deletes_rows_older_than_retention(self, mock_cfg, mock_db, mock_redis):
        """Rows older than the cutoff are deleted in batches until none remain."""
        mock_cfg.CLEAN_DATASET_QUERIES_RETENTION_DAYS = 60
        mock_cfg.PLAN_SANDBOX_CLEAN_DAY_SETTING = 30
        mock_cfg.CLEAN_DATASET_QUERIES_BATCH_SIZE = 2
        mock_cfg.CLEAN_DATASET_QUERIES_LOCK_TTL = 3600
        # Lock context manager that acquires successfully.
        mock_lock = MagicMock()
        mock_lock.__enter__ = MagicMock(return_value=mock_lock)
        mock_lock.__exit__ = MagicMock(return_value=False)
        mock_redis.lock.return_value = mock_lock
        session = MagicMock()
        mock_db.session = session
        batch_1 = ["id1", "id2"]
        batch_2 = ["id3"]
        # Three scalars() calls: two non-empty batches, then [] terminates the loop.
        session.scalars.return_value.all.side_effect = [batch_1, batch_2, []]
        clean_dataset_queries_task()
        # One DELETE + one commit per non-empty batch.
        assert session.execute.call_count == 2
        assert session.commit.call_count == 2

    @patch("schedule.clean_dataset_queries_task.redis_client")
    @patch("schedule.clean_dataset_queries_task.db")
    @patch("schedule.clean_dataset_queries_task.dify_config")
    def test_lock_held_skips(self, mock_cfg, mock_db, mock_redis):
        """When the Redis lock is already held, the task exits cleanly without
        database calls or raising an error."""
        mock_cfg.CLEAN_DATASET_QUERIES_RETENTION_DAYS = 60
        mock_cfg.PLAN_SANDBOX_CLEAN_DAY_SETTING = 30
        # Entering the lock context raises LockError, simulating a held lock.
        mock_lock = MagicMock()
        mock_lock.__enter__ = MagicMock(side_effect=LockError)
        mock_lock.__exit__ = MagicMock(return_value=False)
        mock_redis.lock.return_value = mock_lock
        session = MagicMock()
        mock_db.session = session
        clean_dataset_queries_task()
        # The task must bail out before touching the database.
        session.scalars.assert_not_called()
        session.execute.assert_not_called()

    @patch("schedule.clean_dataset_queries_task.redis_client")
    @patch("schedule.clean_dataset_queries_task.db")
    @patch("schedule.clean_dataset_queries_task.dify_config")
    def test_retention_clamped_below_minimum(self, mock_cfg, mock_db, mock_redis):
        """When configured retention < PLAN_SANDBOX_CLEAN_DAY_SETTING, the
        effective cutoff uses the larger value."""
        mock_cfg.CLEAN_DATASET_QUERIES_RETENTION_DAYS = 10
        mock_cfg.PLAN_SANDBOX_CLEAN_DAY_SETTING = 30
        mock_cfg.CLEAN_DATASET_QUERIES_BATCH_SIZE = 500
        mock_cfg.CLEAN_DATASET_QUERIES_LOCK_TTL = 3600
        mock_lock = MagicMock()
        mock_lock.__enter__ = MagicMock(return_value=mock_lock)
        mock_lock.__exit__ = MagicMock(return_value=False)
        mock_redis.lock.return_value = mock_lock
        session = MagicMock()
        mock_db.session = session
        # No matching rows: the task should make exactly one id-select and stop.
        session.scalars.return_value.all.return_value = []
        clean_dataset_queries_task()
        session.scalars.assert_called_once()