From ec711d094d002bdc3068f73002899b8a2422193c Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Thu, 29 Aug 2024 19:49:57 +0800 Subject: [PATCH] refactor: enforce return object in app generator --- .../app/apps/advanced_chat/app_generator.py | 18 +++++-- .../generate_response_converter.py | 9 ++-- api/core/app/apps/agent_chat/app_generator.py | 13 ++++- .../agent_chat/generate_response_converter.py | 9 ++-- .../base_app_generate_response_converter.py | 18 ++----- api/core/app/apps/base_app_generator.py | 22 +++++++- api/core/app/apps/base_app_queue_manager.py | 4 +- api/core/app/apps/chat/app_generator.py | 13 ++++- .../apps/chat/generate_response_converter.py | 9 ++-- api/core/app/apps/completion/app_generator.py | 9 ++++ .../completion/generate_response_converter.py | 9 ++-- api/core/app/apps/workflow/app_generator.py | 19 +++++-- .../workflow/generate_response_converter.py | 9 ++-- api/services/app_generate_service.py | 50 ++++++++++++------- 14 files changed, 140 insertions(+), 71 deletions(-) diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index e7c9ebe097..6d654c2dc6 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -4,7 +4,7 @@ import os import threading import uuid from collections.abc import Generator -from typing import Literal, Union, overload +from typing import Any, Literal, Union, overload from flask import Flask, current_app from pydantic import ValidationError @@ -47,7 +47,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): args: dict, invoke_from: InvokeFrom, stream: Literal[True] = True, - ) -> Generator[str, None, None]: ... + ) -> Generator[dict | str, None, None]: ... @overload def generate( @@ -59,6 +59,16 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): stream: Literal[False] = False, ) -> dict: ... + @overload + def generate( + self, app_model: App, + workflow: Workflow, + user: Union[Account, EndUser], + args: dict, + invoke_from: InvokeFrom, + stream: bool = True, + ) -> Union[dict[str, Any], Generator[dict | str, Any, None]]: ... + def generate( self, app_model: App, workflow: Workflow, @@ -152,7 +162,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): def single_iteration_generate(self, app_model: App, workflow: Workflow, node_id: str, - user: Account, + user: Account | EndUser, args: dict, stream: bool = True): """ @@ -325,7 +335,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): user=user, stream=stream, ) - + return AdvancedChatAppGenerateResponseConverter.convert( response=response, invoke_from=invoke_from diff --git a/api/core/app/apps/advanced_chat/generate_response_converter.py b/api/core/app/apps/advanced_chat/generate_response_converter.py index ef579827b4..2ddbd816e2 100644 --- a/api/core/app/apps/advanced_chat/generate_response_converter.py +++ b/api/core/app/apps/advanced_chat/generate_response_converter.py @@ -1,4 +1,3 @@ -import json from collections.abc import Generator from typing import Any, cast @@ -56,7 +55,7 @@ class AdvancedChatAppGenerateResponseConverter(AppGenerateResponseConverter): return response @classmethod - def convert_stream_full_response(cls, stream_response: Generator[AppStreamResponse, None, None]) -> Generator[str, Any, None]: + def convert_stream_full_response(cls, stream_response: Generator[AppStreamResponse, None, None]) -> Generator[dict | str, Any, None]: """ Convert stream full response. :param stream_response: stream response @@ -82,10 +81,10 @@ class AdvancedChatAppGenerateResponseConverter(AppGenerateResponseConverter): response_chunk.update(data) else: response_chunk.update(sub_stream_response.to_dict()) - yield json.dumps(response_chunk) + yield response_chunk @classmethod - def convert_stream_simple_response(cls, stream_response: Generator[AppStreamResponse, None, None]) -> Generator[str, Any, None]: + def convert_stream_simple_response(cls, stream_response: Generator[AppStreamResponse, None, None]) -> Generator[dict | str, Any, None]: """ Convert stream simple response. :param stream_response: stream response @@ -119,4 +118,4 @@ class AdvancedChatAppGenerateResponseConverter(AppGenerateResponseConverter): else: response_chunk.update(sub_stream_response.to_dict()) - yield json.dumps(response_chunk) + yield response_chunk diff --git a/api/core/app/apps/agent_chat/app_generator.py b/api/core/app/apps/agent_chat/app_generator.py index 29c7447290..726e7ca65c 100644 --- a/api/core/app/apps/agent_chat/app_generator.py +++ b/api/core/app/apps/agent_chat/app_generator.py @@ -35,7 +35,7 @@ class AgentChatAppGenerator(MessageBasedAppGenerator): args: dict, invoke_from: InvokeFrom, stream: Literal[True] = True, - ) -> Generator[str, None, None]: ... + ) -> Generator[dict | str, None, None]: ... @overload def generate( @@ -46,12 +46,21 @@ class AgentChatAppGenerator(MessageBasedAppGenerator): stream: Literal[False] = False, ) -> dict: ... + @overload + def generate( + self, app_model: App, + user: Union[Account, EndUser], + args: dict, + invoke_from: InvokeFrom, + stream: bool = False, + ) -> dict | Generator[dict | str, None, None]: ... + def generate(self, app_model: App, user: Union[Account, EndUser], args: Any, invoke_from: InvokeFrom, stream: bool = True) \ - -> Union[dict, Generator[str, None, None]]: + -> Union[dict, Generator[dict | str, None, None]]: """ Generate App response. diff --git a/api/core/app/apps/agent_chat/generate_response_converter.py b/api/core/app/apps/agent_chat/generate_response_converter.py index 118d82c495..02aec27e39 100644 --- a/api/core/app/apps/agent_chat/generate_response_converter.py +++ b/api/core/app/apps/agent_chat/generate_response_converter.py @@ -1,4 +1,3 @@ -import json from collections.abc import Generator from typing import cast @@ -52,7 +51,7 @@ class AgentChatAppGenerateResponseConverter(AppGenerateResponseConverter): @classmethod def convert_stream_full_response(cls, stream_response: Generator[ChatbotAppStreamResponse, None, None]) \ - -> Generator[str, None, None]: + -> Generator[dict | str, None, None]: """ Convert stream full response. :param stream_response: stream response @@ -78,11 +77,11 @@ class AgentChatAppGenerateResponseConverter(AppGenerateResponseConverter): response_chunk.update(data) else: response_chunk.update(sub_stream_response.to_dict()) - yield json.dumps(response_chunk) + yield response_chunk @classmethod def convert_stream_simple_response(cls, stream_response: Generator[ChatbotAppStreamResponse, None, None]) \ - -> Generator[str, None, None]: + -> Generator[dict | str, None, None]: """ Convert stream simple response. :param stream_response: stream response @@ -114,4 +113,4 @@ class AgentChatAppGenerateResponseConverter(AppGenerateResponseConverter): else: response_chunk.update(sub_stream_response.to_dict()) - yield json.dumps(response_chunk) + yield response_chunk diff --git a/api/core/app/apps/base_app_generate_response_converter.py b/api/core/app/apps/base_app_generate_response_converter.py index 1165314a7f..bd0d08cb7b 100644 --- a/api/core/app/apps/base_app_generate_response_converter.py +++ b/api/core/app/apps/base_app_generate_response_converter.py @@ -21,24 +21,16 @@ class AppGenerateResponseConverter(ABC): if isinstance(response, AppBlockingResponse): return cls.convert_blocking_full_response(response) else: - def _generate_full_response() -> Generator[str, Any, None]: - for chunk in cls.convert_stream_full_response(response): - if chunk == 'ping': - yield f'event: {chunk}\n\n' - else: - yield f'data: {chunk}\n\n' + def _generate_full_response() -> Generator[dict | str, Any, None]: + yield from cls.convert_stream_simple_response(response) return _generate_full_response() else: if isinstance(response, AppBlockingResponse): return cls.convert_blocking_simple_response(response) else: - def _generate_simple_response() -> Generator[str, Any, None]: - for chunk in cls.convert_stream_simple_response(response): - if chunk == 'ping': - yield f'event: {chunk}\n\n' - else: - yield f'data: {chunk}\n\n' + def _generate_simple_response() -> Generator[dict | str, Any, None]: + yield from cls.convert_stream_simple_response(response) return _generate_simple_response() @@ -55,7 +47,7 @@ class AppGenerateResponseConverter(ABC): @classmethod @abstractmethod def convert_stream_full_response(cls, stream_response: Generator[AppStreamResponse, None, None]) \ - -> Generator[str, None, None]: + -> Generator[dict | str, None, None]: raise NotImplementedError @classmethod diff --git a/api/core/app/apps/base_app_generator.py b/api/core/app/apps/base_app_generator.py index 9e331dff4d..7727519aef 100644 --- a/api/core/app/apps/base_app_generator.py +++ b/api/core/app/apps/base_app_generator.py @@ -1,5 +1,6 @@ -from collections.abc import Mapping -from typing import Any, Optional +from collections.abc import Generator, Mapping +import json +from typing import Any, Optional, Union from core.app.app_config.entities import AppConfig, VariableEntity, VariableEntityType @@ -54,3 +55,20 @@ class BaseAppGenerator: if isinstance(value, str): return value.replace('\x00', '') return value + + @classmethod + def convert_to_event_stream(cls, generator: Union[dict, Generator[dict| str, None, None]]): + """ + Convert messages into event stream + """ + if isinstance(generator, dict): + return generator + else: + def gen(): + for message in generator: + if isinstance(message, dict): + yield f'data: {json.dumps(message)}\n\n' + else: + yield f'event: {message}\n\n' + + return gen() \ No newline at end of file diff --git a/api/core/app/apps/base_app_queue_manager.py b/api/core/app/apps/base_app_queue_manager.py index f929a979f1..b45f57e9b6 100644 --- a/api/core/app/apps/base_app_queue_manager.py +++ b/api/core/app/apps/base_app_queue_manager.py @@ -3,7 +3,7 @@ import time from abc import abstractmethod from collections.abc import Generator from enum import Enum -from typing import Any +from typing import Any, Optional from sqlalchemy.orm import DeclarativeMeta @@ -118,7 +118,7 @@ class AppQueueManager: Set task stop flag :return: """ - result = redis_client.get(cls._generate_task_belong_cache_key(task_id)) + result: Optional[Any] = redis_client.get(cls._generate_task_belong_cache_key(task_id)) if result is None: return diff --git a/api/core/app/apps/chat/app_generator.py b/api/core/app/apps/chat/app_generator.py index ab15928b74..b784f42e7e 100644 --- a/api/core/app/apps/chat/app_generator.py +++ b/api/core/app/apps/chat/app_generator.py @@ -35,7 +35,7 @@ class ChatAppGenerator(MessageBasedAppGenerator): args: Any, invoke_from: InvokeFrom, stream: Literal[True] = True, - ) -> Generator[str, None, None]: ... + ) -> Generator[dict | str, None, None]: ... @overload def generate( @@ -46,13 +46,22 @@ class ChatAppGenerator(MessageBasedAppGenerator): stream: Literal[False] = False, ) -> dict: ... + @overload + def generate( + self, app_model: App, + user: Union[Account, EndUser], + args: Any, + invoke_from: InvokeFrom, + stream: bool = False, + ) -> Union[dict, Generator[dict | str, None, None]]: ... + def generate( self, app_model: App, user: Union[Account, EndUser], args: Any, invoke_from: InvokeFrom, stream: bool = True, - ) -> Union[dict, Generator[str, None, None]]: + ) -> Union[dict, Generator[dict | str, None, None]]: """ Generate App response. diff --git a/api/core/app/apps/chat/generate_response_converter.py b/api/core/app/apps/chat/generate_response_converter.py index 625e14c9c3..0ae9926bb8 100644 --- a/api/core/app/apps/chat/generate_response_converter.py +++ b/api/core/app/apps/chat/generate_response_converter.py @@ -1,4 +1,3 @@ -import json from collections.abc import Generator from typing import cast @@ -52,7 +51,7 @@ class ChatAppGenerateResponseConverter(AppGenerateResponseConverter): @classmethod def convert_stream_full_response(cls, stream_response: Generator[ChatbotAppStreamResponse, None, None]) \ - -> Generator[str, None, None]: + -> Generator[dict | str, None, None]: """ Convert stream full response. :param stream_response: stream response @@ -78,11 +77,11 @@ class ChatAppGenerateResponseConverter(AppGenerateResponseConverter): response_chunk.update(data) else: response_chunk.update(sub_stream_response.to_dict()) - yield json.dumps(response_chunk) + yield response_chunk @classmethod def convert_stream_simple_response(cls, stream_response: Generator[ChatbotAppStreamResponse, None, None]) \ - -> Generator[str, None, None]: + -> Generator[dict | str, None, None]: """ Convert stream simple response. :param stream_response: stream response @@ -114,4 +113,4 @@ class ChatAppGenerateResponseConverter(AppGenerateResponseConverter): else: response_chunk.update(sub_stream_response.to_dict()) - yield json.dumps(response_chunk) + yield response_chunk diff --git a/api/core/app/apps/completion/app_generator.py b/api/core/app/apps/completion/app_generator.py index c0b13b40fd..3ce4d3ccaa 100644 --- a/api/core/app/apps/completion/app_generator.py +++ b/api/core/app/apps/completion/app_generator.py @@ -48,6 +48,15 @@ class CompletionAppGenerator(MessageBasedAppGenerator): stream: Literal[False] = False, ) -> dict: ... + @overload + def generate( + self, app_model: App, + user: Union[Account, EndUser], + args: dict, + invoke_from: InvokeFrom, + stream: bool = False, + ) -> dict | Generator[str, None, None]: ... + def generate(self, app_model: App, user: Union[Account, EndUser], args: Any, diff --git a/api/core/app/apps/completion/generate_response_converter.py b/api/core/app/apps/completion/generate_response_converter.py index 14db74dbd0..61bb03952f 100644 --- a/api/core/app/apps/completion/generate_response_converter.py +++ b/api/core/app/apps/completion/generate_response_converter.py @@ -1,4 +1,3 @@ -import json from collections.abc import Generator from typing import cast @@ -51,7 +50,7 @@ class CompletionAppGenerateResponseConverter(AppGenerateResponseConverter): @classmethod def convert_stream_full_response(cls, stream_response: Generator[CompletionAppStreamResponse, None, None]) \ - -> Generator[str, None, None]: + -> Generator[dict | str, None, None]: """ Convert stream full response. :param stream_response: stream response @@ -76,11 +75,11 @@ class CompletionAppGenerateResponseConverter(AppGenerateResponseConverter): response_chunk.update(data) else: response_chunk.update(sub_stream_response.to_dict()) - yield json.dumps(response_chunk) + yield response_chunk @classmethod def convert_stream_simple_response(cls, stream_response: Generator[CompletionAppStreamResponse, None, None]) \ - -> Generator[str, None, None]: + -> Generator[dict | str, None, None]: """ Convert stream simple response. :param stream_response: stream response @@ -111,4 +110,4 @@ class CompletionAppGenerateResponseConverter(AppGenerateResponseConverter): else: response_chunk.update(sub_stream_response.to_dict()) - yield json.dumps(response_chunk) + yield response_chunk diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index 26bb6c0f4f..f1e79ff3e3 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -40,7 +40,8 @@ class WorkflowAppGenerator(BaseAppGenerator): args: dict, invoke_from: InvokeFrom, stream: Literal[True] = True, - ) -> Generator[str, None, None]: ... + call_depth: int = 0, + ) -> Generator[dict | str, None, None]: ... @overload def generate( @@ -50,8 +51,20 @@ class WorkflowAppGenerator(BaseAppGenerator): args: dict, invoke_from: InvokeFrom, stream: Literal[False] = False, + call_depth: int = 0, ) -> dict: ... + @overload + def generate( + self, app_model: App, + workflow: Workflow, + user: Union[Account, EndUser], + args: dict, + invoke_from: InvokeFrom, + stream: bool = False, + call_depth: int = 0, + ) -> dict | Generator[dict | str, None, None]: ... + def generate( self, app_model: App, workflow: Workflow, @@ -127,7 +140,7 @@ class WorkflowAppGenerator(BaseAppGenerator): application_generate_entity: WorkflowAppGenerateEntity, invoke_from: InvokeFrom, stream: bool = True, - ) -> Union[dict, Generator[str, None, None]]: + ) -> Union[dict, Generator[str | dict, None, None]]: """ Generate App response. @@ -173,7 +186,7 @@ class WorkflowAppGenerator(BaseAppGenerator): def single_iteration_generate(self, app_model: App, workflow: Workflow, node_id: str, - user: Account, + user: Account | EndUser, args: dict, stream: bool = True): """ diff --git a/api/core/app/apps/workflow/generate_response_converter.py b/api/core/app/apps/workflow/generate_response_converter.py index 88bde58ba0..48f20d8dc1 100644 --- a/api/core/app/apps/workflow/generate_response_converter.py +++ b/api/core/app/apps/workflow/generate_response_converter.py @@ -1,4 +1,3 @@ -import json from collections.abc import Generator from typing import cast @@ -36,7 +35,7 @@ class WorkflowAppGenerateResponseConverter(AppGenerateResponseConverter): @classmethod def convert_stream_full_response(cls, stream_response: Generator[WorkflowAppStreamResponse, None, None]) \ - -> Generator[str, None, None]: + -> Generator[dict | str, None, None]: """ Convert stream full response. :param stream_response: stream response @@ -60,11 +59,11 @@ class WorkflowAppGenerateResponseConverter(AppGenerateResponseConverter): response_chunk.update(data) else: response_chunk.update(sub_stream_response.to_dict()) - yield json.dumps(response_chunk) + yield response_chunk @classmethod def convert_stream_simple_response(cls, stream_response: Generator[WorkflowAppStreamResponse, None, None]) \ - -> Generator[str, None, None]: + -> Generator[dict | str, None, None]: """ Convert stream simple response. :param stream_response: stream response @@ -90,4 +89,4 @@ class WorkflowAppGenerateResponseConverter(AppGenerateResponseConverter): response_chunk.update(sub_stream_response.to_ignore_detail_dict()) else: response_chunk.update(sub_stream_response.to_dict()) - yield json.dumps(response_chunk) + yield response_chunk diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 747505977f..fd7eeea48f 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -42,48 +42,58 @@ class AppGenerateService: request_id = rate_limit.enter(request_id) if app_model.mode == AppMode.COMPLETION.value: return rate_limit.generate( - CompletionAppGenerator().generate( - app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming + CompletionAppGenerator.convert_to_event_stream( + CompletionAppGenerator().generate( + app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming + ), ), request_id, ) elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent: return rate_limit.generate( - AgentChatAppGenerator().generate( - app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming + AgentChatAppGenerator.convert_to_event_stream( + AgentChatAppGenerator().generate( + app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming + ), ), request_id, ) elif app_model.mode == AppMode.CHAT.value: return rate_limit.generate( - ChatAppGenerator().generate( - app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming + ChatAppGenerator.convert_to_event_stream( + ChatAppGenerator().generate( + app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming + ), ), request_id, ) elif app_model.mode == AppMode.ADVANCED_CHAT.value: workflow = cls._get_workflow(app_model, invoke_from) return rate_limit.generate( - AdvancedChatAppGenerator().generate( + AdvancedChatAppGenerator.convert_to_event_stream( + AdvancedChatAppGenerator().generate( app_model=app_model, workflow=workflow, user=user, args=args, invoke_from=invoke_from, stream=streaming, + ), ), request_id, ) elif app_model.mode == AppMode.WORKFLOW.value: workflow = cls._get_workflow(app_model, invoke_from) return rate_limit.generate( - WorkflowAppGenerator().generate( - app_model=app_model, - workflow=workflow, - user=user, - args=args, - invoke_from=invoke_from, - stream=streaming, + WorkflowAppGenerator.convert_to_event_stream( + WorkflowAppGenerator().generate( + app_model=app_model, + workflow=workflow, + user=user, + args=args, + invoke_from=invoke_from, + stream=streaming, + ), ), request_id, ) @@ -108,13 +118,17 @@ class AppGenerateService: ): if app_model.mode == AppMode.ADVANCED_CHAT.value: workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) - return AdvancedChatAppGenerator().single_iteration_generate( - app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming + return AdvancedChatAppGenerator.convert_to_event_stream( + AdvancedChatAppGenerator().single_iteration_generate( + app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming + ) ) elif app_model.mode == AppMode.WORKFLOW.value: workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) - return WorkflowAppGenerator().single_iteration_generate( - app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming + return AdvancedChatAppGenerator.convert_to_event_stream( + WorkflowAppGenerator().single_iteration_generate( + app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming + ) ) else: raise ValueError(f"Invalid app mode {app_model.mode}")