This commit is contained in:
EndlessLucky 2026-04-07 09:11:47 -04:00
parent 117ff3d89f
commit ef2b0e1c69

View File

@ -1,4 +1,5 @@
import logging
import time
from collections.abc import Mapping
from threading import Lock
from typing import Any
@ -18,6 +19,9 @@ from core.helper.http_client_pooling import get_pooled_http_client
logger = logging.getLogger(__name__)
code_execution_endpoint_url = URL(str(dify_config.CODE_EXECUTION_ENDPOINT))
CODE_EXECUTION_SSL_VERIFY = dify_config.CODE_EXECUTION_SSL_VERIFY
_TRANSIENT_STATUS_CODES = frozenset({502, 503})
_MAX_TRANSIENT_RETRIES = 3
_BASE_RETRY_DELAY_SECONDS = 0.2
_CODE_EXECUTOR_CLIENT_LIMITS = httpx.Limits(
max_connections=dify_config.CODE_EXECUTION_POOL_MAX_CONNECTIONS,
max_keepalive_connections=dify_config.CODE_EXECUTION_POOL_MAX_KEEPALIVE_CONNECTIONS,
@ -94,28 +98,50 @@ class CodeExecutor:
client = get_pooled_http_client(_CODE_EXECUTOR_CLIENT_KEY, _build_code_executor_client)
try:
response = client.post(
str(url),
json=data,
headers=headers,
timeout=timeout,
)
if response.status_code == 503:
raise CodeExecutionError("Code execution service is unavailable")
elif response.status_code != 200:
raise Exception(
f"Failed to execute code, got status code {response.status_code},"
f" please check if the sandbox service is running"
response: httpx.Response | None = None
last_error: Exception | None = None
for attempt in range(_MAX_TRANSIENT_RETRIES + 1):
try:
response = client.post(
str(url),
json=data,
headers=headers,
timeout=timeout,
)
except CodeExecutionError as e:
raise e
except Exception as e:
raise CodeExecutionError(
"Failed to execute code, which is likely a network issue,"
" please check if the sandbox service is running."
f" ( Error: {str(e)} )"
)
if response.status_code in _TRANSIENT_STATUS_CODES:
if attempt < _MAX_TRANSIENT_RETRIES:
time.sleep(_BASE_RETRY_DELAY_SECONDS * (2**attempt))
continue
if response.status_code == 503:
raise CodeExecutionError("Code execution service is unavailable")
raise Exception(
f"Failed to execute code, got status code {response.status_code},"
f" please check if the sandbox service is running"
)
if response.status_code != 200:
raise Exception(
f"Failed to execute code, got status code {response.status_code},"
f" please check if the sandbox service is running"
)
break
except CodeExecutionError:
raise
except Exception as e:
last_error = e
is_transport_error = isinstance(e, httpx.TransportError)
if is_transport_error and attempt < _MAX_TRANSIENT_RETRIES:
time.sleep(_BASE_RETRY_DELAY_SECONDS * (2**attempt))
continue
raise CodeExecutionError(
"Failed to execute code, which is likely a network issue,"
" please check if the sandbox service is running."
f" ( Error: {str(last_error)} )"
)
if response is None:
raise CodeExecutionError("Failed to execute code, no response received from sandbox service")
try:
response_data = response.json()