import logging
from typing import Literal

from flask import request
from flask_restx import Resource
from pydantic import BaseModel, Field
from sqlalchemy import select
from werkzeug.exceptions import Forbidden, InternalServerError, NotFound

import services
from controllers.common.fields import AudioBinaryResponse, AudioTranscriptResponse, SimpleResultResponse
from controllers.common.fields import Parameters as ParametersResponse
from controllers.common.fields import Site as SiteResponse
from controllers.common.schema import (
    query_params_from_model,
    query_params_from_request,
    register_response_schema_models,
    register_schema_models,
)
from controllers.console import console_ns
from controllers.console.app.app import AppDetailWithSite
from controllers.console.app.error import (
    AppUnavailableError,
    AudioTooLargeError,
    CompletionRequestError,
    ConversationCompletedError,
    NeedAddIdsError,
    NoAudioUploadedError,
    ProviderModelCurrentlyNotSupportError,
    ProviderNotInitializeError,
    ProviderNotSupportSpeechToTextError,
    ProviderQuotaExceededError,
    UnsupportedAudioTypeError,
)
from controllers.console.app.workflow import WorkflowResponse
from controllers.console.app.wraps import get_app_model_with_trial
from controllers.console.explore.error import (
    AppSuggestedQuestionsAfterAnswerDisabledError,
    NotChatAppError,
    NotCompletionAppError,
    NotWorkflowAppError,
)
from controllers.console.explore.wraps import TrialAppResource, trial_feature_enable
from controllers.console.wraps import with_current_user
from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError
from core.app.app_config.common.parameters_mapping import get_parameters_from_feature_dict
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import (
    ModelCurrentlyNotSupportError,
    ProviderTokenNotInitError,
    QuotaExceededError,
)
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from fields.base import ResponseModel
from fields.dataset_fields import DatasetDetailResponse
from fields.message_fields import SuggestedQuestionsResponse
from graphon.graph_engine.manager import GraphEngineManager
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import dump_response, uuid_value
from models import Account
from models.account import TenantStatus
from models.model import AppMode, Site
from models.workflow import Workflow
from services.app_generate_service import AppGenerateService
from services.app_service import AppService
from services.audio_service import AudioService
from services.dataset_service import DatasetService
from services.errors.audio import (
    AudioTooLargeServiceError,
    NoAudioUploadedServiceError,
    ProviderNotSupportSpeechToTextServiceError,
    UnsupportedAudioTypeServiceError,
)
from services.errors.conversation import ConversationNotExistsError
from services.errors.llm import InvokeRateLimitError
from services.errors.message import (
    MessageNotExistsError,
    SuggestedQuestionsAfterAnswerDisabledError,
)
from services.message_service import MessageService
from services.recommended_app_service import RecommendedAppService

logger = logging.getLogger(__name__)


# Item type of the trial dataset listing; adds nothing to DatasetDetailResponse
# but is registered under its own schema name below.
class TrialDatasetListItemResponse(DatasetDetailResponse):
    pass


# Response envelope returned by the trial dataset listing endpoint.
class TrialDatasetListResponse(ResponseModel):
    data: list[TrialDatasetListItemResponse]
    has_more: bool
    limit: int
    total: int
    page: int


# Register every response model this module references through
# `console_ns.models[<Model>.__name__]` in its `@console_ns.response` decorators.
register_response_schema_models(
    console_ns,
    ParametersResponse,
    AppDetailWithSite,
    AudioBinaryResponse,
    AudioTranscriptResponse,
    SimpleResultResponse,
    SiteResponse,
    SuggestedQuestionsResponse,
    TrialDatasetListItemResponse,
    TrialDatasetListResponse,
    WorkflowResponse,
)


# Request body validated by the trial workflow-run endpoint.
class WorkflowRunRequest(BaseModel):
    inputs: dict
    files: list | None = None


