From 97cd21d3beac59e6a95d605be7efa69022d66ea4 Mon Sep 17 00:00:00 2001 From: Stream Date: Thu, 21 Aug 2025 13:03:19 +0800 Subject: [PATCH] feat: sync conversation history with `chatflow_` tables in chatflow --- api/core/app/apps/advanced_chat/app_runner.py | 40 ++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index 4ba477682f..4a672bdf20 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -1,6 +1,6 @@ import logging from collections.abc import Mapping, MutableMapping -from typing import Any, Optional, cast +from typing import Any, Optional, cast, override from sqlalchemy import select from sqlalchemy.orm import Session @@ -21,12 +21,13 @@ from core.app.entities.queue_entities import ( ) from core.app.features.annotation_reply.annotation_reply import AnnotationReplyFeature from core.memory.entities import MemoryScope -from core.memory.errors import MemorySyncTimeoutError +from core.model_runtime.entities import AssistantPromptMessage, UserPromptMessage from core.moderation.base import ModerationError from core.moderation.input_moderation import InputModeration from core.variables.variables import VariableUnion from core.workflow.callbacks import WorkflowCallback, WorkflowLoggingCallback from core.workflow.entities.variable_pool import VariablePool +from core.workflow.graph_engine.entities.event import GraphRunSucceededEvent from core.workflow.system_variable import SystemVariable from core.workflow.variable_loader import VariableLoader from core.workflow.workflow_entry import WorkflowEntry @@ -35,6 +36,7 @@ from models import Workflow from models.enums import UserFrom from models.model import App, Conversation, Message, MessageAnnotation from models.workflow import ConversationVariable, WorkflowType +from services.chatflow_history_service import ChatflowHistoryService from services.chatflow_memory_service import ChatflowMemoryService logger = logging.getLogger(__name__) @@ -183,6 +185,23 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): for event in generator: self._handle_event(workflow_entry, event) + @override + def _handle_event(self, workflow_entry: WorkflowEntry, event: Any) -> None: + super()._handle_event(workflow_entry, event) + if isinstance(event, GraphRunSucceededEvent): + workflow_outputs = event.outputs + if not workflow_outputs: + logger.warning("Chatflow output is empty.") + return + assistant_message = workflow_outputs.get('answer') + if not assistant_message: + logger.warning("Chatflow output does not contain 'answer'.") + return + try: + self._sync_conversation_to_chatflow_tables(assistant_message) + except Exception as e: + logger.exception("Failed to sync conversation to memory tables", exc_info=e) + def handle_input_moderation( self, app_record: App, @@ -411,3 +430,20 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): memory_blocks_dict[key] = memory.value return memory_blocks_dict + + def _sync_conversation_to_chatflow_tables(self, assistant_message: str): + # Get user input and AI response + user_message = self.application_generate_entity.query + + ChatflowHistoryService.save_app_message( + prompt_message=UserPromptMessage(content=user_message), + conversation_id=self.conversation.id, + app_id=self._workflow.app_id, + tenant_id=self._workflow.tenant_id + ) + ChatflowHistoryService.save_app_message( + prompt_message=AssistantPromptMessage(content=assistant_message), + conversation_id=self.conversation.id, + app_id=self._workflow.app_id, + tenant_id=self._workflow.tenant_id + )