fix: add index initialization checks, improve batch vector operations and search, ensure robust exception handling.

Frederick2313072 2025-09-23 16:41:46 +08:00
parent 1f36c0c1c5
commit 12de554313
1 changed file with 86 additions and 67 deletions
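The central change is lazy index attachment: read and delete paths no longer require create_index() to have run in the same process, and instead attach to an existing Pinecone index on first use via a new _ensure_index_initialized() helper. A minimal standalone sketch of the pattern (the class and variable names here are illustrative, not the exact code in this diff; the Pinecone calls are the same ones the diff uses):

    from pinecone import Pinecone

    class LazyIndexHandle:
        """Attach to an existing Pinecone index on first use, not at construction."""

        def __init__(self, api_key: str, index_name: str):
            self._pc = Pinecone(api_key=api_key)
            self._index_name = index_name
            self._index = None  # attached lazily on first read

        def _ensure_index_initialized(self) -> None:
            if self._index is not None:
                return
            # Same discovery call the diff relies on: list existing index names
            if self._index_name in self._pc.list_indexes().names():
                self._index = self._pc.Index(self._index_name)
            else:
                raise ValueError("Index not initialized. Please ingest documents to create index.")

        def query(self, vector: list[float], top_k: int = 4):
            self._ensure_index_initialized()
            return self._index.query(vector=vector, top_k=top_k, include_metadata=True)

The trade-off is one extra list_indexes() round-trip on the first call per instance; after that the cached self._index handle is reused.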


@@ -1,7 +1,6 @@
 import json
 import time
 import uuid
-from typing import Any, Optional, Union
+from typing import Any, Optional
 from pinecone import Pinecone, ServerlessSpec
 from pydantic import BaseModel
@@ -30,7 +29,7 @@ class PineconeConfig(BaseModel):
 class PineconeVector(BaseVector):
     """Pinecone vector database concrete implementation class"""

     def __init__(self, collection_name: str, group_id: str, config: PineconeConfig):
         super().__init__(collection_name)
         self._client_config = config
@ -45,8 +44,6 @@ class PineconeVector(BaseVector):
)
except Exception as e:
# Fallback to basic initialization if SSL config fails
import logging
logging.warning(f"Failed to initialize Pinecone with SSL config: {e}, using basic config")
self._pc = Pinecone(api_key=config.api_key)
# Normalize index name: lowercase, only a-z0-9- and <=45 chars
@@ -69,6 +66,19 @@
         """Return vector database type identifier"""
         return "pinecone"

+    def _ensure_index_initialized(self) -> None:
+        """Ensure that self._index is attached to an existing Pinecone index."""
+        if self._index is not None:
+            return
+        try:
+            existing_indexes = self._pc.list_indexes().names()
+            if self._index_name in existing_indexes:
+                self._index = self._pc.Index(self._index_name)
+            else:
+                raise ValueError("Index not initialized. Please ingest documents to create index.")
+        except Exception:
+            raise
+
     def to_index_struct(self) -> dict:
         """Generate index structure dictionary"""
         return {
@@ -90,21 +100,18 @@
     def create_index(self, dimension: int):
         """Create Pinecone index"""
-        # Debug: Log the index name being used
-        import logging
-        logging.warning(f"Pinecone: Creating index with name: {self._index_name} (length: {len(self._index_name)})")
         lock_name = f"vector_indexing_lock_{self._index_name}"
         with redis_client.lock(lock_name, timeout=30):
             # Check Redis cache
             index_exist_cache_key = f"vector_indexing_{self._index_name}"
             if redis_client.get(index_exist_cache_key):
                 self._index = self._pc.Index(self._index_name)
                 return

             # Check if index already exists
             existing_indexes = self._pc.list_indexes().names()
             if self._index_name not in existing_indexes:
                 # Create new index using ServerlessSpec
                 self._pc.create_index(
@@ -116,14 +123,14 @@
                         region=self._client_config.environment
                     )
                 )
                 # Wait for index creation to complete
                 while not self._pc.describe_index(self._index_name).status['ready']:
                     time.sleep(1)
                 # Get index instance
                 self._index = self._pc.Index(self._index_name)
             else:
                 # Get index instance
                 self._index = self._pc.Index(self._index_name)

             # Set cache
             redis_client.set(index_exist_cache_key, 1, ex=3600)
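For reference, the locking idiom above reduced to its skeleton: a Redis lock serializes concurrent creators, and a cache key short-circuits repeat calls for an hour. A sketch assuming pc, redis_client, index_name, dimension, and region are in scope (the metric and cloud arguments are assumptions; only the region is visible in this hunk):

    lock_name = f"vector_indexing_lock_{index_name}"
    with redis_client.lock(lock_name, timeout=30):      # serialize concurrent creators
        cache_key = f"vector_indexing_{index_name}"
        if redis_client.get(cache_key):                 # another worker already created it
            index = pc.Index(index_name)
        else:
            if index_name not in pc.list_indexes().names():
                pc.create_index(
                    name=index_name,
                    dimension=dimension,
                    metric="cosine",                    # assumption: metric not shown in this hunk
                    spec=ServerlessSpec(cloud="aws", region=region),  # cloud value is assumed
                )
                while not pc.describe_index(index_name).status["ready"]:
                    time.sleep(1)                       # poll until the index is ready
            index = pc.Index(index_name)
            redis_client.set(cache_key, 1, ex=3600)     # cache existence for an hour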
@@ -131,17 +138,21 @@
         """Batch add document vectors"""
         if not self._index:
             raise ValueError("Index not initialized. Call create() first.")

+        total_docs = len(documents)
         uuids = self._get_uuids(documents)
         batch_size = self._client_config.batch_size
         added_ids = []

         # Batch processing
-        for i in range(0, len(documents), batch_size):
+        total_batches = (total_docs + batch_size - 1) // batch_size  # Ceiling division
+        for batch_idx, i in enumerate(range(0, len(documents), batch_size), 1):
             batch_documents = documents[i:i + batch_size]
             batch_embeddings = embeddings[i:i + batch_size]
             batch_uuids = uuids[i:i + batch_size]
+            batch_size_actual = len(batch_documents)

             # Build Pinecone vector data (metadata must be primitives or list[str])
             vectors_to_upsert = []
             for doc, embedding, doc_id in zip(batch_documents, batch_embeddings, batch_uuids):
@@ -166,52 +177,62 @@
                     "values": embedding,
                     "metadata": safe_meta
                 })

             # Batch insert to Pinecone
-            self._index.upsert(vectors=vectors_to_upsert)
-            added_ids.extend(batch_uuids)
+            try:
+                self._index.upsert(vectors=vectors_to_upsert)
+                added_ids.extend(batch_uuids)
+            except Exception:
+                raise
         return added_ids
     def search_by_vector(self, query_vector: list[float], **kwargs) -> list[Document]:
         """Vector similarity search"""
-        if not self._index:
-            raise ValueError("Index not initialized.")
+        # Lazily attach to an existing index if needed
+        self._ensure_index_initialized()

         top_k = kwargs.get("top_k", 4)
         score_threshold = float(kwargs.get("score_threshold", 0.0))

         # Build filter conditions
-        filter_dict = {Field.GROUP_KEY.value: {"$eq": self._group_id}}
+        filter_dict = {Field.GROUP_KEY.value: {"$eq": str(self._group_id)}}

         # Document scope filtering
         document_ids_filter = kwargs.get("document_ids_filter")
         if document_ids_filter:
-            filter_dict[f"{Field.METADATA_KEY.value}.document_id"] = {"$in": document_ids_filter}
+            filter_dict["document_id"] = {"$in": document_ids_filter}

         # Execute search
-        response = self._index.query(
-            vector=query_vector,
-            top_k=top_k,
-            include_metadata=True,
-            filter=filter_dict
-        )
+        try:
+            response = self._index.query(
+                vector=query_vector,
+                top_k=top_k,
+                include_metadata=True,
+                filter=filter_dict
+            )
+        except Exception:
+            raise
         # Convert results
         docs = []
+        filtered_count = 0
         for match in response.matches:
             if match.score >= score_threshold:
-                metadata = match.metadata.get(Field.METADATA_KEY.value, {})
+                page_content = match.metadata.get(Field.CONTENT_KEY.value, "")
+                metadata = dict(match.metadata or {})
+                metadata.pop(Field.CONTENT_KEY.value, None)
+                metadata.pop(Field.GROUP_KEY.value, None)
                 metadata["score"] = match.score
-                doc = Document(
-                    page_content=match.metadata.get(Field.CONTENT_KEY.value, ""),
-                    metadata=metadata,
-                )
+                doc = Document(page_content=page_content, metadata=metadata)
                 docs.append(doc)
+            else:
+                filtered_count += 1

         # Sort by similarity score in descending order
         docs.sort(key=lambda x: x.metadata.get("score", 0), reverse=True)
         return docs
     def search_by_full_text(self, query: str, **kwargs) -> list[Document]:
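A note on the filter changes in search_by_vector: Pinecone metadata values must be primitives or lists of strings (as the comment in the upsert path already states), and this file stores metadata flat rather than nested. That is why the group id is now coerced with str() and the document filter key drops the "metadata." prefix. Assuming Field.GROUP_KEY.value resolves to "group_id" (the enum definition is not shown in this diff), the filter sent to Pinecone now has this shape, with illustrative values:

    filter_dict = {
        "group_id": {"$eq": "1234"},                     # always a string after str()
        "document_id": {"$in": ["doc-a", "doc-b"]},      # flat key, not "metadata.document_id"
    }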
@@ -220,57 +241,55 @@
     def delete_by_metadata_field(self, key: str, value: str):
         """Delete by metadata field"""
-        if not self._index:
-            return
+        self._ensure_index_initialized()
         try:
             # Build filter conditions
             filter_dict = {
                 Field.GROUP_KEY.value: {"$eq": self._group_id},
                 f"{Field.METADATA_KEY.value}.{key}": {"$eq": value}
             }
             # Pinecone delete operation
             self._index.delete(filter=filter_dict)
         except Exception:
             # Ignore delete errors
             pass
     def delete_by_ids(self, ids: list[str]) -> None:
         """Batch delete by ID list"""
-        if not self._index:
-            return
+        self._ensure_index_initialized()
         try:
             # Pinecone delete by ID
             self._index.delete(ids=ids)
         except Exception:
-            # Ignore delete errors
-            pass
+            raise
     def delete(self) -> None:
         """Delete all vector data for the entire dataset"""
-        if not self._index:
-            return
+        self._ensure_index_initialized()
         try:
             # Delete all vectors by group_id
             filter_dict = {Field.GROUP_KEY.value: {"$eq": self._group_id}}
             self._index.delete(filter=filter_dict)
         except Exception:
-            # Ignore delete errors
-            pass
+            raise
     def text_exists(self, id: str) -> bool:
         """Check if document exists"""
-        if not self._index:
-            return False
+        try:
+            self._ensure_index_initialized()
+        except Exception:
+            return False
         try:
             # Check if vector exists through fetch
             response = self._index.fetch(ids=[id])
             return id in response.vectors
         except Exception:
             return False
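Taken together, a fresh PineconeVector instance can now serve reads without create_index() having run first in the same process: the search and delete paths attach lazily, a missing index surfaces as a clear ValueError, and text_exists() degrades to False instead of raising. A hypothetical call pattern (the config wiring, index name, and vector dimension below are illustrative):

    vector = PineconeVector(collection_name="my-index", group_id="1234", config=config)

    # Read path: lazily attaches to the existing index, or raises
    # ValueError("Index not initialized. ...") if it does not exist yet.
    docs = vector.search_by_vector([0.1] * 1536, top_k=4, score_threshold=0.2)

    # Existence checks never raise; they fall back to False.
    found = vector.text_exists("some-doc-id")  # False if the index or id is missing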