feat: new instruction-generate with new LLMGenerator

Signed-off-by: Stream <Stream_2@qq.com>
Stream 2025-11-14 16:43:33 +08:00
parent 77f70c5973
commit b0f73d681c
4 changed files with 713 additions and 91 deletions

View File

@@ -1,5 +1,3 @@
from collections.abc import Sequence
from flask_restx import Resource, fields, reqparse
from controllers.console import api, console_ns
@@ -11,13 +9,9 @@ from controllers.console.app.error (
)
from controllers.console.wraps import account_initialization_required, setup_required
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.helper.code_executor.javascript.javascript_code_provider import JavascriptCodeProvider
from core.helper.code_executor.python3.python3_code_provider import Python3CodeProvider
from core.llm_generator.llm_generator import LLMGenerator
from core.model_runtime.errors.invoke import InvokeError
from extensions.ext_database import db
from libs.login import current_account_with_tenant, login_required
from models import App
from services.workflow_service import WorkflowService
@@ -177,11 +171,29 @@ class InstructionGenerateApi(Resource):
api.model(
"InstructionGenerateRequest",
{
"flow_id": fields.String(required=True, description="Workflow/Flow ID"),
"node_id": fields.String(description="Node ID for workflow context"),
"current": fields.String(description="Current instruction text"),
"language": fields.String(default="javascript", description="Programming language (javascript/python)"),
"instruction": fields.String(required=True, description="Instruction for generation"),
"type": fields.String(
required=True,
description="Request type",
enum=[
"legacy_prompt_generate",
"workflow_prompt_generate",
"workflow_code_generate",
"workflow_prompt_edit",
"workflow_code_edit",
"memory_template_generate",
"memory_instruction_generate",
"memory_template_edit",
"memory_instruction_edit",
]
),
"flow_id": fields.String(description="Workflow/Flow ID"),
"node_id": fields.String(description="Node ID (optional)"),
"current": fields.String(description="Current content"),
"language": fields.String(
default="javascript",
description="Programming language (javascript/python)"
),
"instruction": fields.String(required=True, description="User instruction"),
"model_config": fields.Raw(required=True, description="Model configuration"),
"ideal_output": fields.String(description="Expected ideal output"),
},
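For illustration, a code-generation request against a workflow node could look like the sketch below; the field names and the {"provider", "name"} shape of model_config mirror the schema above and the generator code later in this commit, while the endpoint path and all values are hypothetical.
# Hypothetical request body for the instruction-generate endpoint (path and values assumed).
payload = {
    "type": "workflow_code_generate",
    "flow_id": "app-uuid",
    "node_id": "code-node-1",
    "current": "",
    "language": "python",
    "instruction": "Parse the uploaded CSV and return the row count",
    "model_config": {"provider": "openai", "name": "gpt-4o"},
    "ideal_output": "",
}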
@@ -196,7 +208,8 @@ class InstructionGenerateApi(Resource):
def post(self):
parser = (
reqparse.RequestParser()
.add_argument("flow_id", type=str, required=True, default="", location="json")
.add_argument("type", type=str, required=True, nullable=False, location="json")
.add_argument("flow_id", type=str, required=False, default="", location="json")
.add_argument("node_id", type=str, required=False, default="", location="json")
.add_argument("current", type=str, required=False, default="", location="json")
.add_argument("language", type=str, required=False, default="javascript", location="json")
@@ -206,72 +219,16 @@
)
args = parser.parse_args()
_, current_tenant_id = current_account_with_tenant()
code_template = (
Python3CodeProvider.get_default_code()
if args["language"] == "python"
else (JavascriptCodeProvider.get_default_code())
if args["language"] == "javascript"
else ""
)
try:
# Generate from nothing for a workflow node
if (args["current"] == code_template or args["current"] == "") and args["node_id"] != "":
app = db.session.query(App).where(App.id == args["flow_id"]).first()
if not app:
return {"error": f"app {args['flow_id']} not found"}, 400
workflow = WorkflowService().get_draft_workflow(app_model=app)
if not workflow:
return {"error": f"workflow {args['flow_id']} not found"}, 400
nodes: Sequence = workflow.graph_dict["nodes"]
node = [node for node in nodes if node["id"] == args["node_id"]]
if len(node) == 0:
return {"error": f"node {args['node_id']} not found"}, 400
node_type = node[0]["data"]["type"]
match node_type:
case "llm":
return LLMGenerator.generate_rule_config(
current_tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
no_variable=True,
)
case "agent":
return LLMGenerator.generate_rule_config(
current_tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
no_variable=True,
)
case "code":
return LLMGenerator.generate_code(
tenant_id=current_tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
code_language=args["language"],
)
case _:
return {"error": f"invalid node type: {node_type}"}
if args["node_id"] == "" and args["current"] != "": # For legacy app without a workflow
return LLMGenerator.instruction_modify_legacy(
tenant_id=current_tenant_id,
flow_id=args["flow_id"],
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
)
if args["node_id"] != "" and args["current"] != "": # For workflow node
return LLMGenerator.instruction_modify_workflow(
tenant_id=current_tenant_id,
flow_id=args["flow_id"],
node_id=args["node_id"],
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
workflow_service=WorkflowService(),
)
return {"error": "incompatible parameters"}, 400
# Validate parameters
is_valid, error_message = self._validate_params(args["type"], args)
if not is_valid:
return {"error": error_message}, 400
# Route based on type
return self._handle_by_type(args["type"], args, current_tenant_id)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
except QuotaExceededError:
@@ -281,6 +238,131 @@ class InstructionGenerateApi(Resource):
except InvokeError as e:
raise CompletionRequestError(e.description)
def _validate_params(self, request_type: str, args: dict) -> tuple[bool, str]:
"""
Validate request parameters
Returns:
(is_valid, error_message)
"""
# All types require instruction and model_config
if not args.get("instruction"):
return False, "instruction is required"
if not args.get("model_config"):
return False, "model_config is required"
# Edit types require flow_id and current
if request_type.endswith("_edit"):
if not args.get("flow_id"):
return False, f"{request_type} requires flow_id"
if not args.get("current"):
return False, f"{request_type} requires current content"
# Code generate requires language
if request_type == "workflow_code_generate":
if args.get("language") not in ["python", "javascript"]:
return False, "language must be 'python' or 'javascript'"
return True, ""
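A quick sketch of how the rules above play out (illustrative values; assumes the module's imports are available):
checker = InstructionGenerateApi()
args = {"instruction": "tighten the wording", "model_config": {"provider": "openai", "name": "gpt-4o"}}
print(checker._validate_params("workflow_prompt_generate", args))  # -> (True, "")
print(checker._validate_params("workflow_prompt_edit", args))      # -> (False, "workflow_prompt_edit requires flow_id")
print(checker._validate_params("workflow_code_generate", {**args, "language": "ruby"}))  # -> (False, "language must be 'python' or 'javascript'")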
def _handle_by_type(self, request_type: str, args: dict, tenant_id: str):
"""
Route handling based on type
"""
match request_type:
case "legacy_prompt_generate":
# Legacy apps have no separate generation path; this type is effectively an edit of the existing prompt
if not args.get("flow_id"):
return {"error": "legacy_prompt_generate requires flow_id"}, 400
return LLMGenerator.instruction_modify_legacy(
tenant_id=tenant_id,
flow_id=args["flow_id"],
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
)
case "workflow_prompt_generate":
return LLMGenerator.generate_rule_config(
tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
no_variable=True,
)
case "workflow_code_generate":
return LLMGenerator.generate_code(
tenant_id=tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
code_language=args["language"],
)
case "workflow_prompt_edit":
return LLMGenerator.instruction_modify_workflow(
tenant_id=tenant_id,
flow_id=args["flow_id"],
node_id=args["node_id"],
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
workflow_service=WorkflowService(),
)
case "workflow_code_edit":
# Code edit uses the same workflow edit logic
return LLMGenerator.instruction_modify_workflow(
tenant_id=tenant_id,
flow_id=args["flow_id"],
node_id=args["node_id"],
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
workflow_service=WorkflowService(),
)
case "memory_template_generate":
return LLMGenerator.generate_memory_template(
tenant_id=tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
)
case "memory_instruction_generate":
return LLMGenerator.generate_memory_instruction(
tenant_id=tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
)
case "memory_template_edit":
return LLMGenerator.edit_memory_template(
tenant_id=tenant_id,
flow_id=args["flow_id"],
node_id=args.get("node_id") or None,
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
)
case "memory_instruction_edit":
return LLMGenerator.edit_memory_instruction(
tenant_id=tenant_id,
flow_id=args["flow_id"],
node_id=args.get("node_id") or None,
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
)
case _:
return {"error": f"Invalid request type: {request_type}"}, 400
@console_ns.route("/instruction-generate/template")
class InstructionGenerationTemplateApi(Resource):

View File

@@ -1,8 +1,8 @@
import json
import logging
import re
from collections.abc import Sequence
from typing import Protocol, cast
from collections.abc import Callable, Mapping, Sequence
from typing import Any, Protocol, cast
import json_repair
@@ -14,6 +14,10 @@ from core.llm_generator.prompts import (
JAVASCRIPT_CODE_GENERATOR_PROMPT_TEMPLATE,
LLM_MODIFY_CODE_SYSTEM,
LLM_MODIFY_PROMPT_SYSTEM,
MEMORY_INSTRUCTION_EDIT_SYSTEM_PROMPT,
MEMORY_INSTRUCTION_GENERATION_SYSTEM_PROMPT,
MEMORY_TEMPLATE_EDIT_SYSTEM_PROMPT,
MEMORY_TEMPLATE_GENERATION_SYSTEM_PROMPT,
MEMORY_UPDATE_PROMPT,
PYTHON_CODE_GENERATOR_PROMPT_TEMPLATE,
SYSTEM_STRUCTURED_OUTPUT_GENERATE,
@@ -509,16 +513,17 @@ class LLMGenerator:
node_type: str,
ideal_output: str | None,
):
LAST_RUN = "{{#last_run#}}"
CURRENT = "{{#current#}}"
ERROR_MESSAGE = "{{#error_message#}}"
injected_instruction = instruction
if LAST_RUN in injected_instruction:
injected_instruction = injected_instruction.replace(LAST_RUN, json.dumps(last_run))
if CURRENT in injected_instruction:
injected_instruction = injected_instruction.replace(CURRENT, current or "null")
if ERROR_MESSAGE in injected_instruction:
injected_instruction = injected_instruction.replace(ERROR_MESSAGE, error_message or "null")
# Use unified variable injector
variable_providers = {
"last_run": lambda: json.dumps(last_run) if last_run else "null",
"current": lambda: current or "null",
"error_message": lambda: error_message or "null",
}
injected_instruction = LLMGenerator.__inject_variables(
instruction=instruction,
variable_providers=variable_providers
)
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
@@ -597,3 +602,392 @@ class LLMGenerator:
stream=False,
)
return llm_result.message.get_text_content()
@staticmethod
def generate_memory_template(
tenant_id: str,
instruction: str,
model_config: dict,
) -> dict:
"""
Generate Memory Template
Uses MEMORY_TEMPLATE_GENERATION_SYSTEM_PROMPT
"""
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
)
prompt_messages: list[PromptMessage] = [
SystemPromptMessage(content=MEMORY_TEMPLATE_GENERATION_SYSTEM_PROMPT),
UserPromptMessage(content=instruction),
]
try:
response = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters={"temperature": 0.7},
stream=False,
)
generated_template = response.message.get_text_content()
return {"template": generated_template}
except Exception as e:
logger.exception("Failed to generate memory template")
return {"error": f"Failed to generate memory template: {str(e)}"}
@staticmethod
def generate_memory_instruction(
tenant_id: str,
instruction: str,
model_config: dict,
) -> dict:
"""
Generate Memory Instruction
Uses MEMORY_INSTRUCTION_GENERATION_SYSTEM_PROMPT
"""
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
)
prompt_messages: list[PromptMessage] = [
SystemPromptMessage(content=MEMORY_INSTRUCTION_GENERATION_SYSTEM_PROMPT),
UserPromptMessage(content=instruction),
]
try:
response = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters={"temperature": 0.7},
stream=False,
)
generated_instruction = response.message.get_text_content()
return {"instruction": generated_instruction}
except Exception as e:
logger.exception("Failed to generate memory instruction")
return {"error": f"Failed to generate memory instruction: {str(e)}"}
@staticmethod
def edit_memory_template(
tenant_id: str,
flow_id: str,
node_id: str | None,
current: str,
instruction: str,
model_config: dict,
ideal_output: str | None = None,
) -> dict:
"""
Edit Memory Template
Supports variable references: {{#history#}}, {{#system_prompt#}}
"""
# Use unified variable injector
variable_providers = {
"history": lambda: LLMGenerator.__get_history_json(flow_id, node_id, tenant_id),
"system_prompt": lambda: json.dumps(
LLMGenerator.__get_system_prompt(flow_id, node_id, tenant_id),
ensure_ascii=False
),
}
injected_instruction = LLMGenerator.__inject_variables(
instruction=instruction,
variable_providers=variable_providers
)
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
)
system_prompt = MEMORY_TEMPLATE_EDIT_SYSTEM_PROMPT
user_content = json.dumps({
"current_template": current,
"instruction": injected_instruction,
"ideal_output": ideal_output,
})
prompt_messages: list[PromptMessage] = [
SystemPromptMessage(content=system_prompt),
UserPromptMessage(content=user_content),
]
try:
response = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters={"temperature": 0.4},
stream=False,
)
generated_raw = response.message.get_text_content()
# Extract JSON
first_brace = generated_raw.find("{")
last_brace = generated_raw.rfind("}")
result = json.loads(generated_raw[first_brace : last_brace + 1])
return {
"modified": result.get("modified", ""),
"message": result.get("message", "Template updated successfully"),
}
except Exception as e:
logger.exception("Failed to edit memory template")
return {"error": f"Failed to edit memory template: {str(e)}"}
@staticmethod
def edit_memory_instruction(
tenant_id: str,
flow_id: str,
node_id: str | None,
current: str,
instruction: str,
model_config: dict,
ideal_output: str | None = None,
) -> dict:
"""
Edit Memory Instruction
Supports variable references: {{#history#}}, {{#system_prompt#}}
"""
# Use unified variable injector
variable_providers = {
"history": lambda: LLMGenerator.__get_history_json(flow_id, node_id, tenant_id),
"system_prompt": lambda: json.dumps(
LLMGenerator.__get_system_prompt(flow_id, node_id, tenant_id),
ensure_ascii=False
),
}
injected_instruction = LLMGenerator.__inject_variables(
instruction=instruction,
variable_providers=variable_providers
)
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
)
system_prompt = MEMORY_INSTRUCTION_EDIT_SYSTEM_PROMPT
user_content = json.dumps({
"current_instruction": current,
"instruction": injected_instruction,
"ideal_output": ideal_output,
})
prompt_messages: list[PromptMessage] = [
SystemPromptMessage(content=system_prompt),
UserPromptMessage(content=user_content),
]
try:
response = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters={"temperature": 0.4},
stream=False,
)
generated_raw = response.message.get_text_content()
# Extract JSON
first_brace = generated_raw.find("{")
last_brace = generated_raw.rfind("}")
result = json.loads(generated_raw[first_brace : last_brace + 1])
return {
"modified": result.get("modified", ""),
"message": result.get("message", "Instruction updated successfully"),
}
except Exception as e:
logger.exception("Failed to edit memory instruction")
return {"error": f"Failed to edit memory instruction: {str(e)}"}
# ==================== Unified variable injector (private method) ====================
@staticmethod
def __inject_variables(
instruction: str,
variable_providers: Mapping[str, Callable[[], str]]
) -> str:
"""
Unified variable injector (private method)
Replaces variable placeholders {{#variable_name#}} in instruction with actual values
Args:
instruction: User's original instruction
variable_providers: Mapping of variable name -> getter function
Example: {"last_run": lambda: json.dumps(data), "history": lambda: get_history()}
Returns:
Instruction with injected variables
Features:
1. Lazy loading: Only calls getter function when placeholder is present
2. Fault tolerance: Failure of one variable doesn't affect others
3. Extensible: New variables can be added through variable_providers parameter
"""
injected = instruction
for var_name, provider_func in variable_providers.items():
placeholder = f"{{{{#{var_name}#}}}}"
if placeholder in injected:
try:
# Lazy loading: only call when needed
value = provider_func()
injected = injected.replace(placeholder, value)
except Exception as e:
logger.warning("Failed to inject variable '%s': %s", var_name, e)
# Use default value on failure, don't block the request
default_value = "[]" if var_name == "history" else '""'
injected = injected.replace(placeholder, default_value)
return injected
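The same placeholder-injection pattern as a standalone sketch, usable outside the class (names and values are illustrative):
import json

def inject(instruction: str, providers: dict) -> str:
    # Replace each {{#name#}} placeholder lazily; a failing provider falls back to a default.
    out = instruction
    for name, provider in providers.items():
        placeholder = f"{{{{#{name}#}}}}"
        if placeholder in out:
            try:
                out = out.replace(placeholder, provider())
            except Exception:
                out = out.replace(placeholder, "[]" if name == "history" else '""')
    return out

print(inject("Summarize {{#history#}} in one sentence.",
             {"history": lambda: json.dumps([{"role": "user", "content": "hi"}])}))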
@staticmethod
def __get_history_json(
flow_id: str,
node_id: str | None,
tenant_id: str
) -> str:
"""
Get conversation history as JSON string (private method)
Args:
flow_id: Application ID
node_id: Node ID (optional, None indicates APP level)
tenant_id: Tenant ID
Returns:
JSON array string in format: [{"role": "user", "content": "..."}, ...]
Returns "[]" if no history exists
"""
from services.chatflow_history_service import ChatflowHistoryService
app = db.session.query(App).filter_by(id=flow_id).first()
if not app:
return "[]"
visible_messages = ChatflowHistoryService.get_latest_chat_history_for_app(
app_id=app.id,
tenant_id=tenant_id,
node_id=node_id or None
)
history_json = [
{"role": msg.role.value, "content": msg.content}
for msg in visible_messages
]
return json.dumps(history_json, ensure_ascii=False)
@staticmethod
def __get_system_prompt(
flow_id: str,
node_id: str | None,
tenant_id: str
) -> str:
"""
Get system prompt (private method)
Args:
flow_id: Application ID
node_id: Node ID (optional)
tenant_id: Tenant ID
Returns:
System prompt string, returns "" if none exists
"""
from services.workflow_service import WorkflowService
app = db.session.query(App).filter_by(id=flow_id).first()
if not app:
return ""
# Legacy app
if app.mode in {"chat", "completion"}:
try:
app_model_config = app.app_model_config_dict
return app_model_config.get("pre_prompt", "")
except Exception:
return ""
# Workflow app
try:
workflow = WorkflowService().get_draft_workflow(app_model=app)
if not workflow:
return ""
nodes = workflow.graph_dict.get("nodes", [])
if node_id:
# Get system prompt for specified node
node = next((n for n in nodes if n["id"] == node_id), None)
if not node or node["data"]["type"] not in ["llm", "agent"]:
return ""
prompt_template = node["data"].get("prompt_template")
return LLMGenerator.__extract_system_prompt_from_template(prompt_template)
else:
# APP level: find the main LLM node (connected to END node)
edges = workflow.graph_dict.get("edges", [])
llm_nodes = [n for n in nodes if n["data"]["type"] in ["llm", "agent"]]
for edge in edges:
if edge.get("target") == "end":
source_node = next((n for n in llm_nodes if n["id"] == edge.get("source")), None)
if source_node:
prompt_template = source_node["data"].get("prompt_template")
system_prompt = LLMGenerator.__extract_system_prompt_from_template(prompt_template)
if system_prompt:
return system_prompt
# Fallback: return system prompt from first LLM node
if llm_nodes:
prompt_template = llm_nodes[0]["data"].get("prompt_template")
return LLMGenerator.__extract_system_prompt_from_template(prompt_template)
return ""
except Exception as e:
logger.warning("Failed to get system prompt: %s", e)
return ""
@staticmethod
def __extract_system_prompt_from_template(prompt_template: Any) -> str:
"""
Extract system prompt from prompt_template (private method)
Args:
prompt_template: LLM node's prompt_template (may be list or dict)
Returns:
System prompt string
"""
if not prompt_template:
return ""
if isinstance(prompt_template, list):
# Chat model: [{"role": "system", "text": "..."}, ...]
system_msg = next((m for m in prompt_template if m.get("role") == "system"), None)
return system_msg.get("text", "") if system_msg else ""
elif isinstance(prompt_template, dict):
# Completion model: {"text": "..."}
return prompt_template.get("text", "")
else:
return ""
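For reference, the two prompt_template shapes this helper distinguishes (contents are illustrative):
# Chat-model template: a list of role messages; the "system" entry carries the prompt.
chat_template = [
    {"role": "system", "text": "You are a travel planner."},
    {"role": "user", "text": "{{#query#}}"},
]
# Completion-model template: a single dict whose "text" holds the whole prompt.
completion_template = {"text": "You are a travel planner. {{#query#}}"}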

View File

@@ -437,3 +437,97 @@ Update instruction:
Please output only the updated memory content, no other text like greeting:
"""
MEMORY_TEMPLATE_GENERATION_SYSTEM_PROMPT = """
You are a helpful assistant designed to extract structured template information from a long-term conversation. Your task is to generate a concise and complete MemoryBlock template based on the underlying purpose of the conversation.
Each MemoryBlock represents a reusable schema that captures the key elements relevant to a specific task or goal (e.g., planning a trip, conducting a job interview, writing a blog post, etc.).
When generating a template:
1. Analyze the overall goal or purpose of the conversation described in the user's instruction.
2. Identify essential information categories that would be relevant to track.
3. Structure the template using Markdown format with clear sections and fields.
4. Do not fill in actual user data; only describe the structure and purpose of each field.
5. Be general enough to be reusable, but specific enough to serve the user's intent.
Respond with only the template in Markdown format, with no additional explanation.
Example format:
# [Template Name]
## Section 1
- **Field 1:** Description of what should be captured here
- **Field 2:** Description of what should be captured here
## Section 2
- **Field 3:** Description of what should be captured here
""" # noqa: E501
MEMORY_INSTRUCTION_GENERATION_SYSTEM_PROMPT = """
You are a prompt generation model.
Your task is to generate an instruction for a downstream language model tasked with extracting structured memory blocks (MemoryBlock) from long, multi-turn conversations between a user and an assistant.
The downstream model will receive:
- A template describing the structure and fields of the memory block it should extract.
- The historical conversation, serialized as plain text with message tags.
- Optional context including the assistant's system prompt.
You must generate a clear, specific, and instructional prompt that:
1. Explains what a MemoryBlock is and its purpose.
2. Instructs the model to extract only information relevant to the template fields.
3. Emphasizes handling implicit information and scattered mentions across multiple turns.
4. Instructs to ignore irrelevant or casual dialogue.
5. Describes the expected output format (structured object matching the template).
6. Uses placeholders {{#history#}} and {{#system_prompt#}} for runtime variable injection.
The tone should be concise, instructional, and focused on task precision.
Based on the user's description of the conversation context, generate the ideal extraction instruction.
""" # noqa: E501
MEMORY_TEMPLATE_EDIT_SYSTEM_PROMPT = """
You are an expert at refining memory templates for conversation tracking systems.
You will receive:
- current_template: The existing memory template
- instruction: User's instruction for how to modify the template (may include {{#history#}} and {{#system_prompt#}} references)
- ideal_output: Optional description of the desired result
Your task:
1. Analyze the current template structure
2. Apply the requested modifications based on the instruction
3. Ensure the modified template maintains proper Markdown structure
4. Keep field descriptions clear and actionable
Output format (JSON):
{
"modified": "<the updated template in Markdown format>",
"message": "<brief explanation of changes made>"
}
Only output the JSON, no additional text.
""" # noqa: E501
MEMORY_INSTRUCTION_EDIT_SYSTEM_PROMPT = """
You are an expert at refining extraction instructions for memory systems.
You will receive:
- current_instruction: The existing extraction instruction
- instruction: User's instruction for how to improve it (may include {{#history#}} and {{#system_prompt#}} references)
- ideal_output: Optional description of the desired result
Your task:
1. Analyze the current instruction's effectiveness
2. Apply the requested improvements based on the user's instruction
3. Ensure the modified instruction is clear, specific, and actionable
4. Maintain focus on structured extraction matching the template
Output format (JSON):
{
"modified": "<the improved instruction>",
"message": "<brief explanation of improvements>"
}
Only output the JSON, no additional text.
"""

View File

@@ -2,7 +2,7 @@ import json
from collections.abc import MutableMapping, Sequence
from typing import Literal, Optional, overload
from sqlalchemy import Row, Select, and_, func, select
from sqlalchemy import Row, Select, and_, desc, func, select
from sqlalchemy.orm import Session
from core.memory.entities import ChatflowConversationMetadata
@@ -45,6 +45,58 @@ class ChatflowHistoryService:
visible_messages = sorted_messages[-visible_count:]
return [PromptMessage.model_validate_json(it.data) for it in visible_messages]
@staticmethod
def get_latest_chat_history_for_app(
app_id: str,
tenant_id: str,
node_id: Optional[str] = None,
max_visible_count: Optional[int] = None
) -> Sequence[PromptMessage]:
"""
Get the latest chat history for an app
Args:
app_id: Application ID
tenant_id: Tenant ID
node_id: Node ID (None for APP level)
max_visible_count: Maximum number of visible messages (optional)
Returns:
PromptMessage sequence, empty list if no history exists
"""
with Session(db.engine) as session:
# Query the most recently updated chatflow conversation
stmt = select(ChatflowConversation).where(
ChatflowConversation.tenant_id == tenant_id,
ChatflowConversation.app_id == app_id,
ChatflowConversation.node_id == (node_id or None)
).order_by(desc(ChatflowConversation.updated_at)).limit(1)
chatflow_conv_row = session.execute(stmt).first()
if not chatflow_conv_row:
return []
chatflow_conv = chatflow_conv_row[0]
# Get visible messages for this conversation
metadata = ChatflowConversationMetadata.model_validate_json(
chatflow_conv.conversation_metadata
)
visible_count: int = max_visible_count or metadata.visible_count
stmt = select(ChatflowMessage).where(
ChatflowMessage.conversation_id == chatflow_conv.id
).order_by(ChatflowMessage.index.asc(), ChatflowMessage.version.desc())
raw_messages: Sequence[Row[tuple[ChatflowMessage]]] = session.execute(stmt).all()
sorted_messages = ChatflowHistoryService._filter_latest_messages(
[it[0] for it in raw_messages]
)
visible_count = min(visible_count, len(sorted_messages))
visible_messages = sorted_messages[-visible_count:]
return [PromptMessage.model_validate_json(it.data) for it in visible_messages]
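A minimal usage sketch of the new service method (identifiers are illustrative; it assumes a Flask app context with the database initialized):
from services.chatflow_history_service import ChatflowHistoryService

messages = ChatflowHistoryService.get_latest_chat_history_for_app(
    app_id="app-uuid",        # illustrative
    tenant_id="tenant-uuid",  # illustrative
    node_id=None,             # None selects the app-level conversation
)
history = [{"role": m.role.value, "content": m.content} for m in messages]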
@staticmethod
def save_message(
prompt_message: PromptMessage,
@@ -76,7 +128,7 @@ class ChatflowHistoryService:
session.add(new_message)
session.commit()
# Simply grow visible_count after each message save
# Increment visible_count after each message save
current_metadata = ChatflowConversationMetadata.model_validate_json(chatflow_conv.conversation_metadata)
new_visible_count = current_metadata.visible_count + 1
new_metadata = ChatflowConversationMetadata(visible_count=new_visible_count)