From 815e5568c3187fd19e14a3f52473a104aacfe631 Mon Sep 17 00:00:00 2001
From: jyong <718720800@qq.com>
Date: Sun, 14 Sep 2025 21:53:32 +0800
Subject: [PATCH] add dataset service api enable

---
 .../priority_rag_pipeline_run_task.py         | 100 +++++++--------
 .../rag_pipeline/rag_pipeline_run_task.py     | 114 ++++++++++--------
 2 files changed, 111 insertions(+), 103 deletions(-)

diff --git a/api/tasks/rag_pipeline/priority_rag_pipeline_run_task.py b/api/tasks/rag_pipeline/priority_rag_pipeline_run_task.py
index a22d77ec17..3d7f713258 100644
--- a/api/tasks/rag_pipeline/priority_rag_pipeline_run_task.py
+++ b/api/tasks/rag_pipeline/priority_rag_pipeline_run_task.py
@@ -55,11 +55,15 @@ def priority_rag_pipeline_run_task(
         start_at = time.perf_counter()
         rag_pipeline_invoke_entities_content = FileService(db.engine).get_file_content(rag_pipeline_invoke_entities_file_id)
         rag_pipeline_invoke_entities = json.loads(rag_pipeline_invoke_entities_content)
+
+        # Get Flask app object for thread context
+        flask_app = current_app._get_current_object()  # type: ignore
+
         with ThreadPoolExecutor(max_workers=10) as executor:
             futures = []
             for rag_pipeline_invoke_entity in rag_pipeline_invoke_entities:
-                # Submit task to thread pool
-                future = executor.submit(run_single_rag_pipeline_task, rag_pipeline_invoke_entity)
+                # Submit task to thread pool with Flask app
+                future = executor.submit(run_single_rag_pipeline_task, rag_pipeline_invoke_entity, flask_app)
                 futures.append(future)
 
             # Wait for all tasks to complete
@@ -80,66 +84,64 @@
         file_service.delete_file(rag_pipeline_invoke_entities_file_id)
         db.session.close()
 
-def run_single_rag_pipeline_task(rag_pipeline_invoke_entity: Mapping[str, Any]):
-    rag_pipeline_invoke_entity_model = RagPipelineInvokeEntity(**rag_pipeline_invoke_entity)
-    user_id = rag_pipeline_invoke_entity_model.user_id
-    tenant_id = rag_pipeline_invoke_entity_model.tenant_id
-    pipeline_id = rag_pipeline_invoke_entity_model.pipeline_id
-    workflow_id = rag_pipeline_invoke_entity_model.workflow_id
-    streaming = rag_pipeline_invoke_entity_model.streaming
-    workflow_execution_id = rag_pipeline_invoke_entity_model.workflow_execution_id
-    workflow_thread_pool_id = rag_pipeline_invoke_entity_model.workflow_thread_pool_id
-    application_generate_entity = rag_pipeline_invoke_entity_model.application_generate_entity
-    with Session(db.engine) as session:
-        account = session.query(Account).filter(Account.id == user_id).first()
-        if not account:
-            raise ValueError(f"Account {user_id} not found")
-        tenant = session.query(Tenant).filter(Tenant.id == tenant_id).first()
-        if not tenant:
-            raise ValueError(f"Tenant {tenant_id} not found")
-        account.current_tenant = tenant
+def run_single_rag_pipeline_task(rag_pipeline_invoke_entity: Mapping[str, Any], flask_app):
+    # Create Flask application context for this thread
+    with flask_app.app_context():
+        rag_pipeline_invoke_entity_model = RagPipelineInvokeEntity(**rag_pipeline_invoke_entity)
+        user_id = rag_pipeline_invoke_entity_model.user_id
+        tenant_id = rag_pipeline_invoke_entity_model.tenant_id
+        pipeline_id = rag_pipeline_invoke_entity_model.pipeline_id
+        workflow_id = rag_pipeline_invoke_entity_model.workflow_id
+        streaming = rag_pipeline_invoke_entity_model.streaming
+        workflow_execution_id = rag_pipeline_invoke_entity_model.workflow_execution_id
+        workflow_thread_pool_id = rag_pipeline_invoke_entity_model.workflow_thread_pool_id
+        application_generate_entity = rag_pipeline_invoke_entity_model.application_generate_entity
+        with Session(db.engine) as session:
+            account = session.query(Account).filter(Account.id == user_id).first()
+            if not account:
+                raise ValueError(f"Account {user_id} not found")
+            tenant = session.query(Tenant).filter(Tenant.id == tenant_id).first()
+            if not tenant:
+                raise ValueError(f"Tenant {tenant_id} not found")
+            account.current_tenant = tenant
 
-        pipeline = session.query(Pipeline).filter(Pipeline.id == pipeline_id).first()
-        if not pipeline:
-            raise ValueError(f"Pipeline {pipeline_id} not found")
+            pipeline = session.query(Pipeline).filter(Pipeline.id == pipeline_id).first()
+            if not pipeline:
+                raise ValueError(f"Pipeline {pipeline_id} not found")
 
-        workflow = session.query(Workflow).filter(Workflow.id == pipeline.workflow_id).first()
+            workflow = session.query(Workflow).filter(Workflow.id == pipeline.workflow_id).first()
 
-        if not workflow:
-            raise ValueError(f"Workflow {pipeline.workflow_id} not found")
+            if not workflow:
+                raise ValueError(f"Workflow {pipeline.workflow_id} not found")
 
-        if workflow_execution_id is None:
-            workflow_execution_id = str(uuid.uuid4())
+            if workflow_execution_id is None:
+                workflow_execution_id = str(uuid.uuid4())
 
-        # Create application generate entity from dict
-        entity = RagPipelineGenerateEntity(**application_generate_entity)
+            # Create application generate entity from dict
+            entity = RagPipelineGenerateEntity(**application_generate_entity)
 
-        # Create workflow node execution repository
-        session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
-        workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository(
-            session_factory=session_factory,
-            user=account,
-            app_id=entity.app_config.app_id,
-            triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN,
-        )
+            # Create workflow node execution repository
+            session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
+            workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository(
+                session_factory=session_factory,
+                user=account,
+                app_id=entity.app_config.app_id,
+                triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN,
+            )
 
-        workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
-            session_factory=session_factory,
-            user=account,
-            app_id=entity.app_config.app_id,
-            triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN,
-        )
-        # Use app context to ensure Flask globals work properly
-        with current_app.app_context():
+            workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
+                session_factory=session_factory,
+                user=account,
+                app_id=entity.app_config.app_id,
+                triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN,
+            )
+
             # Set the user directly in g for preserve_flask_contexts
             g._login_user = account
 
             # Copy context for thread (after setting user)
             context = contextvars.copy_context()
 
-            # Get Flask app object in the main thread where app context exists
-            flask_app = current_app._get_current_object()  # type: ignore
-
             # Create a wrapper function that passes user context
             def _run_with_user_context():
                 # Don't create a new app context here - let _generate handle it
diff --git a/api/tasks/rag_pipeline/rag_pipeline_run_task.py b/api/tasks/rag_pipeline/rag_pipeline_run_task.py
index d9b6bf5d5a..1af1c9a675 100644
--- a/api/tasks/rag_pipeline/rag_pipeline_run_task.py
+++ b/api/tasks/rag_pipeline/rag_pipeline_run_task.py
@@ -54,15 +54,20 @@ def rag_pipeline_run_task(
 
     try:
         start_at = time.perf_counter()
-        rag_pipeline_invoke_entities_content = FileService(db.engine).get_file_content(rag_pipeline_invoke_entities_file_id)
+        rag_pipeline_invoke_entities_content = FileService(db.engine).get_file_content(
+            rag_pipeline_invoke_entities_file_id)
         rag_pipeline_invoke_entities = json.loads(rag_pipeline_invoke_entities_content)
+
+        # Get Flask app object for thread context
+        flask_app = current_app._get_current_object()  # type: ignore
+
         with ThreadPoolExecutor(max_workers=10) as executor:
             futures = []
             for rag_pipeline_invoke_entity in rag_pipeline_invoke_entities:
-                # Submit task to thread pool
-                future = executor.submit(run_single_rag_pipeline_task, rag_pipeline_invoke_entity)
+                # Submit task to thread pool with Flask app
+                future = executor.submit(run_single_rag_pipeline_task, rag_pipeline_invoke_entity, flask_app)
                 futures.append(future)
-
+
             # Wait for all tasks to complete
             for future in futures:
                 try:
@@ -71,7 +76,8 @@
                     logging.exception("Error in pipeline task")
         end_at = time.perf_counter()
         logging.info(
-            click.style(f"tenant_id: {tenant_id} , Rag pipeline run completed. Latency: {end_at - start_at}s", fg="green")
+            click.style(f"tenant_id: {tenant_id} , Rag pipeline run completed. Latency: {end_at - start_at}s",
+                        fg="green")
         )
     except Exception:
         logging.exception(click.style(f"Error running rag pipeline, tenant_id: {tenant_id}", fg="red"))
@@ -83,13 +89,14 @@
         # Check if there are waiting tasks in the queue
         # Use rpop to get the next task from the queue (FIFO order)
         next_file_id = redis_client.rpop(tenant_self_pipeline_task_queue)
-
+
         if next_file_id:
             # Process the next waiting task
             # Keep the flag set to indicate a task is running
            redis_client.setex(tenant_pipeline_task_key, 60 * 60, 1)
             rag_pipeline_run_task.delay(  # type: ignore
-                rag_pipeline_invoke_entities_file_id=next_file_id.decode('utf-8') if isinstance(next_file_id, bytes) else next_file_id,
+                rag_pipeline_invoke_entities_file_id=next_file_id.decode('utf-8') if isinstance(next_file_id,
+                                                                                                bytes) else next_file_id,
                 tenant_id=tenant_id,
             )
         else:
@@ -99,66 +106,65 @@
         file_service.delete_file(rag_pipeline_invoke_entities_file_id)
         db.session.close()
 
-def run_single_rag_pipeline_task(rag_pipeline_invoke_entity: Mapping[str, Any]):
-    rag_pipeline_invoke_entity_model = RagPipelineInvokeEntity(**rag_pipeline_invoke_entity)
-    user_id = rag_pipeline_invoke_entity_model.user_id
-    tenant_id = rag_pipeline_invoke_entity_model.tenant_id
-    pipeline_id = rag_pipeline_invoke_entity_model.pipeline_id
-    workflow_id = rag_pipeline_invoke_entity_model.workflow_id
-    streaming = rag_pipeline_invoke_entity_model.streaming
-    workflow_execution_id = rag_pipeline_invoke_entity_model.workflow_execution_id
-    workflow_thread_pool_id = rag_pipeline_invoke_entity_model.workflow_thread_pool_id
-    application_generate_entity = rag_pipeline_invoke_entity_model.application_generate_entity
-    with Session(db.engine) as session:
-        account = session.query(Account).filter(Account.id == user_id).first()
-        if not account:
-            raise ValueError(f"Account {user_id} not found")
-        tenant = session.query(Tenant).filter(Tenant.id == tenant_id).first()
-        if not tenant:
-            raise ValueError(f"Tenant {tenant_id} not found")
-        account.current_tenant = tenant
+def run_single_rag_pipeline_task(rag_pipeline_invoke_entity: Mapping[str, Any], flask_app):
+    # Create Flask application context for this thread
+    with flask_app.app_context():
+        rag_pipeline_invoke_entity_model = RagPipelineInvokeEntity(**rag_pipeline_invoke_entity)
+        user_id = rag_pipeline_invoke_entity_model.user_id
+        tenant_id = rag_pipeline_invoke_entity_model.tenant_id
+        pipeline_id = rag_pipeline_invoke_entity_model.pipeline_id
+        workflow_id = rag_pipeline_invoke_entity_model.workflow_id
+        streaming = rag_pipeline_invoke_entity_model.streaming
+        workflow_execution_id = rag_pipeline_invoke_entity_model.workflow_execution_id
+        workflow_thread_pool_id = rag_pipeline_invoke_entity_model.workflow_thread_pool_id
+        application_generate_entity = rag_pipeline_invoke_entity_model.application_generate_entity
+        with Session(db.engine) as session:
+            account = session.query(Account).filter(Account.id == user_id).first()
+            if not account:
+                raise ValueError(f"Account {user_id} not found")
+            tenant = session.query(Tenant).filter(Tenant.id == tenant_id).first()
+            if not tenant:
+                raise ValueError(f"Tenant {tenant_id} not found")
+            account.current_tenant = tenant
 
-        pipeline = session.query(Pipeline).filter(Pipeline.id == pipeline_id).first()
-        if not pipeline:
-            raise ValueError(f"Pipeline {pipeline_id} not found")
+            pipeline = session.query(Pipeline).filter(Pipeline.id == pipeline_id).first()
+            if not pipeline:
+                raise ValueError(f"Pipeline {pipeline_id} not found")
 
-        workflow = session.query(Workflow).filter(Workflow.id == pipeline.workflow_id).first()
+            workflow = session.query(Workflow).filter(Workflow.id == pipeline.workflow_id).first()
 
-        if not workflow:
-            raise ValueError(f"Workflow {pipeline.workflow_id} not found")
+            if not workflow:
+                raise ValueError(f"Workflow {pipeline.workflow_id} not found")
 
-        if workflow_execution_id is None:
-            workflow_execution_id = str(uuid.uuid4())
+            if workflow_execution_id is None:
+                workflow_execution_id = str(uuid.uuid4())
 
-        # Create application generate entity from dict
-        entity = RagPipelineGenerateEntity(**application_generate_entity)
+            # Create application generate entity from dict
+            entity = RagPipelineGenerateEntity(**application_generate_entity)
 
-        # Create workflow node execution repository
-        session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
-        workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository(
-            session_factory=session_factory,
-            user=account,
-            app_id=entity.app_config.app_id,
-            triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN,
-        )
+            # Create workflow node execution repository
+            session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
+            workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository(
+                session_factory=session_factory,
+                user=account,
+                app_id=entity.app_config.app_id,
+                triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN,
+            )
+
+            workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
+                session_factory=session_factory,
+                user=account,
+                app_id=entity.app_config.app_id,
+                triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN,
+            )
+
 
-        workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
-            session_factory=session_factory,
-            user=account,
-            app_id=entity.app_config.app_id,
-            triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN,
-        )
-        # Use app context to ensure Flask globals work properly
-        with current_app.app_context():
             # Set the user directly in g for preserve_flask_contexts
             g._login_user = account
 
             # Copy context for thread (after setting user)
             context = contextvars.copy_context()
 
-            # Get Flask app object in the main thread where app context exists
-            flask_app = current_app._get_current_object()  # type: ignore
-
             # Create a wrapper function that passes user context
             def _run_with_user_context():
                 # Don't create a new app context here - let _generate handle it
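
Note on the pattern used above: the change captures the real Flask application with current_app._get_current_object() inside the Celery task (where an app context already exists), passes it to each ThreadPoolExecutor worker, and has the worker enter flask_app.app_context() itself, because app contexts are not inherited by new threads. The snippet below is only a minimal, self-contained sketch of that pattern under assumed names; process_one(), run_batch(), and the entity payloads are placeholders, not the Dify APIs touched by this diff.

from concurrent.futures import ThreadPoolExecutor

from flask import Flask, current_app, g

app = Flask(__name__)


def process_one(entity: dict, flask_app: Flask) -> str:
    # Each worker thread opens its own application context; without it,
    # current_app / g / app-bound extensions raise
    # "Working outside of application context".
    with flask_app.app_context():
        g.entity_id = entity["id"]  # placeholder for per-task state such as g._login_user
        return f"{current_app.name} processed {g.entity_id}"


def run_batch(entities: list[dict]) -> list[str]:
    # Must run where an app context exists (e.g. inside the Celery task),
    # so the current_app proxy can be unwrapped to the concrete app object.
    flask_app = current_app._get_current_object()  # type: ignore[attr-defined]
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_one, entity, flask_app) for entity in entities]
        return [future.result() for future in futures]


if __name__ == "__main__":
    with app.app_context():
        print(run_batch([{"id": "a"}, {"id": "b"}]))

Passing the concrete app rather than relying on current_app inside the worker avoids the proxy resolving against an empty context stack; the patch additionally snapshots contextvars with copy_context() after setting g._login_user so the login user travels with the thread.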