From 9f894bb3b389a334afa9057e294b6671c5a8e92f Mon Sep 17 00:00:00 2001 From: jyong <718720800@qq.com> Date: Wed, 18 Sep 2024 14:36:51 +0800 Subject: [PATCH] external knowledge api --- api/controllers/console/__init__.py | 2 +- api/controllers/console/datasets/datasets.py | 23 ++ api/controllers/console/datasets/external.py | 162 +++++++------- .../service_api/dataset/dataset.py | 23 ++ api/core/rag/datasource/retrieval_service.py | 2 +- api/models/dataset.py | 1 + api/schedule/clean_unused_messages_task.py | 90 ++++---- api/services/dataset_service.py | 27 ++- .../external_knowledge_entities.py | 6 +- api/services/external_knowledge_service.py | 207 ++++++++---------- api/services/hit_testing_service.py | 11 +- api/tasks/external_document_indexing_task.py | 50 +++-- 12 files changed, 330 insertions(+), 274 deletions(-) diff --git a/api/controllers/console/__init__.py b/api/controllers/console/__init__.py index eb7c1464d3..77358acedb 100644 --- a/api/controllers/console/__init__.py +++ b/api/controllers/console/__init__.py @@ -37,7 +37,7 @@ from .auth import activate, data_source_bearer_auth, data_source_oauth, forgot_p from .billing import billing # Import datasets controllers -from .datasets import data_source, datasets, datasets_document, datasets_segments, file, hit_testing, website +from .datasets import data_source, datasets, datasets_document, datasets_segments, external, file, hit_testing, website # Import explore controllers from .explore import ( diff --git a/api/controllers/console/datasets/datasets.py b/api/controllers/console/datasets/datasets.py index 44c1390c14..a2d7dc89cd 100644 --- a/api/controllers/console/datasets/datasets.py +++ b/api/controllers/console/datasets/datasets.py @@ -110,6 +110,26 @@ class DatasetListApi(Resource): nullable=True, help="Invalid indexing technique.", ) + parser.add_argument( + "external_api_template_id", + type=str, + nullable=True, + required=False, + ) + parser.add_argument( + "provider", + type=str, + nullable=True, + choices=Dataset.PROVIDER_LIST, + required=False, + default="vendor", + ) + parser.add_argument( + "external_knowledge_id", + type=str, + nullable=True, + required=False, + ) args = parser.parse_args() # The role of the current user in the ta table must be admin, owner, or editor, or dataset_operator @@ -123,6 +143,9 @@ class DatasetListApi(Resource): indexing_technique=args["indexing_technique"], account=current_user, permission=DatasetPermissionEnum.ONLY_ME, + provider=args["provider"], + external_api_template_id=args["external_api_template_id"], + external_knowledge_id=args["external_knowledge_id"], ) except services.errors.dataset.DatasetNameDuplicateError: raise DatasetNameDuplicateError() diff --git a/api/controllers/console/datasets/external.py b/api/controllers/console/datasets/external.py index 024ed955d4..92ce1ffd2e 100644 --- a/api/controllers/console/datasets/external.py +++ b/api/controllers/console/datasets/external.py @@ -1,69 +1,49 @@ -import flask_restful from flask import request from flask_login import current_user -from flask_restful import Resource, marshal, marshal_with, reqparse +from flask_restful import Resource, marshal, reqparse from werkzeug.exceptions import Forbidden, NotFound import services -from configs import dify_config from controllers.console import api -from controllers.console.apikey import api_key_fields, api_key_list from controllers.console.app.error import ProviderNotInitializeError -from controllers.console.datasets.error import DatasetInUseError, DatasetNameDuplicateError, IndexingEstimateError +from controllers.console.datasets.error import DatasetNameDuplicateError from controllers.console.setup import setup_required from controllers.console.wraps import account_initialization_required -from core.errors.error import LLMBadRequestError, ProviderTokenNotInitError -from core.indexing_runner import IndexingRunner -from core.model_runtime.entities.model_entities import ModelType -from core.provider_manager import ProviderManager -from core.rag.datasource.vdb.vector_type import VectorType -from core.rag.extractor.entity.extract_setting import ExtractSetting -from core.rag.retrieval.retrival_methods import RetrievalMethod -from extensions.ext_database import db -from fields.app_fields import related_app_list -from fields.dataset_fields import dataset_detail_fields, dataset_query_detail_fields -from fields.document_fields import document_status_fields +from fields.dataset_fields import dataset_detail_fields from libs.login import login_required -from models.dataset import Dataset, Document, DocumentSegment -from models.model import ApiToken, UploadFile -from services.dataset_service import DatasetPermissionService, DatasetService, DocumentService from services.external_knowledge_service import ExternalDatasetService def _validate_name(name): if not name or len(name) < 1 or len(name) > 100: - raise ValueError('Name must be between 1 to 100 characters.') + raise ValueError("Name must be between 1 to 100 characters.") return name def _validate_description_length(description): if len(description) > 400: - raise ValueError('Description cannot exceed 400 characters.') + raise ValueError("Description cannot exceed 400 characters.") return description class ExternalApiTemplateListApi(Resource): - @setup_required @login_required @account_initialization_required def get(self): - page = request.args.get('page', default=1, type=int) - limit = request.args.get('limit', default=20, type=int) - search = request.args.get('keyword', default=None, type=str) + page = request.args.get("page", default=1, type=int) + limit = request.args.get("limit", default=20, type=int) + search = request.args.get("keyword", default=None, type=str) api_templates, total = ExternalDatasetService.get_external_api_templates( - page, - limit, - current_user.current_tenant_id, - search + page, limit, current_user.current_tenant_id, search ) response = { - 'data': [item.to_dict() for item in api_templates], - 'has_more': len(api_templates) == limit, - 'limit': limit, - 'total': total, - 'page': page + "data": [item.to_dict() for item in api_templates], + "has_more": len(api_templates) == limit, + "limit": limit, + "total": total, + "page": page, } return response, 200 @@ -72,18 +52,30 @@ class ExternalApiTemplateListApi(Resource): @account_initialization_required def post(self): parser = reqparse.RequestParser() - parser.add_argument('name', nullable=False, required=True, - help='Name is required. Name must be between 1 to 100 characters.', - type=_validate_name) - parser.add_argument('description', nullable=False, required=True, - help='Description is required. Description must be between 1 to 400 characters.', - type=_validate_description_length) - parser.add_argument('settings', type=list, location='json', - nullable=False, - required=True, ) + parser.add_argument( + "name", + nullable=False, + required=True, + help="Name is required. Name must be between 1 to 100 characters.", + type=_validate_name, + ) + parser.add_argument( + "description", + nullable=False, + required=True, + help="Description is required. Description must be between 1 to 400 characters.", + type=_validate_description_length, + ) + parser.add_argument( + "settings", + type=list, + location="json", + nullable=False, + required=True, + ) args = parser.parse_args() - ExternalDatasetService.validate_api_list(args['settings']) + ExternalDatasetService.validate_api_list(args["settings"]) # The role of the current user in the ta table must be admin, owner, or editor, or dataset_operator if not current_user.is_dataset_editor: @@ -91,9 +83,7 @@ class ExternalApiTemplateListApi(Resource): try: api_template = ExternalDatasetService.create_api_template( - tenant_id=current_user.current_tenant_id, - user_id=current_user.id, - args=args + tenant_id=current_user.current_tenant_id, user_id=current_user.id, args=args ) except services.errors.dataset.DatasetNameDuplicateError: raise DatasetNameDuplicateError() @@ -120,23 +110,35 @@ class ExternalApiTemplateApi(Resource): api_template_id = str(api_template_id) parser = reqparse.RequestParser() - parser.add_argument('name', nullable=False, required=True, - help='type is required. Name must be between 1 to 100 characters.', - type=_validate_name) - parser.add_argument('description', nullable=False, required=True, - help='description is required. Description must be between 1 to 400 characters.', - type=_validate_description_length) - parser.add_argument('settings', type=list, location='json', - nullable=False, - required=True, ) + parser.add_argument( + "name", + nullable=False, + required=True, + help="type is required. Name must be between 1 to 100 characters.", + type=_validate_name, + ) + parser.add_argument( + "description", + nullable=False, + required=True, + help="description is required. Description must be between 1 to 400 characters.", + type=_validate_description_length, + ) + parser.add_argument( + "settings", + type=list, + location="json", + nullable=False, + required=True, + ) args = parser.parse_args() - ExternalDatasetService.validate_api_list(args['settings']) + ExternalDatasetService.validate_api_list(args["settings"]) api_template = ExternalDatasetService.update_api_template( tenant_id=current_user.current_tenant_id, user_id=current_user.id, api_template_id=api_template_id, - args=args + args=args, ) return api_template.to_dict(), 200 @@ -152,7 +154,7 @@ class ExternalApiTemplateApi(Resource): raise Forbidden() ExternalDatasetService.delete_api_template(current_user.current_tenant_id, api_template_id) - return {'result': 'success'}, 204 + return {"result": "success"}, 204 class ExternalApiUseCheckApi(Resource): @@ -163,11 +165,10 @@ class ExternalApiUseCheckApi(Resource): api_template_id = str(api_template_id) external_api_template_is_using = ExternalDatasetService.external_api_template_use_check(api_template_id) - return {'is_using': external_api_template_is_using}, 200 + return {"is_using": external_api_template_is_using}, 200 class ExternalDatasetInitApi(Resource): - @setup_required @login_required @account_initialization_required @@ -177,13 +178,13 @@ class ExternalDatasetInitApi(Resource): raise Forbidden() parser = reqparse.RequestParser() - parser.add_argument('api_template_id', type=str, required=True, nullable=True, location='json') + parser.add_argument("api_template_id", type=str, required=True, nullable=True, location="json") # parser.add_argument('name', nullable=False, required=True, # help='name is required. Name must be between 1 to 100 characters.', # type=_validate_name) # parser.add_argument('description', type=str, required=True, nullable=True, location='json') - parser.add_argument('data_source', type=dict, required=True, nullable=True, location='json') - parser.add_argument('process_parameter', type=dict, required=True, nullable=True, location='json') + parser.add_argument("data_source", type=dict, required=True, nullable=True, location="json") + parser.add_argument("process_parameter", type=dict, required=True, nullable=True, location="json") args = parser.parse_args() @@ -193,9 +194,7 @@ class ExternalDatasetInitApi(Resource): # validate args ExternalDatasetService.document_create_args_validate( - current_user.current_tenant_id, - args['api_template_id'], - args['process_parameter'] + current_user.current_tenant_id, args["api_template_id"], args["process_parameter"] ) try: @@ -206,17 +205,12 @@ class ExternalDatasetInitApi(Resource): ) except Exception as ex: raise ProviderNotInitializeError(ex.description) - response = { - 'dataset': dataset, - 'documents': documents, - 'batch': batch - } + response = {"dataset": dataset, "documents": documents, "batch": batch} return response class ExternalDatasetCreateApi(Resource): - @setup_required @login_required @account_initialization_required @@ -226,12 +220,16 @@ class ExternalDatasetCreateApi(Resource): raise Forbidden() parser = reqparse.RequestParser() - parser.add_argument('api_template_id', type=str, required=True, nullable=False, location='json') - parser.add_argument('external_knowledge_id', type=str, required=True, nullable=False, location='json') - parser.add_argument('name', nullable=False, required=True, - help='name is required. Name must be between 1 to 100 characters.', - type=_validate_name) - parser.add_argument('description', type=str, required=True, nullable=True, location='json') + parser.add_argument("external_api_template_id", type=str, required=True, nullable=False, location="json") + parser.add_argument("external_knowledge_id", type=str, required=True, nullable=False, location="json") + parser.add_argument( + "name", + nullable=False, + required=True, + help="name is required. Name must be between 1 to 100 characters.", + type=_validate_name, + ) + parser.add_argument("description", type=str, required=True, nullable=True, location="json") args = parser.parse_args() @@ -251,6 +249,6 @@ class ExternalDatasetCreateApi(Resource): return marshal(dataset, dataset_detail_fields), 201 -api.add_resource(ExternalApiTemplateListApi, '/datasets/external-api-template') -api.add_resource(ExternalApiTemplateApi, '/datasets/external-api-template/') -api.add_resource(ExternalApiUseCheckApi, '/datasets/external-api-template//use-check') +api.add_resource(ExternalApiTemplateListApi, "/datasets/external-api-template") +api.add_resource(ExternalApiTemplateApi, "/datasets/external-api-template/") +api.add_resource(ExternalApiUseCheckApi, "/datasets/external-api-template//use-check") diff --git a/api/controllers/service_api/dataset/dataset.py b/api/controllers/service_api/dataset/dataset.py index c2c0672a03..90d62d7c7f 100644 --- a/api/controllers/service_api/dataset/dataset.py +++ b/api/controllers/service_api/dataset/dataset.py @@ -82,6 +82,26 @@ class DatasetListApi(DatasetApiResource): required=False, nullable=False, ) + parser.add_argument( + "external_api_template_id", + type=str, + nullable=True, + required=False, + default="_validate_name", + ) + parser.add_argument( + "provider", + type=str, + nullable=True, + required=False, + default="vendor", + ) + parser.add_argument( + "external_knowledge_id", + type=str, + nullable=True, + required=False, + ) args = parser.parse_args() try: @@ -91,6 +111,9 @@ class DatasetListApi(DatasetApiResource): indexing_technique=args["indexing_technique"], account=current_user, permission=args["permission"], + provider=args["provider"], + external_api_template_id=args["external_api_template_id"], + external_knowledge_id=args["external_knowledge_id"], ) except services.errors.dataset.DatasetNameDuplicateError: raise DatasetNameDuplicateError() diff --git a/api/core/rag/datasource/retrieval_service.py b/api/core/rag/datasource/retrieval_service.py index 26a728bcb6..4e56df34ae 100644 --- a/api/core/rag/datasource/retrieval_service.py +++ b/api/core/rag/datasource/retrieval_service.py @@ -38,7 +38,7 @@ class RetrievalService: if not dataset: return [] if provider == 'external': - external_knowledge_binding = ExternalDatasetService.fetch_external_knowledge_retrival( + all_documents = ExternalDatasetService.fetch_external_knowledge_retrival( dataset.tenant_id, dataset_id, query, diff --git a/api/models/dataset.py b/api/models/dataset.py index 88c038a10b..0289e18ea4 100644 --- a/api/models/dataset.py +++ b/api/models/dataset.py @@ -37,6 +37,7 @@ class Dataset(db.Model): ) INDEXING_TECHNIQUE_LIST = ['high_quality', 'economy', None] + PROVIDER_LIST = ['vendor', 'external', None] id = db.Column(StringUUID, server_default=db.text('uuid_generate_v4()')) tenant_id = db.Column(StringUUID, nullable=False) diff --git a/api/schedule/clean_unused_messages_task.py b/api/schedule/clean_unused_messages_task.py index d931892d97..85e6a58a0e 100644 --- a/api/schedule/clean_unused_messages_task.py +++ b/api/schedule/clean_unused_messages_task.py @@ -12,9 +12,9 @@ from extensions.ext_database import db from models.dataset import Dataset, DatasetQuery, Document -@app.celery.task(queue='dataset') +@app.celery.task(queue="dataset") def clean_unused_message_task(): - click.echo(click.style('Start clean unused messages .', fg='green')) + click.echo(click.style("Start clean unused messages .", fg="green")) clean_days = int(dify_config.CLEAN_DAY_SETTING) start_at = time.perf_counter() thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days) @@ -22,40 +22,44 @@ def clean_unused_message_task(): while True: try: # Subquery for counting new documents - document_subquery_new = db.session.query( - Document.dataset_id, - func.count(Document.id).label('document_count') - ).filter( - Document.indexing_status == 'completed', - Document.enabled == True, - Document.archived == False, - Document.updated_at > thirty_days_ago - ).group_by(Document.dataset_id).subquery() + document_subquery_new = ( + db.session.query(Document.dataset_id, func.count(Document.id).label("document_count")) + .filter( + Document.indexing_status == "completed", + Document.enabled == True, + Document.archived == False, + Document.updated_at > thirty_days_ago, + ) + .group_by(Document.dataset_id) + .subquery() + ) # Subquery for counting old documents - document_subquery_old = db.session.query( - Document.dataset_id, - func.count(Document.id).label('document_count') - ).filter( - Document.indexing_status == 'completed', - Document.enabled == True, - Document.archived == False, - Document.updated_at < thirty_days_ago - ).group_by(Document.dataset_id).subquery() + document_subquery_old = ( + db.session.query(Document.dataset_id, func.count(Document.id).label("document_count")) + .filter( + Document.indexing_status == "completed", + Document.enabled == True, + Document.archived == False, + Document.updated_at < thirty_days_ago, + ) + .group_by(Document.dataset_id) + .subquery() + ) # Main query with join and filter - datasets = (db.session.query(Dataset) - .outerjoin( - document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id - ).outerjoin( - document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id - ).filter( - Dataset.created_at < thirty_days_ago, - func.coalesce(document_subquery_new.c.document_count, 0) == 0, - func.coalesce(document_subquery_old.c.document_count, 0) > 0 - ).order_by( - Dataset.created_at.desc() - ).paginate(page=page, per_page=50)) + datasets = ( + db.session.query(Dataset) + .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id) + .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id) + .filter( + Dataset.created_at < thirty_days_ago, + func.coalesce(document_subquery_new.c.document_count, 0) == 0, + func.coalesce(document_subquery_old.c.document_count, 0) > 0, + ) + .order_by(Dataset.created_at.desc()) + .paginate(page=page, per_page=50) + ) except NotFound: break @@ -63,10 +67,11 @@ def clean_unused_message_task(): break page += 1 for dataset in datasets: - dataset_query = db.session.query(DatasetQuery).filter( - DatasetQuery.created_at > thirty_days_ago, - DatasetQuery.dataset_id == dataset.id - ).all() + dataset_query = ( + db.session.query(DatasetQuery) + .filter(DatasetQuery.created_at > thirty_days_ago, DatasetQuery.dataset_id == dataset.id) + .all() + ) if not dataset_query or len(dataset_query) == 0: try: # remove index @@ -74,17 +79,14 @@ def clean_unused_message_task(): index_processor.clean(dataset, None) # update document - update_params = { - Document.enabled: False - } + update_params = {Document.enabled: False} Document.query.filter_by(dataset_id=dataset.id).update(update_params) db.session.commit() - click.echo(click.style('Cleaned unused dataset {} from db success!'.format(dataset.id), - fg='green')) + click.echo(click.style("Cleaned unused dataset {} from db success!".format(dataset.id), fg="green")) except Exception as e: click.echo( - click.style('clean dataset index error: {} {}'.format(e.__class__.__name__, str(e)), - fg='red')) + click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red") + ) end_at = time.perf_counter() - click.echo(click.style('Cleaned unused dataset from db success latency: {}'.format(end_at - start_at), fg='green')) + click.echo(click.style("Cleaned unused dataset from db success latency: {}".format(end_at - start_at), fg="green")) diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py index ad552e1bab..e02efe03a2 100644 --- a/api/services/dataset_service.py +++ b/api/services/dataset_service.py @@ -32,6 +32,7 @@ from models.dataset import ( DatasetQuery, Document, DocumentSegment, + ExternalKnowledgeBindings, ) from models.model import UploadFile from models.source import DataSourceOauthBinding @@ -39,6 +40,7 @@ from services.errors.account import NoPermissionError from services.errors.dataset import DatasetNameDuplicateError from services.errors.document import DocumentIndexingError from services.errors.file import FileNotExistsError +from services.external_knowledge_service import ExternalDatasetService from services.feature_service import FeatureModel, FeatureService from services.tag_service import TagService from services.vector_service import VectorService @@ -137,7 +139,14 @@ class DatasetService: @staticmethod def create_empty_dataset( - tenant_id: str, name: str, indexing_technique: Optional[str], account: Account, permission: Optional[str] = None + tenant_id: str, + name: str, + indexing_technique: Optional[str], + account: Account, + permission: Optional[str] = None, + provider: str = "vendor", + external_api_template_id: Optional[str] = None, + external_knowledge_id: Optional[str] = None, ): # check if dataset name already exists if Dataset.query.filter_by(name=name, tenant_id=tenant_id).first(): @@ -156,7 +165,23 @@ class DatasetService: dataset.embedding_model_provider = embedding_model.provider if embedding_model else None dataset.embedding_model = embedding_model.model if embedding_model else None dataset.permission = permission if permission else DatasetPermissionEnum.ONLY_ME + dataset.provider = provider db.session.add(dataset) + db.session.flush() + + if provider == "external" and external_api_template_id: + external_api_template = ExternalDatasetService.get_api_template(external_api_template_id) + if not external_api_template: + raise ValueError("External API template not found.") + external_knowledge_binding = ExternalKnowledgeBindings( + tenant_id=tenant_id, + dataset_id=dataset.id, + external_api_template_id=external_api_template_id, + external_knowledge_id=external_knowledge_id, + created_by=account.id, + ) + db.session.add(external_knowledge_binding) + db.session.commit() return dataset diff --git a/api/services/entities/external_knowledge_entities/external_knowledge_entities.py b/api/services/entities/external_knowledge_entities/external_knowledge_entities.py index bca86e3d7e..e84475c181 100644 --- a/api/services/entities/external_knowledge_entities/external_knowledge_entities.py +++ b/api/services/entities/external_knowledge_entities/external_knowledge_entities.py @@ -1,16 +1,16 @@ -from typing import Literal, Union, Optional +from typing import Literal, Optional, Union from pydantic import BaseModel class AuthorizationConfig(BaseModel): - type: Literal[None, 'basic', 'bearer', 'custom'] + type: Literal[None, "basic", "bearer", "custom"] api_key: Union[None, str] = None header: Union[None, str] = None class Authorization(BaseModel): - type: Literal['no-auth', 'api-key'] + type: Literal["no-auth", "api-key"] config: Optional[AuthorizationConfig] = None diff --git a/api/services/external_knowledge_service.py b/api/services/external_knowledge_service.py index 364335a058..850e77b8ad 100644 --- a/api/services/external_knowledge_service.py +++ b/api/services/external_knowledge_service.py @@ -3,59 +3,45 @@ import random import time from copy import deepcopy from datetime import datetime, timezone -from typing import Any, Union, Optional +from typing import Any, Optional, Union import httpx from core.helper import ssrf_proxy from extensions.ext_database import db -from extensions.ext_redis import redis_client -from libs import helper -from models.account import Account, TenantAccountRole from models.dataset import ( - AppDatasetJoin, Dataset, - DatasetCollectionBinding, - DatasetPermission, - DatasetProcessRule, - DatasetQuery, Document, - DocumentSegment, ExternalApiTemplates, ExternalKnowledgeBindings, + ExternalApiTemplates, + ExternalKnowledgeBindings, ) from models.model import UploadFile -from services.dataset_service import DocumentService -from services.entities.external_knowledge_entities.external_knowledge_entities import Authorization, ApiTemplateSetting +from services.entities.external_knowledge_entities.external_knowledge_entities import ApiTemplateSetting, Authorization from services.errors.dataset import DatasetNameDuplicateError -from tasks.external_document_indexing_task import external_document_indexing_task +# from tasks.external_document_indexing_task import external_document_indexing_task class ExternalDatasetService: - @staticmethod def get_external_api_templates(page, per_page, tenant_id, search=None) -> tuple[list[ExternalApiTemplates], int]: query = ExternalApiTemplates.query.filter(ExternalApiTemplates.tenant_id == tenant_id).order_by( ExternalApiTemplates.created_at.desc() ) if search: - query = query.filter(ExternalApiTemplates.name.ilike(f'%{search}%')) + query = query.filter(ExternalApiTemplates.name.ilike(f"%{search}%")) - api_templates = query.paginate( - page=page, - per_page=per_page, - max_per_page=100, - error_out=False - ) + api_templates = query.paginate(page=page, per_page=per_page, max_per_page=100, error_out=False) return api_templates.items, api_templates.total @classmethod def validate_api_list(cls, api_settings: dict): if not api_settings: - raise ValueError('api list is empty') - if 'endpoint' not in api_settings and not api_settings['endpoint']: - raise ValueError('endpoint is required') - if 'api_key' not in api_settings and not api_settings['api_key']: - raise ValueError('api_key is required') + raise ValueError("api list is empty") + if "endpoint" not in api_settings and not api_settings["endpoint"]: + raise ValueError("endpoint is required") + if "api_key" not in api_settings and not api_settings["api_key"]: + raise ValueError("api_key is required") @staticmethod def create_api_template(tenant_id: str, user_id: str, args: dict) -> ExternalApiTemplates: @@ -63,9 +49,9 @@ class ExternalDatasetService: tenant_id=tenant_id, created_by=user_id, updated_by=user_id, - name=args.get('name'), - description=args.get('description', ''), - settings=json.dumps(args.get('settings'), ensure_ascii=False), + name=args.get("name"), + description=args.get("description", ""), + settings=json.dumps(args.get("settings"), ensure_ascii=False), ) db.session.add(api_template) @@ -74,22 +60,17 @@ class ExternalDatasetService: @staticmethod def get_api_template(api_template_id: str) -> ExternalApiTemplates: - return ExternalApiTemplates.query.filter_by( - id=api_template_id - ).first() + return ExternalApiTemplates.query.filter_by(id=api_template_id).first() @staticmethod def update_api_template(tenant_id, user_id, api_template_id, args) -> ExternalApiTemplates: - api_template = ExternalApiTemplates.query.filter_by( - id=api_template_id, - tenant_id=tenant_id - ).first() + api_template = ExternalApiTemplates.query.filter_by(id=api_template_id, tenant_id=tenant_id).first() if api_template is None: - raise ValueError('api template not found') + raise ValueError("api template not found") - api_template.name = args.get('name') - api_template.description = args.get('description', '') - api_template.settings = json.dumps(args.get('settings'), ensure_ascii=False) + api_template.name = args.get("name") + api_template.description = args.get("description", "") + api_template.settings = json.dumps(args.get("settings"), ensure_ascii=False) api_template.updated_by = user_id api_template.updated_at = datetime.now(timezone.utc).replace(tzinfo=None) db.session.commit() @@ -98,21 +79,16 @@ class ExternalDatasetService: @staticmethod def delete_api_template(tenant_id: str, api_template_id: str): - api_template = ExternalApiTemplates.query.filter_by( - id=api_template_id, - tenant_id=tenant_id - ).first() + api_template = ExternalApiTemplates.query.filter_by(id=api_template_id, tenant_id=tenant_id).first() if api_template is None: - raise ValueError('api template not found') + raise ValueError("api template not found") db.session.delete(api_template) db.session.commit() @staticmethod def external_api_template_use_check(api_template_id: str) -> bool: - count = ExternalKnowledgeBindings.query.filter_by( - external_api_template_id=api_template_id - ).count() + count = ExternalKnowledgeBindings.query.filter_by(external_api_template_id=api_template_id).count() if count > 0: return True return False @@ -120,66 +96,63 @@ class ExternalDatasetService: @staticmethod def get_external_knowledge_binding_with_dataset_id(tenant_id: str, dataset_id: str) -> ExternalKnowledgeBindings: external_knowledge_binding = ExternalKnowledgeBindings.query.filter_by( - dataset_id=dataset_id, - tenant_id=tenant_id + dataset_id=dataset_id, tenant_id=tenant_id ).first() if not external_knowledge_binding: - raise ValueError('external knowledge binding not found') + raise ValueError("external knowledge binding not found") return external_knowledge_binding @staticmethod def document_create_args_validate(tenant_id: str, api_template_id: str, process_parameter: dict): - api_template = ExternalApiTemplates.query.filter_by( - id=api_template_id, - tenant_id=tenant_id - ).first() + api_template = ExternalApiTemplates.query.filter_by(id=api_template_id, tenant_id=tenant_id).first() if api_template is None: - raise ValueError('api template not found') + raise ValueError("api template not found") settings = json.loads(api_template.settings) for settings in settings: - custom_parameters = settings.get('document_process_setting') + custom_parameters = settings.get("document_process_setting") if custom_parameters: for parameter in custom_parameters: - if parameter.get('required', False) and not process_parameter.get(parameter.get('name')): + if parameter.get("required", False) and not process_parameter.get(parameter.get("name")): raise ValueError(f'{parameter.get("name")} is required') @staticmethod - def init_external_dataset(tenant_id: str, user_id: str, args: dict, created_from: str = 'web'): - api_template_id = args.get('api_template_id') + def init_external_dataset(tenant_id: str, user_id: str, args: dict, created_from: str = "web"): + api_template_id = args.get("api_template_id") - data_source = args.get('data_source') + data_source = args.get("data_source") if data_source is None: - raise ValueError('data source is required') + raise ValueError("data source is required") - process_parameter = args.get('process_parameter') - api_template = ExternalApiTemplates.query.filter_by( - id=api_template_id, - tenant_id=tenant_id - ).first() + process_parameter = args.get("process_parameter") + api_template = ExternalApiTemplates.query.filter_by(id=api_template_id, tenant_id=tenant_id).first() if api_template is None: - raise ValueError('api template not found') + raise ValueError("api template not found") dataset = Dataset( tenant_id=tenant_id, - name=args.get('name'), - description=args.get('description', ''), - provider='external', + name=args.get("name"), + description=args.get("description", ""), + provider="external", created_by=user_id, ) db.session.add(dataset) db.session.flush() - position = DocumentService.get_documents_position(dataset.id) - batch = time.strftime('%Y%m%d%H%M%S') + str(random.randint(100000, 999999)) + document = Document.query.filter_by(dataset_id=dataset.id).order_by(Document.position.desc()).first() + + position = document.position + 1 if document else 1 + + batch = time.strftime("%Y%m%d%H%M%S") + str(random.randint(100000, 999999)) document_ids = [] if data_source["type"] == "upload_file": - upload_file_list = data_source["info_list"]['file_info_list']['file_ids'] + upload_file_list = data_source["info_list"]["file_info_list"]["file_ids"] for file_id in upload_file_list: - file = db.session.query(UploadFile).filter( - UploadFile.tenant_id == dataset.tenant_id, - UploadFile.id == file_id - ).first() + file = ( + db.session.query(UploadFile) + .filter(UploadFile.tenant_id == dataset.tenant_id, UploadFile.id == file_id) + .first() + ) if file: data_source_info = { "upload_file_id": file_id, @@ -200,21 +173,20 @@ class ExternalDatasetService: db.session.flush() document_ids.append(document.id) db.session.commit() - external_document_indexing_task.delay(dataset.id, api_template_id, data_source, process_parameter) + #external_document_indexing_task.delay(dataset.id, api_template_id, data_source, process_parameter) return dataset @staticmethod - def process_external_api(settings: ApiTemplateSetting, - files: Union[None, dict[str, Any]]) -> httpx.Response: + def process_external_api(settings: ApiTemplateSetting, files: Union[None, dict[str, Any]]) -> httpx.Response: """ do http request depending on api bundle """ kwargs = { - 'url': settings.url, - 'headers': settings.headers, - 'follow_redirects': True, + "url": settings.url, + "headers": settings.headers, + "follow_redirects": True, } response = getattr(ssrf_proxy, settings.request_method)(data=settings.params, files=files, **kwargs) @@ -228,21 +200,21 @@ class ExternalDatasetService: headers = deepcopy(headers) else: headers = {} - if authorization.type == 'api-key': + if authorization.type == "api-key": if authorization.config is None: - raise ValueError('authorization config is required') + raise ValueError("authorization config is required") if authorization.config.api_key is None: - raise ValueError('api_key is required') + raise ValueError("api_key is required") if not authorization.config.header: - authorization.config.header = 'Authorization' + authorization.config.header = "Authorization" - if authorization.config.type == 'bearer': - headers[authorization.config.header] = f'Bearer {authorization.config.api_key}' - elif authorization.config.type == 'basic': - headers[authorization.config.header] = f'Basic {authorization.config.api_key}' - elif authorization.config.type == 'custom': + if authorization.config.type == "bearer": + headers[authorization.config.header] = f"Bearer {authorization.config.api_key}" + elif authorization.config.type == "basic": + headers[authorization.config.header] = f"Basic {authorization.config.api_key}" + elif authorization.config.type == "custom": headers[authorization.config.header] = authorization.config.api_key return headers @@ -254,21 +226,20 @@ class ExternalDatasetService: @staticmethod def create_external_dataset(tenant_id: str, user_id: str, args: dict) -> Dataset: # check if dataset name already exists - if Dataset.query.filter_by(name=args.get('name'), tenant_id=tenant_id).first(): + if Dataset.query.filter_by(name=args.get("name"), tenant_id=tenant_id).first(): raise DatasetNameDuplicateError(f"Dataset with name {args.get('name')} already exists.") api_template = ExternalApiTemplates.query.filter_by( - id=args.get('api_template_id'), - tenant_id=tenant_id + id=args.get("external_api_template_id"), tenant_id=tenant_id ).first() if api_template is None: - raise ValueError('api template not found') + raise ValueError("api template not found") dataset = Dataset( tenant_id=tenant_id, - name=args.get('name'), - description=args.get('description', ''), - provider='external', + name=args.get("name"), + description=args.get("description", ""), + provider="external", created_by=user_id, ) @@ -278,8 +249,8 @@ class ExternalDatasetService: external_knowledge_binding = ExternalKnowledgeBindings( tenant_id=tenant_id, dataset_id=dataset.id, - external_api_template_id=args.get('api_template_id'), - external_knowledge_id=args.get('external_knowledge_id'), + external_api_template_id=args.get("external_api_template_id"), + external_knowledge_id=args.get("external_knowledge_id"), created_by=user_id, ) db.session.add(external_knowledge_binding) @@ -289,36 +260,32 @@ class ExternalDatasetService: return dataset @staticmethod - def fetch_external_knowledge_retrival(tenant_id: str, - dataset_id: str, - query: str, - external_retrival_parameters: dict): + def fetch_external_knowledge_retrival( + tenant_id: str, dataset_id: str, query: str, external_retrival_parameters: dict + ): external_knowledge_binding = ExternalKnowledgeBindings.query.filter_by( - dataset_id=dataset_id, - tenant_id=tenant_id + dataset_id=dataset_id, tenant_id=tenant_id ).first() if not external_knowledge_binding: - raise ValueError('external knowledge binding not found') + raise ValueError("external knowledge binding not found") external_api_template = ExternalApiTemplates.query.filter_by( id=external_knowledge_binding.external_api_template_id ).first() if not external_api_template: - raise ValueError('external api template not found') + raise ValueError("external api template not found") settings = json.loads(external_api_template.settings) headers = {} - if settings.get('api_token'): - headers['Authorization'] = f"Bearer {settings.get('api_token')}" + if settings.get("api_token"): + headers["Authorization"] = f"Bearer {settings.get('api_token')}" - external_retrival_parameters['query'] = query + external_retrival_parameters["query"] = query api_template_setting = { - 'url': f"{settings.get('endpoint')}/dify/external-knowledge/retrival-documents", - 'request_method': 'post', - 'headers': settings.get('headers'), - 'params': external_retrival_parameters + "url": f"{settings.get('endpoint')}/dify/external-knowledge/retrival-documents", + "request_method": "post", + "headers": settings.get("headers"), + "params": external_retrival_parameters, } - response = ExternalDatasetService.process_external_api( - ApiTemplateSetting(**api_template_setting), None - ) + response = ExternalDatasetService.process_external_api(ApiTemplateSetting(**api_template_setting), None) diff --git a/api/services/hit_testing_service.py b/api/services/hit_testing_service.py index 5e00f740df..350f693d64 100644 --- a/api/services/hit_testing_service.py +++ b/api/services/hit_testing_service.py @@ -19,8 +19,15 @@ default_retrieval_model = { class HitTestingService: @classmethod - def retrieve(cls, dataset: Dataset, query: str, account: Account, - retrieval_model: dict, external_retrieval_model: dict, limit: int = 10) -> dict: + def retrieve( + cls, + dataset: Dataset, + query: str, + account: Account, + retrieval_model: dict, + external_retrieval_model: dict, + limit: int = 10, + ) -> dict: if dataset.available_document_count == 0 or dataset.available_segment_count == 0: return { "query": { diff --git a/api/tasks/external_document_indexing_task.py b/api/tasks/external_document_indexing_task.py index 938cf6962d..987b72e25e 100644 --- a/api/tasks/external_document_indexing_task.py +++ b/api/tasks/external_document_indexing_task.py @@ -1,4 +1,3 @@ -import datetime import json import logging import time @@ -6,17 +5,15 @@ import time import click from celery import shared_task -from configs import dify_config -from core.indexing_runner import DocumentIsPausedException, IndexingRunner +from core.indexing_runner import DocumentIsPausedException from extensions.ext_database import db from extensions.ext_storage import storage -from models.dataset import Dataset, Document, ExternalApiTemplates +from models.dataset import Dataset, ExternalApiTemplates from models.model import UploadFile from services.external_knowledge_service import ExternalDatasetService -from services.feature_service import FeatureService -@shared_task(queue='dataset') +@shared_task(queue="dataset") def external_document_indexing_task(dataset_id: str, api_template_id: str, data_source: dict, process_parameter: dict): """ Async process document @@ -30,26 +27,35 @@ def external_document_indexing_task(dataset_id: str, api_template_id: str, data_ dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() if not dataset: - logging.info(click.style('Processed external dataset: {} failed, dataset not exit.'.format(dataset_id), fg='red')) + logging.info( + click.style("Processed external dataset: {} failed, dataset not exit.".format(dataset_id), fg="red") + ) return # get external api template - api_template = db.session.query(ExternalApiTemplates).filter( - ExternalApiTemplates.id == api_template_id, - ExternalApiTemplates.tenant_id == dataset.tenant_id - ).first() + api_template = ( + db.session.query(ExternalApiTemplates) + .filter(ExternalApiTemplates.id == api_template_id, ExternalApiTemplates.tenant_id == dataset.tenant_id) + .first() + ) if not api_template: - logging.info(click.style('Processed external dataset: {} failed, api template: {} not exit.'.format(dataset_id, api_template_id), fg='red')) + logging.info( + click.style( + "Processed external dataset: {} failed, api template: {} not exit.".format(dataset_id, api_template_id), + fg="red", + ) + ) return files = {} if data_source["type"] == "upload_file": - upload_file_list = data_source["info_list"]['file_info_list']['file_ids'] + upload_file_list = data_source["info_list"]["file_info_list"]["file_ids"] for file_id in upload_file_list: - file = db.session.query(UploadFile).filter( - UploadFile.tenant_id == dataset.tenant_id, - UploadFile.id == file_id - ).first() + file = ( + db.session.query(UploadFile) + .filter(UploadFile.tenant_id == dataset.tenant_id, UploadFile.id == file_id) + .first() + ) if file: files[file.id] = (file.name, storage.load_once(file.key), file.mime_type) try: @@ -59,7 +65,7 @@ def external_document_indexing_task(dataset_id: str, api_template_id: str, data_ # do http request response = ExternalDatasetService.process_external_api(settings, headers, process_parameter, files) - job_id = response.json().get('job_id') + job_id = response.json().get("job_id") if job_id: # save job_id to dataset dataset.job_id = job_id @@ -67,9 +73,13 @@ def external_document_indexing_task(dataset_id: str, api_template_id: str, data_ end_at = time.perf_counter() logging.info( - click.style('Processed external dataset: {} successful, latency: {}'.format(dataset.id, end_at - start_at), fg='green')) + click.style( + "Processed external dataset: {} successful, latency: {}".format(dataset.id, end_at - start_at), + fg="green", + ) + ) except DocumentIsPausedException as ex: - logging.info(click.style(str(ex), fg='yellow')) + logging.info(click.style(str(ex), fg="yellow")) except Exception: pass