add app mode for message

This commit is contained in:
Yansong Zhang 2025-10-14 16:20:04 +08:00
parent 7a54607219
commit 7877706b5f
3 changed files with 78 additions and 0 deletions

View File

@ -207,6 +207,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
from_source=from_source,
from_end_user_id=end_user_id,
from_account_id=account_id,
app_mode=app_config.app_mode,
)
db.session.add(message)

View File

@ -0,0 +1,75 @@
"""add app_mode for messsage
Revision ID: d98acf217d43
Revises: 68519ad5cd18
Create Date: 2025-10-14 16:18:08.568011
"""
from alembic import op
import models as models
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = 'd98acf217d43'
down_revision = '68519ad5cd18'
branch_labels = None
depends_on = None
def upgrade():
with op.batch_alter_table('messages', schema=None) as batch_op:
batch_op.add_column(sa.Column('app_mode', sa.String(length=255), nullable=True))
batch_op.create_index('message_app_mode_idx', ['app_mode'], unique=False)
conn = op.get_bind()
# Strategy: Update in batches to minimize lock time
# For large tables (millions of rows), this prevents long-running transactions
batch_size = 10000
print("Starting backfill of app_mode from conversations...")
# Use a more efficient UPDATE with JOIN
# This query updates messages.app_mode from conversations.mode
update_query = """
UPDATE messages m
SET app_mode = c.mode
FROM conversations c
WHERE m.conversation_id = c.id
AND m.app_mode IS NULL
AND m.id IN (
SELECT id FROM messages
WHERE app_mode IS NULL
LIMIT :batch_size
)
"""
# Execute batched updates
total_updated = 0
while True:
result = conn.execute(sa.text(update_query), {"batch_size": batch_size})
rows_updated = result.rowcount
total_updated += rows_updated
if rows_updated == 0:
break
print(f"Updated {rows_updated} messages (total: {total_updated})")
# Commit each batch to release locks
# Note: Alembic auto-commits in upgrade() by default
print(f"Backfill completed. Total messages updated: {total_updated}")
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('messages', schema=None) as batch_op:
batch_op.drop_index('message_app_mode_idx')
batch_op.drop_column('app_mode')
# ### end Alembic commands ###

View File

@ -968,6 +968,7 @@ class Message(Base):
Index("message_account_idx", "app_id", "from_source", "from_account_id"),
Index("message_workflow_run_id_idx", "conversation_id", "workflow_run_id"),
Index("message_created_at_idx", "created_at"),
Index("message_app_mode_idx", "app_mode"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
@ -1001,6 +1002,7 @@ class Message(Base):
updated_at = mapped_column(sa.DateTime, nullable=False, server_default=func.current_timestamp())
agent_based: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("false"))
workflow_run_id: Mapped[str | None] = mapped_column(StringUUID)
app_mode: Mapped[str | None] = mapped_column(String(255), nullable=True)
@property
def inputs(self) -> dict[str, Any]: