This commit is contained in:
jyong 2025-07-09 18:50:13 +08:00
parent 966e6e03fc
commit 8b97551f1a
1 changed files with 22 additions and 1 deletions

View File

@ -14,10 +14,11 @@ from core.variables.variables import RAGPipelineVariable, RAGPipelineVariableInp
from core.workflow.callbacks import WorkflowCallback, WorkflowLoggingCallback
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.event import GraphEngineEvent, GraphRunFailedEvent
from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db
from models.dataset import Pipeline
from models.dataset import Document, Pipeline
from models.enums import UserFrom
from models.model import EndUser
from models.workflow import Workflow, WorkflowType
@ -162,6 +163,9 @@ class PipelineRunner(WorkflowBasedAppRunner):
generator = workflow_entry.run(callbacks=workflow_callbacks)
for event in generator:
self._update_document_status(
event, self.application_generate_entity.document_id, self.application_generate_entity.dataset_id
)
self._handle_event(workflow_entry, event)
def get_workflow(self, pipeline: Pipeline, workflow_id: str) -> Optional[Workflow]:
@ -219,3 +223,20 @@ class PipelineRunner(WorkflowBasedAppRunner):
raise ValueError("graph not found in workflow")
return graph
def _update_document_status(self, event: GraphEngineEvent, document_id: str | None, dataset_id: str | None) -> None:
"""
Update document status
"""
if isinstance(event, GraphRunFailedEvent):
if document_id and dataset_id:
document = (
db.session.query(Document)
.filter(Document.id == document_id, Document.dataset_id == dataset_id)
.first()
)
if document:
document.indexing_status = "error"
document.error = event.error or "Unknown error"
db.session.add(document)
db.session.commit()