diff --git a/api/.env.example b/api/.env.example index daee3058a0e..40055e3704f 100644 --- a/api/.env.example +++ b/api/.env.example @@ -533,6 +533,8 @@ INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000 # Maximum number of worker threads used for high-quality dataset indexing (1-10). # Lower this value to reduce memory usage and avoid OOM freezes during re-indexing. INDEXING_MAX_WORKERS=2 +# Enable indexing memory snapshots in logs for OOM diagnosis (True/False). +INDEXING_MEMORY_SNAPSHOT_ENABLED=False # Workflow runtime configuration WORKFLOW_MAX_EXECUTION_STEPS=500 diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index 6c604705ecf..7c02606affd 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -1113,6 +1113,11 @@ class IndexingConfig(BaseSettings): default=2, ) + INDEXING_MEMORY_SNAPSHOT_ENABLED: bool = Field( + description="Enable memory snapshot logs during high-quality indexing (for OOM diagnosis)", + default=False, + ) + class MultiModalTransferConfig(BaseSettings): MULTIMODAL_SEND_FORMAT: Literal["base64", "url"] = Field( diff --git a/api/core/indexing_runner.py b/api/core/indexing_runner.py index 9147630ddb4..8c911124f0c 100644 --- a/api/core/indexing_runner.py +++ b/api/core/indexing_runner.py @@ -9,6 +9,7 @@ import uuid from collections.abc import Mapping from typing import Any +import psutil from flask import Flask, current_app from graphon.model_runtime.entities.model_entities import ModelType from sqlalchemy import delete, func, select, update @@ -67,6 +68,23 @@ class IndexingRunner: document.stopped_at = naive_utc_now() db.session.commit() + @staticmethod + def _log_memory_snapshot(stage: str, *, dataset_document_id: str, extra: Mapping[str, Any] | None = None) -> None: + if not dify_config.INDEXING_MEMORY_SNAPSHOT_ENABLED: + return + try: + rss_bytes = psutil.Process().memory_info().rss + payload: dict[str, Any] = { + "stage": stage, + "dataset_document_id": dataset_document_id, + "rss_mb": round(rss_bytes / 1024 / 1024, 2), + } + if extra: + payload.update(extra) + logger.info("indexing-memory-snapshot %s", payload) + except Exception: + logger.exception("Failed to capture indexing memory snapshot") + def run(self, dataset_documents: list[DatasetDocument]): """Run the indexing process.""" for dataset_document in dataset_documents: @@ -607,6 +625,14 @@ class IndexingRunner: max_workers = max(1, int(dify_config.INDEXING_MAX_WORKERS)) max_workers = min(max_workers, len(documents)) if documents else 1 if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY: + self._log_memory_snapshot( + "high_quality_load_start", + dataset_document_id=dataset_document.id, + extra={ + "max_workers": max_workers, + "documents": len(documents), + }, + ) with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [] @@ -633,8 +659,13 @@ class IndexingRunner: ) ) - for future in futures: + for completed_idx, future in enumerate(futures, start=1): tokens += future.result() + self._log_memory_snapshot( + "high_quality_chunk_completed", + dataset_document_id=dataset_document.id, + extra={"completed_chunks": completed_idx, "total_chunks": len(futures)}, + ) if ( dataset_document.doc_form != IndexStructureType.PARENT_CHILD_INDEX and dataset.indexing_technique == IndexTechniqueType.ECONOMY @@ -731,6 +762,11 @@ class IndexingRunner: # Help reclaim memory between chunk tasks. # This is especially important for self-hosted setups that may run repeated re-indexing. gc.collect() + self._log_memory_snapshot( + "chunk_gc_collected", + dataset_document_id=dataset_document.id, + extra={"chunk_documents": len(chunk_documents)}, + ) return tokens @staticmethod diff --git a/docker/.env.example b/docker/.env.example index f9701506fb1..3c95ca5e79e 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -1012,6 +1012,8 @@ INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000 # Maximum number of worker threads used for high-quality dataset indexing (1-10). # Lower this value to reduce memory usage and avoid OOM/freezes during re-indexing. INDEXING_MAX_WORKERS=2 +# Enable indexing memory snapshots in logs for OOM diagnosis (True/False). +INDEXING_MEMORY_SNAPSHOT_ENABLED=False # Member invitation link valid time (hours), # Default: 72.