# Source: dify/api/services/chatflow_memory_service.py (773 lines, 31 KiB, Python)

import logging
import threading
import time
from collections.abc import Sequence
from typing import Optional, cast
from sqlalchemy import and_, select
from sqlalchemy.orm import Session
from core.memory.entities import (
MemoryBlock,
MemoryBlockSpec,
MemoryScheduleMode,
MemoryScope,
MemoryStrategy,
MemoryTerm,
)
from core.memory.errors import MemorySyncTimeoutError
from core.model_runtime.entities.message_entities import AssistantPromptMessage, UserPromptMessage
from core.workflow.entities.variable_pool import VariablePool
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.chatflow_memory import ChatflowMemoryVariable
from services.chatflow_history_service import ChatflowHistoryService
logger = logging.getLogger(__name__)
# Important note: Since Dify uses gevent, we don't need an extra task queue (e.g., Celery).
# Threads created via threading.Thread are automatically patched into greenlets in a gevent environment,
# enabling efficient asynchronous execution.
def _get_memory_sync_lock_key(app_id: str, conversation_id: str) -> str:
"""Generate Redis lock key for memory sync updates
Args:
app_id: Application ID
conversation_id: Conversation ID
Returns:
Formatted lock key
"""
return f"memory_sync_update:{app_id}:{conversation_id}"
class ChatflowMemoryService:
    """
    Memory service class with only static methods.
    All methods are static and do not require instantiation.

    Persists memory blocks in the ChatflowMemoryVariable table; in draft
    mode values are additionally mirrored into workflow_draft_variables.
    """

    @staticmethod
    def get_memory(memory_id: str, tenant_id: str,
                   app_id: Optional[str] = None,
                   conversation_id: Optional[str] = None,
                   node_id: Optional[str] = None) -> Optional[MemoryBlock]:
        """Get single memory by ID.

        Optional app_id / conversation_id / node_id narrow the lookup;
        falsy values (None or "") are ignored.
        """
        stmt = select(ChatflowMemoryVariable).where(
            and_(
                ChatflowMemoryVariable.memory_id == memory_id,
                ChatflowMemoryVariable.tenant_id == tenant_id
            )
        )
        if app_id:
            stmt = stmt.where(ChatflowMemoryVariable.app_id == app_id)
        if conversation_id:
            stmt = stmt.where(ChatflowMemoryVariable.conversation_id == conversation_id)
        if node_id:
            stmt = stmt.where(ChatflowMemoryVariable.node_id == node_id)
        with db.session() as session:
            result = session.execute(stmt).first()
            if result:
                return MemoryBlock.model_validate(result[0].__dict__)
            return None

    @staticmethod
    def save_memory(memory: MemoryBlock, tenant_id: str, is_draft: bool = False) -> None:
        """Save or update memory with draft mode support.

        Matches on (memory_id, tenant_id): updates the existing row if
        found, otherwise inserts a new one. When is_draft is True the
        value is also written to workflow_draft_variables.
        """
        stmt = select(ChatflowMemoryVariable).where(
            and_(
                ChatflowMemoryVariable.memory_id == memory.memory_id,
                ChatflowMemoryVariable.tenant_id == tenant_id
            )
        )
        with db.session() as session:
            existing = session.execute(stmt).first()
            if existing:
                # Update existing row. 'id' is excluded so that a
                # caller-supplied (possibly empty) id can never clobber the
                # stored row's primary key.
                for key, value in memory.model_dump(exclude_unset=True, exclude={'id'}).items():
                    if hasattr(existing[0], key):
                        setattr(existing[0], key, value)
            else:
                # Create new
                new_memory = ChatflowMemoryVariable(
                    tenant_id=tenant_id,
                    **memory.model_dump(exclude={'id'})
                )
                session.add(new_memory)
            session.commit()
        # In draft mode, also write to workflow_draft_variables
        if is_draft:
            from models.workflow import WorkflowDraftVariable
            from services.workflow_draft_variable_service import WorkflowDraftVariableService
            with Session(bind=db.engine) as session:
                draft_var_service = WorkflowDraftVariableService(session)
                # Try to get existing variables
                existing_vars = draft_var_service.get_draft_variables_by_selectors(
                    app_id=memory.app_id,
                    selectors=[['memory_block', memory.memory_id]]
                )
                if existing_vars:
                    # Update existing draft variable
                    draft_var = existing_vars[0]
                    draft_var.value = memory.value
                else:
                    # Create new draft variable
                    draft_var = WorkflowDraftVariable.new_memory_block_variable(
                        app_id=memory.app_id,
                        memory_id=memory.memory_id,
                        name=memory.name,
                        value=memory.value,
                        description=f"Memory block: {memory.name}"
                    )
                    session.add(draft_var)
                session.commit()

    @staticmethod
    def get_memories_by_specs(memory_block_specs: Sequence[MemoryBlockSpec],
                              tenant_id: str, app_id: str,
                              conversation_id: Optional[str] = None,
                              node_id: Optional[str] = None,
                              is_draft: bool = False) -> list[MemoryBlock]:
        """Get runtime memory values based on MemoryBlockSpecs with draft mode support.

        Specs that have no stored runtime value yet are returned as default
        MemoryBlocks built from the spec template (not persisted here).
        """
        from models.enums import DraftVariableType
        if not memory_block_specs:
            return []
        # In draft mode, prefer reading from workflow_draft_variables
        if is_draft:
            # Try reading from the draft variables table
            from services.workflow_draft_variable_service import WorkflowDraftVariableService
            with Session(bind=db.engine) as session:
                draft_var_service = WorkflowDraftVariableService(session)
                # Build selector list
                selectors = [['memory_block', spec.id] for spec in memory_block_specs]
                # Fetch draft variables
                draft_vars = draft_var_service.get_draft_variables_by_selectors(
                    app_id=app_id,
                    selectors=selectors
                )
                # If draft variables exist, prefer using them
                if draft_vars:
                    spec_by_id = {spec.id: spec for spec in memory_block_specs}
                    draft_memories = []
                    for draft_var in draft_vars:
                        # Memory-block draft variables are tagged through the
                        # node_id column (DraftVariableType.MEMORY_BLOCK).
                        if draft_var.node_id == DraftVariableType.MEMORY_BLOCK:
                            spec = spec_by_id.get(draft_var.name)
                            if spec:
                                memory_block = MemoryBlock(
                                    id=draft_var.id,
                                    memory_id=draft_var.name,
                                    name=spec.name,
                                    value=draft_var.value,
                                    scope=spec.scope,
                                    term=spec.term,
                                    app_id=app_id,
                                    conversation_id='draft',
                                    node_id=node_id
                                )
                                draft_memories.append(memory_block)
                    if draft_memories:
                        return draft_memories
        memory_ids = [spec.id for spec in memory_block_specs]
        stmt = select(ChatflowMemoryVariable).where(
            and_(
                ChatflowMemoryVariable.memory_id.in_(memory_ids),
                ChatflowMemoryVariable.tenant_id == tenant_id,
                ChatflowMemoryVariable.app_id == app_id
            )
        )
        if conversation_id:
            stmt = stmt.where(ChatflowMemoryVariable.conversation_id == conversation_id)
        if node_id:
            stmt = stmt.where(ChatflowMemoryVariable.node_id == node_id)
        with db.session() as session:
            results = session.execute(stmt).all()
            found_memories = {row[0].memory_id: MemoryBlock.model_validate(row[0].__dict__) for row in results}
        # Create MemoryBlock objects for specs that don't have runtime values yet
        all_memories = []
        for spec in memory_block_specs:
            if spec.id in found_memories:
                all_memories.append(found_memories[spec.id])
            else:
                # Create default memory with template value following design rules
                default_memory = MemoryBlock(
                    id="",  # Will be assigned when saved
                    memory_id=spec.id,
                    name=spec.name,
                    value=spec.template,
                    scope=spec.scope,
                    term=spec.term,
                    # Design rules:
                    # - app_id=None for global (future), app_id=str for app-specific
                    app_id=app_id,  # Always app-specific for now
                    # - conversation_id=None for persistent, conversation_id=str for session
                    conversation_id=conversation_id if spec.term == MemoryTerm.SESSION else None,
                    # - node_id=None for app-scope, node_id=str for node-scope
                    node_id=node_id if spec.scope == MemoryScope.NODE else None
                )
                all_memories.append(default_memory)
        return all_memories

    @staticmethod
    def get_app_memories_by_workflow(workflow, tenant_id: str,
                                     conversation_id: Optional[str] = None) -> list[MemoryBlock]:
        """Get app-scoped memories based on workflow configuration."""
        # MemoryScope is already imported at module level.
        app_memory_specs = [spec for spec in workflow.memory_blocks if spec.scope == MemoryScope.APP]
        return ChatflowMemoryService.get_memories_by_specs(
            memory_block_specs=app_memory_specs,
            tenant_id=tenant_id,
            app_id=workflow.app_id,
            conversation_id=conversation_id
        )

    @staticmethod
    def get_node_memories_by_workflow(workflow, node_id: str, tenant_id: str) -> list[MemoryBlock]:
        """Get node-scoped memories based on workflow configuration."""
        node_memory_specs = [
            spec for spec in workflow.memory_blocks
            if spec.scope == MemoryScope.NODE and spec.id == node_id
        ]
        return ChatflowMemoryService.get_memories_by_specs(
            memory_block_specs=node_memory_specs,
            tenant_id=tenant_id,
            app_id=workflow.app_id,
            node_id=node_id
        )

    # Core Memory Orchestration features
    @staticmethod
    def update_memory_if_needed(tenant_id: str, app_id: str,
                                memory_block_spec: MemoryBlockSpec,
                                conversation_id: str,
                                variable_pool: VariablePool,
                                is_draft: bool = False) -> bool:
        """Update app-level memory if conditions are met.

        Args:
            tenant_id: Tenant ID
            app_id: Application ID
            memory_block_spec: Memory block specification
            conversation_id: Conversation ID
            variable_pool: Variable pool for context
            is_draft: Whether in draft mode

        Returns:
            True if an update was scheduled (or will run in the post-run
            batch for SYNC mode), False otherwise.
        """
        if not ChatflowMemoryService._should_update_memory(
            tenant_id, app_id, memory_block_spec, conversation_id
        ):
            return False
        if memory_block_spec.schedule_mode == MemoryScheduleMode.SYNC:
            # Sync mode: will be processed in batch after the App run completes
            # This only marks the need; actual update happens in _update_app_memory_after_run
            return True
        else:
            # Async mode: submit asynchronous update immediately
            ChatflowMemoryService._submit_async_memory_update(
                tenant_id, app_id, memory_block_spec, conversation_id, variable_pool, is_draft
            )
            return True

    @staticmethod
    def update_node_memory_if_needed(tenant_id: str, app_id: str,
                                     memory_block_spec: MemoryBlockSpec,
                                     node_id: str, llm_output: str,
                                     variable_pool: VariablePool,
                                     is_draft: bool = False) -> bool:
        """Update node-level memory after LLM execution.

        Always records the LLM output into the node conversation history,
        then updates memory only if the turn-based strategy fires.

        Args:
            tenant_id: Tenant ID
            app_id: Application ID
            memory_block_spec: Memory block specification
            node_id: Node ID
            llm_output: LLM output content
            variable_pool: Variable pool for context
            is_draft: Whether in draft mode
        """
        conversation_id_segment = variable_pool.get(('sys', 'conversation_id'))
        if not conversation_id_segment:
            return False
        conversation_id = conversation_id_segment.value
        # Save LLM output to node conversation history
        assistant_message = AssistantPromptMessage(content=llm_output)
        ChatflowHistoryService.save_node_message(
            prompt_message=assistant_message,
            node_id=node_id,
            conversation_id=str(conversation_id),
            app_id=app_id,
            tenant_id=tenant_id
        )
        if not ChatflowMemoryService._should_update_memory(
            tenant_id, app_id, memory_block_spec, str(conversation_id), node_id
        ):
            return False
        if memory_block_spec.schedule_mode == MemoryScheduleMode.SYNC:
            # Node-level sync: blocking execution
            ChatflowMemoryService._update_node_memory_sync(
                tenant_id, app_id, memory_block_spec, node_id,
                str(conversation_id), variable_pool, is_draft
            )
        else:
            # Node-level async: execute asynchronously
            ChatflowMemoryService._update_node_memory_async(
                tenant_id, app_id, memory_block_spec, node_id,
                llm_output, str(conversation_id), variable_pool, is_draft
            )
        return True

    @staticmethod
    def _should_update_memory(tenant_id: str, app_id: str,
                              memory_block_spec: MemoryBlockSpec,
                              conversation_id: str, node_id: Optional[str] = None) -> bool:
        """Check if memory should be updated based on strategy.

        Counts conversation turns in Redis and fires once every
        ``update_turns`` turns when the strategy is ON_TURNS.
        """
        if memory_block_spec.strategy != MemoryStrategy.ON_TURNS:
            return False
        # Guard against a misconfigured spec (would be a modulo-by-zero below).
        if memory_block_spec.update_turns <= 0:
            return False
        # Check turn count
        turn_key = f"memory_turn_count:{tenant_id}:{app_id}:{conversation_id}"
        if node_id:
            turn_key += f":{node_id}"
        # INCR is atomic, so concurrent requests cannot lose turns the way
        # the previous read-modify-write (GET + SET) could; a missing key
        # starts counting from 0.
        current_turns = int(redis_client.incr(turn_key))
        return current_turns % memory_block_spec.update_turns == 0

    # App-level async update method
    @staticmethod
    def _submit_async_memory_update(tenant_id: str, app_id: str,
                                    block: MemoryBlockSpec,
                                    conversation_id: str,
                                    variable_pool: VariablePool,
                                    is_draft: bool = False):
        """Submit async memory update task (fire-and-forget thread)."""
        # Execute update asynchronously using thread
        # (gevent patches threading, see module-level note).
        thread = threading.Thread(
            target=ChatflowMemoryService._update_single_memory,
            kwargs={
                'tenant_id': tenant_id,
                'app_id': app_id,
                'memory_block_spec': block,
                'conversation_id': conversation_id,
                'variable_pool': variable_pool,
                'is_draft': is_draft
            },
            daemon=True
        )
        thread.start()

    # Node-level sync update method
    @staticmethod
    def _update_node_memory_sync(tenant_id: str, app_id: str,
                                 memory_block_spec: MemoryBlockSpec,
                                 node_id: str, conversation_id: str,
                                 variable_pool: VariablePool,
                                 is_draft: bool = False):
        """Synchronously update node memory (blocking execution)."""
        ChatflowMemoryService._perform_memory_update(
            tenant_id=tenant_id,
            app_id=app_id,
            memory_block_spec=memory_block_spec,
            conversation_id=conversation_id,
            variable_pool=variable_pool,
            node_id=node_id,
            is_draft=is_draft
        )
        # Wait for update to complete before returning

    # Node-level async update method
    @staticmethod
    def _update_node_memory_async(tenant_id: str, app_id: str,
                                  memory_block_spec: MemoryBlockSpec,
                                  node_id: str, llm_output: str,
                                  conversation_id: str,
                                  variable_pool: VariablePool,
                                  is_draft: bool = False):
        """Asynchronously update node memory (submit task)."""
        # Execute update asynchronously using thread
        thread = threading.Thread(
            target=ChatflowMemoryService._perform_node_memory_update,
            kwargs={
                'memory_block_spec': memory_block_spec,
                'tenant_id': tenant_id,
                'app_id': app_id,
                'node_id': node_id,
                'llm_output': llm_output,
                'variable_pool': variable_pool,
                'is_draft': is_draft
            },
            daemon=True
        )
        thread.start()
        # Return immediately without waiting

    @staticmethod
    def _perform_node_memory_update(*, memory_block_spec: MemoryBlockSpec,
                                    tenant_id: str, app_id: str, node_id: str,
                                    llm_output: str, variable_pool: VariablePool,
                                    is_draft: bool = False):
        """Execute node memory update (runs in a worker thread)."""
        try:
            conversation_id_segment = variable_pool.get(('sys', 'conversation_id'))
            if not conversation_id_segment:
                logger.warning(
                    "Skip node memory update %s: no conversation_id in variable pool",
                    memory_block_spec.id
                )
                return
            # BUGFIX: use the segment's value. str() on the segment object
            # itself would serialize the wrapper, not the conversation id
            # (cf. the .value access pattern in update_node_memory_if_needed).
            ChatflowMemoryService._perform_memory_update(
                tenant_id=tenant_id,
                app_id=app_id,
                memory_block_spec=memory_block_spec,
                conversation_id=str(conversation_id_segment.value),
                variable_pool=variable_pool,
                node_id=node_id,
                is_draft=is_draft
            )
        except Exception:
            # logger.exception already records the active exception info.
            logger.exception(
                "Failed to update node memory %s for node %s",
                memory_block_spec.id,
                node_id
            )

    @staticmethod
    def _update_single_memory(*, tenant_id: str, app_id: str,
                              memory_block_spec: MemoryBlockSpec,
                              conversation_id: str,
                              variable_pool: VariablePool,
                              is_draft: bool = False):
        """Update single app-level memory."""
        ChatflowMemoryService._perform_memory_update(
            tenant_id=tenant_id,
            app_id=app_id,
            memory_block_spec=memory_block_spec,
            conversation_id=conversation_id,
            variable_pool=variable_pool,
            node_id=None,  # App-level memory doesn't have node_id
            is_draft=is_draft
        )

    @staticmethod
    def _perform_memory_update(tenant_id: str, app_id: str,
                               memory_block_spec: MemoryBlockSpec,
                               conversation_id: str, variable_pool: VariablePool,
                               node_id: Optional[str] = None,
                               is_draft: bool = False):
        """Perform the actual memory update using LLM.

        Args:
            tenant_id: Tenant ID
            app_id: Application ID
            memory_block_spec: Memory block specification
            conversation_id: Conversation ID
            variable_pool: Variable pool for context
            node_id: Optional node ID for node-level memory updates
            is_draft: Whether in draft mode
        """
        # Get conversation history
        history = ChatflowHistoryService.get_visible_chat_history(
            conversation_id=conversation_id,
            app_id=app_id,
            tenant_id=tenant_id,
            node_id=node_id,  # Pass node_id, if None then get app-level history
            max_visible_count=memory_block_spec.preserved_turns
        )
        # Get current memory value
        current_memory = ChatflowMemoryService.get_memory(
            memory_id=memory_block_spec.id,
            tenant_id=tenant_id,
            app_id=app_id,
            conversation_id=conversation_id if memory_block_spec.term == MemoryTerm.SESSION else None,
            node_id=node_id
        )
        current_value = current_memory.value if current_memory else memory_block_spec.template
        # Build update prompt - adjust wording based on whether there's a node_id
        context_type = "Node conversation history" if node_id else "Conversation history"
        memory_update_prompt = f"""
Based on the following {context_type}, update the memory content:
Current memory: {current_value}
{context_type}:
{[msg.content for msg in history]}
Update instruction: {memory_block_spec.instruction}
Please output the updated memory content:
"""
        # Invoke LLM to update memory - extracted as a separate method
        updated_value = ChatflowMemoryService._invoke_llm_for_memory_update(
            tenant_id,
            memory_block_spec,
            memory_update_prompt,
            current_value
        )
        if updated_value is None:
            return  # LLM invocation failed
        # Save updated memory
        updated_memory = MemoryBlock(
            id=current_memory.id if current_memory else "",
            memory_id=memory_block_spec.id,
            name=memory_block_spec.name,
            value=updated_value,
            scope=memory_block_spec.scope,
            term=memory_block_spec.term,
            app_id=app_id,
            conversation_id=conversation_id if memory_block_spec.term == MemoryTerm.SESSION else None,
            node_id=node_id
        )
        ChatflowMemoryService.save_memory(updated_memory, tenant_id, is_draft)
        # Not implemented yet: Send success event
        # ChatflowMemoryService._send_memory_update_event(memory_block_spec.id, "completed", updated_value)

    @staticmethod
    def _invoke_llm_for_memory_update(tenant_id: str,
                                      memory_block_spec: MemoryBlockSpec,
                                      prompt: str, current_value: str) -> Optional[str]:
        """Invoke LLM to update memory content.

        Args:
            tenant_id: Tenant ID
            memory_block_spec: Memory block specification
            prompt: Update prompt
            current_value: Current memory value (used for fallback on failure)

        Returns:
            Updated value, returns None if failed
        """
        from core.model_manager import ModelManager
        from core.model_runtime.entities.llm_entities import LLMResult
        from core.model_runtime.entities.model_entities import ModelType
        model_manager = ModelManager()
        # Use model configuration defined in memory_block_spec, use default model if not specified
        if hasattr(memory_block_spec, 'model') and memory_block_spec.model:
            model_instance = model_manager.get_model_instance(
                tenant_id=tenant_id,
                model_type=ModelType.LLM,
                provider=memory_block_spec.model.get("provider", ""),
                model=memory_block_spec.model.get("name", "")
            )
            model_parameters = memory_block_spec.model.get("completion_params", {})
        else:
            # Use default model
            model_instance = model_manager.get_default_model_instance(
                tenant_id=tenant_id,
                model_type=ModelType.LLM
            )
            model_parameters = {"temperature": 0.7, "max_tokens": 1000}
        try:
            response = cast(
                LLMResult,
                model_instance.invoke_llm(
                    prompt_messages=[UserPromptMessage(content=prompt)],
                    model_parameters=model_parameters,
                    stream=False
                )
            )
            return response.message.get_text_content()
        except Exception:
            # logger.exception already records the active exception info.
            logger.exception("Failed to update memory using LLM")
            # Not implemented yet: Send failure event
            # ChatflowMemoryService._send_memory_update_event(memory_block_spec.id, "failed", current_value, str(e))
            return None

    @staticmethod
    def _send_memory_update_event(memory_id: str, status: str, value: str, error: str = ""):
        """Send memory update event.

        Note: Event system integration not implemented yet, this method is
        retained as a placeholder. Made a @staticmethod for consistency with
        the class contract ("only static methods").
        """
        # Not implemented yet: Event system integration will be added in future versions
        pass

    # App-level sync batch update related methods
    @staticmethod
    def wait_for_sync_memory_completion(workflow, conversation_id: str):
        """Wait for sync memory update to complete, maximum 50 seconds.

        Args:
            workflow: Workflow object
            conversation_id: Conversation ID

        Raises:
            MemorySyncTimeoutError: Raised when timeout is reached
        """
        memory_blocks = workflow.memory_blocks
        # NOTE: unified on schedule_mode / MemoryScheduleMode, the same
        # field used for the sync/async decision in update_memory_if_needed.
        sync_memory_blocks = [
            block for block in memory_blocks
            if block.scope == MemoryScope.APP and block.schedule_mode == MemoryScheduleMode.SYNC
        ]
        if not sync_memory_blocks:
            return
        lock_key = _get_memory_sync_lock_key(workflow.app_id, conversation_id)
        # Retry up to 10 times, wait 5 seconds each time, total 50 seconds
        max_retries = 10
        retry_interval = 5
        for i in range(max_retries):
            if not redis_client.exists(lock_key):
                # Lock doesn't exist, can continue
                return
            if i < max_retries - 1:
                # Still have retry attempts, wait
                time.sleep(retry_interval)
            else:
                # Maximum retry attempts reached, raise exception
                raise MemorySyncTimeoutError(
                    app_id=workflow.app_id,
                    conversation_id=conversation_id
                )

    @staticmethod
    def update_app_memory_after_run(workflow, conversation_id: str, variable_pool: VariablePool,
                                    is_draft: bool = False):
        """Update app-level memory after run completion.

        Args:
            workflow: Workflow object
            conversation_id: Conversation ID
            variable_pool: Variable pool
            is_draft: Whether in draft mode
        """
        memory_blocks = workflow.memory_blocks
        # Separate sync and async memory blocks.
        # NOTE: unified on schedule_mode / MemoryScheduleMode, the same
        # field used for the sync/async decision in update_memory_if_needed.
        sync_blocks = []
        async_blocks = []
        for block in memory_blocks:
            if block.scope == MemoryScope.APP:
                if block.schedule_mode == MemoryScheduleMode.SYNC:
                    sync_blocks.append(block)
                else:
                    async_blocks.append(block)
        # async mode: submit individual async tasks directly
        for block in async_blocks:
            ChatflowMemoryService._submit_async_memory_update(
                tenant_id=workflow.tenant_id,
                app_id=workflow.app_id,
                block=block,
                conversation_id=conversation_id,
                variable_pool=variable_pool,
                is_draft=is_draft
            )
        # sync mode: submit a batch update task
        if sync_blocks:
            ChatflowMemoryService._submit_sync_memory_batch_update(
                workflow=workflow,
                sync_blocks=sync_blocks,
                conversation_id=conversation_id,
                variable_pool=variable_pool,
                is_draft=is_draft
            )

    @staticmethod
    def _submit_sync_memory_batch_update(workflow,
                                         sync_blocks: list[MemoryBlockSpec],
                                         conversation_id: str,
                                         variable_pool: VariablePool,
                                         is_draft: bool = False):
        """Submit sync memory batch update task."""
        # Execute batch update asynchronously using thread
        thread = threading.Thread(
            target=ChatflowMemoryService._batch_update_sync_memory,
            kwargs={
                'workflow': workflow,
                'sync_blocks': sync_blocks,
                'conversation_id': conversation_id,
                'variable_pool': variable_pool,
                'is_draft': is_draft
            },
            daemon=True
        )
        thread.start()

    @staticmethod
    def _batch_update_sync_memory(*, workflow,
                                  sync_blocks: list[MemoryBlockSpec],
                                  conversation_id: str,
                                  variable_pool: VariablePool,
                                  is_draft: bool = False):
        """Batch update sync memory (with Redis lock).

        Holds the per-(app, conversation) lock that
        wait_for_sync_memory_completion polls on.
        """
        from concurrent.futures import ThreadPoolExecutor
        lock_key = _get_memory_sync_lock_key(workflow.app_id, conversation_id)
        # Use Redis lock context manager (30 seconds timeout)
        with redis_client.lock(lock_key, timeout=30):
            try:
                # Update all sync memory in parallel
                with ThreadPoolExecutor(max_workers=5) as executor:
                    futures = []
                    for block in sync_blocks:
                        future = executor.submit(
                            ChatflowMemoryService._update_single_memory,
                            tenant_id=workflow.tenant_id,
                            app_id=workflow.app_id,
                            memory_block_spec=block,
                            conversation_id=conversation_id,
                            variable_pool=variable_pool,
                            is_draft=is_draft
                        )
                        futures.append(future)
                    # Wait for all updates to complete; log per-block failures
                    # without aborting the remaining blocks.
                    for future in futures:
                        try:
                            future.result()
                        except Exception:
                            logger.exception("Failed to update memory")
            except Exception:
                logger.exception("Failed to update sync memory for app %s", workflow.app_id)