From ecec445e6297a78c0811ebd8a3b2e2af2331eb6c Mon Sep 17 00:00:00 2001 From: EndlessLucky <105.olivesoft@gmail.com> Date: Thu, 2 Apr 2026 02:31:19 -0400 Subject: [PATCH] fix: Cap indexing workers to prevent self-hosted OOM freezes --- api/.env.example | 3 ++ api/configs/feature/__init__.py | 5 ++++ api/core/indexing_runner.py | 9 +++++- .../core/rag/indexing/test_indexing_runner.py | 30 +++++++++++++++++++ 4 files changed, 46 insertions(+), 1 deletion(-) diff --git a/api/.env.example b/api/.env.example index c6541731e64..4026d024fb9 100644 --- a/api/.env.example +++ b/api/.env.example @@ -530,6 +530,9 @@ LOG_FORMAT=%(asctime)s,%(msecs)d %(levelname)-2s [%(filename)s:%(lineno)d] %(req # Indexing configuration INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000 +# Maximum number of worker threads used for high-quality dataset indexing. +# Lower this value to reduce memory usage and avoid OOM freezes during re-indexing. +INDEXING_MAX_WORKERS=2 # Workflow runtime configuration WORKFLOW_MAX_EXECUTION_STEPS=500 diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index d37cff63e90..b1d9950730d 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -1108,6 +1108,11 @@ class IndexingConfig(BaseSettings): default=50, ) + INDEXING_MAX_WORKERS: PositiveInt = Field( + description="Maximum number of worker threads used for high-quality dataset indexing", + default=2, + ) + class MultiModalTransferConfig(BaseSettings): MULTIMODAL_SEND_FORMAT: Literal["base64", "url"] = Field( diff --git a/api/core/indexing_runner.py b/api/core/indexing_runner.py index b8d5ca2f50f..e9ad2aaa7e2 100644 --- a/api/core/indexing_runner.py +++ b/api/core/indexing_runner.py @@ -1,4 +1,5 @@ import concurrent.futures +import gc import json import logging import re @@ -601,7 +602,10 @@ class IndexingRunner: ) create_keyword_thread.start() - max_workers = 10 + # High-quality indexing is memory intensive (embedding generation + vector writes). + # Running too many chunks in parallel can trigger OOM and freeze the service until reboot. + max_workers = max(1, min(10, int(dify_config.INDEXING_MAX_WORKERS))) + max_workers = min(max_workers, len(documents)) if documents else 1 if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY: with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [] @@ -724,6 +728,9 @@ class IndexingRunner: db.session.commit() + # Help reclaim memory between chunk tasks. + # This is especially important for self-hosted setups that may run repeated re-indexing. + gc.collect() return tokens @staticmethod diff --git a/api/tests/unit_tests/core/rag/indexing/test_indexing_runner.py b/api/tests/unit_tests/core/rag/indexing/test_indexing_runner.py index 641c5d9ba0f..57f92660a03 100644 --- a/api/tests/unit_tests/core/rag/indexing/test_indexing_runner.py +++ b/api/tests/unit_tests/core/rag/indexing/test_indexing_runner.py @@ -672,6 +672,36 @@ class TestIndexingRunnerLoad: # Verify executor was used for parallel processing assert mock_executor_instance.submit.called + def test_load_with_high_quality_respects_indexing_max_workers( + self, mock_dependencies, sample_dataset, sample_dataset_document, sample_documents + ): + """Ensure the high-quality indexing fan-out is capped by INDEXING_MAX_WORKERS.""" + runner = IndexingRunner() + mock_embedding_instance = MagicMock() + mock_embedding_instance.get_text_embedding_num_tokens.return_value = 100 + model_manager = mock_dependencies["model_manager"].return_value + model_manager.get_model_instance.return_value = mock_embedding_instance + + mock_processor = MagicMock() + + # Mock ThreadPoolExecutor + mock_future = MagicMock() + mock_future.result.return_value = 300 + mock_executor_instance = MagicMock() + mock_executor_instance.__enter__.return_value = mock_executor_instance + mock_executor_instance.__exit__.return_value = None + mock_executor_instance.submit.return_value = mock_future + mock_dependencies["executor"].return_value = mock_executor_instance + + with ( + patch.object(runner, "_update_document_index_status"), + patch("core.indexing_runner.dify_config") as mock_config, + ): + mock_config.INDEXING_MAX_WORKERS = 2 + runner._load(mock_processor, sample_dataset, sample_dataset_document, sample_documents) + + mock_dependencies["executor"].assert_called_once_with(max_workers=2) + def test_load_with_economy_indexing( self, mock_dependencies, sample_dataset, sample_dataset_document, sample_documents ):