diff --git a/api/core/app/apps/pipeline/pipeline_runner.py b/api/core/app/apps/pipeline/pipeline_runner.py index 52afb78ee5..5a2c57291d 100644 --- a/api/core/app/apps/pipeline/pipeline_runner.py +++ b/api/core/app/apps/pipeline/pipeline_runner.py @@ -14,10 +14,11 @@ from core.variables.variables import RAGPipelineVariable, RAGPipelineVariableInp from core.workflow.callbacks import WorkflowCallback, WorkflowLoggingCallback from core.workflow.entities.variable_pool import VariablePool from core.workflow.enums import SystemVariableKey +from core.workflow.graph_engine.entities.event import GraphEngineEvent, GraphRunFailedEvent from core.workflow.graph_engine.entities.graph import Graph from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_database import db -from models.dataset import Pipeline +from models.dataset import Document, Pipeline from models.enums import UserFrom from models.model import EndUser from models.workflow import Workflow, WorkflowType @@ -162,6 +163,9 @@ class PipelineRunner(WorkflowBasedAppRunner): generator = workflow_entry.run(callbacks=workflow_callbacks) for event in generator: + self._update_document_status( + event, self.application_generate_entity.document_id, self.application_generate_entity.dataset_id + ) self._handle_event(workflow_entry, event) def get_workflow(self, pipeline: Pipeline, workflow_id: str) -> Optional[Workflow]: @@ -219,3 +223,20 @@ class PipelineRunner(WorkflowBasedAppRunner): raise ValueError("graph not found in workflow") return graph + + def _update_document_status(self, event: GraphEngineEvent, document_id: str | None, dataset_id: str | None) -> None: + """ + Update document status + """ + if isinstance(event, GraphRunFailedEvent): + if document_id and dataset_id: + document = ( + db.session.query(Document) + .filter(Document.id == document_id, Document.dataset_id == dataset_id) + .first() + ) + if document: + document.indexing_status = "error" + document.error = event.error or "Unknown error" + db.session.add(document) + db.session.commit()