diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index 23ac82bf72..cbb7ad4fd6 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -425,25 +425,22 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): # Build memory_id -> value mapping for memory in memories: - if memory.scope == MemoryScope.APP: + if memory.spec.scope == MemoryScope.APP: # App level: use memory_id directly - memory_blocks_dict[memory.memory_id] = memory.value + memory_blocks_dict[memory.spec.id] = memory.value else: # NODE scope node_id = memory.node_id if not node_id: - logger.warning("Memory block %s has no node_id, skip.", memory.memory_id) + logger.warning("Memory block %s has no node_id, skip.", memory.spec.id) continue - key = f"{node_id}.{memory.memory_id}" + key = f"{node_id}.{memory.spec.id}" memory_blocks_dict[key] = memory.value return memory_blocks_dict def _sync_conversation_to_chatflow_tables(self, assistant_message: str): - # Get user input and AI response - user_message = self.application_generate_entity.query - ChatflowHistoryService.save_app_message( - prompt_message=UserPromptMessage(content=user_message), + prompt_message=UserPromptMessage(content=(self.application_generate_entity.query)), conversation_id=self.conversation.id, app_id=self._workflow.app_id, tenant_id=self._workflow.tenant_id @@ -456,14 +453,10 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): ) def _check_app_memory_updates(self): - from core.app.entities.app_invoke_entities import InvokeFrom - from services.chatflow_memory_service import ChatflowMemoryService - is_draft = (self.application_generate_entity.invoke_from == InvokeFrom.DEBUGGER) - ChatflowMemoryService.update_app_memory_after_run( + ChatflowMemoryService.update_app_memory_if_needed( workflow=self._workflow, conversation_id=self.conversation.id, - variable_pool=VariablePool(), # Make a fake pool to satisfy the signature is_draft=is_draft ) diff --git a/api/core/memory/entities.py b/api/core/memory/entities.py index 5bb4c512ea..654922d154 100644 --- a/api/core/memory/entities.py +++ b/api/core/memory/entities.py @@ -1,4 +1,3 @@ -from datetime import datetime from enum import Enum from typing import Optional from uuid import uuid4 @@ -63,37 +62,12 @@ class MemoryBlock(BaseModel): These rules implicitly determine scope and term without redundant storage. """ - id: str - memory_id: str - name: str + spec: MemoryBlockSpec + tenant_id: str value: str - scope: MemoryScope # Derived from node_id: None=APP, str=NODE - term: MemoryTerm # Derived from conversation_id: None=PERSISTENT, str=SESSION - app_id: str # None=global(future), str=app-specific - conversation_id: Optional[str] = None # None=persistent, str=session - node_id: Optional[str] = None # None=app-scope, str=node-scope - created_at: Optional[datetime] = None - updated_at: Optional[datetime] = None - - @property - def is_global(self) -> bool: - """Check if this is global memory (future feature)""" - return self.app_id is None - - @property - def is_persistent(self) -> bool: - """Check if this is persistent memory (cross-conversation)""" - return self.conversation_id is None - - @property - def is_app_scope(self) -> bool: - """Check if this is app-level scope""" - return self.node_id is None - - @property - def is_node_scope(self) -> bool: - """Check if this is node-level scope""" - return self.node_id is not None + app_id: str + conversation_id: Optional[str] = None + node_id: Optional[str] = None class MemoryBlockWithVisibility(BaseModel): id: str diff --git a/api/services/chatflow_history_service.py b/api/services/chatflow_history_service.py index baabb7c071..915d5ff2c8 100644 --- a/api/services/chatflow_history_service.py +++ b/api/services/chatflow_history_service.py @@ -107,7 +107,6 @@ class ChatflowHistoryService: app_id: str, tenant_id: str ) -> None: - """Save PromptMessage to node-specific chatflow conversation.""" ChatflowHistoryService.save_message( prompt_message=prompt_message, conversation_id=conversation_id, @@ -116,50 +115,6 @@ class ChatflowHistoryService: node_id=node_id ) - @staticmethod - def save_message_version( - prompt_message: PromptMessage, - message_index: int, - conversation_id: str, - app_id: str, - tenant_id: str, - node_id: Optional[str] = None - ) -> None: - """ - Save a new version of an existing message (for message editing scenarios). - """ - with Session(db.engine) as session: - chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation( - session, conversation_id, app_id, tenant_id, node_id, create_if_missing=True - ) - - # Get the maximum version number for this index - max_version = session.execute( - select(func.max(ChatflowMessage.version)).where( - and_( - ChatflowMessage.conversation_id == chatflow_conv.id, - ChatflowMessage.index == message_index - ) - ) - ).scalar() or 0 - next_version = max_version + 1 - - # Save new version of the message - message_data = { - 'role': prompt_message.role.value, - 'content': prompt_message.get_text_content(), - 'timestamp': time.time() - } - - new_message_version = ChatflowMessage( - conversation_id=chatflow_conv.id, - index=message_index, - version=next_version, - data=json.dumps(message_data) - ) - session.add(new_message_version) - session.commit() - @staticmethod def update_visible_count( conversation_id: str, @@ -168,20 +123,6 @@ class ChatflowHistoryService: app_id: str, tenant_id: str ) -> None: - """ - Update visible_count metadata for specific scope. - - Args: - node_id: None for app-level updates, specific node_id for node-level updates - new_visible_count: The new visible_count value (typically preserved_turns) - - Usage Examples: - # Update app-level visible_count - ChatflowHistoryService.update_visible_count(conv_id, None, 10, app_id, tenant_id) - - # Update node-specific visible_count - ChatflowHistoryService.update_visible_count(conv_id, "node-123", 8, app_id, tenant_id) - """ with Session(db.engine) as session: chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation( session, conversation_id, app_id, tenant_id, node_id, create_if_missing=True diff --git a/api/services/chatflow_memory_service.py b/api/services/chatflow_memory_service.py index d46146e065..9bb0db6e9d 100644 --- a/api/services/chatflow_memory_service.py +++ b/api/services/chatflow_memory_service.py @@ -2,7 +2,7 @@ import logging import threading import time from collections.abc import Sequence -from typing import Optional, cast +from typing import Optional from sqlalchemy import and_, select from sqlalchemy.orm import Session @@ -24,25 +24,13 @@ from extensions.ext_database import db from extensions.ext_redis import redis_client from models import App from models.chatflow_memory import ChatflowMemoryVariable -from models.workflow import WorkflowDraftVariable +from models.workflow import Workflow, WorkflowDraftVariable from services.chatflow_history_service import ChatflowHistoryService from services.workflow_draft_variable_service import WorkflowDraftVariableService from services.workflow_service import WorkflowService logger = logging.getLogger(__name__) -def _get_memory_sync_lock_key(app_id: str, conversation_id: str) -> str: - """Generate Redis lock key for memory sync updates - - Args: - app_id: Application ID - conversation_id: Conversation ID - - Returns: - Formatted lock key - """ - return f"memory_sync_update:{app_id}:{conversation_id}" - class ChatflowMemoryService: @staticmethod def get_persistent_memories(app: App) -> Sequence[MemoryBlockWithVisibility]: @@ -71,12 +59,34 @@ class ChatflowMemoryService: return ChatflowMemoryService._with_visibility(app, [result[0] for result in db_results]) @staticmethod - def save_memory(memory: MemoryBlock, tenant_id: str, variable_pool: VariablePool, is_draft: bool) -> None: - key = f"{memory.node_id}:{memory.memory_id}" if memory.node_id else memory.memory_id + def save_memory(memory: MemoryBlock, variable_pool: VariablePool, is_draft: bool) -> None: + key = f"{memory.node_id}:{memory.spec.id}" if memory.node_id else memory.spec.id variable_pool.add([MEMORY_BLOCK_VARIABLE_NODE_ID, key], memory.value) - with db.session() as session: - session.merge(ChatflowMemoryService._to_chatflow_memory_variable(memory)) + with Session(db.engine) as session: + existing = session.query(ChatflowMemoryVariable).filter_by( + memory_id=memory.spec.id, + tenant_id=memory.tenant_id, + app_id=memory.app_id, + node_id=memory.node_id, + conversation_id=memory.conversation_id + ).first() + if existing: + existing.value = memory.value + else: + session.add( + ChatflowMemoryVariable( + memory_id=memory.spec.id, + tenant_id=memory.tenant_id, + app_id=memory.app_id, + node_id=memory.node_id, + conversation_id=memory.conversation_id, + name=memory.spec.name, + value=memory.value, + term=memory.spec.term, + scope=memory.spec.scope, + ) + ) session.commit() if is_draft: @@ -84,7 +94,7 @@ class ChatflowMemoryService: draft_var_service = WorkflowDraftVariableService(session) existing_vars = draft_var_service.get_draft_variables_by_selectors( app_id=memory.app_id, - selectors=[['memory_block', memory.memory_id]] + selectors=[['memory_block', memory.spec.id]] ) if existing_vars: draft_var = existing_vars[0] @@ -92,8 +102,8 @@ class ChatflowMemoryService: else: draft_var = WorkflowDraftVariable.new_memory_block_variable( app_id=memory.app_id, - memory_id=memory.memory_id, - name=memory.name, + memory_id=memory.spec.id, + name=memory.spec.name, value=memory.value, description="" ) @@ -101,25 +111,30 @@ class ChatflowMemoryService: session.commit() @staticmethod - def get_memories_by_specs(memory_block_specs: Sequence[MemoryBlockSpec], - tenant_id: str, app_id: str, - conversation_id: Optional[str], - node_id: Optional[str], - is_draft: bool) -> Sequence[MemoryBlock]: - return [ChatflowMemoryService.get_memory_by_spec( + def get_memories_by_specs( + memory_block_specs: Sequence[MemoryBlockSpec], + tenant_id: str, app_id: str, + conversation_id: Optional[str], + node_id: Optional[str], + is_draft: bool + ) -> Sequence[MemoryBlock]: + return [ChatflowMemoryService.get_memory_by_spec( spec, tenant_id, app_id, conversation_id, node_id, is_draft ) for spec in memory_block_specs] @staticmethod - def get_memory_by_spec(spec: MemoryBlockSpec, - tenant_id: str, app_id: str, - conversation_id: Optional[str], - node_id: Optional[str], - is_draft: bool) -> MemoryBlock: - with (Session(bind=db.engine) as session): + def get_memory_by_spec( + spec: MemoryBlockSpec, + tenant_id: str, + app_id: str, + conversation_id: Optional[str], + node_id: Optional[str], + is_draft: bool + ) -> MemoryBlock: + with Session(db.engine) as session: if is_draft: draft_var_service = WorkflowDraftVariableService(session) - selector = [MEMORY_BLOCK_VARIABLE_NODE_ID, f"{spec.id}.{node_id}"]\ + selector = [MEMORY_BLOCK_VARIABLE_NODE_ID, f"{spec.id}.{node_id}"] \ if node_id else [MEMORY_BLOCK_VARIABLE_NODE_ID, spec.id] draft_vars = draft_var_service.get_draft_variables_by_selectors( app_id=app_id, @@ -128,38 +143,92 @@ class ChatflowMemoryService: if draft_vars: draft_var = draft_vars[0] return MemoryBlock( - id=draft_var.id, - memory_id=draft_var.name, - name=spec.name, value=draft_var.value, - scope=spec.scope, - term=spec.term, + tenant_id=tenant_id, app_id=app_id, conversation_id=conversation_id, - node_id=node_id + node_id=node_id, + spec=spec ) stmt = select(ChatflowMemoryVariable).where( and_( ChatflowMemoryVariable.memory_id == spec.id, ChatflowMemoryVariable.tenant_id == tenant_id, ChatflowMemoryVariable.app_id == app_id, - ChatflowMemoryVariable.node_id == node_id, - ChatflowMemoryVariable.conversation_id == conversation_id + ChatflowMemoryVariable.node_id == \ + (node_id if spec.term == MemoryScope.NODE else None), + ChatflowMemoryVariable.conversation_id == \ + (conversation_id if spec.term == MemoryTerm.SESSION else None), ) ) result = session.execute(stmt).scalar() if result: - return ChatflowMemoryService._to_memory_block(result) + return MemoryBlock( + value=result.value, + tenant_id=tenant_id, + app_id=app_id, + conversation_id=conversation_id, + node_id=node_id, + spec=spec + ) return MemoryBlock( - id="", # Will be assigned when saved - memory_id=spec.id, - name=spec.name, + tenant_id=tenant_id, value=spec.template, - scope=spec.scope, - term=spec.term, app_id=app_id, conversation_id=conversation_id, - node_id=node_id + node_id=node_id, + spec=spec + ) + + @staticmethod + def update_app_memory_if_needed( + workflow: Workflow, + conversation_id: str, + is_draft: bool + ): + visible_messages = ChatflowHistoryService.get_visible_chat_history( + conversation_id=conversation_id, + app_id=workflow.app_id, + tenant_id=workflow.tenant_id, + node_id=None, + ) + sync_blocks: list[MemoryBlock] = [] + async_blocks: list[MemoryBlock] = [] + for memory_spec in workflow.memory_blocks: + if memory_spec.scope == MemoryScope.APP: + memory = ChatflowMemoryService.get_memory_by_spec( + spec=memory_spec, + tenant_id=workflow.tenant_id, + app_id=workflow.app_id, + conversation_id=conversation_id, + node_id=None, + is_draft=is_draft + ) + if ChatflowMemoryService._should_update_memory(memory, visible_messages): + if memory.spec.schedule_mode == MemoryScheduleMode.SYNC: + sync_blocks.append(memory) + else: + async_blocks.append(memory) + + if not sync_blocks and not async_blocks: + return + + # async mode: submit individual async tasks directly + for memory_block in async_blocks: + ChatflowMemoryService._app_submit_async_memory_update( + block=memory_block, + is_draft=is_draft, + visible_messages=visible_messages + ) + + # sync mode: submit a batch update task + if sync_blocks: + ChatflowMemoryService._app_submit_sync_memory_batch_update( + sync_blocks=sync_blocks, + is_draft=is_draft, + conversation_id=conversation_id, + app_id=workflow.app_id, + visible_messages=visible_messages ) @staticmethod @@ -172,307 +241,47 @@ class ChatflowMemoryService: variable_pool: VariablePool, is_draft: bool ) -> bool: - if not ChatflowMemoryService._should_update_memory( - tenant_id=tenant_id, - app_id=app_id, - memory_block_spec=memory_block_spec, - conversation_id=conversation_id, - node_id=node_id - ): - return False - - if memory_block_spec.schedule_mode == MemoryScheduleMode.SYNC: - # Node-level sync: blocking execution - ChatflowMemoryService._update_node_memory_sync( - tenant_id=tenant_id, - app_id=app_id, - memory_block_spec=memory_block_spec, - node_id=node_id, - conversation_id=conversation_id, - variable_pool=variable_pool, - is_draft=is_draft - ) - else: - # Node-level async: execute asynchronously - ChatflowMemoryService._update_node_memory_async( - tenant_id=tenant_id, - app_id=app_id, - memory_block_spec=memory_block_spec, - node_id=node_id, - conversation_id=conversation_id, - variable_pool=variable_pool, - is_draft=is_draft - ) - return True - - @staticmethod - def _get_memory_from_chatflow_table(memory_id: str, tenant_id: str, - app_id: Optional[str] = None, - conversation_id: Optional[str] = None, - node_id: Optional[str] = None) -> Optional[MemoryBlock]: - stmt = select(ChatflowMemoryVariable).where( - and_( - ChatflowMemoryVariable.app_id == app_id, - ChatflowMemoryVariable.memory_id == memory_id, - ChatflowMemoryVariable.tenant_id == tenant_id, - ChatflowMemoryVariable.conversation_id == conversation_id, - ChatflowMemoryVariable.node_id == node_id - ) - ) - - with db.session() as session: - result = session.execute(stmt).first() - return ChatflowMemoryService._to_memory_block(result[0]) if result else None - - @staticmethod - def _to_memory_block(entity: ChatflowMemoryVariable) -> MemoryBlock: - scope = MemoryScope(entity.scope) if not isinstance(entity.scope, MemoryScope) else entity.scope - term = MemoryTerm(entity.term) if not isinstance(entity.term, MemoryTerm) else entity.term - return MemoryBlock( - id=entity.id, - memory_id=entity.memory_id, - name=entity.name, - value=entity.value, - scope=scope, - term=term, - app_id=cast(str, entity.app_id), # It's supposed to be not nullable for now - conversation_id=entity.conversation_id, - node_id=entity.node_id, - created_at=entity.created_at, - updated_at=entity.updated_at, - ) - - @staticmethod - def _to_chatflow_memory_variable(memory_block: MemoryBlock) -> ChatflowMemoryVariable: - return ChatflowMemoryVariable( - id=memory_block.id, - node_id=memory_block.node_id, - memory_id=memory_block.memory_id, - name=memory_block.name, - value=memory_block.value, - scope=memory_block.scope, - term=memory_block.term, - app_id=memory_block.app_id, - conversation_id=memory_block.conversation_id, - ) - - @staticmethod - def _with_visibility( - app: App, - raw_results: Sequence[ChatflowMemoryVariable] - ) -> Sequence[MemoryBlockWithVisibility]: - workflow = WorkflowService().get_published_workflow(app) - if not workflow: - return [] - results = [] - for db_result in raw_results: - spec = next((spec for spec in workflow.memory_blocks if spec.id == db_result.memory_id), None) - if spec: - results.append( - MemoryBlockWithVisibility( - id=db_result.memory_id, - name=db_result.name, - value=db_result.value, - end_user_editable=spec.end_user_editable, - end_user_visible=spec.end_user_visible, - ) - ) - return results - - @staticmethod - def _should_update_memory(tenant_id: str, app_id: str, - memory_block_spec: MemoryBlockSpec, - conversation_id: str, node_id: Optional[str] = None) -> bool: - """Check if memory should be updated based on strategy""" - # Currently, `memory_block_spec.strategy != MemoryStrategy.ON_TURNS` is not possible, but possible in the future - - # Check turn count - turn_key = f"memory_turn_count:{tenant_id}:{app_id}:{conversation_id}" - if node_id: - turn_key += f":{node_id}" - - current_turns = redis_client.get(turn_key) - current_turns = int(current_turns) if current_turns else 0 - current_turns += 1 - - # Update count - redis_client.set(turn_key, current_turns) - - return current_turns % memory_block_spec.update_turns == 0 - - # App-level async update method - @staticmethod - def _submit_async_memory_update(tenant_id: str, app_id: str, - block: MemoryBlockSpec, - conversation_id: str, - variable_pool: VariablePool, - is_draft: bool = False): - """Submit async memory update task""" - - # Execute update asynchronously using thread - thread = threading.Thread( - target=ChatflowMemoryService._update_app_single_memory, - kwargs={ - 'tenant_id': tenant_id, - 'app_id': app_id, - 'memory_block_spec': block, - 'conversation_id': conversation_id, - 'variable_pool': variable_pool, - 'is_draft': is_draft - }, - daemon=True - ) - thread.start() - - # Node-level sync update method - @staticmethod - def _update_node_memory_sync(tenant_id: str, app_id: str, - memory_block_spec: MemoryBlockSpec, - node_id: str, conversation_id: str, - variable_pool: VariablePool, - is_draft: bool = False): - """Synchronously update node memory (blocking execution)""" - ChatflowMemoryService._perform_memory_update( - tenant_id=tenant_id, - app_id=app_id, - memory_block_spec=memory_block_spec, - conversation_id=conversation_id, - variable_pool=variable_pool, - node_id=node_id, - is_draft=is_draft - ) - # Wait for update to complete before returning - - # Node-level async update method - @staticmethod - def _update_node_memory_async( - tenant_id: str, - app_id: str, - memory_block_spec: MemoryBlockSpec, - node_id: str, - conversation_id: str, - variable_pool: VariablePool, - is_draft: bool = False): - """Asynchronously update node memory (submit task)""" - - # Execute update asynchronously using thread - thread = threading.Thread( - target=ChatflowMemoryService._perform_node_memory_update, - kwargs={ - 'memory_block_spec': memory_block_spec, - 'tenant_id': tenant_id, - 'app_id': app_id, - 'node_id': node_id, - 'variable_pool': variable_pool, - 'is_draft': is_draft - }, - daemon=True - ) - thread.start() - # Return immediately without waiting - - @staticmethod - def _perform_node_memory_update( - *, - memory_block_spec: MemoryBlockSpec, - tenant_id: str, - app_id: str, - node_id: str, - variable_pool: VariablePool, - is_draft: bool = False - ): - ChatflowMemoryService._perform_memory_update( - tenant_id=tenant_id, - app_id=app_id, - memory_block_spec=memory_block_spec, - conversation_id=str(variable_pool.get(('sys', 'conversation_id'))), - variable_pool=variable_pool, - node_id=node_id, - is_draft=is_draft - ) - - @staticmethod - def _update_app_single_memory(*, tenant_id: str, app_id: str, - memory_block_spec: MemoryBlockSpec, - conversation_id: str, - variable_pool: VariablePool, - is_draft: bool = False): - """Update single memory""" - ChatflowMemoryService._perform_memory_update( - tenant_id=tenant_id, - app_id=app_id, - memory_block_spec=memory_block_spec, - conversation_id=conversation_id, - variable_pool=variable_pool, - node_id=None, # App-level memory doesn't have node_id - is_draft=is_draft - ) - - @staticmethod - def _perform_memory_update( - tenant_id: str, app_id: str, - memory_block_spec: MemoryBlockSpec, - conversation_id: str, - variable_pool: VariablePool, - node_id: Optional[str], - is_draft: bool): - history = ChatflowHistoryService.get_visible_chat_history( + visible_messages = ChatflowHistoryService.get_visible_chat_history( conversation_id=conversation_id, app_id=app_id, tenant_id=tenant_id, node_id=node_id, ) memory_block = ChatflowMemoryService.get_memory_by_spec( - tenant_id=tenant_id, spec=memory_block_spec, + tenant_id=tenant_id, app_id=app_id, conversation_id=conversation_id, node_id=node_id, is_draft=is_draft ) - updated_value = LLMGenerator.update_memory_block( - tenant_id=tenant_id, - visible_history=ChatflowMemoryService._format_chat_history(history), + if not ChatflowMemoryService._should_update_memory( memory_block=memory_block, - memory_spec=memory_block_spec, - ) - # Save updated memory - updated_memory = MemoryBlock( - id=memory_block.id, - memory_id=memory_block_spec.id, - name=memory_block_spec.name, - value=updated_value, - scope=memory_block_spec.scope, - term=memory_block_spec.term, - app_id=app_id, - conversation_id=conversation_id if memory_block_spec.term == MemoryTerm.SESSION else None, - node_id=node_id - ) - ChatflowMemoryService.save_memory(updated_memory, tenant_id, variable_pool, is_draft) + visible_history=visible_messages + ): + return False - # Not implemented yet: Send success event - # self._send_memory_update_event(memory_block_spec.id, "completed", updated_value) + if memory_block_spec.schedule_mode == MemoryScheduleMode.SYNC: + # Node-level sync: blocking execution + ChatflowMemoryService._update_node_memory_sync( + visible_messages=visible_messages, + memory_block=memory_block, + variable_pool=variable_pool, + is_draft=is_draft + ) + else: + # Node-level async: execute asynchronously + ChatflowMemoryService._update_node_memory_async( + memory_block=memory_block, + visible_messages=visible_messages, + variable_pool=variable_pool, + is_draft=is_draft + ) + return True - @staticmethod - def _format_chat_history(messages: Sequence[PromptMessage]) -> Sequence[tuple[str, str]]: - result = [] - for message in messages: - result.append((str(message.role.value), message.get_text_content())) - return result - - # App-level sync batch update related methods @staticmethod def wait_for_sync_memory_completion(workflow, conversation_id: str): - """Wait for sync memory update to complete, maximum 50 seconds - - Args: - workflow: Workflow object - conversation_id: Conversation ID - - Raises: - MemorySyncTimeoutError: Raised when timeout is reached - """ - from core.memory.entities import MemoryScope + """Wait for sync memory update to complete, maximum 50 seconds""" memory_blocks = workflow.memory_blocks sync_memory_blocks = [ @@ -505,54 +314,132 @@ class ChatflowMemoryService: ) @staticmethod - def update_app_memory_after_run(workflow, conversation_id: str, variable_pool: VariablePool, - is_draft: bool = False): - """Update app-level memory after run completion""" - sync_blocks = [] - async_blocks = [] - for block in workflow.memory_blocks: - if block.scope == MemoryScope.APP: - if block.update_mode == "sync": - sync_blocks.append(block) - else: - async_blocks.append(block) - - # async mode: submit individual async tasks directly - for block in async_blocks: - ChatflowMemoryService._submit_async_memory_update( - tenant_id=workflow.tenant_id, - app_id=workflow.app_id, - block=block, - conversation_id=conversation_id, - variable_pool=variable_pool, - is_draft=is_draft - ) - - # sync mode: submit a batch update task - if sync_blocks: - ChatflowMemoryService._submit_sync_memory_batch_update( - workflow=workflow, - sync_blocks=sync_blocks, - conversation_id=conversation_id, - variable_pool=variable_pool, - is_draft=is_draft + def _with_visibility( + app: App, + raw_results: Sequence[ChatflowMemoryVariable] + ) -> Sequence[MemoryBlockWithVisibility]: + workflow = WorkflowService().get_published_workflow(app) + if not workflow: + return [] + results = [] + for chatflow_memory_variable in raw_results: + spec = next( + (spec for spec in workflow.memory_blocks if spec.id == chatflow_memory_variable.memory_id), + None ) + if spec: + results.append( + MemoryBlockWithVisibility( + id=chatflow_memory_variable.memory_id, + name=chatflow_memory_variable.name, + value=chatflow_memory_variable.value, + end_user_editable=spec.end_user_editable, + end_user_visible=spec.end_user_visible, + ) + ) + return results @staticmethod - def _submit_sync_memory_batch_update(workflow, - sync_blocks: list[MemoryBlockSpec], - conversation_id: str, - variable_pool: VariablePool, - is_draft: bool = False): - """Submit sync memory batch update task""" + def _should_update_memory( + memory_block: MemoryBlock, + visible_history: Sequence[PromptMessage] + ) -> bool: + return len(visible_history) > memory_block.spec.update_turns - # Execute batch update asynchronously using thread + @staticmethod + def _app_submit_async_memory_update( + block: MemoryBlock, + visible_messages: Sequence[PromptMessage], + is_draft: bool + ): + thread = threading.Thread( + target=ChatflowMemoryService._perform_memory_update, + kwargs={ + 'memory_block': block, + 'visible_messages': visible_messages, + 'variable_pool': VariablePool(), + 'is_draft': is_draft + }, + ) + thread.start() + + @staticmethod + def _app_submit_sync_memory_batch_update( + sync_blocks: Sequence[MemoryBlock], + app_id: str, + conversation_id: str, + visible_messages: Sequence[PromptMessage], + is_draft: bool + ): + """Submit sync memory batch update task""" thread = threading.Thread( target=ChatflowMemoryService._batch_update_sync_memory, kwargs={ - 'workflow': workflow, 'sync_blocks': sync_blocks, + 'app_id': app_id, 'conversation_id': conversation_id, + 'visible_messages': visible_messages, + 'is_draft': is_draft + }, + ) + thread.start() + + @staticmethod + def _batch_update_sync_memory( + sync_blocks: Sequence[MemoryBlock], + app_id: str, + conversation_id: str, + visible_messages: Sequence[PromptMessage], + is_draft: bool + ): + try: + lock_key = _get_memory_sync_lock_key(app_id, conversation_id) + with redis_client.lock(lock_key, timeout=120): + threads = [] + for block in sync_blocks: + thread = threading.Thread( + target=ChatflowMemoryService._perform_memory_update, + kwargs={ + 'memory_block': block, + 'visible_messages': visible_messages, + 'variable_pool': VariablePool(), + 'is_draft': is_draft + }, + ) + threads.append(thread) + for thread in threads: + thread.start() + for thread in threads: + thread.join() + except Exception as e: + logger.exception("Error batch updating memory", exc_info=e) + + @staticmethod + def _update_node_memory_sync( + memory_block: MemoryBlock, + visible_messages: Sequence[PromptMessage], + variable_pool: VariablePool, + is_draft: bool + ): + ChatflowMemoryService._perform_memory_update( + memory_block=memory_block, + visible_messages=visible_messages, + variable_pool=variable_pool, + is_draft=is_draft + ) + + @staticmethod + def _update_node_memory_async( + memory_block: MemoryBlock, + visible_messages: Sequence[PromptMessage], + variable_pool: VariablePool, + is_draft: bool = False + ): + thread = threading.Thread( + target=ChatflowMemoryService._perform_memory_update, + kwargs={ + 'memory_block': memory_block, + 'visible_messages': visible_messages, 'variable_pool': variable_pool, 'is_draft': is_draft }, @@ -561,39 +448,43 @@ class ChatflowMemoryService: thread.start() @staticmethod - def _batch_update_sync_memory(*, workflow, - sync_blocks: list[MemoryBlockSpec], - conversation_id: str, - variable_pool: VariablePool, - is_draft: bool = False): - """Batch update sync memory (with Redis lock)""" - from concurrent.futures import ThreadPoolExecutor + def _perform_memory_update( + memory_block: MemoryBlock, + variable_pool: VariablePool, + visible_messages: Sequence[PromptMessage], + is_draft: bool + ): + updated_value = LLMGenerator.update_memory_block( + tenant_id=memory_block.tenant_id, + visible_history=ChatflowMemoryService._format_chat_history(visible_messages), + memory_block=memory_block, + memory_spec=memory_block.spec, + ) + updated_memory = MemoryBlock( + tenant_id=memory_block.tenant_id, + value=updated_value, + spec=memory_block.spec, + app_id=memory_block.app_id, + conversation_id=memory_block.conversation_id, + node_id=memory_block.node_id + ) + ChatflowMemoryService.save_memory(updated_memory, variable_pool, is_draft) - lock_key = _get_memory_sync_lock_key(workflow.app_id, conversation_id) + @staticmethod + def _format_chat_history(messages: Sequence[PromptMessage]) -> Sequence[tuple[str, str]]: + result = [] + for message in messages: + result.append((str(message.role.value), message.get_text_content())) + return result - # Use Redis lock context manager (30 seconds timeout) - with redis_client.lock(lock_key, timeout=30): - try: - # Update all sync memory in parallel - with ThreadPoolExecutor(max_workers=5) as executor: - futures = [] - for block in sync_blocks: - future = executor.submit( - ChatflowMemoryService._update_app_single_memory, - tenant_id=workflow.tenant_id, - app_id=workflow.app_id, - memory_block_spec=block, - conversation_id=conversation_id, - variable_pool=variable_pool, - is_draft=is_draft - ) - futures.append(future) +def _get_memory_sync_lock_key(app_id: str, conversation_id: str) -> str: + """Generate Redis lock key for memory sync updates - # Wait for all updates to complete - for future in futures: - try: - future.result() - except Exception as e: - logger.exception("Failed to update memory", exc_info=e) - except Exception as e: - logger.exception("Failed to update sync memory for app %s", workflow.app_id, exc_info=e) + Args: + app_id: Application ID + conversation_id: Conversation ID + + Returns: + Formatted lock key + """ + return f"memory_sync_update:{app_id}:{conversation_id}" diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 426637d84e..19e6361284 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -761,9 +761,9 @@ def _fetch_memory_blocks(workflow: Workflow, conversation_id: str, is_draft: boo is_draft=is_draft, ) for memory in memories: - if memory.scope == MemoryScope.APP: - memory_blocks[memory.memory_id] = memory.value + if memory.spec.scope == MemoryScope.APP: + memory_blocks[memory.spec.id] = memory.value else: # NODE scope - memory_blocks[f"{memory.node_id}.{memory.memory_id}"] = memory.value + memory_blocks[f"{memory.node_id}.{memory.spec.id}"] = memory.value return memory_blocks