external knowledge api

This commit is contained in:
jyong 2024-09-18 14:36:51 +08:00
parent 89e81873c4
commit 9f894bb3b3
12 changed files with 330 additions and 274 deletions

View File

@ -37,7 +37,7 @@ from .auth import activate, data_source_bearer_auth, data_source_oauth, forgot_p
from .billing import billing
# Import datasets controllers
from .datasets import data_source, datasets, datasets_document, datasets_segments, file, hit_testing, website
from .datasets import data_source, datasets, datasets_document, datasets_segments, external, file, hit_testing, website
# Import explore controllers
from .explore import (

View File

@ -110,6 +110,26 @@ class DatasetListApi(Resource):
nullable=True,
help="Invalid indexing technique.",
)
parser.add_argument(
"external_api_template_id",
type=str,
nullable=True,
required=False,
)
parser.add_argument(
"provider",
type=str,
nullable=True,
choices=Dataset.PROVIDER_LIST,
required=False,
default="vendor",
)
parser.add_argument(
"external_knowledge_id",
type=str,
nullable=True,
required=False,
)
args = parser.parse_args()
# The role of the current user in the ta table must be admin, owner, or editor, or dataset_operator
@ -123,6 +143,9 @@ class DatasetListApi(Resource):
indexing_technique=args["indexing_technique"],
account=current_user,
permission=DatasetPermissionEnum.ONLY_ME,
provider=args["provider"],
external_api_template_id=args["external_api_template_id"],
external_knowledge_id=args["external_knowledge_id"],
)
except services.errors.dataset.DatasetNameDuplicateError:
raise DatasetNameDuplicateError()

View File

