fix stream output

This commit is contained in:
takatost 2024-03-19 20:34:43 +08:00
parent 45017f3f35
commit 0183651cd5
2 changed files with 13 additions and 26 deletions

View File

@ -117,8 +117,6 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
created_at=int(self._message.created_at.timestamp()),
stream_response=stream_response
)
# yield "data: " + json.dumps(response) + "\n\n"
else:
return self._process_blocking_response()
@ -239,7 +237,9 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
workflow_node_execution = self._handle_node_finished(event)
# stream outputs when node finished
self._generate_stream_outputs_when_node_finished()
generator = self._generate_stream_outputs_when_node_finished()
if generator:
yield from generator
yield self._workflow_node_finish_to_stream_response(
task_id=self._application_generate_entity.task_id,
@ -459,13 +459,13 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
self._task_state.current_stream_generate_state.generate_route):
self._task_state.current_stream_generate_state = None
def _generate_stream_outputs_when_node_finished(self) -> None:
def _generate_stream_outputs_when_node_finished(self) -> Optional[Generator]:
"""
Generate stream outputs.
:return:
"""
if not self._task_state.current_stream_generate_state:
return
return None
route_chunks = self._task_state.current_stream_generate_state.generate_route[
self._task_state.current_stream_generate_state.current_route_position:]
@ -474,11 +474,8 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
if route_chunk.type == 'text':
route_chunk = cast(TextGenerateRouteChunk, route_chunk)
for token in route_chunk.text:
self._queue_manager.publish(
QueueTextChunkEvent(
text=token
), PublishFrom.TASK_PIPELINE
)
self._task_state.answer += token
yield self._message_to_stream_response(token, self._message.id)
time.sleep(0.01)
else:
route_chunk = cast(VarGenerateRouteChunk, route_chunk)
@ -488,14 +485,6 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
if route_chunk_node_id == 'sys':
# system variable
value = self._workflow_system_variables.get(SystemVariable.value_of(value_selector[1]))
# new_value = []
# if isinstance(value, list):
# for item in value:
# if isinstance(item, FileVar):
# new_value.append(item.to_dict())
#
# if new_value:
# value = new_value
else:
# check chunk node id is before current node id or equal to current node id
if route_chunk_node_id not in self._task_state.ran_node_execution_infos:
@ -568,11 +557,8 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
text = json.dumps(value, ensure_ascii=False)
if text:
self._queue_manager.publish(
QueueTextChunkEvent(
text=text
), PublishFrom.TASK_PIPELINE
)
self._task_state.answer += text
yield self._message_to_stream_response(text, self._message.id)
self._task_state.current_stream_generate_state.current_route_position += 1

View File

@ -1,13 +1,14 @@
from collections.abc import Sequence
from typing import Optional, Union, Generator
from collections.abc import Generator, Sequence
from typing import Optional, Union
from langchain import PromptTemplate
from langchain.agents.structured_chat.base import HUMAN_MESSAGE_TEMPLATE
from langchain.agents.structured_chat.prompt import PREFIX, SUFFIX
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
from core.model_manager import ModelInstance
from core.model_runtime.entities.llm_entities import LLMUsage
from core.model_runtime.entities.message_entities import PromptMessageTool, PromptMessageRole, PromptMessage
from core.model_runtime.entities.message_entities import PromptMessage, PromptMessageRole, PromptMessageTool
from core.prompt.advanced_prompt_transform import AdvancedPromptTransform
from core.prompt.entities.advanced_prompt_entities import ChatModelMessage
from core.workflow.nodes.knowledge_retrieval.entities import KnowledgeRetrievalNodeData