From cc127f5b6242598c285a9ca6f5be65a860764036 Mon Sep 17 00:00:00 2001 From: wangxiaolei Date: Mon, 2 Mar 2026 14:05:04 +0800 Subject: [PATCH] fix: fix chat assistant response mode blocking is not work (#32394) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../easy_ui_based_generate_task_pipeline.py | 8 +-- api/services/app_generate_service.py | 69 ++++++++++++------- .../services/test_app_generate_service.py | 53 ++++++++++++++ 3 files changed, 102 insertions(+), 28 deletions(-) diff --git a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py index 8792e65512..a77e5abb30 100644 --- a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py +++ b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py @@ -157,7 +157,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline): id=self._message_id, mode=self._conversation_mode, message_id=self._message_id, - answer=cast(str, self._task_state.llm_result.message.content), + answer=self._task_state.llm_result.message.get_text_content(), created_at=self._message_created_at, **extras, ), @@ -170,7 +170,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline): mode=self._conversation_mode, conversation_id=self._conversation_id, message_id=self._message_id, - answer=cast(str, self._task_state.llm_result.message.content), + answer=self._task_state.llm_result.message.get_text_content(), created_at=self._message_created_at, **extras, ), @@ -283,7 +283,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline): # handle output moderation output_moderation_answer = self.handle_output_moderation_when_task_finished( - cast(str, self._task_state.llm_result.message.content) + self._task_state.llm_result.message.get_text_content() ) if output_moderation_answer: self._task_state.llm_result.message.content = output_moderation_answer @@ -397,7 +397,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline): message.message_unit_price = usage.prompt_unit_price message.message_price_unit = usage.prompt_price_unit message.answer = ( - PromptTemplateParser.remove_template_variables(cast(str, llm_result.message.content).strip()) + PromptTemplateParser.remove_template_variables(llm_result.message.get_text_content().strip()) if llm_result.message.content else "" ) diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 0c27c403f8..31003cb8f7 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -131,33 +131,54 @@ class AppGenerateService: elif app_model.mode == AppMode.ADVANCED_CHAT: workflow_id = args.get("workflow_id") workflow = cls._get_workflow(app_model, invoke_from, workflow_id) - with rate_limit_context(rate_limit, request_id): - payload = AppExecutionParams.new( - app_model=app_model, - workflow=workflow, - user=user, - args=args, - invoke_from=invoke_from, - streaming=streaming, - call_depth=0, - ) - payload_json = payload.model_dump_json() - def on_subscribe(): - workflow_based_app_execution_task.delay(payload_json) + if streaming: + # Streaming mode: subscribe to SSE and enqueue the execution on first subscriber + with rate_limit_context(rate_limit, request_id): + payload = AppExecutionParams.new( + app_model=app_model, + workflow=workflow, + user=user, + args=args, + invoke_from=invoke_from, + streaming=True, + call_depth=0, + ) + payload_json = payload.model_dump_json() - on_subscribe = cls._build_streaming_task_on_subscribe(on_subscribe) - generator = AdvancedChatAppGenerator() - return rate_limit.generate( - generator.convert_to_event_stream( - generator.retrieve_events( - AppMode.ADVANCED_CHAT, - payload.workflow_run_id, - on_subscribe=on_subscribe, + def on_subscribe(): + workflow_based_app_execution_task.delay(payload_json) + + on_subscribe = cls._build_streaming_task_on_subscribe(on_subscribe) + generator = AdvancedChatAppGenerator() + return rate_limit.generate( + generator.convert_to_event_stream( + generator.retrieve_events( + AppMode.ADVANCED_CHAT, + payload.workflow_run_id, + on_subscribe=on_subscribe, + ), ), - ), - request_id=request_id, - ) + request_id=request_id, + ) + else: + # Blocking mode: run synchronously and return JSON instead of SSE + # Keep behaviour consistent with WORKFLOW blocking branch. + advanced_generator = AdvancedChatAppGenerator() + return rate_limit.generate( + advanced_generator.convert_to_event_stream( + advanced_generator.generate( + app_model=app_model, + workflow=workflow, + user=user, + args=args, + invoke_from=invoke_from, + workflow_run_id=str(uuid.uuid4()), + streaming=False, + ) + ), + request_id=request_id, + ) elif app_model.mode == AppMode.WORKFLOW: workflow_id = args.get("workflow_id") workflow = cls._get_workflow(app_model, invoke_from, workflow_id) diff --git a/api/tests/unit_tests/services/test_app_generate_service.py b/api/tests/unit_tests/services/test_app_generate_service.py index 71134464e6..47b759bc7d 100644 --- a/api/tests/unit_tests/services/test_app_generate_service.py +++ b/api/tests/unit_tests/services/test_app_generate_service.py @@ -63,3 +63,56 @@ def test_workflow_blocking_injects_pause_state_config(mocker, monkeypatch): pause_state_config = call_kwargs.get("pause_state_config") assert pause_state_config is not None assert pause_state_config.state_owner_user_id == "owner-id" + + +def test_advanced_chat_blocking_returns_dict_and_does_not_use_event_retrieval(mocker, monkeypatch): + """ + Regression test: ADVANCED_CHAT in blocking mode should return a plain dict + (non-streaming), and must not go through the async retrieve_events path. + Keeps behavior consistent with WORKFLOW blocking branch. + """ + # Disable billing and stub RateLimit to a no-op that just passes values through + monkeypatch.setattr(app_generate_service_module.dify_config, "BILLING_ENABLED", False) + mocker.patch("services.app_generate_service.RateLimit", _DummyRateLimit) + + # Arrange a fake workflow and wire AppGenerateService._get_workflow to return it + workflow = MagicMock() + workflow.id = "workflow-id" + mocker.patch.object(AppGenerateService, "_get_workflow", return_value=workflow) + + # Spy on the streaming retrieval path to ensure it's NOT called + retrieve_spy = mocker.patch("services.app_generate_service.AdvancedChatAppGenerator.retrieve_events") + + # Make AdvancedChatAppGenerator.generate return a plain dict when streaming=False + generate_spy = mocker.patch( + "services.app_generate_service.AdvancedChatAppGenerator.generate", + return_value={"result": "ok"}, + ) + + # Minimal app model for ADVANCED_CHAT + app_model = MagicMock() + app_model.mode = AppMode.ADVANCED_CHAT + app_model.id = "app-id" + app_model.tenant_id = "tenant-id" + app_model.max_active_requests = 0 + app_model.is_agent = False + + user = MagicMock() + user.id = "user-id" + + # Must include query and inputs for AdvancedChatAppGenerator + args = {"workflow_id": "wf-1", "query": "hello", "inputs": {}} + + # Act: call service with streaming=False (blocking mode) + result = AppGenerateService.generate( + app_model=app_model, + user=user, + args=args, + invoke_from=MagicMock(), + streaming=False, + ) + + # Assert: returns the dict from generate(), and did not call retrieve_events() + assert result == {"result": "ok"} + assert generate_spy.call_args.kwargs.get("streaming") is False + retrieve_spy.assert_not_called()