fix(agent): persist Agent App prompt message (#37534)

This commit is contained in:
zyssyz123 2026-06-17 00:44:06 +08:00 committed by GitHub
parent 56dce93524
commit 7cb4a30040
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 54 additions and 10 deletions

View File

@ -373,7 +373,7 @@ class AgentAppGenerator(MessageBasedAppGenerator):
app_config = application_generate_entity.app_config
model_name = application_generate_entity.model_conf.model
query = application_generate_entity.query
query = application_generate_entity.query or ""
# content moderation (sensitive_word_avoidance); a blocked input yields a
# preset answer, an "overridden" action returns a sanitized query.
@ -388,7 +388,7 @@ class AgentAppGenerator(MessageBasedAppGenerator):
trace_manager=application_generate_entity.trace_manager,
)
except ModerationError as e:
publish_text_answer(queue_manager=queue_manager, model_name=model_name, answer=str(e))
publish_text_answer(queue_manager=queue_manager, model_name=model_name, answer=str(e), user_query=query)
return True, query
# annotation reply: a matching annotation answers the turn deterministically.
@ -405,7 +405,12 @@ class AgentAppGenerator(MessageBasedAppGenerator):
QueueAnnotationReplyEvent(message_annotation_id=annotation_reply.id),
PublishFrom.APPLICATION_MANAGER,
)
publish_text_answer(queue_manager=queue_manager, model_name=model_name, answer=annotation_reply.content)
publish_text_answer(
queue_manager=queue_manager,
model_name=model_name,
answer=annotation_reply.content,
user_query=query,
)
return True, query
return False, query

View File

