diff --git a/api/commands.py b/api/commands.py
index 4d15171034..c7033d7b46 100644
--- a/api/commands.py
+++ b/api/commands.py
@@ -285,6 +285,7 @@ def migrate_knowledge_vector_database():
     click.echo(click.style("Starting vector database migration.", fg="green"))
     create_count = 0
     skipped_count = 0
+    error_count = 0
     total_count = 0
     vector_type = dify_config.VECTOR_STORE
     upper_collection_vector_types = {
@@ -298,6 +299,7 @@ def migrate_knowledge_vector_database():
         VectorType.OPENGAUSS,
         VectorType.TABLESTORE,
         VectorType.MATRIXONE,
+        VectorType.TIDB_ON_QDRANT,
     }
     lower_collection_vector_types = {
         VectorType.ANALYTICDB,
@@ -361,13 +363,6 @@ def migrate_knowledge_vector_database():
             else:
                 raise ValueError(f"Vector store {vector_type} is not supported.")

-            index_struct_dict = {
-                "type": vector_type,
-                "vector_store": {"class_prefix": collection_name},
-                "original_type": dataset.index_struct_dict["type"],
-            }
-            dataset.index_struct = json.dumps(index_struct_dict)
-            vector = Vector(dataset)
             click.echo(f"Migrating dataset {dataset.id}.")

             # try:
@@ -394,13 +389,21 @@ def migrate_knowledge_vector_database():

             documents = []
             segments_count = 0
+            original_index_vector = Vector(dataset)
             for dataset_document in dataset_documents:
-                single_documents = vector.search_by_metadata_field("document_id", dataset_document.id)
+                single_documents = original_index_vector.search_by_metadata_field("document_id", dataset_document.id)
                 if single_documents:
                     documents.extend(single_documents)
                     segments_count += len(single_documents)

+            # update index_struct only after all reads, so Vector(dataset) binds to the new collection
+            index_struct_dict = {
+                "type": vector_type,
+                "vector_store": {"class_prefix": collection_name},
+                "original_type": dataset.index_struct_dict["type"],
+            }
+            dataset.index_struct = json.dumps(index_struct_dict)
             if documents:
                 try:
                     click.echo(
                         click.style(
@@ -410,7 +413,8 @@ def migrate_knowledge_vector_database():
                             fg="green",
                         )
                     )
-                    vector.create_with_vectors(documents)
+                    new_vector = Vector(dataset)
+                    new_vector.create_with_vectors(documents)
                     click.echo(click.style(f"Created vector index for dataset {dataset.id}.", fg="green"))
                 except Exception as e:
                     click.echo(click.style(f"Failed to created vector index for dataset {dataset.id}.", fg="red"))
@@ -422,11 +426,12 @@ def migrate_knowledge_vector_database():
             except Exception as e:
                 db.session.rollback()
                 click.echo(click.style(f"Error creating dataset index: {e.__class__.__name__} {str(e)}", fg="red"))
+                error_count += 1
                 continue

     click.echo(
         click.style(
-            f"Migration complete. Created {create_count} dataset indexes. Skipped {skipped_count} datasets.", fg="green"
+            f"Migration complete. Created {create_count} dataset indexes. Skipped {skipped_count} datasets. Failed {error_count} datasets.", fg="green"
         )
     )
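Note on the reordering in api/commands.py above: Vector appears to bind to the
collection named by dataset.index_struct at construction time, so index_struct
must only be rewritten after every read against the old index has finished;
original_index_vector then reads from the old collection and new_vector writes
to the new one. A self-contained toy illustrating that construction-time binding
(Store, dataset, and the collection names here are illustrative, not Dify APIs):

    import json

    class Store:
        # binds to whatever collection index_struct names at construction time
        def __init__(self, dataset: dict):
            self.collection = json.loads(dataset["index_struct"])["vector_store"]["class_prefix"]

    dataset = {"index_struct": json.dumps({"vector_store": {"class_prefix": "old_collection"}})}
    reader = Store(dataset)  # still bound to old_collection
    dataset["index_struct"] = json.dumps({"vector_store": {"class_prefix": "new_collection"}})
    writer = Store(dataset)  # bound to new_collection
    assert (reader.collection, writer.collection) == ("old_collection", "new_collection")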
diff --git a/api/core/rag/datasource/vdb/vector_factory.py b/api/core/rag/datasource/vdb/vector_factory.py
index 53b8af604e..bc14298bbd 100644
--- a/api/core/rag/datasource/vdb/vector_factory.py
+++ b/api/core/rag/datasource/vdb/vector_factory.py
@@ -222,7 +222,20 @@ class Vector:
                 embeddings.append(text.vector)
                 embedding_texts.append(text)
         if embeddings and embedding_texts:
-            self._vector_processor.create(texts=embedding_texts, embeddings=embeddings, **kwargs)
+            # create documents with their pre-computed vectors in fixed-size batches
+            start = time.time()
+            batch_size = 500
+            # ceiling division: count the final, possibly short, batch as well
+            total_batches = (len(embedding_texts) + batch_size - 1) // batch_size
+            for i in range(0, len(embedding_texts), batch_size):
+                batch = embedding_texts[i : i + batch_size]
+                batch_embeddings = embeddings[i : i + batch_size]
+                batch_start = time.time()
+                self._vector_processor.create(texts=batch, embeddings=batch_embeddings, **kwargs)
+                logger.info(
+                    "Embedding batch %s/%s took %s s", i // batch_size + 1, total_batches, time.time() - batch_start
+                )
+            logger.info("Embedding %s documents with vectors took %s s", len(embedding_texts), time.time() - start)

     def create_multimodal(self, file_documents: list | None = None, **kwargs):
         if file_documents:
@@ -314,16 +327,22 @@ class Vector:
         collection_exist_cache_key = f"vector_indexing_{self._vector_processor.collection_name}"
         redis_client.delete(collection_exist_cache_key)

-    def _get_embeddings(self) -> Embeddings:
+    def _get_embeddings(self) -> Embeddings | None:
         model_manager = ModelManager()
-
-        embedding_model = model_manager.get_model_instance(
-            tenant_id=self._dataset.tenant_id,
-            provider=self._dataset.embedding_model_provider,
-            model_type=ModelType.TEXT_EMBEDDING,
-            model=self._dataset.embedding_model,
-        )
-        return CacheEmbedding(embedding_model)
+        try:
+            embedding_model = model_manager.get_model_instance(
+                tenant_id=self._dataset.tenant_id,
+                provider=self._dataset.embedding_model_provider,
+                model_type=ModelType.TEXT_EMBEDDING,
+                model=self._dataset.embedding_model,
+            )
+            return CacheEmbedding(embedding_model)
+        except Exception:
+            # The embedding model or its provider may no longer be configured for this
+            # tenant; callers that only copy existing vectors (such as the vector
+            # database migration command) can still proceed without embeddings.
+            logger.exception("Error getting embeddings for dataset %s", self._dataset.id)
+            return None

     def _filter_duplicate_texts(self, texts: list[Document]) -> list[Document]:
         for text in texts.copy():
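The batching hunk in vector_factory.py computes total_batches with ceiling
division so the "batch i/total" log lines line up with the loop. A
self-contained sketch of the same arithmetic (the item count and batch size
here are illustrative):

    items = list(range(1203))
    batch_size = 500
    total_batches = (len(items) + batch_size - 1) // batch_size  # ceil(1203 / 500) == 3
    for i in range(0, len(items), batch_size):
        batch = items[i : i + batch_size]
        print(f"batch {i // batch_size + 1}/{total_batches}: {len(batch)} items")
    # batch 1/3: 500 items
    # batch 2/3: 500 items
    # batch 3/3: 203 items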