From 7065b67d07d6c8b380ab30dacca0c73eeb49772e Mon Sep 17 00:00:00 2001 From: zyssyz123 <916125788@qq.com> Date: Thu, 16 Oct 2025 10:19:49 +0800 Subject: [PATCH] add app mode for message (#26876) --- .../app/apps/message_based_app_generator.py | 1 + ...-d98acf217d43_add_app_mode_for_messsage.py | 84 +++++++++++++++++++ api/models/model.py | 2 + 3 files changed, 87 insertions(+) create mode 100644 api/migrations/versions/2025_10_14_1618-d98acf217d43_add_app_mode_for_messsage.py diff --git a/api/core/app/apps/message_based_app_generator.py b/api/core/app/apps/message_based_app_generator.py index 170c6a274b..7a51b8f3a5 100644 --- a/api/core/app/apps/message_based_app_generator.py +++ b/api/core/app/apps/message_based_app_generator.py @@ -207,6 +207,7 @@ class MessageBasedAppGenerator(BaseAppGenerator): from_source=from_source, from_end_user_id=end_user_id, from_account_id=account_id, + app_mode=app_config.app_mode, ) db.session.add(message) diff --git a/api/migrations/versions/2025_10_14_1618-d98acf217d43_add_app_mode_for_messsage.py b/api/migrations/versions/2025_10_14_1618-d98acf217d43_add_app_mode_for_messsage.py new file mode 100644 index 0000000000..7d6797fca0 --- /dev/null +++ b/api/migrations/versions/2025_10_14_1618-d98acf217d43_add_app_mode_for_messsage.py @@ -0,0 +1,84 @@ +"""add app_mode for messsage + +Revision ID: d98acf217d43 +Revises: 68519ad5cd18 +Create Date: 2025-10-14 16:18:08.568011 + +""" +from alembic import op +import models as models +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = 'd98acf217d43' +down_revision = '68519ad5cd18' +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table('messages', schema=None) as batch_op: + batch_op.add_column(sa.Column('app_mode', sa.String(length=255), nullable=True)) + batch_op.create_index('message_app_mode_idx', ['app_mode'], unique=False) + + conn = op.get_bind() + + # Strategy: Update in batches to minimize lock time + # For large tables (millions of rows), this prevents long-running transactions + batch_size = 10000 + + print("Starting backfill of app_mode from conversations...") + + # Use a more efficient UPDATE with JOIN + # This query updates messages.app_mode from conversations.mode + # Using string formatting for LIMIT since it's a constant + update_query = f""" + UPDATE messages m + SET app_mode = c.mode + FROM conversations c + WHERE m.conversation_id = c.id + AND m.app_mode IS NULL + AND m.id IN ( + SELECT id FROM messages + WHERE app_mode IS NULL + LIMIT {batch_size} + ) + """ + + # Execute batched updates + total_updated = 0 + iteration = 0 + while True: + iteration += 1 + result = conn.execute(sa.text(update_query)) + + # Check if result is None or has no rowcount + if result is None: + print("Warning: Query returned None, stopping backfill") + break + + rows_updated = result.rowcount if hasattr(result, 'rowcount') else 0 + total_updated += rows_updated + + if rows_updated == 0: + break + + print(f"Iteration {iteration}: Updated {rows_updated} messages (total: {total_updated})") + + # For very large tables, add a small delay to reduce load + # Uncomment if needed: import time; time.sleep(0.1) + + print(f"Backfill completed. Total messages updated: {total_updated}") + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + + with op.batch_alter_table('messages', schema=None) as batch_op: + batch_op.drop_index('message_app_mode_idx') + batch_op.drop_column('app_mode') + + # ### end Alembic commands ### diff --git a/api/models/model.py b/api/models/model.py index 18958c8253..2373421e7d 100644 --- a/api/models/model.py +++ b/api/models/model.py @@ -910,6 +910,7 @@ class Message(Base): Index("message_account_idx", "app_id", "from_source", "from_account_id"), Index("message_workflow_run_id_idx", "conversation_id", "workflow_run_id"), Index("message_created_at_idx", "created_at"), + Index("message_app_mode_idx", "app_mode"), ) id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()")) @@ -943,6 +944,7 @@ class Message(Base): updated_at = mapped_column(sa.DateTime, nullable=False, server_default=func.current_timestamp()) agent_based: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("false")) workflow_run_id: Mapped[str | None] = mapped_column(StringUUID) + app_mode: Mapped[str | None] = mapped_column(String(255), nullable=True) @property def inputs(self) -> dict[str, Any]: