diff --git a/api/core/virtual_environment/channel/queue_transport.py b/api/core/virtual_environment/channel/queue_transport.py index 789492e644..7cf524316a 100644 --- a/api/core/virtual_environment/channel/queue_transport.py +++ b/api/core/virtual_environment/channel/queue_transport.py @@ -1,5 +1,3 @@ -from queue import Queue - from core.virtual_environment.channel.exec import TransportEOFError from core.virtual_environment.channel.transport import TransportReadCloser @@ -22,11 +20,15 @@ class QueueTransportReadCloser(TransportReadCloser): q_transport.close() """ + _QUEUE_GET_TIMEOUT = 5.0 + class WriteHandler: """ A write handler that writes data to a queue. """ + from queue import Queue + def __init__(self, queue: Queue[bytes | None]) -> None: self.queue = queue @@ -39,6 +41,8 @@ class QueueTransportReadCloser(TransportReadCloser): """ Initialize the QueueTransportReadCloser with write function. """ + from queue import Queue + self.q = Queue[bytes | None]() self._read_buffer = bytearray() self._closed = False @@ -66,6 +70,8 @@ class QueueTransportReadCloser(TransportReadCloser): NEVER USE IT IN A MULTI-THREADED CONTEXT WITHOUT PROPER SYNCHRONIZATION. """ + from queue import Empty + if n <= 0: return b"" @@ -79,7 +85,12 @@ class QueueTransportReadCloser(TransportReadCloser): round = 0 while len(to_return) < n and not self._closed and (self.q.qsize() > 0 or round == 0): - chunk = self.q.get() + try: + chunk = self.q.get(timeout=self._QUEUE_GET_TIMEOUT) + except Empty: + if self._closed: + raise TransportEOFError("Transport is closed") + continue if chunk is None: self._closed = True if len(to_return) == 0: