dify/api/controllers/console/explore/installed_app.py
非法操作 5bec8eb33a
chore: filter unavailable apps from the installed apps list API (#37206)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-06-09 05:12:29 +00:00

310 lines
11 KiB
Python

import logging
from datetime import datetime
from typing import Any
from flask import request
from flask_restx import Resource
from pydantic import BaseModel, Field, computed_field, field_validator
from sqlalchemy import and_, exists, or_, select
from werkzeug.exceptions import BadRequest, Forbidden, NotFound
from controllers.common.fields import SimpleMessageResponse, SimpleResultMessageResponse
from controllers.common.schema import register_response_schema_models, register_schema_models
from controllers.console import console_ns
from controllers.console.explore.wraps import InstalledAppResource
from controllers.console.wraps import (
account_initialization_required,
cloud_edition_billing_resource_check,
with_current_tenant_id,
with_current_user,
)
from extensions.ext_database import db
from fields.base import ResponseModel
from graphon.file import helpers as file_helpers
from libs.datetime_utils import naive_utc_now
from libs.helper import to_timestamp
from libs.login import login_required
from models import Account, App, AppModelConfig, InstalledApp, RecommendedApp, Workflow
from models.model import AppMode, IconType
from services.account_service import TenantService
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
class InstalledAppCreatePayload(BaseModel):
app_id: str
class InstalledAppUpdatePayload(BaseModel):
is_pinned: bool | None = None
class InstalledAppsListQuery(BaseModel):
app_id: str | None = Field(default=None, description="App ID to filter by")
logger = logging.getLogger(__name__)
def _build_icon_url(icon_type: str | IconType | None, icon: str | None) -> str | None:
if icon is None or icon_type is None:
return None
icon_type_value = icon_type.value if isinstance(icon_type, IconType) else str(icon_type)
if icon_type_value.lower() != IconType.IMAGE:
return None
return file_helpers.get_signed_file_url(icon)
def _safe_primitive(value: Any) -> Any:
if value is None or isinstance(value, (str, int, float, bool, datetime)):
return value
return None
def _published_app_filter():
"""Return the SQL predicate for installed-app web API availability.
The installed-app parameters endpoint reads the published workflow for
workflow-style apps and the published app model config for easy UI apps.
Keep the list endpoint aligned in SQL so it does not return entries that
will immediately fail with app_unavailable when opened.
"""
workflow_app_modes = (AppMode.ADVANCED_CHAT, AppMode.WORKFLOW)
has_published_workflow = exists(select(Workflow.id).where(Workflow.id == App.workflow_id))
has_published_model_config = exists(select(AppModelConfig.id).where(AppModelConfig.id == App.app_model_config_id))
return or_(
and_(App.mode.in_(workflow_app_modes), App.workflow_id.isnot(None), has_published_workflow),
and_(~App.mode.in_(workflow_app_modes), App.app_model_config_id.isnot(None), has_published_model_config),
)
class InstalledAppInfoResponse(ResponseModel):
id: str
name: str | None = None
mode: str | None = None
icon_type: str | None = None
icon: str | None = None
icon_background: str | None = None
use_icon_as_answer_icon: bool | None = None
@field_validator("mode", "icon_type", mode="before")
@classmethod
def _normalize_enum_like(cls, value: Any) -> str | None:
if value is None:
return None
if isinstance(value, str):
return value
return str(getattr(value, "value", value))
@computed_field(return_type=str | None) # type: ignore[prop-decorator]
@property
def icon_url(self) -> str | None:
return _build_icon_url(self.icon_type, self.icon)
class InstalledAppResponse(ResponseModel):
id: str
app: InstalledAppInfoResponse
app_owner_tenant_id: str
is_pinned: bool
last_used_at: int | None = None
editable: bool
uninstallable: bool
@field_validator("app", mode="before")
@classmethod
def _normalize_app(cls, value: Any) -> Any:
if isinstance(value, dict):
return value
return {
"id": _safe_primitive(getattr(value, "id", "")) or "",
"name": _safe_primitive(getattr(value, "name", None)),
"mode": _safe_primitive(getattr(value, "mode", None)),
"icon_type": _safe_primitive(getattr(value, "icon_type", None)),
"icon": _safe_primitive(getattr(value, "icon", None)),
"icon_background": _safe_primitive(getattr(value, "icon_background", None)),
"use_icon_as_answer_icon": _safe_primitive(getattr(value, "use_icon_as_answer_icon", None)),
}
@field_validator("last_used_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return to_timestamp(value)
class InstalledAppListResponse(ResponseModel):
installed_apps: list[InstalledAppResponse]
register_schema_models(
console_ns,
InstalledAppCreatePayload,
InstalledAppUpdatePayload,
InstalledAppsListQuery,
InstalledAppInfoResponse,
InstalledAppResponse,
InstalledAppListResponse,
)
register_response_schema_models(console_ns, SimpleMessageResponse, SimpleResultMessageResponse)
@console_ns.route("/installed-apps")
class InstalledAppsListApi(Resource):
@login_required
@account_initialization_required
@console_ns.response(200, "Success", console_ns.models[InstalledAppListResponse.__name__])
@with_current_user
@with_current_tenant_id
def get(self, current_tenant_id: str, current_user: Account):
query = InstalledAppsListQuery.model_validate(request.args.to_dict())
stmt = (
select(InstalledApp, App)
.join(App, App.id == InstalledApp.app_id)
.where(InstalledApp.tenant_id == current_tenant_id, _published_app_filter())
)
if query.app_id:
stmt = stmt.where(InstalledApp.app_id == query.app_id)
installed_apps = db.session.execute(stmt).all()
if current_user.current_tenant is None:
raise ValueError("current_user.current_tenant must not be None")
current_user.role = TenantService.get_user_role(current_user, current_user.current_tenant)
installed_app_list: list[dict[str, Any]] = []
for installed_app, app_model in installed_apps:
installed_app_list.append(
{
"id": installed_app.id,
"app": app_model,
"app_owner_tenant_id": installed_app.app_owner_tenant_id,
"is_pinned": installed_app.is_pinned,
"last_used_at": installed_app.last_used_at,
"editable": current_user.role in {"owner", "admin"},
"uninstallable": current_tenant_id == installed_app.app_owner_tenant_id,
}
)
# filter out apps that user doesn't have access to
if FeatureService.get_system_features().webapp_auth.enabled:
user_id = current_user.id
app_ids = [installed_app["app"].id for installed_app in installed_app_list]
webapp_settings = EnterpriseService.WebAppAuth.batch_get_app_access_mode_by_id(app_ids)
# Pre-filter out apps without setting or with sso_verified
filtered_installed_apps = []
for installed_app in installed_app_list:
app_id = installed_app["app"].id
webapp_setting = webapp_settings.get(app_id)
if not webapp_setting or webapp_setting.access_mode == "sso_verified":
continue
filtered_installed_apps.append(installed_app)
# Batch permission check
app_ids = [installed_app["app"].id for installed_app in filtered_installed_apps]
permissions = EnterpriseService.WebAppAuth.batch_is_user_allowed_to_access_webapps(
user_id=user_id,
app_ids=app_ids,
)
# Keep only allowed apps
res = []
for installed_app in filtered_installed_apps:
app_id = installed_app["app"].id
if permissions.get(app_id):
res.append(installed_app)
installed_app_list = res
logger.debug("installed_app_list: %s, user_id: %s", installed_app_list, user_id)
installed_app_list.sort(
key=lambda app: (
-app["is_pinned"],
app["last_used_at"] is None,
-app["last_used_at"].timestamp() if app["last_used_at"] is not None else 0,
)
)
return InstalledAppListResponse.model_validate(
{"installed_apps": installed_app_list}, from_attributes=True
).model_dump(mode="json")
@login_required
@account_initialization_required
@cloud_edition_billing_resource_check("apps")
@console_ns.response(200, "Success", console_ns.models[SimpleMessageResponse.__name__])
@with_current_tenant_id
def post(self, current_tenant_id: str):
payload = InstalledAppCreatePayload.model_validate(console_ns.payload or {})
recommended_app = db.session.scalar(
select(RecommendedApp).where(RecommendedApp.app_id == payload.app_id).limit(1)
)
if recommended_app is None:
raise NotFound("Recommended app not found")
app = db.session.get(App, payload.app_id)
if app is None:
raise NotFound("App entity not found")
if not app.is_public:
raise Forbidden("You can't install a non-public app")
installed_app = db.session.scalar(
select(InstalledApp)
.where(and_(InstalledApp.app_id == payload.app_id, InstalledApp.tenant_id == current_tenant_id))
.limit(1)
)
if installed_app is None:
# todo: position
recommended_app.install_count += 1
new_installed_app = InstalledApp(
app_id=payload.app_id,
tenant_id=current_tenant_id,
app_owner_tenant_id=app.tenant_id,
is_pinned=False,
last_used_at=naive_utc_now(),
)
db.session.add(new_installed_app)
db.session.commit()
return {"message": "App installed successfully"}
@console_ns.route("/installed-apps/<uuid:installed_app_id>")
class InstalledAppApi(InstalledAppResource):
"""
update and delete an installed app
use InstalledAppResource to apply default decorators and get installed_app
"""
@console_ns.response(204, "App uninstalled successfully")
@with_current_tenant_id
def delete(self, current_tenant_id: str, installed_app: InstalledApp):
if installed_app.app_owner_tenant_id == current_tenant_id:
raise BadRequest("You can't uninstall an app owned by the current tenant")
db.session.delete(installed_app)
db.session.commit()
return "", 204
@console_ns.response(200, "Success", console_ns.models[SimpleResultMessageResponse.__name__])
def patch(self, installed_app: InstalledApp):
payload = InstalledAppUpdatePayload.model_validate(console_ns.payload or {})
commit_args = False
if payload.is_pinned is not None:
installed_app.is_pinned = payload.is_pinned
commit_args = True
if commit_args:
db.session.commit()
return {"result": "success", "message": "App info updated successfully"}