ci: auto gen api doc and download link (#35919)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: WH-2099 <wh2099@pm.me>
This commit is contained in:
Asuka Minato 2026-05-09 12:01:47 +09:00 committed by GitHub
parent c74cbb68da
commit 38a419d073
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 19481 additions and 40 deletions

View File

@ -116,6 +116,12 @@ jobs:
if: github.event_name != 'merge_group'
uses: ./.github/actions/setup-web
- name: Generate API docs
if: github.event_name != 'merge_group' && steps.api-changes.outputs.any_changed == 'true'
run: |
cd api
uv run dev/generate_swagger_markdown_docs.py --swagger-dir openapi --markdown-dir openapi/markdown
- name: ESLint autofix
if: github.event_name != 'merge_group' && steps.web-changes.outputs.any_changed == 'true'
run: |

View File

@ -71,13 +71,13 @@ type-check:
@echo "📝 Running type checks (basedpyright + pyrefly + mypy)..."
@./dev/basedpyright-check $(PATH_TO_CHECK)
@./dev/pyrefly-check-local
@uv --directory api run mypy --exclude-gitignore --exclude 'tests/' --exclude 'migrations/' --check-untyped-defs --disable-error-code=import-untyped .
@uv --directory api run mypy --exclude-gitignore --exclude 'tests/' --exclude 'migrations/' --exclude 'dev/generate_swagger_specs.py' --check-untyped-defs --disable-error-code=import-untyped .
@echo "✅ Type checks complete"
type-check-core:
@echo "📝 Running core type checks (basedpyright + mypy)..."
@./dev/basedpyright-check $(PATH_TO_CHECK)
@uv --directory api run mypy --exclude-gitignore --exclude 'tests/' --exclude 'migrations/' --check-untyped-defs --disable-error-code=import-untyped .
@uv --directory api run mypy --exclude-gitignore --exclude 'tests/' --exclude 'migrations/' --exclude 'dev/generate_swagger_specs.py' --exclude 'dev/generate_fastopenapi_specs.py' --check-untyped-defs --disable-error-code=import-untyped .
@echo "✅ Core type checks complete"
test:

View File

@ -1,4 +1,10 @@
"""Helpers for registering Pydantic models with Flask-RESTX namespaces."""
"""Helpers for registering Pydantic models with Flask-RESTX namespaces.
Flask-RESTX treats `SchemaModel` bodies as opaque JSON schemas; it does not
promote Pydantic's nested `$defs` into top-level Swagger `definitions`.
These helpers keep that translation centralized so models registered through
`register_schema_models` emit resolvable Swagger 2.0 references.
"""
from enum import StrEnum
@ -8,10 +14,32 @@ from pydantic import BaseModel, TypeAdapter
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
def register_schema_model(namespace: Namespace, model: type[BaseModel]) -> None:
"""Register a single BaseModel with a namespace for Swagger documentation."""
def _register_json_schema(namespace: Namespace, name: str, schema: dict) -> None:
"""Register a JSON schema and promote any nested Pydantic `$defs`."""
namespace.schema_model(model.__name__, model.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0))
nested_definitions = schema.get("$defs")
schema_to_register = dict(schema)
if isinstance(nested_definitions, dict):
schema_to_register.pop("$defs")
namespace.schema_model(name, schema_to_register)
if not isinstance(nested_definitions, dict):
return
for nested_name, nested_schema in nested_definitions.items():
if isinstance(nested_schema, dict):
_register_json_schema(namespace, nested_name, nested_schema)
def register_schema_model(namespace: Namespace, model: type[BaseModel]) -> None:
"""Register a BaseModel and its nested schema definitions for Swagger documentation."""
_register_json_schema(
namespace,
model.__name__,
model.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0),
)
def register_schema_models(namespace: Namespace, *models: type[BaseModel]) -> None:
@ -34,8 +62,10 @@ def get_or_create_model(model_name: str, field_def):
def register_enum_models(namespace: Namespace, *models: type[StrEnum]) -> None:
"""Register multiple StrEnum with a namespace."""
for model in models:
namespace.schema_model(
model.__name__, TypeAdapter(model).json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0)
_register_json_schema(
namespace,
model.__name__,
TypeAdapter(model).json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0),
)

