From 07eb4903b8f0cffedf83426d13276559790e6890 Mon Sep 17 00:00:00 2001 From: L1nSn0w Date: Fri, 12 Jun 2026 14:35:15 +0800 Subject: [PATCH] feat: 429 rate-limit handling on the unified ErrorBody contract (openapi + difyctl) (#37313) Co-authored-by: Xiyuan Chen <52963600+GareArc@users.noreply.github.com> --- api/configs/feature/__init__.py | 5 +- api/controllers/openapi/app_run.py | 15 +- api/libs/rate_limit.py | 26 +- .../openapi/test_app_run_rate_limit.py | 29 +++ .../openapi/test_error_contract.py | 2 + .../unit_tests/libs/test_external_api.py | 18 ++ .../unit_tests/libs/test_rate_limit_bearer.py | 21 +- cli/src/api/app-run.ts | 2 + .../app/_strategies/streaming-structured.ts | 2 +- .../run/app/_strategies/streaming-text.ts | 2 +- cli/src/commands/run/app/index.ts | 2 + cli/src/commands/run/app/run.ts | 1 + cli/src/errors/codes.test.ts | 2 + cli/src/errors/codes.ts | 4 + cli/src/http/client.test.ts | 242 +++++++++++++++++- cli/src/http/client.ts | 46 +++- cli/src/http/error-mapper.test.ts | 16 ++ cli/src/http/error-mapper.ts | 10 + cli/src/http/rate-limit.test.ts | 91 +++++++ cli/src/http/rate-limit.ts | 90 +++++++ cli/src/http/retry.test.ts | 18 +- cli/src/http/retry.ts | 10 +- cli/src/http/types.ts | 5 + cli/test/fixtures/dify-mock/server.ts | 3 +- 24 files changed, 638 insertions(+), 24 deletions(-) create mode 100644 api/tests/unit_tests/controllers/openapi/test_app_run_rate_limit.py create mode 100644 cli/src/http/rate-limit.test.ts create mode 100644 cli/src/http/rate-limit.ts diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index 109ce749a64..dc8c840da9c 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -943,9 +943,10 @@ class AuthConfig(BaseSettings): default=True, ) - OPENAPI_RATE_LIMIT_PER_TOKEN: PositiveInt = Field( + OPENAPI_RATE_LIMIT_PER_TOKEN: NonNegativeInt = Field( description="Per-token rate limit on /openapi/v1/* (requests per minute). " - "Bucket keyed on sha256(token), shared across api replicas via Redis.", + "Bucket keyed on sha256(token), shared across api replicas via Redis. " + "Set to 0 to disable the per-token limit entirely.", default=60, ) diff --git a/api/controllers/openapi/app_run.py b/api/controllers/openapi/app_run.py index d801f5183f1..3101b2de421 100644 --- a/api/controllers/openapi/app_run.py +++ b/api/controllers/openapi/app_run.py @@ -8,7 +8,14 @@ from contextlib import contextmanager from typing import Any from flask_restx import Resource -from werkzeug.exceptions import BadRequest, HTTPException, InternalServerError, NotFound, UnprocessableEntity +from werkzeug.exceptions import ( + BadRequest, + HTTPException, + InternalServerError, + NotFound, + TooManyRequests, + UnprocessableEntity, +) import services from controllers.openapi import openapi_ns @@ -29,6 +36,7 @@ from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpErr from core.app.apps.base_app_queue_manager import AppQueueManager from core.app.entities.app_invoke_entities import InvokeFrom from core.errors.error import ( + AppInvokeQuotaExceededError, ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError, @@ -71,6 +79,11 @@ def _translate_service_errors() -> Iterator[None]: raise ProviderQuotaExceededError() except ModelCurrentlyNotSupportError: raise ProviderModelCurrentlyNotSupportError() + except AppInvokeQuotaExceededError: + # App concurrency limit. Without this it falls through to the bare `except Exception` + # below and surfaces as a 500. Render as the canonical 429 (code "too_many_requests"); + # the source message is dropped since it carries internal detail (client_id / limits). + raise TooManyRequests() except InvokeRateLimitError as ex: raise InvokeRateLimitHttpError(ex.description) except InvokeError as e: diff --git a/api/libs/rate_limit.py b/api/libs/rate_limit.py index 68147f21cfe..13b27eae9f7 100644 --- a/api/libs/rate_limit.py +++ b/api/libs/rate_limit.py @@ -15,7 +15,7 @@ from enum import StrEnum from functools import wraps from typing import ParamSpec, TypeVar -from flask import jsonify, make_response, request, session +from flask import request, session from werkzeug.exceptions import TooManyRequests from configs import dify_config @@ -132,20 +132,32 @@ def enforce(spec: RateLimit, *, key: str) -> None: limiter.increment_rate_limit(key) +class _BearerRateLimited(TooManyRequests): + """Per-token 429. Carries Retry-After as a plain ``headers`` attribute because the openapi + error formatter reads ``getattr(e, "headers")`` (not werkzeug's ``get_headers()``). No + pre-built response, so the formatter still renders the canonical ErrorBody ("too_many_requests"). + """ + + headers: dict[str, str] + + def __init__(self, retry_after_seconds: int) -> None: + super().__init__() + self.headers = {"Retry-After": str(retry_after_seconds)} + + def enforce_bearer_rate_limit(token_hash: str) -> None: """Per-token rate limit on /openapi/v1/* bearer-authed routes. Bucket key = ``token:`` so the same token shares one bucket across api replicas (Redis-backed sliding window). """ + # 0 (or less) disables the per-token limit. Short-circuit here: a limiter built with + # max_attempts=0 would otherwise treat every request as already over the limit. + if LIMIT_BEARER_PER_TOKEN.limit <= 0: + return limiter = _build_limiter(LIMIT_BEARER_PER_TOKEN) key = f"token:{token_hash}" if limiter.is_rate_limited(key): retry_after = limiter.seconds_until_available(key) - response = make_response( - jsonify({"error": "rate_limited", "retry_after_ms": retry_after * 1000}), - 429, - ) - response.headers["Retry-After"] = str(retry_after) - raise TooManyRequests(response=response) + raise _BearerRateLimited(retry_after) limiter.increment_rate_limit(key) diff --git a/api/tests/unit_tests/controllers/openapi/test_app_run_rate_limit.py b/api/tests/unit_tests/controllers/openapi/test_app_run_rate_limit.py new file mode 100644 index 00000000000..d9c468d0a4e --- /dev/null +++ b/api/tests/unit_tests/controllers/openapi/test_app_run_rate_limit.py @@ -0,0 +1,29 @@ +"""The openapi run boundary maps internal rate-limit exceptions to canonical 429s. + +Both render through the ErrorBody formatter: TooManyRequests -> code "too_many_requests" +(retryable throttle), InvokeRateLimitError -> code "rate_limit_error" (quota). +""" + +import pytest +from werkzeug.exceptions import TooManyRequests + +from controllers.openapi.app_run import _translate_service_errors +from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError +from core.errors.error import AppInvokeQuotaExceededError +from services.errors.llm import InvokeRateLimitError + + +def test_translate_maps_app_concurrency_to_too_many_requests(): + # Regression guard: this used to fall through to a 500 (it was not caught here). + with pytest.raises(TooManyRequests) as exc: + with _translate_service_errors(): + raise AppInvokeQuotaExceededError("internal: client_id=abc max=10") + assert exc.value.code == 429 + + +def test_translate_maps_workflow_quota_to_rate_limit_error(): + with pytest.raises(InvokeRateLimitHttpError) as exc: + with _translate_service_errors(): + raise InvokeRateLimitError("workflow quota exhausted") + assert exc.value.error_code == "rate_limit_error" + assert exc.value.code == 429 diff --git a/api/tests/unit_tests/controllers/openapi/test_error_contract.py b/api/tests/unit_tests/controllers/openapi/test_error_contract.py index 12293de321d..45a577443b7 100644 --- a/api/tests/unit_tests/controllers/openapi/test_error_contract.py +++ b/api/tests/unit_tests/controllers/openapi/test_error_contract.py @@ -10,6 +10,7 @@ from werkzeug.exceptions import ( Forbidden, InternalServerError, NotFound, + TooManyRequests, Unauthorized, UnprocessableEntity, ) @@ -309,6 +310,7 @@ ERROR_MATRIX = [ (ProviderModelCurrentlyNotSupportError(), 400, "model_currently_not_support"), (CompletionRequestError(), 400, "completion_request_error"), (InvokeRateLimitHttpError(), 429, "rate_limit_error"), + (TooManyRequests("x"), 429, "too_many_requests"), # difyctl's classifyRateLimit keys retryability on this code (FileTooLargeError(), 413, "file_too_large"), (UnsupportedFileTypeError(), 415, "unsupported_file_type"), (NoFileUploadedError(), 400, "no_file_uploaded"), diff --git a/api/tests/unit_tests/libs/test_external_api.py b/api/tests/unit_tests/libs/test_external_api.py index 5135970bcc5..7ebdf5f60eb 100644 --- a/api/tests/unit_tests/libs/test_external_api.py +++ b/api/tests/unit_tests/libs/test_external_api.py @@ -6,6 +6,7 @@ from constants import COOKIE_NAME_ACCESS_TOKEN, COOKIE_NAME_CSRF_TOKEN, COOKIE_N from core.errors.error import AppInvokeQuotaExceededError from libs.exception import BaseHTTPException from libs.external_api import ExternalApi +from libs.rate_limit import _BearerRateLimited def _create_api_app(): @@ -58,6 +59,13 @@ def _create_api_app(): e.description = {"field": "is required"} raise e + # The per-token rate limit raises this; ExternalApi must carry its Retry-After header to the + # wire (it reads getattr(e, "headers"), not werkzeug's get_headers()). + @api.route("/rate-limited") + class RateLimited(Resource): + def get(self): + raise _BearerRateLimited(23) + app.register_blueprint(bp, url_prefix="/api") return app @@ -115,6 +123,16 @@ def test_external_api_param_mapping_and_quota(): assert res.status_code in (400, 429) +def test_external_api_carries_exception_headers_to_429_response(): + # Locks the coupling enforce_bearer_rate_limit relies on: handle_error reads getattr(e, + # "headers") and puts it on the response, so Retry-After reaches the wire. + app = _create_api_app() + res = app.test_client().get("/api/rate-limited") + assert res.status_code == 429 + assert res.headers["Retry-After"] == "23" + assert (res.get_json() or {})["status"] == 429 + + def test_unauthorized_and_force_logout_clears_cookies(): """Test that UnauthorizedAndForceLogout error clears auth cookies""" diff --git a/api/tests/unit_tests/libs/test_rate_limit_bearer.py b/api/tests/unit_tests/libs/test_rate_limit_bearer.py index b204575ccb8..62363f5f600 100644 --- a/api/tests/unit_tests/libs/test_rate_limit_bearer.py +++ b/api/tests/unit_tests/libs/test_rate_limit_bearer.py @@ -11,6 +11,8 @@ from werkzeug.exceptions import TooManyRequests from libs.helper import RateLimiter from libs.rate_limit import ( LIMIT_BEARER_PER_TOKEN, + RateLimit, + RateLimitScope, enforce_bearer_rate_limit, ) @@ -67,8 +69,17 @@ def test_enforce_bearer_rate_limit_raises_429_with_retry_after(mock_build): mock_build.return_value = limiter with pytest.raises(TooManyRequests) as exc: enforce_bearer_rate_limit("hash-1") - headers = dict(exc.value.get_response().headers) - assert headers.get("Retry-After") == "23" - body = exc.value.get_response().get_json() or {} - assert body.get("error") == "rate_limited" - assert body.get("retry_after_ms") == 23000 + # Header-only TooManyRequests: the canonical ErrorBody (code "too_many_requests") is built + # later by the openapi formatter; here we only assert the advisory header rides along. + assert dict(exc.value.headers).get("Retry-After") == "23" + + +@patch("libs.rate_limit._build_limiter") +def test_enforce_bearer_rate_limit_disabled_when_limit_is_zero(mock_build, monkeypatch): + # 0 disables the limit — short-circuit before building/consulting a limiter. + monkeypatch.setattr( + "libs.rate_limit.LIMIT_BEARER_PER_TOKEN", + RateLimit(limit=0, window=timedelta(minutes=1), scopes=(RateLimitScope.TOKEN_ID,)), + ) + enforce_bearer_rate_limit("hash-1") + mock_build.assert_not_called() diff --git a/cli/src/api/app-run.ts b/cli/src/api/app-run.ts index 1cb64057f8e..cbd36de2049 100644 --- a/cli/src/api/app-run.ts +++ b/cli/src/api/app-run.ts @@ -34,6 +34,7 @@ export function buildRunBody(args: RunBodyArgs): Record { export type StreamOptions = { signal?: AbortSignal includeStateSnapshot?: boolean + retryOnRateLimit?: boolean } export class AppRunClient { @@ -59,6 +60,7 @@ export class AppRunClient { headers: { Accept: 'text/event-stream' }, signal: opts.signal, throwOnError: true, + retryOnRateLimit: opts.retryOnRateLimit, }) if (res.body === null) throw new Error('streaming response body missing') diff --git a/cli/src/commands/run/app/_strategies/streaming-structured.ts b/cli/src/commands/run/app/_strategies/streaming-structured.ts index 182aa89d080..c6f02292528 100644 --- a/cli/src/commands/run/app/_strategies/streaming-structured.ts +++ b/cli/src/commands/run/app/_strategies/streaming-structured.ts @@ -56,7 +56,7 @@ export class StreamingStructuredStrategy implements RunStrategy { let resp: Record try { - const events = await ctx.runClient.runStream(opts.appId, body, { signal: ctrl.signal }) + const events = await ctx.runClient.runStream(opts.appId, body, { signal: ctrl.signal, retryOnRateLimit: opts.retryOnRateLimit }) const wrappedEvents = captureTaskId(events, (id) => { taskId = id }) diff --git a/cli/src/commands/run/app/_strategies/streaming-text.ts b/cli/src/commands/run/app/_strategies/streaming-text.ts index 872acc785bc..66b2c2a88ec 100644 --- a/cli/src/commands/run/app/_strategies/streaming-text.ts +++ b/cli/src/commands/run/app/_strategies/streaming-text.ts @@ -28,7 +28,7 @@ export class StreamingTextStrategy implements RunStrategy { handle('SIGINT', cleanup) try { - const events = await ctx.runClient.runStream(opts.appId, body, { signal: ctrl.signal }) + const events = await ctx.runClient.runStream(opts.appId, body, { signal: ctrl.signal, retryOnRateLimit: opts.retryOnRateLimit }) const sp = streamPrinterFor(mode, ctx.think, deps.io.isErrTTY) const dec = new TextDecoder() for await (const ev of events) { diff --git a/cli/src/commands/run/app/index.ts b/cli/src/commands/run/app/index.ts index 16bd30c069f..44ea93c542b 100644 --- a/cli/src/commands/run/app/index.ts +++ b/cli/src/commands/run/app/index.ts @@ -35,6 +35,7 @@ export default class RunApp extends DifyCommand { 'workspace': Flags.string({ description: 'Workspace id (overrides DIFY_WORKSPACE_ID and stored default)' }), 'stream': Flags.boolean({ description: 'Print output live as tokens/events arrive (default: collect and print at end)', default: false }), 'think': Flags.boolean({ description: 'Show model thinking/reasoning when available. Strips ... blocks silently by default; with --think, thinking is printed to stderr.', default: false }), + 'retry-on-limit': Flags.boolean({ description: 'On a 429 rate limit, wait and retry this POST (bounded) instead of failing immediately. Off by default since running an app is not idempotent.', default: false }), 'http-retry': httpRetryFlag, 'output': Flags.outputFormat({ options: [OutputFormat.JSON, OutputFormat.YAML, OutputFormat.TEXT], default: '' }), } @@ -56,6 +57,7 @@ export default class RunApp extends DifyCommand { format, stream: flags.stream, think: flags.think, + retryOnRateLimit: flags['retry-on-limit'], }, { active: ctx.active, http: ctx.http, host: ctx.host, io: ctx.io, cache: ctx.cache }, ) diff --git a/cli/src/commands/run/app/run.ts b/cli/src/commands/run/app/run.ts index ada582b48b3..8eb767c5dbe 100644 --- a/cli/src/commands/run/app/run.ts +++ b/cli/src/commands/run/app/run.ts @@ -27,6 +27,7 @@ export type RunAppOptions = { readonly format?: string readonly stream?: boolean readonly think?: boolean + readonly retryOnRateLimit?: boolean } export type RunAppDeps = { diff --git a/cli/src/errors/codes.test.ts b/cli/src/errors/codes.test.ts index a29697f57a7..eb76b13a22f 100644 --- a/cli/src/errors/codes.test.ts +++ b/cli/src/errors/codes.test.ts @@ -18,6 +18,7 @@ describe('error codes', () => { expect(ExitCode.Usage).toBe(2) expect(ExitCode.Auth).toBe(4) expect(ExitCode.VersionCompat).toBe(6) + expect(ExitCode.RateLimited).toBe(7) }) it('every code maps to an exit', () => { @@ -46,6 +47,7 @@ describe('error codes', () => { [ErrorCode.Server4xxOther, ExitCode.Generic], [ErrorCode.ClientError, ExitCode.Generic], [ErrorCode.Unknown, ExitCode.Generic], + [ErrorCode.RateLimited, ExitCode.RateLimited], ])('exitFor(%s) -> %d', (code, want) => { expect(exitFor(code)).toBe(want) }) diff --git a/cli/src/errors/codes.ts b/cli/src/errors/codes.ts index cc0d09b745e..e2b16cb3619 100644 --- a/cli/src/errors/codes.ts +++ b/cli/src/errors/codes.ts @@ -12,6 +12,7 @@ export const ErrorCode = { ConfigInvalidKey: 'config_invalid_key', ConfigInvalidValue: 'config_invalid_value', NetworkConnection: 'network_connection', + RateLimited: 'rate_limited', Server5xx: 'server_5xx', Server4xxOther: 'server_4xx_other', ClientError: 'client_error', @@ -28,6 +29,8 @@ export const ExitCode = { Usage: 2, Auth: 4, VersionCompat: 6, + // Distinct from Generic so wrappers can tell "rate limited, retry later" from a hard failure. + RateLimited: 7, } as const export type ExitCodeValue = (typeof ExitCode)[keyof typeof ExitCode] @@ -46,6 +49,7 @@ const CODE_TO_EXIT: Readonly> = { config_invalid_key: ExitCode.Usage, config_invalid_value: ExitCode.Usage, network_connection: ExitCode.Generic, + rate_limited: ExitCode.RateLimited, server_5xx: ExitCode.Generic, server_4xx_other: ExitCode.Generic, client_error: ExitCode.Generic, diff --git a/cli/src/http/client.test.ts b/cli/src/http/client.test.ts index fbde1ecdcab..ae4448843a4 100644 --- a/cli/src/http/client.test.ts +++ b/cli/src/http/client.test.ts @@ -192,7 +192,7 @@ describe('http client', () => { expect(caught.code).toBe(ErrorCode.Server4xxOther) }) - it('handles 429 via retry status code list', async () => { + it('surfaces a 429 as a rate-limit error (dedicated exit code), no retry when budget is 0', async () => { mock.setScenario('rate-limited') const client = createHttpClient({ baseURL: base(mock.url), @@ -206,8 +206,246 @@ describe('http client', () => { } catch (err) { caught = err } expect(isHttpClientError(caught)).toBe(true) - if (isHttpClientError(caught)) + if (isHttpClientError(caught)) { expect(caught.httpStatus).toBe(429) + expect(caught.code).toBe(ErrorCode.RateLimited) + expect(caught.exit()).toBe(7) + expect(caught.serverError?.code).toBe('too_many_requests') + } + }) + + it('retries an idempotent GET on a throttle 429, then succeeds', async () => { + let calls = 0 + const stub = await startStub((_req, res) => { + calls++ + if (calls === 1) { + res.writeHead(429, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ code: 'too_many_requests', message: 'slow down', status: 429 })) + return + } + res.writeHead(200, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ workspaces: [] })) + }) + const events: { phase: string, status?: number, delayMs?: number }[] = [] + try { + const client = createHttpClient({ + baseURL: base(stub.url), + bearer: 'dfoa_test', + retryAttempts: 3, + timeoutMs: 5_000, + logger: e => events.push({ phase: e.phase, status: e.status, delayMs: e.delayMs }), + }) + const body = await client.get<{ workspaces: unknown[] }>('workspaces') + expect(body.workspaces).toEqual([]) + } + finally { + await stub.stop() + } + expect(calls).toBe(2) + const retry = events.find(e => e.phase === 'retry') + expect(retry?.status).toBe(429) + expect(retry?.delayMs).toBeGreaterThan(0) + }) + + it('does not retry a quota 429 (rate_limit_error) — surfaces immediately', async () => { + let requests = 0 + const stub = await startStub((_req, res) => { + res.writeHead(429, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ code: 'rate_limit_error', message: 'quota exhausted', status: 429 })) + }) + let caught: unknown + try { + const client = createHttpClient({ + baseURL: base(stub.url), + bearer: 'dfoa_test', + retryAttempts: 3, + timeoutMs: 5_000, + logger: (e) => { + if (e.phase === 'request') + requests++ + }, + }) + try { + await client.get('workspaces') + } + catch (err) { caught = err } + } + finally { + await stub.stop() + } + expect(requests).toBe(1) + expect(isHttpClientError(caught)).toBe(true) + if (isHttpClientError(caught)) { + expect(caught.code).toBe(ErrorCode.RateLimited) + expect(caught.serverError?.code).toBe('rate_limit_error') + } + }) + + it('does not retry a POST 429 by default; retries with retry-on-limit', async () => { + let postDefault = 0 + const stubDefault = await startStub((_req, res) => { + postDefault++ + res.writeHead(429, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ code: 'too_many_requests', message: 'slow down', status: 429 })) + }) + try { + const client = createHttpClient({ baseURL: base(stubDefault.url), bearer: 'dfoa_test', retryAttempts: 3, timeoutMs: 5_000 }) + await expect(client.post('apps/app-1/run', { json: { inputs: {} } })).rejects.toBeDefined() + } + finally { + await stubDefault.stop() + } + expect(postDefault).toBe(1) + + let calls = 0 + const stubOptIn = await startStub((_req, res) => { + calls++ + if (calls === 1) { + res.writeHead(429, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ code: 'too_many_requests', message: 'slow down', status: 429 })) + return + } + res.writeHead(200, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ ok: true })) + }) + try { + const client = createHttpClient({ baseURL: base(stubOptIn.url), bearer: 'dfoa_test', retryAttempts: 3, timeoutMs: 5_000 }) + const body = await client.post<{ ok: boolean }>('apps/app-1/run', { json: { inputs: {} }, retryOnRateLimit: true }) + expect(body.ok).toBe(true) + } + finally { + await stubOptIn.stop() + } + expect(calls).toBe(2) + }) + + it('still never retries a POST 5xx even with retry-on-limit (idempotency guard)', async () => { + let requests = 0 + const stub = await startStub((_req, res) => { + requests++ + res.writeHead(503, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ code: 'internal_server_error', message: 'boom', status: 503 })) + }) + try { + const client = createHttpClient({ baseURL: base(stub.url), bearer: 'dfoa_test', retryAttempts: 3, timeoutMs: 5_000 }) + await expect(client.post('apps/app-1/run', { json: { inputs: {} }, retryOnRateLimit: true })).rejects.toBeDefined() + } + finally { + await stub.stop() + } + expect(requests).toBe(1) + }) + + it('surfaces a RateLimited error after exhausting 429 retries on GET', async () => { + let requests = 0 + let retries = 0 + const stub = await startStub((_req, res) => { + res.writeHead(429, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ code: 'too_many_requests', message: 'slow down', status: 429 })) + }) + let caught: unknown + try { + const client = createHttpClient({ + baseURL: base(stub.url), + bearer: 'dfoa_test', + retryAttempts: 2, + timeoutMs: 5_000, + logger: (e) => { + if (e.phase === 'request') + requests++ + else if (e.phase === 'retry') + retries++ + }, + }) + try { + await client.get('workspaces') + } + catch (err) { caught = err } + } + finally { + await stub.stop() + } + expect(requests).toBe(3) + expect(retries).toBe(2) + expect(isHttpClientError(caught)).toBe(true) + if (isHttpClientError(caught)) + expect(caught.code).toBe(ErrorCode.RateLimited) + }) + + it('surfaces a throttle 429 whose Retry-After exceeds the honored cap (no retry)', async () => { + let requests = 0 + const stub = await startStub((_req, res) => { + // 120s advised wait > MAX_HONORED_WAIT_MS (60s) — surface rather than park for minutes. + res.writeHead(429, { 'content-type': 'application/json', 'retry-after': '120' }) + res.end(JSON.stringify({ code: 'too_many_requests', message: 'slow down', status: 429 })) + }) + let caught: unknown + try { + const client = createHttpClient({ + baseURL: base(stub.url), + bearer: 'dfoa_test', + retryAttempts: 3, + timeoutMs: 5_000, + logger: (e) => { + if (e.phase === 'request') + requests++ + }, + }) + try { + await client.get('workspaces') + } + catch (err) { caught = err } + } + finally { + await stub.stop() + } + expect(requests).toBe(1) + expect(isHttpClientError(caught)).toBe(true) + if (isHttpClientError(caught)) + expect(caught.code).toBe(ErrorCode.RateLimited) + }) + + it('stream GET surfaces a 429 (returns the Response, no throw, no retry)', async () => { + let calls = 0 + const stub = await startStub((_req, res) => { + calls++ + res.writeHead(429, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ code: 'too_many_requests', message: 'slow down', status: 429 })) + }) + try { + const client = createHttpClient({ baseURL: base(stub.url), bearer: 'dfoa_test', timeoutMs: 5_000 }) + const res = await client.stream('workspaces') + expect(res.status).toBe(429) + await res.body?.cancel() + } + finally { + await stub.stop() + } + expect(calls).toBe(1) + }) + + it('stream POST retries a throttle 429 when retry-on-limit is set', async () => { + let calls = 0 + const stub = await startStub((_req, res) => { + calls++ + if (calls === 1) { + res.writeHead(429, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ code: 'too_many_requests', message: 'slow down', status: 429 })) + return + } + res.writeHead(200, { 'content-type': 'text/event-stream' }) + res.end('data: {}\n\n') + }) + try { + const client = createHttpClient({ baseURL: base(stub.url), bearer: 'dfoa_test', timeoutMs: 5_000 }) + const res = await client.stream('apps/app-1/run', { method: 'POST', json: {}, retryOnRateLimit: true }) + expect(res.status).toBe(200) + await res.body?.cancel() + } + finally { + await stub.stop() + } + expect(calls).toBe(2) }) it('does not retry POST on 503', async () => { diff --git a/cli/src/http/client.ts b/cli/src/http/client.ts index f099b012259..940aee9152b 100644 --- a/cli/src/http/client.ts +++ b/cli/src/http/client.ts @@ -15,7 +15,8 @@ import { buildBody } from './body.js' import { classifyResponse } from './error-mapper.js' import { classifyTransport, logRequest, logResponse, setBearer, setUserAgent } from './hooks.js' import { proxyDispatcher } from './proxy.js' -import { backoffDelay, shouldRetry } from './retry.js' +import { classifyRateLimit, MAX_HONORED_WAIT_MS, RATE_LIMIT_MAX_ATTEMPTS, rateLimitDelayMs } from './rate-limit.js' +import { backoffDelay, isIdempotentRetryMethod, shouldRetry } from './retry.js' import { redactBearer } from './sanitize.js' import { appendSearchParams, joinURL } from './url.js' @@ -133,6 +134,7 @@ function buildRequest(state: ClientState, path: string, opts: RequestOptions, th timeoutMs: effectiveTimeoutMs, retryAttempts: effectiveRetryAttempts, throwOnError, + retryOnRateLimit: opts.retryOnRateLimit ?? false, } return { request, resolved, effectiveTimeoutMs, userSignal: opts.signal } } @@ -204,6 +206,37 @@ async function execute( const res = ctx.response if (!res.ok) { + // 429 has its own policy. The server self-describes via the ErrorBody `code`: a + // "too_many_requests" throttle waits-and-retries on idempotent methods (or opted-in POSTs) + // honoring Retry-After; quota / unrecognized 429s surface immediately rather than burning + // retries. Surfacing reuses the shared classifyResponse so the body parses to ErrorBody. + if (res.status === 429) { + const decision = await classifyRateLimit(res.clone()) + const canRetry + = decision.retryable + && attempt < effectiveRetryAttempts + && (decision.retryAfterMs === undefined || decision.retryAfterMs <= MAX_HONORED_WAIT_MS) + && (isIdempotentRetryMethod(method) || (method === 'POST' && resolved.retryOnRateLimit)) + if (canRetry) { + const delay = rateLimitDelayMs(decision, attempt + 1) + state.logger?.({ phase: 'retry', method, url: redactBearer(ctx.request.url), status: 429, attempt: attempt + 1, delayMs: delay }) + await res.body?.cancel().catch(() => {}) + if (delay > 0) + await new Promise(resolve => setTimeout(resolve, delay)) + continue + } + + ctx.error = await classifyResponse(ctx.request, res) + await runHooks(state.hooks.onResponseError, ctx) + if (throwOnError) { + const finalErr = ctx.error + if (finalErr instanceof Error && typeof Error.captureStackTrace === 'function') + Error.captureStackTrace(finalErr, execute) + throw finalErr + } + return res + } + if (attempt < effectiveRetryAttempts && shouldRetry(res, ctx)) { state.logger?.({ phase: 'retry', method, url: redactBearer(ctx.request.url), attempt: attempt + 1 }) // Drain the discarded error body so undici can release the socket back to its @@ -259,10 +292,18 @@ export function createHttpClient(opts: ClientOptions): HttpClient { const streamFetch = (path: string, callOpts?: RequestOptions): Promise => { // SSE bodies must not be aborted by a request-level timeout — `0` is the buildRequest // sentinel for "no timeout" and also overrides the client default. + // + // A stream normally never retries (a mid-stream replay would double-send). When the caller + // opts into 429 retry, allow a bounded budget: the 429 admission rejection arrives as a plain + // body before the stream opens, and execute()'s 429 branch is the only path that fires for a + // POST — shouldRetry still rejects POST for transport / 5xx, so nothing else replays. + const retryAttempts = callOpts?.retryOnRateLimit === true + ? (callOpts.retryAttempts ?? RATE_LIMIT_MAX_ATTEMPTS) + : 0 const finalOpts: RequestOptions = { ...callOpts, method: callOpts?.method ?? 'GET', - retryAttempts: 0, + retryAttempts, timeoutMs: 0, } const built = buildRequest(state, path, finalOpts, false) @@ -284,6 +325,7 @@ export function createHttpClient(opts: ClientOptions): HttpClient { timeoutMs: state.defaultTimeoutMs, retryAttempts: state.defaultRetryAttempts, throwOnError: false, + retryOnRateLimit: false, } const userSignal = init?.signal ?? req.signal return execute(state, req, resolved, state.defaultTimeoutMs, userSignal) diff --git a/cli/src/http/error-mapper.test.ts b/cli/src/http/error-mapper.test.ts index ab0397cce73..0a487723352 100644 --- a/cli/src/http/error-mapper.test.ts +++ b/cli/src/http/error-mapper.test.ts @@ -54,6 +54,22 @@ describe('classifyResponse — canonical ErrorBody', () => { expect(err.code).toBe(ErrorCode.Server4xxOther) expect(err.serverError?.code).toBe('some_future_code') }) + + it('429 classifies as RateLimited (dedicated exit code) and keeps the server code', async () => { + const err = await classified(429, { code: 'too_many_requests', message: 'slow down', status: 429 }) + + expect(err.code).toBe(ErrorCode.RateLimited) + expect(err.exit()).toBe(7) + expect(err.serverError?.code).toBe('too_many_requests') + }) + + it('429 with no parseable ErrorBody falls back to a generic rate-limit message', async () => { + const err = await classified(429, 'not json') + + expect(err.code).toBe(ErrorCode.RateLimited) + expect(err.serverError).toBeUndefined() + expect(err.message).toBe('too many requests') + }) }) describe('classifyResponse — non-conforming bodies (no fallback by design)', () => { diff --git a/cli/src/http/error-mapper.ts b/cli/src/http/error-mapper.ts index f2b3b1bf605..aca1a7e6184 100644 --- a/cli/src/http/error-mapper.ts +++ b/cli/src/http/error-mapper.ts @@ -36,9 +36,19 @@ const SERVER_4XX_CLASS: StatusClass = { includeRaw: true, } +// 429 gets a dedicated CLI code (its own exit code) so wrappers can tell a rate limit from a hard +// failure. The serverError.code ("too_many_requests" / "rate_limit_error") still rides along. +const RATE_LIMITED_CLASS: StatusClass = { + code: ErrorCode.RateLimited, + fallbackMessage: () => 'too many requests', + includeRaw: false, +} + function statusClass(status: number): StatusClass { if (status === 401) return AUTH_EXPIRED_CLASS + if (status === 429) + return RATE_LIMITED_CLASS if (status >= 500) return SERVER_5XX_CLASS return SERVER_4XX_CLASS diff --git a/cli/src/http/rate-limit.test.ts b/cli/src/http/rate-limit.test.ts new file mode 100644 index 00000000000..5b81bb679ae --- /dev/null +++ b/cli/src/http/rate-limit.test.ts @@ -0,0 +1,91 @@ +import { describe, expect, it } from 'vitest' +import { + classifyRateLimit, + MAX_HONORED_WAIT_MS, + parseRetryAfterMs, + RATE_LIMIT_MAX_ATTEMPTS, + rateLimitDelayMs, +} from './rate-limit.js' + +function res429(body: unknown, headers?: Record): Response { + return new Response(typeof body === 'string' ? body : JSON.stringify(body), { + status: 429, + headers: { 'content-type': 'application/json', ...headers }, + }) +} + +function headers(init?: Record): Headers { + return new Headers(init) +} + +describe('classifyRateLimit', () => { + it('throttle (code too_many_requests) is retryable and reads the Retry-After header', async () => { + const d = await classifyRateLimit(res429({ code: 'too_many_requests', status: 429 }, { 'retry-after': '2' })) + expect(d).toEqual({ retryable: true, retryAfterMs: 2000 }) + }) + + it('throttle without Retry-After is retryable with no advised wait', async () => { + const d = await classifyRateLimit(res429({ code: 'too_many_requests', status: 429 })) + expect(d).toEqual({ retryable: true, retryAfterMs: undefined }) + }) + + it('quota (code rate_limit_error) is not retryable', async () => { + const d = await classifyRateLimit(res429({ code: 'rate_limit_error', status: 429 }, { 'retry-after': '5' })) + expect(d.retryable).toBe(false) + expect(d.retryAfterMs).toBeUndefined() + }) + + it.each([ + ['unknown code', { code: 'mystery' }], + ['no code', { message: 'nope' }], + ['non-JSON body', 'not json'], + ])('unrecognized 429 (%s) is conservatively non-retryable', async (_label, body) => { + const d = await classifyRateLimit(res429(body)) + expect(d.retryable).toBe(false) + }) + + it('reads the body off a clone (response stays consumable)', async () => { + const r = res429({ code: 'too_many_requests', status: 429 }) + await classifyRateLimit(r) + await expect(r.text()).resolves.toContain('too_many_requests') + }) +}) + +describe('parseRetryAfterMs', () => { + it('reads integer-seconds Retry-After as ms', () => { + expect(parseRetryAfterMs(headers({ 'retry-after': '3' }))).toBe(3000) + }) + + it('reads an HTTP-date relative to the injected now, clamped at 0', () => { + const now = Date.parse('2026-06-11T00:00:00Z') + expect(parseRetryAfterMs(headers({ 'retry-after': 'Thu, 11 Jun 2026 00:00:05 GMT' }), now)).toBe(5000) + expect(parseRetryAfterMs(headers({ 'retry-after': 'Thu, 11 Jun 2026 00:00:00 GMT' }), now + 10_000)).toBe(0) + }) + + it('returns undefined when absent or unparseable', () => { + expect(parseRetryAfterMs(headers())).toBeUndefined() + expect(parseRetryAfterMs(headers({ 'retry-after': 'soon' }))).toBeUndefined() + }) +}) + +describe('rateLimitDelayMs', () => { + it('returns the advised wait as-is (the caller already declined over-cap waits)', () => { + expect(rateLimitDelayMs({ retryAfterMs: 800 }, 1)).toBe(800) + expect(rateLimitDelayMs({ retryAfterMs: 5000 }, 1)).toBe(5000) + }) + + it('falls back to equal-jitter backoff when no wait is advised (rng pinned)', () => { + // attempt 1 => backoffDelay 300; equal jitter => [150, 300]. + expect(rateLimitDelayMs({}, 1, { rng: () => 0 })).toBe(150) + expect(rateLimitDelayMs({}, 1, { rng: () => 1 })).toBe(300) + }) + + it('returns 0 when there is neither an advised wait nor any backoff (attempt 0)', () => { + expect(rateLimitDelayMs({}, 0, { rng: () => 0.5 })).toBe(0) + }) + + it('exposes sane retry constants', () => { + expect(MAX_HONORED_WAIT_MS).toBe(60_000) + expect(RATE_LIMIT_MAX_ATTEMPTS).toBe(3) + }) +}) diff --git a/cli/src/http/rate-limit.ts b/cli/src/http/rate-limit.ts new file mode 100644 index 00000000000..15115180e69 --- /dev/null +++ b/cli/src/http/rate-limit.ts @@ -0,0 +1,90 @@ +import { backoffDelay } from './retry.js' + +// Stateless handling for the server's 429s: react when one arrives, never predict or store limits. +// The server is self-describing via the unified ErrorBody `code`: +// "too_many_requests" → throttle, waiting helps (retryable) +// "rate_limit_error" → quota, waiting within the window does not (not retryable) +// anything else / unparsable → conservative: not retryable. + +export type RateLimitDecision = { + readonly retryable: boolean + // The advised wait, from the Retry-After header (only meaningful for a retryable throttle). + readonly retryAfterMs?: number +} + +// The longest server-advised wait we'll honor by retrying. If Retry-After is larger, the 429 +// branch surfaces immediately instead of parking the process for minutes (better to let the +// caller decide than to sleep through several capped retries that will just 429 again). +export const MAX_HONORED_WAIT_MS = 60_000 + +export const RATE_LIMIT_MAX_ATTEMPTS = 3 + +function bodyCode(raw: string): string | undefined { + try { + const parsed = JSON.parse(raw) as unknown + if (typeof parsed === 'object' && parsed !== null) { + const code = (parsed as Record).code + return typeof code === 'string' ? code : undefined + } + } + catch { + // not JSON + } + return undefined +} + +// Read a 429 response into a retry decision. Reads the ErrorBody `code` for retryability and +// the Retry-After header for the wait; both off a clone so the body stays consumable downstream. +export async function classifyRateLimit(response: Response): Promise { + let raw = '' + try { + raw = await response.clone().text() + } + catch { + // ignore read errors; raw stays '' + } + const retryable = bodyCode(raw) === 'too_many_requests' + return { retryable, retryAfterMs: retryable ? parseRetryAfterMs(response.headers) : undefined } +} + +// Parse the Retry-After header to ms: integer seconds, or an HTTP-date relative to `now` +// (injectable for deterministic tests). The unified ErrorBody carries no wait field of its own. +export function parseRetryAfterMs(headers: Headers, now: number = Date.now()): number | undefined { + const header = headers.get('retry-after') + if (header === null) { + return undefined + } + const trimmed = header.trim() + if (/^\d+$/.test(trimmed)) { + return Number(trimmed) * 1000 + } + const dateMs = Date.parse(trimmed) + if (!Number.isNaN(dateMs)) { + return Math.max(0, dateMs - now) + } + return undefined +} + +// Equal-jitter backoff around the exponential base: half fixed + half random. Avoids both the +// thundering-herd of a fixed delay and the near-zero spikes of full jitter. +function jitter(baseMs: number, rng: () => number): number { + if (baseMs <= 0) { + return 0 + } + const half = baseMs / 2 + return Math.round(half + rng() * half) +} + +// How long to wait before the next 429 retry: a known server wait takes precedence (the caller +// has already declined to retry waits beyond MAX_HONORED_WAIT_MS), otherwise jittered exponential +// backoff for sources that advise none (e.g. app concurrency). +export function rateLimitDelayMs( + decision: Pick, + attempt: number, + opts: { rng?: () => number } = {}, +): number { + if (decision.retryAfterMs !== undefined) { + return decision.retryAfterMs + } + return jitter(backoffDelay(attempt), opts.rng ?? Math.random) +} diff --git a/cli/src/http/retry.test.ts b/cli/src/http/retry.test.ts index 83e4d4c9965..25d646facf5 100644 --- a/cli/src/http/retry.test.ts +++ b/cli/src/http/retry.test.ts @@ -1,6 +1,6 @@ import type { FetchContext, HttpMethod, ResolvedOptions } from './types.js' import { describe, expect, it } from 'vitest' -import { backoffDelay, shouldRetry } from './retry.js' +import { backoffDelay, isIdempotentRetryMethod, shouldRetry } from './retry.js' function ctxFor(method: HttpMethod): FetchContext { const options: ResolvedOptions = { @@ -10,6 +10,7 @@ function ctxFor(method: HttpMethod): FetchContext { timeoutMs: undefined, retryAttempts: 0, throwOnError: true, + retryOnRateLimit: false, } return { request: new Request('https://x/y', { method }), @@ -30,6 +31,11 @@ describe('shouldRetry', () => { expect(shouldRetry(res, ctxFor('GET'))).toBe(false) }) + it('no longer retries 429 here (it has a dedicated branch in execute())', () => { + const res = new Response(null, { status: 429 }) + expect(shouldRetry(res, ctxFor('GET'))).toBe(false) + }) + it('does not retry POST regardless of status', () => { const res = new Response(null, { status: 503 }) expect(shouldRetry(res, ctxFor('POST'))).toBe(false) @@ -50,6 +56,16 @@ describe('shouldRetry', () => { }) }) +describe('isIdempotentRetryMethod', () => { + it('is true for GET/PUT/DELETE and false for POST/PATCH', () => { + expect(isIdempotentRetryMethod('GET')).toBe(true) + expect(isIdempotentRetryMethod('PUT')).toBe(true) + expect(isIdempotentRetryMethod('DELETE')).toBe(true) + expect(isIdempotentRetryMethod('POST')).toBe(false) + expect(isIdempotentRetryMethod('PATCH')).toBe(false) + }) +}) + describe('backoffDelay', () => { it('returns 0 for attempts <= 0', () => { expect(backoffDelay(0)).toBe(0) diff --git a/cli/src/http/retry.ts b/cli/src/http/retry.ts index 456bf321669..6e663ba74ef 100644 --- a/cli/src/http/retry.ts +++ b/cli/src/http/retry.ts @@ -1,11 +1,19 @@ import type { FetchContext } from './types.js' export const RETRY_METHODS = ['GET', 'PUT', 'DELETE'] as const -export const RETRY_STATUS_CODES = [408, 413, 429, 500, 502, 503, 504] as const +// 429 is intentionally absent — it has a dedicated branch in execute(). shouldRetry covers +// transport errors / 408 / 413 / 5xx only. +export const RETRY_STATUS_CODES = [408, 413, 500, 502, 503, 504] as const const RETRY_METHODS_SET: ReadonlySet = new Set(RETRY_METHODS) const RETRY_STATUS_SET: ReadonlySet = new Set(RETRY_STATUS_CODES) +// GET/PUT/DELETE are idempotent — safe to auto-retry. The 429 branch reuses this to decide which +// methods may wait-and-retry a throttle without risking a double-run. +export function isIdempotentRetryMethod(method: string): boolean { + return RETRY_METHODS_SET.has(method) +} + export function shouldRetry(target: Response | unknown, ctx: FetchContext): boolean { if (!RETRY_METHODS_SET.has(ctx.options.method)) return false diff --git a/cli/src/http/types.ts b/cli/src/http/types.ts index e06bbfc9fa6..d209e97460c 100644 --- a/cli/src/http/types.ts +++ b/cli/src/http/types.ts @@ -7,6 +7,8 @@ export type HttpLogEvent = { readonly status?: number readonly attempt?: number readonly durationMs?: number + // Set on a 429 retry decision so --verbose can explain how long we waited. + readonly delayMs?: number } export type HttpLogger = (event: HttpLogEvent) => void @@ -51,6 +53,8 @@ export type RequestOptions = { readonly retryAttempts?: number readonly signal?: AbortSignal readonly throwOnError?: boolean + // Opt a non-idempotent POST into bounded wait-and-retry on a 429 throttle. + readonly retryOnRateLimit?: boolean } export type ResolvedOptions = { @@ -60,6 +64,7 @@ export type ResolvedOptions = { readonly timeoutMs: number | undefined readonly retryAttempts: number readonly throwOnError: boolean + readonly retryOnRateLimit: boolean } export type ClientOptions = { diff --git a/cli/test/fixtures/dify-mock/server.ts b/cli/test/fixtures/dify-mock/server.ts index afc135e5327..96edc96f9ba 100644 --- a/cli/test/fixtures/dify-mock/server.ts +++ b/cli/test/fixtures/dify-mock/server.ts @@ -150,8 +150,9 @@ export function buildApp(getScenario: () => Scenario, state?: MockState): Hono { app.use('*', async (c, next) => { const scenario = getScenario() if (scenario === 'rate-limited') { + // Unified ErrorBody — per-token throttle (retryable); Retry-After advises the wait. return c.json( - { error: { code: 'rate_limited', message: 'too many requests' } }, + { code: 'too_many_requests', message: 'Too many requests for this API token.', status: 429 }, { status: 429, headers: { 'retry-after': '1' } }, ) }