dify/api/dev/generate_swagger_specs.py
Asuka Minato 38a419d073
ci: auto gen api doc and download link (#35919)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: WH-2099 <wh2099@pm.me>
2026-05-09 03:01:47 +00:00

418 lines
15 KiB
Python

"""Generate Flask-RESTX Swagger 2.0 specs without booting the full backend.
This helper intentionally avoids `app_factory.create_app()`. The normal backend
startup eagerly initializes database, Redis, Celery, and storage extensions,
which is unnecessary when the goal is only to serialize the Flask-RESTX
`/swagger.json` documents.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import logging
import os
import sys
from collections.abc import MutableMapping
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol, TypeGuard
from flask import Flask
from flask_restx.swagger import Swagger
logger = logging.getLogger(__name__)
API_ROOT = Path(__file__).resolve().parents[1]
if str(API_ROOT) not in sys.path:
sys.path.insert(0, str(API_ROOT))
@dataclass(frozen=True)
class SpecTarget:
route: str
filename: str
namespace: str
class RestxApi(Protocol):
models: MutableMapping[str, object]
def model(self, name: str, model: dict[object, object]) -> object: ...
SPEC_TARGETS: tuple[SpecTarget, ...] = (
SpecTarget(route="/console/api/swagger.json", filename="console-swagger.json", namespace="console"),
SpecTarget(route="/api/swagger.json", filename="web-swagger.json", namespace="web"),
SpecTarget(route="/v1/swagger.json", filename="service-swagger.json", namespace="service"),
)
_ORIGINAL_REGISTER_MODEL = Swagger.register_model
_ORIGINAL_REGISTER_FIELD = Swagger.register_field
def _is_inline_field_map(value: object) -> TypeGuard[dict[object, object]]:
"""Return whether a nested field map is an anonymous inline mapping."""
from flask_restx.model import Model, OrderedModel
return isinstance(value, dict) and not isinstance(value, (Model, OrderedModel))
def _jsonable_schema_value(value: object) -> object:
"""Return a deterministic JSON-serializable representation for schema fingerprints."""
if value is None or isinstance(value, str | int | float | bool):
return value
if isinstance(value, list | tuple):
return [_jsonable_schema_value(item) for item in value]
if isinstance(value, dict):
return {str(key): _jsonable_schema_value(item) for key, item in value.items()}
value_type = type(value)
return f"<{value_type.__module__}.{value_type.__qualname__}>"
def _field_signature(field: object) -> object:
"""Build a stable signature for a Flask-RESTX field object."""
from flask_restx import fields
from flask_restx.model import instance
field_instance = instance(field)
signature: dict[str, object] = {
"class": f"{field_instance.__class__.__module__}.{field_instance.__class__.__qualname__}"
}
if isinstance(field_instance, fields.Nested):
nested = getattr(field_instance, "nested", None)
if _is_inline_field_map(nested):
signature["nested"] = _inline_model_signature(nested)
else:
signature["nested"] = getattr(
nested,
"name",
f"<{type(nested).__module__}.{type(nested).__qualname__}>",
)
elif hasattr(field_instance, "container"):
signature["container"] = _field_signature(field_instance.container)
else:
schema = getattr(field_instance, "__schema__", None)
if isinstance(schema, dict):
signature["schema"] = _jsonable_schema_value(schema)
for attr_name in (
"attribute",
"default",
"description",
"example",
"max",
"min",
"nullable",
"readonly",
"required",
"title",
):
if hasattr(field_instance, attr_name):
signature[attr_name] = _jsonable_schema_value(getattr(field_instance, attr_name))
return signature
def _inline_model_signature(nested_fields: dict[object, object]) -> object:
"""Build a stable signature for an anonymous inline model."""
return [
(str(field_name), _field_signature(field))
for field_name, field in sorted(nested_fields.items(), key=lambda item: str(item[0]))
]
def _inline_model_name(nested_fields: dict[object, object]) -> str:
"""Return a stable Swagger model name for an anonymous inline field map."""
signature = json.dumps(_inline_model_signature(nested_fields), sort_keys=True, separators=(",", ":"))
digest = hashlib.sha1(signature.encode("utf-8")).hexdigest()[:12]
return f"_AnonymousInlineModel_{digest}"
def apply_runtime_defaults() -> None:
"""Force the small config surface required for Swagger generation."""
os.environ.setdefault("SECRET_KEY", "spec-export")
os.environ.setdefault("STORAGE_TYPE", "local")
os.environ.setdefault("STORAGE_LOCAL_PATH", "/tmp/dify-storage")
os.environ.setdefault("SWAGGER_UI_ENABLED", "true")
from configs import dify_config
dify_config.SECRET_KEY = os.environ["SECRET_KEY"]
dify_config.STORAGE_TYPE = "local"
dify_config.STORAGE_LOCAL_PATH = os.environ["STORAGE_LOCAL_PATH"]
dify_config.SWAGGER_UI_ENABLED = os.environ["SWAGGER_UI_ENABLED"].lower() == "true"
def _patch_swagger_for_inline_nested_dicts() -> None:
"""Teach Flask-RESTX Swagger generation to tolerate inline nested field maps.
Some existing controllers use `fields.Nested({...})` with a raw field mapping
instead of a named `api.model(...)`. Flask-RESTX crashes on those anonymous
dicts during schema registration, so this helper upgrades them into temporary
named models at export time.
"""
if getattr(Swagger, "_dify_inline_nested_dict_patch", False):
return
def get_or_create_inline_model(self: Swagger, nested_fields: dict[object, object]) -> object:
anonymous_models = getattr(self, "_anonymous_inline_models", None)
if anonymous_models is None:
anonymous_models = {}
self.__dict__["_anonymous_inline_models"] = anonymous_models
anonymous_name = anonymous_models.get(id(nested_fields))
if anonymous_name is None:
anonymous_name = _inline_model_name(nested_fields)
anonymous_models[id(nested_fields)] = anonymous_name
if anonymous_name not in self.api.models:
self.api.model(anonymous_name, nested_fields)
return self.api.models[anonymous_name]
def register_model_with_inline_dict_support(self: Swagger, model: object) -> dict[str, str]:
if _is_inline_field_map(model):
model = get_or_create_inline_model(self, model)
return _ORIGINAL_REGISTER_MODEL(self, model)
def register_field_with_inline_dict_support(self: Swagger, field: object) -> None:
nested = getattr(field, "nested", None)
if _is_inline_field_map(nested):
field.model = get_or_create_inline_model(self, nested) # type: ignore
_ORIGINAL_REGISTER_FIELD(self, field)
Swagger.register_model = register_model_with_inline_dict_support
Swagger.register_field = register_field_with_inline_dict_support
Swagger._dify_inline_nested_dict_patch = True
def create_spec_app() -> Flask:
"""Build a minimal Flask app that only mounts the Swagger-producing blueprints."""
apply_runtime_defaults()
_patch_swagger_for_inline_nested_dicts()
app = Flask(__name__)
from controllers.console import bp as console_bp
from controllers.console import console_ns
from controllers.service_api import bp as service_api_bp
from controllers.service_api import service_api_ns
from controllers.web import bp as web_bp
from controllers.web import web_ns
app.register_blueprint(console_bp)
app.register_blueprint(web_bp)
app.register_blueprint(service_api_bp)
for namespace in (console_ns, web_ns, service_api_ns):
for api in namespace.apis:
_materialize_inline_model_definitions(api)
return app
def _registered_models(namespace: str) -> dict[str, object]:
"""Return the Flask-RESTX models registered for a Swagger namespace."""
if namespace == "console":
from controllers.console import console_ns
models = dict(console_ns.models)
for api in console_ns.apis:
models.update(api.models)
return models
if namespace == "web":
from controllers.web import web_ns
models = dict(web_ns.models)
for api in web_ns.apis:
models.update(api.models)
return models
if namespace == "service":
from controllers.service_api import service_api_ns
models = dict(service_api_ns.models)
for api in service_api_ns.apis:
models.update(api.models)
return models
raise ValueError(f"unknown Swagger namespace: {namespace}")
def _materialize_inline_model_definitions(api: RestxApi) -> None:
"""Convert inline `fields.Nested({...})` maps into named API models."""
from flask_restx import fields
from flask_restx.model import Model, OrderedModel, instance
inline_models: dict[int, dict[object, object]] = {}
inline_model_names: dict[int, str] = {}
def collect_field(field: object) -> None:
field_instance = instance(field)
if isinstance(field_instance, fields.Nested):
nested = getattr(field_instance, "nested", None)
if _is_inline_field_map(nested) and id(nested) not in inline_models:
inline_models[id(nested)] = nested
for nested_field in nested.values():
collect_field(nested_field)
container = getattr(field_instance, "container", None)
if container is not None:
collect_field(container)
for model in list(api.models.values()):
if isinstance(model, (Model, OrderedModel)):
for field in model.values():
collect_field(field)
for nested_fields in sorted(inline_models.values(), key=_inline_model_name):
anonymous_name = _inline_model_name(nested_fields)
inline_model_names[id(nested_fields)] = anonymous_name
if anonymous_name not in api.models:
api.model(anonymous_name, nested_fields)
def model_name_for(nested_fields: dict[object, object]) -> str:
anonymous_name = inline_model_names.get(id(nested_fields))
if anonymous_name is None:
anonymous_name = _inline_model_name(nested_fields)
inline_model_names[id(nested_fields)] = anonymous_name
if anonymous_name not in api.models:
api.model(anonymous_name, nested_fields)
return anonymous_name
def materialize_field(field: object) -> None:
field_instance = instance(field)
if isinstance(field_instance, fields.Nested):
nested = getattr(field_instance, "nested", None)
if _is_inline_field_map(nested):
field_instance.model = api.models[model_name_for(nested)] # type: ignore[attr-defined]
container = getattr(field_instance, "container", None)
if container is not None:
materialize_field(container)
index = 0
while index < len(api.models):
model = list(api.models.values())[index]
index += 1
if isinstance(model, (Model, OrderedModel)):
for field in model.values():
materialize_field(field)
def drop_null_values(value: object) -> object:
"""Remove JSON null values that make the Markdown converter crash."""
if isinstance(value, dict):
return {key: drop_null_values(item) for key, item in value.items() if item is not None}
if isinstance(value, list):
return [drop_null_values(item) for item in value]
return value
def sort_openapi_arrays(value: object, *, parent_key: str | None = None) -> object:
"""Sort order-insensitive Swagger arrays so generated Markdown is stable."""
if isinstance(value, dict):
return {key: sort_openapi_arrays(item, parent_key=key) for key, item in value.items()}
if not isinstance(value, list):
return value
sorted_items = [sort_openapi_arrays(item, parent_key=parent_key) for item in value]
if parent_key == "parameters":
return sorted(
sorted_items,
key=lambda item: (
item.get("in", "") if isinstance(item, dict) else "",
item.get("name", "") if isinstance(item, dict) else "",
json.dumps(item, sort_keys=True, default=str),
),
)
if parent_key in {"enum", "required", "schemes", "tags"}:
string_items = [item for item in sorted_items if isinstance(item, str)]
if len(string_items) == len(sorted_items):
return sorted(string_items)
return sorted_items
def _merge_registered_definitions(payload: dict[str, object], namespace: str) -> dict[str, object]:
"""Include registered but route-indirect models in the exported Swagger definitions."""
definitions = payload.setdefault("definitions", {})
if not isinstance(definitions, dict):
raise RuntimeError("unexpected Swagger definitions payload")
for name, model in _registered_models(namespace).items():
schema = getattr(model, "__schema__", None)
if isinstance(schema, dict):
definitions.setdefault(name, schema)
return payload
def generate_specs(output_dir: Path) -> list[Path]:
"""Write all Swagger specs to `output_dir` and return the written paths."""
output_dir.mkdir(parents=True, exist_ok=True)
app = create_spec_app()
client = app.test_client()
written_paths: list[Path] = []
for target in SPEC_TARGETS:
response = client.get(target.route)
if response.status_code != 200:
raise RuntimeError(f"failed to fetch {target.route}: {response.status_code}")
payload = response.get_json()
if not isinstance(payload, dict):
raise RuntimeError(f"unexpected response payload for {target.route}")
payload = _merge_registered_definitions(payload, target.namespace)
payload = drop_null_values(payload)
payload = sort_openapi_arrays(payload)
output_path = output_dir / target.filename
output_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
written_paths.append(output_path)
return written_paths
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"-o",
"--output-dir",
type=Path,
default=Path("openapi"),
help="Directory where the Swagger JSON files will be written.",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
written_paths = generate_specs(args.output_dir)
for path in written_paths:
logger.debug(path)
return 0
if __name__ == "__main__":
raise SystemExit(main())