diff --git a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py index c70343ec95..d00be3a573 100644 --- a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py +++ b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py @@ -62,7 +62,7 @@ class DraftRagPipelineApi(Resource): Get draft rag pipeline's workflow """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() # fetch draft workflow by app_model @@ -84,7 +84,7 @@ class DraftRagPipelineApi(Resource): Sync draft workflow """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() content_type = request.headers.get("Content-Type", "") @@ -119,9 +119,6 @@ class DraftRagPipelineApi(Resource): else: abort(415) - if not isinstance(current_user, Account): - raise Forbidden() - try: environment_variables_list = args.get("environment_variables") or [] environment_variables = [ @@ -161,10 +158,7 @@ class RagPipelineDraftRunIterationNodeApi(Resource): Run draft workflow iteration node """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: - raise Forbidden() - - if not isinstance(current_user, Account): + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() @@ -198,10 +192,7 @@ class RagPipelineDraftRunLoopNodeApi(Resource): Run draft workflow loop node """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: - raise Forbidden() - - if not isinstance(current_user, Account): + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() @@ -235,10 +226,7 @@ class DraftRagPipelineRunApi(Resource): Run draft workflow """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: - raise Forbidden() - - if not isinstance(current_user, Account): + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() @@ -272,10 +260,7 @@ class PublishedRagPipelineRunApi(Resource): Run published workflow """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: - raise Forbidden() - - if not isinstance(current_user, Account): + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() @@ -285,6 +270,7 @@ class PublishedRagPipelineRunApi(Resource): parser.add_argument("start_node_id", type=str, required=True, location="json") parser.add_argument("is_preview", type=bool, required=True, location="json", default=False) parser.add_argument("response_mode", type=str, required=True, location="json", default="streaming") + parser.add_argument("original_document_id", type=str, required=False, location="json") args = parser.parse_args() streaming = args["response_mode"] == "streaming" @@ -394,10 +380,7 @@ class RagPipelinePublishedDatasourceNodeRunApi(Resource): Run rag pipeline datasource """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: - raise Forbidden() - - if not isinstance(current_user, Account): + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() @@ -439,7 +422,7 @@ class RagPipelineDraftDatasourceNodeRunApi(Resource): Run rag pipeline datasource """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() @@ -482,7 +465,7 @@ class RagPipelineDraftNodeRunApi(Resource): Run draft workflow node """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() @@ -514,7 +497,7 @@ class RagPipelineTaskStopApi(Resource): Stop workflow task """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() AppQueueManager.set_stop_flag(task_id, InvokeFrom.DEBUGGER, current_user.id) @@ -533,7 +516,7 @@ class PublishedRagPipelineApi(Resource): Get published pipeline """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() if not pipeline.is_published: return None @@ -553,7 +536,7 @@ class PublishedRagPipelineApi(Resource): Publish workflow """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() rag_pipeline_service = RagPipelineService() @@ -587,7 +570,7 @@ class DefaultRagPipelineBlockConfigsApi(Resource): Get default block config """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() # Get default block configs @@ -605,10 +588,7 @@ class DefaultRagPipelineBlockConfigApi(Resource): Get default block config """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: - raise Forbidden() - - if not isinstance(current_user, Account): + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() @@ -651,7 +631,7 @@ class PublishedAllRagPipelineApi(Resource): """ Get published workflows """ - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() @@ -700,7 +680,7 @@ class RagPipelineByIdApi(Resource): Update workflow attributes """ # Check permission - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() @@ -756,7 +736,7 @@ class PublishedRagPipelineSecondStepApi(Resource): Get second step parameters of rag pipeline """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() parser.add_argument("node_id", type=str, required=True, location="args") @@ -781,7 +761,7 @@ class PublishedRagPipelineFirstStepApi(Resource): Get first step parameters of rag pipeline """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() parser.add_argument("node_id", type=str, required=True, location="args") @@ -806,7 +786,7 @@ class DraftRagPipelineFirstStepApi(Resource): Get first step parameters of rag pipeline """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() parser.add_argument("node_id", type=str, required=True, location="args") @@ -831,7 +811,7 @@ class DraftRagPipelineSecondStepApi(Resource): Get second step parameters of rag pipeline """ # The role of the current user in the ta table must be admin, owner, or editor - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() parser.add_argument("node_id", type=str, required=True, location="args") @@ -953,7 +933,7 @@ class RagPipelineTransformApi(Resource): if not isinstance(current_user, Account): raise Forbidden() - if not (current_user.is_editor or current_user.is_dataset_operator): + if not (current_user.has_edit_permission or current_user.is_dataset_operator): raise Forbidden() dataset_id = str(dataset_id) @@ -972,7 +952,7 @@ class RagPipelineDatasourceVariableApi(Resource): """ Set datasource variables """ - if not isinstance(current_user, Account) or not current_user.is_editor: + if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() parser = reqparse.RequestParser() diff --git a/api/core/app/apps/pipeline/pipeline_generator.py b/api/core/app/apps/pipeline/pipeline_generator.py index 0e13599c30..c9daead0ba 100644 --- a/api/core/app/apps/pipeline/pipeline_generator.py +++ b/api/core/app/apps/pipeline/pipeline_generator.py @@ -72,7 +72,6 @@ class PipelineGenerator(BaseAppGenerator): call_depth: int, workflow_thread_pool_id: Optional[str], is_retry: bool = False, - document_id: Optional[str] = None, ) -> Mapping[str, Any] | Generator[Mapping | str, None, None] | None: ... @overload @@ -88,7 +87,6 @@ class PipelineGenerator(BaseAppGenerator): call_depth: int, workflow_thread_pool_id: Optional[str], is_retry: bool = False, - document_id: Optional[str] = None, ) -> Mapping[str, Any]: ... @overload @@ -104,7 +102,6 @@ class PipelineGenerator(BaseAppGenerator): call_depth: int, workflow_thread_pool_id: Optional[str], is_retry: bool = False, - document_id: Optional[str] = None, ) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]: ... def generate( @@ -119,7 +116,6 @@ class PipelineGenerator(BaseAppGenerator): call_depth: int = 0, workflow_thread_pool_id: Optional[str] = None, is_retry: bool = False, - documents: list[Document] = [], ) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None], None]: # Add null check for dataset @@ -138,7 +134,8 @@ class PipelineGenerator(BaseAppGenerator): pipeline_config = PipelineConfigManager.get_pipeline_config( pipeline=pipeline, workflow=workflow, start_node_id=start_node_id ) - if invoke_from == InvokeFrom.PUBLISHED and not is_retry: + documents: list[Document] = [] + if invoke_from == InvokeFrom.PUBLISHED and not is_retry and not args.get("original_document_id"): from services.dataset_service import DocumentService for datasource_info in datasource_info_list: position = DocumentService.get_documents_position(dataset.id) @@ -162,10 +159,9 @@ class PipelineGenerator(BaseAppGenerator): rag_pipeline_invoke_entities = [] for i, datasource_info in enumerate(datasource_info_list): workflow_run_id = str(uuid.uuid4()) - document_id = None - if documents: - document_id = documents[i].id + document_id = args.get("original_document_id") or None if invoke_from == InvokeFrom.PUBLISHED and not is_retry: + document_id = document_id or documents[i].id document_pipeline_execution_log = DocumentPipelineExecutionLog( document_id=document_id, datasource_type=datasource_type, @@ -184,6 +180,7 @@ class PipelineGenerator(BaseAppGenerator): datasource_type=datasource_type, datasource_info=datasource_info, dataset_id=dataset.id, + original_document_id=args.get("original_document_id"), start_node_id=start_node_id, batch=batch, document_id=document_id, diff --git a/api/core/app/apps/pipeline/pipeline_runner.py b/api/core/app/apps/pipeline/pipeline_runner.py index 12506049aa..f2f01d1ee7 100644 --- a/api/core/app/apps/pipeline/pipeline_runner.py +++ b/api/core/app/apps/pipeline/pipeline_runner.py @@ -122,6 +122,7 @@ class PipelineRunner(WorkflowBasedAppRunner): workflow_id=app_config.workflow_id, workflow_execution_id=self.application_generate_entity.workflow_execution_id, document_id=self.application_generate_entity.document_id, + original_document_id=self.application_generate_entity.original_document_id, batch=self.application_generate_entity.batch, dataset_id=self.application_generate_entity.dataset_id, datasource_type=self.application_generate_entity.datasource_type, diff --git a/api/core/app/entities/app_invoke_entities.py b/api/core/app/entities/app_invoke_entities.py index 6ed596bfb8..1c055fe8b6 100644 --- a/api/core/app/entities/app_invoke_entities.py +++ b/api/core/app/entities/app_invoke_entities.py @@ -257,6 +257,7 @@ class RagPipelineGenerateEntity(WorkflowAppGenerateEntity): dataset_id: str batch: str document_id: Optional[str] = None + original_document_id: Optional[str] = None start_node_id: Optional[str] = None class SingleIterationRunEntity(BaseModel): diff --git a/api/core/workflow/enums.py b/api/core/workflow/enums.py index c5be9be02a..00a125660a 100644 --- a/api/core/workflow/enums.py +++ b/api/core/workflow/enums.py @@ -24,6 +24,7 @@ class SystemVariableKey(StrEnum): WORKFLOW_EXECUTION_ID = "workflow_run_id" # RAG Pipeline DOCUMENT_ID = "document_id" + ORIGINAL_DOCUMENT_ID = "original_document_id" BATCH = "batch" DATASET_ID = "dataset_id" DATASOURCE_TYPE = "datasource_type" diff --git a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py index 850ea4a9cf..d970d7480c 100644 --- a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py +++ b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py @@ -4,7 +4,7 @@ import time from collections.abc import Mapping from typing import Any, Optional, cast -from sqlalchemy import func +from sqlalchemy import func, select from core.app.entities.app_invoke_entities import InvokeFrom from core.rag.index_processor.index_processor_factory import IndexProcessorFactory @@ -128,6 +128,8 @@ class KnowledgeIndexNode(Node): document_id = variable_pool.get(["sys", SystemVariableKey.DOCUMENT_ID]) if not document_id: raise KnowledgeIndexNodeError("Document ID is required.") + original_document_id = variable_pool.get(["sys", SystemVariableKey.ORIGINAL_DOCUMENT_ID]) + batch = variable_pool.get(["sys", SystemVariableKey.BATCH]) if not batch: raise KnowledgeIndexNodeError("Batch is required.") @@ -137,6 +139,19 @@ class KnowledgeIndexNode(Node): # chunk nodes by chunk size indexing_start_at = time.perf_counter() index_processor = IndexProcessorFactory(dataset.chunk_structure).init_index_processor() + if original_document_id: + segments = db.session.scalars( + select(DocumentSegment).where(DocumentSegment.document_id == document_id) + ).all() + if segments: + index_node_ids = [segment.index_node_id for segment in segments] + + # delete from vector index + index_processor.clean(dataset, index_node_ids, with_keywords=True, delete_child_chunks=True) + + for segment in segments: + db.session.delete(segment) + db.session.commit() index_processor.index(dataset, document, chunks) indexing_end_at = time.perf_counter() document.indexing_latency = indexing_end_at - indexing_start_at diff --git a/api/core/workflow/system_variable.py b/api/core/workflow/system_variable.py index cd3388de7e..6716e745cd 100644 --- a/api/core/workflow/system_variable.py +++ b/api/core/workflow/system_variable.py @@ -44,6 +44,7 @@ class SystemVariable(BaseModel): conversation_id: str | None = None dialogue_count: int | None = None document_id: str | None = None + original_document_id: str | None = None dataset_id: str | None = None batch: str | None = None datasource_type: str | None = None @@ -94,6 +95,8 @@ class SystemVariable(BaseModel): d[SystemVariableKey.DIALOGUE_COUNT] = self.dialogue_count if self.document_id is not None: d[SystemVariableKey.DOCUMENT_ID] = self.document_id + if self.original_document_id is not None: + d[SystemVariableKey.ORIGINAL_DOCUMENT_ID] = self.original_document_id if self.dataset_id is not None: d[SystemVariableKey.DATASET_ID] = self.dataset_id if self.batch is not None: