feat: 429 rate-limit handling on the unified ErrorBody contract (openapi + difyctl) (#37313)

Co-authored-by: Xiyuan Chen <52963600+GareArc@users.noreply.github.com>
This commit is contained in:
L1nSn0w 2026-06-12 14:35:15 +08:00 committed by GitHub
parent c5ab38b2ad
commit 07eb4903b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 638 additions and 24 deletions

View File

@ -943,9 +943,10 @@ class AuthConfig(BaseSettings):
default=True,
)
OPENAPI_RATE_LIMIT_PER_TOKEN: PositiveInt = Field(
OPENAPI_RATE_LIMIT_PER_TOKEN: NonNegativeInt = Field(
description="Per-token rate limit on /openapi/v1/* (requests per minute). "
"Bucket keyed on sha256(token), shared across api replicas via Redis.",
"Bucket keyed on sha256(token), shared across api replicas via Redis. "
"Set to 0 to disable the per-token limit entirely.",
default=60,
)

View File

@ -8,7 +8,14 @@ from contextlib import contextmanager
from typing import Any
from flask_restx import Resource
from werkzeug.exceptions import BadRequest, HTTPException, InternalServerError, NotFound, UnprocessableEntity
from werkzeug.exceptions import (
BadRequest,
HTTPException,
InternalServerError,
NotFound,
TooManyRequests,
UnprocessableEntity,
)
import services
from controllers.openapi import openapi_ns
@ -29,6 +36,7 @@ from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpErr
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import (
AppInvokeQuotaExceededError,
ModelCurrentlyNotSupportError,
ProviderTokenNotInitError,
QuotaExceededError,
@ -71,6 +79,11 @@ def _translate_service_errors() -> Iterator[None]:
raise ProviderQuotaExceededError()
except ModelCurrentlyNotSupportError:
raise ProviderModelCurrentlyNotSupportError()
except AppInvokeQuotaExceededError:
# App concurrency limit. Without this it falls through to the bare `except Exception`
# below and surfaces as a 500. Render as the canonical 429 (code "too_many_requests");
# the source message is dropped since it carries internal detail (client_id / limits).
raise TooManyRequests()
except InvokeRateLimitError as ex:
raise InvokeRateLimitHttpError(ex.description)
except InvokeError as e:

View File

@ -15,7 +15,7 @@ from enum import StrEnum
from functools import wraps
from typing import ParamSpec, TypeVar
from flask import jsonify, make_response, request, session
from flask import request, session
from werkzeug.exceptions import TooManyRequests
from configs import dify_config
@ -132,20 +132,32 @@ def enforce(spec: RateLimit, *, key: str) -> None:
limiter.increment_rate_limit(key)
class _BearerRateLimited(TooManyRequests):
"""Per-token 429. Carries Retry-After as a plain ``headers`` attribute because the openapi
error formatter reads ``getattr(e, "headers")`` (not werkzeug's ``get_headers()``). No
pre-built response, so the formatter still renders the canonical ErrorBody ("too_many_requests").
"""
headers: dict[str, str]
def __init__(self, retry_after_seconds: int) -> None:
super().__init__()
self.headers = {"Retry-After": str(retry_after_seconds)}
def enforce_bearer_rate_limit(token_hash: str) -> None:
"""Per-token rate limit on /openapi/v1/* bearer-authed routes.
Bucket key = ``token:<sha256_hex>`` so the same token shares one
bucket across api replicas (Redis-backed sliding window).
"""
# 0 (or less) disables the per-token limit. Short-circuit here: a limiter built with
# max_attempts=0 would otherwise treat every request as already over the limit.
if LIMIT_BEARER_PER_TOKEN.limit <= 0:
return
limiter = _build_limiter(LIMIT_BEARER_PER_TOKEN)
key = f"token:{token_hash}"
if limiter.is_rate_limited(key):
retry_after = limiter.seconds_until_available(key)
response = make_response(
jsonify({"error": "rate_limited", "retry_after_ms": retry_after * 1000}),
429,
)
response.headers["Retry-After"] = str(retry_after)
raise TooManyRequests(response=response)
raise _BearerRateLimited(retry_after)
limiter.increment_rate_limit(key)

View File

@ -0,0 +1,29 @@
"""The openapi run boundary maps internal rate-limit exceptions to canonical 429s.
Both render through the ErrorBody formatter: TooManyRequests -> code "too_many_requests"
(retryable throttle), InvokeRateLimitError -> code "rate_limit_error" (quota).
"""
import pytest
from werkzeug.exceptions import TooManyRequests
from controllers.openapi.app_run import _translate_service_errors
from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError
from core.errors.error import AppInvokeQuotaExceededError
from services.errors.llm import InvokeRateLimitError
def test_translate_maps_app_concurrency_to_too_many_requests():
# Regression guard: this used to fall through to a 500 (it was not caught here).
with pytest.raises(TooManyRequests) as exc:
with _translate_service_errors():
raise AppInvokeQuotaExceededError("internal: client_id=abc max=10")
assert exc.value.code == 429
def test_translate_maps_workflow_quota_to_rate_limit_error():
with pytest.raises(InvokeRateLimitHttpError) as exc:
with _translate_service_errors():
raise InvokeRateLimitError("workflow quota exhausted")
assert exc.value.error_code == "rate_limit_error"
assert exc.value.code == 429

View File

@ -10,6 +10,7 @@ from werkzeug.exceptions import (
Forbidden,
InternalServerError,
NotFound,
TooManyRequests,
Unauthorized,
UnprocessableEntity,
)
@ -309,6 +310,7 @@ ERROR_MATRIX = [
(ProviderModelCurrentlyNotSupportError(), 400, "model_currently_not_support"),
(CompletionRequestError(), 400, "completion_request_error"),
(InvokeRateLimitHttpError(), 429, "rate_limit_error"),
(TooManyRequests("x"), 429, "too_many_requests"), # difyctl's classifyRateLimit keys retryability on this code
(FileTooLargeError(), 413, "file_too_large"),
(UnsupportedFileTypeError(), 415, "unsupported_file_type"),
(NoFileUploadedError(), 400, "no_file_uploaded"),

View File

@ -6,6 +6,7 @@ from constants import COOKIE_NAME_ACCESS_TOKEN, COOKIE_NAME_CSRF_TOKEN, COOKIE_N
from core.errors.error import AppInvokeQuotaExceededError
from libs.exception import BaseHTTPException
from libs.external_api import ExternalApi
from libs.rate_limit import _BearerRateLimited
def _create_api_app():
@ -58,6 +59,13 @@ def _create_api_app():
e.description = {"field": "is required"}
raise e
# The per-token rate limit raises this; ExternalApi must carry its Retry-After header to the
# wire (it reads getattr(e, "headers"), not werkzeug's get_headers()).
@api.route("/rate-limited")
class RateLimited(Resource):
def get(self):
raise _BearerRateLimited(23)
app.register_blueprint(bp, url_prefix="/api")
return app
@ -115,6 +123,16 @@ def test_external_api_param_mapping_and_quota():
assert res.status_code in (400, 429)
def test_external_api_carries_exception_headers_to_429_response():
# Locks the coupling enforce_bearer_rate_limit relies on: handle_error reads getattr(e,
# "headers") and puts it on the response, so Retry-After reaches the wire.
app = _create_api_app()
res = app.test_client().get("/api/rate-limited")
assert res.status_code == 429
assert res.headers["Retry-After"] == "23"
assert (res.get_json() or {})["status"] == 429
def test_unauthorized_and_force_logout_clears_cookies():
"""Test that UnauthorizedAndForceLogout error clears auth cookies"""

View File

@ -11,6 +11,8 @@ from werkzeug.exceptions import TooManyRequests
from libs.helper import RateLimiter
from libs.rate_limit import (
LIMIT_BEARER_PER_TOKEN,
RateLimit,
RateLimitScope,
enforce_bearer_rate_limit,
)
@ -67,8 +69,17 @@ def test_enforce_bearer_rate_limit_raises_429_with_retry_after(mock_build):
mock_build.return_value = limiter
with pytest.raises(TooManyRequests) as exc:
enforce_bearer_rate_limit("hash-1")
headers = dict(exc.value.get_response().headers)
assert headers.get("Retry-After") == "23"
body = exc.value.get_response().get_json() or {}
assert body.get("error") == "rate_limited"
assert body.get("retry_after_ms") == 23000
# Header-only TooManyRequests: the canonical ErrorBody (code "too_many_requests") is built
# later by the openapi formatter; here we only assert the advisory header rides along.
assert dict(exc.value.headers).get("Retry-After") == "23"
@patch("libs.rate_limit._build_limiter")
def test_enforce_bearer_rate_limit_disabled_when_limit_is_zero(mock_build, monkeypatch):
# 0 disables the limit — short-circuit before building/consulting a limiter.
monkeypatch.setattr(
"libs.rate_limit.LIMIT_BEARER_PER_TOKEN",
RateLimit(limit=0, window=timedelta(minutes=1), scopes=(RateLimitScope.TOKEN_ID,)),
)
enforce_bearer_rate_limit("hash-1")
mock_build.assert_not_called()

View File

@ -34,6 +34,7 @@ export function buildRunBody(args: RunBodyArgs): Record<string, unknown> {
export type StreamOptions = {
signal?: AbortSignal
includeStateSnapshot?: boolean
retryOnRateLimit?: boolean
}
export class AppRunClient {
@ -59,6 +60,7 @@ export class AppRunClient {
headers: { Accept: 'text/event-stream' },
signal: opts.signal,
throwOnError: true,
retryOnRateLimit: opts.retryOnRateLimit,
})
if (res.body === null)
throw new Error('streaming response body missing')

View File

@ -56,7 +56,7 @@ export class StreamingStructuredStrategy implements RunStrategy {
let resp: Record<string, unknown>
try {
const events = await ctx.runClient.runStream(opts.appId, body, { signal: ctrl.signal })
const events = await ctx.runClient.runStream(opts.appId, body, { signal: ctrl.signal, retryOnRateLimit: opts.retryOnRateLimit })
const wrappedEvents = captureTaskId(events, (id) => {
taskId = id
})

View File

@ -28,7 +28,7 @@ export class StreamingTextStrategy implements RunStrategy {
handle('SIGINT', cleanup)
try {
const events = await ctx.runClient.runStream(opts.appId, body, { signal: ctrl.signal })
const events = await ctx.runClient.runStream(opts.appId, body, { signal: ctrl.signal, retryOnRateLimit: opts.retryOnRateLimit })
const sp = streamPrinterFor(mode, ctx.think, deps.io.isErrTTY)
const dec = new TextDecoder()
for await (const ev of events) {

View File

@ -35,6 +35,7 @@ export default class RunApp extends DifyCommand {
'workspace': Flags.string({ description: 'Workspace id (overrides DIFY_WORKSPACE_ID and stored default)' }),
'stream': Flags.boolean({ description: 'Print output live as tokens/events arrive (default: collect and print at end)', default: false }),
'think': Flags.boolean({ description: 'Show model thinking/reasoning when available. Strips <think>...</think> blocks silently by default; with --think, thinking is printed to stderr.', default: false }),
'retry-on-limit': Flags.boolean({ description: 'On a 429 rate limit, wait and retry this POST (bounded) instead of failing immediately. Off by default since running an app is not idempotent.', default: false }),
'http-retry': httpRetryFlag,
'output': Flags.outputFormat({ options: [OutputFormat.JSON, OutputFormat.YAML, OutputFormat.TEXT], default: '' }),
}
@ -56,6 +57,7 @@ export default class RunApp extends DifyCommand {
format,
stream: flags.stream,
think: flags.think,
retryOnRateLimit: flags['retry-on-limit'],
},
{ active: ctx.active, http: ctx.http, host: ctx.host, io: ctx.io, cache: ctx.cache },
)

View File

@ -27,6 +27,7 @@ export type RunAppOptions = {
readonly format?: string
readonly stream?: boolean
readonly think?: boolean
readonly retryOnRateLimit?: boolean
}
export type RunAppDeps = {

View File

@ -18,6 +18,7 @@ describe('error codes', () => {
expect(ExitCode.Usage).toBe(2)
expect(ExitCode.Auth).toBe(4)
expect(ExitCode.VersionCompat).toBe(6)
expect(ExitCode.RateLimited).toBe(7)
})
it('every code maps to an exit', () => {
@ -46,6 +47,7 @@ describe('error codes', () => {
[ErrorCode.Server4xxOther, ExitCode.Generic],
[ErrorCode.ClientError, ExitCode.Generic],
[ErrorCode.Unknown, ExitCode.Generic],
[ErrorCode.RateLimited, ExitCode.RateLimited],
])('exitFor(%s) -> %d', (code, want) => {
expect(exitFor(code)).toBe(want)
})

View File

@ -12,6 +12,7 @@ export const ErrorCode = {
ConfigInvalidKey: 'config_invalid_key',
ConfigInvalidValue: 'config_invalid_value',
NetworkConnection: 'network_connection',
RateLimited: 'rate_limited',
Server5xx: 'server_5xx',
Server4xxOther: 'server_4xx_other',
ClientError: 'client_error',
@ -28,6 +29,8 @@ export const ExitCode = {
Usage: 2,
Auth: 4,
VersionCompat: 6,
// Distinct from Generic so wrappers can tell "rate limited, retry later" from a hard failure.
RateLimited: 7,
} as const
export type ExitCodeValue = (typeof ExitCode)[keyof typeof ExitCode]
@ -46,6 +49,7 @@ const CODE_TO_EXIT: Readonly<Record<ErrorCodeValue, ExitCodeValue>> = {
config_invalid_key: ExitCode.Usage,
config_invalid_value: ExitCode.Usage,
network_connection: ExitCode.Generic,
rate_limited: ExitCode.RateLimited,
server_5xx: ExitCode.Generic,
server_4xx_other: ExitCode.Generic,
client_error: ExitCode.Generic,

View File

@ -192,7 +192,7 @@ describe('http client', () => {
expect(caught.code).toBe(ErrorCode.Server4xxOther)
})
it('handles 429 via retry status code list', async () => {
it('surfaces a 429 as a rate-limit error (dedicated exit code), no retry when budget is 0', async () => {
mock.setScenario('rate-limited')
const client = createHttpClient({
baseURL: base(mock.url),
@ -206,8 +206,246 @@ describe('http client', () => {
}
catch (err) { caught = err }
expect(isHttpClientError(caught)).toBe(true)
if (isHttpClientError(caught))
if (isHttpClientError(caught)) {
expect(caught.httpStatus).toBe(429)
expect(caught.code).toBe(ErrorCode.RateLimited)
expect(caught.exit()).toBe(7)
expect(caught.serverError?.code).toBe('too_many_requests')
}
})
it('retries an idempotent GET on a throttle 429, then succeeds', async () => {
let calls = 0
const stub = await startStub((_req, res) => {
calls++
if (calls === 1) {
res.writeHead(429, { 'content-type': 'application/json' })
res.end(JSON.stringify({ code: 'too_many_requests', message: 'slow down', status: 429 }))
return
}
res.writeHead(200, { 'content-type': 'application/json' })
res.end(JSON.stringify({ workspaces: [] }))
})
const events: { phase: string, status?: number, delayMs?: number }[] = []
try {
const client = createHttpClient({
baseURL: base(stub.url),
bearer: 'dfoa_test',
retryAttempts: 3,
timeoutMs: 5_000,
logger: e => events.push({ phase: e.phase, status: e.status, delayMs: e.delayMs }),
})
const body = await client.get<{ workspaces: unknown[] }>('workspaces')
expect(body.workspaces).toEqual([])
}
finally {
await stub.stop()
}
expect(calls).toBe(2)
const retry = events.find(e => e.phase === 'retry')
expect(retry?.status).toBe(429)
expect(retry?.delayMs).toBeGreaterThan(0)
})
it('does not retry a quota 429 (rate_limit_error) — surfaces immediately', async () => {
let requests = 0
const stub = await startStub((_req, res) => {
res.writeHead(429, { 'content-type': 'application/json' })
res.end(JSON.stringify({ code: 'rate_limit_error', message: 'quota exhausted', status: 429 }))
})
let caught: unknown
try {
const client = createHttpClient({
baseURL: base(stub.url),
bearer: 'dfoa_test',
retryAttempts: 3,
timeoutMs: 5_000,
logger: (e) => {
if (e.phase === 'request')
requests++
},
})
try {
await client.get('workspaces')
}
catch (err) { caught = err }
}
finally {
await stub.stop()
}
expect(requests).toBe(1)
expect(isHttpClientError(caught)).toBe(true)
if (isHttpClientError(caught)) {
expect(caught.code).toBe(ErrorCode.RateLimited)
expect(caught.serverError?.code).toBe('rate_limit_error')
}
})
it('does not retry a POST 429 by default; retries with retry-on-limit', async () => {
let postDefault = 0
const stubDefault = await startStub((_req, res) => {
postDefault++
res.writeHead(429, { 'content-type': 'application/json' })
res.end(JSON.stringify({ code: 'too_many_requests', message: 'slow down', status: 429 }))
})
try {
const client = createHttpClient({ baseURL: base(stubDefault.url), bearer: 'dfoa_test', retryAttempts: 3, timeoutMs: 5_000 })
await expect(client.post('apps/app-1/run', { json: { inputs: {} } })).rejects.toBeDefined()
}
finally {
await stubDefault.stop()
}
expect(postDefault).toBe(1)
let calls = 0
const stubOptIn = await startStub((_req, res) => {
calls++
if (calls === 1) {
res.writeHead(429, { 'content-type': 'application/json' })
res.end(JSON.stringify({ code: 'too_many_requests', message: 'slow down', status: 429 }))
return
}
res.writeHead(200, { 'content-type': 'application/json' })
res.end(JSON.stringify({ ok: true }))
})
try {
const client = createHttpClient({ baseURL: base(stubOptIn.url), bearer: 'dfoa_test', retryAttempts: 3, timeoutMs: 5_000 })
const body = await client.post<{ ok: boolean }>('apps/app-1/run', { json: { inputs: {} }, retryOnRateLimit: true })
expect(body.ok).toBe(true)
}
finally {
await stubOptIn.stop()
}
expect(calls).toBe(2)
})
it('still never retries a POST 5xx even with retry-on-limit (idempotency guard)', async () => {
let requests = 0
const stub = await startStub((_req, res) => {
requests++
res.writeHead(503, { 'content-type': 'application/json' })
res.end(JSON.stringify({ code: 'internal_server_error', message: 'boom', status: 503 }))
})
try {
const client = createHttpClient({ baseURL: base(stub.url), bearer: 'dfoa_test', retryAttempts: 3, timeoutMs: 5_000 })
await expect(client.post('apps/app-1/run', { json: { inputs: {} }, retryOnRateLimit: true })).rejects.toBeDefined()
}
finally {
await stub.stop()
}
expect(requests).toBe(1)
})
it('surfaces a RateLimited error after exhausting 429 retries on GET', async () => {
let requests = 0
let retries = 0
const stub = await startStub((_req, res) => {
res.writeHead(429, { 'content-type': 'application/json' })
res.end(JSON.stringify({ code: 'too_many_requests', message: 'slow down', status: 429 }))
})
let caught: unknown
try {
const client = createHttpClient({
baseURL: base(stub.url),
bearer: 'dfoa_test',
retryAttempts: 2,
timeoutMs: 5_000,
logger: (e) => {
if (e.phase === 'request')
requests++
else if (e.phase === 'retry')
retries++
},
})
try {
await client.get('workspaces')
}
catch (err) { caught = err }
}
finally {
await stub.stop()
}
expect(requests).toBe(3)
expect(retries).toBe(2)
expect(isHttpClientError(caught)).toBe(true)
if (isHttpClientError(caught))
expect(caught.code).toBe(ErrorCode.RateLimited)
})
it('surfaces a throttle 429 whose Retry-After exceeds the honored cap (no retry)', async () => {
let requests = 0
const stub = await startStub((_req, res) => {
// 120s advised wait > MAX_HONORED_WAIT_MS (60s) — surface rather than park for minutes.
res.writeHead(429, { 'content-type': 'application/json', 'retry-after': '120' })
res.end(JSON.stringify({ code: 'too_many_requests', message: 'slow down', status: 429 }))
})
let caught: unknown
try {
const client = createHttpClient({
baseURL: base(stub.url),
bearer: 'dfoa_test',
retryAttempts: 3,
timeoutMs: 5_000,
logger: (e) => {
if (e.phase === 'request')
requests++
},
})
try {
await client.get('workspaces')
}
catch (err) { caught = err }
}
finally {
await stub.stop()
}
expect(requests).toBe(1)
expect(isHttpClientError(caught)).toBe(true)
if (isHttpClientError(caught))
expect(caught.code).toBe(ErrorCode.RateLimited)
})
it('stream GET surfaces a 429 (returns the Response, no throw, no retry)', async () => {
let calls = 0
const stub = await startStub((_req, res) => {
calls++
res.writeHead(429, { 'content-type': 'application/json' })
res.end(JSON.stringify({ code: 'too_many_requests', message: 'slow down', status: 429 }))
})
try {
const client = createHttpClient({ baseURL: base(stub.url), bearer: 'dfoa_test', timeoutMs: 5_000 })
const res = await client.stream('workspaces')
expect(res.status).toBe(429)
await res.body?.cancel()
}
finally {
await stub.stop()
}
expect(calls).toBe(1)
})
it('stream POST retries a throttle 429 when retry-on-limit is set', async () => {
let calls = 0
const stub = await startStub((_req, res) => {
calls++
if (calls === 1) {
res.writeHead(429, { 'content-type': 'application/json' })
res.end(JSON.stringify({ code: 'too_many_requests', message: 'slow down', status: 429 }))
return
}
res.writeHead(200, { 'content-type': 'text/event-stream' })
res.end('data: {}\n\n')
})
try {
const client = createHttpClient({ baseURL: base(stub.url), bearer: 'dfoa_test', timeoutMs: 5_000 })
const res = await client.stream('apps/app-1/run', { method: 'POST', json: {}, retryOnRateLimit: true })
expect(res.status).toBe(200)
await res.body?.cancel()
}
finally {
await stub.stop()
}
expect(calls).toBe(2)
})
it('does not retry POST on 503', async () => {

View File

@ -15,7 +15,8 @@ import { buildBody } from './body.js'
import { classifyResponse } from './error-mapper.js'
import { classifyTransport, logRequest, logResponse, setBearer, setUserAgent } from './hooks.js'
import { proxyDispatcher } from './proxy.js'
import { backoffDelay, shouldRetry } from './retry.js'
import { classifyRateLimit, MAX_HONORED_WAIT_MS, RATE_LIMIT_MAX_ATTEMPTS, rateLimitDelayMs } from './rate-limit.js'
import { backoffDelay, isIdempotentRetryMethod, shouldRetry } from './retry.js'
import { redactBearer } from './sanitize.js'
import { appendSearchParams, joinURL } from './url.js'
@ -133,6 +134,7 @@ function buildRequest(state: ClientState, path: string, opts: RequestOptions, th
timeoutMs: effectiveTimeoutMs,
retryAttempts: effectiveRetryAttempts,
throwOnError,
retryOnRateLimit: opts.retryOnRateLimit ?? false,
}
return { request, resolved, effectiveTimeoutMs, userSignal: opts.signal }
}
@ -204,6 +206,37 @@ async function execute(
const res = ctx.response
if (!res.ok) {
// 429 has its own policy. The server self-describes via the ErrorBody `code`: a
// "too_many_requests" throttle waits-and-retries on idempotent methods (or opted-in POSTs)
// honoring Retry-After; quota / unrecognized 429s surface immediately rather than burning
// retries. Surfacing reuses the shared classifyResponse so the body parses to ErrorBody.
if (res.status === 429) {
const decision = await classifyRateLimit(res.clone())
const canRetry
= decision.retryable
&& attempt < effectiveRetryAttempts
&& (decision.retryAfterMs === undefined || decision.retryAfterMs <= MAX_HONORED_WAIT_MS)
&& (isIdempotentRetryMethod(method) || (method === 'POST' && resolved.retryOnRateLimit))
if (canRetry) {
const delay = rateLimitDelayMs(decision, attempt + 1)
state.logger?.({ phase: 'retry', method, url: redactBearer(ctx.request.url), status: 429, attempt: attempt + 1, delayMs: delay })
await res.body?.cancel().catch(() => {})
if (delay > 0)
await new Promise(resolve => setTimeout(resolve, delay))
continue
}
ctx.error = await classifyResponse(ctx.request, res)
await runHooks(state.hooks.onResponseError, ctx)
if (throwOnError) {
const finalErr = ctx.error
if (finalErr instanceof Error && typeof Error.captureStackTrace === 'function')
Error.captureStackTrace(finalErr, execute)
throw finalErr
}
return res
}
if (attempt < effectiveRetryAttempts && shouldRetry(res, ctx)) {
state.logger?.({ phase: 'retry', method, url: redactBearer(ctx.request.url), attempt: attempt + 1 })
// Drain the discarded error body so undici can release the socket back to its
@ -259,10 +292,18 @@ export function createHttpClient(opts: ClientOptions): HttpClient {
const streamFetch = (path: string, callOpts?: RequestOptions): Promise<Response> => {
// SSE bodies must not be aborted by a request-level timeout — `0` is the buildRequest
// sentinel for "no timeout" and also overrides the client default.
//
// A stream normally never retries (a mid-stream replay would double-send). When the caller
// opts into 429 retry, allow a bounded budget: the 429 admission rejection arrives as a plain
// body before the stream opens, and execute()'s 429 branch is the only path that fires for a
// POST — shouldRetry still rejects POST for transport / 5xx, so nothing else replays.
const retryAttempts = callOpts?.retryOnRateLimit === true
? (callOpts.retryAttempts ?? RATE_LIMIT_MAX_ATTEMPTS)
: 0
const finalOpts: RequestOptions = {
...callOpts,
method: callOpts?.method ?? 'GET',
retryAttempts: 0,
retryAttempts,
timeoutMs: 0,
}
const built = buildRequest(state, path, finalOpts, false)
@ -284,6 +325,7 @@ export function createHttpClient(opts: ClientOptions): HttpClient {
timeoutMs: state.defaultTimeoutMs,
retryAttempts: state.defaultRetryAttempts,
throwOnError: false,
retryOnRateLimit: false,
}
const userSignal = init?.signal ?? req.signal
return execute(state, req, resolved, state.defaultTimeoutMs, userSignal)

View File

@ -54,6 +54,22 @@ describe('classifyResponse — canonical ErrorBody', () => {
expect(err.code).toBe(ErrorCode.Server4xxOther)
expect(err.serverError?.code).toBe('some_future_code')
})
it('429 classifies as RateLimited (dedicated exit code) and keeps the server code', async () => {
const err = await classified(429, { code: 'too_many_requests', message: 'slow down', status: 429 })
expect(err.code).toBe(ErrorCode.RateLimited)
expect(err.exit()).toBe(7)
expect(err.serverError?.code).toBe('too_many_requests')
})
it('429 with no parseable ErrorBody falls back to a generic rate-limit message', async () => {
const err = await classified(429, 'not json')
expect(err.code).toBe(ErrorCode.RateLimited)
expect(err.serverError).toBeUndefined()
expect(err.message).toBe('too many requests')
})
})
describe('classifyResponse — non-conforming bodies (no fallback by design)', () => {

View File

@ -36,9 +36,19 @@ const SERVER_4XX_CLASS: StatusClass = {
includeRaw: true,
}
// 429 gets a dedicated CLI code (its own exit code) so wrappers can tell a rate limit from a hard
// failure. The serverError.code ("too_many_requests" / "rate_limit_error") still rides along.
const RATE_LIMITED_CLASS: StatusClass = {
code: ErrorCode.RateLimited,
fallbackMessage: () => 'too many requests',
includeRaw: false,
}
function statusClass(status: number): StatusClass {
if (status === 401)
return AUTH_EXPIRED_CLASS
if (status === 429)
return RATE_LIMITED_CLASS
if (status >= 500)
return SERVER_5XX_CLASS
return SERVER_4XX_CLASS

View File

@ -0,0 +1,91 @@
import { describe, expect, it } from 'vitest'
import {
classifyRateLimit,
MAX_HONORED_WAIT_MS,
parseRetryAfterMs,
RATE_LIMIT_MAX_ATTEMPTS,
rateLimitDelayMs,
} from './rate-limit.js'
function res429(body: unknown, headers?: Record<string, string>): Response {
return new Response(typeof body === 'string' ? body : JSON.stringify(body), {
status: 429,
headers: { 'content-type': 'application/json', ...headers },
})
}
function headers(init?: Record<string, string>): Headers {
return new Headers(init)
}
describe('classifyRateLimit', () => {
it('throttle (code too_many_requests) is retryable and reads the Retry-After header', async () => {
const d = await classifyRateLimit(res429({ code: 'too_many_requests', status: 429 }, { 'retry-after': '2' }))
expect(d).toEqual({ retryable: true, retryAfterMs: 2000 })
})
it('throttle without Retry-After is retryable with no advised wait', async () => {
const d = await classifyRateLimit(res429({ code: 'too_many_requests', status: 429 }))
expect(d).toEqual({ retryable: true, retryAfterMs: undefined })
})
it('quota (code rate_limit_error) is not retryable', async () => {
const d = await classifyRateLimit(res429({ code: 'rate_limit_error', status: 429 }, { 'retry-after': '5' }))
expect(d.retryable).toBe(false)
expect(d.retryAfterMs).toBeUndefined()
})
it.each([
['unknown code', { code: 'mystery' }],
['no code', { message: 'nope' }],
['non-JSON body', 'not json'],
])('unrecognized 429 (%s) is conservatively non-retryable', async (_label, body) => {
const d = await classifyRateLimit(res429(body))
expect(d.retryable).toBe(false)
})
it('reads the body off a clone (response stays consumable)', async () => {
const r = res429({ code: 'too_many_requests', status: 429 })
await classifyRateLimit(r)
await expect(r.text()).resolves.toContain('too_many_requests')
})
})
describe('parseRetryAfterMs', () => {
it('reads integer-seconds Retry-After as ms', () => {
expect(parseRetryAfterMs(headers({ 'retry-after': '3' }))).toBe(3000)
})
it('reads an HTTP-date relative to the injected now, clamped at 0', () => {
const now = Date.parse('2026-06-11T00:00:00Z')
expect(parseRetryAfterMs(headers({ 'retry-after': 'Thu, 11 Jun 2026 00:00:05 GMT' }), now)).toBe(5000)
expect(parseRetryAfterMs(headers({ 'retry-after': 'Thu, 11 Jun 2026 00:00:00 GMT' }), now + 10_000)).toBe(0)
})
it('returns undefined when absent or unparseable', () => {
expect(parseRetryAfterMs(headers())).toBeUndefined()
expect(parseRetryAfterMs(headers({ 'retry-after': 'soon' }))).toBeUndefined()
})
})
describe('rateLimitDelayMs', () => {
it('returns the advised wait as-is (the caller already declined over-cap waits)', () => {
expect(rateLimitDelayMs({ retryAfterMs: 800 }, 1)).toBe(800)
expect(rateLimitDelayMs({ retryAfterMs: 5000 }, 1)).toBe(5000)
})
it('falls back to equal-jitter backoff when no wait is advised (rng pinned)', () => {
// attempt 1 => backoffDelay 300; equal jitter => [150, 300].
expect(rateLimitDelayMs({}, 1, { rng: () => 0 })).toBe(150)
expect(rateLimitDelayMs({}, 1, { rng: () => 1 })).toBe(300)
})
it('returns 0 when there is neither an advised wait nor any backoff (attempt 0)', () => {
expect(rateLimitDelayMs({}, 0, { rng: () => 0.5 })).toBe(0)
})
it('exposes sane retry constants', () => {
expect(MAX_HONORED_WAIT_MS).toBe(60_000)
expect(RATE_LIMIT_MAX_ATTEMPTS).toBe(3)
})
})

View File

@ -0,0 +1,90 @@
import { backoffDelay } from './retry.js'
// Stateless handling for the server's 429s: react when one arrives, never predict or store limits.
// The server is self-describing via the unified ErrorBody `code`:
// "too_many_requests" → throttle, waiting helps (retryable)
// "rate_limit_error" → quota, waiting within the window does not (not retryable)
// anything else / unparsable → conservative: not retryable.
export type RateLimitDecision = {
readonly retryable: boolean
// The advised wait, from the Retry-After header (only meaningful for a retryable throttle).
readonly retryAfterMs?: number
}
// The longest server-advised wait we'll honor by retrying. If Retry-After is larger, the 429
// branch surfaces immediately instead of parking the process for minutes (better to let the
// caller decide than to sleep through several capped retries that will just 429 again).
export const MAX_HONORED_WAIT_MS = 60_000
export const RATE_LIMIT_MAX_ATTEMPTS = 3
function bodyCode(raw: string): string | undefined {
try {
const parsed = JSON.parse(raw) as unknown
if (typeof parsed === 'object' && parsed !== null) {
const code = (parsed as Record<string, unknown>).code
return typeof code === 'string' ? code : undefined
}
}
catch {
// not JSON
}
return undefined
}
// Read a 429 response into a retry decision. Reads the ErrorBody `code` for retryability and
// the Retry-After header for the wait; both off a clone so the body stays consumable downstream.
export async function classifyRateLimit(response: Response): Promise<RateLimitDecision> {
let raw = ''
try {
raw = await response.clone().text()
}
catch {
// ignore read errors; raw stays ''
}
const retryable = bodyCode(raw) === 'too_many_requests'
return { retryable, retryAfterMs: retryable ? parseRetryAfterMs(response.headers) : undefined }
}
// Parse the Retry-After header to ms: integer seconds, or an HTTP-date relative to `now`
// (injectable for deterministic tests). The unified ErrorBody carries no wait field of its own.
export function parseRetryAfterMs(headers: Headers, now: number = Date.now()): number | undefined {
const header = headers.get('retry-after')
if (header === null) {
return undefined
}
const trimmed = header.trim()
if (/^\d+$/.test(trimmed)) {
return Number(trimmed) * 1000
}
const dateMs = Date.parse(trimmed)
if (!Number.isNaN(dateMs)) {
return Math.max(0, dateMs - now)
}
return undefined
}
// Equal-jitter backoff around the exponential base: half fixed + half random. Avoids both the
// thundering-herd of a fixed delay and the near-zero spikes of full jitter.
function jitter(baseMs: number, rng: () => number): number {
if (baseMs <= 0) {
return 0
}
const half = baseMs / 2
return Math.round(half + rng() * half)
}
// How long to wait before the next 429 retry: a known server wait takes precedence (the caller
// has already declined to retry waits beyond MAX_HONORED_WAIT_MS), otherwise jittered exponential
// backoff for sources that advise none (e.g. app concurrency).
export function rateLimitDelayMs(
decision: Pick<RateLimitDecision, 'retryAfterMs'>,
attempt: number,
opts: { rng?: () => number } = {},
): number {
if (decision.retryAfterMs !== undefined) {
return decision.retryAfterMs
}
return jitter(backoffDelay(attempt), opts.rng ?? Math.random)
}

View File

@ -1,6 +1,6 @@
import type { FetchContext, HttpMethod, ResolvedOptions } from './types.js'
import { describe, expect, it } from 'vitest'
import { backoffDelay, shouldRetry } from './retry.js'
import { backoffDelay, isIdempotentRetryMethod, shouldRetry } from './retry.js'
function ctxFor(method: HttpMethod): FetchContext {
const options: ResolvedOptions = {
@ -10,6 +10,7 @@ function ctxFor(method: HttpMethod): FetchContext {
timeoutMs: undefined,
retryAttempts: 0,
throwOnError: true,
retryOnRateLimit: false,
}
return {
request: new Request('https://x/y', { method }),
@ -30,6 +31,11 @@ describe('shouldRetry', () => {
expect(shouldRetry(res, ctxFor('GET'))).toBe(false)
})
it('no longer retries 429 here (it has a dedicated branch in execute())', () => {
const res = new Response(null, { status: 429 })
expect(shouldRetry(res, ctxFor('GET'))).toBe(false)
})
it('does not retry POST regardless of status', () => {
const res = new Response(null, { status: 503 })
expect(shouldRetry(res, ctxFor('POST'))).toBe(false)
@ -50,6 +56,16 @@ describe('shouldRetry', () => {
})
})
describe('isIdempotentRetryMethod', () => {
it('is true for GET/PUT/DELETE and false for POST/PATCH', () => {
expect(isIdempotentRetryMethod('GET')).toBe(true)
expect(isIdempotentRetryMethod('PUT')).toBe(true)
expect(isIdempotentRetryMethod('DELETE')).toBe(true)
expect(isIdempotentRetryMethod('POST')).toBe(false)
expect(isIdempotentRetryMethod('PATCH')).toBe(false)
})
})
describe('backoffDelay', () => {
it('returns 0 for attempts <= 0', () => {
expect(backoffDelay(0)).toBe(0)

View File

@ -1,11 +1,19 @@
import type { FetchContext } from './types.js'
export const RETRY_METHODS = ['GET', 'PUT', 'DELETE'] as const
export const RETRY_STATUS_CODES = [408, 413, 429, 500, 502, 503, 504] as const
// 429 is intentionally absent — it has a dedicated branch in execute(). shouldRetry covers
// transport errors / 408 / 413 / 5xx only.
export const RETRY_STATUS_CODES = [408, 413, 500, 502, 503, 504] as const
const RETRY_METHODS_SET: ReadonlySet<string> = new Set(RETRY_METHODS)
const RETRY_STATUS_SET: ReadonlySet<number> = new Set(RETRY_STATUS_CODES)
// GET/PUT/DELETE are idempotent — safe to auto-retry. The 429 branch reuses this to decide which
// methods may wait-and-retry a throttle without risking a double-run.
export function isIdempotentRetryMethod(method: string): boolean {
return RETRY_METHODS_SET.has(method)
}
export function shouldRetry(target: Response | unknown, ctx: FetchContext): boolean {
if (!RETRY_METHODS_SET.has(ctx.options.method))
return false

View File

@ -7,6 +7,8 @@ export type HttpLogEvent = {
readonly status?: number
readonly attempt?: number
readonly durationMs?: number
// Set on a 429 retry decision so --verbose can explain how long we waited.
readonly delayMs?: number
}
export type HttpLogger = (event: HttpLogEvent) => void
@ -51,6 +53,8 @@ export type RequestOptions = {
readonly retryAttempts?: number
readonly signal?: AbortSignal
readonly throwOnError?: boolean
// Opt a non-idempotent POST into bounded wait-and-retry on a 429 throttle.
readonly retryOnRateLimit?: boolean
}
export type ResolvedOptions = {
@ -60,6 +64,7 @@ export type ResolvedOptions = {
readonly timeoutMs: number | undefined
readonly retryAttempts: number
readonly throwOnError: boolean
readonly retryOnRateLimit: boolean
}
export type ClientOptions = {

View File

@ -150,8 +150,9 @@ export function buildApp(getScenario: () => Scenario, state?: MockState): Hono {
app.use('*', async (c, next) => {
const scenario = getScenario()
if (scenario === 'rate-limited') {
// Unified ErrorBody — per-token throttle (retryable); Retry-After advises the wait.
return c.json(
{ error: { code: 'rate_limited', message: 'too many requests' } },
{ code: 'too_many_requests', message: 'Too many requests for this API token.', status: 429 },
{ status: 429, headers: { 'retry-after': '1' } },
)
}