Add dataset service API enablement

This commit is contained in:
jyong 2025-09-14 21:53:32 +08:00
parent 80c32a130f
commit 815e5568c3
2 changed files with 112 additions and 103 deletions

View File

@ -55,11 +55,15 @@ def priority_rag_pipeline_run_task(
start_at = time.perf_counter()
rag_pipeline_invoke_entities_content = FileService(db.engine).get_file_content(rag_pipeline_invoke_entities_file_id)
rag_pipeline_invoke_entities = json.loads(rag_pipeline_invoke_entities_content)
# Get Flask app object for thread context
flask_app = current_app._get_current_object() # type: ignore
with ThreadPoolExecutor(max_workers=10) as executor:
futures = []
for rag_pipeline_invoke_entity in rag_pipeline_invoke_entities:
# Submit task to thread pool
future = executor.submit(run_single_rag_pipeline_task, rag_pipeline_invoke_entity)
# Submit task to thread pool with Flask app
future = executor.submit(run_single_rag_pipeline_task, rag_pipeline_invoke_entity, flask_app)
futures.append(future)
# Wait for all tasks to complete
@ -80,66 +84,64 @@ def priority_rag_pipeline_run_task(
file_service.delete_file(rag_pipeline_invoke_entities_file_id)
db.session.close()
def run_single_rag_pipeline_task(rag_pipeline_invoke_entity: Mapping[str, Any]):
rag_pipeline_invoke_entity_model = RagPipelineInvokeEntity(**rag_pipeline_invoke_entity)
user_id = rag_pipeline_invoke_entity_model.user_id
tenant_id = rag_pipeline_invoke_entity_model.tenant_id
pipeline_id = rag_pipeline_invoke_entity_model.pipeline_id
workflow_id = rag_pipeline_invoke_entity_model.workflow_id
streaming = rag_pipeline_invoke_entity_model.streaming
workflow_execution_id = rag_pipeline_invoke_entity_model.workflow_execution_id
workflow_thread_pool_id = rag_pipeline_invoke_entity_model.workflow_thread_pool_id
application_generate_entity = rag_pipeline_invoke_entity_model.application_generate_entity
with Session(db.engine) as session:
account = session.query(Account).filter(Account.id == user_id).first()
if not account:
raise ValueError(f"Account {user_id} not found")
tenant = session.query(Tenant).filter(Tenant.id == tenant_id).first()
if not tenant:
raise ValueError(f"Tenant {tenant_id} not found")
account.current_tenant = tenant
def run_single_rag_pipeline_task(rag_pipeline_invoke_entity: Mapping[str, Any], flask_app):
# Create Flask application context for this thread
with flask_app.app_context():
rag_pipeline_invoke_entity_model = RagPipelineInvokeEntity(**rag_pipeline_invoke_entity)
user_id = rag_pipeline_invoke_entity_model.user_id
tenant_id = rag_pipeline_invoke_entity_model.tenant_id
pipeline_id = rag_pipeline_invoke_entity_model.pipeline_id
workflow_id = rag_pipeline_invoke_entity_model.workflow_id
streaming = rag_pipeline_invoke_entity_model.streaming
workflow_execution_id = rag_pipeline_invoke_entity_model.workflow_execution_id
workflow_thread_pool_id = rag_pipeline_invoke_entity_model.workflow_thread_pool_id
application_generate_entity = rag_pipeline_invoke_entity_model.application_generate_entity
with Session(db.engine) as session:
account = session.query(Account).filter(Account.id == user_id).first()
if not account:
raise ValueError(f"Account {user_id} not found")
tenant = session.query(Tenant).filter(Tenant.id == tenant_id).first()
if not tenant:
raise ValueError(f"Tenant {tenant_id} not found")
account.current_tenant = tenant
pipeline = session.query(Pipeline).filter(Pipeline.id == pipeline_id).first()
if not pipeline:
raise ValueError(f"Pipeline {pipeline_id} not found")
pipeline = session.query(Pipeline).filter(Pipeline.id == pipeline_id).first()
if not pipeline:
raise ValueError(f"Pipeline {pipeline_id} not found")
workflow = session.query(Workflow).filter(Workflow.id == pipeline.workflow_id).first()
workflow = session.query(Workflow).filter(Workflow.id == pipeline.workflow_id).first()
if not workflow:
raise ValueError(f"Workflow {pipeline.workflow_id} not found")
if not workflow:
raise ValueError(f"Workflow {pipeline.workflow_id} not found")
if workflow_execution_id is None:
workflow_execution_id = str(uuid.uuid4())
if workflow_execution_id is None:
workflow_execution_id = str(uuid.uuid4())
# Create application generate entity from dict
entity = RagPipelineGenerateEntity(**application_generate_entity)
# Create application generate entity from dict
entity = RagPipelineGenerateEntity(**application_generate_entity)
# Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository(
session_factory=session_factory,
user=account,
app_id=entity.app_config.app_id,
triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN,
)
# Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository(
session_factory=session_factory,
user=account,
app_id=entity.app_config.app_id,
triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN,
)
workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
session_factory=session_factory,
user=account,
app_id=entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN,
)
# Use app context to ensure Flask globals work properly
with current_app.app_context():
workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
session_factory=session_factory,
user=account,
app_id=entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN,
)
# Set the user directly in g for preserve_flask_contexts
g._login_user = account
# Copy context for thread (after setting user)
context = contextvars.copy_context()
# Get Flask app object in the main thread where app context exists
flask_app = current_app._get_current_object() # type: ignore
# Create a wrapper function that passes user context
def _run_with_user_context():
# Don't create a new app context here - let _generate handle it

