diff --git a/api/core/app/apps/pipeline/pipeline_generator.py b/api/core/app/apps/pipeline/pipeline_generator.py index ec565fe2e5..e726ad4841 100644 --- a/api/core/app/apps/pipeline/pipeline_generator.py +++ b/api/core/app/apps/pipeline/pipeline_generator.py @@ -32,6 +32,7 @@ from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchem from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from extensions.ext_database import db +from libs.flask_utils import preserve_flask_contexts from models import Account, EndUser, Workflow, WorkflowNodeExecutionTriggeredFrom from models.dataset import Document, DocumentPipelineExecutionLog, Pipeline from models.enums import WorkflowRunTriggeredFrom @@ -209,25 +210,22 @@ class PipelineGenerator(BaseAppGenerator): # run in child thread context = contextvars.copy_context() - @copy_current_request_context - def worker_with_context(): - # Run the worker within the copied context - return context.run( - self._generate, - flask_app=current_app._get_current_object(), # type: ignore - context=context, - pipeline=pipeline, - workflow_id=workflow.id, - user=user, - application_generate_entity=application_generate_entity, - invoke_from=invoke_from, - workflow_execution_repository=workflow_execution_repository, - workflow_node_execution_repository=workflow_node_execution_repository, - streaming=streaming, - workflow_thread_pool_id=workflow_thread_pool_id, - ) - - worker_thread = threading.Thread(target=worker_with_context) + worker_thread = threading.Thread( + target=self._generate, + kwargs={ + "flask_app": current_app._get_current_object(), # type: ignore + "context": context, + "pipeline": pipeline, + "workflow_id": workflow.id, + "user": user, + "application_generate_entity": application_generate_entity, + "invoke_from": invoke_from, + "workflow_execution_repository": workflow_execution_repository, + "workflow_node_execution_repository": workflow_node_execution_repository, + "streaming": streaming, + "workflow_thread_pool_id": workflow_thread_pool_id, + }, + ) worker_thread.start() # return batch, dataset, documents @@ -282,23 +280,7 @@ class PipelineGenerator(BaseAppGenerator): :param streaming: is stream :param workflow_thread_pool_id: workflow thread pool id """ - print("jin ru la 1") - for var, val in context.items(): - var.set(val) - - # FIXME(-LAN-): Save current user before entering new app context - from flask import g - - saved_user = None - if has_request_context() and hasattr(g, "_login_user"): - saved_user = g._login_user - with flask_app.app_context(): - # Restore user in new app context - print("jin ru la 2") - if saved_user is not None: - from flask import g - - g._login_user = saved_user + with preserve_flask_contexts(flask_app, context_vars=context): # init queue manager workflow = db.session.query(Workflow).filter(Workflow.id == workflow_id).first() if not workflow: @@ -311,20 +293,17 @@ class PipelineGenerator(BaseAppGenerator): ) context = contextvars.copy_context() - @copy_current_request_context - def worker_with_context(): - # Run the worker within the copied context - return context.run( - self._generate_worker, - flask_app=current_app._get_current_object(), # type: ignore - context=context, - queue_manager=queue_manager, - application_generate_entity=application_generate_entity, - workflow_thread_pool_id=workflow_thread_pool_id, - ) - # new thread - worker_thread = threading.Thread(target=worker_with_context) + worker_thread = threading.Thread( + target=self._generate_worker, + kwargs={ + "flask_app": current_app._get_current_object(), # type: ignore + "context": context, + "queue_manager": queue_manager, + "application_generate_entity": application_generate_entity, + "workflow_thread_pool_id": workflow_thread_pool_id, + }, + ) worker_thread.start() @@ -521,20 +500,9 @@ class PipelineGenerator(BaseAppGenerator): :param workflow_thread_pool_id: workflow thread pool id :return: """ - print("jin ru la 3") - for var, val in context.items(): - var.set(val) - from flask import g - saved_user = None - if has_request_context() and hasattr(g, "_login_user"): - saved_user = g._login_user - with flask_app.app_context(): + with preserve_flask_contexts(flask_app, context_vars=context): try: - if saved_user is not None: - from flask import g - - g._login_user = saved_user # workflow app runner = PipelineRunner( application_generate_entity=application_generate_entity,