diff --git a/api/core/virtual_environment/providers/docker_daemon_sandbox.py b/api/core/virtual_environment/providers/docker_daemon_sandbox.py index fd390649ed..14020064ce 100644 --- a/api/core/virtual_environment/providers/docker_daemon_sandbox.py +++ b/api/core/virtual_environment/providers/docker_daemon_sandbox.py @@ -7,7 +7,7 @@ from enum import IntEnum, StrEnum from functools import lru_cache from io import BytesIO from pathlib import PurePosixPath -from queue import Queue +from queue import Empty, Queue from typing import Any, cast from uuid import uuid4 @@ -65,9 +65,15 @@ class DockerDemuxer: to thread-safe queues. This avoids race conditions where multiple threads calling _read_next_frame() simultaneously caused frame header/body corruption, resulting in incomplete stdout/stderr output. + + TIMEOUT HANDLING: + Queue.get() uses a timeout to prevent indefinite blocking when the socket is + closed unexpectedly (e.g., container removed). This allows periodic checks for + error conditions and closed state. """ _HEADER_SIZE = 8 + _QUEUE_GET_TIMEOUT = 5.0 # seconds def __init__(self, sock: socket.SocketIO): self._sock = sock @@ -132,15 +138,31 @@ class DockerDemuxer: return self._read_from_queue(self._stderr_queue) def _read_from_queue(self, queue: Queue[bytes | None]) -> bytes: + """ + Read from queue with timeout to prevent indefinite blocking. + + When the Docker container is removed or the socket is closed unexpectedly, + the demux thread may be stuck in socket.read(). Using a timeout allows us + to periodically check for errors and closed state instead of blocking forever. + """ if self._error: raise TransportEOFError(f"Demuxer error: {self._error}") from self._error - chunk = queue.get() - if chunk is None: - if self._error: - raise TransportEOFError(f"Demuxer error: {str(self._error)}") - raise TransportEOFError("End of demuxed stream") - return chunk + while True: + try: + chunk = queue.get(timeout=self._QUEUE_GET_TIMEOUT) + if chunk is None: + if self._error: + raise TransportEOFError(f"Demuxer error: {str(self._error)}") + raise TransportEOFError("End of demuxed stream") + return chunk + except Empty: + # Timeout - check if we should continue waiting + if self._closed: + raise TransportEOFError("Demuxer closed") + if self._error: + raise TransportEOFError(f"Demuxer error: {self._error}") from self._error + # No error, continue waiting def close(self) -> None: if not self._closed: