diff --git a/api/tasks/remove_app_and_related_data_task.py b/api/tasks/remove_app_and_related_data_task.py index c66287c1d7..3227f6da96 100644 --- a/api/tasks/remove_app_and_related_data_task.py +++ b/api/tasks/remove_app_and_related_data_task.py @@ -17,6 +17,7 @@ from models import ( AppDatasetJoin, AppMCPServer, AppModelConfig, + AppTrigger, Conversation, EndUser, InstalledApp, @@ -30,8 +31,10 @@ from models import ( Site, TagBinding, TraceAppConfig, + WorkflowSchedulePlan, ) from models.tools import WorkflowToolProvider +from models.trigger import WorkflowPluginTrigger, WorkflowTriggerLog, WorkflowWebhookTrigger from models.web import PinnedConversation, SavedMessage from models.workflow import ( ConversationVariable, @@ -489,6 +492,72 @@ def _delete_draft_variable_offload_data(conn, file_ids: list[str]) -> int: return files_deleted +def _delete_app_triggers(tenant_id: str, app_id: str): + def del_app_trigger(trigger_id: str): + db.session.query(AppTrigger).where(AppTrigger.id == trigger_id).delete(synchronize_session=False) + + _delete_records( + """select id from app_triggers where tenant_id=:tenant_id and app_id=:app_id limit 1000""", + {"tenant_id": tenant_id, "app_id": app_id}, + del_app_trigger, + "app trigger", + ) + + +def _delete_workflow_plugin_triggers(tenant_id: str, app_id: str): + def del_plugin_trigger(trigger_id: str): + db.session.query(WorkflowPluginTrigger).where(WorkflowPluginTrigger.id == trigger_id).delete( + synchronize_session=False + ) + + _delete_records( + """select id from workflow_plugin_triggers where tenant_id=:tenant_id and app_id=:app_id limit 1000""", + {"tenant_id": tenant_id, "app_id": app_id}, + del_plugin_trigger, + "workflow plugin trigger", + ) + + +def _delete_workflow_webhook_triggers(tenant_id: str, app_id: str): + def del_webhook_trigger(trigger_id: str): + db.session.query(WorkflowWebhookTrigger).where(WorkflowWebhookTrigger.id == trigger_id).delete( + synchronize_session=False + ) + + _delete_records( + """select id from workflow_webhook_triggers where tenant_id=:tenant_id and app_id=:app_id limit 1000""", + {"tenant_id": tenant_id, "app_id": app_id}, + del_webhook_trigger, + "workflow webhook trigger", + ) + + +def _delete_workflow_schedule_plans(tenant_id: str, app_id: str): + def del_schedule_plan(plan_id: str): + db.session.query(WorkflowSchedulePlan).where(WorkflowSchedulePlan.id == plan_id).delete( + synchronize_session=False + ) + + _delete_records( + """select id from workflow_schedule_plans where tenant_id=:tenant_id and app_id=:app_id limit 1000""", + {"tenant_id": tenant_id, "app_id": app_id}, + del_schedule_plan, + "workflow schedule plan", + ) + + +def _delete_workflow_trigger_logs(tenant_id: str, app_id: str): + def del_trigger_log(log_id: str): + db.session.query(WorkflowTriggerLog).where(WorkflowTriggerLog.id == log_id).delete(synchronize_session=False) + + _delete_records( + """select id from workflow_trigger_logs where tenant_id=:tenant_id and app_id=:app_id limit 1000""", + {"tenant_id": tenant_id, "app_id": app_id}, + del_trigger_log, + "workflow trigger log", + ) + + def _delete_records(query_sql: str, params: dict, delete_func: Callable, name: str) -> None: while True: with db.engine.begin() as conn: @@ -506,88 +575,3 @@ def _delete_records(query_sql: str, params: dict, delete_func: Callable, name: s logger.exception("Error occurred while deleting %s %s", name, record_id) continue rs.close() - - -def _delete_app_triggers(tenant_id: str, app_id: str): - with db.engine.begin() as conn: - result = conn.execute( - sa.text( - """ - DELETE FROM app_triggers - WHERE tenant_id = :tenant_id - AND app_id = :app_id - """ - ), - {"tenant_id": tenant_id, "app_id": app_id}, - ) - deleted_count = result.rowcount or 0 - if deleted_count > 0: - logger.info(click.style(f"Deleted {deleted_count} app triggers for app {app_id}", fg="green")) - - -def _delete_workflow_plugin_triggers(tenant_id: str, app_id: str): - with db.engine.begin() as conn: - result = conn.execute( - sa.text( - """ - DELETE FROM workflow_plugin_triggers - WHERE tenant_id = :tenant_id - AND app_id = :app_id - """ - ), - {"tenant_id": tenant_id, "app_id": app_id}, - ) - deleted_count = result.rowcount or 0 - if deleted_count > 0: - logger.info(click.style(f"Deleted {deleted_count} workflow plugin triggers for app {app_id}", fg="green")) - - -def _delete_workflow_webhook_triggers(tenant_id: str, app_id: str): - with db.engine.begin() as conn: - result = conn.execute( - sa.text( - """ - DELETE FROM workflow_webhook_triggers - WHERE tenant_id = :tenant_id - AND app_id = :app_id - """ - ), - {"tenant_id": tenant_id, "app_id": app_id}, - ) - deleted_count = result.rowcount or 0 - if deleted_count > 0: - logger.info(click.style(f"Deleted {deleted_count} workflow webhook triggers for app {app_id}", fg="green")) - - -def _delete_workflow_schedule_plans(tenant_id: str, app_id: str): - with db.engine.begin() as conn: - result = conn.execute( - sa.text( - """ - DELETE FROM workflow_schedule_plans - WHERE tenant_id = :tenant_id - AND app_id = :app_id - """ - ), - {"tenant_id": tenant_id, "app_id": app_id}, - ) - deleted_count = result.rowcount or 0 - if deleted_count > 0: - logger.info(click.style(f"Deleted {deleted_count} workflow schedule plans for app {app_id}", fg="green")) - - -def _delete_workflow_trigger_logs(tenant_id: str, app_id: str): - with db.engine.begin() as conn: - result = conn.execute( - sa.text( - """ - DELETE FROM workflow_trigger_logs - WHERE tenant_id = :tenant_id - AND app_id = :app_id - """ - ), - {"tenant_id": tenant_id, "app_id": app_id}, - ) - deleted_count = result.rowcount or 0 - if deleted_count > 0: - logger.info(click.style(f"Deleted {deleted_count} workflow trigger logs for app {app_id}", fg="green"))