mirror of https://github.com/langgenius/dify.git
feat: add ChatflowHistoryService and ChatflowMemoryService
This commit is contained in:
parent
f977dc410a
commit
45fddc70d5
|
|
@ -0,0 +1,359 @@
|
|||
import json
|
||||
import time
|
||||
from collections.abc import Sequence
|
||||
from typing import Literal, Optional, overload
|
||||
|
||||
from sqlalchemy import Row, Select, and_, func, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.memory.entities import ChatflowConversationMetadata
|
||||
from core.model_runtime.entities.message_entities import (
|
||||
AssistantPromptMessage,
|
||||
PromptMessage,
|
||||
UserPromptMessage,
|
||||
)
|
||||
from extensions.ext_database import db
|
||||
from models.chatflow_memory import ChatflowConversation, ChatflowMessage
|
||||
|
||||
|
||||
class ChatflowHistoryService:
|
||||
"""
|
||||
Service layer for managing chatflow conversation history.
|
||||
|
||||
This unified service handles all chatflow memory operations:
|
||||
- Reading visible chat history with version control
|
||||
- Saving messages to append-only table
|
||||
- Managing visible_count metadata
|
||||
- Supporting both app-level and node-level scoping
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def get_visible_chat_history(
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str,
|
||||
node_id: Optional[str] = None,
|
||||
max_visible_count: Optional[int] = None
|
||||
) -> Sequence[PromptMessage]:
|
||||
"""
|
||||
Get visible chat history based on metadata visible_count.
|
||||
|
||||
Args:
|
||||
conversation_id: Original conversation ID
|
||||
node_id: None for app-level, specific node_id for node-level
|
||||
max_visible_count: Override visible_count for memory update operations
|
||||
|
||||
Returns:
|
||||
Sequence of PromptMessage objects in chronological order (oldest first)
|
||||
"""
|
||||
with db.session() as session:
|
||||
chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation(
|
||||
session, conversation_id, app_id, tenant_id, node_id, create_if_missing=False
|
||||
)
|
||||
|
||||
if not chatflow_conv:
|
||||
return []
|
||||
|
||||
# Parse metadata
|
||||
metadata_dict = json.loads(chatflow_conv.conversation_metadata)
|
||||
metadata = ChatflowConversationMetadata.model_validate(metadata_dict)
|
||||
|
||||
# Determine the actual number of messages to return
|
||||
target_visible_count = max_visible_count if max_visible_count is not None else metadata.visible_count
|
||||
|
||||
# Fetch all messages (handle versioning)
|
||||
msg_stmt = select(ChatflowMessage).where(
|
||||
ChatflowMessage.conversation_id == chatflow_conv.id
|
||||
).order_by(ChatflowMessage.index.asc(), ChatflowMessage.version.desc())
|
||||
|
||||
all_messages: Sequence[Row[tuple[ChatflowMessage]]] = session.execute(msg_stmt).all()
|
||||
|
||||
# Filter in memory: keep only the latest version for each index
|
||||
latest_messages_by_index: dict[int, ChatflowMessage] = {}
|
||||
for msg_row in all_messages:
|
||||
msg = msg_row[0]
|
||||
index = msg.index
|
||||
|
||||
if index not in latest_messages_by_index or msg.version > latest_messages_by_index[index].version:
|
||||
latest_messages_by_index[index] = msg
|
||||
|
||||
# Sort by index and take the latest target_visible_count messages
|
||||
sorted_messages = sorted(latest_messages_by_index.values(), key=lambda m: m.index, reverse=True)
|
||||
visible_messages = sorted_messages[:target_visible_count]
|
||||
|
||||
# Convert to PromptMessage and restore correct order (oldest first)
|
||||
prompt_messages: list[PromptMessage] = []
|
||||
for msg in reversed(visible_messages): # Restore chronological order (index ascending)
|
||||
data = json.loads(msg.data)
|
||||
role = data.get('role', 'user')
|
||||
content = data.get('content', '')
|
||||
|
||||
if role == 'user':
|
||||
prompt_messages.append(UserPromptMessage(content=content))
|
||||
elif role == 'assistant':
|
||||
prompt_messages.append(AssistantPromptMessage(content=content))
|
||||
|
||||
return prompt_messages
|
||||
|
||||
@staticmethod
|
||||
def get_app_visible_chat_history(
|
||||
app_id: str,
|
||||
conversation_id: str,
|
||||
tenant_id: str,
|
||||
max_visible_count: Optional[int] = None
|
||||
) -> Sequence[PromptMessage]:
|
||||
"""Get visible chat history for app level."""
|
||||
return ChatflowHistoryService.get_visible_chat_history(
|
||||
conversation_id=conversation_id,
|
||||
app_id=app_id,
|
||||
tenant_id=tenant_id,
|
||||
node_id=None, # App level
|
||||
max_visible_count=max_visible_count
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def get_node_visible_chat_history(
|
||||
node_id: str,
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str,
|
||||
max_visible_count: Optional[int] = None
|
||||
) -> Sequence[PromptMessage]:
|
||||
"""Get visible chat history for a specific node."""
|
||||
return ChatflowHistoryService.get_visible_chat_history(
|
||||
conversation_id=conversation_id,
|
||||
app_id=app_id,
|
||||
tenant_id=tenant_id,
|
||||
node_id=node_id,
|
||||
max_visible_count=max_visible_count
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def save_message(
|
||||
prompt_message: PromptMessage,
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str,
|
||||
node_id: Optional[str] = None
|
||||
) -> None:
|
||||
"""
|
||||
Save a message to the append-only chatflow_messages table.
|
||||
|
||||
Args:
|
||||
node_id: None for app-level, specific node_id for node-level
|
||||
"""
|
||||
with db.session() as session:
|
||||
chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation(
|
||||
session, conversation_id, app_id, tenant_id, node_id, create_if_missing=True
|
||||
)
|
||||
|
||||
# Get next index
|
||||
max_index = session.execute(
|
||||
select(func.max(ChatflowMessage.index)).where(
|
||||
ChatflowMessage.conversation_id == chatflow_conv.id
|
||||
)
|
||||
).scalar() or -1
|
||||
next_index = max_index + 1
|
||||
|
||||
# Save new message to append-only table
|
||||
message_data = {
|
||||
'role': prompt_message.role.value,
|
||||
'content': prompt_message.get_text_content(),
|
||||
'timestamp': time.time()
|
||||
}
|
||||
|
||||
new_message = ChatflowMessage(
|
||||
conversation_id=chatflow_conv.id,
|
||||
index=next_index,
|
||||
version=1,
|
||||
data=json.dumps(message_data)
|
||||
)
|
||||
session.add(new_message)
|
||||
session.commit()
|
||||
|
||||
@staticmethod
|
||||
def save_app_message(
|
||||
prompt_message: PromptMessage,
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str
|
||||
) -> None:
|
||||
"""Save PromptMessage to app-level chatflow conversation."""
|
||||
ChatflowHistoryService.save_message(
|
||||
prompt_message=prompt_message,
|
||||
conversation_id=conversation_id,
|
||||
app_id=app_id,
|
||||
tenant_id=tenant_id,
|
||||
node_id=None
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def save_node_message(
|
||||
prompt_message: PromptMessage,
|
||||
node_id: str,
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str
|
||||
) -> None:
|
||||
"""Save PromptMessage to node-specific chatflow conversation."""
|
||||
ChatflowHistoryService.save_message(
|
||||
prompt_message=prompt_message,
|
||||
conversation_id=conversation_id,
|
||||
app_id=app_id,
|
||||
tenant_id=tenant_id,
|
||||
node_id=node_id
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def save_message_version(
|
||||
prompt_message: PromptMessage,
|
||||
message_index: int,
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str,
|
||||
node_id: Optional[str] = None
|
||||
) -> None:
|
||||
"""
|
||||
Save a new version of an existing message (for message editing scenarios).
|
||||
"""
|
||||
with db.session() as session:
|
||||
chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation(
|
||||
session, conversation_id, app_id, tenant_id, node_id, create_if_missing=True
|
||||
)
|
||||
|
||||
# Get the maximum version number for this index
|
||||
max_version = session.execute(
|
||||
select(func.max(ChatflowMessage.version)).where(
|
||||
and_(
|
||||
ChatflowMessage.conversation_id == chatflow_conv.id,
|
||||
ChatflowMessage.index == message_index
|
||||
)
|
||||
)
|
||||
).scalar() or 0
|
||||
next_version = max_version + 1
|
||||
|
||||
# Save new version of the message
|
||||
message_data = {
|
||||
'role': prompt_message.role.value,
|
||||
'content': prompt_message.get_text_content(),
|
||||
'timestamp': time.time()
|
||||
}
|
||||
|
||||
new_message_version = ChatflowMessage(
|
||||
conversation_id=chatflow_conv.id,
|
||||
index=message_index,
|
||||
version=next_version,
|
||||
data=json.dumps(message_data)
|
||||
)
|
||||
session.add(new_message_version)
|
||||
session.commit()
|
||||
|
||||
@staticmethod
|
||||
def update_visible_count(
|
||||
conversation_id: str,
|
||||
node_id: Optional[str],
|
||||
new_visible_count: int,
|
||||
app_id: str,
|
||||
tenant_id: str
|
||||
) -> None:
|
||||
"""
|
||||
Update visible_count metadata for specific scope.
|
||||
|
||||
Args:
|
||||
node_id: None for app-level updates, specific node_id for node-level updates
|
||||
new_visible_count: The new visible_count value (typically preserved_turns)
|
||||
|
||||
Usage Examples:
|
||||
# Update app-level visible_count
|
||||
ChatflowHistoryService.update_visible_count(conv_id, None, 10, app_id, tenant_id)
|
||||
|
||||
# Update node-specific visible_count
|
||||
ChatflowHistoryService.update_visible_count(conv_id, "node-123", 8, app_id, tenant_id)
|
||||
"""
|
||||
with db.session() as session:
|
||||
chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation(
|
||||
session, conversation_id, app_id, tenant_id, node_id, create_if_missing=True
|
||||
)
|
||||
|
||||
# Only update visible_count in metadata, do not delete any data
|
||||
new_metadata = ChatflowConversationMetadata(visible_count=new_visible_count)
|
||||
chatflow_conv.conversation_metadata = new_metadata.model_dump_json()
|
||||
|
||||
session.commit()
|
||||
|
||||
@overload
|
||||
@staticmethod
|
||||
def _get_or_create_chatflow_conversation(
|
||||
session: Session,
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str,
|
||||
node_id: Optional[str] = None,
|
||||
create_if_missing: Literal[True] = True
|
||||
) -> ChatflowConversation: ...
|
||||
|
||||
@overload
|
||||
@staticmethod
|
||||
def _get_or_create_chatflow_conversation(
|
||||
session: Session,
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str,
|
||||
node_id: Optional[str] = None,
|
||||
create_if_missing: Literal[False] = False
|
||||
) -> Optional[ChatflowConversation]: ...
|
||||
|
||||
@overload
|
||||
@staticmethod
|
||||
def _get_or_create_chatflow_conversation(
|
||||
session: Session,
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str,
|
||||
node_id: Optional[str] = None,
|
||||
create_if_missing: bool = False
|
||||
) -> Optional[ChatflowConversation]: ...
|
||||
|
||||
@staticmethod
|
||||
def _get_or_create_chatflow_conversation(
|
||||
session: Session,
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str,
|
||||
node_id: Optional[str] = None,
|
||||
create_if_missing: bool = False
|
||||
) -> Optional[ChatflowConversation]:
|
||||
"""Get existing chatflow conversation or optionally create new one"""
|
||||
stmt: Select[tuple[ChatflowConversation]] = select(ChatflowConversation).where(
|
||||
and_(
|
||||
ChatflowConversation.original_conversation_id == conversation_id,
|
||||
ChatflowConversation.tenant_id == tenant_id,
|
||||
ChatflowConversation.app_id == app_id
|
||||
)
|
||||
)
|
||||
|
||||
if node_id:
|
||||
stmt = stmt.where(ChatflowConversation.node_id == node_id)
|
||||
else:
|
||||
stmt = stmt.where(ChatflowConversation.node_id.is_(None))
|
||||
|
||||
chatflow_conv: Row[tuple[ChatflowConversation]] | None = session.execute(stmt).first()
|
||||
|
||||
if chatflow_conv:
|
||||
result: ChatflowConversation = chatflow_conv[0] # Extract the ChatflowConversation object
|
||||
return result
|
||||
else:
|
||||
if create_if_missing:
|
||||
# Create a new chatflow conversation
|
||||
default_metadata = ChatflowConversationMetadata(visible_count=20)
|
||||
new_chatflow_conv = ChatflowConversation(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
node_id=node_id,
|
||||
original_conversation_id=conversation_id,
|
||||
conversation_metadata=default_metadata.model_dump_json(),
|
||||
)
|
||||
session.add(new_chatflow_conv)
|
||||
session.flush() # Obtain ID
|
||||
return new_chatflow_conv
|
||||
return None
|
||||
|
|
@ -0,0 +1,772 @@
|
|||
import logging
|
||||
import threading
|
||||
import time
|
||||
from collections.abc import Sequence
|
||||
from typing import Optional, cast
|
||||
|
||||
from sqlalchemy import and_, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.memory.entities import (
|
||||
MemoryBlock,
|
||||
MemoryBlockSpec,
|
||||
MemoryScheduleMode,
|
||||
MemoryScope,
|
||||
MemoryStrategy,
|
||||
MemoryTerm,
|
||||
)
|
||||
from core.memory.errors import MemorySyncTimeoutError
|
||||
from core.model_runtime.entities.message_entities import AssistantPromptMessage, UserPromptMessage
|
||||
from core.workflow.entities.variable_pool import VariablePool
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from models.chatflow_memory import ChatflowMemoryVariable
|
||||
from services.chatflow_history_service import ChatflowHistoryService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Important note: Since Dify uses gevent, we don't need an extra task queue (e.g., Celery).
|
||||
# Threads created via threading.Thread are automatically patched into greenlets in a gevent environment,
|
||||
# enabling efficient asynchronous execution.
|
||||
|
||||
def _get_memory_sync_lock_key(app_id: str, conversation_id: str) -> str:
|
||||
"""Generate Redis lock key for memory sync updates
|
||||
|
||||
Args:
|
||||
app_id: Application ID
|
||||
conversation_id: Conversation ID
|
||||
|
||||
Returns:
|
||||
Formatted lock key
|
||||
"""
|
||||
return f"memory_sync_update:{app_id}:{conversation_id}"
|
||||
|
||||
class ChatflowMemoryService:
|
||||
"""
|
||||
Memory service class with only static methods.
|
||||
All methods are static and do not require instantiation.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def get_memory(memory_id: str, tenant_id: str,
|
||||
app_id: Optional[str] = None,
|
||||
conversation_id: Optional[str] = None,
|
||||
node_id: Optional[str] = None) -> Optional[MemoryBlock]:
|
||||
"""Get single memory by ID"""
|
||||
stmt = select(ChatflowMemoryVariable).where(
|
||||
and_(
|
||||
ChatflowMemoryVariable.memory_id == memory_id,
|
||||
ChatflowMemoryVariable.tenant_id == tenant_id
|
||||
)
|
||||
)
|
||||
|
||||
if app_id:
|
||||
stmt = stmt.where(ChatflowMemoryVariable.app_id == app_id)
|
||||
if conversation_id:
|
||||
stmt = stmt.where(ChatflowMemoryVariable.conversation_id == conversation_id)
|
||||
if node_id:
|
||||
stmt = stmt.where(ChatflowMemoryVariable.node_id == node_id)
|
||||
|
||||
with db.session() as session:
|
||||
result = session.execute(stmt).first()
|
||||
if result:
|
||||
return MemoryBlock.model_validate(result[0].__dict__)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def save_memory(memory: MemoryBlock, tenant_id: str, is_draft: bool = False) -> None:
|
||||
"""Save or update memory with draft mode support"""
|
||||
stmt = select(ChatflowMemoryVariable).where(
|
||||
and_(
|
||||
ChatflowMemoryVariable.memory_id == memory.memory_id,
|
||||
ChatflowMemoryVariable.tenant_id == tenant_id
|
||||
)
|
||||
)
|
||||
|
||||
with db.session() as session:
|
||||
existing = session.execute(stmt).first()
|
||||
if existing:
|
||||
# Update existing
|
||||
for key, value in memory.model_dump(exclude_unset=True).items():
|
||||
if hasattr(existing[0], key):
|
||||
setattr(existing[0], key, value)
|
||||
else:
|
||||
# Create new
|
||||
new_memory = ChatflowMemoryVariable(
|
||||
tenant_id=tenant_id,
|
||||
**memory.model_dump(exclude={'id'})
|
||||
)
|
||||
session.add(new_memory)
|
||||
session.commit()
|
||||
|
||||
# In draft mode, also write to workflow_draft_variables
|
||||
if is_draft:
|
||||
from models.workflow import WorkflowDraftVariable
|
||||
from services.workflow_draft_variable_service import WorkflowDraftVariableService
|
||||
with Session(bind=db.engine) as session:
|
||||
draft_var_service = WorkflowDraftVariableService(session)
|
||||
|
||||
# Try to get existing variables
|
||||
existing_vars = draft_var_service.get_draft_variables_by_selectors(
|
||||
app_id=memory.app_id,
|
||||
selectors=[['memory_block', memory.memory_id]]
|
||||
)
|
||||
|
||||
if existing_vars:
|
||||
# Update existing draft variable
|
||||
draft_var = existing_vars[0]
|
||||
draft_var.value = memory.value
|
||||
else:
|
||||
# Create new draft variable
|
||||
draft_var = WorkflowDraftVariable.new_memory_block_variable(
|
||||
app_id=memory.app_id,
|
||||
memory_id=memory.memory_id,
|
||||
name=memory.name,
|
||||
value=memory.value,
|
||||
description=f"Memory block: {memory.name}"
|
||||
)
|
||||
session.add(draft_var)
|
||||
|
||||
session.commit()
|
||||
|
||||
@staticmethod
|
||||
def get_memories_by_specs(memory_block_specs: Sequence[MemoryBlockSpec],
|
||||
tenant_id: str, app_id: str,
|
||||
conversation_id: Optional[str] = None,
|
||||
node_id: Optional[str] = None,
|
||||
is_draft: bool = False) -> list[MemoryBlock]:
|
||||
"""Get runtime memory values based on MemoryBlockSpecs with draft mode support"""
|
||||
from models.enums import DraftVariableType
|
||||
|
||||
if not memory_block_specs:
|
||||
return []
|
||||
|
||||
# In draft mode, prefer reading from workflow_draft_variables
|
||||
if is_draft:
|
||||
# Try reading from the draft variables table
|
||||
from services.workflow_draft_variable_service import WorkflowDraftVariableService
|
||||
with Session(bind=db.engine) as session:
|
||||
draft_var_service = WorkflowDraftVariableService(session)
|
||||
|
||||
# Build selector list
|
||||
selectors = [['memory_block', spec.id] for spec in memory_block_specs]
|
||||
|
||||
# Fetch draft variables
|
||||
draft_vars = draft_var_service.get_draft_variables_by_selectors(
|
||||
app_id=app_id,
|
||||
selectors=selectors
|
||||
)
|
||||
|
||||
# If draft variables exist, prefer using them
|
||||
if draft_vars:
|
||||
spec_by_id = {spec.id: spec for spec in memory_block_specs}
|
||||
draft_memories = []
|
||||
|
||||
for draft_var in draft_vars:
|
||||
if draft_var.node_id == DraftVariableType.MEMORY_BLOCK:
|
||||
spec = spec_by_id.get(draft_var.name)
|
||||
if spec:
|
||||
memory_block = MemoryBlock(
|
||||
id=draft_var.id,
|
||||
memory_id=draft_var.name,
|
||||
name=spec.name,
|
||||
value=draft_var.value,
|
||||
scope=spec.scope,
|
||||
term=spec.term,
|
||||
app_id=app_id,
|
||||
conversation_id='draft',
|
||||
node_id=node_id
|
||||
)
|
||||
draft_memories.append(memory_block)
|
||||
|
||||
if draft_memories:
|
||||
return draft_memories
|
||||
|
||||
memory_ids = [spec.id for spec in memory_block_specs]
|
||||
|
||||
stmt = select(ChatflowMemoryVariable).where(
|
||||
and_(
|
||||
ChatflowMemoryVariable.memory_id.in_(memory_ids),
|
||||
ChatflowMemoryVariable.tenant_id == tenant_id,
|
||||
ChatflowMemoryVariable.app_id == app_id
|
||||
)
|
||||
)
|
||||
|
||||
if conversation_id:
|
||||
stmt = stmt.where(ChatflowMemoryVariable.conversation_id == conversation_id)
|
||||
if node_id:
|
||||
stmt = stmt.where(ChatflowMemoryVariable.node_id == node_id)
|
||||
|
||||
with db.session() as session:
|
||||
results = session.execute(stmt).all()
|
||||
found_memories = {row[0].memory_id: MemoryBlock.model_validate(row[0].__dict__) for row in results}
|
||||
|
||||
# Create MemoryBlock objects for specs that don't have runtime values yet
|
||||
all_memories = []
|
||||
for spec in memory_block_specs:
|
||||
if spec.id in found_memories:
|
||||
all_memories.append(found_memories[spec.id])
|
||||
else:
|
||||
# Create default memory with template value following design rules
|
||||
default_memory = MemoryBlock(
|
||||
id="", # Will be assigned when saved
|
||||
memory_id=spec.id,
|
||||
name=spec.name,
|
||||
value=spec.template,
|
||||
scope=spec.scope,
|
||||
term=spec.term,
|
||||
# Design rules:
|
||||
# - app_id=None for global (future), app_id=str for app-specific
|
||||
app_id=app_id, # Always app-specific for now
|
||||
# - conversation_id=None for persistent, conversation_id=str for session
|
||||
conversation_id=conversation_id if spec.term == MemoryTerm.SESSION else None,
|
||||
# - node_id=None for app-scope, node_id=str for node-scope
|
||||
node_id=node_id if spec.scope == MemoryScope.NODE else None
|
||||
)
|
||||
all_memories.append(default_memory)
|
||||
|
||||
return all_memories
|
||||
|
||||
@staticmethod
|
||||
def get_app_memories_by_workflow(workflow, tenant_id: str,
|
||||
conversation_id: Optional[str] = None) -> list[MemoryBlock]:
|
||||
"""Get app-scoped memories based on workflow configuration"""
|
||||
from core.memory.entities import MemoryScope
|
||||
|
||||
app_memory_specs = [spec for spec in workflow.memory_blocks if spec.scope == MemoryScope.APP]
|
||||
return ChatflowMemoryService.get_memories_by_specs(
|
||||
memory_block_specs=app_memory_specs,
|
||||
tenant_id=tenant_id,
|
||||
app_id=workflow.app_id,
|
||||
conversation_id=conversation_id
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def get_node_memories_by_workflow(workflow, node_id: str, tenant_id: str) -> list[MemoryBlock]:
|
||||
"""Get node-scoped memories based on workflow configuration"""
|
||||
from core.memory.entities import MemoryScope
|
||||
|
||||
node_memory_specs = [
|
||||
spec for spec in workflow.memory_blocks
|
||||
if spec.scope == MemoryScope.NODE and spec.id == node_id
|
||||
]
|
||||
return ChatflowMemoryService.get_memories_by_specs(
|
||||
memory_block_specs=node_memory_specs,
|
||||
tenant_id=tenant_id,
|
||||
app_id=workflow.app_id,
|
||||
node_id=node_id
|
||||
)
|
||||
|
||||
# Core Memory Orchestration features
|
||||
|
||||
@staticmethod
|
||||
def update_memory_if_needed(tenant_id: str, app_id: str,
|
||||
memory_block_spec: MemoryBlockSpec,
|
||||
conversation_id: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False) -> bool:
|
||||
"""Update app-level memory if conditions are met
|
||||
|
||||
Args:
|
||||
tenant_id: Tenant ID
|
||||
app_id: Application ID
|
||||
memory_block_spec: Memory block specification
|
||||
conversation_id: Conversation ID
|
||||
variable_pool: Variable pool for context
|
||||
is_draft: Whether in draft mode
|
||||
"""
|
||||
if not ChatflowMemoryService._should_update_memory(
|
||||
tenant_id, app_id, memory_block_spec, conversation_id
|
||||
):
|
||||
return False
|
||||
|
||||
if memory_block_spec.schedule_mode == MemoryScheduleMode.SYNC:
|
||||
# Sync mode: will be processed in batch after the App run completes
|
||||
# This only marks the need; actual update happens in _update_app_memory_after_run
|
||||
return True
|
||||
else:
|
||||
# Async mode: submit asynchronous update immediately
|
||||
ChatflowMemoryService._submit_async_memory_update(
|
||||
tenant_id, app_id, memory_block_spec, conversation_id, variable_pool, is_draft
|
||||
)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def update_node_memory_if_needed(tenant_id: str, app_id: str,
|
||||
memory_block_spec: MemoryBlockSpec,
|
||||
node_id: str, llm_output: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False) -> bool:
|
||||
"""Update node-level memory after LLM execution
|
||||
|
||||
Args:
|
||||
tenant_id: Tenant ID
|
||||
app_id: Application ID
|
||||
memory_block_spec: Memory block specification
|
||||
node_id: Node ID
|
||||
llm_output: LLM output content
|
||||
variable_pool: Variable pool for context
|
||||
is_draft: Whether in draft mode
|
||||
"""
|
||||
conversation_id_segment = variable_pool.get(('sys', 'conversation_id'))
|
||||
if not conversation_id_segment:
|
||||
return False
|
||||
conversation_id = conversation_id_segment.value
|
||||
|
||||
# Save LLM output to node conversation history
|
||||
assistant_message = AssistantPromptMessage(content=llm_output)
|
||||
ChatflowHistoryService.save_node_message(
|
||||
prompt_message=assistant_message,
|
||||
node_id=node_id,
|
||||
conversation_id=str(conversation_id),
|
||||
app_id=app_id,
|
||||
tenant_id=tenant_id
|
||||
)
|
||||
|
||||
if not ChatflowMemoryService._should_update_memory(
|
||||
tenant_id, app_id, memory_block_spec, str(conversation_id), node_id
|
||||
):
|
||||
return False
|
||||
|
||||
if memory_block_spec.schedule_mode == MemoryScheduleMode.SYNC:
|
||||
# Node-level sync: blocking execution
|
||||
ChatflowMemoryService._update_node_memory_sync(
|
||||
tenant_id, app_id, memory_block_spec, node_id,
|
||||
str(conversation_id), variable_pool, is_draft
|
||||
)
|
||||
else:
|
||||
# Node-level async: execute asynchronously
|
||||
ChatflowMemoryService._update_node_memory_async(
|
||||
tenant_id, app_id, memory_block_spec, node_id,
|
||||
llm_output, str(conversation_id), variable_pool, is_draft
|
||||
)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _should_update_memory(tenant_id: str, app_id: str,
|
||||
memory_block_spec: MemoryBlockSpec,
|
||||
conversation_id: str, node_id: Optional[str] = None) -> bool:
|
||||
"""Check if memory should be updated based on strategy"""
|
||||
if memory_block_spec.strategy != MemoryStrategy.ON_TURNS:
|
||||
return False
|
||||
|
||||
# Check turn count
|
||||
turn_key = f"memory_turn_count:{tenant_id}:{app_id}:{conversation_id}"
|
||||
if node_id:
|
||||
turn_key += f":{node_id}"
|
||||
|
||||
current_turns = redis_client.get(turn_key)
|
||||
current_turns = int(current_turns) if current_turns else 0
|
||||
current_turns += 1
|
||||
|
||||
# Update count
|
||||
redis_client.set(turn_key, current_turns)
|
||||
|
||||
return current_turns % memory_block_spec.update_turns == 0
|
||||
|
||||
# App-level async update method
|
||||
@staticmethod
|
||||
def _submit_async_memory_update(tenant_id: str, app_id: str,
|
||||
block: MemoryBlockSpec,
|
||||
conversation_id: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Submit async memory update task"""
|
||||
|
||||
# Execute update asynchronously using thread
|
||||
thread = threading.Thread(
|
||||
target=ChatflowMemoryService._update_single_memory,
|
||||
kwargs={
|
||||
'tenant_id': tenant_id,
|
||||
'app_id': app_id,
|
||||
'memory_block_spec': block,
|
||||
'conversation_id': conversation_id,
|
||||
'variable_pool': variable_pool,
|
||||
'is_draft': is_draft
|
||||
},
|
||||
daemon=True
|
||||
)
|
||||
thread.start()
|
||||
|
||||
# Node-level sync update method
|
||||
@staticmethod
|
||||
def _update_node_memory_sync(tenant_id: str, app_id: str,
|
||||
memory_block_spec: MemoryBlockSpec,
|
||||
node_id: str, conversation_id: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Synchronously update node memory (blocking execution)"""
|
||||
ChatflowMemoryService._perform_memory_update(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
memory_block_spec=memory_block_spec,
|
||||
conversation_id=conversation_id,
|
||||
variable_pool=variable_pool,
|
||||
node_id=node_id,
|
||||
is_draft=is_draft
|
||||
)
|
||||
# Wait for update to complete before returning
|
||||
|
||||
# Node-level async update method
|
||||
@staticmethod
|
||||
def _update_node_memory_async(tenant_id: str, app_id: str,
|
||||
memory_block_spec: MemoryBlockSpec,
|
||||
node_id: str, llm_output: str,
|
||||
conversation_id: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Asynchronously update node memory (submit task)"""
|
||||
|
||||
# Execute update asynchronously using thread
|
||||
thread = threading.Thread(
|
||||
target=ChatflowMemoryService._perform_node_memory_update,
|
||||
kwargs={
|
||||
'memory_block_spec': memory_block_spec,
|
||||
'tenant_id': tenant_id,
|
||||
'app_id': app_id,
|
||||
'node_id': node_id,
|
||||
'llm_output': llm_output,
|
||||
'variable_pool': variable_pool,
|
||||
'is_draft': is_draft
|
||||
},
|
||||
daemon=True
|
||||
)
|
||||
thread.start()
|
||||
# Return immediately without waiting
|
||||
|
||||
@staticmethod
|
||||
def _perform_node_memory_update(*, memory_block_spec: MemoryBlockSpec,
|
||||
tenant_id: str, app_id: str, node_id: str,
|
||||
llm_output: str, variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Execute node memory update"""
|
||||
try:
|
||||
# Call existing _perform_memory_update method here
|
||||
ChatflowMemoryService._perform_memory_update(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
memory_block_spec=memory_block_spec,
|
||||
conversation_id=str(variable_pool.get(('sys', 'conversation_id'))),
|
||||
variable_pool=variable_pool,
|
||||
node_id=node_id,
|
||||
is_draft=is_draft
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
"Failed to update node memory %s for node %s",
|
||||
memory_block_spec.id,
|
||||
node_id,
|
||||
exc_info=e
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _update_single_memory(*, tenant_id: str, app_id: str,
|
||||
memory_block_spec: MemoryBlockSpec,
|
||||
conversation_id: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Update single memory"""
|
||||
ChatflowMemoryService._perform_memory_update(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
memory_block_spec=memory_block_spec,
|
||||
conversation_id=conversation_id,
|
||||
variable_pool=variable_pool,
|
||||
node_id=None, # App-level memory doesn't have node_id
|
||||
is_draft=is_draft
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _perform_memory_update(tenant_id: str, app_id: str,
|
||||
memory_block_spec: MemoryBlockSpec,
|
||||
conversation_id: str, variable_pool: VariablePool,
|
||||
node_id: Optional[str] = None,
|
||||
is_draft: bool = False):
|
||||
"""Perform the actual memory update using LLM
|
||||
|
||||
Args:
|
||||
tenant_id: Tenant ID
|
||||
app_id: Application ID
|
||||
memory_block_spec: Memory block specification
|
||||
conversation_id: Conversation ID
|
||||
variable_pool: Variable pool for context
|
||||
node_id: Optional node ID for node-level memory updates
|
||||
is_draft: Whether in draft mode
|
||||
"""
|
||||
# Get conversation history
|
||||
history = ChatflowHistoryService.get_visible_chat_history(
|
||||
conversation_id=conversation_id,
|
||||
app_id=app_id,
|
||||
tenant_id=tenant_id,
|
||||
node_id=node_id, # Pass node_id, if None then get app-level history
|
||||
max_visible_count=memory_block_spec.preserved_turns
|
||||
)
|
||||
|
||||
# Get current memory value
|
||||
current_memory = ChatflowMemoryService.get_memory(
|
||||
memory_id=memory_block_spec.id,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
conversation_id=conversation_id if memory_block_spec.term == MemoryTerm.SESSION else None,
|
||||
node_id=node_id
|
||||
)
|
||||
|
||||
current_value = current_memory.value if current_memory else memory_block_spec.template
|
||||
|
||||
# Build update prompt - adjust wording based on whether there's a node_id
|
||||
context_type = "Node conversation history" if node_id else "Conversation history"
|
||||
memory_update_prompt = f"""
|
||||
Based on the following {context_type}, update the memory content:
|
||||
|
||||
Current memory: {current_value}
|
||||
|
||||
{context_type}:
|
||||
{[msg.content for msg in history]}
|
||||
|
||||
Update instruction: {memory_block_spec.instruction}
|
||||
|
||||
Please output the updated memory content:
|
||||
"""
|
||||
|
||||
# Invoke LLM to update memory - extracted as a separate method
|
||||
updated_value = ChatflowMemoryService._invoke_llm_for_memory_update(
|
||||
tenant_id,
|
||||
memory_block_spec,
|
||||
memory_update_prompt,
|
||||
current_value
|
||||
)
|
||||
|
||||
if updated_value is None:
|
||||
return # LLM invocation failed
|
||||
|
||||
# Save updated memory
|
||||
updated_memory = MemoryBlock(
|
||||
id=current_memory.id if current_memory else "",
|
||||
memory_id=memory_block_spec.id,
|
||||
name=memory_block_spec.name,
|
||||
value=updated_value,
|
||||
scope=memory_block_spec.scope,
|
||||
term=memory_block_spec.term,
|
||||
app_id=app_id,
|
||||
conversation_id=conversation_id if memory_block_spec.term == MemoryTerm.SESSION else None,
|
||||
node_id=node_id
|
||||
)
|
||||
|
||||
ChatflowMemoryService.save_memory(updated_memory, tenant_id, is_draft)
|
||||
|
||||
# Not implemented yet: Send success event
|
||||
# self._send_memory_update_event(memory_block_spec.id, "completed", updated_value)
|
||||
|
||||
@staticmethod
|
||||
def _invoke_llm_for_memory_update(tenant_id: str,
|
||||
memory_block_spec: MemoryBlockSpec,
|
||||
prompt: str, current_value: str) -> Optional[str]:
|
||||
"""Invoke LLM to update memory content
|
||||
|
||||
Args:
|
||||
tenant_id: Tenant ID
|
||||
memory_block_spec: Memory block specification
|
||||
prompt: Update prompt
|
||||
current_value: Current memory value (used for fallback on failure)
|
||||
|
||||
Returns:
|
||||
Updated value, returns None if failed
|
||||
"""
|
||||
from core.model_manager import ModelManager
|
||||
from core.model_runtime.entities.llm_entities import LLMResult
|
||||
from core.model_runtime.entities.model_entities import ModelType
|
||||
|
||||
model_manager = ModelManager()
|
||||
|
||||
# Use model configuration defined in memory_block_spec, use default model if not specified
|
||||
if hasattr(memory_block_spec, 'model') and memory_block_spec.model:
|
||||
model_instance = model_manager.get_model_instance(
|
||||
tenant_id=tenant_id,
|
||||
model_type=ModelType.LLM,
|
||||
provider=memory_block_spec.model.get("provider", ""),
|
||||
model=memory_block_spec.model.get("name", "")
|
||||
)
|
||||
model_parameters = memory_block_spec.model.get("completion_params", {})
|
||||
else:
|
||||
# Use default model
|
||||
model_instance = model_manager.get_default_model_instance(
|
||||
tenant_id=tenant_id,
|
||||
model_type=ModelType.LLM
|
||||
)
|
||||
model_parameters = {"temperature": 0.7, "max_tokens": 1000}
|
||||
|
||||
try:
|
||||
response = cast(
|
||||
LLMResult,
|
||||
model_instance.invoke_llm(
|
||||
prompt_messages=[UserPromptMessage(content=prompt)],
|
||||
model_parameters=model_parameters,
|
||||
stream=False
|
||||
)
|
||||
)
|
||||
return response.message.get_text_content()
|
||||
except Exception as e:
|
||||
logger.exception("Failed to update memory using LLM", exc_info=e)
|
||||
# Not implemented yet: Send failure event
|
||||
# ChatflowMemoryService._send_memory_update_event(memory_block_spec.id, "failed", current_value, str(e))
|
||||
return None
|
||||
|
||||
|
||||
def _send_memory_update_event(self, memory_id: str, status: str, value: str, error: str = ""):
|
||||
"""Send memory update event
|
||||
|
||||
Note: Event system integration not implemented yet, this method is retained as a placeholder
|
||||
"""
|
||||
# Not implemented yet: Event system integration will be added in future versions
|
||||
pass
|
||||
|
||||
# App-level sync batch update related methods
|
||||
@staticmethod
|
||||
def wait_for_sync_memory_completion(workflow, conversation_id: str):
|
||||
"""Wait for sync memory update to complete, maximum 50 seconds
|
||||
|
||||
Args:
|
||||
workflow: Workflow object
|
||||
conversation_id: Conversation ID
|
||||
|
||||
Raises:
|
||||
MemorySyncTimeoutError: Raised when timeout is reached
|
||||
"""
|
||||
from core.memory.entities import MemoryScope
|
||||
|
||||
memory_blocks = workflow.memory_blocks
|
||||
sync_memory_blocks = [
|
||||
block for block in memory_blocks
|
||||
if block.scope == MemoryScope.APP and block.update_mode == "sync"
|
||||
]
|
||||
|
||||
if not sync_memory_blocks:
|
||||
return
|
||||
|
||||
lock_key = _get_memory_sync_lock_key(workflow.app_id, conversation_id)
|
||||
|
||||
# Retry up to 10 times, wait 5 seconds each time, total 50 seconds
|
||||
max_retries = 10
|
||||
retry_interval = 5
|
||||
|
||||
for i in range(max_retries):
|
||||
if not redis_client.exists(lock_key):
|
||||
# Lock doesn't exist, can continue
|
||||
return
|
||||
|
||||
if i < max_retries - 1:
|
||||
# Still have retry attempts, wait
|
||||
time.sleep(retry_interval)
|
||||
else:
|
||||
# Maximum retry attempts reached, raise exception
|
||||
raise MemorySyncTimeoutError(
|
||||
app_id=workflow.app_id,
|
||||
conversation_id=conversation_id
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def update_app_memory_after_run(workflow, conversation_id: str, variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Update app-level memory after run completion
|
||||
|
||||
Args:
|
||||
workflow: Workflow object
|
||||
conversation_id: Conversation ID
|
||||
variable_pool: Variable pool
|
||||
is_draft: Whether in draft mode
|
||||
"""
|
||||
from core.memory.entities import MemoryScope
|
||||
|
||||
memory_blocks = workflow.memory_blocks
|
||||
|
||||
# Separate sync and async memory blocks
|
||||
sync_blocks = []
|
||||
async_blocks = []
|
||||
|
||||
for block in memory_blocks:
|
||||
if block.scope == MemoryScope.APP:
|
||||
if block.update_mode == "sync":
|
||||
sync_blocks.append(block)
|
||||
else:
|
||||
async_blocks.append(block)
|
||||
|
||||
# async mode: submit individual async tasks directly
|
||||
for block in async_blocks:
|
||||
ChatflowMemoryService._submit_async_memory_update(
|
||||
tenant_id=workflow.tenant_id,
|
||||
app_id=workflow.app_id,
|
||||
block=block,
|
||||
conversation_id=conversation_id,
|
||||
variable_pool=variable_pool,
|
||||
is_draft=is_draft
|
||||
)
|
||||
|
||||
# sync mode: submit a batch update task
|
||||
if sync_blocks:
|
||||
ChatflowMemoryService._submit_sync_memory_batch_update(
|
||||
workflow=workflow,
|
||||
sync_blocks=sync_blocks,
|
||||
conversation_id=conversation_id,
|
||||
variable_pool=variable_pool,
|
||||
is_draft=is_draft
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _submit_sync_memory_batch_update(workflow,
|
||||
sync_blocks: list[MemoryBlockSpec],
|
||||
conversation_id: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Submit sync memory batch update task"""
|
||||
|
||||
# Execute batch update asynchronously using thread
|
||||
thread = threading.Thread(
|
||||
target=ChatflowMemoryService._batch_update_sync_memory,
|
||||
kwargs={
|
||||
'workflow': workflow,
|
||||
'sync_blocks': sync_blocks,
|
||||
'conversation_id': conversation_id,
|
||||
'variable_pool': variable_pool,
|
||||
'is_draft': is_draft
|
||||
},
|
||||
daemon=True
|
||||
)
|
||||
thread.start()
|
||||
|
||||
@staticmethod
|
||||
def _batch_update_sync_memory(*, workflow,
|
||||
sync_blocks: list[MemoryBlockSpec],
|
||||
conversation_id: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Batch update sync memory (with Redis lock)"""
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
lock_key = _get_memory_sync_lock_key(workflow.app_id, conversation_id)
|
||||
|
||||
# Use Redis lock context manager (30 seconds timeout)
|
||||
with redis_client.lock(lock_key, timeout=30):
|
||||
try:
|
||||
# Update all sync memory in parallel
|
||||
with ThreadPoolExecutor(max_workers=5) as executor:
|
||||
futures = []
|
||||
for block in sync_blocks:
|
||||
future = executor.submit(
|
||||
ChatflowMemoryService._update_single_memory,
|
||||
tenant_id=workflow.tenant_id,
|
||||
app_id=workflow.app_id,
|
||||
memory_block_spec=block,
|
||||
conversation_id=conversation_id,
|
||||
variable_pool=variable_pool,
|
||||
is_draft=is_draft
|
||||
)
|
||||
futures.append(future)
|
||||
|
||||
# Wait for all updates to complete
|
||||
for future in futures:
|
||||
try:
|
||||
future.result()
|
||||
except Exception as e:
|
||||
logger.exception("Failed to update memory", exc_info=e)
|
||||
except Exception as e:
|
||||
logger.exception("Failed to update sync memory for app %s", workflow.app_id, exc_info=e)
|
||||
Loading…
Reference in New Issue