From 187bfafe8b001cf8afad95e257374f2b2e7e5983 Mon Sep 17 00:00:00 2001 From: wangxiaolei Date: Wed, 7 Jan 2026 14:54:11 +0800 Subject: [PATCH] fix: fix assign value stand as default (#30651) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../advanced_chat/generate_task_pipeline.py | 19 + ...test_generate_task_pipeline_answer_node.py | 390 ++++++++++++++++++ 2 files changed, 409 insertions(+) create mode 100644 api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_answer_node.py diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index da1e9f19b6..4dd95be52d 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -358,6 +358,25 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): if node_finish_resp: yield node_finish_resp + # For ANSWER nodes, check if we need to send a message_replace event + # Only send if the final output differs from the accumulated task_state.answer + # This happens when variables were updated by variable_assigner during workflow execution + if event.node_type == NodeType.ANSWER and event.outputs: + final_answer = event.outputs.get("answer") + if final_answer is not None and final_answer != self._task_state.answer: + logger.info( + "ANSWER node final output '%s' differs from accumulated answer '%s', sending message_replace event", + final_answer, + self._task_state.answer, + ) + # Update the task state answer + self._task_state.answer = str(final_answer) + # Send message_replace event to update the UI + yield self._message_cycle_manager.message_replace_to_stream_response( + answer=str(final_answer), + reason="variable_update", + ) + def _handle_node_failed_events( self, event: Union[QueueNodeFailedEvent, QueueNodeExceptionEvent], diff --git a/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_answer_node.py b/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_answer_node.py new file mode 100644 index 0000000000..205b157542 --- /dev/null +++ b/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_answer_node.py @@ -0,0 +1,390 @@ +""" +Tests for AdvancedChatAppGenerateTaskPipeline._handle_node_succeeded_event method, +specifically testing the ANSWER node message_replace logic. +""" + +from datetime import datetime +from types import SimpleNamespace +from unittest.mock import MagicMock, Mock, patch + +import pytest + +from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity +from core.app.entities.queue_entities import QueueNodeSucceededEvent +from core.workflow.enums import NodeType +from models import EndUser +from models.model import AppMode + + +class TestAnswerNodeMessageReplace: + """Test cases for ANSWER node message_replace event logic.""" + + @pytest.fixture + def mock_application_generate_entity(self): + """Create a mock application generate entity.""" + entity = Mock(spec=AdvancedChatAppGenerateEntity) + entity.task_id = "test-task-id" + entity.app_id = "test-app-id" + entity.workflow_run_id = "test-workflow-run-id" + # minimal app_config used by pipeline internals + entity.app_config = SimpleNamespace( + tenant_id="test-tenant-id", + app_id="test-app-id", + app_mode=AppMode.ADVANCED_CHAT, + app_model_config_dict={}, + additional_features=None, + sensitive_word_avoidance=None, + ) + entity.query = "test query" + entity.files = [] + entity.extras = {} + entity.trace_manager = None + entity.inputs = {} + entity.invoke_from = "debugger" + return entity + + @pytest.fixture + def mock_workflow(self): + """Create a mock workflow.""" + workflow = Mock() + workflow.id = "test-workflow-id" + workflow.features_dict = {} + return workflow + + @pytest.fixture + def mock_queue_manager(self): + """Create a mock queue manager.""" + manager = Mock() + manager.listen.return_value = [] + manager.graph_runtime_state = None + return manager + + @pytest.fixture + def mock_conversation(self): + """Create a mock conversation.""" + conversation = Mock() + conversation.id = "test-conversation-id" + conversation.mode = "advanced_chat" + return conversation + + @pytest.fixture + def mock_message(self): + """Create a mock message.""" + message = Mock() + message.id = "test-message-id" + message.query = "test query" + message.created_at = Mock() + message.created_at.timestamp.return_value = 1234567890 + return message + + @pytest.fixture + def mock_user(self): + """Create a mock end user.""" + user = MagicMock(spec=EndUser) + user.id = "test-user-id" + user.session_id = "test-session-id" + return user + + @pytest.fixture + def mock_draft_var_saver_factory(self): + """Create a mock draft variable saver factory.""" + return Mock() + + @pytest.fixture + def pipeline( + self, + mock_application_generate_entity, + mock_workflow, + mock_queue_manager, + mock_conversation, + mock_message, + mock_user, + mock_draft_var_saver_factory, + ): + """Create an AdvancedChatAppGenerateTaskPipeline instance with mocked dependencies.""" + from core.app.apps.advanced_chat.generate_task_pipeline import AdvancedChatAppGenerateTaskPipeline + + with patch("core.app.apps.advanced_chat.generate_task_pipeline.db"): + pipeline = AdvancedChatAppGenerateTaskPipeline( + application_generate_entity=mock_application_generate_entity, + workflow=mock_workflow, + queue_manager=mock_queue_manager, + conversation=mock_conversation, + message=mock_message, + user=mock_user, + stream=True, + dialogue_count=1, + draft_var_saver_factory=mock_draft_var_saver_factory, + ) + # Initialize workflow run id to avoid validation errors + pipeline._workflow_run_id = "test-workflow-run-id" + # Mock the message cycle manager methods we need to track + pipeline._message_cycle_manager.message_replace_to_stream_response = Mock() + return pipeline + + def test_answer_node_with_different_output_sends_message_replace(self, pipeline, mock_application_generate_entity): + """ + Test that when an ANSWER node's final output differs from accumulated answer, + a message_replace event is sent. + """ + # Arrange: Set initial accumulated answer + pipeline._task_state.answer = "initial answer" + + # Create ANSWER node succeeded event with different final output + event = QueueNodeSucceededEvent( + node_execution_id="test-node-execution-id", + node_id="test-answer-node", + node_type=NodeType.ANSWER, + start_at=datetime.now(), + outputs={"answer": "updated final answer"}, + ) + + # Mock the workflow response converter to avoid extra processing + pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = Mock(return_value=None) + pipeline._save_output_for_event = Mock() + + # Act + responses = list(pipeline._handle_node_succeeded_event(event)) + + # Assert + assert pipeline._task_state.answer == "updated final answer" + # Verify message_replace was called + pipeline._message_cycle_manager.message_replace_to_stream_response.assert_called_once_with( + answer="updated final answer", reason="variable_update" + ) + + def test_answer_node_with_same_output_does_not_send_message_replace(self, pipeline): + """ + Test that when an ANSWER node's final output is the same as accumulated answer, + no message_replace event is sent. + """ + # Arrange: Set initial accumulated answer + pipeline._task_state.answer = "same answer" + + # Create ANSWER node succeeded event with same output + event = QueueNodeSucceededEvent( + node_execution_id="test-node-execution-id", + node_id="test-answer-node", + node_type=NodeType.ANSWER, + start_at=datetime.now(), + outputs={"answer": "same answer"}, + ) + + # Mock the workflow response converter + pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = Mock(return_value=None) + pipeline._save_output_for_event = Mock() + + # Act + list(pipeline._handle_node_succeeded_event(event)) + + # Assert: answer should remain unchanged + assert pipeline._task_state.answer == "same answer" + # Verify message_replace was NOT called + pipeline._message_cycle_manager.message_replace_to_stream_response.assert_not_called() + + def test_answer_node_with_none_output_does_not_send_message_replace(self, pipeline): + """ + Test that when an ANSWER node's output is None or missing 'answer' key, + no message_replace event is sent. + """ + # Arrange: Set initial accumulated answer + pipeline._task_state.answer = "existing answer" + + # Create ANSWER node succeeded event with None output + event = QueueNodeSucceededEvent( + node_execution_id="test-node-execution-id", + node_id="test-answer-node", + node_type=NodeType.ANSWER, + start_at=datetime.now(), + outputs={"answer": None}, + ) + + # Mock the workflow response converter + pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = Mock(return_value=None) + pipeline._save_output_for_event = Mock() + + # Act + list(pipeline._handle_node_succeeded_event(event)) + + # Assert: answer should remain unchanged + assert pipeline._task_state.answer == "existing answer" + # Verify message_replace was NOT called + pipeline._message_cycle_manager.message_replace_to_stream_response.assert_not_called() + + def test_answer_node_with_empty_outputs_does_not_send_message_replace(self, pipeline): + """ + Test that when an ANSWER node has empty outputs dict, + no message_replace event is sent. + """ + # Arrange: Set initial accumulated answer + pipeline._task_state.answer = "existing answer" + + # Create ANSWER node succeeded event with empty outputs + event = QueueNodeSucceededEvent( + node_execution_id="test-node-execution-id", + node_id="test-answer-node", + node_type=NodeType.ANSWER, + start_at=datetime.now(), + outputs={}, + ) + + # Mock the workflow response converter + pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = Mock(return_value=None) + pipeline._save_output_for_event = Mock() + + # Act + list(pipeline._handle_node_succeeded_event(event)) + + # Assert: answer should remain unchanged + assert pipeline._task_state.answer == "existing answer" + # Verify message_replace was NOT called + pipeline._message_cycle_manager.message_replace_to_stream_response.assert_not_called() + + def test_answer_node_with_no_answer_key_in_outputs(self, pipeline): + """ + Test that when an ANSWER node's outputs don't contain 'answer' key, + no message_replace event is sent. + """ + # Arrange: Set initial accumulated answer + pipeline._task_state.answer = "existing answer" + + # Create ANSWER node succeeded event without 'answer' key in outputs + event = QueueNodeSucceededEvent( + node_execution_id="test-node-execution-id", + node_id="test-answer-node", + node_type=NodeType.ANSWER, + start_at=datetime.now(), + outputs={"other_key": "some value"}, + ) + + # Mock the workflow response converter + pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = Mock(return_value=None) + pipeline._save_output_for_event = Mock() + + # Act + list(pipeline._handle_node_succeeded_event(event)) + + # Assert: answer should remain unchanged + assert pipeline._task_state.answer == "existing answer" + # Verify message_replace was NOT called + pipeline._message_cycle_manager.message_replace_to_stream_response.assert_not_called() + + def test_non_answer_node_does_not_send_message_replace(self, pipeline): + """ + Test that non-ANSWER nodes (e.g., LLM, END) don't trigger message_replace events. + """ + # Arrange: Set initial accumulated answer + pipeline._task_state.answer = "existing answer" + + # Test with LLM node + llm_event = QueueNodeSucceededEvent( + node_execution_id="test-llm-execution-id", + node_id="test-llm-node", + node_type=NodeType.LLM, + start_at=datetime.now(), + outputs={"answer": "different answer"}, + ) + + # Mock the workflow response converter + pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = Mock(return_value=None) + pipeline._save_output_for_event = Mock() + + # Act + list(pipeline._handle_node_succeeded_event(llm_event)) + + # Assert: answer should remain unchanged + assert pipeline._task_state.answer == "existing answer" + # Verify message_replace was NOT called + pipeline._message_cycle_manager.message_replace_to_stream_response.assert_not_called() + + def test_end_node_does_not_send_message_replace(self, pipeline): + """ + Test that END nodes don't trigger message_replace events even with 'answer' output. + """ + # Arrange: Set initial accumulated answer + pipeline._task_state.answer = "existing answer" + + # Create END node succeeded event with answer output + event = QueueNodeSucceededEvent( + node_execution_id="test-end-execution-id", + node_id="test-end-node", + node_type=NodeType.END, + start_at=datetime.now(), + outputs={"answer": "different answer"}, + ) + + # Mock the workflow response converter + pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = Mock(return_value=None) + pipeline._save_output_for_event = Mock() + + # Act + list(pipeline._handle_node_succeeded_event(event)) + + # Assert: answer should remain unchanged + assert pipeline._task_state.answer == "existing answer" + # Verify message_replace was NOT called + pipeline._message_cycle_manager.message_replace_to_stream_response.assert_not_called() + + def test_answer_node_with_numeric_output_converts_to_string(self, pipeline): + """ + Test that when an ANSWER node's final output is numeric, + it gets converted to string properly. + """ + # Arrange: Set initial accumulated answer + pipeline._task_state.answer = "text answer" + + # Create ANSWER node succeeded event with numeric output + event = QueueNodeSucceededEvent( + node_execution_id="test-node-execution-id", + node_id="test-answer-node", + node_type=NodeType.ANSWER, + start_at=datetime.now(), + outputs={"answer": 12345}, + ) + + # Mock the workflow response converter + pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = Mock(return_value=None) + pipeline._save_output_for_event = Mock() + + # Act + list(pipeline._handle_node_succeeded_event(event)) + + # Assert: answer should be converted to string + assert pipeline._task_state.answer == "12345" + # Verify message_replace was called with string + pipeline._message_cycle_manager.message_replace_to_stream_response.assert_called_once_with( + answer="12345", reason="variable_update" + ) + + def test_answer_node_files_are_recorded(self, pipeline): + """ + Test that ANSWER nodes properly record files from outputs. + """ + # Arrange + pipeline._task_state.answer = "existing answer" + + # Create ANSWER node succeeded event with files + event = QueueNodeSucceededEvent( + node_execution_id="test-node-execution-id", + node_id="test-answer-node", + node_type=NodeType.ANSWER, + start_at=datetime.now(), + outputs={ + "answer": "same answer", + "files": [ + {"type": "image", "transfer_method": "remote_url", "remote_url": "http://example.com/img.png"} + ], + }, + ) + + # Mock the workflow response converter + pipeline._workflow_response_converter.fetch_files_from_node_outputs = Mock(return_value=event.outputs["files"]) + pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = Mock(return_value=None) + pipeline._save_output_for_event = Mock() + + # Act + list(pipeline._handle_node_succeeded_event(event)) + + # Assert: files should be recorded + assert len(pipeline._recorded_files) == 1 + assert pipeline._recorded_files[0] == event.outputs["files"][0]