From 517cdb2ca4480f3c7ddeba8f56c2a08e564baa85 Mon Sep 17 00:00:00 2001
From: jyong <718720800@qq.com>
Date: Tue, 20 Aug 2024 11:13:29 +0800
Subject: [PATCH] add external knowledge

---
Note: the blob ids on the "index" lines below are those of the originally
exported commit and were not recomputed.

 api/controllers/console/datasets/external.py | 240 +++++++++++++++++++
 api/fields/external_dataset_fields.py        |  11 +
 api/models/dataset.py                        |  69 ++++++
 api/services/external_knowledge_service.py   | 213 +++++++++++++++++
 api/tasks/external_document_indexing_task.py |  65 +++++
 5 files changed, 598 insertions(+)
 create mode 100644 api/controllers/console/datasets/external.py
 create mode 100644 api/fields/external_dataset_fields.py
 create mode 100644 api/services/external_knowledge_service.py
 create mode 100644 api/tasks/external_document_indexing_task.py

diff --git a/api/controllers/console/datasets/external.py b/api/controllers/console/datasets/external.py
new file mode 100644
index 0000000000..58665e2f55
--- /dev/null
+++ b/api/controllers/console/datasets/external.py
@@ -0,0 +1,240 @@
+import flask_restful
+from flask import request
+from flask_login import current_user
+from flask_restful import Resource, marshal, marshal_with, reqparse
+from werkzeug.exceptions import Forbidden, NotFound
+
+import services
+from configs import dify_config
+from controllers.console import api
+from controllers.console.apikey import api_key_fields, api_key_list
+from controllers.console.app.error import ProviderNotInitializeError
+from controllers.console.datasets.error import DatasetInUseError, DatasetNameDuplicateError, IndexingEstimateError
+from controllers.console.setup import setup_required
+from controllers.console.wraps import account_initialization_required
+from core.errors.error import LLMBadRequestError, ProviderTokenNotInitError
+from core.indexing_runner import IndexingRunner
+from core.model_runtime.entities.model_entities import ModelType
+from core.provider_manager import ProviderManager
+from core.rag.datasource.vdb.vector_type import VectorType
+from core.rag.extractor.entity.extract_setting import ExtractSetting
+from core.rag.retrieval.retrival_methods import RetrievalMethod
+from extensions.ext_database import db
+from fields.app_fields import related_app_list
+from fields.dataset_fields import dataset_detail_fields, dataset_query_detail_fields
+from fields.document_fields import document_status_fields
+from libs.login import login_required
+from models.dataset import Dataset, Document, DocumentSegment
+from models.model import ApiToken, UploadFile
+from services.dataset_service import DatasetPermissionService, DatasetService, DocumentService
+from services.external_knowledge_service import ExternalDatasetService
+
+
+def _validate_name(name):
+    """Return ``name`` when it is 1 to 100 characters long; raise ``ValueError`` otherwise."""
+    if not name or len(name) < 1 or len(name) > 100:
+        raise ValueError('Name must be between 1 to 100 characters.')
+    return name
+
+
+def _validate_description_length(description):
+    """Return ``description`` when it is at most 400 characters; raise ``ValueError`` otherwise."""
+    if len(description) > 400:
+        raise ValueError('Description cannot exceed 400 characters.')
+    return description
+
+
+class ExternalApiTemplateListApi(Resource):
+    """List and create the external API templates of the current tenant."""
+
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def get(self):
+        """Return one page of API templates, optionally filtered by the ``keyword`` query argument."""
+        page = request.args.get('page', default=1, type=int)
+        limit = request.args.get('limit', default=20, type=int)
+        search = request.args.get('keyword', default=None, type=str)
+
+        api_templates, total = ExternalDatasetService.get_external_api_templates(
+            page,
+            limit,
+            current_user.current_tenant_id,
+            search
+        )
+        response = {
+            'data': [item.to_dict() for item in api_templates],
+            'has_more': len(api_templates) == limit,
+            'limit': limit,
+            'total': total,
+            'page': page
+        }
+        return response, 200
+
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def post(self):
+        """Create an API template from the ``name`` and ``settings`` of the request body."""
+        # The role of the current user in the ta table must be admin, owner, or editor, or dataset_operator.
+        # Checked first so that unauthorized callers are rejected before the payload is parsed.
+        if not current_user.is_dataset_editor:
+            raise Forbidden()
+
+        parser = reqparse.RequestParser()
+        parser.add_argument('name', nullable=False, required=True,
+                            help='name is required. Name must be between 1 to 100 characters.',
+                            type=_validate_name)
+        parser.add_argument('settings', type=list, location='json',
+                            nullable=False,
+                            required=True, )
+        args = parser.parse_args()
+
+        ExternalDatasetService.validate_api_list(args['settings'])
+
+        try:
+            api_template = ExternalDatasetService.create_api_template(
+                tenant_id=current_user.current_tenant_id,
+                user_id=current_user.id,
+                args=args
+            )
+        except services.errors.dataset.DatasetNameDuplicateError:
+            raise DatasetNameDuplicateError()
+
+        return api_template.to_dict(), 201
+
+
+class ExternalApiTemplateApi(Resource):
+    """Read, update and delete a single external API template."""
+
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def get(self, api_template_id):
+        """Return the template, or 404 when it does not exist in the current tenant."""
+        api_template_id = str(api_template_id)
+        api_template = ExternalDatasetService.get_api_template(api_template_id)
+        # The service looks the template up by id only, so tenant isolation is enforced here.
+        if api_template is None or str(api_template.tenant_id) != str(current_user.current_tenant_id):
+            raise NotFound("API template not found.")
+
+        return api_template.to_dict(), 200
+
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def patch(self, api_template_id):
+        """Replace the ``name`` and ``settings`` of the template."""
+        api_template_id = str(api_template_id)
+
+        # Updating requires the same role as creating a template.
+        if not current_user.is_dataset_editor:
+            raise Forbidden()
+
+        parser = reqparse.RequestParser()
+        parser.add_argument('name', nullable=False, required=True,
+                            help='name is required. Name must be between 1 to 100 characters.',
+                            type=_validate_name)
+        parser.add_argument('settings', type=list, location='json',
+                            nullable=False,
+                            required=True, )
+        args = parser.parse_args()
+        ExternalDatasetService.validate_api_list(args['settings'])
+
+        api_template = ExternalDatasetService.update_api_template(
+            tenant_id=current_user.current_tenant_id,
+            user_id=current_user.id,
+            api_template_id=api_template_id,
+            args=args
+        )
+
+        return api_template.to_dict(), 200
+
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def delete(self, api_template_id):
+        """Delete the template; dataset operators are not allowed to."""
+        api_template_id = str(api_template_id)
+
+        # The role of the current user in the ta table must be admin, owner, or editor
+        if not current_user.is_editor or current_user.is_dataset_operator:
+            raise Forbidden()
+
+        ExternalDatasetService.delete_api_template(current_user.current_tenant_id, api_template_id)
+        return {'result': 'success'}, 204
+
+
+class ExternalApiUseCheckApi(Resource):
+    """Report whether an external API template is referenced by any knowledge binding."""
+
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def get(self, api_template_id):
+        """Return ``{'is_using': bool}`` for the template."""
+        api_template_id = str(api_template_id)
+
+        external_api_template_is_using = ExternalDatasetService.external_api_template_use_check(api_template_id)
+        return {'is_using': external_api_template_is_using}, 200
+
+
+class ExternalDatasetInitApi(Resource):
+    """Create a dataset that is backed by an external API template."""
+
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def post(self):
+        """Create the dataset, queue its indexing task and return the dataset details."""
+        # The role of the current user in the ta table must be admin, owner, or editor
+        if not current_user.is_editor:
+            raise Forbidden()
+
+        parser = reqparse.RequestParser()
+        parser.add_argument('api_template_id', type=str, required=True, nullable=True, location='json')
+        parser.add_argument('name', nullable=False, required=True,
+                            help='name is required. Name must be between 1 to 100 characters.',
+                            type=_validate_name)
+        parser.add_argument('description', type=str, required=True, nullable=True, location='json')
+        parser.add_argument('data_source', type=dict, required=True, nullable=True, location='json')
+        parser.add_argument('process_parameter', type=dict, required=True, nullable=True, location='json')
+
+        args = parser.parse_args()
+
+        # The role of the current user in the ta table must be admin, owner, or editor, or dataset_operator
+        if not current_user.is_dataset_editor:
+            raise Forbidden()
+
+        # validate args
+        ExternalDatasetService.document_create_args_validate(
+            current_user.current_tenant_id,
+            args['api_template_id'],
+            args['process_parameter']
+        )
+
+        try:
+            # init_external_dataset returns only the created dataset.
+            dataset = ExternalDatasetService.init_external_dataset(
+                tenant_id=current_user.current_tenant_id,
+                user_id=current_user.id,
+                args=args,
+            )
+        except Exception as ex:
+            # Not every exception carries a ``description`` attribute; fall back to its message.
+            raise ProviderNotInitializeError(getattr(ex, 'description', None) or str(ex)) from ex
+
+        response = {
+            # Model instances are not JSON serializable, so marshal the dataset explicitly.
+            'dataset': marshal(dataset, dataset_detail_fields),
+            # No documents or batch id are created synchronously by init_external_dataset.
+            'documents': [],
+            'batch': None
+        }
+
+        return response
+
+
+api.add_resource(ExternalApiTemplateListApi, '/datasets/external-api-template')
+api.add_resource(ExternalApiTemplateApi, '/datasets/external-api-template/<uuid:api_template_id>')
+api.add_resource(ExternalApiUseCheckApi, '/datasets/external-api-template/<uuid:api_template_id>/use-check')
diff --git a/api/fields/external_dataset_fields.py b/api/fields/external_dataset_fields.py
new file mode 100644
index 0000000000..d287cbbe58
--- /dev/null
+++ b/api/fields/external_dataset_fields.py
@@ -0,0 +1,11 @@
+from flask_restful import fields
+
+from libs.helper import TimestampField
+
+api_template_query_detail_fields = {
+    "id": fields.String,
+    "name": fields.String,
+    "settings": fields.String,
+    "created_by": fields.String,
+    "created_at": TimestampField,
+}
diff --git a/api/models/dataset.py b/api/models/dataset.py
index 0d48177eb6..bcc6824d22 100644
--- a/api/models/dataset.py
+++ b/api/models/dataset.py
@@ -682,3 +682,72 @@ class DatasetPermission(db.Model):
     tenant_id = db.Column(StringUUID, nullable=False)
     has_permission = db.Column(db.Boolean, nullable=False, server_default=db.text('true'))
     created_at = db.Column(db.DateTime, nullable=False, server_default=db.text('CURRENT_TIMESTAMP(0)'))
