diff --git a/api/controllers/web/chatflow_memory.py b/api/controllers/web/chatflow_memory.py new file mode 100644 index 0000000000..92b259c498 --- /dev/null +++ b/api/controllers/web/chatflow_memory.py @@ -0,0 +1,57 @@ +from flask_restful import reqparse +from sqlalchemy.orm.session import Session + +from controllers.web import api +from controllers.web.wraps import WebApiResource +from libs.helper import uuid_value +from models import db +from models.chatflow_memory import ChatflowMemoryVariable +from services.chatflow_memory_service import ChatflowMemoryService +from services.workflow_service import WorkflowService + + +class MemoryListApi(WebApiResource): + def get(self, app_model): + parser = reqparse.RequestParser() + parser.add_argument("conversation_id", required=False, type=uuid_value, location="args") + args = parser.parse_args() + conversation_id = args.get("conversation_id") + + result = ChatflowMemoryService.get_persistent_memories(app_model) + if conversation_id: + result = [*result, *ChatflowMemoryService.get_session_memories(app_model, conversation_id)] + + return [it for it in result if it.end_user_visible] + +class MemoryEditApi(WebApiResource): + def put(self, app_model): + parser = reqparse.RequestParser() + parser.add_argument('id', type=str, required=True) + parser.add_argument('node_id', type=str, required=False) + parser.add_argument('update', type=str, required=True) + args = parser.parse_args() + workflow = WorkflowService().get_published_workflow(app_model) + if not workflow: + return {'error': 'Workflow not found'}, 404 + memory_spec = next((it for it in workflow.memory_blocks if it.id == args['id']), None) + if not memory_spec: + return {'error': 'Memory not found'}, 404 + if not memory_spec.end_user_editable: + return {'error': 'Memory not editable'}, 403 + with Session(db.engine) as session: + ChatflowMemoryVariable( + tenant_id=app_model.tenant_id, + app_id=app_model.id, + node_id=args['node_id'], + memory_id=args['id'], + name=memory_spec.name, + value=args['update'], + scope=memory_spec.scope, + term=memory_spec.term, + ) + session.add(memory_spec) + session.commit() + return '', 204 + +api.add_resource(MemoryListApi, '/memories') +api.add_resource(MemoryEditApi, '/memory-edit') diff --git a/api/core/memory/entities.py b/api/core/memory/entities.py index 26927c7931..175df321fc 100644 --- a/api/core/memory/entities.py +++ b/api/core/memory/entities.py @@ -92,6 +92,13 @@ class MemoryBlock(BaseModel): """Check if this is node-level scope""" return self.node_id is not None +class MemoryBlockWithVisibility(BaseModel): + id: str + name: str + value: str + end_user_visible: bool + end_user_editable: bool + class ChatflowConversationMetadata(BaseModel): """Metadata for chatflow conversation with visible message count""" diff --git a/api/services/chatflow_memory_service.py b/api/services/chatflow_memory_service.py index 28b4d2c095..dc06397da1 100644 --- a/api/services/chatflow_memory_service.py +++ b/api/services/chatflow_memory_service.py @@ -10,6 +10,7 @@ from sqlalchemy.orm import Session from core.memory.entities import ( MemoryBlock, MemoryBlockSpec, + MemoryBlockWithVisibility, MemoryScheduleMode, MemoryScope, MemoryStrategy, @@ -21,15 +22,13 @@ from core.workflow.constants import MEMORY_BLOCK_VARIABLE_NODE_ID from core.workflow.entities.variable_pool import VariablePool from extensions.ext_database import db from extensions.ext_redis import redis_client +from models import App from models.chatflow_memory import ChatflowMemoryVariable from services.chatflow_history_service import ChatflowHistoryService +from services.workflow_service import WorkflowService logger = logging.getLogger(__name__) -# Important note: Since Dify uses gevent, we don't need an extra task queue (e.g., Celery). -# Threads created via threading.Thread are automatically patched into greenlets in a gevent environment, -# enabling efficient asynchronous execution. - def _get_memory_sync_lock_key(app_id: str, conversation_id: str) -> str: """Generate Redis lock key for memory sync updates @@ -48,6 +47,32 @@ class ChatflowMemoryService: All methods are static and do not require instantiation. """ + @staticmethod + def get_persistent_memories(app: App) -> Sequence[MemoryBlockWithVisibility]: + stmt = select(ChatflowMemoryVariable).where( + and_( + ChatflowMemoryVariable.tenant_id == app.tenant_id, + ChatflowMemoryVariable.app_id == app.id, + ChatflowMemoryVariable.conversation_id == None + ) + ) + with db.session() as session: + db_results = session.execute(stmt).all() + return ChatflowMemoryService._with_visibility(app, [result[0] for result in db_results]) + + @staticmethod + def get_session_memories(app: App, conversation_id: str) -> Sequence[MemoryBlockWithVisibility]: + stmt = select(ChatflowMemoryVariable).where( + and_( + ChatflowMemoryVariable.tenant_id == app.tenant_id, + ChatflowMemoryVariable.app_id == app.id, + ChatflowMemoryVariable.conversation_id == conversation_id + ) + ) + with db.session() as session: + db_results = session.execute(stmt).all() + return ChatflowMemoryService._with_visibility(app, [result[0] for result in db_results]) + @staticmethod def get_memory(memory_id: str, tenant_id: str, app_id: Optional[str] = None, @@ -347,6 +372,29 @@ class ChatflowMemoryService: ) return True + @staticmethod + def _with_visibility( + app: App, + raw_results: Sequence[ChatflowMemoryVariable] + ) -> Sequence[MemoryBlockWithVisibility]: + workflow = WorkflowService().get_published_workflow(app) + if not workflow: + return [] + results = [] + for db_result in raw_results: + spec = next((spec for spec in workflow.memory_blocks if spec.id == db_result.memory_id), None) + if spec: + results.append( + MemoryBlockWithVisibility( + id=db_result.memory_id, + name=db_result.name, + value=db_result.value, + end_user_editable=spec.end_user_editable, + end_user_visible=spec.end_user_visible, + ) + ) + return results + @staticmethod def _should_update_memory(tenant_id: str, app_id: str, memory_block_spec: MemoryBlockSpec,