From cb5cfb2daefb0dddced71eefe17d40f1108704dd Mon Sep 17 00:00:00 2001 From: jyong <718720800@qq.com> Date: Fri, 30 May 2025 00:03:43 +0800 Subject: [PATCH] r2 --- .../app/apps/pipeline/pipeline_generator.py | 96 +++++++++++++------ api/core/entities/knowledge_entities.py | 6 +- 2 files changed, 70 insertions(+), 32 deletions(-) diff --git a/api/core/app/apps/pipeline/pipeline_generator.py b/api/core/app/apps/pipeline/pipeline_generator.py index b7e20cfd10..19ded1696a 100644 --- a/api/core/app/apps/pipeline/pipeline_generator.py +++ b/api/core/app/apps/pipeline/pipeline_generator.py @@ -9,7 +9,7 @@ import uuid from collections.abc import Generator, Mapping from typing import Any, Literal, Optional, Union, overload -from flask import Flask, current_app +from flask import Flask, copy_current_request_context, current_app, has_request_context from pydantic import ValidationError from sqlalchemy.orm import sessionmaker @@ -185,8 +185,9 @@ class PipelineGenerator(BaseAppGenerator): if invoke_from == InvokeFrom.DEBUGGER: return self._generate( flask_app=current_app._get_current_object(),# type: ignore + context=contextvars.copy_context(), pipeline=pipeline, - workflow=workflow, + workflow_id=workflow.id, user=user, application_generate_entity=application_generate_entity, invoke_from=invoke_from, @@ -197,22 +198,28 @@ class PipelineGenerator(BaseAppGenerator): ) else: # run in child thread - thread = threading.Thread( - target=self._generate, - kwargs={ - "flask_app": current_app._get_current_object(), # type: ignore - "pipeline": pipeline, - "workflow": workflow, - "user": user, - "application_generate_entity": application_generate_entity, - "invoke_from": invoke_from, - "workflow_execution_repository": workflow_execution_repository, - "workflow_node_execution_repository": workflow_node_execution_repository, - "streaming": streaming, - "workflow_thread_pool_id": workflow_thread_pool_id, - }, - ) - thread.start() + context = contextvars.copy_context() + @copy_current_request_context + def worker_with_context(): + # Run the worker within the copied context + return context.run( + self._generate, + flask_app=current_app._get_current_object(), # type: ignore + context=context, + pipeline=pipeline, + workflow_id=workflow.id, + user=user, + application_generate_entity=application_generate_entity, + invoke_from=invoke_from, + workflow_execution_repository=workflow_execution_repository, + workflow_node_execution_repository=workflow_node_execution_repository, + streaming=streaming, + workflow_thread_pool_id=workflow_thread_pool_id, + ) + + worker_thread = threading.Thread(target=worker_with_context) + + worker_thread.start() # return batch, dataset, documents return { "batch": batch, @@ -225,7 +232,7 @@ class PipelineGenerator(BaseAppGenerator): "documents": [PipelineDocument( id=document.id, position=document.position, - data_source_info=document.data_source_info, + data_source_info=json.loads(document.data_source_info) if document.data_source_info else None, name=document.name, indexing_status=document.indexing_status, error=document.error, @@ -237,8 +244,9 @@ class PipelineGenerator(BaseAppGenerator): self, *, flask_app: Flask, + context: contextvars.Context, pipeline: Pipeline, - workflow: Workflow, + workflow_id: str, user: Union[Account, EndUser], application_generate_entity: RagPipelineGenerateEntity, invoke_from: InvokeFrom, @@ -260,26 +268,47 @@ class PipelineGenerator(BaseAppGenerator): :param streaming: is stream :param workflow_thread_pool_id: workflow thread pool id """ - print(user.id) + for var, val in context.items(): + var.set(val) + + # FIXME(-LAN-): Save current user before entering new app context + from flask import g + + saved_user = None + if has_request_context() and hasattr(g, "_login_user"): + saved_user = g._login_user with flask_app.app_context(): + # Restore user in new app context + if saved_user is not None: + from flask import g + + g._login_user = saved_user # init queue manager + workflow = db.session.query(Workflow).filter(Workflow.id == workflow_id).first() + if not workflow: + raise ValueError(f"Workflow not found: {workflow_id}") queue_manager = PipelineQueueManager( task_id=application_generate_entity.task_id, user_id=application_generate_entity.user_id, invoke_from=application_generate_entity.invoke_from, app_mode=AppMode.RAG_PIPELINE, ) + context = contextvars.copy_context() + @copy_current_request_context + def worker_with_context(): + # Run the worker within the copied context + return context.run( + self._generate_worker, + flask_app=current_app._get_current_object(), # type: ignore + context=context, + queue_manager=queue_manager, + application_generate_entity=application_generate_entity, + workflow_thread_pool_id=workflow_thread_pool_id, + ) # new thread worker_thread = threading.Thread( - target=self._generate_worker, - kwargs={ - "flask_app": current_app._get_current_object(), # type: ignore - "application_generate_entity": application_generate_entity, - "queue_manager": queue_manager, - "context": contextvars.copy_context(), - "workflow_thread_pool_id": workflow_thread_pool_id, - }, + target=worker_with_context ) worker_thread.start() @@ -479,8 +508,17 @@ class PipelineGenerator(BaseAppGenerator): """ for var, val in context.items(): var.set(val) + from flask import g + + saved_user = None + if has_request_context() and hasattr(g, "_login_user"): + saved_user = g._login_user with flask_app.app_context(): try: + if saved_user is not None: + from flask import g + + g._login_user = saved_user # workflow app runner = PipelineRunner( application_generate_entity=application_generate_entity, diff --git a/api/core/entities/knowledge_entities.py b/api/core/entities/knowledge_entities.py index f876c06b06..3beea56e15 100644 --- a/api/core/entities/knowledge_entities.py +++ b/api/core/entities/knowledge_entities.py @@ -25,17 +25,17 @@ class PipelineDataset(BaseModel): description: str chunk_structure: str + class PipelineDocument(BaseModel): id: str position: int - data_source_info: dict + data_source_info: Optional[dict] = None name: str indexing_status: str - error: str + error: Optional[str] = None enabled: bool - class PipelineGenerateResponse(BaseModel): batch: str dataset: PipelineDataset