fix re-chunk document

jyong 2025-09-16 16:05:01 +08:00
parent 610f0414db
commit 05aec66424
7 changed files with 50 additions and 52 deletions

View File

@@ -62,7 +62,7 @@ class DraftRagPipelineApi(Resource):
         Get draft rag pipeline's workflow
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         # fetch draft workflow by app_model
@@ -84,7 +84,7 @@ class DraftRagPipelineApi(Resource):
         Sync draft workflow
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         content_type = request.headers.get("Content-Type", "")
@@ -119,9 +119,6 @@ class DraftRagPipelineApi(Resource):
         else:
             abort(415)

-        if not isinstance(current_user, Account):
-            raise Forbidden()
-
         try:
             environment_variables_list = args.get("environment_variables") or []
             environment_variables = [
@@ -161,10 +158,7 @@ class RagPipelineDraftRunIterationNodeApi(Resource):
         Run draft workflow iteration node
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
-            raise Forbidden()
-
-        if not isinstance(current_user, Account):
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         parser = reqparse.RequestParser()
@@ -198,10 +192,7 @@ class RagPipelineDraftRunLoopNodeApi(Resource):
         Run draft workflow loop node
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
-            raise Forbidden()
-
-        if not isinstance(current_user, Account):
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         parser = reqparse.RequestParser()
@@ -235,10 +226,7 @@ class DraftRagPipelineRunApi(Resource):
         Run draft workflow
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
-            raise Forbidden()
-
-        if not isinstance(current_user, Account):
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         parser = reqparse.RequestParser()
@@ -272,10 +260,7 @@ class PublishedRagPipelineRunApi(Resource):
         Run published workflow
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
-            raise Forbidden()
-
-        if not isinstance(current_user, Account):
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         parser = reqparse.RequestParser()
@@ -285,6 +270,7 @@ class PublishedRagPipelineRunApi(Resource):
         parser.add_argument("start_node_id", type=str, required=True, location="json")
         parser.add_argument("is_preview", type=bool, required=True, location="json", default=False)
         parser.add_argument("response_mode", type=str, required=True, location="json", default="streaming")
+        parser.add_argument("original_document_id", type=str, required=False, location="json")
         args = parser.parse_args()

         streaming = args["response_mode"] == "streaming"
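The new original_document_id argument is what allows a published pipeline run to target an existing document instead of creating a new one. As a rough sketch of how a caller might use it (the route and the remaining required fields are not visible in this diff, so the URL and values below are placeholders):

import requests

# Placeholder URL: the actual route bound to PublishedRagPipelineRunApi is not shown in this diff.
PIPELINE_RUN_URL = "https://api.example.com/rag/pipelines/<pipeline_id>/workflows/published/run"

payload = {
    "start_node_id": "start",    # illustrative values only
    "is_preview": False,
    "response_mode": "blocking",
    # New in this commit: re-chunk this existing document rather than
    # creating a fresh Document record for the run.
    "original_document_id": "9be4f96c-0d05-4b39-8a6d-0e0c2c1f0000",
}

response = requests.post(PIPELINE_RUN_URL, json=payload)
print(response.json())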
@@ -394,10 +380,7 @@ class RagPipelinePublishedDatasourceNodeRunApi(Resource):
         Run rag pipeline datasource
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
-            raise Forbidden()
-
-        if not isinstance(current_user, Account):
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         parser = reqparse.RequestParser()
@@ -439,7 +422,7 @@ class RagPipelineDraftDatasourceNodeRunApi(Resource):
         Run rag pipeline datasource
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         parser = reqparse.RequestParser()
@@ -482,7 +465,7 @@ class RagPipelineDraftNodeRunApi(Resource):
         Run draft workflow node
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         parser = reqparse.RequestParser()
@@ -514,7 +497,7 @@ class RagPipelineTaskStopApi(Resource):
         Stop workflow task
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         AppQueueManager.set_stop_flag(task_id, InvokeFrom.DEBUGGER, current_user.id)
@@ -533,7 +516,7 @@ class PublishedRagPipelineApi(Resource):
         Get published pipeline
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()
         if not pipeline.is_published:
             return None
@@ -553,7 +536,7 @@ class PublishedRagPipelineApi(Resource):
         Publish workflow
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         rag_pipeline_service = RagPipelineService()
@@ -587,7 +570,7 @@ class DefaultRagPipelineBlockConfigsApi(Resource):
         Get default block config
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         # Get default block configs
@@ -605,10 +588,7 @@ class DefaultRagPipelineBlockConfigApi(Resource):
         Get default block config
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
-            raise Forbidden()
-
-        if not isinstance(current_user, Account):
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         parser = reqparse.RequestParser()
@@ -651,7 +631,7 @@ class PublishedAllRagPipelineApi(Resource):
         """
         Get published workflows
         """
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         parser = reqparse.RequestParser()
@@ -700,7 +680,7 @@ class RagPipelineByIdApi(Resource):
         Update workflow attributes
         """
         # Check permission
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         parser = reqparse.RequestParser()
@@ -756,7 +736,7 @@ class PublishedRagPipelineSecondStepApi(Resource):
         Get second step parameters of rag pipeline
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()
         parser = reqparse.RequestParser()
         parser.add_argument("node_id", type=str, required=True, location="args")
@@ -781,7 +761,7 @@ class PublishedRagPipelineFirstStepApi(Resource):
         Get first step parameters of rag pipeline
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()
         parser = reqparse.RequestParser()
         parser.add_argument("node_id", type=str, required=True, location="args")
@@ -806,7 +786,7 @@ class DraftRagPipelineFirstStepApi(Resource):
         Get first step parameters of rag pipeline
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()
         parser = reqparse.RequestParser()
         parser.add_argument("node_id", type=str, required=True, location="args")
@@ -831,7 +811,7 @@ class DraftRagPipelineSecondStepApi(Resource):
         Get second step parameters of rag pipeline
         """
         # The role of the current user in the ta table must be admin, owner, or editor
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()
         parser = reqparse.RequestParser()
         parser.add_argument("node_id", type=str, required=True, location="args")
@@ -953,7 +933,7 @@ class RagPipelineTransformApi(Resource):
         if not isinstance(current_user, Account):
             raise Forbidden()
-        if not (current_user.is_editor or current_user.is_dataset_operator):
+        if not (current_user.has_edit_permission or current_user.is_dataset_operator):
             raise Forbidden()

         dataset_id = str(dataset_id)
@@ -972,7 +952,7 @@ class RagPipelineDatasourceVariableApi(Resource):
         """
         Set datasource variables
         """
-        if not isinstance(current_user, Account) or not current_user.is_editor:
+        if not isinstance(current_user, Account) or not current_user.has_edit_permission:
             raise Forbidden()

         parser = reqparse.RequestParser()
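Every handler in this controller repeats the same guard; the commit only changes the attribute it tests (is_editor becomes has_edit_permission) and drops the now-redundant standalone isinstance checks. Purely as an illustration of the pattern, not part of the commit, the guard could live in one helper; the import paths here are assumptions:

from flask_login import current_user  # assumed import path
from werkzeug.exceptions import Forbidden

from models.account import Account  # assumed import path


def ensure_pipeline_edit_permission() -> None:
    # Reject callers that are not logged-in accounts with edit rights.
    # has_edit_permission is assumed to cover the admin/owner/editor roles
    # that the old is_editor check enforced.
    if not isinstance(current_user, Account) or not current_user.has_edit_permission:
        raise Forbidden()

Each handler would then start with ensure_pipeline_edit_permission() instead of restating the condition.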

View File

@@ -72,7 +72,6 @@ class PipelineGenerator(BaseAppGenerator):
         call_depth: int,
         workflow_thread_pool_id: Optional[str],
         is_retry: bool = False,
-        document_id: Optional[str] = None,
     ) -> Mapping[str, Any] | Generator[Mapping | str, None, None] | None: ...

     @overload
@@ -88,7 +87,6 @@ class PipelineGenerator(BaseAppGenerator):
         call_depth: int,
         workflow_thread_pool_id: Optional[str],
         is_retry: bool = False,
-        document_id: Optional[str] = None,
     ) -> Mapping[str, Any]: ...

     @overload
@@ -104,7 +102,6 @@ class PipelineGenerator(BaseAppGenerator):
         call_depth: int,
         workflow_thread_pool_id: Optional[str],
         is_retry: bool = False,
-        document_id: Optional[str] = None,
     ) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]: ...

     def generate(
@@ -119,7 +116,6 @@ class PipelineGenerator(BaseAppGenerator):
         call_depth: int = 0,
         workflow_thread_pool_id: Optional[str] = None,
         is_retry: bool = False,
-        documents: list[Document] = [],
     ) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None], None]:
         # Add null check for dataset
@@ -138,7 +134,8 @@ class PipelineGenerator(BaseAppGenerator):
         pipeline_config = PipelineConfigManager.get_pipeline_config(
             pipeline=pipeline, workflow=workflow, start_node_id=start_node_id
         )
-        if invoke_from == InvokeFrom.PUBLISHED and not is_retry:
+        documents: list[Document] = []
+        if invoke_from == InvokeFrom.PUBLISHED and not is_retry and not args.get("original_document_id"):
             from services.dataset_service import DocumentService
             for datasource_info in datasource_info_list:
                 position = DocumentService.get_documents_position(dataset.id)
@@ -162,10 +159,9 @@
         rag_pipeline_invoke_entities = []
         for i, datasource_info in enumerate(datasource_info_list):
             workflow_run_id = str(uuid.uuid4())
-            document_id = None
-            if documents:
-                document_id = documents[i].id
+            document_id = args.get("original_document_id") or None
+            if invoke_from == InvokeFrom.PUBLISHED and not is_retry:
+                document_id = document_id or documents[i].id
             document_pipeline_execution_log = DocumentPipelineExecutionLog(
                 document_id=document_id,
                 datasource_type=datasource_type,
@@ -184,6 +180,7 @@
                 datasource_type=datasource_type,
                 datasource_info=datasource_info,
                 dataset_id=dataset.id,
+                original_document_id=args.get("original_document_id"),
                 start_node_id=start_node_id,
                 batch=batch,
                 document_id=document_id,
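Net effect of the generator changes: when the caller supplies original_document_id, document creation is skipped entirely and the execution log points at the existing document; otherwise the published, non-retry path keeps pairing each datasource entry with the document created for it. A minimal restatement of that branch as a standalone function (the function and its parameters are hypothetical, introduced only for illustration):

from typing import Optional


def resolve_document_id(
    original_document_id: Optional[str],
    documents: list,  # Document rows created earlier on the published path
    i: int,           # index of the current datasource_info entry
    is_published_run: bool,
    is_retry: bool,
) -> Optional[str]:
    # A caller-supplied original_document_id always wins (the re-chunk case);
    # in that case `documents` is empty because creation was skipped above.
    if original_document_id:
        return original_document_id
    if is_published_run and not is_retry:
        return documents[i].id
    return None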

View File

@@ -122,6 +122,7 @@ class PipelineRunner(WorkflowBasedAppRunner):
             workflow_id=app_config.workflow_id,
             workflow_execution_id=self.application_generate_entity.workflow_execution_id,
             document_id=self.application_generate_entity.document_id,
+            original_document_id=self.application_generate_entity.original_document_id,
             batch=self.application_generate_entity.batch,
             dataset_id=self.application_generate_entity.dataset_id,
             datasource_type=self.application_generate_entity.datasource_type,

View File

@@ -257,6 +257,7 @@ class RagPipelineGenerateEntity(WorkflowAppGenerateEntity):
     dataset_id: str
     batch: str
     document_id: Optional[str] = None
+    original_document_id: Optional[str] = None
     start_node_id: Optional[str] = None

     class SingleIterationRunEntity(BaseModel):

View File

@@ -24,6 +24,7 @@ class SystemVariableKey(StrEnum):
     WORKFLOW_EXECUTION_ID = "workflow_run_id"
     # RAG Pipeline
     DOCUMENT_ID = "document_id"
+    ORIGINAL_DOCUMENT_ID = "original_document_id"
     BATCH = "batch"
     DATASET_ID = "dataset_id"
     DATASOURCE_TYPE = "datasource_type"

View File

@@ -4,7 +4,7 @@ import time
 from collections.abc import Mapping
 from typing import Any, Optional, cast

-from sqlalchemy import func
+from sqlalchemy import func, select

 from core.app.entities.app_invoke_entities import InvokeFrom
 from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
@@ -128,6 +128,8 @@ class KnowledgeIndexNode(Node):
         document_id = variable_pool.get(["sys", SystemVariableKey.DOCUMENT_ID])
         if not document_id:
             raise KnowledgeIndexNodeError("Document ID is required.")
+        original_document_id = variable_pool.get(["sys", SystemVariableKey.ORIGINAL_DOCUMENT_ID])
+
         batch = variable_pool.get(["sys", SystemVariableKey.BATCH])
         if not batch:
             raise KnowledgeIndexNodeError("Batch is required.")
@@ -137,6 +139,19 @@ class KnowledgeIndexNode(Node):
         # chunk nodes by chunk size
         indexing_start_at = time.perf_counter()
         index_processor = IndexProcessorFactory(dataset.chunk_structure).init_index_processor()
+
+        if original_document_id:
+            segments = db.session.scalars(
+                select(DocumentSegment).where(DocumentSegment.document_id == document_id)
+            ).all()
+            if segments:
+                index_node_ids = [segment.index_node_id for segment in segments]
+                # delete from vector index
+                index_processor.clean(dataset, index_node_ids, with_keywords=True, delete_child_chunks=True)
+                for segment in segments:
+                    db.session.delete(segment)
+                db.session.commit()
+
         index_processor.index(dataset, document, chunks)
         indexing_end_at = time.perf_counter()
         document.indexing_latency = indexing_end_at - indexing_start_at
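This hunk is the heart of the fix: re-chunking previously left the document's old segments in both the database and the vector index, so stale chunks stayed searchable alongside the new ones. A sketch of the same cleanup as a self-contained helper, with import paths assumed from the surrounding file:

from sqlalchemy import select

from extensions.ext_database import db      # assumed import path
from models.dataset import DocumentSegment  # assumed import path


def purge_existing_segments(index_processor, dataset, document_id: str) -> None:
    # Delete a document's stored segments before re-indexing its new chunks.
    segments = db.session.scalars(
        select(DocumentSegment).where(DocumentSegment.document_id == document_id)
    ).all()
    if not segments:
        return
    # Remove the old chunks from the vector/keyword indexes first...
    index_node_ids = [segment.index_node_id for segment in segments]
    index_processor.clean(dataset, index_node_ids, with_keywords=True, delete_child_chunks=True)
    # ...then drop the relational rows so only the new chunks remain.
    for segment in segments:
        db.session.delete(segment)
    db.session.commit()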

View File

@@ -44,6 +44,7 @@ class SystemVariable(BaseModel):
     conversation_id: str | None = None
     dialogue_count: int | None = None
     document_id: str | None = None
+    original_document_id: str | None = None
     dataset_id: str | None = None
     batch: str | None = None
     datasource_type: str | None = None
@@ -94,6 +95,8 @@ class SystemVariable(BaseModel):
             d[SystemVariableKey.DIALOGUE_COUNT] = self.dialogue_count
         if self.document_id is not None:
             d[SystemVariableKey.DOCUMENT_ID] = self.document_id
+        if self.original_document_id is not None:
+            d[SystemVariableKey.ORIGINAL_DOCUMENT_ID] = self.original_document_id
         if self.dataset_id is not None:
             d[SystemVariableKey.DATASET_ID] = self.dataset_id
         if self.batch is not None:
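original_document_id follows the same optional-field convention as the other pipeline variables: it only enters the serialized mapping when set, which is how KnowledgeIndexNode distinguishes a re-chunk run from a first ingestion. A small sketch of that round trip, assuming the serialization method shown above is named to_dict and that unset fields keep their None defaults:

# Re-chunk run: the key is present in the mapping.
sys_vars = SystemVariable(
    document_id="new-document-uuid",
    original_document_id="existing-document-uuid",  # set only when re-chunking
)
assert sys_vars.to_dict()[SystemVariableKey.ORIGINAL_DOCUMENT_ID] == "existing-document-uuid"

# First ingestion: the key is absent, so a later
# variable_pool.get(["sys", SystemVariableKey.ORIGINAL_DOCUMENT_ID]) is falsy.
assert SystemVariableKey.ORIGINAL_DOCUMENT_ID not in SystemVariable(document_id="x").to_dict()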