@ -1,69 +1,49 @@
import flask_restful
from flask import request
from flask_login import current_user
from flask_restful import Resource, marshal, marshal_with, reqparse
from flask_restful import Resource, marshal, reqparse
from werkzeug.exceptions import Forbidden, NotFound
import services
from configs import dify_config
from controllers.console import api
from controllers.console.apikey import api_key_fields, api_key_list
from controllers.console.app.error import ProviderNotInitializeError
from controllers.console.datasets.error import DatasetInUseError, DatasetNameDuplicateError, IndexingEstimateError
from controllers.console.datasets.error import DatasetNameDuplicateError
from controllers.console.setup import setup_required
from controllers.console.wraps import account_initialization_required
from core.errors.error import LLMBadRequestError, ProviderTokenNotInitError
from core.indexing_runner import IndexingRunner
from core.model_runtime.entities.model_entities import ModelType
from core.provider_manager import ProviderManager
from core.rag.datasource.vdb.vector_type import VectorType
from core.rag.extractor.entity.extract_setting import ExtractSetting
from core.rag.retrieval.retrival_methods import RetrievalMethod
from extensions.ext_database import db
from fields.app_fields import related_app_list
from fields.dataset_fields import dataset_detail_fields, dataset_query_detail_fields
from fields.document_fields import document_status_fields
from fields.dataset_fields import dataset_detail_fields
from libs.login import login_required
from models.dataset import Dataset, Document, DocumentSegment
from models.model import ApiToken, UploadFile
from services.dataset_service import DatasetPermissionService, DatasetService, DocumentService
from services.external_knowledge_service import ExternalDatasetService
def _validate_name(name):
if not name or len(name) < 1 or len(name) > 100:
raise ValueError('Name must be between 1 to 100 characters.')
raise ValueError("Name must be between 1 to 100 characters.")
return name
def _validate_description_length(description):
if len(description) > 400:
raise ValueError('Description cannot exceed 400 characters.')
raise ValueError("Description cannot exceed 400 characters.")
return description
class ExternalApiTemplateListApi(Resource):
@setup_required
@login_required
@account_initialization_required
def get(self):
page = request.args.get('page', default=1, type=int)
limit = request.args.get('limit', default=20, type=int)
search = request.args.get('keyword', default=None, type=str)
page = request.args.get("page", default=1, type=int)
limit = request.args.get("limit", default=20, type=int)
search = request.args.get("keyword", default=None, type=str)
api_templates, total = ExternalDatasetService.get_external_api_templates(
page,
limit,
current_user.current_tenant_id,
search
page, limit, current_user.current_tenant_id, search
)
response = {
'data': [item.to_dict() for item in api_templates],
'has_more': len(api_templates) == limit,
'limit': limit,
'total': total,
'page': page
"data": [item.to_dict() for item in api_templates],
"has_more": len(api_templates) == limit,
"limit": limit,
"total": total,
"page": page,
}
return response, 200
@ -72,18 +52,30 @@ class ExternalApiTemplateListApi(Resource):
@account_initialization_required
def post(self):
parser = reqparse.RequestParser()
parser.add_argument('name', nullable=False, required=True,
help='Name is required. Name must be between 1 to 100 characters.',
type=_validate_name)
parser.add_argument('description', nullable=False, required=True,
help='Description is required. Description must be between 1 to 400 characters.',
type=_validate_description_length)
parser.add_argument('settings', type=list, location='json',
nullable=False,
required=True, )
parser.add_argument(
"name",
nullable=False,
required=True,
help="Name is required. Name must be between 1 to 100 characters.",
type=_validate_name,
)
parser.add_argument(
"description",
nullable=False,
required=True,
help="Description is required. Description must be between 1 to 400 characters.",
type=_validate_description_length,
)
parser.add_argument(
"settings",
type=list,
location="json",
nullable=False,
required=True,
)
args = parser.parse_args()
ExternalDatasetService.validate_api_list(args['settings'])
ExternalDatasetService.validate_api_list(args["settings"])
# The role of the current user in the ta table must be admin, owner, or editor, or dataset_operator
if not current_user.is_dataset_editor:
@ -91,9 +83,7 @@ class ExternalApiTemplateListApi(Resource):
try:
api_template = ExternalDatasetService.create_api_template(
tenant_id=current_user.current_tenant_id,
user_id=current_user.id,
args=args
tenant_id=current_user.current_tenant_id, user_id=current_user.id, args=args
)
except services.errors.dataset.DatasetNameDuplicateError:
raise DatasetNameDuplicateError()
@ -120,23 +110,35 @@ class ExternalApiTemplateApi(Resource):
api_template_id = str(api_template_id)
parser = reqparse.RequestParser()
parser.add_argument('name', nullable=False, required=True,
help='type is required. Name must be between 1 to 100 characters.',
type=_validate_name)
parser.add_argument('description', nullable=False, required=True,
help='description is required. Description must be between 1 to 400 characters.',
type=_validate_description_length)
parser.add_argument('settings', type=list, location='json',
nullable=False,
required=True, )
parser.add_argument(
"name",
nullable=False,
required=True,
help="type is required. Name must be between 1 to 100 characters.",
type=_validate_name,
)
parser.add_argument(
"description",
nullable=False,
required=True,
help="description is required. Description must be between 1 to 400 characters.",
type=_validate_description_length,
)
parser.add_argument(
"settings",
type=list,
location="json",
nullable=False,
required=True,
)
args = parser.parse_args()
ExternalDatasetService.validate_api_list(args['settings'])
ExternalDatasetService.validate_api_list(args["settings"])
api_template = ExternalDatasetService.update_api_template(
tenant_id=current_user.current_tenant_id,
user_id=current_user.id,
api_template_id=api_template_id,
args=args
args=args,
)
return api_template.to_dict(), 200
@ -152,7 +154,7 @@ class ExternalApiTemplateApi(Resource):
raise Forbidden()
ExternalDatasetService.delete_api_template(current_user.current_tenant_id, api_template_id)
return {'result': 'success'}, 204
return {"result": "success"}, 204
class ExternalApiUseCheckApi(Resource):
@ -163,11 +165,10 @@ class ExternalApiUseCheckApi(Resource):
api_template_id = str(api_template_id)
external_api_template_is_using = ExternalDatasetService.external_api_template_use_check(api_template_id)
return {'is_using': external_api_template_is_using}, 200
return {"is_using": external_api_template_is_using}, 200
class ExternalDatasetInitApi(Resource):
@setup_required
@login_required
@account_initialization_required
@ -177,13 +178,13 @@ class ExternalDatasetInitApi(Resource):
raise Forbidden()
parser = reqparse.RequestParser()
parser.add_argument('api_template_id', type=str, required=True, nullable=True, location='json')
parser.add_argument("api_template_id", type=str, required=True, nullable=True, location="json")
# parser.add_argument('name', nullable=False, required=True,
# help='name is required. Name must be between 1 to 100 characters.',
# type=_validate_name)
# parser.add_argument('description', type=str, required=True, nullable=True, location='json')
parser.add_argument('data_source', type=dict, required=True, nullable=True, location='json')
parser.add_argument('process_parameter', type=dict, required=True, nullable=True, location='json')
parser.add_argument("data_source", type=dict, required=True, nullable=True, location="json")
parser.add_argument("process_parameter", type=dict, required=True, nullable=True, location="json")
args = parser.parse_args()
@ -193,9 +194,7 @@ class ExternalDatasetInitApi(Resource):
# validate args
ExternalDatasetService.document_create_args_validate(
current_user.current_tenant_id,
args['api_template_id'],
args['process_parameter']
current_user.current_tenant_id, args["api_template_id"], args["process_parameter"]
)
try:
@ -206,17 +205,12 @@ class ExternalDatasetInitApi(Resource):
)
except Exception as ex:
raise ProviderNotInitializeError(ex.description)
response = {
'dataset': dataset,
'documents': documents,
'batch': batch
}
response = {"dataset": dataset, "documents": documents, "batch": batch}
return response
class ExternalDatasetCreateApi(Resource):
@setup_required
@login_required
@account_initialization_required
@ -226,12 +220,16 @@ class ExternalDatasetCreateApi(Resource):
raise Forbidden()
parser = reqparse.RequestParser()
parser.add_argument('api_template_id', type=str, required=True, nullable=False, location='json')
parser.add_argument('external_knowledge_id', type=str, required=True, nullable=False, location='json')
parser.add_argument('name', nullable=False, required=True,
help='name is required. Name must be between 1 to 100 characters.',
type=_validate_name)
parser.add_argument('description', type=str, required=True, nullable=True, location='json')
parser.add_argument("external_api_template_id", type=str, required=True, nullable=False, location="json")
parser.add_argument("external_knowledge_id", type=str, required=True, nullable=False, location="json")
parser.add_argument(
"name",
nullable=False,
required=True,
help="name is required. Name must be between 1 to 100 characters.",
type=_validate_name,
)
parser.add_argument("description", type=str, required=True, nullable=True, location="json")
args = parser.parse_args()
@ -251,6 +249,6 @@ class ExternalDatasetCreateApi(Resource):
return marshal(dataset, dataset_detail_fields), 201
api.add_resource(ExternalApiTemplateListApi, '/datasets/external-api-template')
api.add_resource(ExternalApiTemplateApi, '/datasets/external-api-template/<uuid:api_template_id>')
api.add_resource(ExternalApiUseCheckApi, '/datasets/external-api-template/<uuid:api_template_id>/use-check')
api.add_resource(ExternalApiTemplateListApi, "/datasets/external-api-template")
api.add_resource(ExternalApiTemplateApi, "/datasets/external-api-template/<uuid:api_template_id>")
api.add_resource(ExternalApiUseCheckApi, "/datasets/external-api-template/<uuid:api_template_id>/use-check")

View File

@ -82,6 +82,26 @@ class DatasetListApi(DatasetApiResource):
required=False,
nullable=False,
)
parser.add_argument(
"external_api_template_id",
type=str,
nullable=True,
required=False,
default="_validate_name",
)
parser.add_argument(
"provider",
type=str,
nullable=True,
required=False,
default="vendor",
)
parser.add_argument(
"external_knowledge_id",
type=str,
nullable=True,
required=False,
)
args = parser.parse_args()
try:
@ -91,6 +111,9 @@ class DatasetListApi(DatasetApiResource):
indexing_technique=args["indexing_technique"],
account=current_user,
permission=args["permission"],
provider=args["provider"],
external_api_template_id=args["external_api_template_id"],
external_knowledge_id=args["external_knowledge_id"],
)
except services.errors.dataset.DatasetNameDuplicateError:
raise DatasetNameDuplicateError()

View File

@ -38,7 +38,7 @@ class RetrievalService:
if not dataset:
return []
if provider == 'external':
external_knowledge_binding = ExternalDatasetService.fetch_external_knowledge_retrival(
all_documents = ExternalDatasetService.fetch_external_knowledge_retrival(
dataset.tenant_id,
dataset_id,
query,

View File

@ -37,6 +37,7 @@ class Dataset(db.Model):
)
INDEXING_TECHNIQUE_LIST = ['high_quality', 'economy', None]
PROVIDER_LIST = ['vendor', 'external', None]
id = db.Column(StringUUID, server_default=db.text('uuid_generate_v4()'))
tenant_id = db.Column(StringUUID, nullable=False)

View File

@ -12,9 +12,9 @@ from extensions.ext_database import db
from models.dataset import Dataset, DatasetQuery, Document
@app.celery.task(queue='dataset')
@app.celery.task(queue="dataset")
def clean_unused_message_task():
click.echo(click.style('Start clean unused messages .', fg='green'))
click.echo(click.style("Start clean unused messages .", fg="green"))
clean_days = int(dify_config.CLEAN_DAY_SETTING)
start_at = time.perf_counter()
thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
@ -22,40 +22,44 @@ def clean_unused_message_task():
while True:
try:
# Subquery for counting new documents
document_subquery_new = db.session.query(
Document.dataset_id,
func.count(Document.id).label('document_count')
).filter(
Document.indexing_status == 'completed',
Document.enabled == True,
Document.archived == False,
Document.updated_at > thirty_days_ago
).group_by(Document.dataset_id).subquery()
document_subquery_new = (
db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
.filter(
Document.indexing_status == "completed",
Document.enabled == True,
Document.archived == False,
Document.updated_at > thirty_days_ago,
)
.group_by(Document.dataset_id)
.subquery()
)
# Subquery for counting old documents
document_subquery_old = db.session.query(
Document.dataset_id,
func.count(Document.id).label('document_count')
).filter(
Document.indexing_status == 'completed',
Document.enabled == True,
Document.archived == False,
Document.updated_at < thirty_days_ago
).group_by(Document.dataset_id).subquery()
document_subquery_old = (
db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
.filter(
Document.indexing_status == "completed",
Document.enabled == True,
Document.archived == False,
Document.updated_at < thirty_days_ago,
)
.group_by(Document.dataset_id)
.subquery()
)
# Main query with join and filter
datasets = (db.session.query(Dataset)
.outerjoin(
document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id
).outerjoin(
document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id
).filter(
Dataset.created_at < thirty_days_ago,
func.coalesce(document_subquery_new.c.document_count, 0) == 0,
func.coalesce(document_subquery_old.c.document_count, 0) > 0
).order_by(
Dataset.created_at.desc()
).paginate(page=page, per_page=50))
datasets = (
db.session.query(Dataset)
.outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id)
.outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id)
.filter(
Dataset.created_at < thirty_days_ago,
func.coalesce(document_subquery_new.c.document_count, 0) == 0,
func.coalesce(document_subquery_old.c.document_count, 0) > 0,
)
.order_by(Dataset.created_at.desc())
.paginate(page=page, per_page=50)
)
except NotFound:
break
@ -63,10 +67,11 @@ def clean_unused_message_task():
break
page += 1
for dataset in datasets:
dataset_query = db.session.query(DatasetQuery).filter(
DatasetQuery.created_at > thirty_days_ago,
DatasetQuery.dataset_id == dataset.id
).all()
dataset_query = (
db.session.query(DatasetQuery)
.filter(DatasetQuery.created_at > thirty_days_ago, DatasetQuery.dataset_id == dataset.id)
.all()
)
if not dataset_query or len(dataset_query) == 0:
try:
# remove index
@ -74,17 +79,14 @@ def clean_unused_message_task():
index_processor.clean(dataset, None)
# update document
update_params = {
Document.enabled: False
}
update_params = {Document.enabled: False}
Document.query.filter_by(dataset_id=dataset.id).update(update_params)
db.session.commit()
click.echo(click.style('Cleaned unused dataset {} from db success!'.format(dataset.id),
fg='green'))
click.echo(click.style("Cleaned unused dataset {} from db success!".format(dataset.id), fg="green"))
except Exception as e:
click.echo(
click.style('clean dataset index error: {} {}'.format(e.__class__.__name__, str(e)),
fg='red'))
click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red")
)
end_at = time.perf_counter()
click.echo(click.style('Cleaned unused dataset from db success latency: {}'.format(end_at - start_at), fg='green'))
click.echo(click.style("Cleaned unused dataset from db success latency: {}".format(end_at - start_at), fg="green"))

