add qdrant to tidb

This commit is contained in:
jyong 2025-12-17 19:02:25 +08:00
parent 201de91665
commit 03e3208c71
1 changed file with 76 additions and 52 deletions

@@ -32,7 +32,7 @@ from libs.helper import email as email_validate
from libs.password import hash_password, password_pattern, valid_password
from libs.rsa import generate_key_pair
from models import Tenant
from models.dataset import Dataset, DatasetCollectionBinding, DatasetMetadata, DatasetMetadataBinding, DocumentSegment
from models.dataset import Dataset, DatasetCollectionBinding, DatasetMetadata, DatasetMetadataBinding
from models.dataset import Document as DatasetDocument
from models.model import Account, App, AppAnnotationSetting, AppMode, Conversation, MessageAnnotation, UploadFile
from models.oauth import DatasourceOauthParamConfig, DatasourceProvider
@@ -175,54 +175,67 @@ def migrate_annotation_vector_database():
create_count = 0
skipped_count = 0
total_count = 0
page = 1
error_count = 0
per_page = 50
# Keyset pagination on AppAnnotationSetting (much smaller dataset than App table)
last_created_at = None
last_id = None
while True:
try:
# get apps info
per_page = 50
with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
apps = (
session.query(App)
# Query AppAnnotationSetting directly instead of scanning all Apps
query = (
session.query(AppAnnotationSetting, App, DatasetCollectionBinding)
.join(App, App.id == AppAnnotationSetting.app_id)
.join(
DatasetCollectionBinding,
DatasetCollectionBinding.id == AppAnnotationSetting.collection_binding_id,
)
.where(App.status == "normal")
.order_by(App.created_at.desc())
)
# Apply keyset pagination condition
if last_created_at is not None and last_id is not None:
query = query.where(
sa.or_(
AppAnnotationSetting.created_at < last_created_at,
sa.and_(
AppAnnotationSetting.created_at == last_created_at,
AppAnnotationSetting.id < last_id,
),
)
)
results = (
query.order_by(AppAnnotationSetting.created_at.desc(), AppAnnotationSetting.id.desc())
.limit(per_page)
.offset((page - 1) * per_page)
.all()
)
if not apps:
if not results:
break
# Update cursor to the last record of current batch
last_created_at = results[-1][0].created_at
last_id = results[-1][0].id
except SQLAlchemyError:
raise
page += 1
for app in apps:
total_count = total_count + 1
for app_annotation_setting, app, dataset_collection_binding in results:
total_count += 1
click.echo(
f"Processing the {total_count} app {app.id}. " + f"{create_count} created, {skipped_count} skipped."
f"Processing the {total_count} app {app.id}. {create_count} created, {skipped_count} skipped."
)
try:
click.echo(f"Creating app annotation index: {app.id}")
if not app_annotation_setting:
skipped_count += 1
continue
with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
app_annotation_setting = (
session.query(AppAnnotationSetting).where(AppAnnotationSetting.app_id == app.id).first()
)
if not app_annotation_setting:
skipped_count = skipped_count + 1
click.echo(f"App annotation setting disabled: {app.id}")
continue
# get dataset_collection_binding info
dataset_collection_binding = (
session.query(DatasetCollectionBinding)
.where(DatasetCollectionBinding.id == app_annotation_setting.collection_binding_id)
.first()
)
if not dataset_collection_binding:
click.echo(f"App annotation collection binding not found: {app.id}")
continue
annotations = session.scalars(
select(MessageAnnotation).where(MessageAnnotation.app_id == app.id)
).all()
dataset = Dataset(
id=app.id,
tenant_id=app.tenant_id,
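
The hunk above swaps the old page/offset scan over App for keyset (cursor) pagination over AppAnnotationSetting, ordered by (created_at, id) descending; since created_at is not unique, the id tie-breaker is what keeps the cursor from skipping or repeating rows that share a timestamp. A minimal self-contained sketch of the same pattern, assuming only a generic SQLAlchemy model with created_at and id columns (the helper name is illustrative, not part of the commit):

```python
import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker


def iter_keyset(session_factory: sessionmaker, model, per_page: int = 50):
    """Yield rows in (created_at, id) descending order without OFFSET.

    Each batch filters on the last-seen (created_at, id) tuple, so the
    database serves every page straight from the index instead of
    scanning and discarding `offset` rows as the page number grows.
    """
    last_created_at = None
    last_id = None
    while True:
        with session_factory() as session:
            query = session.query(model)
            if last_created_at is not None and last_id is not None:
                query = query.where(
                    sa.or_(
                        model.created_at < last_created_at,
                        sa.and_(
                            model.created_at == last_created_at,
                            model.id < last_id,
                        ),
                    )
                )
            batch = (
                query.order_by(model.created_at.desc(), model.id.desc())
                .limit(per_page)
                .all()
            )
        if not batch:
            return
        # Advance the cursor to the last row of this batch, as the diff does.
        last_created_at = batch[-1].created_at
        last_id = batch[-1].id
        yield from batch
```

Unlike OFFSET pagination, each batch is answered directly from the (created_at, id) index, so the cost per page stays flat no matter how far the cursor has advanced.
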
@@ -230,6 +243,12 @@ def migrate_annotation_vector_database():
embedding_model_provider=dataset_collection_binding.provider_name,
embedding_model=dataset_collection_binding.model_name,
collection_binding_id=dataset_collection_binding.id,
index_struct=json.dumps({
"type": VectorType.QDRANT,
"vector_store": {
"class_prefix": dataset_collection_binding.collection_name,
}
}),
)
documents = []
if annotations:
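
The Dataset stub built above now also carries an explicit index_struct, so the Vector factory can resolve the Qdrant collection without re-deriving it from the binding. A sketch of the JSON shape being written, with illustrative values (assuming VectorType.QDRANT serializes to the string "qdrant"; how the value is read back is outside this diff):

```python
import json

# Shape of the index_struct written in the hunk above: "type" selects the
# vector-store driver and "vector_store.class_prefix" names the collection.
index_struct = json.dumps({
    "type": "qdrant",  # assumed string value of VectorType.QDRANT
    "vector_store": {
        "class_prefix": "binding_collection_name",  # illustrative value
    },
})

# A consumer would recover the collection name roughly like this (sketch):
collection = json.loads(index_struct)["vector_store"]["class_prefix"]
```
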
@@ -240,31 +259,34 @@ def migrate_annotation_vector_database():
)
documents.append(document)
vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"])
original_vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"])
click.echo(f"Migrating annotations for app: {app.id}.")
try:
vector.delete()
click.echo(click.style(f"Deleted vector index for app {app.id}.", fg="green"))
except Exception as e:
click.echo(click.style(f"Failed to delete vector index for app {app.id}.", fg="red"))
raise e
if documents:
try:
click.echo(
click.style(
f"Creating vector index with {len(documents)} annotations for app {app.id}.",
fg="green",
)
original_annotations = original_vector.search_by_metadata_field("app_id", app.id)
if original_annotations:
new_dataset = Dataset(
id=app.id,
tenant_id=app.tenant_id,
indexing_technique="high_quality",
embedding_model_provider=dataset_collection_binding.provider_name,
embedding_model=dataset_collection_binding.model_name,
collection_binding_id=dataset_collection_binding.id,
)
vector.create(documents)
new_vector = Vector(new_dataset)
new_vector.create_with_vectors(original_annotations)
click.echo(click.style(f"Created vector index for app {app.id}.", fg="green"))
except Exception as e:
click.echo(click.style(f"Failed to created vector index for app {app.id}.", fg="red"))
raise e
else:
click.echo(click.style(f"No original annotations found for app {app.id}.", fg="green"))
skipped_count += 1
continue
except Exception as e:
click.echo(click.style(f"Failed to created vector index for app {app.id}.", fg="red"))
raise e
click.echo(f"Successfully migrated app annotation {app.id}.")
create_count += 1
except Exception as e:
error_count += 1
click.echo(
click.style(f"Error creating app annotation index: {e.__class__.__name__} {str(e)}", fg="red")
)
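
The behavioral core of this hunk: rather than deleting the index and re-embedding every annotation via vector.create(documents), the migration now lifts stored embeddings out of the original index with search_by_metadata_field and writes them into the target store with create_with_vectors, so no embedding model is invoked at all. Condensed into a helper (a sketch: both Vector methods are used exactly as in the diff, but the wrapper itself is assumed):

```python
def copy_vectors_by_field(source, target, field: str, value: str) -> int:
    """Copy stored embeddings matching one metadata field from `source`
    to `target` without re-embedding; returns how many records moved.
    Error handling and counters are left to the caller, as in the diff."""
    records = source.search_by_metadata_field(field, value)
    if not records:
        return 0
    target.create_with_vectors(records)
    return len(records)


# Usage mirroring the annotation branch above (names from the diff):
#   moved = copy_vectors_by_field(original_vector, new_vector, "app_id", app.id)
```
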
@@ -272,7 +294,7 @@ def migrate_annotation_vector_database():
click.echo(
click.style(
f"Migration complete. Created {create_count} app annotation indexes. Skipped {skipped_count} apps.",
f"Migration complete. Created {create_count} app annotation indexes. Skipped {skipped_count} apps. Errors {error_count} apps.",
fg="green",
)
)
@@ -299,7 +321,7 @@ def migrate_knowledge_vector_database():
VectorType.OPENGAUSS,
VectorType.TABLESTORE,
VectorType.MATRIXONE,
VectorType.TIDB_ON_QDRANT
VectorType.TIDB_ON_QDRANT,
}
lower_collection_vector_types = {
VectorType.ANALYTICDB,
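
These two sets split the supported backends by collection-name casing: types in the first set keep the mixed-case name, while entries in lower_collection_vector_types get a lower-cased one. A hedged sketch of how such a split is typically applied; the real naming helper lives elsewhere in commands.py, and the scheme below is purely illustrative:

```python
def collection_name_for(vector_type, dataset_id, upper_types, lower_types):
    # Illustrative naming scheme only; the actual name comes from the
    # Dataset model, not from this sketch.
    base = "Vector_index_" + dataset_id.replace("-", "_") + "_Node"
    if vector_type in lower_types:
        return base.lower()
    if vector_type in upper_types:
        return base
    raise ValueError(f"Vector store {vector_type} is not supported.")
```
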
@@ -391,8 +413,9 @@ def migrate_knowledge_vector_database():
segments_count = 0
original_index_vector = Vector(dataset)
for dataset_document in dataset_documents:
single_documents = original_index_vector.search_by_metadata_field("document_id", dataset_document.id)
single_documents = original_index_vector.search_by_metadata_field(
"document_id", dataset_document.id
)
if single_documents:
documents.extend(single_documents)
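
The loop above gathers every stored vector for a dataset, one document at a time, before the bulk write. On very large datasets that accumulated list is unbounded; a hedged variant that flushes in fixed-size chunks, assuming the records are ultimately written with a create_with_vectors-style call as in the annotation path (new_index_vector and CHUNK_SIZE are assumptions, not names from the commit):

```python
CHUNK_SIZE = 1000  # illustrative memory bound

buffer = []
for dataset_document in dataset_documents:
    records = original_index_vector.search_by_metadata_field(
        "document_id", dataset_document.id
    )
    if not records:
        continue
    buffer.extend(records)
    if len(buffer) >= CHUNK_SIZE:
        # `new_index_vector` is an assumed target store, written the same
        # way the annotation path uses create_with_vectors above.
        new_index_vector.create_with_vectors(buffer)
        buffer.clear()
if buffer:
    new_index_vector.create_with_vectors(buffer)
```
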
@@ -431,7 +454,8 @@ def migrate_knowledge_vector_database():
click.echo(
click.style(
f"Migration complete. Created {create_count} dataset indexes. Skipped {skipped_count} datasets. Errors {error_count} datasets.", fg="green"
f"Migration complete. Created {create_count} dataset indexes. Skipped {skipped_count} datasets. Errors {error_count} datasets.",
fg="green",
)
)