From 3cf8416484f4fbb019178592fb7cb12cb77bfec5 Mon Sep 17 00:00:00 2001 From: takatost Date: Sat, 16 Mar 2024 16:27:39 +0800 Subject: [PATCH] add workflow api for installed app & web api & service api --- api/controllers/console/__init__.py | 3 +- api/controllers/console/app/workflow.py | 1 + api/controllers/console/explore/completion.py | 4 +- api/controllers/console/explore/error.py | 6 ++ api/controllers/console/explore/workflow.py | 85 +++++++++++++++++++ api/controllers/service_api/__init__.py | 2 +- api/controllers/service_api/app/error.py | 6 ++ api/controllers/service_api/app/workflow.py | 84 ++++++++++++++++++ api/controllers/web/__init__.py | 2 +- api/controllers/web/error.py | 6 ++ api/controllers/web/workflow.py | 82 ++++++++++++++++++ 11 files changed, 275 insertions(+), 6 deletions(-) create mode 100644 api/controllers/console/explore/workflow.py create mode 100644 api/controllers/service_api/app/workflow.py create mode 100644 api/controllers/web/workflow.py diff --git a/api/controllers/console/__init__.py b/api/controllers/console/__init__.py index 853ca9e3a7..15e5824db0 100644 --- a/api/controllers/console/__init__.py +++ b/api/controllers/console/__init__.py @@ -16,6 +16,7 @@ from .billing import billing # Import datasets controllers from .datasets import data_source, datasets, datasets_document, datasets_segments, file, hit_testing # Import explore controllers -from .explore import audio, completion, conversation, installed_app, message, parameter, recommended_app, saved_message +from .explore import (audio, completion, conversation, installed_app, message, parameter, recommended_app, + saved_message, workflow) # Import workspace controllers from .workspace import account, members, model_providers, models, tool_providers, workspace diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 0b6aa64291..845ecdf0af 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -119,6 +119,7 @@ class DraftWorkflowRunApi(Resource): """ parser = reqparse.RequestParser() parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json') + parser.add_argument('files', type=list, required=False, location='json') args = parser.parse_args() try: diff --git a/api/controllers/console/explore/completion.py b/api/controllers/console/explore/completion.py index f0bf46f1a6..292b4ed2a0 100644 --- a/api/controllers/console/explore/completion.py +++ b/api/controllers/console/explore/completion.py @@ -104,12 +104,10 @@ class ChatApi(InstalledAppResource): parser.add_argument('inputs', type=dict, required=True, location='json') parser.add_argument('query', type=str, required=True, location='json') parser.add_argument('files', type=list, required=False, location='json') - parser.add_argument('response_mode', type=str, choices=['blocking', 'streaming'], location='json') parser.add_argument('conversation_id', type=uuid_value, location='json') parser.add_argument('retriever_from', type=str, required=False, default='explore_app', location='json') args = parser.parse_args() - streaming = args['response_mode'] == 'streaming' args['auto_generate_name'] = False installed_app.last_used_at = datetime.utcnow() @@ -121,7 +119,7 @@ class ChatApi(InstalledAppResource): user=current_user, args=args, invoke_from=InvokeFrom.EXPLORE, - streaming=streaming + streaming=True ) return helper.compact_generate_response(response) diff --git a/api/controllers/console/explore/error.py b/api/controllers/console/explore/error.py index e1e3a2a877..9c3216ecc8 100644 --- a/api/controllers/console/explore/error.py +++ b/api/controllers/console/explore/error.py @@ -13,6 +13,12 @@ class NotChatAppError(BaseHTTPException): code = 400 +class NotWorkflowAppError(BaseHTTPException): + error_code = 'not_workflow_app' + description = "Only support workflow app." + code = 400 + + class AppSuggestedQuestionsAfterAnswerDisabledError(BaseHTTPException): error_code = 'app_suggested_questions_after_answer_disabled' description = "Function Suggested questions after answer disabled." diff --git a/api/controllers/console/explore/workflow.py b/api/controllers/console/explore/workflow.py new file mode 100644 index 0000000000..d56d943eb8 --- /dev/null +++ b/api/controllers/console/explore/workflow.py @@ -0,0 +1,85 @@ +import logging + +from flask_restful import reqparse +from werkzeug.exceptions import InternalServerError + +from controllers.console import api +from controllers.console.app.error import ( + CompletionRequestError, + ProviderModelCurrentlyNotSupportError, + ProviderNotInitializeError, + ProviderQuotaExceededError, +) +from controllers.console.explore.error import NotWorkflowAppError +from controllers.console.explore.wraps import InstalledAppResource +from core.app.apps.base_app_queue_manager import AppQueueManager +from core.app.entities.app_invoke_entities import InvokeFrom +from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError +from core.model_runtime.errors.invoke import InvokeError +from libs import helper +from libs.login import current_user +from models.model import AppMode, InstalledApp +from services.app_generate_service import AppGenerateService + +logger = logging.getLogger(__name__) + + +class WorkflowRunApi(InstalledAppResource): + def post(self, installed_app: InstalledApp): + """ + Run workflow + """ + app_model = installed_app.app + app_mode = AppMode.value_of(app_model.mode) + if app_mode != AppMode.WORKFLOW: + raise NotWorkflowAppError() + + parser = reqparse.RequestParser() + parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json') + parser.add_argument('files', type=list, required=False, location='json') + args = parser.parse_args() + + try: + response = AppGenerateService.generate( + app_model=app_model, + user=current_user, + args=args, + invoke_from=InvokeFrom.EXPLORE, + streaming=True + ) + + return helper.compact_generate_response(response) + except ProviderTokenNotInitError as ex: + raise ProviderNotInitializeError(ex.description) + except QuotaExceededError: + raise ProviderQuotaExceededError() + except ModelCurrentlyNotSupportError: + raise ProviderModelCurrentlyNotSupportError() + except InvokeError as e: + raise CompletionRequestError(e.description) + except ValueError as e: + raise e + except Exception as e: + logging.exception("internal server error.") + raise InternalServerError() + + +class WorkflowTaskStopApi(InstalledAppResource): + def post(self, installed_app: InstalledApp, task_id: str): + """ + Stop workflow task + """ + app_model = installed_app.app + app_mode = AppMode.value_of(app_model.mode) + if app_mode != AppMode.WORKFLOW: + raise NotWorkflowAppError() + + AppQueueManager.set_stop_flag(task_id, InvokeFrom.EXPLORE, current_user.id) + + return { + "result": "success" + } + + +api.add_resource(WorkflowRunApi, '/installed-apps//workflows/run') +api.add_resource(WorkflowTaskStopApi, '/installed-apps//workflows/tasks//stop') diff --git a/api/controllers/service_api/__init__.py b/api/controllers/service_api/__init__.py index e5138ccc74..9e6bb3a698 100644 --- a/api/controllers/service_api/__init__.py +++ b/api/controllers/service_api/__init__.py @@ -7,5 +7,5 @@ api = ExternalApi(bp) from . import index -from .app import app, audio, completion, conversation, file, message +from .app import app, audio, completion, conversation, file, message, workflow from .dataset import dataset, document, segment diff --git a/api/controllers/service_api/app/error.py b/api/controllers/service_api/app/error.py index 590d462deb..ac9edb1b4f 100644 --- a/api/controllers/service_api/app/error.py +++ b/api/controllers/service_api/app/error.py @@ -19,6 +19,12 @@ class NotChatAppError(BaseHTTPException): code = 400 +class NotWorkflowAppError(BaseHTTPException): + error_code = 'not_workflow_app' + description = "Please check if your app mode matches the right API route." + code = 400 + + class ConversationCompletedError(BaseHTTPException): error_code = 'conversation_completed' description = "The conversation has ended. Please start a new conversation." diff --git a/api/controllers/service_api/app/workflow.py b/api/controllers/service_api/app/workflow.py new file mode 100644 index 0000000000..511a7e5a45 --- /dev/null +++ b/api/controllers/service_api/app/workflow.py @@ -0,0 +1,84 @@ +import logging + +from flask_restful import Resource, reqparse +from werkzeug.exceptions import InternalServerError + +from controllers.service_api import api +from controllers.service_api.app.error import ( + CompletionRequestError, + NotWorkflowAppError, + ProviderModelCurrentlyNotSupportError, + ProviderNotInitializeError, + ProviderQuotaExceededError, +) +from controllers.service_api.wraps import FetchUserArg, WhereisUserArg, validate_app_token +from core.app.apps.base_app_queue_manager import AppQueueManager +from core.app.entities.app_invoke_entities import InvokeFrom +from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError +from core.model_runtime.errors.invoke import InvokeError +from libs import helper +from models.model import App, AppMode, EndUser +from services.app_generate_service import AppGenerateService + +logger = logging.getLogger(__name__) + + +class WorkflowRunApi(Resource): + @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True)) + def post(self, app_model: App, end_user: EndUser): + """ + Run workflow + """ + app_mode = AppMode.value_of(app_model.mode) + if app_mode != AppMode.WORKFLOW: + raise NotWorkflowAppError() + + parser = reqparse.RequestParser() + parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json') + parser.add_argument('files', type=list, required=False, location='json') + args = parser.parse_args() + + try: + response = AppGenerateService.generate( + app_model=app_model, + user=end_user, + args=args, + invoke_from=InvokeFrom.SERVICE_API, + streaming=True + ) + + return helper.compact_generate_response(response) + except ProviderTokenNotInitError as ex: + raise ProviderNotInitializeError(ex.description) + except QuotaExceededError: + raise ProviderQuotaExceededError() + except ModelCurrentlyNotSupportError: + raise ProviderModelCurrentlyNotSupportError() + except InvokeError as e: + raise CompletionRequestError(e.description) + except ValueError as e: + raise e + except Exception as e: + logging.exception("internal server error.") + raise InternalServerError() + + +class WorkflowTaskStopApi(Resource): + @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True)) + def post(self, app_model: App, end_user: EndUser, task_id: str): + """ + Stop workflow task + """ + app_mode = AppMode.value_of(app_model.mode) + if app_mode != AppMode.WORKFLOW: + raise NotWorkflowAppError() + + AppQueueManager.set_stop_flag(task_id, InvokeFrom.SERVICE_API, end_user.id) + + return { + "result": "success" + } + + +api.add_resource(WorkflowRunApi, '/workflows/run') +api.add_resource(WorkflowTaskStopApi, '/workflows/tasks//stop') diff --git a/api/controllers/web/__init__.py b/api/controllers/web/__init__.py index 27ea0cdb67..c68d23f878 100644 --- a/api/controllers/web/__init__.py +++ b/api/controllers/web/__init__.py @@ -6,4 +6,4 @@ bp = Blueprint('web', __name__, url_prefix='/api') api = ExternalApi(bp) -from . import app, audio, completion, conversation, file, message, passport, saved_message, site +from . import app, audio, completion, conversation, file, message, passport, saved_message, site, workflow diff --git a/api/controllers/web/error.py b/api/controllers/web/error.py index 453d08d2fa..390e3fe7d1 100644 --- a/api/controllers/web/error.py +++ b/api/controllers/web/error.py @@ -19,6 +19,12 @@ class NotChatAppError(BaseHTTPException): code = 400 +class NotWorkflowAppError(BaseHTTPException): + error_code = 'not_workflow_app' + description = "Please check if your Workflow app mode matches the right API route." + code = 400 + + class ConversationCompletedError(BaseHTTPException): error_code = 'conversation_completed' description = "The conversation has ended. Please start a new conversation." diff --git a/api/controllers/web/workflow.py b/api/controllers/web/workflow.py new file mode 100644 index 0000000000..77c468e417 --- /dev/null +++ b/api/controllers/web/workflow.py @@ -0,0 +1,82 @@ +import logging + +from flask_restful import reqparse +from werkzeug.exceptions import InternalServerError + +from controllers.web import api +from controllers.web.error import ( + CompletionRequestError, + NotWorkflowAppError, + ProviderModelCurrentlyNotSupportError, + ProviderNotInitializeError, + ProviderQuotaExceededError, +) +from controllers.web.wraps import WebApiResource +from core.app.apps.base_app_queue_manager import AppQueueManager +from core.app.entities.app_invoke_entities import InvokeFrom +from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError +from core.model_runtime.errors.invoke import InvokeError +from libs import helper +from models.model import App, AppMode, EndUser +from services.app_generate_service import AppGenerateService + +logger = logging.getLogger(__name__) + + +class WorkflowRunApi(WebApiResource): + def post(self, app_model: App, end_user: EndUser): + """ + Run workflow + """ + app_mode = AppMode.value_of(app_model.mode) + if app_mode != AppMode.WORKFLOW: + raise NotWorkflowAppError() + + parser = reqparse.RequestParser() + parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json') + parser.add_argument('files', type=list, required=False, location='json') + args = parser.parse_args() + + try: + response = AppGenerateService.generate( + app_model=app_model, + user=end_user, + args=args, + invoke_from=InvokeFrom.WEB_APP, + streaming=True + ) + + return helper.compact_generate_response(response) + except ProviderTokenNotInitError as ex: + raise ProviderNotInitializeError(ex.description) + except QuotaExceededError: + raise ProviderQuotaExceededError() + except ModelCurrentlyNotSupportError: + raise ProviderModelCurrentlyNotSupportError() + except InvokeError as e: + raise CompletionRequestError(e.description) + except ValueError as e: + raise e + except Exception as e: + logging.exception("internal server error.") + raise InternalServerError() + + +class WorkflowTaskStopApi(WebApiResource): + def post(self, app_model: App, end_user: EndUser, task_id: str): + """ + Stop workflow task + """ + app_mode = AppMode.value_of(app_model.mode) + if app_mode != AppMode.WORKFLOW: + raise NotWorkflowAppError() + + AppQueueManager.set_stop_flag(task_id, InvokeFrom.WEB_APP, end_user.id) + + return { + "result": "success" + } + + +api.add_resource(WorkflowRunApi, '/workflows/run') +api.add_resource(WorkflowTaskStopApi, '/workflows/tasks//stop')