View File

@ -12,6 +12,7 @@ from werkzeug.exceptions import BadRequest, NotFound, Unauthorized
from configs import dify_config
from constants.languages import supported_language
from controllers.common.schema import register_schema_models
from controllers.console import console_ns
from controllers.console.wraps import only_edition_cloud
from core.db.session_factory import session_factory
@ -301,15 +302,7 @@ class BatchAddNotificationAccountsPayload(BaseModel):
user_email: list[str] = Field(..., description="List of account email addresses")
console_ns.schema_model(
UpsertNotificationPayload.__name__,
UpsertNotificationPayload.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0),
)
console_ns.schema_model(
BatchAddNotificationAccountsPayload.__name__,
BatchAddNotificationAccountsPayload.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0),
)
register_schema_models(console_ns, UpsertNotificationPayload, BatchAddNotificationAccountsPayload)
@console_ns.route("/admin/upsert_notification")

View File

@ -2,7 +2,7 @@ from flask_restx import Resource
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from controllers.common.schema import register_schema_models
from controllers.common.schema import register_enum_models, register_schema_models
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import (
account_initialization_required,
@ -33,6 +33,7 @@ class AppImportPayload(BaseModel):
app_id: str | None = Field(None)
register_enum_models(console_ns, ImportStatus)
register_schema_models(console_ns, AppImportPayload, Import, CheckDependenciesResult)

View File

@ -3,6 +3,7 @@ from collections.abc import Sequence
from flask_restx import Resource
from pydantic import BaseModel, Field
from controllers.common.schema import register_enum_models, register_schema_models
from controllers.console import console_ns
from controllers.console.app.error import (
CompletionRequestError,
@ -19,13 +20,12 @@ from core.helper.code_executor.python3.python3_code_provider import Python3CodeP
from core.llm_generator.entities import RuleCodeGeneratePayload, RuleGeneratePayload, RuleStructuredOutputPayload
from core.llm_generator.llm_generator import LLMGenerator
from extensions.ext_database import db
from graphon.model_runtime.entities.llm_entities import LLMMode
from graphon.model_runtime.errors.invoke import InvokeError
from libs.login import current_account_with_tenant, login_required
from models import App
from services.workflow_service import WorkflowService
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
class InstructionGeneratePayload(BaseModel):
flow_id: str = Field(..., description="Workflow/Flow ID")
@ -41,16 +41,16 @@ class InstructionTemplatePayload(BaseModel):
type: str = Field(..., description="Instruction template type")
def reg(cls: type[BaseModel]):
console_ns.schema_model(cls.__name__, cls.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0))
reg(RuleGeneratePayload)
reg(RuleCodeGeneratePayload)
reg(RuleStructuredOutputPayload)
reg(InstructionGeneratePayload)
reg(InstructionTemplatePayload)
reg(ModelConfig)
register_enum_models(console_ns, LLMMode)
register_schema_models(
console_ns,
RuleGeneratePayload,
RuleCodeGeneratePayload,
RuleStructuredOutputPayload,
InstructionGeneratePayload,
InstructionTemplatePayload,
ModelConfig,
)
@console_ns.route("/rule-generate")

View File

@ -0,0 +1,95 @@
"""Generate FastOpenAPI OpenAPI 3.0 specs without booting the full backend."""
from __future__ import annotations
import argparse
import json
import logging
import sys
from dataclasses import dataclass
from pathlib import Path
API_ROOT = Path(__file__).resolve().parents[1]
if str(API_ROOT) not in sys.path:
sys.path.insert(0, str(API_ROOT))
from dev.generate_swagger_specs import apply_runtime_defaults, drop_null_values, sort_openapi_arrays
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class FastOpenApiSpecTarget:
route: str
filename: str
FASTOPENAPI_SPEC_TARGETS: tuple[FastOpenApiSpecTarget, ...] = (
FastOpenApiSpecTarget(route="/fastopenapi/openapi.json", filename="fastopenapi-console-openapi.json"),
)
def create_fastopenapi_spec_app():
"""Build a minimal Flask app that only mounts FastOpenAPI docs routes."""
apply_runtime_defaults()
from app_factory import create_flask_app_with_configs
from extensions import ext_fastopenapi
app = create_flask_app_with_configs()
ext_fastopenapi.init_app(app)
return app
def generate_fastopenapi_specs(output_dir: Path) -> list[Path]:
"""Write FastOpenAPI specs to `output_dir` and return the written paths."""
output_dir.mkdir(parents=True, exist_ok=True)
app = create_fastopenapi_spec_app()
client = app.test_client()
written_paths: list[Path] = []
for target in FASTOPENAPI_SPEC_TARGETS:
response = client.get(target.route)
if response.status_code != 200:
raise RuntimeError(f"failed to fetch {target.route}: {response.status_code}")
payload = response.get_json()
if not isinstance(payload, dict):
raise RuntimeError(f"unexpected response payload for {target.route}")
payload = drop_null_values(payload)
payload = sort_openapi_arrays(payload)
output_path = output_dir / target.filename
output_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
written_paths.append(output_path)
return written_paths
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"-o",
"--output-dir",
type=Path,
default=Path("openapi"),
help="Directory where the OpenAPI JSON files will be written.",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
written_paths = generate_fastopenapi_specs(args.output_dir)
for path in written_paths:
logger.debug(path)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@ -0,0 +1,161 @@
"""Generate OpenAPI JSON specs and split Markdown API docs.
The Markdown step uses `swagger-markdown`, the same converter family as the
Swagger Markdown UI, so CI and local regeneration catch converter-incompatible
OpenAPI output early.
"""
from __future__ import annotations
import argparse
import logging
import subprocess
import sys
import tempfile
from pathlib import Path
API_ROOT = Path(__file__).resolve().parents[1]
if str(API_ROOT) not in sys.path:
sys.path.insert(0, str(API_ROOT))
from dev.generate_fastopenapi_specs import FASTOPENAPI_SPEC_TARGETS, generate_fastopenapi_specs
from dev.generate_swagger_specs import SPEC_TARGETS, generate_specs
logger = logging.getLogger(__name__)
SWAGGER_MARKDOWN_PACKAGE = "swagger-markdown@3.0.0"
CONSOLE_SWAGGER_FILENAME = "console-swagger.json"
STALE_COMBINED_MARKDOWN_FILENAME = "api-reference.md"
def _convert_spec_to_markdown(spec_path: Path, markdown_path: Path) -> None:
subprocess.run(
[
"npx",
"--yes",
SWAGGER_MARKDOWN_PACKAGE,
"-i",
str(spec_path),
"-o",
str(markdown_path),
],
check=True,
)
def _demote_markdown_headings(markdown: str, *, levels: int = 1) -> str:
"""Nest generated Markdown under another Markdown section."""
heading_prefix = "#" * levels
lines = []
for line in markdown.splitlines():
if line.startswith("#"):
lines.append(f"{heading_prefix}{line}")
else:
lines.append(line)
return "\n".join(lines).strip()
def _append_fastopenapi_markdown(console_markdown_path: Path, fastopenapi_markdown_path: Path) -> None:
"""Append FastOpenAPI console docs to the existing console API Markdown."""
console_markdown = console_markdown_path.read_text(encoding="utf-8").rstrip()
fastopenapi_markdown = _demote_markdown_headings(
fastopenapi_markdown_path.read_text(encoding="utf-8"),
levels=2,
)
console_markdown_path.write_text(
"\n\n".join(
[
console_markdown,
"## FastOpenAPI Preview (OpenAPI 3.0)",
fastopenapi_markdown,
]
)
+ "\n",
encoding="utf-8",
)
def generate_markdown_docs(
swagger_dir: Path,
markdown_dir: Path,
*,
keep_swagger_json: bool = False,
) -> list[Path]:
"""Generate intermediate specs, convert them to split Markdown API docs, and return Markdown paths."""
swagger_paths = generate_specs(swagger_dir)
fastopenapi_paths = generate_fastopenapi_specs(swagger_dir)
spec_paths = [*swagger_paths, *fastopenapi_paths]
swagger_paths_by_name = {path.name: path for path in swagger_paths}
fastopenapi_paths_by_name = {path.name: path for path in fastopenapi_paths}
markdown_dir.mkdir(parents=True, exist_ok=True)
written_paths: list[Path] = []
try:
with tempfile.TemporaryDirectory(prefix="dify-api-docs-") as temp_dir:
temp_markdown_dir = Path(temp_dir)
for target in SPEC_TARGETS:
swagger_path = swagger_paths_by_name[target.filename]
markdown_path = markdown_dir / f"{swagger_path.stem}.md"
_convert_spec_to_markdown(swagger_path, markdown_path)
written_paths.append(markdown_path)
for target in FASTOPENAPI_SPEC_TARGETS: # type: ignore
fastopenapi_path = fastopenapi_paths_by_name[target.filename]
markdown_path = temp_markdown_dir / f"{fastopenapi_path.stem}.md"
_convert_spec_to_markdown(fastopenapi_path, markdown_path)
console_markdown_path = markdown_dir / f"{Path(CONSOLE_SWAGGER_FILENAME).stem}.md"
_append_fastopenapi_markdown(console_markdown_path, markdown_path)
(markdown_dir / STALE_COMBINED_MARKDOWN_FILENAME).unlink(missing_ok=True)
finally:
if not keep_swagger_json:
for path in spec_paths:
path.unlink(missing_ok=True)
return written_paths
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--swagger-dir",
type=Path,
default=Path("openapi"),
help="Directory where intermediate JSON spec files will be written.",
)
parser.add_argument(
"--markdown-dir",
type=Path,
default=Path("openapi/markdown"),
help="Directory where split Markdown API docs will be written.",
)
parser.add_argument(
"--keep-swagger-json",
action="store_true",
help="Keep intermediate JSON spec files after Markdown generation.",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
written_paths = generate_markdown_docs(
args.swagger_dir,
args.markdown_dir,
keep_swagger_json=args.keep_swagger_json,
)
for path in written_paths:
logger.debug(path)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@ -9,12 +9,15 @@ which is unnecessary when the goal is only to serialize the Flask-RESTX
from __future__ import annotations
import argparse
import hashlib
import json
import logging
import os
import sys
from collections.abc import MutableMapping
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol, TypeGuard
from flask import Flask
from flask_restx.swagger import Swagger
@ -30,19 +33,110 @@ if str(API_ROOT) not in sys.path:
class SpecTarget:
route: str
filename: str
namespace: str
class RestxApi(Protocol):
models: MutableMapping[str, object]
def model(self, name: str, model: dict[object, object]) -> object: ...
SPEC_TARGETS: tuple[SpecTarget, ...] = (
SpecTarget(route="/console/api/swagger.json", filename="console-swagger.json"),
SpecTarget(route="/api/swagger.json", filename="web-swagger.json"),
SpecTarget(route="/v1/swagger.json", filename="service-swagger.json"),
SpecTarget(route="/console/api/swagger.json", filename="console-swagger.json", namespace="console"),
SpecTarget(route="/api/swagger.json", filename="web-swagger.json", namespace="web"),
SpecTarget(route="/v1/swagger.json", filename="service-swagger.json", namespace="service"),
)
_ORIGINAL_REGISTER_MODEL = Swagger.register_model
_ORIGINAL_REGISTER_FIELD = Swagger.register_field
def _apply_runtime_defaults() -> None:
def _is_inline_field_map(value: object) -> TypeGuard[dict[object, object]]:
"""Return whether a nested field map is an anonymous inline mapping."""
from flask_restx.model import Model, OrderedModel
return isinstance(value, dict) and not isinstance(value, (Model, OrderedModel))
def _jsonable_schema_value(value: object) -> object:
"""Return a deterministic JSON-serializable representation for schema fingerprints."""
if value is None or isinstance(value, str | int | float | bool):
return value
if isinstance(value, list | tuple):
return [_jsonable_schema_value(item) for item in value]
if isinstance(value, dict):
return {str(key): _jsonable_schema_value(item) for key, item in value.items()}
value_type = type(value)
return f"<{value_type.__module__}.{value_type.__qualname__}>"
def _field_signature(field: object) -> object:
"""Build a stable signature for a Flask-RESTX field object."""
from flask_restx import fields
from flask_restx.model import instance
field_instance = instance(field)
signature: dict[str, object] = {
"class": f"{field_instance.__class__.__module__}.{field_instance.__class__.__qualname__}"
}
if isinstance(field_instance, fields.Nested):
nested = getattr(field_instance, "nested", None)
if _is_inline_field_map(nested):
signature["nested"] = _inline_model_signature(nested)
else:
signature["nested"] = getattr(
nested,
"name",
f"<{type(nested).__module__}.{type(nested).__qualname__}>",
)
elif hasattr(field_instance, "container"):
signature["container"] = _field_signature(field_instance.container)
else:
schema = getattr(field_instance, "__schema__", None)
if isinstance(schema, dict):
signature["schema"] = _jsonable_schema_value(schema)
for attr_name in (
"attribute",
"default",
"description",
"example",
"max",
"min",
"nullable",
"readonly",
"required",
"title",
):
if hasattr(field_instance, attr_name):
signature[attr_name] = _jsonable_schema_value(getattr(field_instance, attr_name))
return signature
def _inline_model_signature(nested_fields: dict[object, object]) -> object:
"""Build a stable signature for an anonymous inline model."""
return [
(str(field_name), _field_signature(field))
for field_name, field in sorted(nested_fields.items(), key=lambda item: str(item[0]))
]
def _inline_model_name(nested_fields: dict[object, object]) -> str:
"""Return a stable Swagger model name for an anonymous inline field map."""
signature = json.dumps(_inline_model_signature(nested_fields), sort_keys=True, separators=(",", ":"))
digest = hashlib.sha1(signature.encode("utf-8")).hexdigest()[:12]
return f"_AnonymousInlineModel_{digest}"
def apply_runtime_defaults() -> None:
"""Force the small config surface required for Swagger generation."""
os.environ.setdefault("SECRET_KEY", "spec-export")
@ -74,25 +168,26 @@ def _patch_swagger_for_inline_nested_dicts() -> None:
anonymous_models = getattr(self, "_anonymous_inline_models", None)
if anonymous_models is None:
anonymous_models = {}
self._anonymous_inline_models = anonymous_models
self.__dict__["_anonymous_inline_models"] = anonymous_models
anonymous_name = anonymous_models.get(id(nested_fields))
if anonymous_name is None:
anonymous_name = f"_AnonymousInlineModel{len(anonymous_models) + 1}"
anonymous_name = _inline_model_name(nested_fields)
anonymous_models[id(nested_fields)] = anonymous_name
self.api.model(anonymous_name, nested_fields)
if anonymous_name not in self.api.models:
self.api.model(anonymous_name, nested_fields)
return self.api.models[anonymous_name]
def register_model_with_inline_dict_support(self: Swagger, model: object) -> dict[str, str]:
if isinstance(model, dict):
if _is_inline_field_map(model):
model = get_or_create_inline_model(self, model)
return _ORIGINAL_REGISTER_MODEL(self, model)
def register_field_with_inline_dict_support(self: Swagger, field: object) -> None:
nested = getattr(field, "nested", None)
if isinstance(nested, dict):
if _is_inline_field_map(nested):
field.model = get_or_create_inline_model(self, nested) # type: ignore
_ORIGINAL_REGISTER_FIELD(self, field)
@ -105,22 +200,169 @@ def _patch_swagger_for_inline_nested_dicts() -> None:
def create_spec_app() -> Flask:
"""Build a minimal Flask app that only mounts the Swagger-producing blueprints."""
_apply_runtime_defaults()
apply_runtime_defaults()
_patch_swagger_for_inline_nested_dicts()
app = Flask(__name__)
from controllers.console import bp as console_bp
from controllers.console import console_ns
from controllers.service_api import bp as service_api_bp
from controllers.service_api import service_api_ns
from controllers.web import bp as web_bp
from controllers.web import web_ns
app.register_blueprint(console_bp)
app.register_blueprint(web_bp)
app.register_blueprint(service_api_bp)
for namespace in (console_ns, web_ns, service_api_ns):
for api in namespace.apis:
_materialize_inline_model_definitions(api)
return app
def _registered_models(namespace: str) -> dict[str, object]:
"""Return the Flask-RESTX models registered for a Swagger namespace."""
if namespace == "console":
from controllers.console import console_ns
models = dict(console_ns.models)
for api in console_ns.apis:
models.update(api.models)
return models
if namespace == "web":
from controllers.web import web_ns
models = dict(web_ns.models)
for api in web_ns.apis:
models.update(api.models)
return models
if namespace == "service":
from controllers.service_api import service_api_ns
models = dict(service_api_ns.models)
for api in service_api_ns.apis:
models.update(api.models)
return models
raise ValueError(f"unknown Swagger namespace: {namespace}")
def _materialize_inline_model_definitions(api: RestxApi) -> None:
"""Convert inline `fields.Nested({...})` maps into named API models."""
from flask_restx import fields
from flask_restx.model import Model, OrderedModel, instance
inline_models: dict[int, dict[object, object]] = {}
inline_model_names: dict[int, str] = {}
def collect_field(field: object) -> None:
field_instance = instance(field)
if isinstance(field_instance, fields.Nested):
nested = getattr(field_instance, "nested", None)
if _is_inline_field_map(nested) and id(nested) not in inline_models:
inline_models[id(nested)] = nested
for nested_field in nested.values():
collect_field(nested_field)
container = getattr(field_instance, "container", None)
if container is not None:
collect_field(container)
for model in list(api.models.values()):
if isinstance(model, (Model, OrderedModel)):
for field in model.values():
collect_field(field)
for nested_fields in sorted(inline_models.values(), key=_inline_model_name):
anonymous_name = _inline_model_name(nested_fields)
inline_model_names[id(nested_fields)] = anonymous_name
if anonymous_name not in api.models:
api.model(anonymous_name, nested_fields)
def model_name_for(nested_fields: dict[object, object]) -> str:
anonymous_name = inline_model_names.get(id(nested_fields))
if anonymous_name is None:
anonymous_name = _inline_model_name(nested_fields)
inline_model_names[id(nested_fields)] = anonymous_name
if anonymous_name not in api.models:
api.model(anonymous_name, nested_fields)
return anonymous_name
def materialize_field(field: object) -> None:
field_instance = instance(field)
if isinstance(field_instance, fields.Nested):
nested = getattr(field_instance, "nested", None)
if _is_inline_field_map(nested):
field_instance.model = api.models[model_name_for(nested)] # type: ignore[attr-defined]
container = getattr(field_instance, "container", None)
if container is not None:
materialize_field(container)
index = 0
while index < len(api.models):
model = list(api.models.values())[index]
index += 1
if isinstance(model, (Model, OrderedModel)):
for field in model.values():
materialize_field(field)
def drop_null_values(value: object) -> object:
"""Remove JSON null values that make the Markdown converter crash."""
if isinstance(value, dict):
return {key: drop_null_values(item) for key, item in value.items() if item is not None}
if isinstance(value, list):
return [drop_null_values(item) for item in value]
return value
def sort_openapi_arrays(value: object, *, parent_key: str | None = None) -> object:
"""Sort order-insensitive Swagger arrays so generated Markdown is stable."""
if isinstance(value, dict):
return {key: sort_openapi_arrays(item, parent_key=key) for key, item in value.items()}
if not isinstance(value, list):
return value
sorted_items = [sort_openapi_arrays(item, parent_key=parent_key) for item in value]
if parent_key == "parameters":
return sorted(
sorted_items,
key=lambda item: (
item.get("in", "") if isinstance(item, dict) else "",
item.get("name", "") if isinstance(item, dict) else "",
json.dumps(item, sort_keys=True, default=str),
),
)
if parent_key in {"enum", "required", "schemes", "tags"}:
string_items = [item for item in sorted_items if isinstance(item, str)]
if len(string_items) == len(sorted_items):
return sorted(string_items)
return sorted_items
def _merge_registered_definitions(payload: dict[str, object], namespace: str) -> dict[str, object]:
"""Include registered but route-indirect models in the exported Swagger definitions."""
definitions = payload.setdefault("definitions", {})
if not isinstance(definitions, dict):
raise RuntimeError("unexpected Swagger definitions payload")
for name, model in _registered_models(namespace).items():
schema = getattr(model, "__schema__", None)
if isinstance(schema, dict):
definitions.setdefault(name, schema)
return payload
def generate_specs(output_dir: Path) -> list[Path]:
"""Write all Swagger specs to `output_dir` and return the written paths."""
@ -138,6 +380,9 @@ def generate_specs(output_dir: Path) -> list[Path]:
payload = response.get_json()
if not isinstance(payload, dict):
raise RuntimeError(f"unexpected response payload for {target.route}")
payload = _merge_registered_definitions(payload, target.namespace)
payload = drop_null_values(payload)
payload = sort_openapi_arrays(payload)
output_path = output_dir / target.filename
output_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,103 @@
"""Unit tests for the Markdown API docs generator."""
import importlib.util
import sys
from pathlib import Path
def _load_generate_swagger_markdown_docs_module():
api_dir = Path(__file__).resolve().parents[3]
script_path = api_dir / "dev" / "generate_swagger_markdown_docs.py"
spec = importlib.util.spec_from_file_location("generate_swagger_markdown_docs", script_path)
assert spec
assert spec.loader
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module) # type: ignore[attr-defined]
return module
def test_generate_markdown_docs_keeps_split_docs_and_merges_fastopenapi_into_console(tmp_path, monkeypatch):
module = _load_generate_swagger_markdown_docs_module()
swagger_dir = tmp_path / "openapi"
markdown_dir = tmp_path / "markdown"
stale_combined_doc = markdown_dir / "api-reference.md"
markdown_dir.mkdir()
stale_combined_doc.write_text("stale", encoding="utf-8")
def write_specs(output_dir: Path) -> list[Path]:
output_dir.mkdir(parents=True, exist_ok=True)
paths = []
for target in module.SPEC_TARGETS:
path = output_dir / target.filename
path.write_text("{}", encoding="utf-8")
paths.append(path)
return paths
def write_fastopenapi_specs(output_dir: Path) -> list[Path]:
output_dir.mkdir(parents=True, exist_ok=True)
path = output_dir / module.FASTOPENAPI_SPEC_TARGETS[0].filename
path.write_text("{}", encoding="utf-8")
return [path]
def convert_spec_to_markdown(spec_path: Path, markdown_path: Path) -> None:
markdown_path.write_text(f"# {spec_path.stem}\n\n## Routes\n", encoding="utf-8")
monkeypatch.setattr(module, "generate_specs", write_specs)
monkeypatch.setattr(module, "generate_fastopenapi_specs", write_fastopenapi_specs)
monkeypatch.setattr(module, "_convert_spec_to_markdown", convert_spec_to_markdown)
written_paths = module.generate_markdown_docs(swagger_dir, markdown_dir)
assert [path.name for path in written_paths] == [
"console-swagger.md",
"web-swagger.md",
"service-swagger.md",
]
assert not stale_combined_doc.exists()
assert not list(swagger_dir.glob("*.json"))
console_markdown = (markdown_dir / "console-swagger.md").read_text(encoding="utf-8")
assert "## FastOpenAPI Preview (OpenAPI 3.0)" in console_markdown
assert "### fastopenapi-console-openapi" in console_markdown
assert "#### Routes" in console_markdown
assert "FastOpenAPI Preview" not in (markdown_dir / "web-swagger.md").read_text(encoding="utf-8")
assert "FastOpenAPI Preview" not in (markdown_dir / "service-swagger.md").read_text(encoding="utf-8")
def test_generate_markdown_docs_only_removes_generated_specs_from_separate_swagger_dir(tmp_path, monkeypatch):
module = _load_generate_swagger_markdown_docs_module()
swagger_dir = tmp_path / "swagger"
markdown_dir = tmp_path / "markdown"
swagger_dir.mkdir()
existing_file = swagger_dir / "existing.txt"
existing_file.write_text("keep me", encoding="utf-8")
def write_specs(output_dir: Path) -> list[Path]:
output_dir.mkdir(parents=True, exist_ok=True)
paths = []
for target in module.SPEC_TARGETS:
path = output_dir / target.filename
path.write_text("{}", encoding="utf-8")
paths.append(path)
return paths
def write_fastopenapi_specs(output_dir: Path) -> list[Path]:
output_dir.mkdir(parents=True, exist_ok=True)
path = output_dir / module.FASTOPENAPI_SPEC_TARGETS[0].filename
path.write_text("{}", encoding="utf-8")
return [path]
def convert_spec_to_markdown(spec_path: Path, markdown_path: Path) -> None:
markdown_path.write_text(f"# {spec_path.stem}\n", encoding="utf-8")
monkeypatch.setattr(module, "generate_specs", write_specs)
monkeypatch.setattr(module, "generate_fastopenapi_specs", write_fastopenapi_specs)
monkeypatch.setattr(module, "_convert_spec_to_markdown", convert_spec_to_markdown)
module.generate_markdown_docs(swagger_dir, markdown_dir)
assert existing_file.read_text(encoding="utf-8") == "keep me"
assert not list(swagger_dir.glob("*.json"))