# Request body validated by the trial chat endpoint. `conversation_id` and
# `parent_message_id` are plain strings here; the handler passes them through
# `uuid_value` when they are provided.
class ChatRequest(BaseModel):
    inputs: dict
    query: str
    files: list | None = None
    conversation_id: str | None = None
    parent_message_id: str | None = None
    retriever_from: str = "explore_app"
# Request body validated by the trial text-to-audio endpoint.
class TextToSpeechRequest(BaseModel):
    message_id: str | None = None
    voice: str | None = None
    text: str | None = None
    streaming: bool | None = None


# Request body validated by the trial completion endpoint. Any `response_mode`
# other than "streaming" (including the default None) yields a non-streaming call.
class CompletionRequest(BaseModel):
    inputs: dict
    query: str = ""
    files: list | None = None
    response_mode: Literal["blocking", "streaming"] | None = None
    retriever_from: str = "explore_app"


# Query-string parameters parsed by the trial dataset listing endpoint.
class TrialDatasetListQuery(BaseModel):
    page: int = Field(default=1, ge=1, description="Page number")
    limit: int = Field(default=20, ge=1, description="Number of items per page")
    ids: list[str] = Field(default_factory=list, description="Dataset IDs")


# Register every request model this module references through
# `console_ns.models[<Model>.__name__]` in its `@console_ns.expect` decorators.
register_schema_models(
    console_ns,
    WorkflowRunRequest,
    ChatRequest,
    TextToSpeechRequest,
    CompletionRequest,
    TrialDatasetListQuery,
)


class TrialAppWorkflowRunApi(TrialAppResource):
    """Resource that runs the workflow of a trial app."""

    @trial_feature_enable
    @with_current_user
    @console_ns.expect(console_ns.models[WorkflowRunRequest.__name__])
    @console_ns.response(200, "Success")
    def post(self, current_user: Account, trial_app):
        """Run workflow.

        Validates the payload as ``WorkflowRunRequest``, invokes the app in
        streaming mode, records the trial usage and returns the compacted
        generate response.

        Raises:
            NotWorkflowAppError: the app is missing or is not in workflow mode.
            ProviderNotInitializeError, ProviderQuotaExceededError,
            ProviderModelCurrentlyNotSupportError, CompletionRequestError,
            InvokeRateLimitHttpError: translated provider/invocation failures.
            ValueError: propagated unchanged from the service layer.
            InternalServerError: any other unexpected failure (logged).
        """
        app_model = trial_app
        if not app_model:
            raise NotWorkflowAppError()
        app_mode = AppMode.value_of(app_model.mode)
        if app_mode != AppMode.WORKFLOW:
            raise NotWorkflowAppError()

        request_data = WorkflowRunRequest.model_validate(console_ns.payload)
        args = request_data.model_dump()
        try:
            # Read the IDs up front so they are plain values by the time the
            # trial record is written after generation.
            app_id = app_model.id
            user_id = current_user.id
            response = AppGenerateService.generate(
                app_model=app_model, user=current_user, args=args, invoke_from=InvokeFrom.EXPLORE, streaming=True
            )
            RecommendedAppService.add_trial_app_record(db.session, app_id, user_id)
            # response-contract:ignore compact_generate_response
            return helper.compact_generate_response(response)
        except ProviderTokenNotInitError as ex:
            raise ProviderNotInitializeError(ex.description)
        except QuotaExceededError:
            raise ProviderQuotaExceededError()
        except ModelCurrentlyNotSupportError:
            raise ProviderModelCurrentlyNotSupportError()
        except InvokeError as e:
            raise CompletionRequestError(e.description)
        except InvokeRateLimitError as ex:
            raise InvokeRateLimitHttpError(ex.description)
        except ValueError:
            # Must stay ahead of the catch-all so ValueError is not turned into a 500.
            raise
        except Exception:
            logger.exception("internal server error.")
            raise InternalServerError()


