import logging import threading import time from collections.abc import Sequence from typing import Optional from sqlalchemy import and_, delete, select from sqlalchemy.orm import Session from core.llm_generator.llm_generator import LLMGenerator from core.memory.entities import ( MemoryBlock, MemoryBlockSpec, MemoryBlockWithConversation, MemoryCreatedBy, MemoryScheduleMode, MemoryScope, MemoryTerm, MemoryValueData, ) from core.memory.errors import MemorySyncTimeoutError from core.model_runtime.entities.message_entities import PromptMessage from core.variables.segments import VersionedMemoryValue from core.workflow.constants import MEMORY_BLOCK_VARIABLE_NODE_ID from core.workflow.entities.variable_pool import VariablePool from extensions.ext_database import db from extensions.ext_redis import redis_client from models import App, CreatorUserRole from models.chatflow_memory import ChatflowMemoryVariable from models.workflow import Workflow, WorkflowDraftVariable from services.chatflow_history_service import ChatflowHistoryService from services.workflow_draft_variable_service import WorkflowDraftVariableService from services.workflow_service import WorkflowService logger = logging.getLogger(__name__) class ChatflowMemoryService: @staticmethod def get_persistent_memories( app: App, created_by: MemoryCreatedBy, version: int | None = None ) -> Sequence[MemoryBlock]: if created_by.account_id: created_by_role = CreatorUserRole.ACCOUNT created_by_id = created_by.account_id else: created_by_role = CreatorUserRole.END_USER created_by_id = created_by.id if version is None: # If version not specified, get the latest version stmt = select(ChatflowMemoryVariable).distinct(ChatflowMemoryVariable.memory_id).where( and_( ChatflowMemoryVariable.tenant_id == app.tenant_id, ChatflowMemoryVariable.app_id == app.id, ChatflowMemoryVariable.conversation_id == None, ChatflowMemoryVariable.created_by_role == created_by_role, ChatflowMemoryVariable.created_by == created_by_id, ) ).order_by(ChatflowMemoryVariable.version.desc()) else: stmt = select(ChatflowMemoryVariable).where( and_( ChatflowMemoryVariable.tenant_id == app.tenant_id, ChatflowMemoryVariable.app_id == app.id, ChatflowMemoryVariable.conversation_id == None, ChatflowMemoryVariable.created_by_role == created_by_role, ChatflowMemoryVariable.created_by == created_by_id, ChatflowMemoryVariable.version == version ) ) with Session(db.engine) as session: db_results = session.execute(stmt).all() return ChatflowMemoryService._convert_to_memory_blocks(app, created_by, [result[0] for result in db_results]) @staticmethod def get_session_memories( app: App, created_by: MemoryCreatedBy, conversation_id: str, version: int | None = None ) -> Sequence[MemoryBlock]: if version is None: # If version not specified, get the latest version stmt = select(ChatflowMemoryVariable).distinct(ChatflowMemoryVariable.memory_id).where( and_( ChatflowMemoryVariable.tenant_id == app.tenant_id, ChatflowMemoryVariable.app_id == app.id, ChatflowMemoryVariable.conversation_id == conversation_id ) ).order_by(ChatflowMemoryVariable.version.desc()) else: stmt = select(ChatflowMemoryVariable).where( and_( ChatflowMemoryVariable.tenant_id == app.tenant_id, ChatflowMemoryVariable.app_id == app.id, ChatflowMemoryVariable.conversation_id == conversation_id, ChatflowMemoryVariable.version == version ) ) with Session(db.engine) as session: db_results = session.execute(stmt).all() return ChatflowMemoryService._convert_to_memory_blocks(app, created_by, [result[0] for result in db_results]) @staticmethod def save_memory(memory: MemoryBlock, variable_pool: VariablePool, is_draft: bool) -> None: key = f"{memory.node_id}.{memory.spec.id}" if memory.node_id else memory.spec.id variable_pool.add([MEMORY_BLOCK_VARIABLE_NODE_ID, key], memory.value) if memory.created_by.account_id: created_by_role = CreatorUserRole.ACCOUNT created_by = memory.created_by.account_id else: created_by_role = CreatorUserRole.END_USER created_by = memory.created_by.id with Session(db.engine) as session: session.add( ChatflowMemoryVariable( memory_id=memory.spec.id, tenant_id=memory.tenant_id, app_id=memory.app_id, node_id=memory.node_id, conversation_id=memory.conversation_id, name=memory.spec.name, value=MemoryValueData( value=memory.value, edited_by_user=memory.edited_by_user ).model_dump_json(), term=memory.spec.term, scope=memory.spec.scope, version=memory.version, # Use version from MemoryBlock directly created_by_role=created_by_role, created_by=created_by, ) ) session.commit() if is_draft: with Session(bind=db.engine) as session: draft_var_service = WorkflowDraftVariableService(session) memory_selector = memory.spec.id if not memory.node_id else f"{memory.node_id}.{memory.spec.id}" existing_vars = draft_var_service.get_draft_variables_by_selectors( app_id=memory.app_id, selectors=[[MEMORY_BLOCK_VARIABLE_NODE_ID, memory_selector]] ) if existing_vars: draft_var = existing_vars[0] draft_var.value = VersionedMemoryValue.model_validate_json(draft_var.value)\ .add_version(memory.value)\ .model_dump_json() else: draft_var = WorkflowDraftVariable.new_memory_block_variable( app_id=memory.app_id, memory_id=memory.spec.id, name=memory.spec.name, value=VersionedMemoryValue().add_version(memory.value), description=memory.spec.description ) session.add(draft_var) session.commit() @staticmethod def get_memories_by_specs( memory_block_specs: Sequence[MemoryBlockSpec], tenant_id: str, app_id: str, created_by: MemoryCreatedBy, conversation_id: Optional[str], node_id: Optional[str], is_draft: bool ) -> Sequence[MemoryBlock]: return [ChatflowMemoryService.get_memory_by_spec( spec, tenant_id, app_id, created_by, conversation_id, node_id, is_draft ) for spec in memory_block_specs] @staticmethod def get_memory_by_spec( spec: MemoryBlockSpec, tenant_id: str, app_id: str, created_by: MemoryCreatedBy, conversation_id: Optional[str], node_id: Optional[str], is_draft: bool ) -> MemoryBlock: with Session(db.engine) as session: if is_draft: draft_var_service = WorkflowDraftVariableService(session) selector = [MEMORY_BLOCK_VARIABLE_NODE_ID, f"{spec.id}.{node_id}"] \ if node_id else [MEMORY_BLOCK_VARIABLE_NODE_ID, spec.id] draft_vars = draft_var_service.get_draft_variables_by_selectors( app_id=app_id, selectors=[selector] ) if draft_vars: draft_var = draft_vars[0] return MemoryBlock( value=draft_var.get_value().text, tenant_id=tenant_id, app_id=app_id, conversation_id=conversation_id, node_id=node_id, spec=spec, created_by=created_by, version=1, ) stmt = select(ChatflowMemoryVariable).where( and_( ChatflowMemoryVariable.memory_id == spec.id, ChatflowMemoryVariable.tenant_id == tenant_id, ChatflowMemoryVariable.app_id == app_id, ChatflowMemoryVariable.node_id == (node_id if spec.scope == MemoryScope.NODE else None), ChatflowMemoryVariable.conversation_id == (conversation_id if spec.term == MemoryTerm.SESSION else None), ) ).order_by(ChatflowMemoryVariable.version.desc()).limit(1) result = session.execute(stmt).scalar() if result: memory_value_data = MemoryValueData.model_validate_json(result.value) return MemoryBlock( value=memory_value_data.value, tenant_id=tenant_id, app_id=app_id, conversation_id=conversation_id, node_id=node_id, spec=spec, edited_by_user=memory_value_data.edited_by_user, created_by=created_by, version=result.version, ) return MemoryBlock( tenant_id=tenant_id, value=spec.template, app_id=app_id, conversation_id=conversation_id, node_id=node_id, spec=spec, created_by=created_by, version=1, ) @staticmethod def update_app_memory_if_needed( workflow: Workflow, conversation_id: str, variable_pool: VariablePool, created_by: MemoryCreatedBy, is_draft: bool ): visible_messages = ChatflowHistoryService.get_visible_chat_history( conversation_id=conversation_id, app_id=workflow.app_id, tenant_id=workflow.tenant_id, node_id=None, ) sync_blocks: list[MemoryBlock] = [] async_blocks: list[MemoryBlock] = [] for memory_spec in workflow.memory_blocks: if memory_spec.scope == MemoryScope.APP: memory = ChatflowMemoryService.get_memory_by_spec( spec=memory_spec, tenant_id=workflow.tenant_id, app_id=workflow.app_id, conversation_id=conversation_id, node_id=None, is_draft=is_draft, created_by=created_by, ) if ChatflowMemoryService._should_update_memory(memory, visible_messages): if memory.spec.schedule_mode == MemoryScheduleMode.SYNC: sync_blocks.append(memory) else: async_blocks.append(memory) if not sync_blocks and not async_blocks: return # async mode: submit individual async tasks directly for memory_block in async_blocks: ChatflowMemoryService._app_submit_async_memory_update( block=memory_block, is_draft=is_draft, variable_pool=variable_pool, visible_messages=visible_messages, conversation_id=conversation_id, ) # sync mode: submit a batch update task if sync_blocks: ChatflowMemoryService._app_submit_sync_memory_batch_update( sync_blocks=sync_blocks, is_draft=is_draft, conversation_id=conversation_id, app_id=workflow.app_id, visible_messages=visible_messages, variable_pool=variable_pool ) @staticmethod def update_node_memory_if_needed( tenant_id: str, app_id: str, node_id: str, created_by: MemoryCreatedBy, conversation_id: str, memory_block_spec: MemoryBlockSpec, variable_pool: VariablePool, is_draft: bool ) -> bool: visible_messages = ChatflowHistoryService.get_visible_chat_history( conversation_id=conversation_id, app_id=app_id, tenant_id=tenant_id, node_id=node_id, ) memory_block = ChatflowMemoryService.get_memory_by_spec( spec=memory_block_spec, tenant_id=tenant_id, app_id=app_id, conversation_id=conversation_id, node_id=node_id, is_draft=is_draft, created_by=created_by, ) if not ChatflowMemoryService._should_update_memory( memory_block=memory_block, visible_history=visible_messages ): return False if memory_block_spec.schedule_mode == MemoryScheduleMode.SYNC: # Node-level sync: blocking execution ChatflowMemoryService._update_node_memory_sync( visible_messages=visible_messages, memory_block=memory_block, variable_pool=variable_pool, is_draft=is_draft, conversation_id=conversation_id ) else: # Node-level async: execute asynchronously ChatflowMemoryService._update_node_memory_async( memory_block=memory_block, visible_messages=visible_messages, variable_pool=variable_pool, is_draft=is_draft, conversation_id=conversation_id ) return True @staticmethod def wait_for_sync_memory_completion(workflow: Workflow, conversation_id: str): """Wait for sync memory update to complete, maximum 50 seconds""" memory_blocks = workflow.memory_blocks sync_memory_blocks = [ block for block in memory_blocks if block.scope == MemoryScope.APP and block.schedule_mode == MemoryScheduleMode.SYNC ] if not sync_memory_blocks: return lock_key = _get_memory_sync_lock_key(workflow.app_id, conversation_id) # Retry up to 10 times, wait 5 seconds each time, total 50 seconds max_retries = 10 retry_interval = 5 for i in range(max_retries): if not redis_client.exists(lock_key): # Lock doesn't exist, can continue return if i < max_retries - 1: # Still have retry attempts, wait time.sleep(retry_interval) else: # Maximum retry attempts reached, raise exception raise MemorySyncTimeoutError( app_id=workflow.app_id, conversation_id=conversation_id ) @staticmethod def _convert_to_memory_blocks( app: App, created_by: MemoryCreatedBy, raw_results: Sequence[ChatflowMemoryVariable] ) -> Sequence[MemoryBlock]: workflow = WorkflowService().get_published_workflow(app) if not workflow: return [] results = [] for chatflow_memory_variable in raw_results: spec = next( (spec for spec in workflow.memory_blocks if spec.id == chatflow_memory_variable.memory_id), None ) if spec and chatflow_memory_variable.app_id: memory_value_data = MemoryValueData.model_validate_json(chatflow_memory_variable.value) results.append( MemoryBlock( spec=spec, tenant_id=chatflow_memory_variable.tenant_id, value=memory_value_data.value, app_id=chatflow_memory_variable.app_id, conversation_id=chatflow_memory_variable.conversation_id, node_id=chatflow_memory_variable.node_id, edited_by_user=memory_value_data.edited_by_user, created_by=created_by, version=chatflow_memory_variable.version, ) ) return results @staticmethod def _should_update_memory( memory_block: MemoryBlock, visible_history: Sequence[PromptMessage] ) -> bool: return len(visible_history) >= memory_block.spec.update_turns @staticmethod def _app_submit_async_memory_update( block: MemoryBlock, visible_messages: Sequence[PromptMessage], variable_pool: VariablePool, conversation_id: str, is_draft: bool ): thread = threading.Thread( target=ChatflowMemoryService._perform_memory_update, kwargs={ 'memory_block': block, 'visible_messages': visible_messages, 'variable_pool': variable_pool, 'is_draft': is_draft, 'conversation_id': conversation_id }, ) thread.start() @staticmethod def _app_submit_sync_memory_batch_update( sync_blocks: Sequence[MemoryBlock], app_id: str, conversation_id: str, visible_messages: Sequence[PromptMessage], variable_pool: VariablePool, is_draft: bool ): """Submit sync memory batch update task""" thread = threading.Thread( target=ChatflowMemoryService._batch_update_sync_memory, kwargs={ 'sync_blocks': sync_blocks, 'app_id': app_id, 'conversation_id': conversation_id, 'visible_messages': visible_messages, 'variable_pool': variable_pool, 'is_draft': is_draft }, ) thread.start() @staticmethod def _batch_update_sync_memory( sync_blocks: Sequence[MemoryBlock], app_id: str, conversation_id: str, visible_messages: Sequence[PromptMessage], variable_pool: VariablePool, is_draft: bool ): try: lock_key = _get_memory_sync_lock_key(app_id, conversation_id) with redis_client.lock(lock_key, timeout=120): threads = [] for block in sync_blocks: thread = threading.Thread( target=ChatflowMemoryService._perform_memory_update, kwargs={ 'memory_block': block, 'visible_messages': visible_messages, 'variable_pool': variable_pool, 'is_draft': is_draft, 'conversation_id': conversation_id, }, ) threads.append(thread) for thread in threads: thread.start() for thread in threads: thread.join() except Exception as e: logger.exception("Error batch updating memory", exc_info=e) @staticmethod def _update_node_memory_sync( memory_block: MemoryBlock, visible_messages: Sequence[PromptMessage], variable_pool: VariablePool, conversation_id: str, is_draft: bool ): ChatflowMemoryService._perform_memory_update( memory_block=memory_block, visible_messages=visible_messages, variable_pool=variable_pool, is_draft=is_draft, conversation_id=conversation_id ) @staticmethod def _update_node_memory_async( memory_block: MemoryBlock, visible_messages: Sequence[PromptMessage], variable_pool: VariablePool, conversation_id: str, is_draft: bool = False ): thread = threading.Thread( target=ChatflowMemoryService._perform_memory_update, kwargs={ 'memory_block': memory_block, 'visible_messages': visible_messages, 'variable_pool': variable_pool, 'is_draft': is_draft, 'conversation_id': conversation_id, }, daemon=True ) thread.start() @staticmethod def _perform_memory_update( memory_block: MemoryBlock, variable_pool: VariablePool, conversation_id: str, visible_messages: Sequence[PromptMessage], is_draft: bool ): updated_value = LLMGenerator.update_memory_block( tenant_id=memory_block.tenant_id, visible_history=ChatflowMemoryService._format_chat_history(visible_messages), variable_pool=variable_pool, memory_block=memory_block, memory_spec=memory_block.spec, ) updated_memory = MemoryBlock( tenant_id=memory_block.tenant_id, value=updated_value, spec=memory_block.spec, app_id=memory_block.app_id, conversation_id=conversation_id, node_id=memory_block.node_id, edited_by_user=False, created_by=memory_block.created_by, version=memory_block.version + 1, # Increment version for business logic update ) ChatflowMemoryService.save_memory(updated_memory, variable_pool, is_draft) ChatflowHistoryService.update_visible_count( conversation_id=conversation_id, node_id=memory_block.node_id, new_visible_count=memory_block.spec.preserved_turns, app_id=memory_block.app_id, tenant_id=memory_block.tenant_id ) @staticmethod def delete_memory(app: App, memory_id: str, created_by: MemoryCreatedBy): workflow = WorkflowService().get_published_workflow(app) if not workflow: raise ValueError("Workflow not found") memory_spec = next((it for it in workflow.memory_blocks if it.id == memory_id), None) if not memory_spec or not memory_spec.end_user_editable: raise ValueError("Memory not found or not deletable") if created_by.account_id: created_by_role = CreatorUserRole.ACCOUNT created_by_id = created_by.account_id else: created_by_role = CreatorUserRole.END_USER created_by_id = created_by.id with Session(db.engine) as session: stmt = delete(ChatflowMemoryVariable).where( and_( ChatflowMemoryVariable.tenant_id == app.tenant_id, ChatflowMemoryVariable.app_id == app.id, ChatflowMemoryVariable.memory_id == memory_id, ChatflowMemoryVariable.created_by_role == created_by_role, ChatflowMemoryVariable.created_by == created_by_id ) ) session.execute(stmt) session.commit() @staticmethod def delete_all_user_memories(app: App, created_by: MemoryCreatedBy): if created_by.account_id: created_by_role = CreatorUserRole.ACCOUNT created_by_id = created_by.account_id else: created_by_role = CreatorUserRole.END_USER created_by_id = created_by.id with Session(db.engine) as session: stmt = delete(ChatflowMemoryVariable).where( and_( ChatflowMemoryVariable.tenant_id == app.tenant_id, ChatflowMemoryVariable.app_id == app.id, ChatflowMemoryVariable.created_by_role == created_by_role, ChatflowMemoryVariable.created_by == created_by_id ) ) session.execute(stmt) session.commit() @staticmethod def get_persistent_memories_with_conversation( app: App, created_by: MemoryCreatedBy, conversation_id: str, version: int | None = None ) -> Sequence[MemoryBlockWithConversation]: """Get persistent memories with conversation metadata (always None for persistent)""" memory_blocks = ChatflowMemoryService.get_persistent_memories(app, created_by, version) return [ MemoryBlockWithConversation.from_memory_block( block, ChatflowHistoryService.get_conversation_metadata( app.tenant_id, app.id, conversation_id, block.node_id ) ) for block in memory_blocks ] @staticmethod def get_session_memories_with_conversation( app: App, created_by: MemoryCreatedBy, conversation_id: str, version: int | None = None ) -> Sequence[MemoryBlockWithConversation]: """Get session memories with conversation metadata""" memory_blocks = ChatflowMemoryService.get_session_memories(app, created_by, conversation_id, version) return [ MemoryBlockWithConversation.from_memory_block( block, ChatflowHistoryService.get_conversation_metadata( app.tenant_id, app.id, conversation_id, block.node_id ) ) for block in memory_blocks ] @staticmethod def _format_chat_history(messages: Sequence[PromptMessage]) -> Sequence[tuple[str, str]]: result = [] for message in messages: result.append((str(message.role.value), message.get_text_content())) return result def _get_memory_sync_lock_key(app_id: str, conversation_id: str) -> str: """Generate Redis lock key for memory sync updates Args: app_id: Application ID conversation_id: Conversation ID Returns: Formatted lock key """ return f"memory_sync_update:{app_id}:{conversation_id}"