fix feedback

This commit is contained in:
EndlessLucky 2026-04-07 08:55:29 -04:00
parent 8d525f9cd8
commit 117ff3d89f
4 changed files with 46 additions and 1 deletions

View File

@ -533,6 +533,8 @@ INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000
# Maximum number of worker threads used for high-quality dataset indexing (1-10).
# Lower this value to reduce memory usage and avoid OOM freezes during re-indexing.
INDEXING_MAX_WORKERS=2
# Enable indexing memory snapshots in logs for OOM diagnosis (True/False).
INDEXING_MEMORY_SNAPSHOT_ENABLED=False
# Workflow runtime configuration
WORKFLOW_MAX_EXECUTION_STEPS=500

View File

@ -1113,6 +1113,11 @@ class IndexingConfig(BaseSettings):
default=2,
)
INDEXING_MEMORY_SNAPSHOT_ENABLED: bool = Field(
description="Enable memory snapshot logs during high-quality indexing (for OOM diagnosis)",
default=False,
)
class MultiModalTransferConfig(BaseSettings):
MULTIMODAL_SEND_FORMAT: Literal["base64", "url"] = Field(

View File

@ -9,6 +9,7 @@ import uuid
from collections.abc import Mapping
from typing import Any
import psutil
from flask import Flask, current_app
from graphon.model_runtime.entities.model_entities import ModelType
from sqlalchemy import delete, func, select, update
@ -67,6 +68,23 @@ class IndexingRunner:
document.stopped_at = naive_utc_now()
db.session.commit()
@staticmethod
def _log_memory_snapshot(stage: str, *, dataset_document_id: str, extra: Mapping[str, Any] | None = None) -> None:
    """Log the current process's resident memory for one indexing stage.

    This is a no-op unless ``dify_config.INDEXING_MEMORY_SNAPSHOT_ENABLED``
    is truthy. Sampling and logging are best-effort: any exception raised
    while building or emitting the snapshot is logged and swallowed, so the
    diagnostic can never interrupt the indexing run itself.

    Args:
        stage: Label identifying the point in the indexing flow being sampled.
        dataset_document_id: ID of the dataset document being indexed.
        extra: Optional additional key/value pairs merged into the logged
            payload; keys that collide with the built-in ones override them.
    """
    if dify_config.INDEXING_MEMORY_SNAPSHOT_ENABLED:
        try:
            # RSS of the current process, in bytes.
            resident_bytes = psutil.Process().memory_info().rss
            snapshot: dict[str, Any] = {
                "stage": stage,
                "dataset_document_id": dataset_document_id,
                "rss_mb": round(resident_bytes / 1024 / 1024, 2),
                # Caller-supplied context goes last so it is merged on top.
                **(extra or {}),
            }
            logger.info("indexing-memory-snapshot %s", snapshot)
        except Exception:
            logger.exception("Failed to capture indexing memory snapshot")
def run(self, dataset_documents: list[DatasetDocument]):
"""Run the indexing process."""
for dataset_document in dataset_documents:
@ -607,6 +625,14 @@ class IndexingRunner:
max_workers = max(1, int(dify_config.INDEXING_MAX_WORKERS))
max_workers = min(max_workers, len(documents)) if documents else 1
if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY:
self._log_memory_snapshot(
"high_quality_load_start",
dataset_document_id=dataset_document.id,
extra={
"max_workers": max_workers,
"documents": len(documents),
},
)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
@ -633,8 +659,13 @@ class IndexingRunner:
)
)
for future in futures:
for completed_idx, future in enumerate(futures, start=1):
tokens += future.result()
self._log_memory_snapshot(
"high_quality_chunk_completed",
dataset_document_id=dataset_document.id,
extra={"completed_chunks": completed_idx, "total_chunks": len(futures)},
)
if (
dataset_document.doc_form != IndexStructureType.PARENT_CHILD_INDEX
and dataset.indexing_technique == IndexTechniqueType.ECONOMY
@ -731,6 +762,11 @@ class IndexingRunner:
# Help reclaim memory between chunk tasks.
# This is especially important for self-hosted setups that may run repeated re-indexing.
gc.collect()
self._log_memory_snapshot(
"chunk_gc_collected",
dataset_document_id=dataset_document.id,
extra={"chunk_documents": len(chunk_documents)},
)
return tokens
@staticmethod

View File

@ -1012,6 +1012,8 @@ INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000
# Maximum number of worker threads used for high-quality dataset indexing (1-10).
# Lower this value to reduce memory usage and avoid OOM/freezes during re-indexing.
INDEXING_MAX_WORKERS=2
# Enable indexing memory snapshots in logs for OOM diagnosis (True/False).
INDEXING_MEMORY_SNAPSHOT_ENABLED=False
# Member invitation link validity period (hours).
# Default: 72.