View File

@ -54,15 +54,21 @@ def rag_pipeline_run_task(
try:
start_at = time.perf_counter()
rag_pipeline_invoke_entities_content = FileService(db.engine).get_file_content(rag_pipeline_invoke_entities_file_id)
print("asdadsdadaddadadadadadasdsa")
rag_pipeline_invoke_entities_content = FileService(db.engine).get_file_content(
rag_pipeline_invoke_entities_file_id)
rag_pipeline_invoke_entities = json.loads(rag_pipeline_invoke_entities_content)
# Get Flask app object for thread context
flask_app = current_app._get_current_object() # type: ignore
with ThreadPoolExecutor(max_workers=10) as executor:
futures = []
for rag_pipeline_invoke_entity in rag_pipeline_invoke_entities:
# Submit task to thread pool
future = executor.submit(run_single_rag_pipeline_task, rag_pipeline_invoke_entity)
# Submit task to thread pool with Flask app
future = executor.submit(run_single_rag_pipeline_task, rag_pipeline_invoke_entity, flask_app)
futures.append(future)
# Wait for all tasks to complete
for future in futures:
try:
@ -71,7 +77,8 @@ def rag_pipeline_run_task(
logging.exception("Error in pipeline task")
end_at = time.perf_counter()
logging.info(
click.style(f"tenant_id: {tenant_id} , Rag pipeline run completed. Latency: {end_at - start_at}s", fg="green")
click.style(f"tenant_id: {tenant_id} , Rag pipeline run completed. Latency: {end_at - start_at}s",
fg="green")
)
except Exception:
logging.exception(click.style(f"Error running rag pipeline, tenant_id: {tenant_id}", fg="red"))
@ -83,13 +90,14 @@ def rag_pipeline_run_task(
# Check if there are waiting tasks in the queue
# Use rpop to get the next task from the queue (FIFO order)
next_file_id = redis_client.rpop(tenant_self_pipeline_task_queue)
if next_file_id:
# Process the next waiting task
# Keep the flag set to indicate a task is running
redis_client.setex(tenant_pipeline_task_key, 60 * 60, 1)
rag_pipeline_run_task.delay( # type: ignore
rag_pipeline_invoke_entities_file_id=next_file_id.decode('utf-8') if isinstance(next_file_id, bytes) else next_file_id,
rag_pipeline_invoke_entities_file_id=next_file_id.decode('utf-8') if isinstance(next_file_id,
bytes) else next_file_id,
tenant_id=tenant_id,
)
else:
@ -99,66 +107,65 @@ def rag_pipeline_run_task(
file_service.delete_file(rag_pipeline_invoke_entities_file_id)
db.session.close()
def run_single_rag_pipeline_task(rag_pipeline_invoke_entity: Mapping[str, Any]):
rag_pipeline_invoke_entity_model = RagPipelineInvokeEntity(**rag_pipeline_invoke_entity)
user_id = rag_pipeline_invoke_entity_model.user_id
tenant_id = rag_pipeline_invoke_entity_model.tenant_id
pipeline_id = rag_pipeline_invoke_entity_model.pipeline_id
workflow_id = rag_pipeline_invoke_entity_model.workflow_id
streaming = rag_pipeline_invoke_entity_model.streaming
workflow_execution_id = rag_pipeline_invoke_entity_model.workflow_execution_id
workflow_thread_pool_id = rag_pipeline_invoke_entity_model.workflow_thread_pool_id
application_generate_entity = rag_pipeline_invoke_entity_model.application_generate_entity
with Session(db.engine) as session:
account = session.query(Account).filter(Account.id == user_id).first()
if not account:
raise ValueError(f"Account {user_id} not found")
tenant = session.query(Tenant).filter(Tenant.id == tenant_id).first()
if not tenant:
raise ValueError(f"Tenant {tenant_id} not found")
account.current_tenant = tenant
pipeline = session.query(Pipeline).filter(Pipeline.id == pipeline_id).first()
if not pipeline:
raise ValueError(f"Pipeline {pipeline_id} not found")
def run_single_rag_pipeline_task(rag_pipeline_invoke_entity: Mapping[str, Any], flask_app):
# Create Flask application context for this thread
with flask_app.app_context():
rag_pipeline_invoke_entity_model = RagPipelineInvokeEntity(**rag_pipeline_invoke_entity)
user_id = rag_pipeline_invoke_entity_model.user_id
tenant_id = rag_pipeline_invoke_entity_model.tenant_id
pipeline_id = rag_pipeline_invoke_entity_model.pipeline_id
workflow_id = rag_pipeline_invoke_entity_model.workflow_id
streaming = rag_pipeline_invoke_entity_model.streaming
workflow_execution_id = rag_pipeline_invoke_entity_model.workflow_execution_id
workflow_thread_pool_id = rag_pipeline_invoke_entity_model.workflow_thread_pool_id
application_generate_entity = rag_pipeline_invoke_entity_model.application_generate_entity
with Session(db.engine) as session:
account = session.query(Account).filter(Account.id == user_id).first()
if not account:
raise ValueError(f"Account {user_id} not found")
tenant = session.query(Tenant).filter(Tenant.id == tenant_id).first()
if not tenant:
raise ValueError(f"Tenant {tenant_id} not found")
account.current_tenant = tenant
workflow = session.query(Workflow).filter(Workflow.id == pipeline.workflow_id).first()
pipeline = session.query(Pipeline).filter(Pipeline.id == pipeline_id).first()
if not pipeline:
raise ValueError(f"Pipeline {pipeline_id} not found")
if not workflow:
raise ValueError(f"Workflow {pipeline.workflow_id} not found")
workflow = session.query(Workflow).filter(Workflow.id == pipeline.workflow_id).first()
if workflow_execution_id is None:
workflow_execution_id = str(uuid.uuid4())
if not workflow:
raise ValueError(f"Workflow {pipeline.workflow_id} not found")
# Create application generate entity from dict
entity = RagPipelineGenerateEntity(**application_generate_entity)
if workflow_execution_id is None:
workflow_execution_id = str(uuid.uuid4())
# Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository(
session_factory=session_factory,
user=account,
app_id=entity.app_config.app_id,
triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN,
)
# Create application generate entity from dict
entity = RagPipelineGenerateEntity(**application_generate_entity)
# Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository(
session_factory=session_factory,
user=account,
app_id=entity.app_config.app_id,
triggered_from=WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN,
)
workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
session_factory=session_factory,
user=account,
app_id=entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN,
)
workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
session_factory=session_factory,
user=account,
app_id=entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN,
)
# Use app context to ensure Flask globals work properly
with current_app.app_context():
# Set the user directly in g for preserve_flask_contexts
g._login_user = account
# Copy context for thread (after setting user)
context = contextvars.copy_context()
# Get Flask app object in the main thread where app context exists
flask_app = current_app._get_current_object() # type: ignore
# Create a wrapper function that passes user context
def _run_with_user_context():
# Don't create a new app context here - let _generate handle it