mirror of
https://github.com/langgenius/dify.git
synced 2026-06-26 23:01:11 +08:00
fix: Cap indexing workers to prevent self-hosted OOM freezes
This commit is contained in:
parent
43c48ba4d7
commit
ecec445e62
@ -530,6 +530,9 @@ LOG_FORMAT=%(asctime)s,%(msecs)d %(levelname)-2s [%(filename)s:%(lineno)d] %(req
|
||||
|
||||
# Indexing configuration
|
||||
INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000
|
||||
# Maximum number of worker threads used for high-quality dataset indexing.
|
||||
# Lower this value to reduce memory usage and avoid OOM freezes during re-indexing.
|
||||
INDEXING_MAX_WORKERS=2
|
||||
|
||||
# Workflow runtime configuration
|
||||
WORKFLOW_MAX_EXECUTION_STEPS=500
|
||||
|
||||
@ -1108,6 +1108,11 @@ class IndexingConfig(BaseSettings):
|
||||
default=50,
|
||||
)
|
||||
|
||||
INDEXING_MAX_WORKERS: PositiveInt = Field(
|
||||
description="Maximum number of worker threads used for high-quality dataset indexing",
|
||||
default=2,
|
||||
)
|
||||
|
||||
|
||||
class MultiModalTransferConfig(BaseSettings):
|
||||
MULTIMODAL_SEND_FORMAT: Literal["base64", "url"] = Field(
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import concurrent.futures
|
||||
import gc
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
@ -601,7 +602,10 @@ class IndexingRunner:
|
||||
)
|
||||
create_keyword_thread.start()
|
||||
|
||||
max_workers = 10
|
||||
# High-quality indexing is memory intensive (embedding generation + vector writes).
|
||||
# Running too many chunks in parallel can trigger OOM and freeze the service until reboot.
|
||||
max_workers = max(1, min(10, int(dify_config.INDEXING_MAX_WORKERS)))
|
||||
max_workers = min(max_workers, len(documents)) if documents else 1
|
||||
if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY:
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
futures = []
|
||||
@ -724,6 +728,9 @@ class IndexingRunner:
|
||||
|
||||
db.session.commit()
|
||||
|
||||
# Help reclaim memory between chunk tasks.
|
||||
# This is especially important for self-hosted setups that may run repeated re-indexing.
|
||||
gc.collect()
|
||||
return tokens
|
||||
|
||||
@staticmethod
|
||||
|
||||
@ -672,6 +672,36 @@ class TestIndexingRunnerLoad:
|
||||
# Verify executor was used for parallel processing
|
||||
assert mock_executor_instance.submit.called
|
||||
|
||||
def test_load_with_high_quality_respects_indexing_max_workers(
|
||||
self, mock_dependencies, sample_dataset, sample_dataset_document, sample_documents
|
||||
):
|
||||
"""Ensure the high-quality indexing fan-out is capped by INDEXING_MAX_WORKERS."""
|
||||
runner = IndexingRunner()
|
||||
mock_embedding_instance = MagicMock()
|
||||
mock_embedding_instance.get_text_embedding_num_tokens.return_value = 100
|
||||
model_manager = mock_dependencies["model_manager"].return_value
|
||||
model_manager.get_model_instance.return_value = mock_embedding_instance
|
||||
|
||||
mock_processor = MagicMock()
|
||||
|
||||
# Mock ThreadPoolExecutor
|
||||
mock_future = MagicMock()
|
||||
mock_future.result.return_value = 300
|
||||
mock_executor_instance = MagicMock()
|
||||
mock_executor_instance.__enter__.return_value = mock_executor_instance
|
||||
mock_executor_instance.__exit__.return_value = None
|
||||
mock_executor_instance.submit.return_value = mock_future
|
||||
mock_dependencies["executor"].return_value = mock_executor_instance
|
||||
|
||||
with (
|
||||
patch.object(runner, "_update_document_index_status"),
|
||||
patch("core.indexing_runner.dify_config") as mock_config,
|
||||
):
|
||||
mock_config.INDEXING_MAX_WORKERS = 2
|
||||
runner._load(mock_processor, sample_dataset, sample_dataset_document, sample_documents)
|
||||
|
||||
mock_dependencies["executor"].assert_called_once_with(max_workers=2)
|
||||
|
||||
def test_load_with_economy_indexing(
|
||||
self, mock_dependencies, sample_dataset, sample_dataset_document, sample_documents
|
||||
):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user