feat: implement content-based deduplication for document segments

- Add database index on (dataset_id, index_node_hash) for efficient deduplication queries
- Add deduplication check in SegmentService.create_segment and multi_create_segment methods
- Add deduplication check in DatasetDocumentStore.add_documents method to prevent duplicate embedding processing
- Skip creating segments with identical content hashes across the entire dataset

This prevents repeated content from being re-processed and re-embedded when uploaded documents contain segments that already exist in the dataset, improving indexing efficiency and reducing unnecessary compute cost.
Frederick2313072 2025-09-20 06:28:14 +08:00
parent 07047487c3
commit 626e71cb3b
3 changed files with 38 additions and 0 deletions
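For reference, the deduplication key used throughout this change is the per-segment content hash stored in index_node_hash, scoped by dataset_id. A minimal sketch of that idea, assuming helper.generate_text_hash amounts to a SHA-256 digest of the segment text (the real helper may normalize or suffix the input differently):

import hashlib


def generate_text_hash(text: str) -> str:
    # Sketch only: a stable content hash over the segment text.
    # The project's helper may normalize or salt the text before hashing.
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


# Two segments with identical text map to the same (dataset_id, hash) key,
# so the second one can be skipped instead of being re-embedded.
assert generate_text_hash("same content") == generate_text_hash("same content")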


@@ -93,6 +93,17 @@ class DatasetDocumentStore:
segment_document = self.get_document_segment(doc_id=doc.metadata["doc_id"])
# Check if a segment with the same content hash already exists in the dataset
existing_segment_by_hash = db.session.query(DocumentSegment).filter_by(
dataset_id=self._dataset.id,
index_node_hash=doc.metadata["doc_hash"],
enabled=True
).first()
if existing_segment_by_hash:
# Skip creating duplicate segment with same content hash
continue
# NOTE: doc could already exist in the store, but we overwrite it
if not allow_update and segment_document:
raise ValueError(

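The check above issues one query per incoming document inside the add_documents loop. The multi_create_segment hunk further down batches the same lookup; an equivalent batched helper for this spot could look like the following sketch (the function name, the Session typing, and the model import path are illustrative assumptions, not part of the commit):

from collections.abc import Iterable

from sqlalchemy.orm import Session

from models.dataset import DocumentSegment  # import path assumed


def existing_hashes_for_dataset(session: Session, dataset_id: str, hashes: Iterable[str]) -> set[str]:
    # Return the subset of hashes already stored (and enabled) in the dataset,
    # so callers can skip duplicates without querying once per document.
    rows = (
        session.query(DocumentSegment.index_node_hash)
        .filter(
            DocumentSegment.dataset_id == dataset_id,
            DocumentSegment.index_node_hash.in_(list(hashes)),
            DocumentSegment.enabled.is_(True),
        )
        .all()
    )
    return {row.index_node_hash for row in rows}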

@@ -689,6 +689,7 @@ class DocumentSegment(Base):
sa.Index("document_segment_tenant_document_idx", "document_id", "tenant_id"),
sa.Index("document_segment_node_dataset_idx", "index_node_id", "dataset_id"),
sa.Index("document_segment_tenant_idx", "tenant_id"),
sa.Index("document_segment_dataset_hash_idx", "dataset_id", "index_node_hash"),
)
# initial fields

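The new entry in __table_args__ only declares the composite index on the model; since the three changed files appear to be the docstore, the model, and the service, existing databases would still need a migration to actually create it. A minimal Alembic sketch, assuming the table is named document_segments and using placeholder revision identifiers:

"""add composite index for segment deduplication (sketch)"""

from alembic import op

# revision identifiers (placeholders, not taken from the commit)
revision = "xxxxxxxxxxxx"
down_revision = "yyyyyyyyyyyy"
branch_labels = None
depends_on = None


def upgrade():
    op.create_index(
        "document_segment_dataset_hash_idx",
        "document_segments",
        ["dataset_id", "index_node_hash"],
    )


def downgrade():
    op.drop_index("document_segment_dataset_hash_idx", table_name="document_segments")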

@@ -2623,6 +2623,17 @@ class SegmentService:
tokens = embedding_model.get_text_embedding_num_tokens(texts=[content])[0]
lock_name = f"add_segment_lock_document_id_{document.id}"
with redis_client.lock(lock_name, timeout=600):
# Check if a segment with the same content hash already exists
existing_segment = db.session.query(DocumentSegment).filter_by(
dataset_id=document.dataset_id,
index_node_hash=segment_hash,
enabled=True
).first()
if existing_segment:
logger.info(f"Segment with same content hash already exists: {segment_hash}")
return existing_segment
max_position = (
db.session.query(func.max(DocumentSegment.position))
.where(DocumentSegment.document_id == document.id)
@@ -2689,6 +2700,15 @@
.where(DocumentSegment.document_id == document.id)
.scalar()
)
# Batch query existing hashes before the loop
segment_hashes = [helper.generate_text_hash(seg["content"]) for seg in segments]
existing_segments = db.session.query(DocumentSegment.index_node_hash).filter(
DocumentSegment.dataset_id == document.dataset_id,
DocumentSegment.index_node_hash.in_(segment_hashes),
DocumentSegment.enabled.is_(True)
).all()
existing_hashes = {seg.index_node_hash for seg in existing_segments}
pre_segment_data_list = []
segment_data_list = []
keywords_list = []
@@ -2697,6 +2717,12 @@
content = segment_item["content"]
doc_id = str(uuid.uuid4())
segment_hash = helper.generate_text_hash(content)
# Skip existing segments
if segment_hash in existing_hashes:
logger.info(f"Skipping duplicate segment with hash: {segment_hash}")
continue
tokens = 0
if dataset.indexing_technique == "high_quality" and embedding_model:
# calc embedding use tokens
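Note that existing_hashes is pre-filled only with hashes already stored in the database, so two identical segments arriving in the same batch would both pass the check. If within-batch duplicates should also be collapsed, each accepted hash can be added to the set as the loop runs; a sketch using the same names as the hunk above:

for segment_item in segments:
    content = segment_item["content"]
    segment_hash = helper.generate_text_hash(content)
    if segment_hash in existing_hashes:
        logger.info(f"Skipping duplicate segment with hash: {segment_hash}")
        continue
    # Also dedupe against segments accepted earlier in this same batch.
    existing_hashes.add(segment_hash)
    # ... proceed with doc_id generation, token counting, and segment creation as above ...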