diff --git a/api/commands.py b/api/commands.py index 89fef39d25..2fdc7f6aa7 100644 --- a/api/commands.py +++ b/api/commands.py @@ -1233,15 +1233,17 @@ def _find_orphaned_draft_variables(batch_size: int = 1000) -> list[str]: def _count_orphaned_draft_variables() -> dict[str, Any]: """ - Count orphaned draft variables by app. + Count orphaned draft variables by app, including associated file counts. Returns: - Dictionary with statistics about orphaned variables + Dictionary with statistics about orphaned variables and files """ - query = """ + # Count orphaned variables by app + variables_query = """ SELECT wdv.app_id, - COUNT(*) as variable_count + COUNT(*) as variable_count, + COUNT(wdv.file_id) as file_count FROM workflow_draft_variables AS wdv WHERE NOT EXISTS( SELECT 1 FROM apps WHERE apps.id = wdv.app_id @@ -1251,14 +1253,24 @@ def _count_orphaned_draft_variables() -> dict[str, Any]: """ with db.engine.connect() as conn: - result = conn.execute(sa.text(query)) - orphaned_by_app = {row[0]: row[1] for row in result} + result = conn.execute(sa.text(variables_query)) + orphaned_by_app = {} + total_files = 0 + + for row in result: + app_id, variable_count, file_count = row + orphaned_by_app[app_id] = { + "variables": variable_count, + "files": file_count + } + total_files += file_count - total_orphaned = sum(orphaned_by_app.values()) + total_orphaned = sum(app_data["variables"] for app_data in orphaned_by_app.values()) app_count = len(orphaned_by_app) return { "total_orphaned_variables": total_orphaned, + "total_orphaned_files": total_files, "orphaned_app_count": app_count, "orphaned_by_app": orphaned_by_app, } @@ -1287,6 +1299,7 @@ def cleanup_orphaned_draft_variables( stats = _count_orphaned_draft_variables() logger.info("Found %s orphaned draft variables", stats["total_orphaned_variables"]) + logger.info("Found %s associated offload files", stats["total_orphaned_files"]) logger.info("Across %s non-existent apps", stats["orphaned_app_count"]) if stats["total_orphaned_variables"] == 0: @@ -1295,10 +1308,10 @@ def cleanup_orphaned_draft_variables( if dry_run: logger.info("DRY RUN: Would delete the following:") - for app_id, count in sorted(stats["orphaned_by_app"].items(), key=lambda x: x[1], reverse=True)[ + for app_id, data in sorted(stats["orphaned_by_app"].items(), key=lambda x: x[1]["variables"], reverse=True)[ :10 ]: # Show top 10 - logger.info(" App %s: %s variables", app_id, count) + logger.info(" App %s: %s variables, %s files", app_id, data["variables"], data["files"]) if len(stats["orphaned_by_app"]) > 10: logger.info(" ... and %s more apps", len(stats["orphaned_by_app"]) - 10) return @@ -1307,7 +1320,8 @@ def cleanup_orphaned_draft_variables( if not force: click.confirm( f"Are you sure you want to delete {stats['total_orphaned_variables']} " - f"orphaned draft variables from {stats['orphaned_app_count']} apps?", + f"orphaned draft variables and {stats['total_orphaned_files']} associated files " + f"from {stats['orphaned_app_count']} apps?", abort=True, ) diff --git a/api/tasks/remove_app_and_related_data_task.py b/api/tasks/remove_app_and_related_data_task.py index 7bfda3d740..c82643ade8 100644 --- a/api/tasks/remove_app_and_related_data_task.py +++ b/api/tasks/remove_app_and_related_data_task.py @@ -354,6 +354,11 @@ def delete_draft_variables_batch(app_id: str, batch_size: int = 1000) -> int: """ Delete draft variables for an app in batches. + This function now handles cleanup of associated Offload data including: + - WorkflowDraftVariableFile records + - UploadFile records + - Object storage files + Args: app_id: The ID of the app whose draft variables should be deleted batch_size: Number of records to delete per batch @@ -365,22 +370,31 @@ def delete_draft_variables_batch(app_id: str, batch_size: int = 1000) -> int: raise ValueError("batch_size must be positive") total_deleted = 0 + total_files_deleted = 0 while True: with db.engine.begin() as conn: - # Get a batch of draft variable IDs + # Get a batch of draft variable IDs along with their file_ids query_sql = """ - SELECT id FROM workflow_draft_variables + SELECT id, file_id FROM workflow_draft_variables WHERE app_id = :app_id LIMIT :batch_size """ result = conn.execute(sa.text(query_sql), {"app_id": app_id, "batch_size": batch_size}) - draft_var_ids = [row[0] for row in result] - if not draft_var_ids: + rows = list(result) + if not rows: break - # Delete the batch + draft_var_ids = [row[0] for row in rows] + file_ids = [row[1] for row in rows if row[1] is not None] + + # Clean up associated Offload data first + if file_ids: + files_deleted = _delete_draft_variable_offload_data(conn, file_ids) + total_files_deleted += files_deleted + + # Delete the draft variables delete_sql = """ DELETE FROM workflow_draft_variables WHERE id IN :ids @@ -391,10 +405,85 @@ def delete_draft_variables_batch(app_id: str, batch_size: int = 1000) -> int: logger.info(click.style(f"Deleted {batch_deleted} draft variables (batch) for app {app_id}", fg="green")) - logger.info(click.style(f"Deleted {total_deleted} total draft variables for app {app_id}", fg="green")) + logger.info( + click.style( + f"Deleted {total_deleted} total draft variables for app {app_id}. " + f"Cleaned up {total_files_deleted} total associated files.", + fg="green", + ) + ) return total_deleted +def _delete_draft_variable_offload_data(conn, file_ids: list[str]) -> int: + """ + Delete Offload data associated with WorkflowDraftVariable file_ids. + + This function: + 1. Finds WorkflowDraftVariableFile records by file_ids + 2. Deletes associated files from object storage + 3. Deletes UploadFile records + 4. Deletes WorkflowDraftVariableFile records + + Args: + conn: Database connection + file_ids: List of WorkflowDraftVariableFile IDs + + Returns: + Number of files cleaned up + """ + from extensions.ext_storage import storage + + if not file_ids: + return 0 + + files_deleted = 0 + + try: + # Get WorkflowDraftVariableFile records and their associated UploadFile keys + query_sql = """ + SELECT wdvf.id, uf.key, uf.id as upload_file_id + FROM workflow_draft_variable_files wdvf + JOIN upload_files uf ON wdvf.upload_file_id = uf.id + WHERE wdvf.id IN :file_ids + """ + result = conn.execute(sa.text(query_sql), {"file_ids": tuple(file_ids)}) + file_records = list(result) + + # Delete from object storage and collect upload file IDs + upload_file_ids = [] + for variable_file_id, storage_key, upload_file_id in file_records: + try: + storage.delete(storage_key) + upload_file_ids.append(upload_file_id) + files_deleted += 1 + except Exception as e: + logging.warning(f"Failed to delete storage object {storage_key}: {e}") + # Continue with database cleanup even if storage deletion fails + upload_file_ids.append(upload_file_id) + + # Delete UploadFile records + if upload_file_ids: + delete_upload_files_sql = """ + DELETE FROM upload_files + WHERE id IN :upload_file_ids + """ + conn.execute(sa.text(delete_upload_files_sql), {"upload_file_ids": tuple(upload_file_ids)}) + + # Delete WorkflowDraftVariableFile records + delete_variable_files_sql = """ + DELETE FROM workflow_draft_variable_files + WHERE id IN :file_ids + """ + conn.execute(sa.text(delete_variable_files_sql), {"file_ids": tuple(file_ids)}) + + except Exception as e: + logging.exception(f"Error deleting draft variable offload data: {e}") + # Don't raise, as we want to continue with the main deletion process + + return files_deleted + + def _delete_records(query_sql: str, params: dict, delete_func: Callable, name: str) -> None: while True: with db.engine.begin() as conn: