diff --git a/api/core/app/apps/agent_app/app_generator.py b/api/core/app/apps/agent_app/app_generator.py index 92c75276e0..32ceecddf5 100644 --- a/api/core/app/apps/agent_app/app_generator.py +++ b/api/core/app/apps/agent_app/app_generator.py @@ -373,7 +373,7 @@ class AgentAppGenerator(MessageBasedAppGenerator): app_config = application_generate_entity.app_config model_name = application_generate_entity.model_conf.model - query = application_generate_entity.query + query = application_generate_entity.query or "" # content moderation (sensitive_word_avoidance); a blocked input yields a # preset answer, an "overridden" action returns a sanitized query. @@ -388,7 +388,7 @@ class AgentAppGenerator(MessageBasedAppGenerator): trace_manager=application_generate_entity.trace_manager, ) except ModerationError as e: - publish_text_answer(queue_manager=queue_manager, model_name=model_name, answer=str(e)) + publish_text_answer(queue_manager=queue_manager, model_name=model_name, answer=str(e), user_query=query) return True, query # annotation reply: a matching annotation answers the turn deterministically. @@ -405,7 +405,12 @@ class AgentAppGenerator(MessageBasedAppGenerator): QueueAnnotationReplyEvent(message_annotation_id=annotation_reply.id), PublishFrom.APPLICATION_MANAGER, ) - publish_text_answer(queue_manager=queue_manager, model_name=model_name, answer=annotation_reply.content) + publish_text_answer( + queue_manager=queue_manager, + model_name=model_name, + answer=annotation_reply.content, + user_query=query, + ) return True, query return False, query diff --git a/api/core/app/apps/agent_app/app_runner.py b/api/core/app/apps/agent_app/app_runner.py index d4d2a754bf..03c6f3e410 100644 --- a/api/core/app/apps/agent_app/app_runner.py +++ b/api/core/app/apps/agent_app/app_runner.py @@ -46,13 +46,25 @@ from core.repositories.human_input_repository import HumanInputFormRepository, H from core.workflow.nodes.agent_v2.ask_human_hitl import AskHumanFormBuildError, create_ask_human_form from core.workflow.nodes.agent_v2.ask_human_resume import build_deferred_tool_results, resolve_ask_human_form from graphon.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage -from graphon.model_runtime.entities.message_entities import AssistantPromptMessage +from graphon.model_runtime.entities.message_entities import AssistantPromptMessage, PromptMessage, UserPromptMessage from models.agent_config_entities import AgentSoulConfig logger = logging.getLogger(__name__) -def publish_text_answer(*, queue_manager: AppQueueManager, model_name: str, answer: str) -> None: +def _prompt_messages_from_query(user_query: str | None) -> list[PromptMessage]: + if not user_query: + return [] + return [UserPromptMessage(content=user_query)] + + +def publish_text_answer( + *, + queue_manager: AppQueueManager, + model_name: str, + answer: str, + user_query: str | None = None, +) -> None: """Publish a complete assistant answer as one chunk + message-end. The EasyUI chat task pipeline consumes a QueueLLMChunkEvent stream followed @@ -60,9 +72,10 @@ def publish_text_answer(*, queue_manager: AppQueueManager, model_name: str, answ both the backend-produced answer and short-circuited answers (moderation / annotation reply) share the exact same persistence + SSE path. """ + prompt_messages = _prompt_messages_from_query(user_query) chunk = LLMResultChunk( model=model_name, - prompt_messages=[], + prompt_messages=prompt_messages, delta=LLMResultChunkDelta(index=0, message=AssistantPromptMessage(content=answer)), ) queue_manager.publish(QueueLLMChunkEvent(chunk=chunk), PublishFrom.APPLICATION_MANAGER) @@ -70,7 +83,7 @@ def publish_text_answer(*, queue_manager: AppQueueManager, model_name: str, answ QueueMessageEndEvent( llm_result=LLMResult( model=model_name, - prompt_messages=[], + prompt_messages=prompt_messages, message=AssistantPromptMessage(content=answer), usage=LLMUsage.empty_usage(), ), @@ -153,6 +166,7 @@ class AgentAppRunner: model_name=model_name, runtime=runtime, queue_manager=queue_manager, + query=query, ) return @@ -161,7 +175,7 @@ class AgentAppRunner: raise AgentBackendError(str(error)) answer = self._extract_answer(terminal.output) - self._publish_answer(queue_manager=queue_manager, model_name=model_name, answer=answer) + self._publish_answer(queue_manager=queue_manager, model_name=model_name, answer=answer, query=query) self._save_session( scope=scope, backend_run_id=terminal.run_id, @@ -181,6 +195,7 @@ class AgentAppRunner: model_name: str, runtime: AgentAppRuntimeRequest, queue_manager: AppQueueManager, + query: str, ) -> None: """End the chat turn on a dify.ask_human call: create a conversation-owned HITL form, persist the pause correlation, and surface the question.""" @@ -214,6 +229,7 @@ class AgentAppRunner: queue_manager=queue_manager, model_name=model_name, answer=self._ask_human_message(created.args), + query=query, ) def _resolve_pending_ask_human( @@ -287,10 +303,12 @@ class AgentAppRunner: except Exception: logger.warning("Failed to cancel stopped Agent App backend run: run_id=%s", run_id, exc_info=True) - def _publish_answer(self, *, queue_manager: AppQueueManager, model_name: str, answer: str) -> None: + def _publish_answer( + self, *, queue_manager: AppQueueManager, model_name: str, answer: str, query: str | None + ) -> None: # MVP: emit the full answer as a single chunk + message-end. The chat # task pipeline streams the chunk over SSE and persists the message. - publish_text_answer(queue_manager=queue_manager, model_name=model_name, answer=answer) + publish_text_answer(queue_manager=queue_manager, model_name=model_name, answer=answer, user_query=query) def _save_session( self, diff --git a/api/tests/unit_tests/core/app/apps/agent_app/test_app_runner.py b/api/tests/unit_tests/core/app/apps/agent_app/test_app_runner.py index 48dea5583c..e696d4aaa0 100644 --- a/api/tests/unit_tests/core/app/apps/agent_app/test_app_runner.py +++ b/api/tests/unit_tests/core/app/apps/agent_app/test_app_runner.py @@ -160,6 +160,16 @@ def _run(runner: AgentAppRunner, qm: _FakeQueueManager) -> None: ) +def _message_end(qm: _FakeQueueManager) -> QueueMessageEndEvent: + return next(e for e in qm.events if isinstance(e, QueueMessageEndEvent)) + + +def _saved_user_query(qm: _FakeQueueManager) -> str: + prompt_messages = _message_end(qm).llm_result.prompt_messages + assert len(prompt_messages) == 1 + return prompt_messages[0].content + + def test_successful_turn_publishes_chunk_and_message_end_and_saves_session(): client = FakeAgentBackendRunClient() # SUCCESS: output {"text": "hello agent"} store = _FakeSessionStore() @@ -175,6 +185,7 @@ def test_successful_turn_publishes_chunk_and_message_end_and_saves_session(): assert chunk_events[0].chunk.delta.message.content == "hello agent" assert end_events[0].llm_result.message.content == "hello agent" assert end_events[0].llm_result.model == "gpt-4o-mini" + assert _saved_user_query(qm) == "hello" # The conversation session snapshot is persisted for multi-turn continuity. assert store.saved saved_scope, saved_run_id, saved_snapshot, saved_specs, pending_form_id, pending_tool_call_id = store.saved[0] @@ -256,6 +267,7 @@ def test_ask_human_pauses_turn_creates_form_and_persists_correlation(): assert created_params.conversation_id == "conv-1" assert created_params.workflow_execution_id is None assert [e for e in qm.events if isinstance(e, QueueMessageEndEvent)] + assert _saved_user_query(qm) == "hello" # The pause correlation is persisted so a form submission can resume the run. assert store.saved assert store.saved[0][4] == "form-1" diff --git a/api/tests/unit_tests/core/app/apps/agent_app/test_input_guards.py b/api/tests/unit_tests/core/app/apps/agent_app/test_input_guards.py index e1c8d51b0d..31bc4a1080 100644 --- a/api/tests/unit_tests/core/app/apps/agent_app/test_input_guards.py +++ b/api/tests/unit_tests/core/app/apps/agent_app/test_input_guards.py @@ -65,6 +65,13 @@ def _answer_text(events: list[Any]) -> str: return end.llm_result.message.content +def _saved_user_query(events: list[Any]) -> str: + end = next(e for e in events if isinstance(e, QueueMessageEndEvent)) + prompt_messages = end.llm_result.prompt_messages + assert len(prompt_messages) == 1 + return prompt_messages[0].content + + class TestRunInputGuards: def test_no_guards_passes_through(self, monkeypatch): _patch_moderation(monkeypatch, returns=(False, {}, "hello")) @@ -113,6 +120,7 @@ class TestRunInputGuards: assert handled is True assert any(isinstance(e, QueueLLMChunkEvent) for e in qm.events) assert _answer_text(qm.events) == "blocked preset answer" + assert _saved_user_query(qm.events) == "forbidden" def test_annotation_hit_short_circuits(self, monkeypatch): _patch_moderation(monkeypatch, returns=(False, {}, "what is your name")) @@ -131,3 +139,4 @@ class TestRunInputGuards: assert len(annotation_events) == 1 assert annotation_events[0].message_annotation_id == "anno-1" assert _answer_text(qm.events) == "I am the annotated Iris." + assert _saved_user_query(qm.events) == "what is your name"