+
+
+class ExternalApiTemplates(db.Model):
+    """A tenant-owned external API template; ``settings`` holds a JSON-encoded list of API definitions."""
+
+    __tablename__ = 'external_api_templates'
+    __table_args__ = (
+        db.PrimaryKeyConstraint('id', name='external_api_template_pkey'),
+        db.Index('external_api_templates_tenant_idx', 'tenant_id'),
+        db.Index('external_api_templates_name_idx', 'name'),
+    )
+
+    id = db.Column(StringUUID, nullable=False,
+                   server_default=db.text('uuid_generate_v4()'))
+    name = db.Column(db.String(255), nullable=False)
+    tenant_id = db.Column(StringUUID, nullable=False)
+    settings = db.Column(db.Text, nullable=True)
+    created_by = db.Column(StringUUID, nullable=False)
+    created_at = db.Column(db.DateTime, nullable=False,
+                           server_default=db.text('CURRENT_TIMESTAMP(0)'))
+    updated_by = db.Column(StringUUID, nullable=True)
+    updated_at = db.Column(db.DateTime, nullable=False,
+                           server_default=db.text('CURRENT_TIMESTAMP(0)'))
+
+    def to_dict(self):
+        """Return the template as a plain dict for API responses."""
+        return {
+            'id': self.id,
+            'tenant_id': self.tenant_id,
+            'name': self.name,
+            'settings': self.settings_dict,
+            'created_by': self.created_by,
+            # datetime objects are not JSON serializable, so emit an ISO 8601 string.
+            'created_at': self.created_at.isoformat() if self.created_at else None,
+        }
+
+    @property
+    def settings_dict(self):
+        """Return ``settings`` decoded from JSON, or ``None`` when it is empty or malformed."""
+        try:
+            return json.loads(self.settings) if self.settings else None
+        except JSONDecodeError:
+            return None
+
+
+class ExternalKnowledgeBindings(db.Model):
+    """Links a dataset to an external knowledge id through an external API template."""
+
+    __tablename__ = 'external_knowledge_bindings'
+    __table_args__ = (
+        db.PrimaryKeyConstraint('id', name='external_knowledge_bindings_pkey'),
+        db.Index('external_knowledge_bindings_tenant_idx', 'tenant_id'),
+        db.Index('external_knowledge_bindings_dataset_idx', 'dataset_id'),
+        db.Index('external_knowledge_bindings_external_knowledge_idx', 'external_knowledge_id'),
+        db.Index('external_knowledge_bindings_external_api_template_idx', 'external_api_template_id'),
+    )
+
+    id = db.Column(StringUUID, nullable=False,
+                   server_default=db.text('uuid_generate_v4()'))
+    tenant_id = db.Column(StringUUID, nullable=False)
+    external_api_template_id = db.Column(StringUUID, nullable=False)
+    dataset_id = db.Column(StringUUID, nullable=False)
+    external_knowledge_id = db.Column(db.Text, nullable=False)
+    created_by = db.Column(StringUUID, nullable=False)
+    created_at = db.Column(db.DateTime, nullable=False,
+                           server_default=db.text('CURRENT_TIMESTAMP(0)'))
+    updated_by = db.Column(StringUUID, nullable=True)
+    updated_at = db.Column(db.DateTime, nullable=False,
+                           server_default=db.text('CURRENT_TIMESTAMP(0)'))
diff --git a/api/services/external_knowledge_service.py b/api/services/external_knowledge_service.py
new file mode 100644
index 0000000000..2ff272f87e
--- /dev/null
+++ b/api/services/external_knowledge_service.py
@@ -0,0 +1,213 @@
+import json
+from datetime import datetime, timezone
+from typing import Optional
+
+from extensions.ext_database import db
+from extensions.ext_redis import redis_client
+from libs import helper
+from models.account import Account, TenantAccountRole
+from models.dataset import (
+    AppDatasetJoin,
+    Dataset,
+    DatasetCollectionBinding,
+    DatasetPermission,
+    DatasetProcessRule,
+    DatasetQuery,
+    Document,
+    DocumentSegment,
+    ExternalApiTemplates,
+    ExternalKnowledgeBindings,
+)
+
+
+class ExternalDatasetService:
+    """Persistence and validation helpers for external API templates and external datasets."""
+
+    @staticmethod
+    def get_external_api_templates(page, per_page, tenant_id, search=None) -> tuple[list[ExternalApiTemplates], int]:
+        """Return ``(items, total)`` for one page of the tenant's templates, newest first.
+
+        ``search`` filters by a case-insensitive substring of the template name.
+        """
+        query = ExternalApiTemplates.query.filter(ExternalApiTemplates.tenant_id == tenant_id).order_by(
+            ExternalApiTemplates.created_at.desc()
+        )
+        if search:
+            query = query.filter(ExternalApiTemplates.name.ilike(f'%{search}%'))
+
+        api_templates = query.paginate(
+            page=page,
+            per_page=per_page,
+            max_per_page=100,
+            error_out=False
+        )
+
+        return api_templates.items, api_templates.total
+
+    @classmethod
+    def validate_api_list(cls, api_settings: list[dict]):
+        """Validate the ``settings`` payload of an API template.
+
+        :param api_settings: list of API definitions
+        :raises ValueError: when the list is empty or a definition is incomplete
+        """
+        if not api_settings:
+            raise ValueError('api list is empty')
+        for api_settings_dict in api_settings:
+            # Reject non-object entries up front instead of failing on ``.get`` below.
+            if not isinstance(api_settings_dict, dict):
+                raise ValueError('api setting must be an object')
+
+            if not api_settings_dict.get('method'):
+                raise ValueError('api name is required')
+
+            if not api_settings_dict.get('url'):
+                raise ValueError('api url is required')
+
+            authorization = api_settings_dict.get('authorization')
+            if authorization:
+                auth_type = authorization.get('type')
+                if not auth_type:
+                    raise ValueError('authorization type is required')
+                if auth_type == 'bearer' and not authorization.get('api_key'):
+                    raise ValueError('authorization token is required')
+                if auth_type == 'custom' and not authorization.get('header'):
+                    raise ValueError('authorization header is required')
+
+            if api_settings_dict.get('method') in ['create', 'update']:
+                if not api_settings_dict.get('callback_setting'):
+                    raise ValueError('callback_setting is required for create and update method')
+
+    @staticmethod
+    def create_api_template(tenant_id: str, user_id: str, args: dict) -> ExternalApiTemplates:
+        """Persist a new template; ``args['settings']`` is stored as a JSON string."""
+        api_template = ExternalApiTemplates(
+            tenant_id=tenant_id,
+            created_by=user_id,
+            updated_by=user_id,
+            name=args.get('name'),
+            settings=json.dumps(args.get('settings'), ensure_ascii=False),
+        )
+
+        db.session.add(api_template)
+        db.session.commit()
+        return api_template
+
+    @staticmethod
+    def get_api_template(api_template_id: str) -> Optional[ExternalApiTemplates]:
+        """Return the template with this id, or ``None``; the lookup is not tenant scoped."""
+        return ExternalApiTemplates.query.filter_by(
+            id=api_template_id
+        ).first()
+
+    @staticmethod
+    def update_api_template(tenant_id, user_id, api_template_id, args) -> ExternalApiTemplates:
+        """Overwrite the name and settings of a tenant's template.
+
+        :raises ValueError: when the template is not found in the tenant
+        """
+        api_template = ExternalApiTemplates.query.filter_by(
+            id=api_template_id,
+            tenant_id=tenant_id
+        ).first()
+        if api_template is None:
+            raise ValueError('api template not found')
+
+        api_template.name = args.get('name')
+        api_template.settings = json.dumps(args.get('settings'), ensure_ascii=False)
+        api_template.updated_by = user_id
+        # Stored as a naive UTC timestamp.
+        api_template.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
+        db.session.commit()
+
+        return api_template
+
+    @staticmethod
+    def delete_api_template(tenant_id: str, api_template_id: str):
+        """Delete a tenant's template.
+
+        :raises ValueError: when the template is not found in the tenant
+        """
+        api_template = ExternalApiTemplates.query.filter_by(
+            id=api_template_id,
+            tenant_id=tenant_id
+        ).first()
+        if api_template is None:
+            raise ValueError('api template not found')
+
+        db.session.delete(api_template)
+        db.session.commit()
+
+    @staticmethod
+    def external_api_template_use_check(api_template_id: str) -> bool:
+        """Return ``True`` when at least one knowledge binding references the template."""
+        count = ExternalKnowledgeBindings.query.filter_by(
+            external_api_template_id=api_template_id
+        ).count()
+        return count > 0
+
+    @staticmethod
+    def document_create_args_validate(tenant_id: str, api_template_id: str, process_parameter: dict):
+        """Check ``process_parameter`` against the required parameters of the template's 'create' APIs.
+
+        :raises ValueError: when the template is not found or a required parameter is missing
+        """
+        api_template = ExternalApiTemplates.query.filter_by(
+            id=api_template_id,
+            tenant_id=tenant_id
+        ).first()
+        if api_template is None:
+            raise ValueError('api template not found')
+        # Callers may pass None (the request argument is nullable); treat it as no parameters.
+        process_parameter = process_parameter or {}
+        for api_setting in json.loads(api_template.settings):
+            if api_setting.get('method') != 'create':
+                continue
+            # 'parameters' is not enforced by validate_api_list, so it may be absent.
+            for parameter in api_setting.get('parameters') or []:
+                if parameter.get('required') and not process_parameter.get(parameter.get('name')):
+                    raise ValueError(f'{parameter.get("name")} is required')
+
+    @staticmethod
+    def init_external_dataset(tenant_id: str, user_id: str, args: dict):
+        """Create an external dataset and queue its indexing task.
+
+        :return: the newly created ``Dataset``
+        :raises ValueError: when the API template is not found in the tenant
+        """
+        # Imported here because only this method dispatches the task.
+        from tasks.external_document_indexing_task import external_document_indexing_task
+
+        api_template_id = args.get('api_template_id')
+        data_source = args.get('data_source')
+        process_parameter = args.get('process_parameter')
+        api_template = ExternalApiTemplates.query.filter_by(
+            id=api_template_id,
+            tenant_id=tenant_id
+        ).first()
+        if api_template is None:
+            raise ValueError('api template not found')
+        api_settings = json.loads(api_template.settings)
+        # Call the external API once if the template defines a 'create' method.
+        if any(api_setting.get('method') == 'create' for api_setting in api_settings):
+            ExternalDatasetService.process_external_api(api_template_id, data_source, process_parameter)
+        # save dataset
+        dataset = Dataset(
+            tenant_id=tenant_id,
+            name=args.get('name'),
+            description=args.get('description', ''),
+            provider='external',
+            created_by=user_id,
+        )
+
+        db.session.add(dataset)
+        db.session.commit()
+
+        external_document_indexing_task.delay(dataset.id, api_template_id, data_source, process_parameter)
+
+        return dataset
+
+    @staticmethod
+    def process_external_api(api_template_id: str, data_source: dict, process_parameter: dict):
+        """Placeholder for the external API call; not implemented yet."""
+        pass
diff --git a/api/tasks/external_document_indexing_task.py b/api/tasks/external_document_indexing_task.py
new file mode 100644
index 0000000000..14509e65c4
--- /dev/null
+++ b/api/tasks/external_document_indexing_task.py
@@ -0,0 +1,65 @@
+import datetime
+import logging
+import time
+
+import click
+from celery import shared_task
+
+from configs import dify_config
+from core.indexing_runner import DocumentIsPausedException, IndexingRunner
+from extensions.ext_database import db
+from models.dataset import Dataset, Document, ExternalApiTemplates
+from models.model import UploadFile
+from services.feature_service import FeatureService
+
+
+@shared_task(queue='dataset')
+def external_document_indexing_task(dataset_id: str, api_template_id: str, data_source: dict, process_parameter: dict):
+    """
+    Async process the documents of an external dataset.
+
+    :param dataset_id: id of the dataset created for the external knowledge
+    :param api_template_id: id of the external API template, looked up in the dataset's tenant
+    :param data_source: data source description; for type "upload_file" the file ids are read from
+        data_source["info_list"]["file_info_list"]["file_ids"]
+    :param process_parameter: parameters for the external API (not consumed yet)
+
+    Usage: external_document_indexing_task.delay(dataset_id, api_template_id, data_source, process_parameter)
+    """
+    dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
+    if not dataset:
+        logging.info(click.style('Processed external dataset: {} failed, dataset not exist.'.format(dataset_id), fg='red'))
+        return
+
+    # get external api template
+    api_template = db.session.query(ExternalApiTemplates).filter(
+        ExternalApiTemplates.id == api_template_id,
+        ExternalApiTemplates.tenant_id == dataset.tenant_id
+    ).first()
+
+    if not api_template:
+        logging.info(click.style('Processed external dataset: {} failed, api template: {} not exist.'.format(dataset_id, api_template_id), fg='red'))
+        return
+
+    # The API layer accepts a null data_source; treat it as "no files".
+    data_source = data_source or {}
+    file_resource = []
+    if data_source.get("type") == "upload_file":
+        upload_file_list = data_source["info_list"]['file_info_list']['file_ids']
+        for file_id in upload_file_list:
+            file = db.session.query(UploadFile).filter(
+                UploadFile.tenant_id == dataset.tenant_id,
+                UploadFile.id == file_id
+            ).first()
+            if file:
+                file_resource.append(file)
+    try:
+        # TODO: assemble the request headers from api_template.settings and send file_resource
+        # and process_parameter to the external API. No request helper exists in this module
+        # (it has no ``self``), so fail explicitly rather than with a NameError.
+        raise NotImplementedError('external api request is not implemented')
+    except DocumentIsPausedException as ex:
+        logging.info(click.style(str(ex), fg='yellow'))
+    except Exception:
+        # Keep the task best-effort, but record the failure instead of swallowing it silently.
+        logging.exception('Processed external dataset: {} failed'.format(dataset_id))