diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 5dfb2b1443..9ee6ca9dbd 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -1,18 +1,28 @@ import json +import logging +from typing import Generator +from flask import Response, stream_with_context from flask_restful import Resource, marshal_with, reqparse +from werkzeug.exceptions import NotFound, InternalServerError +import services from controllers.console import api -from controllers.console.app.error import DraftWorkflowNotExist +from controllers.console.app.error import DraftWorkflowNotExist, ConversationCompletedError from controllers.console.app.wraps import get_app_model from controllers.console.setup import setup_required from controllers.console.wraps import account_initialization_required +from core.app.entities.app_invoke_entities import InvokeFrom from fields.workflow_fields import workflow_fields +from libs.helper import uuid_value from libs.login import current_user, login_required from models.model import App, AppMode from services.workflow_service import WorkflowService +logger = logging.getLogger(__name__) + + class DraftWorkflowApi(Resource): @setup_required @login_required @@ -59,23 +69,80 @@ class DraftWorkflowApi(Resource): } -class DraftWorkflowRunApi(Resource): +class AdvancedChatDraftWorkflowRunApi(Resource): @setup_required @login_required @account_initialization_required - @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) + @get_app_model(mode=[AppMode.ADVANCED_CHAT]) def post(self, app_model: App): """ Run draft workflow """ - # TODO - workflow_service = WorkflowService() - workflow_service.run_draft_workflow(app_model=app_model, account=current_user) + parser = reqparse.RequestParser() + parser.add_argument('inputs', type=dict, required=True, location='json') + parser.add_argument('query', type=str, location='json', default='') + parser.add_argument('files', type=list, required=False, location='json') + parser.add_argument('conversation_id', type=uuid_value, location='json') + args = parser.parse_args() - # TODO - return { - "result": "success" - } + workflow_service = WorkflowService() + try: + response = workflow_service.run_advanced_chat_draft_workflow( + app_model=app_model, + user=current_user, + args=args, + invoke_from=InvokeFrom.DEBUGGER + ) + except services.errors.conversation.ConversationNotExistsError: + raise NotFound("Conversation Not Exists.") + except services.errors.conversation.ConversationCompletedError: + raise ConversationCompletedError() + except ValueError as e: + raise e + except Exception as e: + logging.exception("internal server error.") + raise InternalServerError() + + def generate() -> Generator: + yield from response + + return Response(stream_with_context(generate()), status=200, + mimetype='text/event-stream') + + +class DraftWorkflowRunApi(Resource): + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.WORKFLOW]) + def post(self, app_model: App): + """ + Run draft workflow + """ + parser = reqparse.RequestParser() + parser.add_argument('inputs', type=dict, required=True, location='json') + args = parser.parse_args() + + workflow_service = WorkflowService() + + try: + response = workflow_service.run_draft_workflow( + app_model=app_model, + user=current_user, + args=args, + invoke_from=InvokeFrom.DEBUGGER + ) + except ValueError as e: + raise e + except Exception as e: + logging.exception("internal server error.") + raise InternalServerError() + + def generate() -> Generator: + yield from response + + return Response(stream_with_context(generate()), status=200, + mimetype='text/event-stream') class WorkflowTaskStopApi(Resource): @@ -214,10 +281,12 @@ class ConvertToWorkflowApi(Resource): api.add_resource(DraftWorkflowApi, '/apps//workflows/draft') +api.add_resource(AdvancedChatDraftWorkflowRunApi, '/apps//advanced-chat/workflows/draft/run') api.add_resource(DraftWorkflowRunApi, '/apps//workflows/draft/run') api.add_resource(WorkflowTaskStopApi, '/apps//workflows/tasks//stop') api.add_resource(DraftWorkflowNodeRunApi, '/apps//workflows/draft/nodes//run') api.add_resource(PublishedWorkflowApi, '/apps//workflows/published') api.add_resource(DefaultBlockConfigsApi, '/apps//workflows/default-workflow-block-configs') -api.add_resource(DefaultBlockConfigApi, '/apps//workflows/default-workflow-block-configs/:block_type') +api.add_resource(DefaultBlockConfigApi, '/apps//workflows/default-workflow-block-configs' + '/') api.add_resource(ConvertToWorkflowApi, '/apps//convert-to-workflow') diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index ca2f400547..918fd4566e 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -16,18 +16,19 @@ from core.app.apps.message_based_app_generator import MessageBasedAppGenerator from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom from core.file.message_file_parser import MessageFileParser from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError -from core.workflow.workflow_engine_manager import WorkflowEngineManager from extensions.ext_database import db from models.account import Account from models.model import App, Conversation, EndUser, Message +from models.workflow import Workflow logger = logging.getLogger(__name__) class AdvancedChatAppGenerator(MessageBasedAppGenerator): def generate(self, app_model: App, + workflow: Workflow, user: Union[Account, EndUser], - args: Any, + args: dict, invoke_from: InvokeFrom, stream: bool = True) \ -> Union[dict, Generator]: @@ -35,6 +36,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): Generate App response. :param app_model: App + :param workflow: Workflow :param user: account or end user :param args: request args :param invoke_from: invoke from source @@ -59,16 +61,6 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): if args.get('conversation_id'): conversation = self._get_conversation_by_user(app_model, args.get('conversation_id'), user) - # get workflow - workflow_engine_manager = WorkflowEngineManager() - if invoke_from == InvokeFrom.DEBUGGER: - workflow = workflow_engine_manager.get_draft_workflow(app_model=app_model) - else: - workflow = workflow_engine_manager.get_published_workflow(app_model=app_model) - - if not workflow: - raise ValueError('Workflow not initialized') - # parse files files = args['files'] if 'files' in args and args['files'] else [] message_file_parser = MessageFileParser(tenant_id=app_model.tenant_id, app_id=app_model.id) diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index 0d701ae224..f853f88af4 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -1,15 +1,20 @@ import logging +import time from typing import cast from core.app.app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig from core.app.apps.base_app_runner import AppRunner from core.app.entities.app_invoke_entities import ( - AdvancedChatAppGenerateEntity, + AdvancedChatAppGenerateEntity, InvokeFrom, ) +from core.app.entities.queue_entities import QueueStopEvent from core.moderation.base import ModerationException +from core.workflow.entities.node_entities import SystemVariable +from core.workflow.workflow_engine_manager import WorkflowEngineManager from extensions.ext_database import db -from models.model import App, Conversation, Message +from models.account import Account +from models.model import App, Conversation, Message, EndUser logger = logging.getLogger(__name__) @@ -38,66 +43,151 @@ class AdvancedChatAppRunner(AppRunner): if not app_record: raise ValueError("App not found") + workflow = WorkflowEngineManager().get_workflow(app_model=app_record, workflow_id=app_config.workflow_id) + if not workflow: + raise ValueError("Workflow not initialized") + inputs = application_generate_entity.inputs query = application_generate_entity.query files = application_generate_entity.files # moderation + if self.handle_input_moderation( + queue_manager=queue_manager, + app_record=app_record, + app_generate_entity=application_generate_entity, + inputs=inputs, + query=query + ): + return + + # annotation reply + if self.handle_annotation_reply( + app_record=app_record, + message=message, + query=query, + queue_manager=queue_manager, + app_generate_entity=application_generate_entity + ): + return + + # fetch user + if application_generate_entity.invoke_from in [InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE]: + user = db.session.query(Account).filter(Account.id == application_generate_entity.user_id).first() + else: + user = db.session.query(EndUser).filter(EndUser.id == application_generate_entity.user_id).first() + + # RUN WORKFLOW + workflow_engine_manager = WorkflowEngineManager() + result_generator = workflow_engine_manager.run_workflow( + app_model=app_record, + workflow=workflow, + user=user, + user_inputs=inputs, + system_inputs={ + SystemVariable.QUERY: query, + SystemVariable.FILES: files, + SystemVariable.CONVERSATION: conversation.id, + } + ) + + for result in result_generator: + # todo handle workflow and node event + pass + + + def handle_input_moderation(self, queue_manager: AppQueueManager, + app_record: App, + app_generate_entity: AdvancedChatAppGenerateEntity, + inputs: dict, + query: str) -> bool: + """ + Handle input moderation + :param queue_manager: application queue manager + :param app_record: app record + :param app_generate_entity: application generate entity + :param inputs: inputs + :param query: query + :return: + """ try: # process sensitive_word_avoidance _, inputs, query = self.moderation_for_inputs( app_id=app_record.id, - tenant_id=app_config.tenant_id, - app_generate_entity=application_generate_entity, + tenant_id=app_generate_entity.app_config.tenant_id, + app_generate_entity=app_generate_entity, inputs=inputs, query=query, ) except ModerationException as e: - # TODO - self.direct_output( + self._stream_output( queue_manager=queue_manager, - app_generate_entity=application_generate_entity, - prompt_messages=prompt_messages, text=str(e), - stream=application_generate_entity.stream + stream=app_generate_entity.stream, + stopped_by=QueueStopEvent.StopBy.INPUT_MODERATION ) - return + return True - if query: - # annotation reply - annotation_reply = self.query_app_annotations_to_reply( - app_record=app_record, - message=message, - query=query, - user_id=application_generate_entity.user_id, - invoke_from=application_generate_entity.invoke_from - ) + return False - if annotation_reply: - queue_manager.publish_annotation_reply( - message_annotation_id=annotation_reply.id, - pub_from=PublishFrom.APPLICATION_MANAGER - ) - - # TODO - self.direct_output( - queue_manager=queue_manager, - app_generate_entity=application_generate_entity, - prompt_messages=prompt_messages, - text=annotation_reply.content, - stream=application_generate_entity.stream - ) - return - - # check hosting moderation - # TODO - hosting_moderation_result = self.check_hosting_moderation( - application_generate_entity=application_generate_entity, - queue_manager=queue_manager, - prompt_messages=prompt_messages + def handle_annotation_reply(self, app_record: App, + message: Message, + query: str, + queue_manager: AppQueueManager, + app_generate_entity: AdvancedChatAppGenerateEntity) -> bool: + """ + Handle annotation reply + :param app_record: app record + :param message: message + :param query: query + :param queue_manager: application queue manager + :param app_generate_entity: application generate entity + """ + # annotation reply + annotation_reply = self.query_app_annotations_to_reply( + app_record=app_record, + message=message, + query=query, + user_id=app_generate_entity.user_id, + invoke_from=app_generate_entity.invoke_from ) - if hosting_moderation_result: - return + if annotation_reply: + queue_manager.publish_annotation_reply( + message_annotation_id=annotation_reply.id, + pub_from=PublishFrom.APPLICATION_MANAGER + ) - # todo RUN WORKFLOW \ No newline at end of file + self._stream_output( + queue_manager=queue_manager, + text=annotation_reply.content, + stream=app_generate_entity.stream, + stopped_by=QueueStopEvent.StopBy.ANNOTATION_REPLY + ) + return True + + return False + + def _stream_output(self, queue_manager: AppQueueManager, + text: str, + stream: bool, + stopped_by: QueueStopEvent.StopBy) -> None: + """ + Direct output + :param queue_manager: application queue manager + :param text: text + :param stream: stream + :return: + """ + if stream: + index = 0 + for token in text: + queue_manager.publish_text_chunk(token, PublishFrom.APPLICATION_MANAGER) + index += 1 + time.sleep(0.01) + + queue_manager.publish( + QueueStopEvent(stopped_by=stopped_by), + PublishFrom.APPLICATION_MANAGER + ) + queue_manager.stop_listen() diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 25bdd7d9e3..e5c6a8eff9 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -165,6 +165,7 @@ class QueueStopEvent(AppQueueEvent): USER_MANUAL = "user-manual" ANNOTATION_REPLY = "annotation-reply" OUTPUT_MODERATION = "output-moderation" + INPUT_MODERATION = "input-moderation" event = QueueEvent.STOP stopped_by: StopBy diff --git a/api/core/workflow/entities/node_entities.py b/api/core/workflow/entities/node_entities.py index 80471cc702..18f0f7746c 100644 --- a/api/core/workflow/entities/node_entities.py +++ b/api/core/workflow/entities/node_entities.py @@ -30,3 +30,12 @@ class NodeType(Enum): if node_type.value == value: return node_type raise ValueError(f'invalid node type value {value}') + + +class SystemVariable(Enum): + """ + System Variables. + """ + QUERY = 'query' + FILES = 'files' + CONVERSATION = 'conversation' diff --git a/api/core/workflow/entities/variable_pool.py b/api/core/workflow/entities/variable_pool.py new file mode 100644 index 0000000000..eefee88c07 --- /dev/null +++ b/api/core/workflow/entities/variable_pool.py @@ -0,0 +1,82 @@ +from enum import Enum +from typing import Optional, Union, Any + +from core.workflow.entities.node_entities import SystemVariable + +VariableValue = Union[str, int, float, dict, list] + + +class ValueType(Enum): + """ + Value Type Enum + """ + STRING = "string" + NUMBER = "number" + OBJECT = "object" + ARRAY = "array" + FILE = "file" + + +class VariablePool: + variables_mapping = {} + + def __init__(self, system_variables: dict[SystemVariable, Any]) -> None: + # system variables + # for example: + # { + # 'query': 'abc', + # 'files': [] + # } + for system_variable, value in system_variables.items(): + self.append_variable('sys', [system_variable.value], value) + + def append_variable(self, node_id: str, variable_key_list: list[str], value: VariableValue) -> None: + """ + Append variable + :param node_id: node id + :param variable_key_list: variable key list, like: ['result', 'text'] + :param value: value + :return: + """ + if node_id not in self.variables_mapping: + self.variables_mapping[node_id] = {} + + variable_key_list_hash = hash(tuple(variable_key_list)) + + self.variables_mapping[node_id][variable_key_list_hash] = value + + def get_variable_value(self, variable_selector: list[str], + target_value_type: Optional[ValueType] = None) -> Optional[VariableValue]: + """ + Get variable + :param variable_selector: include node_id and variables + :param target_value_type: target value type + :return: + """ + if len(variable_selector) < 2: + raise ValueError('Invalid value selector') + + node_id = variable_selector[0] + if node_id not in self.variables_mapping: + return None + + # fetch variable keys, pop node_id + variable_key_list = variable_selector[1:] + + variable_key_list_hash = hash(tuple(variable_key_list)) + + value = self.variables_mapping[node_id].get(variable_key_list_hash) + + if target_value_type: + if target_value_type == ValueType.STRING: + return str(value) + elif target_value_type == ValueType.NUMBER: + return int(value) + elif target_value_type == ValueType.OBJECT: + if not isinstance(value, dict): + raise ValueError('Invalid value type: object') + elif target_value_type == ValueType.ARRAY: + if not isinstance(value, list): + raise ValueError('Invalid value type: array') + + return value diff --git a/api/core/workflow/nodes/base_node.py b/api/core/workflow/nodes/base_node.py index 665338af08..a2751b346f 100644 --- a/api/core/workflow/nodes/base_node.py +++ b/api/core/workflow/nodes/base_node.py @@ -1,7 +1,44 @@ +from abc import abstractmethod from typing import Optional +from core.workflow.entities.node_entities import NodeType +from core.workflow.entities.variable_pool import VariablePool + class BaseNode: + _node_type: NodeType + + def __int__(self, node_config: dict) -> None: + self._node_config = node_config + + @abstractmethod + def run(self, variable_pool: Optional[VariablePool] = None, + run_args: Optional[dict] = None) -> dict: + """ + Run node + :param variable_pool: variable pool + :param run_args: run args + :return: + """ + if variable_pool is None and run_args is None: + raise ValueError("At least one of `variable_pool` or `run_args` must be provided.") + + return self._run( + variable_pool=variable_pool, + run_args=run_args + ) + + @abstractmethod + def _run(self, variable_pool: Optional[VariablePool] = None, + run_args: Optional[dict] = None) -> dict: + """ + Run node + :param variable_pool: variable pool + :param run_args: run args + :return: + """ + raise NotImplementedError + @classmethod def get_default_config(cls, filters: Optional[dict] = None) -> dict: """ diff --git a/api/core/workflow/workflow_engine_manager.py b/api/core/workflow/workflow_engine_manager.py index 73e92d5e89..5914bfc152 100644 --- a/api/core/workflow/workflow_engine_manager.py +++ b/api/core/workflow/workflow_engine_manager.py @@ -1,5 +1,6 @@ -from typing import Optional +from typing import Optional, Union, Generator +from core.memory.token_buffer_memory import TokenBufferMemory from core.workflow.entities.node_entities import NodeType from core.workflow.nodes.code.code_node import CodeNode from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode @@ -14,7 +15,8 @@ from core.workflow.nodes.template_transform.template_transform_node import Templ from core.workflow.nodes.tool.tool_node import ToolNode from core.workflow.nodes.variable_assigner.variable_assigner_node import VariableAssignerNode from extensions.ext_database import db -from models.model import App +from models.account import Account +from models.model import App, EndUser, Conversation from models.workflow import Workflow node_classes = { @@ -56,13 +58,20 @@ class WorkflowEngineManager: return None # fetch published workflow by workflow_id + return self.get_workflow(app_model, app_model.workflow_id) + + def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]: + """ + Get workflow + """ + # fetch workflow by workflow_id workflow = db.session.query(Workflow).filter( Workflow.tenant_id == app_model.tenant_id, Workflow.app_id == app_model.id, - Workflow.id == app_model.workflow_id + Workflow.id == workflow_id ).first() - # return published workflow + # return workflow return workflow def get_default_configs(self) -> list[dict]: @@ -96,3 +105,20 @@ class WorkflowEngineManager: return None return default_config + + def run_workflow(self, app_model: App, + workflow: Workflow, + user: Union[Account, EndUser], + user_inputs: dict, + system_inputs: Optional[dict] = None) -> Generator: + """ + Run workflow + :param app_model: App instance + :param workflow: Workflow instance + :param user: account or end user + :param user_inputs: user variables inputs + :param system_inputs: system inputs, like: query, files + :return: + """ + # TODO + pass diff --git a/api/fields/workflow_fields.py b/api/fields/workflow_fields.py index bcb2c318c6..9919a440e8 100644 --- a/api/fields/workflow_fields.py +++ b/api/fields/workflow_fields.py @@ -5,8 +5,8 @@ from libs.helper import TimestampField workflow_fields = { 'id': fields.String, - 'graph': fields.Nested(simple_account_fields, attribute='graph_dict'), - 'features': fields.Nested(simple_account_fields, attribute='features_dict'), + 'graph': fields.Raw(attribute='graph_dict'), + 'features': fields.Raw(attribute='features_dict'), 'created_by': fields.Nested(simple_account_fields, attribute='created_by_account'), 'created_at': TimestampField, 'updated_by': fields.Nested(simple_account_fields, attribute='updated_by_account', allow_null=True), diff --git a/api/fields/workflow_run_fields.py b/api/fields/workflow_run_fields.py index 37751bc70f..85c9c2d2b2 100644 --- a/api/fields/workflow_run_fields.py +++ b/api/fields/workflow_run_fields.py @@ -22,10 +22,10 @@ workflow_run_for_list_fields = { "id": fields.String, "sequence_number": fields.Integer, "version": fields.String, - "graph": fields.String, - "inputs": fields.String, + "graph": fields.Raw(attribute='graph_dict'), + "inputs": fields.Raw(attribute='inputs_dict'), "status": fields.String, - "outputs": fields.String, + "outputs": fields.Raw(attribute='outputs_dict'), "error": fields.String, "elapsed_time": fields.Float, "total_tokens": fields.Integer, @@ -49,10 +49,10 @@ workflow_run_detail_fields = { "id": fields.String, "sequence_number": fields.Integer, "version": fields.String, - "graph": fields.String, - "inputs": fields.String, + "graph": fields.Raw(attribute='graph_dict'), + "inputs": fields.Raw(attribute='inputs_dict'), "status": fields.String, - "outputs": fields.String, + "outputs": fields.Raw(attribute='outputs_dict'), "error": fields.String, "elapsed_time": fields.Float, "total_tokens": fields.Integer, @@ -73,13 +73,13 @@ workflow_run_node_execution_fields = { "node_id": fields.String, "node_type": fields.String, "title": fields.String, - "inputs": fields.String, - "process_data": fields.String, - "outputs": fields.String, + "inputs": fields.Raw(attribute='inputs_dict'), + "process_data": fields.Raw(attribute='process_data_dict'), + "outputs": fields.Raw(attribute='outputs_dict'), "status": fields.String, "error": fields.String, "elapsed_time": fields.Float, - "execution_metadata": fields.String, + "execution_metadata": fields.Raw(attribute='execution_metadata_dict'), "created_at": TimestampField, "created_by_role": fields.String, "created_by_account": fields.Nested(simple_account_fields, attribute='created_by_account', allow_null=True), diff --git a/api/models/workflow.py b/api/models/workflow.py index 2540d33402..32ff26196c 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -272,6 +272,14 @@ class WorkflowRun(db.Model): return EndUser.query.get(self.created_by) \ if created_by_role == CreatedByRole.END_USER else None + @property + def graph_dict(self): + return self.graph if not self.graph else json.loads(self.graph) + + @property + def inputs_dict(self): + return self.inputs if not self.inputs else json.loads(self.inputs) + @property def outputs_dict(self): return self.outputs if not self.outputs else json.loads(self.outputs) diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 0be0783ae0..37f5c16bec 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -1,14 +1,16 @@ import json from datetime import datetime -from typing import Optional +from typing import Optional, Union, Any, Generator from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager +from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager +from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.node_entities import NodeType from core.workflow.workflow_engine_manager import WorkflowEngineManager from extensions.ext_database import db from models.account import Account -from models.model import App, AppMode +from models.model import App, AppMode, EndUser from models.workflow import Workflow, WorkflowType from services.workflow.workflow_converter import WorkflowConverter @@ -142,6 +144,39 @@ class WorkflowService: workflow_engine_manager = WorkflowEngineManager() return workflow_engine_manager.get_default_config(node_type, filters) + def run_advanced_chat_draft_workflow(self, app_model: App, + user: Union[Account, EndUser], + args: dict, + invoke_from: InvokeFrom) -> Union[dict, Generator]: + """ + Run advanced chatbot draft workflow + """ + # fetch draft workflow by app_model + draft_workflow = self.get_draft_workflow(app_model=app_model) + + if not draft_workflow: + raise ValueError('Workflow not initialized') + + # run draft workflow + app_generator = AdvancedChatAppGenerator() + response = app_generator.generate( + app_model=app_model, + workflow=draft_workflow, + user=user, + args=args, + invoke_from=invoke_from, + stream=True + ) + + return response + + def run_draft_workflow(self, app_model: App, + user: Union[Account, EndUser], + args: dict, + invoke_from: InvokeFrom) -> Union[dict, Generator]: + # TODO + pass + def convert_to_workflow(self, app_model: App, account: Account) -> App: """ Basic mode of chatbot app(expert mode) to workflow