class TrialAppWorkflowTaskStopApi(TrialAppResource):
    """Resource that stops a running workflow task of a trial app."""

    @console_ns.response(200, "Success", console_ns.models[SimpleResultResponse.__name__])
    @trial_feature_enable
    def post(self, trial_app, task_id: str):
        """Stop workflow task.

        Raises:
            NotWorkflowAppError: the app is missing or is not in workflow mode.
        """
        app_model = trial_app
        if not app_model:
            raise NotWorkflowAppError()
        app_mode = AppMode.value_of(app_model.mode)
        if app_mode != AppMode.WORKFLOW:
            raise NotWorkflowAppError()

        # Stop using both mechanisms for backward compatibility
        # Legacy stop flag mechanism (without user check)
        AppQueueManager.set_stop_flag_no_user_check(task_id)

        # New graph engine command channel mechanism
        GraphEngineManager(redis_client).send_stop_command(task_id)

        return SimpleResultResponse(result="success").model_dump(mode="json")


class TrialChatApi(TrialAppResource):
    """Resource that sends a chat message to a trial chat app."""

    @console_ns.expect(console_ns.models[ChatRequest.__name__])
    @console_ns.response(200, "Success")
    @trial_feature_enable
    @with_current_user
    def post(self, current_user: Account, trial_app):
        """Send a chat message in streaming mode and record the trial usage.

        Raises:
            NotChatAppError: the app is not a chat, agent-chat or advanced-chat app.
            NotFound: the referenced conversation does not exist.
            ConversationCompletedError: the conversation is already completed.
            AppUnavailableError: the app model config is broken (logged).
            ProviderNotInitializeError, ProviderQuotaExceededError,
            ProviderModelCurrentlyNotSupportError, CompletionRequestError,
            InvokeRateLimitHttpError: translated provider/invocation failures.
            ValueError: propagated unchanged.
            InternalServerError: any other unexpected failure (logged).
        """
        app_model = trial_app
        app_mode = AppMode.value_of(app_model.mode)
        if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
            raise NotChatAppError()

        request_data = ChatRequest.model_validate(console_ns.payload)
        args = request_data.model_dump()

        # Validate UUID values if provided
        if args.get("conversation_id"):
            args["conversation_id"] = uuid_value(args["conversation_id"])
        if args.get("parent_message_id"):
            args["parent_message_id"] = uuid_value(args["parent_message_id"])

        args["auto_generate_name"] = False

        try:
            # Get IDs before they might be detached from session
            app_id = app_model.id
            user_id = current_user.id
            response = AppGenerateService.generate(
                app_model=app_model, user=current_user, args=args, invoke_from=InvokeFrom.EXPLORE, streaming=True
            )
            RecommendedAppService.add_trial_app_record(db.session, app_id, user_id)
            # response-contract:ignore compact_generate_response
            return helper.compact_generate_response(response)
        except services.errors.conversation.ConversationNotExistsError:
            raise NotFound("Conversation Not Exists.")
        except services.errors.conversation.ConversationCompletedError:
            raise ConversationCompletedError()
        except services.errors.app_model_config.AppModelConfigBrokenError:
            logger.exception("App model config broken.")
            raise AppUnavailableError()
        except ProviderTokenNotInitError as ex:
            raise ProviderNotInitializeError(ex.description)
        except QuotaExceededError:
            raise ProviderQuotaExceededError()
        except ModelCurrentlyNotSupportError:
            raise ProviderModelCurrentlyNotSupportError()
        except InvokeError as e:
            raise CompletionRequestError(e.description)
        except InvokeRateLimitError as ex:
            raise InvokeRateLimitHttpError(ex.description)
        except ValueError:
            # Must stay ahead of the catch-all so ValueError is not turned into a 500.
            raise
        except Exception:
            logger.exception("internal server error.")
            raise InternalServerError()


class TrialMessageSuggestedQuestionApi(TrialAppResource):
    """Resource that returns suggested follow-up questions for a message."""

    # NOTE(review): this is the only TrialAppResource handler in this module that
    # is not wrapped in @trial_feature_enable — confirm that is intentional.
    @console_ns.response(200, "Success", console_ns.models[SuggestedQuestionsResponse.__name__])
    @with_current_user
    def get(self, current_user: Account, trial_app, message_id):
        """Return the suggested questions after the answer identified by ``message_id``.

        Raises:
            NotChatAppError: the app is not a chat, agent-chat or advanced-chat app.
            NotFound: the message or its conversation does not exist.
            AppSuggestedQuestionsAfterAnswerDisabledError: the feature is disabled.
            ProviderNotInitializeError, ProviderQuotaExceededError,
            ProviderModelCurrentlyNotSupportError, CompletionRequestError:
                translated provider/invocation failures.
            InternalServerError: any other unexpected failure (logged).
        """
        app_model = trial_app
        app_mode = AppMode.value_of(app_model.mode)
        if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
            raise NotChatAppError()

        message_id = str(message_id)

        try:
            questions = MessageService.get_suggested_questions_after_answer(
                app_model=app_model, user=current_user, message_id=message_id, invoke_from=InvokeFrom.EXPLORE
            )
        except MessageNotExistsError:
            raise NotFound("Message not found")
        except ConversationNotExistsError:
            raise NotFound("Conversation not found")
        except SuggestedQuestionsAfterAnswerDisabledError:
            raise AppSuggestedQuestionsAfterAnswerDisabledError()
        except ProviderTokenNotInitError as ex:
            raise ProviderNotInitializeError(ex.description)
        except QuotaExceededError:
            raise ProviderQuotaExceededError()
        except ModelCurrentlyNotSupportError:
            raise ProviderModelCurrentlyNotSupportError()
        except InvokeError as e:
            raise CompletionRequestError(e.description)
        except Exception:
            logger.exception("internal server error.")
            raise InternalServerError()

        return dump_response(SuggestedQuestionsResponse, {"data": questions})


class TrialChatAudioApi(TrialAppResource):
    """Resource that transcribes an uploaded audio file for a trial app."""

    @console_ns.response(200, "Success", console_ns.models[AudioTranscriptResponse.__name__])
    @trial_feature_enable
    @with_current_user
    def post(self, current_user: Account, trial_app):
        """Transcribe the multipart ``file`` upload and record the trial usage.

        Raises:
            NoAudioUploadedError: no ``file`` part was uploaded, or the service
                reports that no audio was uploaded.
            AppUnavailableError: the app model config is broken (logged).
            AudioTooLargeError, UnsupportedAudioTypeError,
            ProviderNotSupportSpeechToTextError: translated audio service failures.
            ProviderNotInitializeError, ProviderQuotaExceededError,
            ProviderModelCurrentlyNotSupportError, CompletionRequestError:
                translated provider/invocation failures.
            ValueError: propagated unchanged.
            InternalServerError: any other unexpected failure (logged).
        """
        app_model = trial_app

        # Indexing request.files with a missing key raises a generic bad-request
        # error before this handler's own error mapping can apply; report the
        # missing upload with the dedicated error instead.
        file = request.files.get("file")
        if file is None:
            raise NoAudioUploadedError()

        try:
            # Get IDs before they might be detached from session
            app_id = app_model.id
            user_id = current_user.id
            response = AudioService.transcript_asr(app_model=app_model, file=file, end_user=None)
            RecommendedAppService.add_trial_app_record(db.session, app_id, user_id)
            return dump_response(AudioTranscriptResponse, response)
        except services.errors.app_model_config.AppModelConfigBrokenError:
            logger.exception("App model config broken.")
            raise AppUnavailableError()
        except NoAudioUploadedServiceError:
            raise NoAudioUploadedError()
        except AudioTooLargeServiceError as e:
            raise AudioTooLargeError(str(e))
        except UnsupportedAudioTypeServiceError:
            raise UnsupportedAudioTypeError()
        except ProviderNotSupportSpeechToTextServiceError:
            raise ProviderNotSupportSpeechToTextError()
        except ProviderTokenNotInitError as ex:
            raise ProviderNotInitializeError(ex.description)
        except QuotaExceededError:
            raise ProviderQuotaExceededError()
        except ModelCurrentlyNotSupportError:
            raise ProviderModelCurrentlyNotSupportError()
        except InvokeError as e:
            raise CompletionRequestError(e.description)
        except ValueError:
            # Must stay ahead of the catch-all so ValueError is not turned into a 500.
            raise
        except Exception:
            logger.exception("internal server error.")
            raise InternalServerError()


