From 03e3208c71d8f9f629a858ba22ff0bb4099856ea Mon Sep 17 00:00:00 2001 From: jyong <718720800@qq.com> Date: Wed, 17 Dec 2025 19:02:25 +0800 Subject: [PATCH] add qdrant to tidb --- api/commands.py | 128 ++++++++++++++++++++++++++++-------------------- 1 file changed, 76 insertions(+), 52 deletions(-) diff --git a/api/commands.py b/api/commands.py index c7033d7b46..253c076d1e 100644 --- a/api/commands.py +++ b/api/commands.py @@ -32,7 +32,7 @@ from libs.helper import email as email_validate from libs.password import hash_password, password_pattern, valid_password from libs.rsa import generate_key_pair from models import Tenant -from models.dataset import Dataset, DatasetCollectionBinding, DatasetMetadata, DatasetMetadataBinding, DocumentSegment +from models.dataset import Dataset, DatasetCollectionBinding, DatasetMetadata, DatasetMetadataBinding from models.dataset import Document as DatasetDocument from models.model import Account, App, AppAnnotationSetting, AppMode, Conversation, MessageAnnotation, UploadFile from models.oauth import DatasourceOauthParamConfig, DatasourceProvider @@ -175,54 +175,67 @@ def migrate_annotation_vector_database(): create_count = 0 skipped_count = 0 total_count = 0 - page = 1 + error_count = 0 + per_page = 50 + # Keyset pagination on AppAnnotationSetting (much smaller dataset than App table) + last_created_at = None + last_id = None while True: try: - # get apps info - per_page = 50 with sessionmaker(db.engine, expire_on_commit=False).begin() as session: - apps = ( - session.query(App) + # Query AppAnnotationSetting directly instead of scanning all Apps + query = ( + session.query(AppAnnotationSetting, App, DatasetCollectionBinding) + .join(App, App.id == AppAnnotationSetting.app_id) + .join( + DatasetCollectionBinding, + DatasetCollectionBinding.id == AppAnnotationSetting.collection_binding_id, + ) .where(App.status == "normal") - .order_by(App.created_at.desc()) + ) + + # Apply keyset pagination condition + if last_created_at is not 
None and last_id is not None: + query = query.where( + sa.or_( + AppAnnotationSetting.created_at < last_created_at, + sa.and_( + AppAnnotationSetting.created_at == last_created_at, + AppAnnotationSetting.id < last_id, + ), + ) + ) + + results = ( + query.order_by(AppAnnotationSetting.created_at.desc(), AppAnnotationSetting.id.desc()) .limit(per_page) - .offset((page - 1) * per_page) .all() ) - if not apps: + + if not results: break + + # Update cursor to the last record of current batch + last_created_at = results[-1][0].created_at + last_id = results[-1][0].id except SQLAlchemyError: raise - page += 1 - for app in apps: - total_count = total_count + 1 + for app_annotation_setting, app, dataset_collection_binding in results: + total_count += 1 click.echo( - f"Processing the {total_count} app {app.id}. " + f"{create_count} created, {skipped_count} skipped." + f"Processing the {total_count} app {app.id}. {create_count} created, {skipped_count} skipped." ) try: click.echo(f"Creating app annotation index: {app.id}") + if not app_annotation_setting: + skipped_count += 1 + continue with sessionmaker(db.engine, expire_on_commit=False).begin() as session: - app_annotation_setting = ( - session.query(AppAnnotationSetting).where(AppAnnotationSetting.app_id == app.id).first() - ) - - if not app_annotation_setting: - skipped_count = skipped_count + 1 - click.echo(f"App annotation setting disabled: {app.id}") - continue - # get dataset_collection_binding info - dataset_collection_binding = ( - session.query(DatasetCollectionBinding) - .where(DatasetCollectionBinding.id == app_annotation_setting.collection_binding_id) - .first() - ) - if not dataset_collection_binding: - click.echo(f"App annotation collection binding not found: {app.id}") - continue annotations = session.scalars( select(MessageAnnotation).where(MessageAnnotation.app_id == app.id) ).all() + dataset = Dataset( id=app.id, tenant_id=app.tenant_id, @@ -230,6 +243,12 @@ def migrate_annotation_vector_database(): 
embedding_model_provider=dataset_collection_binding.provider_name, embedding_model=dataset_collection_binding.model_name, collection_binding_id=dataset_collection_binding.id, + index_struct=json.dumps({ + "type": VectorType.QDRANT, + "vector_store": { + "class_prefix": dataset_collection_binding.collection_name, + } + }), ) documents = [] if annotations: @@ -240,31 +259,34 @@ def migrate_annotation_vector_database(): ) documents.append(document) - vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"]) + original_vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"]) click.echo(f"Migrating annotations for app: {app.id}.") try: - vector.delete() - click.echo(click.style(f"Deleted vector index for app {app.id}.", fg="green")) - except Exception as e: - click.echo(click.style(f"Failed to delete vector index for app {app.id}.", fg="red")) - raise e - if documents: - try: - click.echo( - click.style( - f"Creating vector index with {len(documents)} annotations for app {app.id}.", - fg="green", - ) + original_annotations = original_vector.search_by_metadata_field("app_id", app.id) + if original_annotations: + new_dataset = Dataset( + id=app.id, + tenant_id=app.tenant_id, + indexing_technique="high_quality", + embedding_model_provider=dataset_collection_binding.provider_name, + embedding_model=dataset_collection_binding.model_name, + collection_binding_id=dataset_collection_binding.id, ) - vector.create(documents) + new_vector = Vector(new_dataset) + new_vector.create_with_vectors(original_annotations) click.echo(click.style(f"Created vector index for app {app.id}.", fg="green")) - except Exception as e: - click.echo(click.style(f"Failed to created vector index for app {app.id}.", fg="red")) - raise e + else: + click.echo(click.style(f"No original annotations found for app {app.id}.", fg="green")) + skipped_count += 1 + continue + except Exception as e: + click.echo(click.style(f"Failed to create vector index for app {app.id}.",
fg="red")) + raise e click.echo(f"Successfully migrated app annotation {app.id}.") create_count += 1 except Exception as e: + error_count += 1 click.echo( click.style(f"Error creating app annotation index: {e.__class__.__name__} {str(e)}", fg="red") ) @@ -272,7 +294,7 @@ def migrate_annotation_vector_database(): click.echo( click.style( - f"Migration complete. Created {create_count} app annotation indexes. Skipped {skipped_count} apps.", + f"Migration complete. Created {create_count} app annotation indexes. Skipped {skipped_count} apps. Errors {error_count} apps.", fg="green", ) ) @@ -299,7 +321,7 @@ def migrate_knowledge_vector_database(): VectorType.OPENGAUSS, VectorType.TABLESTORE, VectorType.MATRIXONE, - VectorType.TIDB_ON_QDRANT + VectorType.TIDB_ON_QDRANT, } lower_collection_vector_types = { VectorType.ANALYTICDB, @@ -391,8 +413,9 @@ def migrate_knowledge_vector_database(): segments_count = 0 original_index_vector = Vector(dataset) for dataset_document in dataset_documents: - - single_documents = original_index_vector.search_by_metadata_field("document_id", dataset_document.id) + single_documents = original_index_vector.search_by_metadata_field( + "document_id", dataset_document.id + ) if single_documents: documents.extend(single_documents) @@ -431,7 +454,8 @@ def migrate_knowledge_vector_database(): click.echo( click.style( - f"Migration complete. Created {create_count} dataset indexes. Skipped {skipped_count} datasets. Errors {error_count} datasets.", fg="green" + f"Migration complete. Created {create_count} dataset indexes. Skipped {skipped_count} datasets. Errors {error_count} datasets.", + fg="green", ) )