View File

@ -6,6 +6,16 @@ import sys
from pathlib import Path
def _walk_values(value):
yield value
if isinstance(value, dict):
for child in value.values():
yield from _walk_values(child)
elif isinstance(value, list):
for child in value:
yield from _walk_values(child)
def _load_generate_swagger_specs_module():
api_dir = Path(__file__).resolve().parents[3]
script_path = api_dir / "dev" / "generate_swagger_specs.py"
@ -35,3 +45,32 @@ def test_generate_specs_writes_console_web_and_service_swagger_files(tmp_path):
payload = json.loads(path.read_text(encoding="utf-8"))
assert payload["swagger"] == "2.0"
assert "paths" in payload
def test_generate_specs_writes_swagger_with_resolvable_references_and_no_nulls(tmp_path):
module = _load_generate_swagger_specs_module()
written_paths = module.generate_specs(tmp_path)
for path in written_paths:
payload = json.loads(path.read_text(encoding="utf-8"))
definitions = payload["definitions"]
refs = {
item["$ref"].removeprefix("#/definitions/")
for item in _walk_values(payload)
if isinstance(item, dict) and isinstance(item.get("$ref"), str)
}
assert refs <= set(definitions)
assert all(value is not None for value in _walk_values(payload))
def test_generate_specs_is_idempotent(tmp_path):
module = _load_generate_swagger_specs_module()
first_paths = module.generate_specs(tmp_path / "first")
second_paths = module.generate_specs(tmp_path / "second")
assert [path.name for path in first_paths] == [path.name for path in second_paths]
for first_path, second_path in zip(first_paths, second_paths):
assert first_path.read_text(encoding="utf-8") == second_path.read_text(encoding="utf-8")

View File

@ -17,6 +17,14 @@ class ProductModel(BaseModel):
price: float
class ChildModel(BaseModel):
value: str
class ParentModel(BaseModel):
child: ChildModel
@pytest.fixture(autouse=True)
def mock_console_ns():
"""Mock the console_ns to avoid circular imports during test collection."""
@ -64,6 +72,22 @@ def test_register_schema_model_passes_schema_from_pydantic():
assert schema == expected_schema
def test_register_schema_model_promotes_nested_pydantic_definitions():
from controllers.common.schema import DEFAULT_REF_TEMPLATE_SWAGGER_2_0, register_schema_model
namespace = MagicMock(spec=Namespace)
register_schema_model(namespace, ParentModel)
called_schemas = {call.args[0]: call.args[1] for call in namespace.schema_model.call_args_list}
parent_schema = ParentModel.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0)
assert set(called_schemas) == {"ParentModel", "ChildModel"}
assert "$defs" not in called_schemas["ParentModel"]
assert called_schemas["ParentModel"]["properties"]["child"]["$ref"] == "#/definitions/ChildModel"
assert called_schemas["ChildModel"] == parent_schema["$defs"]["ChildModel"]
def test_register_schema_models_registers_multiple_models():
from controllers.common.schema import register_schema_models