from uuid import UUID from flask import abort, request from flask_restx import Resource from pydantic import BaseModel, Field, field_validator from controllers.common.schema import query_params_from_model, register_response_schema_models, register_schema_models from controllers.console import console_ns from controllers.console.agent.app_helpers import resolve_agent_app_model from controllers.console.app.app import ( AppDetailWithSite, AppListQuery, AppPagination, AppPartial, UpdateAppPayload, _normalize_app_list_query_args, ) from controllers.console.wraps import ( account_initialization_required, cloud_edition_billing_resource_check, edit_permission_required, enterprise_license_required, setup_required, with_current_tenant_id, with_current_user, ) from extensions.ext_database import db from fields.agent_fields import ( AgentConfigSnapshotDetailResponse, AgentConfigSnapshotListResponse, AgentInviteOptionsResponse, AgentLogListResponse, AgentPublishedReferenceResponse, AgentRosterListResponse, AgentStatisticSummaryEnvelopeResponse, ) from libs.datetime_utils import parse_time_range from libs.helper import dump_response from libs.login import login_required from models import Account from models.model import IconType from services.agent.errors import AgentNotFoundError from services.agent.observability_service import ( AgentLogQueryParams, AgentObservabilityService, AgentStatisticsQueryParams, ) from services.agent.roster_service import AgentRosterService from services.app_service import AppListParams, AppService, CreateAppParams from services.enterprise.enterprise_service import EnterpriseService from services.entities.agent_entities import RosterListQuery from services.feature_service import FeatureService class AgentInviteOptionsQuery(RosterListQuery): app_id: str | None = Field(default=None, description="Workflow app id for in-current-workflow markers") class AgentIdPath(BaseModel): agent_id: str class AgentAppCreatePayload(BaseModel): name: str = Field(..., min_length=1, description="Agent name") description: str | None = Field(default=None, description="Agent description (max 400 chars)", max_length=400) role: str = Field(default="", description="Agent role", max_length=255) icon_type: IconType | None = Field(default=None, description="Icon type") icon: str | None = Field(default=None, description="Icon") icon_background: str | None = Field(default=None, description="Icon background color") class AgentAppUpdatePayload(UpdateAppPayload): role: str | None = Field(default=None, description="Agent role", max_length=255) class AgentAppPublishedReferenceResponse(BaseModel): app_id: str app_name: str app_icon_type: str | None = None app_icon: str | None = None app_icon_background: str | None = None class AgentAppPartial(AppPartial): published_reference_count: int = 0 published_references: list[AgentAppPublishedReferenceResponse] = Field(default_factory=list) class AgentAppPagination(BaseModel): page: int limit: int total: int has_more: bool data: list[AgentAppPartial] class AgentLogsQuery(BaseModel): page: int = Field(default=1, ge=1, description="Page number") limit: int = Field(default=20, ge=1, le=100, description="Page size") keyword: str | None = Field(default=None, description="Search query, answer, or conversation name") status: str | None = Field(default=None, description="Filter by success, failed, or paused") source: str | None = Field( default=None, description="Filter by all, console/explore, api/service-api, web-app, debugger, openapi, or trigger", ) start: str | None = Field(default=None, description="Start date (YYYY-MM-DD HH:MM)") end: str | None = Field(default=None, description="End date (YYYY-MM-DD HH:MM)") @field_validator("keyword", "status", "source", "start", "end", mode="before") @classmethod def empty_string_to_none(cls, value: str | None) -> str | None: if value == "": return None return value class AgentStatisticsQuery(BaseModel): source: str | None = Field( default=None, description="Filter by all, console/explore, api/service-api, web-app, debugger, openapi, or trigger", ) start: str | None = Field(default=None, description="Start date (YYYY-MM-DD HH:MM)") end: str | None = Field(default=None, description="End date (YYYY-MM-DD HH:MM)") @field_validator("source", "start", "end", mode="before") @classmethod def empty_string_to_none(cls, value: str | None) -> str | None: if value == "": return None return value register_schema_models( console_ns, AgentAppCreatePayload, AgentAppUpdatePayload, AgentInviteOptionsQuery, AgentLogsQuery, AgentStatisticsQuery, AgentIdPath, AppListQuery, UpdateAppPayload, RosterListQuery, ) register_response_schema_models( console_ns, AppDetailWithSite, AgentAppPagination, AgentAppPublishedReferenceResponse, AgentConfigSnapshotDetailResponse, AgentConfigSnapshotListResponse, AgentInviteOptionsResponse, AgentLogListResponse, AgentPublishedReferenceResponse, AgentRosterListResponse, AgentStatisticSummaryEnvelopeResponse, ) def _agent_roster_service() -> AgentRosterService: return AgentRosterService(db.session) def _serialize_agent_app_detail(app_model) -> dict: app_model = AppService().get_app(app_model) if FeatureService.get_system_features().webapp_auth.enabled: app_setting = EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_id=str(app_model.id)) app_model.access_mode = app_setting.access_mode # type: ignore[attr-defined] roster_service = _agent_roster_service() agent = roster_service.get_app_backing_agent(tenant_id=app_model.tenant_id, app_id=app_model.id) if not agent: raise AgentNotFoundError() payload = AppDetailWithSite.model_validate(app_model, from_attributes=True).model_dump(mode="json") payload.pop("bound_agent_id", None) payload["app_id"] = str(app_model.id) payload["id"] = agent.id payload["role"] = agent.role or "" payload["active_config_is_published"] = roster_service.active_config_is_published( tenant_id=app_model.tenant_id, agent=agent, ) return payload def _serialize_agent_app_pagination(app_pagination, *, tenant_id: str) -> dict: app_ids = [str(app.id) for app in app_pagination.items] roster_service = _agent_roster_service() agents_by_app_id = roster_service.load_app_backing_agents_by_app_id( tenant_id=tenant_id, app_ids=app_ids, ) active_config_is_published_by_agent_id = roster_service.load_active_config_is_published_by_agent_id( tenant_id=tenant_id, agents=list(agents_by_app_id.values()), ) published_references_by_agent_id = roster_service.load_published_references_by_agent_id( tenant_id=tenant_id, agent_ids=[agent.id for agent in agents_by_app_id.values()], ) payload = AppPagination.model_validate(app_pagination, from_attributes=True).model_dump(mode="json") for item in payload["data"]: app_id = item["id"] item.pop("bound_agent_id", None) agent = agents_by_app_id.get(app_id) if agent: item["app_id"] = app_id item["id"] = agent.id item["role"] = agent.role or "" item["active_config_is_published"] = active_config_is_published_by_agent_id.get(agent.id, False) published_references = published_references_by_agent_id.get(agent.id, []) item["published_reference_count"] = len(published_references) item["published_references"] = [ { "app_id": reference["app_id"], "app_name": reference["app_name"], "app_icon_type": reference["app_icon_type"], "app_icon": reference["app_icon"], "app_icon_background": reference["app_icon_background"], } for reference in published_references ] return AgentAppPagination.model_validate(payload).model_dump( mode="json", exclude={"data": {"__all__": {"bound_agent_id"}}}, ) def _resolve_agent_app_model(*, tenant_id: str, agent_id: UUID): return resolve_agent_app_model(tenant_id=tenant_id, agent_id=agent_id) def _agent_observability_service() -> AgentObservabilityService: return AgentObservabilityService(db.session) def _parse_observability_time_range(start: str | None, end: str | None, account: Account): timezone = account.timezone or "UTC" try: return parse_time_range(start, end, timezone) except ValueError as exc: abort(400, description=str(exc)) @console_ns.route("/agent") class AgentAppListApi(Resource): @console_ns.doc(params=query_params_from_model(AppListQuery)) @console_ns.response(200, "Agent app list", console_ns.models[AgentAppPagination.__name__]) @setup_required @login_required @account_initialization_required @with_current_user @with_current_tenant_id def get(self, current_tenant_id: str, current_user: Account): args = AppListQuery.model_validate(_normalize_app_list_query_args(request.args)) params = AppListParams( page=args.page, limit=args.limit, mode="agent", name=args.name, tag_ids=args.tag_ids, creator_ids=args.creator_ids, is_created_by_me=args.is_created_by_me, status="normal", ) app_pagination = AppService().get_paginate_apps(current_user.id, current_tenant_id, params) if app_pagination is None: empty = AgentAppPagination(page=args.page, limit=args.limit, total=0, has_more=False, data=[]) return empty.model_dump(mode="json") return _serialize_agent_app_pagination(app_pagination, tenant_id=current_tenant_id) @console_ns.expect(console_ns.models[AgentAppCreatePayload.__name__]) @console_ns.response(201, "Agent app created successfully", console_ns.models[AppDetailWithSite.__name__]) @console_ns.response(403, "Insufficient permissions") @console_ns.response(400, "Invalid request parameters") @setup_required @login_required @account_initialization_required @cloud_edition_billing_resource_check("apps") @edit_permission_required @with_current_user @with_current_tenant_id def post(self, current_tenant_id: str, current_user: Account): args = AgentAppCreatePayload.model_validate(console_ns.payload) params = CreateAppParams( name=args.name, description=args.description, mode="agent", agent_role=args.role, icon_type=args.icon_type, icon=args.icon, icon_background=args.icon_background, ) app = AppService().create_app(current_tenant_id, params, current_user) return _serialize_agent_app_detail(app), 201 @console_ns.route("/agent/") class AgentAppApi(Resource): @console_ns.response(200, "Agent app detail", console_ns.models[AppDetailWithSite.__name__]) @setup_required @login_required @account_initialization_required @enterprise_license_required @with_current_tenant_id def get(self, tenant_id: str, agent_id: UUID): app_model = _resolve_agent_app_model(tenant_id=tenant_id, agent_id=agent_id) return _serialize_agent_app_detail(app_model) @console_ns.expect(console_ns.models[AgentAppUpdatePayload.__name__]) @console_ns.response(200, "Agent app updated successfully", console_ns.models[AppDetailWithSite.__name__]) @console_ns.response(403, "Insufficient permissions") @console_ns.response(400, "Invalid request parameters") @setup_required @login_required @account_initialization_required @edit_permission_required @with_current_tenant_id def put(self, tenant_id: str, agent_id: UUID): app_model = _resolve_agent_app_model(tenant_id=tenant_id, agent_id=agent_id) args = AgentAppUpdatePayload.model_validate(console_ns.payload) args_dict: AppService.ArgsDict = { "name": args.name, "description": args.description or "", "icon_type": args.icon_type, "icon": args.icon or "", "icon_background": args.icon_background or "", "use_icon_as_answer_icon": args.use_icon_as_answer_icon or False, "max_active_requests": args.max_active_requests or 0, "role": args.role, } updated = AppService().update_app(app_model, args_dict) return _serialize_agent_app_detail(updated) @console_ns.response(204, "Agent app deleted successfully") @console_ns.response(403, "Insufficient permissions") @setup_required @login_required @account_initialization_required @edit_permission_required @with_current_tenant_id def delete(self, tenant_id: str, agent_id: UUID): app_model = _resolve_agent_app_model(tenant_id=tenant_id, agent_id=agent_id) AppService().delete_app(app_model) return "", 204 @console_ns.route("/agent/invite-options") class AgentInviteOptionsApi(Resource): @console_ns.doc(params=query_params_from_model(AgentInviteOptionsQuery)) @console_ns.response(200, "Agent invite options", console_ns.models[AgentInviteOptionsResponse.__name__]) @setup_required @login_required @account_initialization_required @with_current_tenant_id def get(self, tenant_id: str): query = AgentInviteOptionsQuery.model_validate(request.args.to_dict(flat=True)) return dump_response( AgentInviteOptionsResponse, _agent_roster_service().list_invite_options( tenant_id=tenant_id, page=query.page, limit=query.limit, keyword=query.keyword, app_id=query.app_id, ), ) @console_ns.route("/agent//logs") class AgentLogsApi(Resource): @console_ns.doc(params=query_params_from_model(AgentLogsQuery)) @console_ns.response(200, "Agent logs", console_ns.models[AgentLogListResponse.__name__]) @setup_required @login_required @account_initialization_required @with_current_user @with_current_tenant_id def get(self, tenant_id: str, current_user: Account, agent_id: UUID): app_model = _resolve_agent_app_model(tenant_id=tenant_id, agent_id=agent_id) query = AgentLogsQuery.model_validate(request.args.to_dict(flat=True)) start, end = _parse_observability_time_range(query.start, query.end, current_user) try: payload = _agent_observability_service().list_logs( app=app_model, params=AgentLogQueryParams( page=query.page, limit=query.limit, keyword=query.keyword, status=query.status, source=query.source, start=start, end=end, ), ) except ValueError as exc: abort(400, description=str(exc)) return dump_response(AgentLogListResponse, payload) @console_ns.route("/agent//statistics/summary") class AgentStatisticsSummaryApi(Resource): @console_ns.doc(params=query_params_from_model(AgentStatisticsQuery)) @console_ns.response( 200, "Agent monitoring summary and chart data", console_ns.models[AgentStatisticSummaryEnvelopeResponse.__name__], ) @setup_required @login_required @account_initialization_required @with_current_user @with_current_tenant_id def get(self, tenant_id: str, current_user: Account, agent_id: UUID): app_model = _resolve_agent_app_model(tenant_id=tenant_id, agent_id=agent_id) query = AgentStatisticsQuery.model_validate(request.args.to_dict(flat=True)) timezone = current_user.timezone or "UTC" start, end = _parse_observability_time_range(query.start, query.end, current_user) try: payload = _agent_observability_service().get_statistics_summary( app=app_model, params=AgentStatisticsQueryParams(source=query.source, start=start, end=end, timezone=timezone), ) except ValueError as exc: abort(400, description=str(exc)) return dump_response(AgentStatisticSummaryEnvelopeResponse, payload) @console_ns.route("/agent//versions") class AgentRosterVersionsApi(Resource): @console_ns.response(200, "Agent versions", console_ns.models[AgentConfigSnapshotListResponse.__name__]) @setup_required @login_required @account_initialization_required @with_current_tenant_id def get(self, tenant_id: str, agent_id: UUID): return dump_response( AgentConfigSnapshotListResponse, {"data": _agent_roster_service().list_agent_versions(tenant_id=tenant_id, agent_id=str(agent_id))}, ) @console_ns.route("/agent//versions/") class AgentRosterVersionDetailApi(Resource): @console_ns.response(200, "Agent version detail", console_ns.models[AgentConfigSnapshotDetailResponse.__name__]) @setup_required @login_required @account_initialization_required @with_current_tenant_id def get(self, tenant_id: str, agent_id: UUID, version_id: UUID): return dump_response( AgentConfigSnapshotDetailResponse, _agent_roster_service().get_agent_version_detail( tenant_id=tenant_id, agent_id=str(agent_id), version_id=str(version_id), ), )