diff --git a/api/controllers/service_api/app/workflow.py b/api/controllers/service_api/app/workflow.py index 511a7e5a45..2830530db5 100644 --- a/api/controllers/service_api/app/workflow.py +++ b/api/controllers/service_api/app/workflow.py @@ -36,15 +36,18 @@ class WorkflowRunApi(Resource): parser = reqparse.RequestParser() parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json') parser.add_argument('files', type=list, required=False, location='json') + parser.add_argument('response_mode', type=str, choices=['blocking', 'streaming'], location='json') args = parser.parse_args() + streaming = args.get('response_mode') == 'streaming' + try: response = AppGenerateService.generate( app_model=app_model, user=end_user, args=args, invoke_from=InvokeFrom.SERVICE_API, - streaming=True + streaming=streaming ) return helper.compact_generate_response(response) diff --git a/api/core/app/apps/advanced_chat/generate_response_converter.py b/api/core/app/apps/advanced_chat/generate_response_converter.py index d211db9511..80e8e22e88 100644 --- a/api/core/app/apps/advanced_chat/generate_response_converter.py +++ b/api/core/app/apps/advanced_chat/generate_response_converter.py @@ -6,6 +6,7 @@ from core.app.apps.base_app_generate_response_converter import AppGenerateRespon from core.app.entities.task_entities import ( ChatbotAppBlockingResponse, ChatbotAppStreamResponse, + ErrorStreamResponse, MessageEndStreamResponse, PingStreamResponse, ) @@ -72,7 +73,11 @@ class AdvancedChatAppGenerateResponseConverter(AppGenerateResponseConverter): 'created_at': chunk.created_at } - response_chunk.update(sub_stream_response.to_dict()) + if isinstance(sub_stream_response, ErrorStreamResponse): + data = cls._error_to_stream_response(sub_stream_response.err) + response_chunk.update(data) + else: + response_chunk.update(sub_stream_response.to_dict()) yield json.dumps(response_chunk) @classmethod @@ -98,10 +103,15 @@ class 
AdvancedChatAppGenerateResponseConverter(AppGenerateResponseConverter): 'created_at': chunk.created_at } - sub_stream_response_dict = sub_stream_response.to_dict() if isinstance(sub_stream_response, MessageEndStreamResponse): + sub_stream_response_dict = sub_stream_response.to_dict() metadata = sub_stream_response_dict.get('metadata', {}) sub_stream_response_dict['metadata'] = cls._get_simple_metadata(metadata) + response_chunk.update(sub_stream_response_dict) + elif isinstance(sub_stream_response, ErrorStreamResponse): + data = cls._error_to_stream_response(sub_stream_response.err) + response_chunk.update(data) + else: + response_chunk.update(sub_stream_response.to_dict()) - response_chunk.update(sub_stream_response_dict) yield json.dumps(response_chunk) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index d3c9f6e812..85b00a98fd 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -28,8 +28,10 @@ from core.app.entities.task_entities import ( AdvancedChatTaskState, ChatbotAppBlockingResponse, ChatbotAppStreamResponse, + ErrorStreamResponse, MessageEndStreamResponse, StreamGenerateRoute, + StreamResponse, ) from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline from core.app.task_pipeline.message_cycle_manage import MessageCycleManage @@ -94,10 +96,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc usage=LLMUsage.empty_usage() ) - if stream: - self._stream_generate_routes = self._get_stream_generate_routes() - else: - self._stream_generate_routes = None + self._stream_generate_routes = self._get_stream_generate_routes() def process(self) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]: """ @@ -108,100 +107,58 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
db.session.refresh(self._user) db.session.close() + generator = self._process_stream_response() if self._stream: - generator = self._process_stream_response() - for stream_response in generator: - yield ChatbotAppStreamResponse( - conversation_id=self._conversation.id, - message_id=self._message.id, - created_at=int(self._message.created_at.timestamp()), - stream_response=stream_response - ) + return self._to_stream_response(generator) else: - return self._process_blocking_response() + return self._to_blocking_response(generator) - def _process_blocking_response(self) -> ChatbotAppBlockingResponse: + def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]) \ + -> ChatbotAppBlockingResponse: """ Process blocking response. :return: """ - for queue_message in self._queue_manager.listen(): - event = queue_message.event + for stream_response in generator: + if isinstance(stream_response, ErrorStreamResponse): + raise stream_response.err + elif isinstance(stream_response, MessageEndStreamResponse): + extras = {} + if stream_response.metadata: + extras['metadata'] = stream_response.metadata - if isinstance(event, QueueErrorEvent): - err = self._handle_error(event) - raise err - elif isinstance(event, QueueRetrieverResourcesEvent): - self._handle_retriever_resources(event) - elif isinstance(event, QueueAnnotationReplyEvent): - self._handle_annotation_reply(event) - elif isinstance(event, QueueWorkflowStartedEvent): - self._handle_workflow_start() - elif isinstance(event, QueueNodeStartedEvent): - self._handle_node_start(event) - elif isinstance(event, QueueNodeSucceededEvent | QueueNodeFailedEvent): - self._handle_node_finished(event) - elif isinstance(event, QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent): - workflow_run = self._handle_workflow_finished(event) - - if workflow_run and workflow_run.status == WorkflowRunStatus.FAILED.value: - raise self._handle_error(QueueErrorEvent(error=ValueError(f'Run failed: 
{workflow_run.error}'))) - - # handle output moderation - output_moderation_answer = self._handle_output_moderation_when_task_finished(self._task_state.answer) - if output_moderation_answer: - self._task_state.answer = output_moderation_answer - - # Save message - self._save_message() - - return self._to_blocking_response() - elif isinstance(event, QueueTextChunkEvent): - delta_text = event.text - if delta_text is None: - continue - - if not self._is_stream_out_support( - event=event - ): - continue - - # handle output moderation chunk - should_direct_answer = self._handle_output_moderation_chunk(delta_text) - if should_direct_answer: - continue - - self._task_state.answer += delta_text + return ChatbotAppBlockingResponse( + task_id=stream_response.task_id, + data=ChatbotAppBlockingResponse.Data( + id=self._message.id, + mode=self._conversation.mode, + conversation_id=self._conversation.id, + message_id=self._message.id, + answer=self._task_state.answer, + created_at=int(self._message.created_at.timestamp()), + **extras + ) + ) else: continue raise Exception('Queue listening stopped unexpectedly.') - def _to_blocking_response(self) -> ChatbotAppBlockingResponse: + def _to_stream_response(self, generator: Generator[StreamResponse, None, None]) \ + -> Generator[ChatbotAppStreamResponse, None, None]: """ - To blocking response. + To stream response. 
:return: """ - extras = {} - if self._task_state.metadata: - extras['metadata'] = self._task_state.metadata - - response = ChatbotAppBlockingResponse( - task_id=self._application_generate_entity.task_id, - data=ChatbotAppBlockingResponse.Data( - id=self._message.id, - mode=self._conversation.mode, + for stream_response in generator: + yield ChatbotAppStreamResponse( conversation_id=self._conversation.id, message_id=self._message.id, - answer=self._task_state.answer, created_at=int(self._message.created_at.timestamp()), - **extras + stream_response=stream_response ) - ) - return response - - def _process_stream_response(self) -> Generator: + def _process_stream_response(self) -> Generator[StreamResponse, None, None]: """ Process stream response. :return: diff --git a/api/core/app/apps/agent_chat/app_generator.py b/api/core/app/apps/agent_chat/app_generator.py index 2ce36ad056..54e94b71c4 100644 --- a/api/core/app/apps/agent_chat/app_generator.py +++ b/api/core/app/apps/agent_chat/app_generator.py @@ -41,6 +41,9 @@ class AgentChatAppGenerator(MessageBasedAppGenerator): :param invoke_from: invoke from source :param stream: is stream """ + if not stream: + raise ValueError('Agent Chat App does not support blocking mode') + if not args.get('query'): raise ValueError('query is required') diff --git a/api/core/app/apps/agent_chat/generate_response_converter.py b/api/core/app/apps/agent_chat/generate_response_converter.py index bd91c5269e..118d82c495 100644 --- a/api/core/app/apps/agent_chat/generate_response_converter.py +++ b/api/core/app/apps/agent_chat/generate_response_converter.py @@ -6,6 +6,7 @@ from core.app.apps.base_app_generate_response_converter import AppGenerateRespon from core.app.entities.task_entities import ( ChatbotAppBlockingResponse, ChatbotAppStreamResponse, + ErrorStreamResponse, MessageEndStreamResponse, PingStreamResponse, ) @@ -72,7 +73,11 @@ class AgentChatAppGenerateResponseConverter(AppGenerateResponseConverter): 'created_at': chunk.created_at 
} - response_chunk.update(sub_stream_response.to_dict()) + if isinstance(sub_stream_response, ErrorStreamResponse): + data = cls._error_to_stream_response(sub_stream_response.err) + response_chunk.update(data) + else: + response_chunk.update(sub_stream_response.to_dict()) yield json.dumps(response_chunk) @classmethod @@ -98,10 +103,15 @@ class AgentChatAppGenerateResponseConverter(AppGenerateResponseConverter): 'created_at': chunk.created_at } - sub_stream_response_dict = sub_stream_response.to_dict() if isinstance(sub_stream_response, MessageEndStreamResponse): + sub_stream_response_dict = sub_stream_response.to_dict() metadata = sub_stream_response_dict.get('metadata', {}) sub_stream_response_dict['metadata'] = cls._get_simple_metadata(metadata) + response_chunk.update(sub_stream_response_dict) + elif isinstance(sub_stream_response, ErrorStreamResponse): + data = cls._error_to_stream_response(sub_stream_response.err) + response_chunk.update(data) + else: + response_chunk.update(sub_stream_response.to_dict()) - response_chunk.update(sub_stream_response_dict) yield json.dumps(response_chunk) diff --git a/api/core/app/apps/base_app_generate_response_converter.py b/api/core/app/apps/base_app_generate_response_converter.py index cbc07b1c70..7202822975 100644 --- a/api/core/app/apps/base_app_generate_response_converter.py +++ b/api/core/app/apps/base_app_generate_response_converter.py @@ -1,9 +1,12 @@ +import logging from abc import ABC, abstractmethod from collections.abc import Generator from typing import Union from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.task_entities import AppBlockingResponse, AppStreamResponse +from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError +from core.model_runtime.errors.invoke import InvokeError class AppGenerateResponseConverter(ABC): @@ -17,18 +20,24 @@ class AppGenerateResponseConverter(ABC): dict, Generator[str, None, None] ]: - if invoke_from
in [InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE]: + if invoke_from in [InvokeFrom.DEBUGGER, InvokeFrom.SERVICE_API]: if isinstance(response, cls._blocking_response_type): return cls.convert_blocking_full_response(response) else: - for chunk in cls.convert_stream_full_response(response): - yield f'data: {chunk}\n\n' + def _generate(): + for chunk in cls.convert_stream_full_response(response): + yield f'data: {chunk}\n\n' + + return _generate() else: if isinstance(response, cls._blocking_response_type): return cls.convert_blocking_simple_response(response) else: - for chunk in cls.convert_stream_simple_response(response): - yield f'data: {chunk}\n\n' + def _generate(): + for chunk in cls.convert_stream_simple_response(response): + yield f'data: {chunk}\n\n' + + return _generate() @classmethod @abstractmethod @@ -79,4 +88,42 @@ class AppGenerateResponseConverter(ABC): if 'usage' in metadata: del metadata['usage'] - return metadata \ No newline at end of file + return metadata + + @classmethod + def _error_to_stream_response(cls, e: Exception) -> dict: + """ + Error to stream response. + :param e: exception + :return: + """ + error_responses = { + ValueError: {'code': 'invalid_param', 'status': 400}, + ProviderTokenNotInitError: {'code': 'provider_not_initialize', 'status': 400}, + QuotaExceededError: { + 'code': 'provider_quota_exceeded', + 'message': "Your quota for Dify Hosted Model Provider has been exhausted. 
" + "Please go to Settings -> Model Provider to complete your own provider credentials.", + 'status': 400 + }, + ModelCurrentlyNotSupportError: {'code': 'model_currently_not_support', 'status': 400}, + InvokeError: {'code': 'completion_request_error', 'status': 400} + } + + # Determine the response based on the type of exception + data = None + for k, v in error_responses.items(): + if isinstance(e, k): + data = v + + if data: + data.setdefault('message', getattr(e, 'description', str(e))) + else: + logging.error(e) + data = { + 'code': 'internal_server_error', + 'message': 'Internal Server Error, please contact support.', + 'status': 500 + } + + return data diff --git a/api/core/app/apps/chat/generate_response_converter.py b/api/core/app/apps/chat/generate_response_converter.py index 898561e01a..625e14c9c3 100644 --- a/api/core/app/apps/chat/generate_response_converter.py +++ b/api/core/app/apps/chat/generate_response_converter.py @@ -6,6 +6,7 @@ from core.app.apps.base_app_generate_response_converter import AppGenerateRespon from core.app.entities.task_entities import ( ChatbotAppBlockingResponse, ChatbotAppStreamResponse, + ErrorStreamResponse, MessageEndStreamResponse, PingStreamResponse, ) @@ -72,7 +73,11 @@ class ChatAppGenerateResponseConverter(AppGenerateResponseConverter): 'created_at': chunk.created_at } - response_chunk.update(sub_stream_response.to_dict()) + if isinstance(sub_stream_response, ErrorStreamResponse): + data = cls._error_to_stream_response(sub_stream_response.err) + response_chunk.update(data) + else: + response_chunk.update(sub_stream_response.to_dict()) yield json.dumps(response_chunk) @classmethod @@ -98,10 +103,15 @@ class ChatAppGenerateResponseConverter(AppGenerateResponseConverter): 'created_at': chunk.created_at } - sub_stream_response_dict = sub_stream_response.to_dict() if isinstance(sub_stream_response, MessageEndStreamResponse): + sub_stream_response_dict = sub_stream_response.to_dict() metadata = 
sub_stream_response_dict.get('metadata', {}) sub_stream_response_dict['metadata'] = cls._get_simple_metadata(metadata) + response_chunk.update(sub_stream_response_dict) + elif isinstance(sub_stream_response, ErrorStreamResponse): + data = cls._error_to_stream_response(sub_stream_response.err) + response_chunk.update(data) + else: + response_chunk.update(sub_stream_response.to_dict()) - response_chunk.update(sub_stream_response_dict) yield json.dumps(response_chunk) diff --git a/api/core/app/apps/completion/generate_response_converter.py b/api/core/app/apps/completion/generate_response_converter.py index 0570f815a6..14db74dbd0 100644 --- a/api/core/app/apps/completion/generate_response_converter.py +++ b/api/core/app/apps/completion/generate_response_converter.py @@ -6,6 +6,7 @@ from core.app.apps.base_app_generate_response_converter import AppGenerateRespon from core.app.entities.task_entities import ( CompletionAppBlockingResponse, CompletionAppStreamResponse, + ErrorStreamResponse, MessageEndStreamResponse, PingStreamResponse, ) @@ -70,7 +71,11 @@ class CompletionAppGenerateResponseConverter(AppGenerateResponseConverter): 'created_at': chunk.created_at } - response_chunk.update(sub_stream_response.to_dict()) + if isinstance(sub_stream_response, ErrorStreamResponse): + data = cls._error_to_stream_response(sub_stream_response.err) + response_chunk.update(data) + else: + response_chunk.update(sub_stream_response.to_dict()) yield json.dumps(response_chunk) @classmethod @@ -95,10 +100,15 @@ class CompletionAppGenerateResponseConverter(AppGenerateResponseConverter): 'created_at': chunk.created_at } - sub_stream_response_dict = sub_stream_response.to_dict() if isinstance(sub_stream_response, MessageEndStreamResponse): + sub_stream_response_dict = sub_stream_response.to_dict() metadata = sub_stream_response_dict.get('metadata', {}) sub_stream_response_dict['metadata'] = cls._get_simple_metadata(metadata) + response_chunk.update(sub_stream_response_dict) + elif 
isinstance(sub_stream_response, ErrorStreamResponse): + data = cls._error_to_stream_response(sub_stream_response.err) + response_chunk.update(data) + else: + response_chunk.update(sub_stream_response.to_dict()) - response_chunk.update(sub_stream_response_dict) yield json.dumps(response_chunk) diff --git a/api/core/app/apps/workflow/generate_response_converter.py b/api/core/app/apps/workflow/generate_response_converter.py index 6dec3430de..d907b82c99 100644 --- a/api/core/app/apps/workflow/generate_response_converter.py +++ b/api/core/app/apps/workflow/generate_response_converter.py @@ -4,6 +4,7 @@ from typing import cast from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter from core.app.entities.task_entities import ( + ErrorStreamResponse, PingStreamResponse, WorkflowAppBlockingResponse, WorkflowAppStreamResponse, @@ -52,7 +53,11 @@ class WorkflowAppGenerateResponseConverter(AppGenerateResponseConverter): 'workflow_run_id': chunk.workflow_run_id, } - response_chunk.update(sub_stream_response.to_dict()) + if isinstance(sub_stream_response, ErrorStreamResponse): + data = cls._error_to_stream_response(sub_stream_response.err) + response_chunk.update(data) + else: + response_chunk.update(sub_stream_response.to_dict()) yield json.dumps(response_chunk) @classmethod diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 26dcd2dc41..3e0a9e5e5c 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -21,10 +21,13 @@ from core.app.entities.queue_entities import ( QueueWorkflowSucceededEvent, ) from core.app.entities.task_entities import ( + ErrorStreamResponse, + StreamResponse, TextChunkStreamResponse, TextReplaceStreamResponse, WorkflowAppBlockingResponse, WorkflowAppStreamResponse, + WorkflowFinishStreamResponse, WorkflowTaskState, ) from core.app.task_pipeline.based_generate_task_pipeline 
import BasedGenerateTaskPipeline @@ -84,71 +87,61 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa db.session.refresh(self._user) db.session.close() + generator = self._process_stream_response() if self._stream: - generator = self._process_stream_response() - for stream_response in generator: - yield WorkflowAppStreamResponse( - workflow_run_id=self._task_state.workflow_run_id, - stream_response=stream_response - ) + return self._to_stream_response(generator) else: - return self._process_blocking_response() + return self._to_blocking_response(generator) - def _process_blocking_response(self) -> WorkflowAppBlockingResponse: + def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]) \ + -> WorkflowAppBlockingResponse: """ - Process blocking response. + To blocking response. :return: """ - for queue_message in self._queue_manager.listen(): - event = queue_message.event + for stream_response in generator: + if isinstance(stream_response, ErrorStreamResponse): + raise stream_response.err + elif isinstance(stream_response, WorkflowFinishStreamResponse): + workflow_run = db.session.query(WorkflowRun).filter( + WorkflowRun.id == self._task_state.workflow_run_id).first() - if isinstance(event, QueueErrorEvent): - err = self._handle_error(event) - raise err - elif isinstance(event, QueueWorkflowStartedEvent): - self._handle_workflow_start() - elif isinstance(event, QueueNodeStartedEvent): - self._handle_node_start(event) - elif isinstance(event, QueueNodeSucceededEvent | QueueNodeFailedEvent): - self._handle_node_finished(event) - elif isinstance(event, QueueStopEvent | QueueWorkflowSucceededEvent | QueueWorkflowFailedEvent): - workflow_run = self._handle_workflow_finished(event) + response = WorkflowAppBlockingResponse( + task_id=self._application_generate_entity.task_id, + workflow_run_id=workflow_run.id, + data=WorkflowAppBlockingResponse.Data( + id=workflow_run.id, + workflow_id=workflow_run.workflow_id, + 
status=workflow_run.status, + outputs=workflow_run.outputs_dict, + error=workflow_run.error, + elapsed_time=workflow_run.elapsed_time, + total_tokens=workflow_run.total_tokens, + total_steps=workflow_run.total_steps, + created_at=int(workflow_run.created_at.timestamp()), + finished_at=int(workflow_run.finished_at.timestamp()) + ) + ) - # save workflow app log - self._save_workflow_app_log(workflow_run) - - return self._to_blocking_response(workflow_run) + return response else: continue raise Exception('Queue listening stopped unexpectedly.') - def _to_blocking_response(self, workflow_run: WorkflowRun) -> WorkflowAppBlockingResponse: + def _to_stream_response(self, generator: Generator[StreamResponse, None, None]) \ + -> Generator[WorkflowAppStreamResponse, None, None]: """ - To blocking response. - :param workflow_run: workflow run + To stream response. :return: """ - response = WorkflowAppBlockingResponse( - task_id=self._application_generate_entity.task_id, - workflow_run_id=workflow_run.id, - data=WorkflowAppBlockingResponse.Data( - id=workflow_run.id, - workflow_id=workflow_run.workflow_id, - status=workflow_run.status, - outputs=workflow_run.outputs_dict, - error=workflow_run.error, - elapsed_time=workflow_run.elapsed_time, - total_tokens=workflow_run.total_tokens, - total_steps=workflow_run.total_steps, - created_at=int(workflow_run.created_at.timestamp()), - finished_at=int(workflow_run.finished_at.timestamp()) + for stream_response in generator: + yield WorkflowAppStreamResponse( + workflow_run_id=self._task_state.workflow_run_id, + stream_response=stream_response ) - ) - return response - - def _process_stream_response(self) -> Generator: + def _process_stream_response(self) -> Generator[StreamResponse, None, None]: """ Process stream response. 
:return: diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index b9558d393e..b2c80ec22c 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -101,9 +101,10 @@ class ErrorStreamResponse(StreamResponse): ErrorStreamResponse entity """ event: StreamEvent = StreamEvent.ERROR - code: str - status: int - message: Optional[str] = None + err: Exception + + class Config: + arbitrary_types_allowed = True class MessageStreamResponse(StreamResponse): diff --git a/api/core/app/task_pipeline/based_generate_task_pipeline.py b/api/core/app/task_pipeline/based_generate_task_pipeline.py index 2606b56bcd..9e50926ebb 100644 --- a/api/core/app/task_pipeline/based_generate_task_pipeline.py +++ b/api/core/app/task_pipeline/based_generate_task_pipeline.py @@ -14,7 +14,6 @@ from core.app.entities.task_entities import ( PingStreamResponse, TaskState, ) -from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError from core.moderation.output_moderation import ModerationRule, OutputModeration from models.account import Account @@ -71,38 +70,9 @@ class BasedGenerateTaskPipeline: :param e: exception :return: """ - error_responses = { - ValueError: {'code': 'invalid_param', 'status': 400}, - ProviderTokenNotInitError: {'code': 'provider_not_initialize', 'status': 400}, - QuotaExceededError: { - 'code': 'provider_quota_exceeded', - 'message': "Your quota for Dify Hosted Model Provider has been exhausted. 
" - "Please go to Settings -> Model Provider to complete your own provider credentials.", - 'status': 400 - }, - ModelCurrentlyNotSupportError: {'code': 'model_currently_not_support', 'status': 400}, - InvokeError: {'code': 'completion_request_error', 'status': 400} - } - - # Determine the response based on the type of exception - data = None - for k, v in error_responses.items(): - if isinstance(e, k): - data = v - - if data: - data.setdefault('message', getattr(e, 'description', str(e))) - else: - logging.error(e) - data = { - 'code': 'internal_server_error', - 'message': 'Internal Server Error, please contact support.', - 'status': 500 - } - return ErrorStreamResponse( task_id=self._application_generate_entity.task_id, - **data + err=e ) def _ping_stream_response(self) -> PingStreamResponse: diff --git a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py index c7c380e57c..3d936e2b44 100644 --- a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py +++ b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py @@ -30,7 +30,9 @@ from core.app.entities.task_entities import ( CompletionAppBlockingResponse, CompletionAppStreamResponse, EasyUITaskState, + ErrorStreamResponse, MessageEndStreamResponse, + StreamResponse, ) from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline from core.app.task_pipeline.message_cycle_manage import MessageCycleManage @@ -107,67 +109,84 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline, MessageCycleMan db.session.refresh(self._message) db.session.close() + generator = self._process_stream_response() if self._stream: - generator = self._process_stream_response() - for stream_response in generator: - if isinstance(self._application_generate_entity, CompletionAppGenerateEntity): - yield CompletionAppStreamResponse( - message_id=self._message.id, - 
created_at=int(self._message.created_at.timestamp()), - stream_response=stream_response - ) - else: - yield ChatbotAppStreamResponse( - conversation_id=self._conversation.id, - message_id=self._message.id, - created_at=int(self._message.created_at.timestamp()), - stream_response=stream_response - ) - - # yield "data: " + json.dumps(response) + "\n\n" + return self._to_stream_response(generator) else: - return self._process_blocking_response() + return self._to_blocking_response(generator) - def _process_blocking_response(self) -> Union[ChatbotAppBlockingResponse, CompletionAppBlockingResponse]: + def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]) -> Union[ + ChatbotAppBlockingResponse, + CompletionAppBlockingResponse + ]: """ Process blocking response. :return: """ - for queue_message in self._queue_manager.listen(): - event = queue_message.event + for stream_response in generator: + if isinstance(stream_response, ErrorStreamResponse): + raise stream_response.err + elif isinstance(stream_response, MessageEndStreamResponse): + extras = { + 'usage': jsonable_encoder(self._task_state.llm_result.usage) + } + if self._task_state.metadata: + extras['metadata'] = self._task_state.metadata - if isinstance(event, QueueErrorEvent): - err = self._handle_error(event) - raise err - elif isinstance(event, QueueRetrieverResourcesEvent): - self._handle_retriever_resources(event) - elif isinstance(event, QueueAnnotationReplyEvent): - annotation = self._handle_annotation_reply(event) - if annotation: - self._task_state.llm_result.message.content = annotation.content - elif isinstance(event, QueueStopEvent | QueueMessageEndEvent): - if isinstance(event, QueueMessageEndEvent): - self._task_state.llm_result = event.llm_result + if self._conversation.mode == AppMode.COMPLETION.value: + response = CompletionAppBlockingResponse( + task_id=self._application_generate_entity.task_id, + data=CompletionAppBlockingResponse.Data( + id=self._message.id, + 
mode=self._conversation.mode, + message_id=self._message.id, + answer=self._task_state.llm_result.message.content, + created_at=int(self._message.created_at.timestamp()), + **extras + ) + ) else: - self._handle_stop(event) + response = ChatbotAppBlockingResponse( + task_id=self._application_generate_entity.task_id, + data=ChatbotAppBlockingResponse.Data( + id=self._message.id, + mode=self._conversation.mode, + conversation_id=self._conversation.id, + message_id=self._message.id, + answer=self._task_state.llm_result.message.content, + created_at=int(self._message.created_at.timestamp()), + **extras + ) + ) - # handle output moderation - output_moderation_answer = self._handle_output_moderation_when_task_finished( - self._task_state.llm_result.message.content - ) - if output_moderation_answer: - self._task_state.llm_result.message.content = output_moderation_answer - - # Save message - self._save_message() - - return self._to_blocking_response() + return response else: continue raise Exception('Queue listening stopped unexpectedly.') - def _process_stream_response(self) -> Generator: + def _to_stream_response(self, generator: Generator[StreamResponse, None, None]) \ + -> Generator[Union[ChatbotAppStreamResponse, CompletionAppStreamResponse], None, None]: + """ + To stream response. + :return: + """ + for stream_response in generator: + if isinstance(self._application_generate_entity, CompletionAppGenerateEntity): + yield CompletionAppStreamResponse( + message_id=self._message.id, + created_at=int(self._message.created_at.timestamp()), + stream_response=stream_response + ) + else: + yield ChatbotAppStreamResponse( + conversation_id=self._conversation.id, + message_id=self._message.id, + created_at=int(self._message.created_at.timestamp()), + stream_response=stream_response + ) + + def _process_stream_response(self) -> Generator[StreamResponse, None, None]: """ Process stream response. 
:return: @@ -313,45 +332,6 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline, MessageCycleMan completion_tokens ) - def _to_blocking_response(self) -> ChatbotAppBlockingResponse: - """ - To blocking response. - :return: - """ - self._task_state.metadata['usage'] = jsonable_encoder(self._task_state.llm_result.usage) - - extras = {} - if self._task_state.metadata: - extras['metadata'] = self._task_state.metadata - - if self._conversation.mode != AppMode.COMPLETION.value: - response = CompletionAppBlockingResponse( - task_id=self._application_generate_entity.task_id, - data=CompletionAppBlockingResponse.Data( - id=self._message.id, - mode=self._conversation.mode, - message_id=self._message.id, - answer=self._task_state.llm_result.message.content, - created_at=int(self._message.created_at.timestamp()), - **extras - ) - ) - else: - response = ChatbotAppBlockingResponse( - task_id=self._application_generate_entity.task_id, - data=ChatbotAppBlockingResponse.Data( - id=self._message.id, - mode=self._conversation.mode, - conversation_id=self._conversation.id, - message_id=self._message.id, - answer=self._task_state.llm_result.message.content, - created_at=int(self._message.created_at.timestamp()), - **extras - ) - ) - - return response - def _message_end_to_stream_response(self) -> MessageEndStreamResponse: """ Message end to stream response.