View File

@ -32,6 +32,7 @@ from models.dataset import (
DatasetQuery,
Document,
DocumentSegment,
ExternalKnowledgeBindings,
)
from models.model import UploadFile
from models.source import DataSourceOauthBinding
@ -39,6 +40,7 @@ from services.errors.account import NoPermissionError
from services.errors.dataset import DatasetNameDuplicateError
from services.errors.document import DocumentIndexingError
from services.errors.file import FileNotExistsError
from services.external_knowledge_service import ExternalDatasetService
from services.feature_service import FeatureModel, FeatureService
from services.tag_service import TagService
from services.vector_service import VectorService
@ -137,7 +139,14 @@ class DatasetService:
@staticmethod
def create_empty_dataset(
tenant_id: str, name: str, indexing_technique: Optional[str], account: Account, permission: Optional[str] = None
tenant_id: str,
name: str,
indexing_technique: Optional[str],
account: Account,
permission: Optional[str] = None,
provider: str = "vendor",
external_api_template_id: Optional[str] = None,
external_knowledge_id: Optional[str] = None,
):
# check if dataset name already exists
if Dataset.query.filter_by(name=name, tenant_id=tenant_id).first():
@ -156,7 +165,23 @@ class DatasetService:
dataset.embedding_model_provider = embedding_model.provider if embedding_model else None
dataset.embedding_model = embedding_model.model if embedding_model else None
dataset.permission = permission if permission else DatasetPermissionEnum.ONLY_ME
dataset.provider = provider
db.session.add(dataset)
db.session.flush()
if provider == "external" and external_api_template_id:
external_api_template = ExternalDatasetService.get_api_template(external_api_template_id)
if not external_api_template:
raise ValueError("External API template not found.")
external_knowledge_binding = ExternalKnowledgeBindings(
tenant_id=tenant_id,
dataset_id=dataset.id,
external_api_template_id=external_api_template_id,
external_knowledge_id=external_knowledge_id,
created_by=account.id,
)
db.session.add(external_knowledge_binding)
db.session.commit()
return dataset

