diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index b4ed123475..571b3c7936 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -117,8 +117,6 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc created_at=int(self._message.created_at.timestamp()), stream_response=stream_response ) - - # yield "data: " + json.dumps(response) + "\n\n" else: return self._process_blocking_response() @@ -239,7 +237,9 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc workflow_node_execution = self._handle_node_finished(event) # stream outputs when node finished - self._generate_stream_outputs_when_node_finished() + generator = self._generate_stream_outputs_when_node_finished() + if generator: + yield from generator yield self._workflow_node_finish_to_stream_response( task_id=self._application_generate_entity.task_id, @@ -459,13 +459,13 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc self._task_state.current_stream_generate_state.generate_route): self._task_state.current_stream_generate_state = None - def _generate_stream_outputs_when_node_finished(self) -> None: + def _generate_stream_outputs_when_node_finished(self) -> Optional[Generator]: """ Generate stream outputs. :return: """ if not self._task_state.current_stream_generate_state: - return + return None route_chunks = self._task_state.current_stream_generate_state.generate_route[ self._task_state.current_stream_generate_state.current_route_position:] @@ -474,11 +474,8 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc if route_chunk.type == 'text': route_chunk = cast(TextGenerateRouteChunk, route_chunk) for token in route_chunk.text: - self._queue_manager.publish( - QueueTextChunkEvent( - text=token - ), PublishFrom.TASK_PIPELINE - ) + self._task_state.answer += token + yield self._message_to_stream_response(token, self._message.id) time.sleep(0.01) else: route_chunk = cast(VarGenerateRouteChunk, route_chunk) @@ -488,14 +485,6 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc if route_chunk_node_id == 'sys': # system variable value = self._workflow_system_variables.get(SystemVariable.value_of(value_selector[1])) - # new_value = [] - # if isinstance(value, list): - # for item in value: - # if isinstance(item, FileVar): - # new_value.append(item.to_dict()) - # - # if new_value: - # value = new_value else: # check chunk node id is before current node id or equal to current node id if route_chunk_node_id not in self._task_state.ran_node_execution_infos: @@ -568,11 +557,8 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc text = json.dumps(value, ensure_ascii=False) if text: - self._queue_manager.publish( - QueueTextChunkEvent( - text=text - ), PublishFrom.TASK_PIPELINE - ) + self._task_state.answer += text + yield self._message_to_stream_response(text, self._message.id) self._task_state.current_stream_generate_state.current_route_position += 1 diff --git a/api/core/workflow/nodes/knowledge_retrieval/structed_multi_dataset_router_agent.py b/api/core/workflow/nodes/knowledge_retrieval/structed_multi_dataset_router_agent.py index 75c293c0e7..f694a01346 100644 --- a/api/core/workflow/nodes/knowledge_retrieval/structed_multi_dataset_router_agent.py +++ b/api/core/workflow/nodes/knowledge_retrieval/structed_multi_dataset_router_agent.py @@ -1,13 +1,14 @@ -from collections.abc import Sequence -from typing import Optional, Union, Generator +from collections.abc import Generator, Sequence +from typing import Optional, Union from langchain import PromptTemplate from langchain.agents.structured_chat.base import HUMAN_MESSAGE_TEMPLATE from langchain.agents.structured_chat.prompt import PREFIX, SUFFIX + from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity from core.model_manager import ModelInstance from core.model_runtime.entities.llm_entities import LLMUsage -from core.model_runtime.entities.message_entities import PromptMessageTool, PromptMessageRole, PromptMessage +from core.model_runtime.entities.message_entities import PromptMessage, PromptMessageRole, PromptMessageTool from core.prompt.advanced_prompt_transform import AdvancedPromptTransform from core.prompt.entities.advanced_prompt_entities import ChatModelMessage from core.workflow.nodes.knowledge_retrieval.entities import KnowledgeRetrievalNodeData