From ffa8e4ccd1b47321214d47e642fc76079dfb4633 Mon Sep 17 00:00:00 2001
From: Jyong <718720800@qq.com>
Date: Tue, 16 May 2023 00:33:21 +0800
Subject: [PATCH] add re_segment document param check

---
 .../console/datasets/datasets_document.py   |   8 +-
 api/services/dataset_service.py             | 136 +++++++++---------
 api/tasks/document_indexing_update_task.py  |   3 +-
 3 files changed, 73 insertions(+), 74 deletions(-)

diff --git a/api/controllers/console/datasets/datasets_document.py b/api/controllers/console/datasets/datasets_document.py
index 11bdf89add..a6042c0668 100644
--- a/api/controllers/console/datasets/datasets_document.py
+++ b/api/controllers/console/datasets/datasets_document.py
@@ -207,8 +207,8 @@ class DatasetDocumentListApi(Resource):
         parser = reqparse.RequestParser()
         parser.add_argument('indexing_technique', type=str, choices=Dataset.INDEXING_TECHNIQUE_LIST, nullable=False,
                             location='json')
-        parser.add_argument('data_source', type=dict, required=True, nullable=True, location='json')
-        parser.add_argument('process_rule', type=dict, required=True, nullable=True, location='json')
+        parser.add_argument('data_source', type=dict, required=False, location='json')
+        parser.add_argument('process_rule', type=dict, required=False, location='json')
         parser.add_argument('duplicate', type=bool, nullable=False, location='json')
         parser.add_argument('original_document_id', type=str, required=False, location='json')
         args = parser.parse_args()
@@ -245,8 +245,8 @@ class DatasetInitApi(Resource):
         parser = reqparse.RequestParser()
         parser.add_argument('indexing_technique', type=str, choices=Dataset.INDEXING_TECHNIQUE_LIST, required=True,
                             nullable=False, location='json')
-        parser.add_argument('data_source', type=dict, required=False, location='json')
-        parser.add_argument('process_rule', type=dict, required=False, location='json')
+        parser.add_argument('data_source', type=dict, required=True, nullable=True, location='json')
+        parser.add_argument('process_rule', type=dict, required=True, nullable=True, location='json')
         args = parser.parse_args()
 
         # validate args
diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py
index 9b6cfff716..770b087b20 100644
--- a/api/services/dataset_service.py
+++ b/api/services/dataset_service.py
@@ -18,8 +18,8 @@ from services.errors.account import NoPermissionError
 from services.errors.dataset import DatasetNameDuplicateError
 from services.errors.document import DocumentIndexingError
 from services.errors.file import FileNotExistsError
-from tasks import document_indexing_update_task
 from tasks.document_indexing_task import document_indexing_task
+from tasks.document_indexing_update_task import document_indexing_update_task
 
 
 class DatasetService:
@@ -358,68 +358,68 @@ class DocumentService:
         if dataset.indexing_technique == 'high_quality':
             IndexBuilder.get_default_service_context(dataset.tenant_id)
 
-        if document_data["original_document_id"]:
-            DocumentService.update_document_with_dataset_id(dataset, document_data, account)
-        # save process rule
-        if not dataset_process_rule:
-            process_rule = document_data["process_rule"]
-            if process_rule["mode"] == "custom":
-                dataset_process_rule = DatasetProcessRule(
-                    dataset_id=dataset.id,
-                    mode=process_rule["mode"],
-                    rules=json.dumps(process_rule["rules"]),
-                    created_by=account.id
-                )
-            elif process_rule["mode"] == "automatic":
-                dataset_process_rule = DatasetProcessRule(
-                    dataset_id=dataset.id,
-                    mode=process_rule["mode"],
-                    rules=json.dumps(DatasetProcessRule.AUTOMATIC_RULES),
-                    created_by=account.id
-                )
-            db.session.add(dataset_process_rule)
+        if 'original_document_id' in document_data and document_data["original_document_id"]:
+            document = DocumentService.update_document_with_dataset_id(dataset, document_data, account)
+        else:
+            # save process rule
+            if not dataset_process_rule:
+                process_rule = document_data["process_rule"]
+                if process_rule["mode"] == "custom":
+                    dataset_process_rule = DatasetProcessRule(
+                        dataset_id=dataset.id,
+                        mode=process_rule["mode"],
+                        rules=json.dumps(process_rule["rules"]),
+                        created_by=account.id
+                    )
+                elif process_rule["mode"] == "automatic":
+                    dataset_process_rule = DatasetProcessRule(
+                        dataset_id=dataset.id,
+                        mode=process_rule["mode"],
+                        rules=json.dumps(DatasetProcessRule.AUTOMATIC_RULES),
+                        created_by=account.id
+                    )
+                db.session.add(dataset_process_rule)
+                db.session.commit()
+
+            file_name = ''
+            data_source_info = {}
+            if document_data["data_source"]["type"] == "upload_file":
+                file_id = document_data["data_source"]["info"]
+                file = db.session.query(UploadFile).filter(
+                    UploadFile.tenant_id == dataset.tenant_id,
+                    UploadFile.id == file_id
+                ).first()
+
+                # raise error if file not found
+                if not file:
+                    raise FileNotExistsError()
+
+                file_name = file.name
+                data_source_info = {
+                    "upload_file_id": file_id,
+                }
+
+            # save document
+            position = DocumentService.get_documents_position(dataset.id)
+            document = Document(
+                tenant_id=dataset.tenant_id,
+                dataset_id=dataset.id,
+                position=position,
+                data_source_type=document_data["data_source"]["type"],
+                data_source_info=json.dumps(data_source_info),
+                dataset_process_rule_id=dataset_process_rule.id,
+                batch=time.strftime('%Y%m%d%H%M%S') + str(random.randint(100000, 999999)),
+                name=file_name,
+                created_from=created_from,
+                created_by=account.id,
+                # created_api_request_id = db.Column(UUID, nullable=True)
+            )
+
+            db.session.add(document)
             db.session.commit()
 
-        file_name = ''
-        data_source_info = {}
-        if document_data["data_source"]["type"] == "upload_file":
-            file_id = document_data["data_source"]["info"]
-            file = db.session.query(UploadFile).filter(
-                UploadFile.tenant_id == dataset.tenant_id,
-                UploadFile.id == file_id
-            ).first()
-
-            # raise error if file not found
-            if not file:
-                raise FileNotExistsError()
-
-            file_name = file.name
-            data_source_info = {
-                "upload_file_id": file_id,
-            }
-
-        # save document
-        position = DocumentService.get_documents_position(dataset.id)
-        document = Document(
-            tenant_id=dataset.tenant_id,
-            dataset_id=dataset.id,
-            position=position,
-            data_source_type=document_data["data_source"]["type"],
-            data_source_info=json.dumps(data_source_info),
-            dataset_process_rule_id=dataset_process_rule.id,
-            batch=time.strftime('%Y%m%d%H%M%S') + str(random.randint(100000, 999999)),
-            name=file_name,
-            created_from=created_from,
-            created_by=account.id,
-            # created_api_request_id = db.Column(UUID, nullable=True)
-        )
-
-        db.session.add(document)
-        db.session.commit()
-
-        # trigger async task
-        document_indexing_task.delay(document.dataset_id, document.id)
-
+            # trigger async task
+            document_indexing_task.delay(document.dataset_id, document.id)
         return document
 
     @staticmethod
@@ -430,7 +430,7 @@ class DocumentService:
         if document.display_status != 'available':
             raise ValueError("Document is not available")
         # save process rule
-        if 'process_rule' in document_data or document_data['process_rule']:
+        if 'process_rule' in document_data and document_data['process_rule']:
             process_rule = document_data["process_rule"]
             if process_rule["mode"] == "custom":
                 dataset_process_rule = DatasetProcessRule(
@@ -450,7 +450,7 @@ class DocumentService:
             db.session.commit()
             document.dataset_process_rule_id = dataset_process_rule.id
         # update document data source
-        if 'data_source' in document_data or document_data['data_source']:
+        if 'data_source' in document_data and document_data['data_source']:
             file_name = ''
             data_source_info = {}
             if document_data["data_source"]["type"] == "upload_file":
@@ -513,17 +513,17 @@ class DocumentService:
 
     @classmethod
     def document_create_args_validate(cls, args: dict):
-        if 'original_document_id ' not in args or not args['original_document_id']:
+        if 'original_document_id' not in args or not args['original_document_id']:
             DocumentService.data_source_args_validate(args)
             DocumentService.process_rule_args_validate(args)
         else:
-            if ('data_source' not in args or not args['data_source']) and (
-                    'process_rule' not in args or not args['process_rule']):
+            if ('data_source' not in args and not args['data_source'])\
+                    and ('process_rule' not in args and not args['process_rule']):
                 raise ValueError("Data source or Process rule is required")
             else:
-                if 'data_source' in args or args['data_source']:
+                if 'data_source' in args and args['data_source']:
                     DocumentService.data_source_args_validate(args)
-                elif 'process_rule' in args or args['process_rule']:
+                if 'process_rule' in args and args['process_rule']:
                     DocumentService.process_rule_args_validate(args)
 
     @classmethod
diff --git a/api/tasks/document_indexing_update_task.py b/api/tasks/document_indexing_update_task.py
index 8ba8a8fc26..493c05505a 100644
--- a/api/tasks/document_indexing_update_task.py
+++ b/api/tasks/document_indexing_update_task.py
@@ -17,7 +17,7 @@ from models.dataset import Document, Dataset, DocumentSegment
 @shared_task
 def document_indexing_update_task(dataset_id: str, document_id: str):
     """
-    Async process document
+    Async update document
 
     :param dataset_id:
    :param document_id:
@@ -65,7 +65,6 @@ def document_indexing_update_task(dataset_id: str, document_id: str):
             click.style('Cleaned document when document update data source or process rule: {} latency: {}'.format(document_id, end_at - start_at), fg='green'))
     except Exception:
         logging.exception("Cleaned document when document update data source or process rule failed")
-    # start document re_segment
    try:
         indexing_runner = IndexingRunner()
         indexing_runner.run(document)
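
Illustrative sketch (not part of the patch): the @@ -513,17 +513,17 @@ hunk reworks the branching in document_create_args_validate so that a request carrying original_document_id (a re-segment) needs only one of data_source or process_rule, while a plain create still validates both. The snippet below mirrors that branching in isolation; validate_stub and the sample args dict are hypothetical stand-ins for DocumentService.data_source_args_validate / process_rule_args_validate and a parsed request, and it assumes reqparse-style parsing where every declared key is present (defaulting to None).

# minimal, self-contained sketch of the re-segment parameter check
def validate_stub(name: str) -> None:
    # stands in for the real per-group validators
    print(f"validated {name}")

def document_create_args_validate(args: dict) -> None:
    if 'original_document_id' not in args or not args['original_document_id']:
        # creating a new document: both argument groups must validate
        validate_stub('data_source')
        validate_stub('process_rule')
    else:
        # re-segmenting an existing document: at least one group is required
        if ('data_source' not in args and not args['data_source']) \
                and ('process_rule' not in args and not args['process_rule']):
            raise ValueError("Data source or Process rule is required")
        else:
            if 'data_source' in args and args['data_source']:
                validate_stub('data_source')
            if 'process_rule' in args and args['process_rule']:
                validate_stub('process_rule')

# hypothetical re-segment request that only changes the process rule
document_create_args_validate({
    'original_document_id': 'existing-document-id',
    'data_source': None,
    'process_rule': {'mode': 'automatic'},
})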