View File

@ -1,16 +1,16 @@
from typing import Literal, Union, Optional
from typing import Literal, Optional, Union
from pydantic import BaseModel
class AuthorizationConfig(BaseModel):
type: Literal[None, 'basic', 'bearer', 'custom']
type: Literal[None, "basic", "bearer", "custom"]
api_key: Union[None, str] = None
header: Union[None, str] = None
class Authorization(BaseModel):
type: Literal['no-auth', 'api-key']
type: Literal["no-auth", "api-key"]
config: Optional[AuthorizationConfig] = None

View File

@ -3,59 +3,45 @@ import random
import time
from copy import deepcopy
from datetime import datetime, timezone
from typing import Any, Union, Optional
from typing import Any, Optional, Union
import httpx
from core.helper import ssrf_proxy
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs import helper
from models.account import Account, TenantAccountRole
from models.dataset import (
AppDatasetJoin,
Dataset,
DatasetCollectionBinding,
DatasetPermission,
DatasetProcessRule,
DatasetQuery,
Document,
DocumentSegment, ExternalApiTemplates, ExternalKnowledgeBindings,
ExternalApiTemplates,
ExternalKnowledgeBindings,
)
from models.model import UploadFile
from services.dataset_service import DocumentService
from services.entities.external_knowledge_entities.external_knowledge_entities import Authorization, ApiTemplateSetting
from services.entities.external_knowledge_entities.external_knowledge_entities import ApiTemplateSetting, Authorization
from services.errors.dataset import DatasetNameDuplicateError
from tasks.external_document_indexing_task import external_document_indexing_task
# from tasks.external_document_indexing_task import external_document_indexing_task
class ExternalDatasetService:
@staticmethod
def get_external_api_templates(page, per_page, tenant_id, search=None) -> tuple[list[ExternalApiTemplates], int]:
query = ExternalApiTemplates.query.filter(ExternalApiTemplates.tenant_id == tenant_id).order_by(
ExternalApiTemplates.created_at.desc()
)
if search:
query = query.filter(ExternalApiTemplates.name.ilike(f'%{search}%'))
query = query.filter(ExternalApiTemplates.name.ilike(f"%{search}%"))
api_templates = query.paginate(
page=page,
per_page=per_page,
max_per_page=100,
error_out=False
)
api_templates = query.paginate(page=page, per_page=per_page, max_per_page=100, error_out=False)
return api_templates.items, api_templates.total
@classmethod
def validate_api_list(cls, api_settings: dict):
if not api_settings:
raise ValueError('api list is empty')
if 'endpoint' not in api_settings and not api_settings['endpoint']:
raise ValueError('endpoint is required')
if 'api_key' not in api_settings and not api_settings['api_key']:
raise ValueError('api_key is required')
raise ValueError("api list is empty")
if "endpoint" not in api_settings and not api_settings["endpoint"]:
raise ValueError("endpoint is required")
if "api_key" not in api_settings and not api_settings["api_key"]:
raise ValueError("api_key is required")
@staticmethod
def create_api_template(tenant_id: str, user_id: str, args: dict) -> ExternalApiTemplates:
@ -63,9 +49,9 @@ class ExternalDatasetService:
tenant_id=tenant_id,
created_by=user_id,
updated_by=user_id,
name=args.get('name'),
description=args.get('description', ''),
settings=json.dumps(args.get('settings'), ensure_ascii=False),
name=args.get("name"),
description=args.get("description", ""),
settings=json.dumps(args.get("settings"), ensure_ascii=False),
)
db.session.add(api_template)
@ -74,22 +60,17 @@ class ExternalDatasetService:
@staticmethod
def get_api_template(api_template_id: str) -> ExternalApiTemplates:
return ExternalApiTemplates.query.filter_by(
id=api_template_id
).first()
return ExternalApiTemplates.query.filter_by(id=api_template_id).first()
@staticmethod
def update_api_template(tenant_id, user_id, api_template_id, args) -> ExternalApiTemplates:
api_template = ExternalApiTemplates.query.filter_by(
id=api_template_id,
tenant_id=tenant_id
).first()
api_template = ExternalApiTemplates.query.filter_by(id=api_template_id, tenant_id=tenant_id).first()
if api_template is None:
raise ValueError('api template not found')
raise ValueError("api template not found")
api_template.name = args.get('name')
api_template.description = args.get('description', '')
api_template.settings = json.dumps(args.get('settings'), ensure_ascii=False)
api_template.name = args.get("name")
api_template.description = args.get("description", "")
api_template.settings = json.dumps(args.get("settings"), ensure_ascii=False)
api_template.updated_by = user_id
api_template.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
db.session.commit()
@ -98,21 +79,16 @@ class ExternalDatasetService:
@staticmethod
def delete_api_template(tenant_id: str, api_template_id: str):
api_template = ExternalApiTemplates.query.filter_by(
id=api_template_id,
tenant_id=tenant_id
).first()
api_template = ExternalApiTemplates.query.filter_by(id=api_template_id, tenant_id=tenant_id).first()
if api_template is None:
raise ValueError('api template not found')
raise ValueError("api template not found")
db.session.delete(api_template)
db.session.commit()
@staticmethod
def external_api_template_use_check(api_template_id: str) -> bool:
count = ExternalKnowledgeBindings.query.filter_by(
external_api_template_id=api_template_id
).count()
count = ExternalKnowledgeBindings.query.filter_by(external_api_template_id=api_template_id).count()
if count > 0:
return True
return False
@ -120,66 +96,63 @@ class ExternalDatasetService:
@staticmethod
def get_external_knowledge_binding_with_dataset_id(tenant_id: str, dataset_id: str) -> ExternalKnowledgeBindings:
external_knowledge_binding = ExternalKnowledgeBindings.query.filter_by(
dataset_id=dataset_id,
tenant_id=tenant_id
dataset_id=dataset_id, tenant_id=tenant_id
).first()
if not external_knowledge_binding:
raise ValueError('external knowledge binding not found')
raise ValueError("external knowledge binding not found")
return external_knowledge_binding
@staticmethod
def document_create_args_validate(tenant_id: str, api_template_id: str, process_parameter: dict):
api_template = ExternalApiTemplates.query.filter_by(
id=api_template_id,
tenant_id=tenant_id
).first()
api_template = ExternalApiTemplates.query.filter_by(id=api_template_id, tenant_id=tenant_id).first()
if api_template is None:
raise ValueError('api template not found')
raise ValueError("api template not found")
settings = json.loads(api_template.settings)
for settings in settings:
custom_parameters = settings.get('document_process_setting')
custom_parameters = settings.get("document_process_setting")
if custom_parameters:
for parameter in custom_parameters:
if parameter.get('required', False) and not process_parameter.get(parameter.get('name')):
if parameter.get("required", False) and not process_parameter.get(parameter.get("name")):
raise ValueError(f'{parameter.get("name")} is required')
@staticmethod
def init_external_dataset(tenant_id: str, user_id: str, args: dict, created_from: str = 'web'):
api_template_id = args.get('api_template_id')
def init_external_dataset(tenant_id: str, user_id: str, args: dict, created_from: str = "web"):
api_template_id = args.get("api_template_id")
data_source = args.get('data_source')
data_source = args.get("data_source")
if data_source is None:
raise ValueError('data source is required')
raise ValueError("data source is required")
process_parameter = args.get('process_parameter')
api_template = ExternalApiTemplates.query.filter_by(
id=api_template_id,
tenant_id=tenant_id
).first()
process_parameter = args.get("process_parameter")
api_template = ExternalApiTemplates.query.filter_by(id=api_template_id, tenant_id=tenant_id).first()
if api_template is None:
raise ValueError('api template not found')
raise ValueError("api template not found")
dataset = Dataset(
tenant_id=tenant_id,
name=args.get('name'),
description=args.get('description', ''),
provider='external',
name=args.get("name"),
description=args.get("description", ""),
provider="external",
created_by=user_id,
)
db.session.add(dataset)
db.session.flush()
position = DocumentService.get_documents_position(dataset.id)
batch = time.strftime('%Y%m%d%H%M%S') + str(random.randint(100000, 999999))
document = Document.query.filter_by(dataset_id=dataset.id).order_by(Document.position.desc()).first()
position = document.position + 1 if document else 1
batch = time.strftime("%Y%m%d%H%M%S") + str(random.randint(100000, 999999))
document_ids = []
if data_source["type"] == "upload_file":
upload_file_list = data_source["info_list"]['file_info_list']['file_ids']
upload_file_list = data_source["info_list"]["file_info_list"]["file_ids"]
for file_id in upload_file_list:
file = db.session.query(UploadFile).filter(
UploadFile.tenant_id == dataset.tenant_id,
UploadFile.id == file_id
).first()
file = (
db.session.query(UploadFile)
.filter(UploadFile.tenant_id == dataset.tenant_id, UploadFile.id == file_id)
.first()
)
if file:
data_source_info = {
"upload_file_id": file_id,
@ -200,21 +173,20 @@ class ExternalDatasetService:
db.session.flush()
document_ids.append(document.id)
db.session.commit()
external_document_indexing_task.delay(dataset.id, api_template_id, data_source, process_parameter)
#external_document_indexing_task.delay(dataset.id, api_template_id, data_source, process_parameter)
return dataset
@staticmethod
def process_external_api(settings: ApiTemplateSetting,
files: Union[None, dict[str, Any]]) -> httpx.Response:
def process_external_api(settings: ApiTemplateSetting, files: Union[None, dict[str, Any]]) -> httpx.Response:
"""
do http request depending on api bundle
"""
kwargs = {
'url': settings.url,
'headers': settings.headers,
'follow_redirects': True,
"url": settings.url,
"headers": settings.headers,
"follow_redirects": True,
}
response = getattr(ssrf_proxy, settings.request_method)(data=settings.params, files=files, **kwargs)
@ -228,21 +200,21 @@ class ExternalDatasetService:
headers = deepcopy(headers)
else:
headers = {}
if authorization.type == 'api-key':
if authorization.type == "api-key":
if authorization.config is None:
raise ValueError('authorization config is required')
raise ValueError("authorization config is required")
if authorization.config.api_key is None:
raise ValueError('api_key is required')
raise ValueError("api_key is required")
if not authorization.config.header:
authorization.config.header = 'Authorization'
authorization.config.header = "Authorization"
if authorization.config.type == 'bearer':
headers[authorization.config.header] = f'Bearer {authorization.config.api_key}'
elif authorization.config.type == 'basic':
headers[authorization.config.header] = f'Basic {authorization.config.api_key}'
elif authorization.config.type == 'custom':
if authorization.config.type == "bearer":
headers[authorization.config.header] = f"Bearer {authorization.config.api_key}"
elif authorization.config.type == "basic":
headers[authorization.config.header] = f"Basic {authorization.config.api_key}"
elif authorization.config.type == "custom":
headers[authorization.config.header] = authorization.config.api_key
return headers
@ -254,21 +226,20 @@ class ExternalDatasetService:
@staticmethod
def create_external_dataset(tenant_id: str, user_id: str, args: dict) -> Dataset:
# check if dataset name already exists
if Dataset.query.filter_by(name=args.get('name'), tenant_id=tenant_id).first():
if Dataset.query.filter_by(name=args.get("name"), tenant_id=tenant_id).first():
raise DatasetNameDuplicateError(f"Dataset with name {args.get('name')} already exists.")
api_template = ExternalApiTemplates.query.filter_by(
id=args.get('api_template_id'),
tenant_id=tenant_id
id=args.get("external_api_template_id"), tenant_id=tenant_id
).first()
if api_template is None:
raise ValueError('api template not found')
raise ValueError("api template not found")
dataset = Dataset(
tenant_id=tenant_id,
name=args.get('name'),
description=args.get('description', ''),
provider='external',
name=args.get("name"),
description=args.get("description", ""),
provider="external",
created_by=user_id,
)
@ -278,8 +249,8 @@ class ExternalDatasetService:
external_knowledge_binding = ExternalKnowledgeBindings(
tenant_id=tenant_id,
dataset_id=dataset.id,
external_api_template_id=args.get('api_template_id'),
external_knowledge_id=args.get('external_knowledge_id'),
external_api_template_id=args.get("external_api_template_id"),
external_knowledge_id=args.get("external_knowledge_id"),
created_by=user_id,
)
db.session.add(external_knowledge_binding)
@ -289,36 +260,32 @@ class ExternalDatasetService:
return dataset
@staticmethod
def fetch_external_knowledge_retrival(tenant_id: str,
dataset_id: str,
query: str,
external_retrival_parameters: dict):
def fetch_external_knowledge_retrival(
tenant_id: str, dataset_id: str, query: str, external_retrival_parameters: dict
):
external_knowledge_binding = ExternalKnowledgeBindings.query.filter_by(
dataset_id=dataset_id,
tenant_id=tenant_id
dataset_id=dataset_id, tenant_id=tenant_id
).first()
if not external_knowledge_binding:
raise ValueError('external knowledge binding not found')
raise ValueError("external knowledge binding not found")
external_api_template = ExternalApiTemplates.query.filter_by(
id=external_knowledge_binding.external_api_template_id
).first()
if not external_api_template:
raise ValueError('external api template not found')
raise ValueError("external api template not found")
settings = json.loads(external_api_template.settings)
headers = {}
if settings.get('api_token'):
headers['Authorization'] = f"Bearer {settings.get('api_token')}"
if settings.get("api_token"):
headers["Authorization"] = f"Bearer {settings.get('api_token')}"
external_retrival_parameters['query'] = query
external_retrival_parameters["query"] = query
api_template_setting = {
'url': f"{settings.get('endpoint')}/dify/external-knowledge/retrival-documents",
'request_method': 'post',
'headers': settings.get('headers'),
'params': external_retrival_parameters
"url": f"{settings.get('endpoint')}/dify/external-knowledge/retrival-documents",
"request_method": "post",
"headers": settings.get("headers"),
"params": external_retrival_parameters,
}
response = ExternalDatasetService.process_external_api(
ApiTemplateSetting(**api_template_setting), None
)
response = ExternalDatasetService.process_external_api(ApiTemplateSetting(**api_template_setting), None)

View File

@ -19,8 +19,15 @@ default_retrieval_model = {
class HitTestingService:
@classmethod
def retrieve(cls, dataset: Dataset, query: str, account: Account,
retrieval_model: dict, external_retrieval_model: dict, limit: int = 10) -> dict:
def retrieve(
cls,
dataset: Dataset,
query: str,
account: Account,
retrieval_model: dict,
external_retrieval_model: dict,
limit: int = 10,
) -> dict:
if dataset.available_document_count == 0 or dataset.available_segment_count == 0:
return {
"query": {

View File

@ -1,4 +1,3 @@
import datetime
import json
import logging
import time
@ -6,17 +5,15 @@ import time
import click
from celery import shared_task
from configs import dify_config
from core.indexing_runner import DocumentIsPausedException, IndexingRunner
from core.indexing_runner import DocumentIsPausedException
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.dataset import Dataset, Document, ExternalApiTemplates
from models.dataset import Dataset, ExternalApiTemplates
from models.model import UploadFile
from services.external_knowledge_service import ExternalDatasetService
from services.feature_service import FeatureService
@shared_task(queue='dataset')
@shared_task(queue="dataset")
def external_document_indexing_task(dataset_id: str, api_template_id: str, data_source: dict, process_parameter: dict):
"""
Async process document
@ -30,26 +27,35 @@ def external_document_indexing_task(dataset_id: str, api_template_id: str, data_
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
if not dataset:
logging.info(click.style('Processed external dataset: {} failed, dataset not exit.'.format(dataset_id), fg='red'))
logging.info(
click.style("Processed external dataset: {} failed, dataset not exit.".format(dataset_id), fg="red")
)
return
# get external api template
api_template = db.session.query(ExternalApiTemplates).filter(
ExternalApiTemplates.id == api_template_id,
ExternalApiTemplates.tenant_id == dataset.tenant_id
).first()
api_template = (
db.session.query(ExternalApiTemplates)
.filter(ExternalApiTemplates.id == api_template_id, ExternalApiTemplates.tenant_id == dataset.tenant_id)
.first()
)
if not api_template:
logging.info(click.style('Processed external dataset: {} failed, api template: {} not exit.'.format(dataset_id, api_template_id), fg='red'))
logging.info(
click.style(
"Processed external dataset: {} failed, api template: {} not exit.".format(dataset_id, api_template_id),
fg="red",
)
)
return
files = {}
if data_source["type"] == "upload_file":
upload_file_list = data_source["info_list"]['file_info_list']['file_ids']
upload_file_list = data_source["info_list"]["file_info_list"]["file_ids"]
for file_id in upload_file_list:
file = db.session.query(UploadFile).filter(
UploadFile.tenant_id == dataset.tenant_id,
UploadFile.id == file_id
).first()
file = (
db.session.query(UploadFile)
.filter(UploadFile.tenant_id == dataset.tenant_id, UploadFile.id == file_id)
.first()
)
if file:
files[file.id] = (file.name, storage.load_once(file.key), file.mime_type)
try:
@ -59,7 +65,7 @@ def external_document_indexing_task(dataset_id: str, api_template_id: str, data_
# do http request
response = ExternalDatasetService.process_external_api(settings, headers, process_parameter, files)
job_id = response.json().get('job_id')
job_id = response.json().get("job_id")
if job_id:
# save job_id to dataset
dataset.job_id = job_id
@ -67,9 +73,13 @@ def external_document_indexing_task(dataset_id: str, api_template_id: str, data_
end_at = time.perf_counter()
logging.info(
click.style('Processed external dataset: {} successful, latency: {}'.format(dataset.id, end_at - start_at), fg='green'))
click.style(
"Processed external dataset: {} successful, latency: {}".format(dataset.id, end_at - start_at),
fg="green",
)
)
except DocumentIsPausedException as ex:
logging.info(click.style(str(ex), fg='yellow'))
logging.info(click.style(str(ex), fg="yellow"))
except Exception:
pass