chore(api): remove orphan files in draft var cleanup script

This commit is contained in:
QuantumGhost 2025-08-29 14:45:16 +08:00
parent 13eb9f7d7d
commit a7aa17e361
2 changed files with 119 additions and 16 deletions

View File

@ -1233,15 +1233,17 @@ def _find_orphaned_draft_variables(batch_size: int = 1000) -> list[str]:
def _count_orphaned_draft_variables() -> dict[str, Any]:
"""
Count orphaned draft variables by app.
Count orphaned draft variables by app, including associated file counts.
Returns:
Dictionary with statistics about orphaned variables
Dictionary with statistics about orphaned variables and files
"""
query = """
# Count orphaned variables by app
variables_query = """
SELECT
wdv.app_id,
COUNT(*) as variable_count
COUNT(*) as variable_count,
COUNT(wdv.file_id) as file_count
FROM workflow_draft_variables AS wdv
WHERE NOT EXISTS(
SELECT 1 FROM apps WHERE apps.id = wdv.app_id
@ -1251,14 +1253,24 @@ def _count_orphaned_draft_variables() -> dict[str, Any]:
"""
with db.engine.connect() as conn:
result = conn.execute(sa.text(query))
orphaned_by_app = {row[0]: row[1] for row in result}
result = conn.execute(sa.text(variables_query))
orphaned_by_app = {}
total_files = 0
for row in result:
app_id, variable_count, file_count = row
orphaned_by_app[app_id] = {
"variables": variable_count,
"files": file_count
}
total_files += file_count
total_orphaned = sum(orphaned_by_app.values())
total_orphaned = sum(app_data["variables"] for app_data in orphaned_by_app.values())
app_count = len(orphaned_by_app)
return {
"total_orphaned_variables": total_orphaned,
"total_orphaned_files": total_files,
"orphaned_app_count": app_count,
"orphaned_by_app": orphaned_by_app,
}
@ -1287,6 +1299,7 @@ def cleanup_orphaned_draft_variables(
stats = _count_orphaned_draft_variables()
logger.info("Found %s orphaned draft variables", stats["total_orphaned_variables"])
logger.info("Found %s associated offload files", stats["total_orphaned_files"])
logger.info("Across %s non-existent apps", stats["orphaned_app_count"])
if stats["total_orphaned_variables"] == 0:
@ -1295,10 +1308,10 @@ def cleanup_orphaned_draft_variables(
if dry_run:
logger.info("DRY RUN: Would delete the following:")
for app_id, count in sorted(stats["orphaned_by_app"].items(), key=lambda x: x[1], reverse=True)[
for app_id, data in sorted(stats["orphaned_by_app"].items(), key=lambda x: x[1]["variables"], reverse=True)[
:10
]: # Show top 10
logger.info(" App %s: %s variables", app_id, count)
logger.info(" App %s: %s variables, %s files", app_id, data["variables"], data["files"])
if len(stats["orphaned_by_app"]) > 10:
logger.info(" ... and %s more apps", len(stats["orphaned_by_app"]) - 10)
return
@ -1307,7 +1320,8 @@ def cleanup_orphaned_draft_variables(
if not force:
click.confirm(
f"Are you sure you want to delete {stats['total_orphaned_variables']} "
f"orphaned draft variables from {stats['orphaned_app_count']} apps?",
f"orphaned draft variables and {stats['total_orphaned_files']} associated files "
f"from {stats['orphaned_app_count']} apps?",
abort=True,
)

View File

@ -354,6 +354,11 @@ def delete_draft_variables_batch(app_id: str, batch_size: int = 1000) -> int:
"""
Delete draft variables for an app in batches.
This function now handles cleanup of associated Offload data including:
- WorkflowDraftVariableFile records
- UploadFile records
- Object storage files
Args:
app_id: The ID of the app whose draft variables should be deleted
batch_size: Number of records to delete per batch
@ -365,22 +370,31 @@ def delete_draft_variables_batch(app_id: str, batch_size: int = 1000) -> int:
raise ValueError("batch_size must be positive")
total_deleted = 0
total_files_deleted = 0
while True:
with db.engine.begin() as conn:
# Get a batch of draft variable IDs
# Get a batch of draft variable IDs along with their file_ids
query_sql = """
SELECT id FROM workflow_draft_variables
SELECT id, file_id FROM workflow_draft_variables
WHERE app_id = :app_id
LIMIT :batch_size
"""
result = conn.execute(sa.text(query_sql), {"app_id": app_id, "batch_size": batch_size})
draft_var_ids = [row[0] for row in result]
if not draft_var_ids:
rows = list(result)
if not rows:
break
# Delete the batch
draft_var_ids = [row[0] for row in rows]
file_ids = [row[1] for row in rows if row[1] is not None]
# Clean up associated Offload data first
if file_ids:
files_deleted = _delete_draft_variable_offload_data(conn, file_ids)
total_files_deleted += files_deleted
# Delete the draft variables
delete_sql = """
DELETE FROM workflow_draft_variables
WHERE id IN :ids
@ -391,10 +405,85 @@ def delete_draft_variables_batch(app_id: str, batch_size: int = 1000) -> int:
logger.info(click.style(f"Deleted {batch_deleted} draft variables (batch) for app {app_id}", fg="green"))
logger.info(click.style(f"Deleted {total_deleted} total draft variables for app {app_id}", fg="green"))
logger.info(
click.style(
f"Deleted {total_deleted} total draft variables for app {app_id}. "
f"Cleaned up {total_files_deleted} total associated files.",
fg="green",
)
)
return total_deleted
def _delete_draft_variable_offload_data(conn, file_ids: list[str]) -> int:
"""
Delete Offload data associated with WorkflowDraftVariable file_ids.
This function:
1. Finds WorkflowDraftVariableFile records by file_ids
2. Deletes associated files from object storage
3. Deletes UploadFile records
4. Deletes WorkflowDraftVariableFile records
Args:
conn: Database connection
file_ids: List of WorkflowDraftVariableFile IDs
Returns:
Number of files cleaned up
"""
from extensions.ext_storage import storage
if not file_ids:
return 0
files_deleted = 0
try:
# Get WorkflowDraftVariableFile records and their associated UploadFile keys
query_sql = """
SELECT wdvf.id, uf.key, uf.id as upload_file_id
FROM workflow_draft_variable_files wdvf
JOIN upload_files uf ON wdvf.upload_file_id = uf.id
WHERE wdvf.id IN :file_ids
"""
result = conn.execute(sa.text(query_sql), {"file_ids": tuple(file_ids)})
file_records = list(result)
# Delete from object storage and collect upload file IDs
upload_file_ids = []
for variable_file_id, storage_key, upload_file_id in file_records:
try:
storage.delete(storage_key)
upload_file_ids.append(upload_file_id)
files_deleted += 1
except Exception as e:
logging.warning(f"Failed to delete storage object {storage_key}: {e}")
# Continue with database cleanup even if storage deletion fails
upload_file_ids.append(upload_file_id)
# Delete UploadFile records
if upload_file_ids:
delete_upload_files_sql = """
DELETE FROM upload_files
WHERE id IN :upload_file_ids
"""
conn.execute(sa.text(delete_upload_files_sql), {"upload_file_ids": tuple(upload_file_ids)})
# Delete WorkflowDraftVariableFile records
delete_variable_files_sql = """
DELETE FROM workflow_draft_variable_files
WHERE id IN :file_ids
"""
conn.execute(sa.text(delete_variable_files_sql), {"file_ids": tuple(file_ids)})
except Exception as e:
logging.exception(f"Error deleting draft variable offload data: {e}")
# Don't raise, as we want to continue with the main deletion process
return files_deleted
def _delete_records(query_sql: str, params: dict, delete_func: Callable, name: str) -> None:
while True:
with db.engine.begin() as conn: