From 8ddc4f22927655d337fc80d2e2e17d1ba611b195 Mon Sep 17 00:00:00 2001
From: Asuka Minato
Date: Wed, 15 Oct 2025 00:42:55 +0900
Subject: [PATCH] example to auto rollback (#26200)

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
---
 .../clean_workflow_runlogs_precise.py         | 125 +++++++++---------
 1 file changed, 59 insertions(+), 66 deletions(-)

diff --git a/api/schedule/clean_workflow_runlogs_precise.py b/api/schedule/clean_workflow_runlogs_precise.py
index 485a79782c..db4198720d 100644
--- a/api/schedule/clean_workflow_runlogs_precise.py
+++ b/api/schedule/clean_workflow_runlogs_precise.py
@@ -1,8 +1,11 @@
 import datetime
 import logging
 import time
+from collections.abc import Sequence
 
 import click
+from sqlalchemy import select
+from sqlalchemy.orm import Session, sessionmaker
 
 import app
 from configs import dify_config
@@ -35,50 +38,53 @@ def clean_workflow_runlogs_precise():
 
     retention_days = dify_config.WORKFLOW_LOG_RETENTION_DAYS
     cutoff_date = datetime.datetime.now() - datetime.timedelta(days=retention_days)
-
+    session_factory = sessionmaker(db.engine, expire_on_commit=False)
     try:
-        total_workflow_runs = db.session.query(WorkflowRun).where(WorkflowRun.created_at < cutoff_date).count()
-        if total_workflow_runs == 0:
-            logger.info("No expired workflow run logs found")
-            return
-        logger.info("Found %s expired workflow run logs to clean", total_workflow_runs)
+        with session_factory.begin() as session:
+            total_workflow_runs = session.query(WorkflowRun).where(WorkflowRun.created_at < cutoff_date).count()
+            if total_workflow_runs == 0:
+                logger.info("No expired workflow run logs found")
+                return
+            logger.info("Found %s expired workflow run logs to clean", total_workflow_runs)
 
         total_deleted = 0
         failed_batches = 0
         batch_count = 0
 
         while True:
-            workflow_runs = (
-                db.session.query(WorkflowRun.id).where(WorkflowRun.created_at < cutoff_date).limit(BATCH_SIZE).all()
-            )
+            with session_factory.begin() as session:
+                workflow_run_ids = session.scalars(
+                    select(WorkflowRun.id)
+                    .where(WorkflowRun.created_at < cutoff_date)
+                    .order_by(WorkflowRun.created_at, WorkflowRun.id)
+                    .limit(BATCH_SIZE)
+                ).all()
 
-            if not workflow_runs:
-                break
-
-            workflow_run_ids = [run.id for run in workflow_runs]
-            batch_count += 1
-
-            success = _delete_batch_with_retry(workflow_run_ids, failed_batches)
-
-            if success:
-                total_deleted += len(workflow_run_ids)
-                failed_batches = 0
-            else:
-                failed_batches += 1
-                if failed_batches >= MAX_RETRIES:
-                    logger.error("Failed to delete batch after %s retries, aborting cleanup for today", MAX_RETRIES)
+                if not workflow_run_ids:
                     break
+
+                batch_count += 1
+
+                success = _delete_batch(session, workflow_run_ids, failed_batches)
+
+                if success:
+                    total_deleted += len(workflow_run_ids)
+                    failed_batches = 0
                 else:
-                    # Calculate incremental delay times: 5, 10, 15 minutes
-                    retry_delay_minutes = failed_batches * 5
-                    logger.warning("Batch deletion failed, retrying in %s minutes...", retry_delay_minutes)
-                    time.sleep(retry_delay_minutes * 60)
-                    continue
+                    failed_batches += 1
+                    if failed_batches >= MAX_RETRIES:
+                        logger.error("Failed to delete batch after %s retries, aborting cleanup for today", MAX_RETRIES)
+                        break
+                    else:
+                        # Calculate incremental delay times: 5, 10, 15 minutes
+                        retry_delay_minutes = failed_batches * 5
+                        logger.warning("Batch deletion failed, retrying in %s minutes...", retry_delay_minutes)
+                        time.sleep(retry_delay_minutes * 60)
+                        continue
 
         logger.info("Cleanup completed: %s expired workflow run logs deleted", total_deleted)
 
     except Exception:
-        db.session.rollback()
         logger.exception("Unexpected error in workflow log cleanup")
         raise
 
@@ -87,69 +93,56 @@ def clean_workflow_runlogs_precise():
     click.echo(click.style(f"Cleaned workflow run logs from db success latency: {execution_time:.2f}s", fg="green"))
 
 
-def _delete_batch_with_retry(workflow_run_ids: list[str], attempt_count: int) -> bool:
-    """Delete a single batch with a retry mechanism and complete cascading deletion"""
+def _delete_batch(session: Session, workflow_run_ids: Sequence[str], attempt_count: int) -> bool:
+    """Delete a single batch of workflow runs and all related data within a nested transaction."""
     try:
-        with db.session.begin_nested():
+        with session.begin_nested():
             message_data = (
-                db.session.query(Message.id, Message.conversation_id)
+                session.query(Message.id, Message.conversation_id)
                 .where(Message.workflow_run_id.in_(workflow_run_ids))
                 .all()
             )
             message_id_list = [msg.id for msg in message_data]
             conversation_id_list = list({msg.conversation_id for msg in message_data if msg.conversation_id})
 
             if message_id_list:
-                db.session.query(AppAnnotationHitHistory).where(
-                    AppAnnotationHitHistory.message_id.in_(message_id_list)
-                ).delete(synchronize_session=False)
+                message_related_models = [
+                    AppAnnotationHitHistory,
+                    MessageAgentThought,
+                    MessageChain,
+                    MessageFile,
+                    MessageAnnotation,
+                    MessageFeedback,
+                ]
+                for model in message_related_models:
+                    session.query(model).where(model.message_id.in_(message_id_list)).delete(synchronize_session=False)  # type: ignore
+                # mypy reports that "DeclarativeAttributeIntercept" has no attribute "message_id"; that type is
+                # internal to the library, and all six models above define a message_id column, so the ignore is safe.
 
-            db.session.query(MessageAgentThought).where(MessageAgentThought.message_id.in_(message_id_list)).delete(
+            session.query(Message).where(Message.workflow_run_id.in_(workflow_run_ids)).delete(
                 synchronize_session=False
             )
-
-            db.session.query(MessageChain).where(MessageChain.message_id.in_(message_id_list)).delete(
-                synchronize_session=False
-            )
-
-            db.session.query(MessageFile).where(MessageFile.message_id.in_(message_id_list)).delete(
-                synchronize_session=False
-            )
-
-            db.session.query(MessageAnnotation).where(MessageAnnotation.message_id.in_(message_id_list)).delete(
-                synchronize_session=False
-            )
-
-            db.session.query(MessageFeedback).where(MessageFeedback.message_id.in_(message_id_list)).delete(
-                synchronize_session=False
-            )
-
-            db.session.query(Message).where(Message.workflow_run_id.in_(workflow_run_ids)).delete(
-                synchronize_session=False
-            )
 
-            db.session.query(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id.in_(workflow_run_ids)).delete(
+            session.query(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id.in_(workflow_run_ids)).delete(
                 synchronize_session=False
             )
 
-            db.session.query(WorkflowNodeExecutionModel).where(
+            session.query(WorkflowNodeExecutionModel).where(
                 WorkflowNodeExecutionModel.workflow_run_id.in_(workflow_run_ids)
             ).delete(synchronize_session=False)
 
             if conversation_id_list:
-                db.session.query(ConversationVariable).where(
+                session.query(ConversationVariable).where(
                     ConversationVariable.conversation_id.in_(conversation_id_list)
                 ).delete(synchronize_session=False)
 
-                db.session.query(Conversation).where(Conversation.id.in_(conversation_id_list)).delete(
+                session.query(Conversation).where(Conversation.id.in_(conversation_id_list)).delete(
                     synchronize_session=False
                 )
 
-            db.session.query(WorkflowRun).where(WorkflowRun.id.in_(workflow_run_ids)).delete(synchronize_session=False)
+            session.query(WorkflowRun).where(WorkflowRun.id.in_(workflow_run_ids)).delete(synchronize_session=False)
 
-        db.session.commit()
-        return True
+        return True
     except Exception:
-        db.session.rollback()
         logger.exception("Batch deletion failed (attempt %s)", attempt_count + 1)
         return False
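
Reviewer note (illustration only, not part of the patch): the reason the explicit db.session.commit() / db.session.rollback() calls could be deleted is that both transaction scopes are now context-managed. sessionmaker(...).begin() commits when the block exits cleanly and rolls back automatically when an exception escapes it, and Session.begin_nested() opens a SAVEPOINT, so a failed batch inside _delete_batch is discarded without abandoning the outer transaction. The sketch below, assuming SQLAlchemy 2.x-style APIs and a throwaway in-memory SQLite database, demonstrates the outer auto-commit/auto-rollback contract; the Base/Item model, table name, and simulated failure are illustrative placeholders, not Dify code.

from sqlalchemy import Integer, String, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, sessionmaker


class Base(DeclarativeBase):
    pass


class Item(Base):
    # Placeholder table for this demo only; Dify's models are untouched.
    __tablename__ = "items"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String(50))


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
session_factory = sessionmaker(engine, expire_on_commit=False)

# Transaction 1: the block exits cleanly, so the session commits automatically.
with session_factory.begin() as session:
    session.add(Item(name="kept"))

# Transaction 2: the exception escapes the block, so everything done inside it
# is rolled back automatically; no explicit rollback() call is needed.
try:
    with session_factory.begin() as session:
        session.add(Item(name="discarded"))
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

with session_factory.begin() as session:
    print(session.scalars(select(Item.name)).all())  # prints ['kept']

The nested-transaction case follows the same contract: an exception raised inside "with session.begin_nested():" rolls back only the SAVEPOINT, which is what lets _delete_batch log the failure, return False, and leave the caller's surrounding transaction usable for the next attempt.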