class TrialChatTextApi(TrialAppResource):
    """Resource that converts text (or a message) to audio for a trial app."""

    @console_ns.expect(console_ns.models[TextToSpeechRequest.__name__])
    @console_ns.response(200, "Success", console_ns.models[AudioBinaryResponse.__name__])
    @trial_feature_enable
    @with_current_user
    def post(self, current_user: Account, trial_app):
        """Run text-to-speech and record the trial usage.

        The payload is validated inside the ``try`` block, so a validation
        failure that is a ``ValueError`` is re-raised unchanged rather than
        being reported as an internal server error.

        Raises:
            AppUnavailableError: the app model config is broken (logged).
            NoAudioUploadedError, AudioTooLargeError, UnsupportedAudioTypeError,
            ProviderNotSupportSpeechToTextError: translated audio service failures.
            ProviderNotInitializeError, ProviderQuotaExceededError,
            ProviderModelCurrentlyNotSupportError, CompletionRequestError:
                translated provider/invocation failures.
            ValueError: propagated unchanged.
            InternalServerError: any other unexpected failure (logged).
        """
        app_model = trial_app
        try:
            request_data = TextToSpeechRequest.model_validate(console_ns.payload)

            message_id = request_data.message_id
            text = request_data.text
            voice = request_data.voice

            # Get IDs before they might be detached from session
            app_id = app_model.id
            user_id = current_user.id
            response = AudioService.transcript_tts(
                app_model=app_model,
                session=db.session,
                text=text,
                voice=voice,
                message_id=message_id,
            )
            RecommendedAppService.add_trial_app_record(db.session, app_id, user_id)
            # response-contract:ignore binary response
            return response
        except services.errors.app_model_config.AppModelConfigBrokenError:
            logger.exception("App model config broken.")
            raise AppUnavailableError()
        except NoAudioUploadedServiceError:
            raise NoAudioUploadedError()
        except AudioTooLargeServiceError as e:
            raise AudioTooLargeError(str(e))
        except UnsupportedAudioTypeServiceError:
            raise UnsupportedAudioTypeError()
        except ProviderNotSupportSpeechToTextServiceError:
            raise ProviderNotSupportSpeechToTextError()
        except ProviderTokenNotInitError as ex:
            raise ProviderNotInitializeError(ex.description)
        except QuotaExceededError:
            raise ProviderQuotaExceededError()
        except ModelCurrentlyNotSupportError:
            raise ProviderModelCurrentlyNotSupportError()
        except InvokeError as e:
            raise CompletionRequestError(e.description)
        except ValueError:
            # Must stay ahead of the catch-all so ValueError is not turned into a 500.
            raise
        except Exception:
            logger.exception("internal server error.")
            raise InternalServerError()


