mirror of
https://github.com/langgenius/dify.git
synced 2026-04-29 12:37:20 +08:00
refactor: migrate session.query to select API in document task files (#34646)
This commit is contained in:
parent
459c36f21b
commit
3e995e6a6d
@ -26,7 +26,7 @@ def clean_notion_document_task(document_ids: list[str], dataset_id: str):
|
|||||||
total_index_node_ids = []
|
total_index_node_ids = []
|
||||||
|
|
||||||
with session_factory.create_session() as session:
|
with session_factory.create_session() as session:
|
||||||
dataset = session.query(Dataset).where(Dataset.id == dataset_id).first()
|
dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1))
|
||||||
|
|
||||||
if not dataset:
|
if not dataset:
|
||||||
raise Exception("Document has no dataset")
|
raise Exception("Document has no dataset")
|
||||||
@ -41,7 +41,7 @@ def clean_notion_document_task(document_ids: list[str], dataset_id: str):
|
|||||||
total_index_node_ids.extend([segment.index_node_id for segment in segments])
|
total_index_node_ids.extend([segment.index_node_id for segment in segments])
|
||||||
|
|
||||||
with session_factory.create_session() as session:
|
with session_factory.create_session() as session:
|
||||||
dataset = session.query(Dataset).where(Dataset.id == dataset_id).first()
|
dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1))
|
||||||
if dataset:
|
if dataset:
|
||||||
index_processor.clean(
|
index_processor.clean(
|
||||||
dataset, total_index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True
|
dataset, total_index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True
|
||||||
|
|||||||
@ -28,7 +28,9 @@ def document_indexing_update_task(dataset_id: str, document_id: str):
|
|||||||
start_at = time.perf_counter()
|
start_at = time.perf_counter()
|
||||||
|
|
||||||
with session_factory.create_session() as session, session.begin():
|
with session_factory.create_session() as session, session.begin():
|
||||||
document = session.query(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).first()
|
document = session.scalar(
|
||||||
|
select(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).limit(1)
|
||||||
|
)
|
||||||
|
|
||||||
if not document:
|
if not document:
|
||||||
logger.info(click.style(f"Document not found: {document_id}", fg="red"))
|
logger.info(click.style(f"Document not found: {document_id}", fg="red"))
|
||||||
@ -37,7 +39,7 @@ def document_indexing_update_task(dataset_id: str, document_id: str):
|
|||||||
document.indexing_status = IndexingStatus.PARSING
|
document.indexing_status = IndexingStatus.PARSING
|
||||||
document.processing_started_at = naive_utc_now()
|
document.processing_started_at = naive_utc_now()
|
||||||
|
|
||||||
dataset = session.query(Dataset).where(Dataset.id == dataset_id).first()
|
dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1))
|
||||||
if not dataset:
|
if not dataset:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|||||||
@ -32,15 +32,15 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str], user_
|
|||||||
start_at = time.perf_counter()
|
start_at = time.perf_counter()
|
||||||
with session_factory.create_session() as session:
|
with session_factory.create_session() as session:
|
||||||
try:
|
try:
|
||||||
dataset = session.query(Dataset).where(Dataset.id == dataset_id).first()
|
dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1))
|
||||||
if not dataset:
|
if not dataset:
|
||||||
logger.info(click.style(f"Dataset not found: {dataset_id}", fg="red"))
|
logger.info(click.style(f"Dataset not found: {dataset_id}", fg="red"))
|
||||||
return
|
return
|
||||||
user = session.query(Account).where(Account.id == user_id).first()
|
user = session.scalar(select(Account).where(Account.id == user_id).limit(1))
|
||||||
if not user:
|
if not user:
|
||||||
logger.info(click.style(f"User not found: {user_id}", fg="red"))
|
logger.info(click.style(f"User not found: {user_id}", fg="red"))
|
||||||
return
|
return
|
||||||
tenant = session.query(Tenant).where(Tenant.id == dataset.tenant_id).first()
|
tenant = session.scalar(select(Tenant).where(Tenant.id == dataset.tenant_id).limit(1))
|
||||||
if not tenant:
|
if not tenant:
|
||||||
raise ValueError("Tenant not found")
|
raise ValueError("Tenant not found")
|
||||||
user.current_tenant = tenant
|
user.current_tenant = tenant
|
||||||
@ -58,10 +58,8 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str], user_
|
|||||||
"your subscription."
|
"your subscription."
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
document = (
|
document = session.scalar(
|
||||||
session.query(Document)
|
select(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).limit(1)
|
||||||
.where(Document.id == document_id, Document.dataset_id == dataset_id)
|
|
||||||
.first()
|
|
||||||
)
|
)
|
||||||
if document:
|
if document:
|
||||||
document.indexing_status = IndexingStatus.ERROR
|
document.indexing_status = IndexingStatus.ERROR
|
||||||
@ -73,8 +71,8 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str], user_
|
|||||||
return
|
return
|
||||||
|
|
||||||
logger.info(click.style(f"Start retry document: {document_id}", fg="green"))
|
logger.info(click.style(f"Start retry document: {document_id}", fg="green"))
|
||||||
document = (
|
document = session.scalar(
|
||||||
session.query(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).first()
|
select(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).limit(1)
|
||||||
)
|
)
|
||||||
if not document:
|
if not document:
|
||||||
logger.info(click.style(f"Document not found: {document_id}", fg="yellow"))
|
logger.info(click.style(f"Document not found: {document_id}", fg="yellow"))
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user