From c08a60021a99f330521a136e3c9954ac8234e197 Mon Sep 17 00:00:00 2001 From: jyong <718720800@qq.com> Date: Sun, 14 Sep 2025 22:06:32 +0800 Subject: [PATCH] add dataset service api enable --- .../priority_rag_pipeline_run_task.py | 123 +++++++++--------- .../rag_pipeline/rag_pipeline_run_task.py | 112 ++++++++-------- 2 files changed, 116 insertions(+), 119 deletions(-) diff --git a/api/tasks/rag_pipeline/priority_rag_pipeline_run_task.py b/api/tasks/rag_pipeline/priority_rag_pipeline_run_task.py index 3d7f713258..5ccc51a66a 100644 --- a/api/tasks/rag_pipeline/priority_rag_pipeline_run_task.py +++ b/api/tasks/rag_pipeline/priority_rag_pipeline_run_task.py @@ -1,7 +1,6 @@ import contextvars import json import logging -import threading import time import uuid from collections.abc import Mapping @@ -85,67 +84,69 @@ def priority_rag_pipeline_run_task( db.session.close() def run_single_rag_pipeline_task(rag_pipeline_invoke_entity: Mapping[str, Any], flask_app): + """Run a single RAG pipeline task within Flask app context.""" # Create Flask application context for this thread with flask_app.app_context(): - rag_pipeline_invoke_entity_model = RagPipelineInvokeEntity(**rag_pipeline_invoke_entity) - user_id = rag_pipeline_invoke_entity_model.user_id - tenant_id = rag_pipeline_invoke_entity_model.tenant_id - pipeline_id = rag_pipeline_invoke_entity_model.pipeline_id - workflow_id = rag_pipeline_invoke_entity_model.workflow_id - streaming = rag_pipeline_invoke_entity_model.streaming - workflow_execution_id = rag_pipeline_invoke_entity_model.workflow_execution_id - workflow_thread_pool_id = rag_pipeline_invoke_entity_model.workflow_thread_pool_id - application_generate_entity = rag_pipeline_invoke_entity_model.application_generate_entity - with Session(db.engine) as session: - account = session.query(Account).filter(Account.id == user_id).first() - if not account: - raise ValueError(f"Account {user_id} not found") - tenant = session.query(Tenant).filter(Tenant.id == tenant_id).first() - if not tenant: - raise ValueError(f"Tenant {tenant_id} not found") - account.current_tenant = tenant - - pipeline = session.query(Pipeline).filter(Pipeline.id == pipeline_id).first() - if not pipeline: - raise ValueError(f"Pipeline {pipeline_id} not found") - - workflow = session.query(Workflow).filter(Workflow.id == pipeline.workflow_id).first() - - if not workflow: - raise ValueError(f"Workflow {pipeline.workflow_id} not found") - - if workflow_execution_id is None: - workflow_execution_id = str(uuid.uuid4()) - - # Create application generate entity from dict - entity = RagPipelineGenerateEntity(**application_generate_entity) - - # Create workflow node execution repository - session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) - workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( - session_factory=session_factory, - user=account, - app_id=entity.app_config.app_id, - triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN, - ) - - workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( - session_factory=session_factory, - user=account, - app_id=entity.app_config.app_id, - triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN, - ) + try: + rag_pipeline_invoke_entity_model = RagPipelineInvokeEntity(**rag_pipeline_invoke_entity) + user_id = rag_pipeline_invoke_entity_model.user_id + tenant_id = rag_pipeline_invoke_entity_model.tenant_id + pipeline_id = rag_pipeline_invoke_entity_model.pipeline_id + workflow_id = rag_pipeline_invoke_entity_model.workflow_id + streaming = rag_pipeline_invoke_entity_model.streaming + workflow_execution_id = rag_pipeline_invoke_entity_model.workflow_execution_id + workflow_thread_pool_id = rag_pipeline_invoke_entity_model.workflow_thread_pool_id + application_generate_entity = rag_pipeline_invoke_entity_model.application_generate_entity - # Set the user directly in g for preserve_flask_contexts - g._login_user = account + with Session(db.engine) as session: + # Load required entities + account = session.query(Account).filter(Account.id == user_id).first() + if not account: + raise ValueError(f"Account {user_id} not found") + + tenant = session.query(Tenant).filter(Tenant.id == tenant_id).first() + if not tenant: + raise ValueError(f"Tenant {tenant_id} not found") + account.current_tenant = tenant - # Copy context for thread (after setting user) - context = contextvars.copy_context() + pipeline = session.query(Pipeline).filter(Pipeline.id == pipeline_id).first() + if not pipeline: + raise ValueError(f"Pipeline {pipeline_id} not found") - # Create a wrapper function that passes user context - def _run_with_user_context(): - # Don't create a new app context here - let _generate handle it - # Just ensure the user is available in contextvars + workflow = session.query(Workflow).filter(Workflow.id == pipeline.workflow_id).first() + if not workflow: + raise ValueError(f"Workflow {pipeline.workflow_id} not found") + + if workflow_execution_id is None: + workflow_execution_id = str(uuid.uuid4()) + + # Create application generate entity from dict + entity = RagPipelineGenerateEntity(**application_generate_entity) + + # Create workflow repositories + session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) + workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( + session_factory=session_factory, + user=account, + app_id=entity.app_config.app_id, + triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN, + ) + + workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( + session_factory=session_factory, + user=account, + app_id=entity.app_config.app_id, + triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN, + ) + + # Set the user directly in g for preserve_flask_contexts + g._login_user = account + + # Copy context for passing to pipeline generator + context = contextvars.copy_context() + + # Direct execution without creating another thread + # Since we're already in a thread pool, no need for nested threading from core.app.apps.pipeline.pipeline_generator import PipelineGenerator pipeline_generator = PipelineGenerator() @@ -162,8 +163,6 @@ def run_single_rag_pipeline_task(rag_pipeline_invoke_entity: Mapping[str, Any], streaming=streaming, workflow_thread_pool_id=workflow_thread_pool_id, ) - - # Create and start worker thread - worker_thread = threading.Thread(target=_run_with_user_context) - worker_thread.start() - worker_thread.join() # Wait for worker thread to complete + except Exception: + logging.exception("Error in priority pipeline task") + raise diff --git a/api/tasks/rag_pipeline/rag_pipeline_run_task.py b/api/tasks/rag_pipeline/rag_pipeline_run_task.py index 1af1c9a675..5d64177e7e 100644 --- a/api/tasks/rag_pipeline/rag_pipeline_run_task.py +++ b/api/tasks/rag_pipeline/rag_pipeline_run_task.py @@ -1,7 +1,6 @@ import contextvars import json import logging -import threading import time import uuid from collections.abc import Mapping @@ -54,7 +53,6 @@ def rag_pipeline_run_task( try: start_at = time.perf_counter() - print("asdadsdadaddadadadadadasdsa") rag_pipeline_invoke_entities_content = FileService(db.engine).get_file_content( rag_pipeline_invoke_entities_file_id) rag_pipeline_invoke_entities = json.loads(rag_pipeline_invoke_entities_content) @@ -109,67 +107,69 @@ def rag_pipeline_run_task( def run_single_rag_pipeline_task(rag_pipeline_invoke_entity: Mapping[str, Any], flask_app): + """Run a single RAG pipeline task within Flask app context.""" # Create Flask application context for this thread with flask_app.app_context(): - rag_pipeline_invoke_entity_model = RagPipelineInvokeEntity(**rag_pipeline_invoke_entity) - user_id = rag_pipeline_invoke_entity_model.user_id - tenant_id = rag_pipeline_invoke_entity_model.tenant_id - pipeline_id = rag_pipeline_invoke_entity_model.pipeline_id - workflow_id = rag_pipeline_invoke_entity_model.workflow_id - streaming = rag_pipeline_invoke_entity_model.streaming - workflow_execution_id = rag_pipeline_invoke_entity_model.workflow_execution_id - workflow_thread_pool_id = rag_pipeline_invoke_entity_model.workflow_thread_pool_id - application_generate_entity = rag_pipeline_invoke_entity_model.application_generate_entity - with Session(db.engine) as session: - account = session.query(Account).filter(Account.id == user_id).first() - if not account: - raise ValueError(f"Account {user_id} not found") - tenant = session.query(Tenant).filter(Tenant.id == tenant_id).first() - if not tenant: - raise ValueError(f"Tenant {tenant_id} not found") - account.current_tenant = tenant + try: + rag_pipeline_invoke_entity_model = RagPipelineInvokeEntity(**rag_pipeline_invoke_entity) + user_id = rag_pipeline_invoke_entity_model.user_id + tenant_id = rag_pipeline_invoke_entity_model.tenant_id + pipeline_id = rag_pipeline_invoke_entity_model.pipeline_id + workflow_id = rag_pipeline_invoke_entity_model.workflow_id + streaming = rag_pipeline_invoke_entity_model.streaming + workflow_execution_id = rag_pipeline_invoke_entity_model.workflow_execution_id + workflow_thread_pool_id = rag_pipeline_invoke_entity_model.workflow_thread_pool_id + application_generate_entity = rag_pipeline_invoke_entity_model.application_generate_entity + + with Session(db.engine) as session: + # Load required entities + account = session.query(Account).filter(Account.id == user_id).first() + if not account: + raise ValueError(f"Account {user_id} not found") + + tenant = session.query(Tenant).filter(Tenant.id == tenant_id).first() + if not tenant: + raise ValueError(f"Tenant {tenant_id} not found") + account.current_tenant = tenant - pipeline = session.query(Pipeline).filter(Pipeline.id == pipeline_id).first() - if not pipeline: - raise ValueError(f"Pipeline {pipeline_id} not found") + pipeline = session.query(Pipeline).filter(Pipeline.id == pipeline_id).first() + if not pipeline: + raise ValueError(f"Pipeline {pipeline_id} not found") - workflow = session.query(Workflow).filter(Workflow.id == pipeline.workflow_id).first() + workflow = session.query(Workflow).filter(Workflow.id == pipeline.workflow_id).first() + if not workflow: + raise ValueError(f"Workflow {pipeline.workflow_id} not found") - if not workflow: - raise ValueError(f"Workflow {pipeline.workflow_id} not found") + if workflow_execution_id is None: + workflow_execution_id = str(uuid.uuid4()) - if workflow_execution_id is None: - workflow_execution_id = str(uuid.uuid4()) + # Create application generate entity from dict + entity = RagPipelineGenerateEntity(**application_generate_entity) - # Create application generate entity from dict - entity = RagPipelineGenerateEntity(**application_generate_entity) + # Create workflow repositories + session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) + workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( + session_factory=session_factory, + user=account, + app_id=entity.app_config.app_id, + triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN, + ) - # Create workflow node execution repository - session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) - workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository( - session_factory=session_factory, - user=account, - app_id=entity.app_config.app_id, - triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN, - ) + workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( + session_factory=session_factory, + user=account, + app_id=entity.app_config.app_id, + triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN, + ) - workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( - session_factory=session_factory, - user=account, - app_id=entity.app_config.app_id, - triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN, - ) + # Set the user directly in g for preserve_flask_contexts + g._login_user = account - # Set the user directly in g for preserve_flask_contexts - g._login_user = account + # Copy context for passing to pipeline generator + context = contextvars.copy_context() - # Copy context for thread (after setting user) - context = contextvars.copy_context() - - # Create a wrapper function that passes user context - def _run_with_user_context(): - # Don't create a new app context here - let _generate handle it - # Just ensure the user is available in contextvars + # Direct execution without creating another thread + # Since we're already in a thread pool, no need for nested threading from core.app.apps.pipeline.pipeline_generator import PipelineGenerator pipeline_generator = PipelineGenerator() @@ -186,8 +186,6 @@ def run_single_rag_pipeline_task(rag_pipeline_invoke_entity: Mapping[str, Any], streaming=streaming, workflow_thread_pool_id=workflow_thread_pool_id, ) - - # Create and start worker thread - worker_thread = threading.Thread(target=_run_with_user_context) - worker_thread.start() - worker_thread.join() # Wait for worker thread to complete + except Exception: + logging.exception("Error in pipeline task") + raise