migration command

jyong 2025-12-12 11:02:05 +08:00
parent 67eb632f1a
commit 1baca71e37
2 changed files with 56 additions and 21 deletions

View File

@@ -285,6 +285,7 @@ def migrate_knowledge_vector_database():
click.echo(click.style("Starting vector database migration.", fg="green"))
create_count = 0
skipped_count = 0
error_count = 0
total_count = 0
vector_type = dify_config.VECTOR_STORE
upper_collection_vector_types = {
@@ -298,6 +299,7 @@ def migrate_knowledge_vector_database():
VectorType.OPENGAUSS,
VectorType.TABLESTORE,
VectorType.MATRIXONE,
VectorType.TIDB_ON_QDRANT,
}
lower_collection_vector_types = {
VectorType.ANALYTICDB,
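TIDB_ON_QDRANT is added to the group of stores whose migrated collection names keep their generated casing. A hedged sketch of how the two sets are typically consumed later in this command (Dataset.gen_collection_name_by_id is the helper on the Dataset model; the exact branch shape is an assumption, not a quote of the unchanged code):

    def normalized_collection_name(dataset_id: str, vector_type: VectorType) -> str:
        # Generate the per-dataset collection name, then case-normalise it
        # according to which set the configured vector store falls in.
        name = Dataset.gen_collection_name_by_id(dataset_id)
        if vector_type in lower_collection_vector_types:
            return name.lower()
        # upper_collection_vector_types (now including TIDB_ON_QDRANT) keep the name as generated
        return name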
@@ -361,13 +363,6 @@ def migrate_knowledge_vector_database():
else:
raise ValueError(f"Vector store {vector_type} is not supported.")
index_struct_dict = {
"type": vector_type,
"vector_store": {"class_prefix": collection_name},
"original_type": dataset.index_struct_dict["type"],
}
dataset.index_struct = json.dumps(index_struct_dict)
vector = Vector(dataset)
click.echo(f"Migrating dataset {dataset.id}.")
# try:
@@ -394,13 +389,21 @@ def migrate_knowledge_vector_database():
documents = []
segments_count = 0
original_index_vector = Vector(dataset)
for dataset_document in dataset_documents:
single_documents = vector.search_by_metadata_field("document_id", dataset_document.id)
single_documents = original_index_vector.search_by_metadata_field("document_id", dataset_document.id)
if single_documents:
documents.extend(single_documents)
segments_count += len(single_documents)
# update dataset index_struct_dict
index_struct_dict = {
"type": vector_type,
"vector_store": {"class_prefix": collection_name},
"original_type": dataset.index_struct_dict["type"],
}
dataset.index_struct = json.dumps(index_struct_dict)
if documents:
try:
click.echo(
@@ -410,7 +413,8 @@ def migrate_knowledge_vector_database():
fg="green",
)
)
vector.create_with_vectors(documents)
new_vector = Vector(dataset)
new_vector.create_with_vectors(documents)
click.echo(click.style(f"Created vector index for dataset {dataset.id}.", fg="green"))
except Exception as e:
click.echo(click.style(f"Failed to created vector index for dataset {dataset.id}.", fg="red"))
@@ -422,11 +426,12 @@ def migrate_knowledge_vector_database():
except Exception as e:
db.session.rollback()
click.echo(click.style(f"Error creating dataset index: {e.__class__.__name__} {str(e)}", fg="red"))
error_count += 1
continue
click.echo(
click.style(
f"Migration complete. Created {create_count} dataset indexes. Skipped {skipped_count} datasets.", fg="green"
f"Migration complete. Created {create_count} dataset indexes. Skipped {skipped_count} datasets. Errors {error_count} datasets.", fg="green"
)
)
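Taken together, the hunks above reorder the per-dataset work: documents are first read back through a Vector built while index_struct still describes the original store, and only afterwards is index_struct rewritten and a second Vector created against the new store; per-dataset failures now increment error_count and appear in the final summary line. A condensed sketch of the resulting flow (pagination, counters and the surrounding try/except are omitted; names follow the diff):

    # Read from the store described by the dataset's *current* index_struct.
    original_index_vector = Vector(dataset)
    documents = []
    for dataset_document in dataset_documents:
        found = original_index_vector.search_by_metadata_field("document_id", dataset_document.id)
        if found:
            documents.extend(found)

    # Only now is the dataset re-pointed at the new store.
    index_struct_dict = {
        "type": vector_type,
        "vector_store": {"class_prefix": collection_name},
        "original_type": dataset.index_struct_dict["type"],  # the pre-migration type
    }
    dataset.index_struct = json.dumps(index_struct_dict)

    if documents:
        new_vector = Vector(dataset)  # built against the new index_struct
        new_vector.create_with_vectors(documents)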

View File

@@ -7,7 +7,8 @@ from typing import Any
from sqlalchemy import select
from configs import dify_config
from core.model_manager import ModelManager
from core.entities.provider_configuration import ProviderModelBundle
from core.model_manager import ModelInstance, ModelManager
from core.model_runtime.entities.model_entities import ModelType
from core.rag.datasource.vdb.vector_base import BaseVector
from core.rag.datasource.vdb.vector_type import VectorType
@@ -222,7 +223,19 @@ class Vector:
embeddings.append(text.vector)
embedding_texts.append(text)
if embeddings and embedding_texts:
self._vector_processor.create(texts=embedding_texts, embeddings=embeddings, **kwargs)
# batch create documents with vectors
start = time.time()
batch_size = 500
total_batches = (len(embedding_texts) + batch_size - 1) // batch_size  # ceiling division
for i in range(0, len(embedding_texts), batch_size):
batch = embedding_texts[i : i + batch_size]
batch_embeddings = embeddings[i : i + batch_size]
batch_start = time.time()
self._vector_processor.create(texts=batch, embeddings=batch_embeddings, **kwargs)
logger.info(
"Embedding batch %s/%s took %s s", i // batch_size + 1, total_batches, time.time() - batch_start
)
logger.info("Embedding %s documents with vectors took %s s", len(embedding_texts), time.time() - start)
def create_multimodal(self, file_documents: list | None = None, **kwargs):
if file_documents:
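The single create() call is replaced by fixed-size batches of 500 with per-batch and overall timing. A standalone sketch of the same batching pattern (create_in_batches and processor are hypothetical stand-ins for the method and self._vector_processor; the batch count uses ceiling division so the "batch X/Y" log line is accurate):

    import logging
    import time

    logger = logging.getLogger(__name__)

    def create_in_batches(processor, texts: list, embeddings: list, batch_size: int = 500, **kwargs) -> None:
        # texts and embeddings are parallel lists; each slice is written in one call.
        total_batches = (len(texts) + batch_size - 1) // batch_size  # ceiling division
        start = time.time()
        for i in range(0, len(texts), batch_size):
            batch_start = time.time()
            processor.create(
                texts=texts[i : i + batch_size],
                embeddings=embeddings[i : i + batch_size],
                **kwargs,
            )
            logger.info("Embedding batch %s/%s took %s s", i // batch_size + 1, total_batches, time.time() - batch_start)
        logger.info("Embedding %s documents with vectors took %s s", len(texts), time.time() - start)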
@@ -314,16 +327,33 @@ class Vector:
collection_exist_cache_key = f"vector_indexing_{self._vector_processor.collection_name}"
redis_client.delete(collection_exist_cache_key)
def _get_embeddings(self) -> Embeddings:
def _get_embeddings(self) -> Embeddings | None:
model_manager = ModelManager()
embedding_model = model_manager.get_model_instance(
tenant_id=self._dataset.tenant_id,
provider=self._dataset.embedding_model_provider,
model_type=ModelType.TEXT_EMBEDDING,
model=self._dataset.embedding_model,
)
return CacheEmbedding(embedding_model)
try:
    embedding_model = model_manager.get_model_instance(
        tenant_id=self._dataset.tenant_id,
        provider=self._dataset.embedding_model_provider,
        model_type=ModelType.TEXT_EMBEDDING,
        model=self._dataset.embedding_model,
    )
    return CacheEmbedding(embedding_model)
except Exception as e:
    logger.exception("Error getting embeddings: %s", e)
    # Fall back to a placeholder embeddings instance built from a hard-coded
    # OpenAI text-embedding-ada-002 provider description.
    return CacheEmbedding(
        model_instance=ModelInstance(
            provider_model_bundle=ProviderModelBundle(
                configuration=ProviderConfiguration(
                    provider=ProviderEntity(
                        provider="openai",
                        label=I18nObject(en_US="OpenAI", zh_Hans="OpenAI"),
                        description=I18nObject(en_US="OpenAI provider", zh_Hans="OpenAI 提供商"),
                        icon_small=I18nObject(en_US="icon.png", zh_Hans="icon.png"),
                        icon_large=I18nObject(en_US="icon.png", zh_Hans="icon.png"),
                        background="background.png",
                        help=None,
                        supported_model_types=[ModelType.TEXT_EMBEDDING],
                        configurate_methods=[ConfigurateMethod.PREDEFINED_MODEL],
                        provider_credential_schema=None,
                        model_credential_schema=None,
                    ),
                ),
            ),
            model="text-embedding-ada-002",
        )
    )
def _filter_duplicate_texts(self, texts: list[Document]) -> list[Document]:
for text in texts.copy():
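_get_embeddings is now annotated as returning Embeddings | None, although the except branch above returns a placeholder CacheEmbedding rather than None (and it presumes ProviderConfiguration, ProviderEntity, I18nObject and ConfigurateMethod are importable in this module, beyond the two imports added at the top of the file). For reference, a minimal sketch of the variant that would actually exercise the None arm of that annotation, offered as an assumption rather than as what the commit does:

    def _get_embeddings(self) -> Embeddings | None:
        # Alternative sketch: surface the failure instead of building a fake provider bundle.
        model_manager = ModelManager()
        try:
            embedding_model = model_manager.get_model_instance(
                tenant_id=self._dataset.tenant_id,
                provider=self._dataset.embedding_model_provider,
                model_type=ModelType.TEXT_EMBEDDING,
                model=self._dataset.embedding_model,
            )
        except Exception:
            logger.exception("Error getting embeddings for dataset %s", self._dataset.id)
            return None  # callers must then handle the missing-embeddings case
        return CacheEmbedding(embedding_model)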