mirror of https://github.com/langgenius/dify.git
refactor: refactor from ChatflowHistoryService and ChatflowMemoryService
This commit is contained in:
parent
4d2fc66a8d
commit
8b68020453
|
|
@ -425,25 +425,22 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
|||
|
||||
# Build memory_id -> value mapping
|
||||
for memory in memories:
|
||||
if memory.scope == MemoryScope.APP:
|
||||
if memory.spec.scope == MemoryScope.APP:
|
||||
# App level: use memory_id directly
|
||||
memory_blocks_dict[memory.memory_id] = memory.value
|
||||
memory_blocks_dict[memory.spec.id] = memory.value
|
||||
else: # NODE scope
|
||||
node_id = memory.node_id
|
||||
if not node_id:
|
||||
logger.warning("Memory block %s has no node_id, skip.", memory.memory_id)
|
||||
logger.warning("Memory block %s has no node_id, skip.", memory.spec.id)
|
||||
continue
|
||||
key = f"{node_id}.{memory.memory_id}"
|
||||
key = f"{node_id}.{memory.spec.id}"
|
||||
memory_blocks_dict[key] = memory.value
|
||||
|
||||
return memory_blocks_dict
|
||||
|
||||
def _sync_conversation_to_chatflow_tables(self, assistant_message: str):
|
||||
# Get user input and AI response
|
||||
user_message = self.application_generate_entity.query
|
||||
|
||||
ChatflowHistoryService.save_app_message(
|
||||
prompt_message=UserPromptMessage(content=user_message),
|
||||
prompt_message=UserPromptMessage(content=(self.application_generate_entity.query)),
|
||||
conversation_id=self.conversation.id,
|
||||
app_id=self._workflow.app_id,
|
||||
tenant_id=self._workflow.tenant_id
|
||||
|
|
@ -456,14 +453,10 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
|||
)
|
||||
|
||||
def _check_app_memory_updates(self):
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from services.chatflow_memory_service import ChatflowMemoryService
|
||||
|
||||
is_draft = (self.application_generate_entity.invoke_from == InvokeFrom.DEBUGGER)
|
||||
|
||||
ChatflowMemoryService.update_app_memory_after_run(
|
||||
ChatflowMemoryService.update_app_memory_if_needed(
|
||||
workflow=self._workflow,
|
||||
conversation_id=self.conversation.id,
|
||||
variable_pool=VariablePool(), # Make a fake pool to satisfy the signature
|
||||
is_draft=is_draft
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
from uuid import uuid4
|
||||
|
|
@ -63,37 +62,12 @@ class MemoryBlock(BaseModel):
|
|||
|
||||
These rules implicitly determine scope and term without redundant storage.
|
||||
"""
|
||||
id: str
|
||||
memory_id: str
|
||||
name: str
|
||||
spec: MemoryBlockSpec
|
||||
tenant_id: str
|
||||
value: str
|
||||
scope: MemoryScope # Derived from node_id: None=APP, str=NODE
|
||||
term: MemoryTerm # Derived from conversation_id: None=PERSISTENT, str=SESSION
|
||||
app_id: str # None=global(future), str=app-specific
|
||||
conversation_id: Optional[str] = None # None=persistent, str=session
|
||||
node_id: Optional[str] = None # None=app-scope, str=node-scope
|
||||
created_at: Optional[datetime] = None
|
||||
updated_at: Optional[datetime] = None
|
||||
|
||||
@property
|
||||
def is_global(self) -> bool:
|
||||
"""Check if this is global memory (future feature)"""
|
||||
return self.app_id is None
|
||||
|
||||
@property
|
||||
def is_persistent(self) -> bool:
|
||||
"""Check if this is persistent memory (cross-conversation)"""
|
||||
return self.conversation_id is None
|
||||
|
||||
@property
|
||||
def is_app_scope(self) -> bool:
|
||||
"""Check if this is app-level scope"""
|
||||
return self.node_id is None
|
||||
|
||||
@property
|
||||
def is_node_scope(self) -> bool:
|
||||
"""Check if this is node-level scope"""
|
||||
return self.node_id is not None
|
||||
app_id: str
|
||||
conversation_id: Optional[str] = None
|
||||
node_id: Optional[str] = None
|
||||
|
||||
class MemoryBlockWithVisibility(BaseModel):
|
||||
id: str
|
||||
|
|
|
|||
|
|
@ -107,7 +107,6 @@ class ChatflowHistoryService:
|
|||
app_id: str,
|
||||
tenant_id: str
|
||||
) -> None:
|
||||
"""Save PromptMessage to node-specific chatflow conversation."""
|
||||
ChatflowHistoryService.save_message(
|
||||
prompt_message=prompt_message,
|
||||
conversation_id=conversation_id,
|
||||
|
|
@ -116,50 +115,6 @@ class ChatflowHistoryService:
|
|||
node_id=node_id
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def save_message_version(
|
||||
prompt_message: PromptMessage,
|
||||
message_index: int,
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str,
|
||||
node_id: Optional[str] = None
|
||||
) -> None:
|
||||
"""
|
||||
Save a new version of an existing message (for message editing scenarios).
|
||||
"""
|
||||
with Session(db.engine) as session:
|
||||
chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation(
|
||||
session, conversation_id, app_id, tenant_id, node_id, create_if_missing=True
|
||||
)
|
||||
|
||||
# Get the maximum version number for this index
|
||||
max_version = session.execute(
|
||||
select(func.max(ChatflowMessage.version)).where(
|
||||
and_(
|
||||
ChatflowMessage.conversation_id == chatflow_conv.id,
|
||||
ChatflowMessage.index == message_index
|
||||
)
|
||||
)
|
||||
).scalar() or 0
|
||||
next_version = max_version + 1
|
||||
|
||||
# Save new version of the message
|
||||
message_data = {
|
||||
'role': prompt_message.role.value,
|
||||
'content': prompt_message.get_text_content(),
|
||||
'timestamp': time.time()
|
||||
}
|
||||
|
||||
new_message_version = ChatflowMessage(
|
||||
conversation_id=chatflow_conv.id,
|
||||
index=message_index,
|
||||
version=next_version,
|
||||
data=json.dumps(message_data)
|
||||
)
|
||||
session.add(new_message_version)
|
||||
session.commit()
|
||||
|
||||
@staticmethod
|
||||
def update_visible_count(
|
||||
conversation_id: str,
|
||||
|
|
@ -168,20 +123,6 @@ class ChatflowHistoryService:
|
|||
app_id: str,
|
||||
tenant_id: str
|
||||
) -> None:
|
||||
"""
|
||||
Update visible_count metadata for specific scope.
|
||||
|
||||
Args:
|
||||
node_id: None for app-level updates, specific node_id for node-level updates
|
||||
new_visible_count: The new visible_count value (typically preserved_turns)
|
||||
|
||||
Usage Examples:
|
||||
# Update app-level visible_count
|
||||
ChatflowHistoryService.update_visible_count(conv_id, None, 10, app_id, tenant_id)
|
||||
|
||||
# Update node-specific visible_count
|
||||
ChatflowHistoryService.update_visible_count(conv_id, "node-123", 8, app_id, tenant_id)
|
||||
"""
|
||||
with Session(db.engine) as session:
|
||||
chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation(
|
||||
session, conversation_id, app_id, tenant_id, node_id, create_if_missing=True
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import logging
|
|||
import threading
|
||||
import time
|
||||
from collections.abc import Sequence
|
||||
from typing import Optional, cast
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy import and_, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
|
@ -24,25 +24,13 @@ from extensions.ext_database import db
|
|||
from extensions.ext_redis import redis_client
|
||||
from models import App
|
||||
from models.chatflow_memory import ChatflowMemoryVariable
|
||||
from models.workflow import WorkflowDraftVariable
|
||||
from models.workflow import Workflow, WorkflowDraftVariable
|
||||
from services.chatflow_history_service import ChatflowHistoryService
|
||||
from services.workflow_draft_variable_service import WorkflowDraftVariableService
|
||||
from services.workflow_service import WorkflowService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def _get_memory_sync_lock_key(app_id: str, conversation_id: str) -> str:
|
||||
"""Generate Redis lock key for memory sync updates
|
||||
|
||||
Args:
|
||||
app_id: Application ID
|
||||
conversation_id: Conversation ID
|
||||
|
||||
Returns:
|
||||
Formatted lock key
|
||||
"""
|
||||
return f"memory_sync_update:{app_id}:{conversation_id}"
|
||||
|
||||
class ChatflowMemoryService:
|
||||
@staticmethod
|
||||
def get_persistent_memories(app: App) -> Sequence[MemoryBlockWithVisibility]:
|
||||
|
|
@ -71,12 +59,34 @@ class ChatflowMemoryService:
|
|||
return ChatflowMemoryService._with_visibility(app, [result[0] for result in db_results])
|
||||
|
||||
@staticmethod
|
||||
def save_memory(memory: MemoryBlock, tenant_id: str, variable_pool: VariablePool, is_draft: bool) -> None:
|
||||
key = f"{memory.node_id}:{memory.memory_id}" if memory.node_id else memory.memory_id
|
||||
def save_memory(memory: MemoryBlock, variable_pool: VariablePool, is_draft: bool) -> None:
|
||||
key = f"{memory.node_id}:{memory.spec.id}" if memory.node_id else memory.spec.id
|
||||
variable_pool.add([MEMORY_BLOCK_VARIABLE_NODE_ID, key], memory.value)
|
||||
|
||||
with db.session() as session:
|
||||
session.merge(ChatflowMemoryService._to_chatflow_memory_variable(memory))
|
||||
with Session(db.engine) as session:
|
||||
existing = session.query(ChatflowMemoryVariable).filter_by(
|
||||
memory_id=memory.spec.id,
|
||||
tenant_id=memory.tenant_id,
|
||||
app_id=memory.app_id,
|
||||
node_id=memory.node_id,
|
||||
conversation_id=memory.conversation_id
|
||||
).first()
|
||||
if existing:
|
||||
existing.value = memory.value
|
||||
else:
|
||||
session.add(
|
||||
ChatflowMemoryVariable(
|
||||
memory_id=memory.spec.id,
|
||||
tenant_id=memory.tenant_id,
|
||||
app_id=memory.app_id,
|
||||
node_id=memory.node_id,
|
||||
conversation_id=memory.conversation_id,
|
||||
name=memory.spec.name,
|
||||
value=memory.value,
|
||||
term=memory.spec.term,
|
||||
scope=memory.spec.scope,
|
||||
)
|
||||
)
|
||||
session.commit()
|
||||
|
||||
if is_draft:
|
||||
|
|
@ -84,7 +94,7 @@ class ChatflowMemoryService:
|
|||
draft_var_service = WorkflowDraftVariableService(session)
|
||||
existing_vars = draft_var_service.get_draft_variables_by_selectors(
|
||||
app_id=memory.app_id,
|
||||
selectors=[['memory_block', memory.memory_id]]
|
||||
selectors=[['memory_block', memory.spec.id]]
|
||||
)
|
||||
if existing_vars:
|
||||
draft_var = existing_vars[0]
|
||||
|
|
@ -92,8 +102,8 @@ class ChatflowMemoryService:
|
|||
else:
|
||||
draft_var = WorkflowDraftVariable.new_memory_block_variable(
|
||||
app_id=memory.app_id,
|
||||
memory_id=memory.memory_id,
|
||||
name=memory.name,
|
||||
memory_id=memory.spec.id,
|
||||
name=memory.spec.name,
|
||||
value=memory.value,
|
||||
description=""
|
||||
)
|
||||
|
|
@ -101,25 +111,30 @@ class ChatflowMemoryService:
|
|||
session.commit()
|
||||
|
||||
@staticmethod
|
||||
def get_memories_by_specs(memory_block_specs: Sequence[MemoryBlockSpec],
|
||||
tenant_id: str, app_id: str,
|
||||
conversation_id: Optional[str],
|
||||
node_id: Optional[str],
|
||||
is_draft: bool) -> Sequence[MemoryBlock]:
|
||||
return [ChatflowMemoryService.get_memory_by_spec(
|
||||
def get_memories_by_specs(
|
||||
memory_block_specs: Sequence[MemoryBlockSpec],
|
||||
tenant_id: str, app_id: str,
|
||||
conversation_id: Optional[str],
|
||||
node_id: Optional[str],
|
||||
is_draft: bool
|
||||
) -> Sequence[MemoryBlock]:
|
||||
return [ChatflowMemoryService.get_memory_by_spec(
|
||||
spec, tenant_id, app_id, conversation_id, node_id, is_draft
|
||||
) for spec in memory_block_specs]
|
||||
|
||||
@staticmethod
|
||||
def get_memory_by_spec(spec: MemoryBlockSpec,
|
||||
tenant_id: str, app_id: str,
|
||||
conversation_id: Optional[str],
|
||||
node_id: Optional[str],
|
||||
is_draft: bool) -> MemoryBlock:
|
||||
with (Session(bind=db.engine) as session):
|
||||
def get_memory_by_spec(
|
||||
spec: MemoryBlockSpec,
|
||||
tenant_id: str,
|
||||
app_id: str,
|
||||
conversation_id: Optional[str],
|
||||
node_id: Optional[str],
|
||||
is_draft: bool
|
||||
) -> MemoryBlock:
|
||||
with Session(db.engine) as session:
|
||||
if is_draft:
|
||||
draft_var_service = WorkflowDraftVariableService(session)
|
||||
selector = [MEMORY_BLOCK_VARIABLE_NODE_ID, f"{spec.id}.{node_id}"]\
|
||||
selector = [MEMORY_BLOCK_VARIABLE_NODE_ID, f"{spec.id}.{node_id}"] \
|
||||
if node_id else [MEMORY_BLOCK_VARIABLE_NODE_ID, spec.id]
|
||||
draft_vars = draft_var_service.get_draft_variables_by_selectors(
|
||||
app_id=app_id,
|
||||
|
|
@ -128,38 +143,92 @@ class ChatflowMemoryService:
|
|||
if draft_vars:
|
||||
draft_var = draft_vars[0]
|
||||
return MemoryBlock(
|
||||
id=draft_var.id,
|
||||
memory_id=draft_var.name,
|
||||
name=spec.name,
|
||||
value=draft_var.value,
|
||||
scope=spec.scope,
|
||||
term=spec.term,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
conversation_id=conversation_id,
|
||||
node_id=node_id
|
||||
node_id=node_id,
|
||||
spec=spec
|
||||
)
|
||||
stmt = select(ChatflowMemoryVariable).where(
|
||||
and_(
|
||||
ChatflowMemoryVariable.memory_id == spec.id,
|
||||
ChatflowMemoryVariable.tenant_id == tenant_id,
|
||||
ChatflowMemoryVariable.app_id == app_id,
|
||||
ChatflowMemoryVariable.node_id == node_id,
|
||||
ChatflowMemoryVariable.conversation_id == conversation_id
|
||||
ChatflowMemoryVariable.node_id == \
|
||||
(node_id if spec.term == MemoryScope.NODE else None),
|
||||
ChatflowMemoryVariable.conversation_id == \
|
||||
(conversation_id if spec.term == MemoryTerm.SESSION else None),
|
||||
)
|
||||
)
|
||||
result = session.execute(stmt).scalar()
|
||||
if result:
|
||||
return ChatflowMemoryService._to_memory_block(result)
|
||||
return MemoryBlock(
|
||||
value=result.value,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
conversation_id=conversation_id,
|
||||
node_id=node_id,
|
||||
spec=spec
|
||||
)
|
||||
return MemoryBlock(
|
||||
id="", # Will be assigned when saved
|
||||
memory_id=spec.id,
|
||||
name=spec.name,
|
||||
tenant_id=tenant_id,
|
||||
value=spec.template,
|
||||
scope=spec.scope,
|
||||
term=spec.term,
|
||||
app_id=app_id,
|
||||
conversation_id=conversation_id,
|
||||
node_id=node_id
|
||||
node_id=node_id,
|
||||
spec=spec
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def update_app_memory_if_needed(
|
||||
workflow: Workflow,
|
||||
conversation_id: str,
|
||||
is_draft: bool
|
||||
):
|
||||
visible_messages = ChatflowHistoryService.get_visible_chat_history(
|
||||
conversation_id=conversation_id,
|
||||
app_id=workflow.app_id,
|
||||
tenant_id=workflow.tenant_id,
|
||||
node_id=None,
|
||||
)
|
||||
sync_blocks: list[MemoryBlock] = []
|
||||
async_blocks: list[MemoryBlock] = []
|
||||
for memory_spec in workflow.memory_blocks:
|
||||
if memory_spec.scope == MemoryScope.APP:
|
||||
memory = ChatflowMemoryService.get_memory_by_spec(
|
||||
spec=memory_spec,
|
||||
tenant_id=workflow.tenant_id,
|
||||
app_id=workflow.app_id,
|
||||
conversation_id=conversation_id,
|
||||
node_id=None,
|
||||
is_draft=is_draft
|
||||
)
|
||||
if ChatflowMemoryService._should_update_memory(memory, visible_messages):
|
||||
if memory.spec.schedule_mode == MemoryScheduleMode.SYNC:
|
||||
sync_blocks.append(memory)
|
||||
else:
|
||||
async_blocks.append(memory)
|
||||
|
||||
if not sync_blocks and not async_blocks:
|
||||
return
|
||||
|
||||
# async mode: submit individual async tasks directly
|
||||
for memory_block in async_blocks:
|
||||
ChatflowMemoryService._app_submit_async_memory_update(
|
||||
block=memory_block,
|
||||
is_draft=is_draft,
|
||||
visible_messages=visible_messages
|
||||
)
|
||||
|
||||
# sync mode: submit a batch update task
|
||||
if sync_blocks:
|
||||
ChatflowMemoryService._app_submit_sync_memory_batch_update(
|
||||
sync_blocks=sync_blocks,
|
||||
is_draft=is_draft,
|
||||
conversation_id=conversation_id,
|
||||
app_id=workflow.app_id,
|
||||
visible_messages=visible_messages
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
|
|
@ -172,307 +241,47 @@ class ChatflowMemoryService:
|
|||
variable_pool: VariablePool,
|
||||
is_draft: bool
|
||||
) -> bool:
|
||||
if not ChatflowMemoryService._should_update_memory(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
memory_block_spec=memory_block_spec,
|
||||
conversation_id=conversation_id,
|
||||
node_id=node_id
|
||||
):
|
||||
return False
|
||||
|
||||
if memory_block_spec.schedule_mode == MemoryScheduleMode.SYNC:
|
||||
# Node-level sync: blocking execution
|
||||
ChatflowMemoryService._update_node_memory_sync(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
memory_block_spec=memory_block_spec,
|
||||
node_id=node_id,
|
||||
conversation_id=conversation_id,
|
||||
variable_pool=variable_pool,
|
||||
is_draft=is_draft
|
||||
)
|
||||
else:
|
||||
# Node-level async: execute asynchronously
|
||||
ChatflowMemoryService._update_node_memory_async(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
memory_block_spec=memory_block_spec,
|
||||
node_id=node_id,
|
||||
conversation_id=conversation_id,
|
||||
variable_pool=variable_pool,
|
||||
is_draft=is_draft
|
||||
)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _get_memory_from_chatflow_table(memory_id: str, tenant_id: str,
|
||||
app_id: Optional[str] = None,
|
||||
conversation_id: Optional[str] = None,
|
||||
node_id: Optional[str] = None) -> Optional[MemoryBlock]:
|
||||
stmt = select(ChatflowMemoryVariable).where(
|
||||
and_(
|
||||
ChatflowMemoryVariable.app_id == app_id,
|
||||
ChatflowMemoryVariable.memory_id == memory_id,
|
||||
ChatflowMemoryVariable.tenant_id == tenant_id,
|
||||
ChatflowMemoryVariable.conversation_id == conversation_id,
|
||||
ChatflowMemoryVariable.node_id == node_id
|
||||
)
|
||||
)
|
||||
|
||||
with db.session() as session:
|
||||
result = session.execute(stmt).first()
|
||||
return ChatflowMemoryService._to_memory_block(result[0]) if result else None
|
||||
|
||||
@staticmethod
|
||||
def _to_memory_block(entity: ChatflowMemoryVariable) -> MemoryBlock:
|
||||
scope = MemoryScope(entity.scope) if not isinstance(entity.scope, MemoryScope) else entity.scope
|
||||
term = MemoryTerm(entity.term) if not isinstance(entity.term, MemoryTerm) else entity.term
|
||||
return MemoryBlock(
|
||||
id=entity.id,
|
||||
memory_id=entity.memory_id,
|
||||
name=entity.name,
|
||||
value=entity.value,
|
||||
scope=scope,
|
||||
term=term,
|
||||
app_id=cast(str, entity.app_id), # It's supposed to be not nullable for now
|
||||
conversation_id=entity.conversation_id,
|
||||
node_id=entity.node_id,
|
||||
created_at=entity.created_at,
|
||||
updated_at=entity.updated_at,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _to_chatflow_memory_variable(memory_block: MemoryBlock) -> ChatflowMemoryVariable:
|
||||
return ChatflowMemoryVariable(
|
||||
id=memory_block.id,
|
||||
node_id=memory_block.node_id,
|
||||
memory_id=memory_block.memory_id,
|
||||
name=memory_block.name,
|
||||
value=memory_block.value,
|
||||
scope=memory_block.scope,
|
||||
term=memory_block.term,
|
||||
app_id=memory_block.app_id,
|
||||
conversation_id=memory_block.conversation_id,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _with_visibility(
|
||||
app: App,
|
||||
raw_results: Sequence[ChatflowMemoryVariable]
|
||||
) -> Sequence[MemoryBlockWithVisibility]:
|
||||
workflow = WorkflowService().get_published_workflow(app)
|
||||
if not workflow:
|
||||
return []
|
||||
results = []
|
||||
for db_result in raw_results:
|
||||
spec = next((spec for spec in workflow.memory_blocks if spec.id == db_result.memory_id), None)
|
||||
if spec:
|
||||
results.append(
|
||||
MemoryBlockWithVisibility(
|
||||
id=db_result.memory_id,
|
||||
name=db_result.name,
|
||||
value=db_result.value,
|
||||
end_user_editable=spec.end_user_editable,
|
||||
end_user_visible=spec.end_user_visible,
|
||||
)
|
||||
)
|
||||
return results
|
||||
|
||||
@staticmethod
|
||||
def _should_update_memory(tenant_id: str, app_id: str,
|
||||
memory_block_spec: MemoryBlockSpec,
|
||||
conversation_id: str, node_id: Optional[str] = None) -> bool:
|
||||
"""Check if memory should be updated based on strategy"""
|
||||
# Currently, `memory_block_spec.strategy != MemoryStrategy.ON_TURNS` is not possible, but possible in the future
|
||||
|
||||
# Check turn count
|
||||
turn_key = f"memory_turn_count:{tenant_id}:{app_id}:{conversation_id}"
|
||||
if node_id:
|
||||
turn_key += f":{node_id}"
|
||||
|
||||
current_turns = redis_client.get(turn_key)
|
||||
current_turns = int(current_turns) if current_turns else 0
|
||||
current_turns += 1
|
||||
|
||||
# Update count
|
||||
redis_client.set(turn_key, current_turns)
|
||||
|
||||
return current_turns % memory_block_spec.update_turns == 0
|
||||
|
||||
# App-level async update method
|
||||
@staticmethod
|
||||
def _submit_async_memory_update(tenant_id: str, app_id: str,
|
||||
block: MemoryBlockSpec,
|
||||
conversation_id: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Submit async memory update task"""
|
||||
|
||||
# Execute update asynchronously using thread
|
||||
thread = threading.Thread(
|
||||
target=ChatflowMemoryService._update_app_single_memory,
|
||||
kwargs={
|
||||
'tenant_id': tenant_id,
|
||||
'app_id': app_id,
|
||||
'memory_block_spec': block,
|
||||
'conversation_id': conversation_id,
|
||||
'variable_pool': variable_pool,
|
||||
'is_draft': is_draft
|
||||
},
|
||||
daemon=True
|
||||
)
|
||||
thread.start()
|
||||
|
||||
# Node-level sync update method
|
||||
@staticmethod
|
||||
def _update_node_memory_sync(tenant_id: str, app_id: str,
|
||||
memory_block_spec: MemoryBlockSpec,
|
||||
node_id: str, conversation_id: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Synchronously update node memory (blocking execution)"""
|
||||
ChatflowMemoryService._perform_memory_update(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
memory_block_spec=memory_block_spec,
|
||||
conversation_id=conversation_id,
|
||||
variable_pool=variable_pool,
|
||||
node_id=node_id,
|
||||
is_draft=is_draft
|
||||
)
|
||||
# Wait for update to complete before returning
|
||||
|
||||
# Node-level async update method
|
||||
@staticmethod
|
||||
def _update_node_memory_async(
|
||||
tenant_id: str,
|
||||
app_id: str,
|
||||
memory_block_spec: MemoryBlockSpec,
|
||||
node_id: str,
|
||||
conversation_id: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Asynchronously update node memory (submit task)"""
|
||||
|
||||
# Execute update asynchronously using thread
|
||||
thread = threading.Thread(
|
||||
target=ChatflowMemoryService._perform_node_memory_update,
|
||||
kwargs={
|
||||
'memory_block_spec': memory_block_spec,
|
||||
'tenant_id': tenant_id,
|
||||
'app_id': app_id,
|
||||
'node_id': node_id,
|
||||
'variable_pool': variable_pool,
|
||||
'is_draft': is_draft
|
||||
},
|
||||
daemon=True
|
||||
)
|
||||
thread.start()
|
||||
# Return immediately without waiting
|
||||
|
||||
@staticmethod
|
||||
def _perform_node_memory_update(
|
||||
*,
|
||||
memory_block_spec: MemoryBlockSpec,
|
||||
tenant_id: str,
|
||||
app_id: str,
|
||||
node_id: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False
|
||||
):
|
||||
ChatflowMemoryService._perform_memory_update(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
memory_block_spec=memory_block_spec,
|
||||
conversation_id=str(variable_pool.get(('sys', 'conversation_id'))),
|
||||
variable_pool=variable_pool,
|
||||
node_id=node_id,
|
||||
is_draft=is_draft
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _update_app_single_memory(*, tenant_id: str, app_id: str,
|
||||
memory_block_spec: MemoryBlockSpec,
|
||||
conversation_id: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Update single memory"""
|
||||
ChatflowMemoryService._perform_memory_update(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
memory_block_spec=memory_block_spec,
|
||||
conversation_id=conversation_id,
|
||||
variable_pool=variable_pool,
|
||||
node_id=None, # App-level memory doesn't have node_id
|
||||
is_draft=is_draft
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _perform_memory_update(
|
||||
tenant_id: str, app_id: str,
|
||||
memory_block_spec: MemoryBlockSpec,
|
||||
conversation_id: str,
|
||||
variable_pool: VariablePool,
|
||||
node_id: Optional[str],
|
||||
is_draft: bool):
|
||||
history = ChatflowHistoryService.get_visible_chat_history(
|
||||
visible_messages = ChatflowHistoryService.get_visible_chat_history(
|
||||
conversation_id=conversation_id,
|
||||
app_id=app_id,
|
||||
tenant_id=tenant_id,
|
||||
node_id=node_id,
|
||||
)
|
||||
memory_block = ChatflowMemoryService.get_memory_by_spec(
|
||||
tenant_id=tenant_id,
|
||||
spec=memory_block_spec,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
conversation_id=conversation_id,
|
||||
node_id=node_id,
|
||||
is_draft=is_draft
|
||||
)
|
||||
updated_value = LLMGenerator.update_memory_block(
|
||||
tenant_id=tenant_id,
|
||||
visible_history=ChatflowMemoryService._format_chat_history(history),
|
||||
if not ChatflowMemoryService._should_update_memory(
|
||||
memory_block=memory_block,
|
||||
memory_spec=memory_block_spec,
|
||||
)
|
||||
# Save updated memory
|
||||
updated_memory = MemoryBlock(
|
||||
id=memory_block.id,
|
||||
memory_id=memory_block_spec.id,
|
||||
name=memory_block_spec.name,
|
||||
value=updated_value,
|
||||
scope=memory_block_spec.scope,
|
||||
term=memory_block_spec.term,
|
||||
app_id=app_id,
|
||||
conversation_id=conversation_id if memory_block_spec.term == MemoryTerm.SESSION else None,
|
||||
node_id=node_id
|
||||
)
|
||||
ChatflowMemoryService.save_memory(updated_memory, tenant_id, variable_pool, is_draft)
|
||||
visible_history=visible_messages
|
||||
):
|
||||
return False
|
||||
|
||||
# Not implemented yet: Send success event
|
||||
# self._send_memory_update_event(memory_block_spec.id, "completed", updated_value)
|
||||
if memory_block_spec.schedule_mode == MemoryScheduleMode.SYNC:
|
||||
# Node-level sync: blocking execution
|
||||
ChatflowMemoryService._update_node_memory_sync(
|
||||
visible_messages=visible_messages,
|
||||
memory_block=memory_block,
|
||||
variable_pool=variable_pool,
|
||||
is_draft=is_draft
|
||||
)
|
||||
else:
|
||||
# Node-level async: execute asynchronously
|
||||
ChatflowMemoryService._update_node_memory_async(
|
||||
memory_block=memory_block,
|
||||
visible_messages=visible_messages,
|
||||
variable_pool=variable_pool,
|
||||
is_draft=is_draft
|
||||
)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _format_chat_history(messages: Sequence[PromptMessage]) -> Sequence[tuple[str, str]]:
|
||||
result = []
|
||||
for message in messages:
|
||||
result.append((str(message.role.value), message.get_text_content()))
|
||||
return result
|
||||
|
||||
# App-level sync batch update related methods
|
||||
@staticmethod
|
||||
def wait_for_sync_memory_completion(workflow, conversation_id: str):
|
||||
"""Wait for sync memory update to complete, maximum 50 seconds
|
||||
|
||||
Args:
|
||||
workflow: Workflow object
|
||||
conversation_id: Conversation ID
|
||||
|
||||
Raises:
|
||||
MemorySyncTimeoutError: Raised when timeout is reached
|
||||
"""
|
||||
from core.memory.entities import MemoryScope
|
||||
"""Wait for sync memory update to complete, maximum 50 seconds"""
|
||||
|
||||
memory_blocks = workflow.memory_blocks
|
||||
sync_memory_blocks = [
|
||||
|
|
@ -505,54 +314,132 @@ class ChatflowMemoryService:
|
|||
)
|
||||
|
||||
@staticmethod
|
||||
def update_app_memory_after_run(workflow, conversation_id: str, variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Update app-level memory after run completion"""
|
||||
sync_blocks = []
|
||||
async_blocks = []
|
||||
for block in workflow.memory_blocks:
|
||||
if block.scope == MemoryScope.APP:
|
||||
if block.update_mode == "sync":
|
||||
sync_blocks.append(block)
|
||||
else:
|
||||
async_blocks.append(block)
|
||||
|
||||
# async mode: submit individual async tasks directly
|
||||
for block in async_blocks:
|
||||
ChatflowMemoryService._submit_async_memory_update(
|
||||
tenant_id=workflow.tenant_id,
|
||||
app_id=workflow.app_id,
|
||||
block=block,
|
||||
conversation_id=conversation_id,
|
||||
variable_pool=variable_pool,
|
||||
is_draft=is_draft
|
||||
)
|
||||
|
||||
# sync mode: submit a batch update task
|
||||
if sync_blocks:
|
||||
ChatflowMemoryService._submit_sync_memory_batch_update(
|
||||
workflow=workflow,
|
||||
sync_blocks=sync_blocks,
|
||||
conversation_id=conversation_id,
|
||||
variable_pool=variable_pool,
|
||||
is_draft=is_draft
|
||||
def _with_visibility(
|
||||
app: App,
|
||||
raw_results: Sequence[ChatflowMemoryVariable]
|
||||
) -> Sequence[MemoryBlockWithVisibility]:
|
||||
workflow = WorkflowService().get_published_workflow(app)
|
||||
if not workflow:
|
||||
return []
|
||||
results = []
|
||||
for chatflow_memory_variable in raw_results:
|
||||
spec = next(
|
||||
(spec for spec in workflow.memory_blocks if spec.id == chatflow_memory_variable.memory_id),
|
||||
None
|
||||
)
|
||||
if spec:
|
||||
results.append(
|
||||
MemoryBlockWithVisibility(
|
||||
id=chatflow_memory_variable.memory_id,
|
||||
name=chatflow_memory_variable.name,
|
||||
value=chatflow_memory_variable.value,
|
||||
end_user_editable=spec.end_user_editable,
|
||||
end_user_visible=spec.end_user_visible,
|
||||
)
|
||||
)
|
||||
return results
|
||||
|
||||
@staticmethod
|
||||
def _submit_sync_memory_batch_update(workflow,
|
||||
sync_blocks: list[MemoryBlockSpec],
|
||||
conversation_id: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Submit sync memory batch update task"""
|
||||
def _should_update_memory(
|
||||
memory_block: MemoryBlock,
|
||||
visible_history: Sequence[PromptMessage]
|
||||
) -> bool:
|
||||
return len(visible_history) > memory_block.spec.update_turns
|
||||
|
||||
# Execute batch update asynchronously using thread
|
||||
@staticmethod
|
||||
def _app_submit_async_memory_update(
|
||||
block: MemoryBlock,
|
||||
visible_messages: Sequence[PromptMessage],
|
||||
is_draft: bool
|
||||
):
|
||||
thread = threading.Thread(
|
||||
target=ChatflowMemoryService._perform_memory_update,
|
||||
kwargs={
|
||||
'memory_block': block,
|
||||
'visible_messages': visible_messages,
|
||||
'variable_pool': VariablePool(),
|
||||
'is_draft': is_draft
|
||||
},
|
||||
)
|
||||
thread.start()
|
||||
|
||||
@staticmethod
|
||||
def _app_submit_sync_memory_batch_update(
|
||||
sync_blocks: Sequence[MemoryBlock],
|
||||
app_id: str,
|
||||
conversation_id: str,
|
||||
visible_messages: Sequence[PromptMessage],
|
||||
is_draft: bool
|
||||
):
|
||||
"""Submit sync memory batch update task"""
|
||||
thread = threading.Thread(
|
||||
target=ChatflowMemoryService._batch_update_sync_memory,
|
||||
kwargs={
|
||||
'workflow': workflow,
|
||||
'sync_blocks': sync_blocks,
|
||||
'app_id': app_id,
|
||||
'conversation_id': conversation_id,
|
||||
'visible_messages': visible_messages,
|
||||
'is_draft': is_draft
|
||||
},
|
||||
)
|
||||
thread.start()
|
||||
|
||||
@staticmethod
|
||||
def _batch_update_sync_memory(
|
||||
sync_blocks: Sequence[MemoryBlock],
|
||||
app_id: str,
|
||||
conversation_id: str,
|
||||
visible_messages: Sequence[PromptMessage],
|
||||
is_draft: bool
|
||||
):
|
||||
try:
|
||||
lock_key = _get_memory_sync_lock_key(app_id, conversation_id)
|
||||
with redis_client.lock(lock_key, timeout=120):
|
||||
threads = []
|
||||
for block in sync_blocks:
|
||||
thread = threading.Thread(
|
||||
target=ChatflowMemoryService._perform_memory_update,
|
||||
kwargs={
|
||||
'memory_block': block,
|
||||
'visible_messages': visible_messages,
|
||||
'variable_pool': VariablePool(),
|
||||
'is_draft': is_draft
|
||||
},
|
||||
)
|
||||
threads.append(thread)
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
except Exception as e:
|
||||
logger.exception("Error batch updating memory", exc_info=e)
|
||||
|
||||
@staticmethod
|
||||
def _update_node_memory_sync(
|
||||
memory_block: MemoryBlock,
|
||||
visible_messages: Sequence[PromptMessage],
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool
|
||||
):
|
||||
ChatflowMemoryService._perform_memory_update(
|
||||
memory_block=memory_block,
|
||||
visible_messages=visible_messages,
|
||||
variable_pool=variable_pool,
|
||||
is_draft=is_draft
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _update_node_memory_async(
|
||||
memory_block: MemoryBlock,
|
||||
visible_messages: Sequence[PromptMessage],
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False
|
||||
):
|
||||
thread = threading.Thread(
|
||||
target=ChatflowMemoryService._perform_memory_update,
|
||||
kwargs={
|
||||
'memory_block': memory_block,
|
||||
'visible_messages': visible_messages,
|
||||
'variable_pool': variable_pool,
|
||||
'is_draft': is_draft
|
||||
},
|
||||
|
|
@ -561,39 +448,43 @@ class ChatflowMemoryService:
|
|||
thread.start()
|
||||
|
||||
@staticmethod
|
||||
def _batch_update_sync_memory(*, workflow,
|
||||
sync_blocks: list[MemoryBlockSpec],
|
||||
conversation_id: str,
|
||||
variable_pool: VariablePool,
|
||||
is_draft: bool = False):
|
||||
"""Batch update sync memory (with Redis lock)"""
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
def _perform_memory_update(
|
||||
memory_block: MemoryBlock,
|
||||
variable_pool: VariablePool,
|
||||
visible_messages: Sequence[PromptMessage],
|
||||
is_draft: bool
|
||||
):
|
||||
updated_value = LLMGenerator.update_memory_block(
|
||||
tenant_id=memory_block.tenant_id,
|
||||
visible_history=ChatflowMemoryService._format_chat_history(visible_messages),
|
||||
memory_block=memory_block,
|
||||
memory_spec=memory_block.spec,
|
||||
)
|
||||
updated_memory = MemoryBlock(
|
||||
tenant_id=memory_block.tenant_id,
|
||||
value=updated_value,
|
||||
spec=memory_block.spec,
|
||||
app_id=memory_block.app_id,
|
||||
conversation_id=memory_block.conversation_id,
|
||||
node_id=memory_block.node_id
|
||||
)
|
||||
ChatflowMemoryService.save_memory(updated_memory, variable_pool, is_draft)
|
||||
|
||||
lock_key = _get_memory_sync_lock_key(workflow.app_id, conversation_id)
|
||||
@staticmethod
|
||||
def _format_chat_history(messages: Sequence[PromptMessage]) -> Sequence[tuple[str, str]]:
|
||||
result = []
|
||||
for message in messages:
|
||||
result.append((str(message.role.value), message.get_text_content()))
|
||||
return result
|
||||
|
||||
# Use Redis lock context manager (30 seconds timeout)
|
||||
with redis_client.lock(lock_key, timeout=30):
|
||||
try:
|
||||
# Update all sync memory in parallel
|
||||
with ThreadPoolExecutor(max_workers=5) as executor:
|
||||
futures = []
|
||||
for block in sync_blocks:
|
||||
future = executor.submit(
|
||||
ChatflowMemoryService._update_app_single_memory,
|
||||
tenant_id=workflow.tenant_id,
|
||||
app_id=workflow.app_id,
|
||||
memory_block_spec=block,
|
||||
conversation_id=conversation_id,
|
||||
variable_pool=variable_pool,
|
||||
is_draft=is_draft
|
||||
)
|
||||
futures.append(future)
|
||||
def _get_memory_sync_lock_key(app_id: str, conversation_id: str) -> str:
|
||||
"""Generate Redis lock key for memory sync updates
|
||||
|
||||
# Wait for all updates to complete
|
||||
for future in futures:
|
||||
try:
|
||||
future.result()
|
||||
except Exception as e:
|
||||
logger.exception("Failed to update memory", exc_info=e)
|
||||
except Exception as e:
|
||||
logger.exception("Failed to update sync memory for app %s", workflow.app_id, exc_info=e)
|
||||
Args:
|
||||
app_id: Application ID
|
||||
conversation_id: Conversation ID
|
||||
|
||||
Returns:
|
||||
Formatted lock key
|
||||
"""
|
||||
return f"memory_sync_update:{app_id}:{conversation_id}"
|
||||
|
|
|
|||
|
|
@ -761,9 +761,9 @@ def _fetch_memory_blocks(workflow: Workflow, conversation_id: str, is_draft: boo
|
|||
is_draft=is_draft,
|
||||
)
|
||||
for memory in memories:
|
||||
if memory.scope == MemoryScope.APP:
|
||||
memory_blocks[memory.memory_id] = memory.value
|
||||
if memory.spec.scope == MemoryScope.APP:
|
||||
memory_blocks[memory.spec.id] = memory.value
|
||||
else: # NODE scope
|
||||
memory_blocks[f"{memory.node_id}.{memory.memory_id}"] = memory.value
|
||||
memory_blocks[f"{memory.node_id}.{memory.spec.id}"] = memory.value
|
||||
|
||||
return memory_blocks
|
||||
|
|
|
|||
Loading…
Reference in New Issue