diff --git a/api/controllers/openapi/app_run.py b/api/controllers/openapi/app_run.py
new file mode 100644
index 0000000000..0a1b08ae60
--- /dev/null
+++ b/api/controllers/openapi/app_run.py
@@ -0,0 +1,200 @@
+"""POST /openapi/v1/apps/<app_id>/run — mode-agnostic runner.
+
+Server reads ``apps.mode`` after AppResolver and dispatches via
+_DISPATCH to the per-mode helper. Per-mode constraints (e.g. chat-family
+requires ``query``; workflow rejects ``query``) are enforced inside
+the helper, post-resolve, since ``mode`` is not in the request body.
+"""
+
+from __future__ import annotations
+
+import logging
+from collections.abc import Callable, Mapping
+from typing import Any, Literal
+from uuid import UUID
+
+from pydantic import BaseModel, field_validator
+from werkzeug.exceptions import BadRequest, InternalServerError, NotFound, UnprocessableEntity
+
+import services
+from controllers.openapi._models import (
+    ChatMessageResponse,
+    CompletionMessageResponse,
+    WorkflowRunResponse,
+)
+from controllers.service_api.app.error import (
+    AppUnavailableError,
+    CompletionRequestError,
+    ConversationCompletedError,
+    ProviderModelCurrentlyNotSupportError,
+    ProviderNotInitializeError,
+    ProviderQuotaExceededError,
+)
+from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError
+from core.app.entities.app_invoke_entities import InvokeFrom
+from core.errors.error import (
+    ModelCurrentlyNotSupportError,
+    ProviderTokenNotInitError,
+    QuotaExceededError,
+)
+from graphon.model_runtime.errors.invoke import InvokeError
+from libs import helper
+from libs.helper import UUIDStrOrEmpty
+from models.model import App, AppMode
+from services.app_generate_service import AppGenerateService
+from services.errors.app import (
+    IsDraftWorkflowError,
+    WorkflowIdFormatError,
+    WorkflowNotFoundError,
+)
+from services.errors.llm import InvokeRateLimitError
+
+logger = logging.getLogger(__name__)
+
+
+class AppRunRequest(BaseModel):
+    inputs: dict[str, Any]
+    query: str | None = None
+    files: list[dict[str, Any]] | None = None
+    response_mode: Literal["blocking", "streaming"] | None = None
+    conversation_id: UUIDStrOrEmpty | None = None
+    auto_generate_name: bool = True
+    workflow_id: str | None = None
+
+    @field_validator("conversation_id", mode="before")
+    @classmethod
+    def _normalize_conv(cls, value: str | UUID | None) -> str | None:
+        # Pre-validation hook: blank/whitespace-only strings become None so
+        # "no conversation" can be expressed as "" as well as null.
+        if isinstance(value, str):
+            value = value.strip()
+        if not value:
+            return None
+        try:
+            return helper.uuid_value(value)
+        except ValueError as exc:
+            raise ValueError("conversation_id must be a valid UUID") from exc
+
+
+def _enforce_chat_constraint(payload: AppRunRequest) -> None:
+    # Chat-family modes need a non-blank query; mode is only known post-resolve.
+    if not payload.query or not payload.query.strip():
+        raise UnprocessableEntity("query_required_for_chat")
+
+
+def _enforce_workflow_constraint(payload: AppRunRequest) -> None:
+    # Workflow apps take all input via ``inputs``; a query field is a client error.
+    if payload.query is not None:
+        raise UnprocessableEntity("query_not_supported_for_workflow")
+
+
+def _unpack_blocking(response: Any) -> Mapping[str, Any]:
+    # Blocking generates may return (body, status); normalize to a plain dict.
+    if isinstance(response, tuple):
+        body_dict: Any = response[0]
+    else:
+        body_dict = response
+    if not isinstance(body_dict, Mapping):
+        raise InternalServerError("blocking generate returned non-mapping response")
+    return dict(body_dict)
+
+
+def _generate(app: App, caller: Any, args: dict[str, Any], streaming: bool):
+    return AppGenerateService.generate(
+        app_model=app,
+        user=caller,
+        args=args,
+        invoke_from=InvokeFrom.OPENAPI,
+        streaming=streaming,
+    )
+
+
+def _run_chat(app: App, caller: Any, payload: AppRunRequest, streaming: bool):
+    _enforce_chat_constraint(payload)
+    args = payload.model_dump(exclude_none=True)
+    try:
+        response = _generate(app, caller, args, streaming)
+    except WorkflowNotFoundError as ex:
+        raise NotFound(str(ex)) from ex
+    except (IsDraftWorkflowError, WorkflowIdFormatError) as ex:
+        raise BadRequest(str(ex)) from ex
+    except services.errors.conversation.ConversationNotExistsError:
+        raise NotFound("Conversation Not Exists.")
+    except services.errors.conversation.ConversationCompletedError:
+        raise ConversationCompletedError()
+    except services.errors.app_model_config.AppModelConfigBrokenError:
+        logger.exception("App model config broken.")
+        raise AppUnavailableError()
+    except ProviderTokenNotInitError as ex:
+        raise ProviderNotInitializeError(ex.description) from ex
+    except QuotaExceededError:
+        raise ProviderQuotaExceededError()
+    except ModelCurrentlyNotSupportError:
+        raise ProviderModelCurrentlyNotSupportError()
+    except InvokeRateLimitError as ex:
+        raise InvokeRateLimitHttpError(ex.description) from ex
+    except InvokeError as e:
+        raise CompletionRequestError(e.description) from e
+
+    if streaming:
+        return response, None
+    body = _unpack_blocking(response)
+    return None, ChatMessageResponse.model_validate(body).model_dump(mode="json")
+
+
+def _run_completion(app: App, caller: Any, payload: AppRunRequest, streaming: bool):
+    args = payload.model_dump(exclude_none=True)
+    args["auto_generate_name"] = False
+    args.setdefault("query", "")
+    try:
+        response = _generate(app, caller, args, streaming)
+    except services.errors.conversation.ConversationNotExistsError:
+        raise NotFound("Conversation Not Exists.")
+    except services.errors.conversation.ConversationCompletedError:
+        raise ConversationCompletedError()
+    except services.errors.app_model_config.AppModelConfigBrokenError:
+        logger.exception("App model config broken.")
+        raise AppUnavailableError()
+    except ProviderTokenNotInitError as ex:
+        raise ProviderNotInitializeError(ex.description) from ex
+    except QuotaExceededError:
+        raise ProviderQuotaExceededError()
+    except ModelCurrentlyNotSupportError:
+        raise ProviderModelCurrentlyNotSupportError()
+    except InvokeError as e:
+        raise CompletionRequestError(e.description) from e
+
+    if streaming:
+        return response, None
+    body = _unpack_blocking(response)
+    return None, CompletionMessageResponse.model_validate(body).model_dump(mode="json")
+
+
+def _run_workflow(app: App, caller: Any, payload: AppRunRequest, streaming: bool):
+    _enforce_workflow_constraint(payload)
+    args = payload.model_dump(exclude={"query", "conversation_id", "auto_generate_name"}, exclude_none=True)
+    try:
+        response = _generate(app, caller, args, streaming)
+    except WorkflowNotFoundError as ex:
+        raise NotFound(str(ex)) from ex
+    except (IsDraftWorkflowError, WorkflowIdFormatError) as ex:
+        raise BadRequest(str(ex)) from ex
+    except ProviderTokenNotInitError as ex:
+        raise ProviderNotInitializeError(ex.description) from ex
+    except QuotaExceededError:
+        raise ProviderQuotaExceededError()
+    except ModelCurrentlyNotSupportError:
+        raise ProviderModelCurrentlyNotSupportError()
+    except InvokeRateLimitError as ex:
+        raise InvokeRateLimitHttpError(ex.description) from ex
+    except InvokeError as e:
+        raise CompletionRequestError(e.description) from e
+
+    if streaming:
+        return response, None
+    body = _unpack_blocking(response)
+    return None, WorkflowRunResponse.model_validate(body).model_dump(mode="json")
+
+
+_DISPATCH: dict[AppMode, Callable[[App, Any, AppRunRequest, bool], tuple[Any, Any]]] = {
+    AppMode.CHAT: _run_chat,
+    AppMode.AGENT_CHAT: _run_chat,
+    AppMode.ADVANCED_CHAT: _run_chat,
+    AppMode.COMPLETION: _run_completion,
+    AppMode.WORKFLOW: _run_workflow,
+}
diff --git a/api/tests/unit_tests/controllers/openapi/test_app_run_dispatch.py b/api/tests/unit_tests/controllers/openapi/test_app_run_dispatch.py
new file mode 100644
index 0000000000..27e402e4c9
--- /dev/null
+++ b/api/tests/unit_tests/controllers/openapi/test_app_run_dispatch.py
@@ -0,0 +1,25 @@
+import pytest
+from werkzeug.exceptions import UnprocessableEntity
+
+from controllers.openapi.app_run import (
+    _DISPATCH,
+    AppRunRequest,
+    _enforce_chat_constraint,
+    _enforce_workflow_constraint,
+)
+from models.model import AppMode
+
+
+def test_dispatch_covers_runnable_modes():
+    runnable = {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT, AppMode.COMPLETION, AppMode.WORKFLOW}
+    assert set(_DISPATCH) == runnable
+
+
+def test_chat_constraint_requires_query():
+    with pytest.raises(UnprocessableEntity, match="query_required_for_chat"):
+        _enforce_chat_constraint(AppRunRequest(inputs={}))
+
+
+def test_workflow_constraint_rejects_query():
+    with pytest.raises(UnprocessableEntity, match="query_not_supported_for_workflow"):
+        _enforce_workflow_constraint(AppRunRequest(inputs={}, query="hi"))