From 3a4756449a7850a145ec45b94a9a0c79061cb0ed Mon Sep 17 00:00:00 2001 From: aliworksx08 <57456290+aliworksx08@users.noreply.github.com> Date: Wed, 8 Apr 2026 18:12:57 -0500 Subject: [PATCH] refactor: migrate session.query to select API in schedule cleanup task (#34775) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../clean_workflow_runlogs_precise.py | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/api/schedule/clean_workflow_runlogs_precise.py b/api/schedule/clean_workflow_runlogs_precise.py index ebb8d52924..c5762fcdad 100644 --- a/api/schedule/clean_workflow_runlogs_precise.py +++ b/api/schedule/clean_workflow_runlogs_precise.py @@ -4,6 +4,7 @@ import time from collections.abc import Sequence import click +from sqlalchemy import delete, select from sqlalchemy.orm import Session, sessionmaker import app @@ -113,11 +114,9 @@ def _delete_batch( try: with session.begin_nested(): workflow_run_ids = [run.id for run in workflow_runs] - message_data = ( - session.query(Message.id, Message.conversation_id) - .where(Message.workflow_run_id.in_(workflow_run_ids)) - .all() - ) + message_data = session.execute( + select(Message.id, Message.conversation_id).where(Message.workflow_run_id.in_(workflow_run_ids)) + ).all() message_id_list = [msg.id for msg in message_data] conversation_id_list = list({msg.conversation_id for msg in message_data if msg.conversation_id}) if message_id_list: @@ -132,23 +131,19 @@ def _delete_batch( SavedMessage, ] for model in message_related_models: - session.query(model).where(model.message_id.in_(message_id_list)).delete(synchronize_session=False) # type: ignore + session.execute(delete(model).where(model.message_id.in_(message_id_list))) # type: ignore # error: "DeclarativeAttributeIntercept" has no attribute "message_id". But this type is only in lib # and these 6 types all have the message_id field. - session.query(Message).where(Message.workflow_run_id.in_(workflow_run_ids)).delete( - synchronize_session=False - ) + session.execute(delete(Message).where(Message.workflow_run_id.in_(workflow_run_ids))) if conversation_id_list: - session.query(ConversationVariable).where( - ConversationVariable.conversation_id.in_(conversation_id_list) - ).delete(synchronize_session=False) - - session.query(Conversation).where(Conversation.id.in_(conversation_id_list)).delete( - synchronize_session=False + session.execute( + delete(ConversationVariable).where(ConversationVariable.conversation_id.in_(conversation_id_list)) ) + session.execute(delete(Conversation).where(Conversation.id.in_(conversation_id_list))) + def _delete_node_executions(active_session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]: run_ids = [run.id for run in runs] repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(