@ -46,13 +46,25 @@ from core.repositories.human_input_repository import HumanInputFormRepository, H
from core.workflow.nodes.agent_v2.ask_human_hitl import AskHumanFormBuildError, create_ask_human_form
from core.workflow.nodes.agent_v2.ask_human_resume import build_deferred_tool_results, resolve_ask_human_form
from graphon.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage
from graphon.model_runtime.entities.message_entities import AssistantPromptMessage
from graphon.model_runtime.entities.message_entities import AssistantPromptMessage, PromptMessage, UserPromptMessage
from models.agent_config_entities import AgentSoulConfig
logger = logging.getLogger(__name__)
def publish_text_answer(*, queue_manager: AppQueueManager, model_name: str, answer: str) -> None:
def _prompt_messages_from_query(user_query: str | None) -> list[PromptMessage]:
if not user_query:
return []
return [UserPromptMessage(content=user_query)]
def publish_text_answer(
*,
queue_manager: AppQueueManager,
model_name: str,
answer: str,
user_query: str | None = None,
) -> None:
"""Publish a complete assistant answer as one chunk + message-end.
The EasyUI chat task pipeline consumes a QueueLLMChunkEvent stream followed
@ -60,9 +72,10 @@ def publish_text_answer(*, queue_manager: AppQueueManager, model_name: str, answ
both the backend-produced answer and short-circuited answers (moderation /
annotation reply) share the exact same persistence + SSE path.
"""
prompt_messages = _prompt_messages_from_query(user_query)
chunk = LLMResultChunk(
model=model_name,
prompt_messages=[],
prompt_messages=prompt_messages,
delta=LLMResultChunkDelta(index=0, message=AssistantPromptMessage(content=answer)),
)
queue_manager.publish(QueueLLMChunkEvent(chunk=chunk), PublishFrom.APPLICATION_MANAGER)
@ -70,7 +83,7 @@ def publish_text_answer(*, queue_manager: AppQueueManager, model_name: str, answ
QueueMessageEndEvent(
llm_result=LLMResult(
model=model_name,
prompt_messages=[],
prompt_messages=prompt_messages,
message=AssistantPromptMessage(content=answer),
usage=LLMUsage.empty_usage(),
),
@ -153,6 +166,7 @@ class AgentAppRunner:
model_name=model_name,
runtime=runtime,
queue_manager=queue_manager,
query=query,
)
return
@ -161,7 +175,7 @@ class AgentAppRunner:
raise AgentBackendError(str(error))
answer = self._extract_answer(terminal.output)
self._publish_answer(queue_manager=queue_manager, model_name=model_name, answer=answer)
self._publish_answer(queue_manager=queue_manager, model_name=model_name, answer=answer, query=query)
self._save_session(
scope=scope,
backend_run_id=terminal.run_id,
@ -181,6 +195,7 @@ class AgentAppRunner:
model_name: str,
runtime: AgentAppRuntimeRequest,
queue_manager: AppQueueManager,
query: str,
) -> None:
"""End the chat turn on a dify.ask_human call: create a conversation-owned
HITL form, persist the pause correlation, and surface the question."""
@ -214,6 +229,7 @@ class AgentAppRunner:
queue_manager=queue_manager,
model_name=model_name,
answer=self._ask_human_message(created.args),
query=query,
)
def _resolve_pending_ask_human(
@ -287,10 +303,12 @@ class AgentAppRunner:
except Exception:
logger.warning("Failed to cancel stopped Agent App backend run: run_id=%s", run_id, exc_info=True)
def _publish_answer(self, *, queue_manager: AppQueueManager, model_name: str, answer: str) -> None:
def _publish_answer(
self, *, queue_manager: AppQueueManager, model_name: str, answer: str, query: str | None
) -> None:
# MVP: emit the full answer as a single chunk + message-end. The chat
# task pipeline streams the chunk over SSE and persists the message.
publish_text_answer(queue_manager=queue_manager, model_name=model_name, answer=answer)
publish_text_answer(queue_manager=queue_manager, model_name=model_name, answer=answer, user_query=query)
def _save_session(
self,

View File

@ -160,6 +160,16 @@ def _run(runner: AgentAppRunner, qm: _FakeQueueManager) -> None:
)
def _message_end(qm: _FakeQueueManager) -> QueueMessageEndEvent:
return next(e for e in qm.events if isinstance(e, QueueMessageEndEvent))
def _saved_user_query(qm: _FakeQueueManager) -> str:
prompt_messages = _message_end(qm).llm_result.prompt_messages
assert len(prompt_messages) == 1
return prompt_messages[0].content
def test_successful_turn_publishes_chunk_and_message_end_and_saves_session():
client = FakeAgentBackendRunClient() # SUCCESS: output {"text": "hello agent"}
store = _FakeSessionStore()
@ -175,6 +185,7 @@ def test_successful_turn_publishes_chunk_and_message_end_and_saves_session():
assert chunk_events[0].chunk.delta.message.content == "hello agent"
assert end_events[0].llm_result.message.content == "hello agent"
assert end_events[0].llm_result.model == "gpt-4o-mini"
assert _saved_user_query(qm) == "hello"
# The conversation session snapshot is persisted for multi-turn continuity.
assert store.saved
saved_scope, saved_run_id, saved_snapshot, saved_specs, pending_form_id, pending_tool_call_id = store.saved[0]
@ -256,6 +267,7 @@ def test_ask_human_pauses_turn_creates_form_and_persists_correlation():
assert created_params.conversation_id == "conv-1"
assert created_params.workflow_execution_id is None
assert [e for e in qm.events if isinstance(e, QueueMessageEndEvent)]
assert _saved_user_query(qm) == "hello"
# The pause correlation is persisted so a form submission can resume the run.
assert store.saved
assert store.saved[0][4] == "form-1"

View File

@ -65,6 +65,13 @@ def _answer_text(events: list[Any]) -> str:
return end.llm_result.message.content
def _saved_user_query(events: list[Any]) -> str:
end = next(e for e in events if isinstance(e, QueueMessageEndEvent))
prompt_messages = end.llm_result.prompt_messages
assert len(prompt_messages) == 1
return prompt_messages[0].content
class TestRunInputGuards:
def test_no_guards_passes_through(self, monkeypatch):
_patch_moderation(monkeypatch, returns=(False, {}, "hello"))
@ -113,6 +120,7 @@ class TestRunInputGuards:
assert handled is True
assert any(isinstance(e, QueueLLMChunkEvent) for e in qm.events)
assert _answer_text(qm.events) == "blocked preset answer"
assert _saved_user_query(qm.events) == "forbidden"
def test_annotation_hit_short_circuits(self, monkeypatch):
_patch_moderation(monkeypatch, returns=(False, {}, "what is your name"))
@ -131,3 +139,4 @@ class TestRunInputGuards:
assert len(annotation_events) == 1
assert annotation_events[0].message_annotation_id == "anno-1"
assert _answer_text(qm.events) == "I am the annotated Iris."
assert _saved_user_query(qm.events) == "what is your name"