class TrialCompletionApi(TrialAppResource):
    """Resource that sends a completion request to a trial completion app."""

    @console_ns.expect(console_ns.models[CompletionRequest.__name__])
    @console_ns.response(200, "Success")
    @trial_feature_enable
    @with_current_user
    def post(self, current_user: Account, trial_app):
        """Generate a completion and record the trial usage.

        Streaming is used only when the payload's ``response_mode`` is
        ``"streaming"``.

        Raises:
            NotCompletionAppError: the app's mode is not ``"completion"``.
            NotFound: the referenced conversation does not exist.
            ConversationCompletedError: the conversation is already completed.
            AppUnavailableError: the app model config is broken (logged).
            ProviderNotInitializeError, ProviderQuotaExceededError,
            ProviderModelCurrentlyNotSupportError, CompletionRequestError:
                translated provider/invocation failures.
            ValueError: propagated unchanged.
            InternalServerError: any other unexpected failure (logged).
        """
        app_model = trial_app
        if app_model.mode != "completion":
            raise NotCompletionAppError()

        request_data = CompletionRequest.model_validate(console_ns.payload)
        args = request_data.model_dump()

        streaming = args["response_mode"] == "streaming"
        args["auto_generate_name"] = False

        try:
            # Get IDs before they might be detached from session
            app_id = app_model.id
            user_id = current_user.id
            response = AppGenerateService.generate(
                app_model=app_model, user=current_user, args=args, invoke_from=InvokeFrom.EXPLORE, streaming=streaming
            )

            RecommendedAppService.add_trial_app_record(db.session, app_id, user_id)
            # response-contract:ignore compact_generate_response
            return helper.compact_generate_response(response)
        except services.errors.conversation.ConversationNotExistsError:
            raise NotFound("Conversation Not Exists.")
        except services.errors.conversation.ConversationCompletedError:
            raise ConversationCompletedError()
        except services.errors.app_model_config.AppModelConfigBrokenError:
            logger.exception("App model config broken.")
            raise AppUnavailableError()
        except ProviderTokenNotInitError as ex:
            raise ProviderNotInitializeError(ex.description)
        except QuotaExceededError:
            raise ProviderQuotaExceededError()
        except ModelCurrentlyNotSupportError:
            raise ProviderModelCurrentlyNotSupportError()
        except InvokeError as e:
            raise CompletionRequestError(e.description)
        except ValueError:
            # Must stay ahead of the catch-all so ValueError is not turned into a 500.
            raise
        except Exception:
            logger.exception("internal server error.")
            raise InternalServerError()


class TrialSitApi(Resource):
    """Resource for trial app sites."""

    @console_ns.response(200, "Success", console_ns.models[SiteResponse.__name__])
    @get_app_model_with_trial(None)
    def get(self, app_model):
        """Retrieve app site info.

        Returns the site configuration for the application including theme, icons, and text.

        Raises:
            Forbidden: the app has no site, has no tenant, or its tenant is archived.
        """
        site = db.session.scalar(select(Site).where(Site.app_id == app_model.id).limit(1))
        if not site:
            raise Forbidden()

        # Explicit check instead of `assert`: assertions are removed under
        # `python -O`, which would turn a missing tenant into an AttributeError.
        tenant = app_model.tenant
        if tenant is None or tenant.status == TenantStatus.ARCHIVE:
            raise Forbidden()

        return SiteResponse.model_validate(site).model_dump(mode="json")


class TrialAppParameterApi(Resource):
    """Resource for app variables."""

    @console_ns.response(200, "Success", console_ns.models[ParametersResponse.__name__])
    @get_app_model_with_trial(None)
    def get(self, app_model):
        """Retrieve app parameters.

        Advanced-chat and workflow apps read features and the input form from
        their workflow; every other mode reads them from the app model config.

        Raises:
            AppUnavailableError: the app, its workflow, or its model config is missing.
        """
        if app_model is None:
            raise AppUnavailableError()

        if app_model.mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
            workflow = app_model.workflow
            if workflow is None:
                raise AppUnavailableError()

            features_dict = workflow.features_dict
            user_input_form = workflow.user_input_form(to_old_structure=True)
        else:
            app_model_config = app_model.app_model_config
            if app_model_config is None:
                raise AppUnavailableError()

            features_dict = app_model_config.to_dict()
            user_input_form = features_dict.get("user_input_form", [])

        parameters = get_parameters_from_feature_dict(features_dict=features_dict, user_input_form=user_input_form)
        return ParametersResponse.model_validate(parameters).model_dump(mode="json")


