feat: sync conversation history with `chatflow_` tables in chatflow

This commit is contained in:
Stream 2025-08-21 13:03:19 +08:00
parent a13cb7e1c5
commit 97cd21d3be
No known key found for this signature in database
GPG Key ID: 9475891C9507B4F3
1 changed files with 38 additions and 2 deletions

View File

@ -1,6 +1,6 @@
import logging
from collections.abc import Mapping, MutableMapping
from typing import Any, Optional, cast
from typing import Any, Optional, cast, override
from sqlalchemy import select
from sqlalchemy.orm import Session
@ -21,12 +21,13 @@ from core.app.entities.queue_entities import (
)
from core.app.features.annotation_reply.annotation_reply import AnnotationReplyFeature
from core.memory.entities import MemoryScope
from core.memory.errors import MemorySyncTimeoutError
from core.model_runtime.entities import AssistantPromptMessage, UserPromptMessage
from core.moderation.base import ModerationError
from core.moderation.input_moderation import InputModeration
from core.variables.variables import VariableUnion
from core.workflow.callbacks import WorkflowCallback, WorkflowLoggingCallback
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.graph_engine.entities.event import GraphRunSucceededEvent
from core.workflow.system_variable import SystemVariable
from core.workflow.variable_loader import VariableLoader
from core.workflow.workflow_entry import WorkflowEntry
@ -35,6 +36,7 @@ from models import Workflow
from models.enums import UserFrom
from models.model import App, Conversation, Message, MessageAnnotation
from models.workflow import ConversationVariable, WorkflowType
from services.chatflow_history_service import ChatflowHistoryService
from services.chatflow_memory_service import ChatflowMemoryService
logger = logging.getLogger(__name__)
@ -183,6 +185,23 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
for event in generator:
self._handle_event(workflow_entry, event)
@override
def _handle_event(self, workflow_entry: WorkflowEntry, event: Any) -> None:
super()._handle_event(workflow_entry, event)
if isinstance(event, GraphRunSucceededEvent):
workflow_outputs = event.outputs
if not workflow_outputs:
logger.warning("Chatflow output is empty.")
return
assistant_message = workflow_outputs.get('answer')
if not assistant_message:
logger.warning("Chatflow output does not contain 'answer'.")
return
try:
self._sync_conversation_to_chatflow_tables(assistant_message)
except Exception as e:
logger.exception("Failed to sync conversation to memory tables", exc_info=e)
def handle_input_moderation(
self,
app_record: App,
@ -411,3 +430,20 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
memory_blocks_dict[key] = memory.value
return memory_blocks_dict
def _sync_conversation_to_chatflow_tables(self, assistant_message: str):
# Get user input and AI response
user_message = self.application_generate_entity.query
ChatflowHistoryService.save_app_message(
prompt_message=UserPromptMessage(content=user_message),
conversation_id=self.conversation.id,
app_id=self._workflow.app_id,
tenant_id=self._workflow.tenant_id
)
ChatflowHistoryService.save_app_message(
prompt_message=AssistantPromptMessage(content=assistant_message),
conversation_id=self.conversation.id,
app_id=self._workflow.app_id,
tenant_id=self._workflow.tenant_id
)