From 8b97551f1a089c974128086b2ee76f4892268774 Mon Sep 17 00:00:00 2001 From: jyong <718720800@qq.com> Date: Wed, 9 Jul 2025 18:50:13 +0800 Subject: [PATCH] r2 --- api/core/app/apps/pipeline/pipeline_runner.py | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/api/core/app/apps/pipeline/pipeline_runner.py b/api/core/app/apps/pipeline/pipeline_runner.py index 52afb78ee5..5a2c57291d 100644 --- a/api/core/app/apps/pipeline/pipeline_runner.py +++ b/api/core/app/apps/pipeline/pipeline_runner.py @@ -14,10 +14,11 @@ from core.variables.variables import RAGPipelineVariable, RAGPipelineVariableInp from core.workflow.callbacks import WorkflowCallback, WorkflowLoggingCallback from core.workflow.entities.variable_pool import VariablePool from core.workflow.enums import SystemVariableKey +from core.workflow.graph_engine.entities.event import GraphEngineEvent, GraphRunFailedEvent from core.workflow.graph_engine.entities.graph import Graph from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_database import db -from models.dataset import Pipeline +from models.dataset import Document, Pipeline from models.enums import UserFrom from models.model import EndUser from models.workflow import Workflow, WorkflowType @@ -162,6 +163,9 @@ class PipelineRunner(WorkflowBasedAppRunner): generator = workflow_entry.run(callbacks=workflow_callbacks) for event in generator: + self._update_document_status( + event, self.application_generate_entity.document_id, self.application_generate_entity.dataset_id + ) self._handle_event(workflow_entry, event) def get_workflow(self, pipeline: Pipeline, workflow_id: str) -> Optional[Workflow]: @@ -219,3 +223,20 @@ class PipelineRunner(WorkflowBasedAppRunner): raise ValueError("graph not found in workflow") return graph + + def _update_document_status(self, event: GraphEngineEvent, document_id: str | None, dataset_id: str | None) -> None: + """ + Update document status + """ + if isinstance(event, GraphRunFailedEvent): + if document_id and dataset_id: + document = ( + db.session.query(Document) + .filter(Document.id == document_id, Document.dataset_id == dataset_id) + .first() + ) + if document: + document.indexing_status = "error" + document.error = event.error or "Unknown error" + db.session.add(document) + db.session.commit()