class AppApi(Resource):
    """Resource that returns the detail of a trial app."""

    @get_app_model_with_trial(None)
    @console_ns.response(200, "App detail retrieved successfully", console_ns.models[AppDetailWithSite.__name__])
    def get(self, app_model):
        """Get app detail"""
        app_service = AppService()
        app_model = app_service.get_app(app_model)

        return dump_response(AppDetailWithSite, app_model)


class AppWorkflowApi(Resource):
    """Resource that returns the workflow of a trial app."""

    @get_app_model_with_trial(None)
    @console_ns.response(200, "Workflow detail retrieved successfully", console_ns.models[WorkflowResponse.__name__])
    def get(self, app_model):
        """Get workflow detail

        Raises:
            AppUnavailableError: the app has no workflow id, or no workflow row
                exists for that id.
        """
        if not app_model.workflow_id:
            raise AppUnavailableError()

        workflow = db.session.get(Workflow, app_model.workflow_id)
        # Session.get returns None when no row matches; do not serialize None.
        if workflow is None:
            raise AppUnavailableError()
        return dump_response(WorkflowResponse, workflow)


class DatasetListApi(Resource):
    """Resource that lists the datasets referenced by a trial app."""

    @console_ns.doc(params=query_params_from_model(TrialDatasetListQuery))
    @console_ns.response(200, "Success", console_ns.models[TrialDatasetListResponse.__name__])
    @get_app_model_with_trial(None)
    def get(self, app_model):
        """Return the datasets whose IDs are given in the ``ids`` query parameter.

        Raises:
            NeedAddIdsError: no dataset IDs were supplied.
        """
        query = query_params_from_request(
            TrialDatasetListQuery,
            list_fields=("ids",),
            use_defaults_for_malformed_ints=True,
        )

        tenant_id = app_model.tenant_id
        if not query.ids:
            raise NeedAddIdsError()

        datasets, total = DatasetService.get_datasets_by_ids(query.ids, tenant_id)

        return TrialDatasetListResponse(
            data=datasets, has_more=len(datasets) == query.limit, limit=query.limit, total=total or 0, page=query.page
        ).model_dump(mode="json")


# NOTE(review): every URL rule below contains an empty path segment
# ("/trial-apps//...", "/messages//", "/tasks//"). The handlers above expect
# `message_id` and `task_id` arguments, so typed URL-variable placeholders appear
# to have been lost from these strings. They are kept verbatim here; restore the
# placeholders from the authoritative route definitions before relying on them.
console_ns.add_resource(TrialChatApi, "/trial-apps//chat-messages", endpoint="trial_app_chat_completion")

console_ns.add_resource(
    TrialMessageSuggestedQuestionApi,
    "/trial-apps//messages//suggested-questions",
    endpoint="trial_app_suggested_question",
)

console_ns.add_resource(TrialChatAudioApi, "/trial-apps//audio-to-text", endpoint="trial_app_audio")
console_ns.add_resource(TrialChatTextApi, "/trial-apps//text-to-audio", endpoint="trial_app_text")

console_ns.add_resource(
    TrialCompletionApi, "/trial-apps//completion-messages", endpoint="trial_app_completion"
)

console_ns.add_resource(TrialSitApi, "/trial-apps//site")

console_ns.add_resource(TrialAppParameterApi, "/trial-apps//parameters", endpoint="trial_app_parameters")

console_ns.add_resource(AppApi, "/trial-apps/", endpoint="trial_app")

console_ns.add_resource(
    TrialAppWorkflowRunApi, "/trial-apps//workflows/run", endpoint="trial_app_workflow_run"
)
console_ns.add_resource(TrialAppWorkflowTaskStopApi, "/trial-apps//workflows/tasks//stop")

console_ns.add_resource(AppWorkflowApi, "/trial-apps//workflows", endpoint="trial_app_workflow")
console_ns.add_resource(DatasetListApi, "/trial-apps//datasets", endpoint="trial_app_datasets")