fix service api blocking mode

takatost 2024-03-20 21:55:06 +08:00
parent a0dde6e4da
commit c3e7299494
13 changed files with 262 additions and 263 deletions

View File

@@ -36,15 +36,18 @@ class WorkflowRunApi(Resource):
         parser = reqparse.RequestParser()
         parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
         parser.add_argument('files', type=list, required=False, location='json')
+        parser.add_argument('response_mode', type=str, choices=['blocking', 'streaming'], location='json')
         args = parser.parse_args()

+        streaming = args.get('response_mode') == 'streaming'
+
         try:
             response = AppGenerateService.generate(
                 app_model=app_model,
                 user=end_user,
                 args=args,
                 invoke_from=InvokeFrom.SERVICE_API,
-                streaming=True
+                streaming=streaming
             )

             return helper.compact_generate_response(response)

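With `response_mode` now parsed and honored, service-API callers can choose between the two modes instead of always being streamed. A minimal client sketch; the host, endpoint path, and API key are placeholders for illustration, while `inputs` and `response_mode` mirror the parser arguments above:

```python
import requests

API_BASE = 'https://api.example.com/v1'  # placeholder host
API_KEY = 'app-xxxxxxxx'                 # placeholder service API key

# 'blocking' waits for the complete result in one JSON body;
# 'streaming' returns server-sent events, chunk by chunk.
resp = requests.post(
    f'{API_BASE}/workflows/run',
    headers={'Authorization': f'Bearer {API_KEY}'},
    json={'inputs': {}, 'response_mode': 'blocking'},
)
print(resp.json())
```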
View File

@@ -6,6 +6,7 @@ from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter
 from core.app.entities.task_entities import (
     ChatbotAppBlockingResponse,
     ChatbotAppStreamResponse,
+    ErrorStreamResponse,
     MessageEndStreamResponse,
     PingStreamResponse,
 )
@@ -72,7 +73,11 @@ class AdvancedChatAppGenerateResponseConverter(AppGenerateResponseConverter):
                 'created_at': chunk.created_at
             }

-            response_chunk.update(sub_stream_response.to_dict())
+            if isinstance(sub_stream_response, ErrorStreamResponse):
+                data = cls._error_to_stream_response(sub_stream_response.err)
+                response_chunk.update(data)
+            else:
+                response_chunk.update(sub_stream_response.to_dict())
             yield json.dumps(response_chunk)

     @classmethod
@@ -98,10 +103,15 @@ class AdvancedChatAppGenerateResponseConverter(AppGenerateResponseConverter):
                 'created_at': chunk.created_at
             }

+            sub_stream_response_dict = sub_stream_response.to_dict()
             if isinstance(sub_stream_response, MessageEndStreamResponse):
-                sub_stream_response_dict = sub_stream_response.to_dict()
                 metadata = sub_stream_response_dict.get('metadata', {})
                 sub_stream_response_dict['metadata'] = cls._get_simple_metadata(metadata)
-                response_chunk.update(sub_stream_response_dict)
+
+            if isinstance(sub_stream_response, ErrorStreamResponse):
+                data = cls._error_to_stream_response(sub_stream_response.err)
+                response_chunk.update(data)
             else:
-                response_chunk.update(sub_stream_response.to_dict())
+                response_chunk.update(sub_stream_response_dict)
             yield json.dumps(response_chunk)

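This branch recurs in the agent-chat, chat, completion, and workflow converters below: when the pipeline yields an `ErrorStreamResponse`, the converter flattens mapped error fields into the chunk instead of trying to serialize the wrapped exception. A hedged illustration of the resulting SSE payload (identifier values are placeholders; the `code`/`status`/`message` fields come from `_error_to_stream_response`, defined in the base converter below):

```python
import json

error_chunk = {
    'event': 'error',
    'conversation_id': 'conv-123',  # placeholder
    'message_id': 'msg-456',        # placeholder
    'created_at': 1710938106,
    'code': 'invalid_param',        # mapped from the wrapped exception
    'status': 400,
    'message': 'query is required',
}
print(f'data: {json.dumps(error_chunk)}\n')
```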
View File

@@ -28,8 +28,10 @@ from core.app.entities.task_entities import (
     AdvancedChatTaskState,
     ChatbotAppBlockingResponse,
     ChatbotAppStreamResponse,
+    ErrorStreamResponse,
     MessageEndStreamResponse,
     StreamGenerateRoute,
+    StreamResponse,
 )
 from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
 from core.app.task_pipeline.message_cycle_manage import MessageCycleManage
@@ -94,10 +96,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleManage):
             usage=LLMUsage.empty_usage()
         )

-        if stream:
-            self._stream_generate_routes = self._get_stream_generate_routes()
-        else:
-            self._stream_generate_routes = None
+        self._stream_generate_routes = self._get_stream_generate_routes()

     def process(self) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]:
         """
@@ -108,100 +107,58 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleManage):
         db.session.refresh(self._user)
         db.session.close()

+        generator = self._process_stream_response()
+
         if self._stream:
-            generator = self._process_stream_response()
-            for stream_response in generator:
-                yield ChatbotAppStreamResponse(
-                    conversation_id=self._conversation.id,
-                    message_id=self._message.id,
-                    created_at=int(self._message.created_at.timestamp()),
-                    stream_response=stream_response
-                )
+            return self._to_stream_response(generator)
         else:
-            return self._process_blocking_response()
+            return self._to_blocking_response(generator)

-    def _process_blocking_response(self) -> ChatbotAppBlockingResponse:
+    def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]) \
+            -> ChatbotAppBlockingResponse:
         """
         Process blocking response.
         :return:
         """
-        for queue_message in self._queue_manager.listen():
-            event = queue_message.event
-
-            if isinstance(event, QueueErrorEvent):
-                err = self._handle_error(event)
-                raise err
-            elif isinstance(event, QueueRetrieverResourcesEvent):
-                self._handle_retriever_resources(event)
-            elif isinstance(event, QueueAnnotationReplyEvent):
-                self._handle_annotation_reply(event)
-            elif isinstance(event, QueueWorkflowStartedEvent):
-                self._handle_workflow_start()
-            elif isinstance(event, QueueNodeStartedEvent):
-                self._handle_node_start(event)
-            elif isinstance(event, QueueNodeSucceededEvent | QueueNodeFailedEvent):
-                self._handle_node_finished(event)
-            elif isinstance(event, QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent):
-                workflow_run = self._handle_workflow_finished(event)
-                if workflow_run and workflow_run.status == WorkflowRunStatus.FAILED.value:
-                    raise self._handle_error(QueueErrorEvent(error=ValueError(f'Run failed: {workflow_run.error}')))
-
-                # handle output moderation
-                output_moderation_answer = self._handle_output_moderation_when_task_finished(self._task_state.answer)
-                if output_moderation_answer:
-                    self._task_state.answer = output_moderation_answer
-
-                # Save message
-                self._save_message()
-
-                return self._to_blocking_response()
-            elif isinstance(event, QueueTextChunkEvent):
-                delta_text = event.text
-                if delta_text is None:
-                    continue
-
-                if not self._is_stream_out_support(
-                        event=event
-                ):
-                    continue
-
-                # handle output moderation chunk
-                should_direct_answer = self._handle_output_moderation_chunk(delta_text)
-                if should_direct_answer:
-                    continue
-
-                self._task_state.answer += delta_text
+        for stream_response in generator:
+            if isinstance(stream_response, ErrorStreamResponse):
+                raise stream_response.err
+            elif isinstance(stream_response, MessageEndStreamResponse):
+                extras = {}
+                if stream_response.metadata:
+                    extras['metadata'] = stream_response.metadata
+
+                return ChatbotAppBlockingResponse(
+                    task_id=stream_response.task_id,
+                    data=ChatbotAppBlockingResponse.Data(
+                        id=self._message.id,
+                        mode=self._conversation.mode,
+                        conversation_id=self._conversation.id,
+                        message_id=self._message.id,
+                        answer=self._task_state.answer,
+                        created_at=int(self._message.created_at.timestamp()),
+                        **extras
+                    )
+                )
             else:
                 continue

         raise Exception('Queue listening stopped unexpectedly.')

-    def _to_blocking_response(self) -> ChatbotAppBlockingResponse:
+    def _to_stream_response(self, generator: Generator[StreamResponse, None, None]) \
+            -> Generator[ChatbotAppStreamResponse, None, None]:
         """
-        To blocking response.
+        To stream response.
         :return:
         """
-        extras = {}
-        if self._task_state.metadata:
-            extras['metadata'] = self._task_state.metadata
-
-        response = ChatbotAppBlockingResponse(
-            task_id=self._application_generate_entity.task_id,
-            data=ChatbotAppBlockingResponse.Data(
-                id=self._message.id,
-                mode=self._conversation.mode,
-                conversation_id=self._conversation.id,
-                message_id=self._message.id,
-                answer=self._task_state.answer,
-                created_at=int(self._message.created_at.timestamp()),
-                **extras
-            )
-        )
-
-        return response
+        for stream_response in generator:
+            yield ChatbotAppStreamResponse(
+                conversation_id=self._conversation.id,
+                message_id=self._message.id,
+                created_at=int(self._message.created_at.timestamp()),
+                stream_response=stream_response
+            )

-    def _process_stream_response(self) -> Generator:
+    def _process_stream_response(self) -> Generator[StreamResponse, None, None]:
         """
         Process stream response.
         :return:

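The heart of this commit: the old `process()` mixed `yield` (streaming branch) with `return` (blocking branch) in one body. Any Python function whose body contains `yield` is compiled as a generator, so the blocking `return` never reached the caller; the service API always got a generator back, which is why blocking mode was broken. A standalone sketch of the pitfall and the refactored shape (toy data, not project code):

```python
def broken_process(stream: bool):
    if stream:
        for i in range(3):
            yield i
    else:
        # Never reaches the caller: the 'yield' above makes this whole
        # function a generator, so calling it only returns a generator.
        return {'answer': 'done'}

def fixed_process(stream: bool):
    generator = (i for i in range(3))  # build the stream lazily
    if stream:
        return generator               # caller iterates the stream
    return {'answer': 'done'}          # caller gets the blocking payload

print(broken_process(False))  # <generator object broken_process at 0x...>
print(fixed_process(False))   # {'answer': 'done'}
```

The same refactor is applied to the workflow and easy-UI pipelines below: `process()` now always builds the event generator, then hands it to either `_to_stream_response` or `_to_blocking_response`.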
View File

@@ -41,6 +41,9 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
         :param invoke_from: invoke from source
         :param stream: is stream
         """
+        if not stream:
+            raise ValueError('Agent Chat App does not support blocking mode')
+
         if not args.get('query'):
             raise ValueError('query is required')

View File

@@ -6,6 +6,7 @@ from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter
 from core.app.entities.task_entities import (
     ChatbotAppBlockingResponse,
     ChatbotAppStreamResponse,
+    ErrorStreamResponse,
     MessageEndStreamResponse,
     PingStreamResponse,
 )
@@ -72,7 +73,11 @@ class AgentChatAppGenerateResponseConverter(AppGenerateResponseConverter):
                 'created_at': chunk.created_at
             }

-            response_chunk.update(sub_stream_response.to_dict())
+            if isinstance(sub_stream_response, ErrorStreamResponse):
+                data = cls._error_to_stream_response(sub_stream_response.err)
+                response_chunk.update(data)
+            else:
+                response_chunk.update(sub_stream_response.to_dict())
             yield json.dumps(response_chunk)

     @classmethod
@@ -98,10 +103,15 @@ class AgentChatAppGenerateResponseConverter(AppGenerateResponseConverter):
                 'created_at': chunk.created_at
             }

+            sub_stream_response_dict = sub_stream_response.to_dict()
             if isinstance(sub_stream_response, MessageEndStreamResponse):
-                sub_stream_response_dict = sub_stream_response.to_dict()
                 metadata = sub_stream_response_dict.get('metadata', {})
                 sub_stream_response_dict['metadata'] = cls._get_simple_metadata(metadata)
-                response_chunk.update(sub_stream_response_dict)
+
+            if isinstance(sub_stream_response, ErrorStreamResponse):
+                data = cls._error_to_stream_response(sub_stream_response.err)
+                response_chunk.update(data)
             else:
-                response_chunk.update(sub_stream_response.to_dict())
+                response_chunk.update(sub_stream_response_dict)
             yield json.dumps(response_chunk)

View File

@@ -1,9 +1,12 @@
+import logging
 from abc import ABC, abstractmethod
 from collections.abc import Generator
 from typing import Union

 from core.app.entities.app_invoke_entities import InvokeFrom
 from core.app.entities.task_entities import AppBlockingResponse, AppStreamResponse
+from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
+from core.model_runtime.errors.invoke import InvokeError


 class AppGenerateResponseConverter(ABC):
@@ -17,18 +20,24 @@ class AppGenerateResponseConverter(ABC):
             dict,
             Generator[str, None, None]
         ]:
-        if invoke_from in [InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE]:
+        if invoke_from in [InvokeFrom.DEBUGGER, InvokeFrom.SERVICE_API]:
             if isinstance(response, cls._blocking_response_type):
                 return cls.convert_blocking_full_response(response)
             else:
-                for chunk in cls.convert_stream_full_response(response):
-                    yield f'data: {chunk}\n\n'
+                def _generate():
+                    for chunk in cls.convert_stream_full_response(response):
+                        yield f'data: {chunk}\n\n'
+
+                return _generate()
         else:
             if isinstance(response, cls._blocking_response_type):
                 return cls.convert_blocking_simple_response(response)
             else:
-                for chunk in cls.convert_stream_simple_response(response):
-                    yield f'data: {chunk}\n\n'
+                def _generate():
+                    for chunk in cls.convert_stream_simple_response(response):
+                        yield f'data: {chunk}\n\n'
+
+                return _generate()

     @classmethod
     @abstractmethod
@@ -79,4 +88,42 @@ class AppGenerateResponseConverter(ABC):
         if 'usage' in metadata:
             del metadata['usage']

-        return metadata
+        return metadata
+
+    @classmethod
+    def _error_to_stream_response(cls, e: Exception) -> dict:
+        """
+        Error to stream response.
+        :param e: exception
+        :return:
+        """
+        error_responses = {
+            ValueError: {'code': 'invalid_param', 'status': 400},
+            ProviderTokenNotInitError: {'code': 'provider_not_initialize', 'status': 400},
+            QuotaExceededError: {
+                'code': 'provider_quota_exceeded',
+                'message': "Your quota for Dify Hosted Model Provider has been exhausted. "
+                           "Please go to Settings -> Model Provider to complete your own provider credentials.",
+                'status': 400
+            },
+            ModelCurrentlyNotSupportError: {'code': 'model_currently_not_support', 'status': 400},
+            InvokeError: {'code': 'completion_request_error', 'status': 400}
+        }
+
+        # Determine the response based on the type of exception
+        data = None
+        for k, v in error_responses.items():
+            if isinstance(e, k):
+                data = v
+
+        if data:
+            data.setdefault('message', getattr(e, 'description', str(e)))
+        else:
+            logging.error(e)
+            data = {
+                'code': 'internal_server_error',
+                'message': 'Internal Server Error, please contact support.',
+                'status': 500
+            }
+
+        return data

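Two things happen in this file: `convert()` isolates its `yield` loops inside `_generate()` closures (the same generator-vs-return pitfall fixed in the pipelines: keeping `yield` out of `convert()` itself lets it truly return a blocking dict), and the exception-to-payload table moves here from the task pipeline. A trimmed, runnable walk-through of the lookup logic, with a stand-in exception class and a defensive `dict()` copy added for the sketch (the real table also covers provider and invoke errors):

```python
import logging

class QuotaExceededError(Exception):
    """Stand-in for core.errors.error.QuotaExceededError."""

ERROR_RESPONSES = {
    ValueError: {'code': 'invalid_param', 'status': 400},
    QuotaExceededError: {'code': 'provider_quota_exceeded', 'status': 400},
}

def error_to_payload(e: Exception) -> dict:
    data = None
    for exc_type, payload in ERROR_RESPONSES.items():
        if isinstance(e, exc_type):   # walk the table; the last match wins
            data = dict(payload)      # copy so setdefault cannot mutate the table
    if data:
        data.setdefault('message', getattr(e, 'description', str(e)))
    else:
        logging.error(e)
        data = {'code': 'internal_server_error', 'status': 500,
                'message': 'Internal Server Error, please contact support.'}
    return data

print(error_to_payload(ValueError('inputs is required')))
# {'code': 'invalid_param', 'status': 400, 'message': 'inputs is required'}
```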
View File

@@ -6,6 +6,7 @@ from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter
 from core.app.entities.task_entities import (
     ChatbotAppBlockingResponse,
     ChatbotAppStreamResponse,
+    ErrorStreamResponse,
     MessageEndStreamResponse,
     PingStreamResponse,
 )
@@ -72,7 +73,11 @@ class ChatAppGenerateResponseConverter(AppGenerateResponseConverter):
                 'created_at': chunk.created_at
             }

-            response_chunk.update(sub_stream_response.to_dict())
+            if isinstance(sub_stream_response, ErrorStreamResponse):
+                data = cls._error_to_stream_response(sub_stream_response.err)
+                response_chunk.update(data)
+            else:
+                response_chunk.update(sub_stream_response.to_dict())
             yield json.dumps(response_chunk)

     @classmethod
@@ -98,10 +103,15 @@ class ChatAppGenerateResponseConverter(AppGenerateResponseConverter):
                 'created_at': chunk.created_at
             }

+            sub_stream_response_dict = sub_stream_response.to_dict()
             if isinstance(sub_stream_response, MessageEndStreamResponse):
-                sub_stream_response_dict = sub_stream_response.to_dict()
                 metadata = sub_stream_response_dict.get('metadata', {})
                 sub_stream_response_dict['metadata'] = cls._get_simple_metadata(metadata)
-                response_chunk.update(sub_stream_response_dict)
+
+            if isinstance(sub_stream_response, ErrorStreamResponse):
+                data = cls._error_to_stream_response(sub_stream_response.err)
+                response_chunk.update(data)
             else:
-                response_chunk.update(sub_stream_response.to_dict())
+                response_chunk.update(sub_stream_response_dict)
             yield json.dumps(response_chunk)

View File

@@ -6,6 +6,7 @@ from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter
 from core.app.entities.task_entities import (
     CompletionAppBlockingResponse,
     CompletionAppStreamResponse,
+    ErrorStreamResponse,
     MessageEndStreamResponse,
     PingStreamResponse,
 )
@@ -70,7 +71,11 @@ class CompletionAppGenerateResponseConverter(AppGenerateResponseConverter):
                 'created_at': chunk.created_at
             }

-            response_chunk.update(sub_stream_response.to_dict())
+            if isinstance(sub_stream_response, ErrorStreamResponse):
+                data = cls._error_to_stream_response(sub_stream_response.err)
+                response_chunk.update(data)
+            else:
+                response_chunk.update(sub_stream_response.to_dict())
             yield json.dumps(response_chunk)

     @classmethod
@@ -95,10 +100,15 @@ class CompletionAppGenerateResponseConverter(AppGenerateResponseConverter):
                 'created_at': chunk.created_at
             }

+            sub_stream_response_dict = sub_stream_response.to_dict()
             if isinstance(sub_stream_response, MessageEndStreamResponse):
-                sub_stream_response_dict = sub_stream_response.to_dict()
                 metadata = sub_stream_response_dict.get('metadata', {})
                 sub_stream_response_dict['metadata'] = cls._get_simple_metadata(metadata)
-                response_chunk.update(sub_stream_response_dict)
+
+            if isinstance(sub_stream_response, ErrorStreamResponse):
+                data = cls._error_to_stream_response(sub_stream_response.err)
+                response_chunk.update(data)
             else:
-                response_chunk.update(sub_stream_response.to_dict())
+                response_chunk.update(sub_stream_response_dict)
             yield json.dumps(response_chunk)

View File

@@ -4,6 +4,7 @@ from typing import cast

 from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter
 from core.app.entities.task_entities import (
+    ErrorStreamResponse,
     PingStreamResponse,
     WorkflowAppBlockingResponse,
     WorkflowAppStreamResponse,
@@ -52,7 +53,11 @@ class WorkflowAppGenerateResponseConverter(AppGenerateResponseConverter):
                 'workflow_run_id': chunk.workflow_run_id,
             }

-            response_chunk.update(sub_stream_response.to_dict())
+            if isinstance(sub_stream_response, ErrorStreamResponse):
+                data = cls._error_to_stream_response(sub_stream_response.err)
+                response_chunk.update(data)
+            else:
+                response_chunk.update(sub_stream_response.to_dict())
             yield json.dumps(response_chunk)

     @classmethod

View File

@@ -21,10 +21,13 @@ from core.app.entities.queue_entities import (
     QueueWorkflowSucceededEvent,
 )
 from core.app.entities.task_entities import (
+    ErrorStreamResponse,
+    StreamResponse,
     TextChunkStreamResponse,
     TextReplaceStreamResponse,
     WorkflowAppBlockingResponse,
     WorkflowAppStreamResponse,
+    WorkflowFinishStreamResponse,
     WorkflowTaskState,
 )
 from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
@@ -84,71 +87,61 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleManage):
         db.session.refresh(self._user)
         db.session.close()

+        generator = self._process_stream_response()
+
         if self._stream:
-            generator = self._process_stream_response()
-            for stream_response in generator:
-                yield WorkflowAppStreamResponse(
-                    workflow_run_id=self._task_state.workflow_run_id,
-                    stream_response=stream_response
-                )
+            return self._to_stream_response(generator)
         else:
-            return self._process_blocking_response()
+            return self._to_blocking_response(generator)

-    def _process_blocking_response(self) -> WorkflowAppBlockingResponse:
+    def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]) \
+            -> WorkflowAppBlockingResponse:
         """
-        Process blocking response.
+        To blocking response.
         :return:
         """
-        for queue_message in self._queue_manager.listen():
-            event = queue_message.event
-
-            if isinstance(event, QueueErrorEvent):
-                err = self._handle_error(event)
-                raise err
-            elif isinstance(event, QueueWorkflowStartedEvent):
-                self._handle_workflow_start()
-            elif isinstance(event, QueueNodeStartedEvent):
-                self._handle_node_start(event)
-            elif isinstance(event, QueueNodeSucceededEvent | QueueNodeFailedEvent):
-                self._handle_node_finished(event)
-            elif isinstance(event, QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent):
-                workflow_run = self._handle_workflow_finished(event)
-
-                # save workflow app log
-                self._save_workflow_app_log(workflow_run)
-
-                return self._to_blocking_response(workflow_run)
+        for stream_response in generator:
+            if isinstance(stream_response, ErrorStreamResponse):
+                raise stream_response.err
+            elif isinstance(stream_response, WorkflowFinishStreamResponse):
+                workflow_run = db.session.query(WorkflowRun).filter(
+                    WorkflowRun.id == self._task_state.workflow_run_id).first()
+
+                response = WorkflowAppBlockingResponse(
+                    task_id=self._application_generate_entity.task_id,
+                    workflow_run_id=workflow_run.id,
+                    data=WorkflowAppBlockingResponse.Data(
+                        id=workflow_run.id,
+                        workflow_id=workflow_run.workflow_id,
+                        status=workflow_run.status,
+                        outputs=workflow_run.outputs_dict,
+                        error=workflow_run.error,
+                        elapsed_time=workflow_run.elapsed_time,
+                        total_tokens=workflow_run.total_tokens,
+                        total_steps=workflow_run.total_steps,
+                        created_at=int(workflow_run.created_at.timestamp()),
+                        finished_at=int(workflow_run.finished_at.timestamp())
+                    )
+                )
+
+                return response
             else:
                 continue

         raise Exception('Queue listening stopped unexpectedly.')

-    def _to_blocking_response(self, workflow_run: WorkflowRun) -> WorkflowAppBlockingResponse:
+    def _to_stream_response(self, generator: Generator[StreamResponse, None, None]) \
+            -> Generator[WorkflowAppStreamResponse, None, None]:
         """
-        To blocking response.
-        :param workflow_run: workflow run
+        To stream response.
         :return:
         """
-        response = WorkflowAppBlockingResponse(
-            task_id=self._application_generate_entity.task_id,
-            workflow_run_id=workflow_run.id,
-            data=WorkflowAppBlockingResponse.Data(
-                id=workflow_run.id,
-                workflow_id=workflow_run.workflow_id,
-                status=workflow_run.status,
-                outputs=workflow_run.outputs_dict,
-                error=workflow_run.error,
-                elapsed_time=workflow_run.elapsed_time,
-                total_tokens=workflow_run.total_tokens,
-                total_steps=workflow_run.total_steps,
-                created_at=int(workflow_run.created_at.timestamp()),
-                finished_at=int(workflow_run.finished_at.timestamp())
-            )
-        )
-
-        return response
+        for stream_response in generator:
+            yield WorkflowAppStreamResponse(
+                workflow_run_id=self._task_state.workflow_run_id,
+                stream_response=stream_response
+            )

-    def _process_stream_response(self) -> Generator:
+    def _process_stream_response(self) -> Generator[StreamResponse, None, None]:
         """
         Process stream response.
         :return:

View File

@@ -101,9 +101,10 @@ class ErrorStreamResponse(StreamResponse):
     ErrorStreamResponse entity
     """
     event: StreamEvent = StreamEvent.ERROR
-    code: str
-    status: int
-    message: Optional[str] = None
+    err: Exception
+
+    class Config:
+        arbitrary_types_allowed = True


 class MessageStreamResponse(StreamResponse):

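`ErrorStreamResponse` now carries the exception object itself rather than pre-rendered `code`/`status`/`message` fields: blocking consumers re-raise it (preserving the original type for the API layer's error handlers), while streaming consumers format it late via `_error_to_stream_response`. Pydantic has no validator for arbitrary classes such as `Exception`, hence the `arbitrary_types_allowed` flag (v1-style `Config`, matching the diff). A minimal sketch:

```python
from pydantic import BaseModel

class ErrorHolder(BaseModel):
    err: Exception  # rejected by pydantic unless arbitrary types are allowed

    class Config:
        arbitrary_types_allowed = True  # validation falls back to isinstance()

holder = ErrorHolder(err=ValueError('boom'))
try:
    raise holder.err  # blocking path: surface the original exception type
except ValueError as e:
    print(f'caught: {e}')
```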
View File

@@ -14,7 +14,6 @@ from core.app.entities.task_entities import (
     PingStreamResponse,
     TaskState,
 )
-from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
 from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
 from core.moderation.output_moderation import ModerationRule, OutputModeration
 from models.account import Account
@@ -71,38 +70,9 @@ class BasedGenerateTaskPipeline:
         :param e: exception
         :return:
         """
-        error_responses = {
-            ValueError: {'code': 'invalid_param', 'status': 400},
-            ProviderTokenNotInitError: {'code': 'provider_not_initialize', 'status': 400},
-            QuotaExceededError: {
-                'code': 'provider_quota_exceeded',
-                'message': "Your quota for Dify Hosted Model Provider has been exhausted. "
-                           "Please go to Settings -> Model Provider to complete your own provider credentials.",
-                'status': 400
-            },
-            ModelCurrentlyNotSupportError: {'code': 'model_currently_not_support', 'status': 400},
-            InvokeError: {'code': 'completion_request_error', 'status': 400}
-        }
-
-        # Determine the response based on the type of exception
-        data = None
-        for k, v in error_responses.items():
-            if isinstance(e, k):
-                data = v
-
-        if data:
-            data.setdefault('message', getattr(e, 'description', str(e)))
-        else:
-            logging.error(e)
-            data = {
-                'code': 'internal_server_error',
-                'message': 'Internal Server Error, please contact support.',
-                'status': 500
-            }
-
         return ErrorStreamResponse(
             task_id=self._application_generate_entity.task_id,
-            **data
+            err=e
         )

def _ping_stream_response(self) -> PingStreamResponse:

View File

@@ -30,7 +30,9 @@ from core.app.entities.task_entities import (
     CompletionAppBlockingResponse,
     CompletionAppStreamResponse,
     EasyUITaskState,
+    ErrorStreamResponse,
     MessageEndStreamResponse,
+    StreamResponse,
 )
 from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
 from core.app.task_pipeline.message_cycle_manage import MessageCycleManage
@@ -107,67 +109,84 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline, MessageCycleManage):
         db.session.refresh(self._message)
         db.session.close()

+        generator = self._process_stream_response()
+
         if self._stream:
-            generator = self._process_stream_response()
-            for stream_response in generator:
-                if isinstance(self._application_generate_entity, CompletionAppGenerateEntity):
-                    yield CompletionAppStreamResponse(
-                        message_id=self._message.id,
-                        created_at=int(self._message.created_at.timestamp()),
-                        stream_response=stream_response
-                    )
-                else:
-                    yield ChatbotAppStreamResponse(
-                        conversation_id=self._conversation.id,
-                        message_id=self._message.id,
-                        created_at=int(self._message.created_at.timestamp()),
-                        stream_response=stream_response
-                    )
-                # yield "data: " + json.dumps(response) + "\n\n"
+            return self._to_stream_response(generator)
         else:
-            return self._process_blocking_response()
+            return self._to_blocking_response(generator)

-    def _process_blocking_response(self) -> Union[ChatbotAppBlockingResponse, CompletionAppBlockingResponse]:
+    def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]) -> Union[
+        ChatbotAppBlockingResponse,
+        CompletionAppBlockingResponse
+    ]:
         """
         Process blocking response.
         :return:
         """
-        for queue_message in self._queue_manager.listen():
-            event = queue_message.event
-
-            if isinstance(event, QueueErrorEvent):
-                err = self._handle_error(event)
-                raise err
-            elif isinstance(event, QueueRetrieverResourcesEvent):
-                self._handle_retriever_resources(event)
-            elif isinstance(event, QueueAnnotationReplyEvent):
-                annotation = self._handle_annotation_reply(event)
-                if annotation:
-                    self._task_state.llm_result.message.content = annotation.content
-            elif isinstance(event, QueueStopEvent | QueueMessageEndEvent):
-                if isinstance(event, QueueMessageEndEvent):
-                    self._task_state.llm_result = event.llm_result
-                else:
-                    self._handle_stop(event)
-
-                # handle output moderation
-                output_moderation_answer = self._handle_output_moderation_when_task_finished(
-                    self._task_state.llm_result.message.content
-                )
-                if output_moderation_answer:
-                    self._task_state.llm_result.message.content = output_moderation_answer
-
-                # Save message
-                self._save_message()
-
-                return self._to_blocking_response()
+        for stream_response in generator:
+            if isinstance(stream_response, ErrorStreamResponse):
+                raise stream_response.err
+            elif isinstance(stream_response, MessageEndStreamResponse):
+                extras = {
+                    'usage': jsonable_encoder(self._task_state.llm_result.usage)
+                }
+                if self._task_state.metadata:
+                    extras['metadata'] = self._task_state.metadata
+
+                if self._conversation.mode == AppMode.COMPLETION.value:
+                    response = CompletionAppBlockingResponse(
+                        task_id=self._application_generate_entity.task_id,
+                        data=CompletionAppBlockingResponse.Data(
+                            id=self._message.id,
+                            mode=self._conversation.mode,
+                            message_id=self._message.id,
+                            answer=self._task_state.llm_result.message.content,
+                            created_at=int(self._message.created_at.timestamp()),
+                            **extras
+                        )
+                    )
+                else:
+                    response = ChatbotAppBlockingResponse(
+                        task_id=self._application_generate_entity.task_id,
+                        data=ChatbotAppBlockingResponse.Data(
+                            id=self._message.id,
+                            mode=self._conversation.mode,
+                            conversation_id=self._conversation.id,
+                            message_id=self._message.id,
+                            answer=self._task_state.llm_result.message.content,
+                            created_at=int(self._message.created_at.timestamp()),
+                            **extras
+                        )
+                    )
+
+                return response
             else:
                 continue

         raise Exception('Queue listening stopped unexpectedly.')

-    def _process_stream_response(self) -> Generator:
+    def _to_stream_response(self, generator: Generator[StreamResponse, None, None]) \
+            -> Generator[Union[ChatbotAppStreamResponse, CompletionAppStreamResponse], None, None]:
+        """
+        To stream response.
+        :return:
+        """
+        for stream_response in generator:
+            if isinstance(self._application_generate_entity, CompletionAppGenerateEntity):
+                yield CompletionAppStreamResponse(
+                    message_id=self._message.id,
+                    created_at=int(self._message.created_at.timestamp()),
+                    stream_response=stream_response
+                )
+            else:
+                yield ChatbotAppStreamResponse(
+                    conversation_id=self._conversation.id,
+                    message_id=self._message.id,
+                    created_at=int(self._message.created_at.timestamp()),
+                    stream_response=stream_response
+                )
+
+    def _process_stream_response(self) -> Generator[StreamResponse, None, None]:
         """
         Process stream response.
         :return:
@@ -313,45 +332,6 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline, MessageCycleManage):
             completion_tokens
         )

-    def _to_blocking_response(self) -> ChatbotAppBlockingResponse:
-        """
-        To blocking response.
-        :return:
-        """
-        self._task_state.metadata['usage'] = jsonable_encoder(self._task_state.llm_result.usage)
-
-        extras = {}
-        if self._task_state.metadata:
-            extras['metadata'] = self._task_state.metadata
-
-        if self._conversation.mode != AppMode.COMPLETION.value:
-            response = CompletionAppBlockingResponse(
-                task_id=self._application_generate_entity.task_id,
-                data=CompletionAppBlockingResponse.Data(
-                    id=self._message.id,
-                    mode=self._conversation.mode,
-                    message_id=self._message.id,
-                    answer=self._task_state.llm_result.message.content,
-                    created_at=int(self._message.created_at.timestamp()),
-                    **extras
-                )
-            )
-        else:
-            response = ChatbotAppBlockingResponse(
-                task_id=self._application_generate_entity.task_id,
-                data=ChatbotAppBlockingResponse.Data(
-                    id=self._message.id,
-                    mode=self._conversation.mode,
-                    conversation_id=self._conversation.id,
-                    message_id=self._message.id,
-                    answer=self._task_state.llm_result.message.content,
-                    created_at=int(self._message.created_at.timestamp()),
-                    **extras
-                )
-            )
-
-        return response
-
     def _message_end_to_stream_response(self) -> MessageEndStreamResponse:
         """
         Message end to stream response.