refactor: migrate session.query to select API in schedule cleanup task (#34775)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
aliworksx08 2026-04-08 18:12:57 -05:00 committed by GitHub
parent 55b7ea04a7
commit 3a4756449a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -4,6 +4,7 @@ import time
from collections.abc import Sequence
import click
from sqlalchemy import delete, select
from sqlalchemy.orm import Session, sessionmaker
import app
@ -113,11 +114,9 @@ def _delete_batch(
try:
with session.begin_nested():
workflow_run_ids = [run.id for run in workflow_runs]
message_data = (
session.query(Message.id, Message.conversation_id)
.where(Message.workflow_run_id.in_(workflow_run_ids))
.all()
)
message_data = session.execute(
select(Message.id, Message.conversation_id).where(Message.workflow_run_id.in_(workflow_run_ids))
).all()
message_id_list = [msg.id for msg in message_data]
conversation_id_list = list({msg.conversation_id for msg in message_data if msg.conversation_id})
if message_id_list:
@ -132,23 +131,19 @@ def _delete_batch(
SavedMessage,
]
for model in message_related_models:
session.query(model).where(model.message_id.in_(message_id_list)).delete(synchronize_session=False) # type: ignore
session.execute(delete(model).where(model.message_id.in_(message_id_list))) # type: ignore
# error: "DeclarativeAttributeIntercept" has no attribute "message_id". But this type is only in lib
# and these 6 types all have the message_id field.
session.query(Message).where(Message.workflow_run_id.in_(workflow_run_ids)).delete(
synchronize_session=False
)
session.execute(delete(Message).where(Message.workflow_run_id.in_(workflow_run_ids)))
if conversation_id_list:
session.query(ConversationVariable).where(
ConversationVariable.conversation_id.in_(conversation_id_list)
).delete(synchronize_session=False)
session.query(Conversation).where(Conversation.id.in_(conversation_id_list)).delete(
synchronize_session=False
session.execute(
delete(ConversationVariable).where(ConversationVariable.conversation_id.in_(conversation_id_list))
)
session.execute(delete(Conversation).where(Conversation.id.in_(conversation_id_list)))
def _delete_node_executions(active_session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]:
run_ids = [run.id for run in runs]
repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(