diff --git a/api/controllers/console/app/generator.py b/api/controllers/console/app/generator.py index d14dd52e4e..63870f8038 100644 --- a/api/controllers/console/app/generator.py +++ b/api/controllers/console/app/generator.py @@ -70,7 +70,9 @@ class ContextGeneratePayload(BaseModel): model_config_data: dict[str, Any] = Field(..., alias="model_config", description="Model configuration") available_vars: list[AvailableVarPayload] = Field(..., description="Available variables from upstream nodes") parameter_info: ParameterInfoPayload = Field(..., description="Target parameter metadata from the frontend") - code_context: CodeContextPayload = Field(description="Existing code node context for incremental generation") + code_context: CodeContextPayload = Field( + description="Existing code node context for incremental generation" + ) class SuggestedQuestionsPayload(BaseModel): diff --git a/api/core/sandbox/builder.py b/api/core/sandbox/builder.py index 41b0150463..d035c4861b 100644 --- a/api/core/sandbox/builder.py +++ b/api/core/sandbox/builder.py @@ -1,25 +1,10 @@ -""" -Sandbox Builder: Factory for creating and initializing sandboxes. - -This module uses gevent.spawn instead of threading.Thread to ensure proper -cooperative scheduling in gevent-based WSGI servers like Gunicorn. - -Using native threading.Thread in a gevent environment can cause issues because: -1. Native threads hold the GIL during blocking I/O -2. gevent's monkey-patching doesn't affect code running in native threads -3. Blocking operations in native threads prevent greenlet switching - -By using gevent.spawn(), background initialization runs as a greenlet that -cooperatively yields during I/O operations. -""" - from __future__ import annotations import logging +import threading from collections.abc import Mapping, Sequence from typing import TYPE_CHECKING, Any -import gevent from flask import current_app from core.entities.provider_entities import BasicProviderConfig @@ -160,8 +145,7 @@ class SandboxBuilder: sandbox.mark_failed(exc) # Background init completes or signals failure via sandbox state. - # Use gevent.spawn instead of threading.Thread for cooperative scheduling - gevent.spawn(initialize) + threading.Thread(target=initialize, daemon=True).start() return sandbox @staticmethod diff --git a/api/core/sandbox/sandbox.py b/api/core/sandbox/sandbox.py index e654d4f3bc..24c93e4741 100644 --- a/api/core/sandbox/sandbox.py +++ b/api/core/sandbox/sandbox.py @@ -1,24 +1,9 @@ -""" -Sandbox: A managed virtual environment instance. - -This module uses gevent.event.Event instead of threading.Event to ensure -proper cooperative scheduling in gevent-based WSGI servers like Gunicorn. - -Using native threading.Event in a gevent environment can cause issues because: -1. threading.Event.wait() blocks the entire thread, not just the greenlet -2. This prevents other greenlets from running while waiting -3. Can lead to apparent "freezes" when multiple greenlets wait on events - -By using gevent.event.Event, wait() calls cooperatively yield to other greenlets. -""" - from __future__ import annotations import logging +import threading from typing import TYPE_CHECKING -from gevent.event import Event - from libs.attr_map import AttrMap if TYPE_CHECKING: @@ -46,8 +31,8 @@ class Sandbox: self._app_id = app_id self._assets_id = assets_id self._attributes = AttrMap() - self._ready_event: Event = Event() # gevent Event for cooperative waiting - self._cancel_event: Event = Event() # gevent Event for cooperative waiting + self._ready_event = threading.Event() + self._cancel_event = threading.Event() self._init_error: Exception | None = None @property diff --git a/api/core/virtual_environment/__base/command_future.py b/api/core/virtual_environment/__base/command_future.py index 6c9d2736c4..88bef76054 100644 --- a/api/core/virtual_environment/__base/command_future.py +++ b/api/core/virtual_environment/__base/command_future.py @@ -1,26 +1,9 @@ -""" -CommandFuture: Async command execution with gevent compatibility. - -This module uses gevent primitives (greenlets, events) instead of native threads -to ensure proper cooperative scheduling in gevent-based WSGI servers like Gunicorn. - -Using native threading.Thread or concurrent.futures.ThreadPoolExecutor in a gevent -environment can cause deadlocks because: -1. Native threads hold the GIL during blocking I/O -2. gevent's monkey-patching doesn't affect code running in native threads -3. Blocking operations in native threads prevent greenlet switching - -By using gevent.spawn() and gevent.event.Event, all I/O operations become -cooperative, allowing proper greenlet scheduling even during blocking reads. -""" - import contextlib import logging +import threading +import time from collections.abc import Callable -from typing import Any - -import gevent -from gevent.event import Event +from concurrent.futures import ThreadPoolExecutor from core.virtual_environment.__base.entities import CommandResult, CommandStatus from core.virtual_environment.__base.exec import NotSupportedOperationError @@ -40,12 +23,7 @@ class CommandCancelledError(Exception): class CommandFuture: """ - Lightweight future for command execution using gevent greenlets. - - This implementation uses gevent primitives instead of native threads to ensure - proper cooperative scheduling in gevent-based WSGI servers. All blocking I/O - operations are performed in greenlets, allowing gevent to switch between them. - + Lightweight future for command execution. Mirrors concurrent.futures.Future API with 4 essential methods: result(), done(), cancel(), cancelled(). """ @@ -66,20 +44,17 @@ class CommandFuture: self._poll_status = poll_status self._poll_interval = poll_interval - self._done_event: Event = Event() # gevent Event for cooperative waiting + self._done_event = threading.Event() + self._lock = threading.Lock() self._result: CommandResult | None = None self._exception: BaseException | None = None - self._cancelled: bool = False - self._started: bool = False - self._execute_greenlet: Any = None + self._cancelled = False + self._started = False def result(self, timeout: float | None = None) -> CommandResult: """ Block until command completes and return result. - Uses gevent.event.Event.wait() for cooperative waiting, allowing other - greenlets to run while this one waits. - Args: timeout: Maximum seconds to wait. None means wait forever. @@ -89,7 +64,6 @@ class CommandFuture: """ self._ensure_started() - # gevent Event.wait() returns True if set, False on timeout if not self._done_event.wait(timeout): raise CommandTimeoutError(f"Command timed out after {timeout}s") @@ -108,81 +82,65 @@ class CommandFuture: def cancel(self) -> bool: """ - Attempt to cancel command by closing transports and killing greenlets. + Attempt to cancel command by closing transports. Returns True if cancelled, False if already completed. """ - if self._done_event.is_set(): - return False - self._cancelled = True - self._close_transports() - # Kill the execute greenlet if it's still running - if self._execute_greenlet is not None: - self._execute_greenlet.kill(block=False) - self._done_event.set() - return True + with self._lock: + if self._done_event.is_set(): + return False + self._cancelled = True + self._close_transports() + self._done_event.set() + return True def cancelled(self) -> bool: return self._cancelled def _ensure_started(self) -> None: - if not self._started: - self._started = True - # Use gevent.spawn instead of threading.Thread for cooperative scheduling - self._execute_greenlet = gevent.spawn(self._execute) + with self._lock: + if not self._started: + self._started = True + thread = threading.Thread(target=self._execute, daemon=True) + thread.start() def _execute(self) -> None: - """ - Execute command and collect output using gevent greenlets. - - Spawns separate greenlets for stdout/stderr draining to allow concurrent - reading while polling for command completion. - """ stdout_buf = bytearray() stderr_buf = bytearray() is_combined_stream = self._stdout_transport is self._stderr_transport - stdout_greenlet: Any = None - stderr_greenlet: Any = None - try: - # Spawn greenlets for draining transports - stdout_greenlet = gevent.spawn(self._drain_transport, self._stdout_transport, stdout_buf) - if not is_combined_stream: - stderr_greenlet = gevent.spawn(self._drain_transport, self._stderr_transport, stderr_buf) + with ThreadPoolExecutor(max_workers=2) as executor: + stdout_future = executor.submit(self._drain_transport, self._stdout_transport, stdout_buf) + stderr_future = None + if not is_combined_stream: + stderr_future = executor.submit(self._drain_transport, self._stderr_transport, stderr_buf) - exit_code = self._wait_for_completion() + exit_code = self._wait_for_completion() - # Wait for drain greenlets to complete - stdout_greenlet.join() - if stderr_greenlet is not None: - stderr_greenlet.join() + stdout_future.result() + if stderr_future: + stderr_future.result() - if not self._cancelled: - self._result = CommandResult( - stdout=bytes(stdout_buf), - stderr=b"" if is_combined_stream else bytes(stderr_buf), - exit_code=exit_code, - pid=self._pid, - ) - self._done_event.set() + with self._lock: + if not self._cancelled: + self._result = CommandResult( + stdout=bytes(stdout_buf), + stderr=b"" if is_combined_stream else bytes(stderr_buf), + exit_code=exit_code, + pid=self._pid, + ) + self._done_event.set() except Exception as e: logger.exception("Command execution failed for pid %s", self._pid) - if not self._cancelled: - self._exception = e - self._done_event.set() + with self._lock: + if not self._cancelled: + self._exception = e + self._done_event.set() finally: - # Kill any remaining greenlets - if stdout_greenlet is not None: - stdout_greenlet.kill(block=False) - if stderr_greenlet is not None: - stderr_greenlet.kill(block=False) self._close_transports() def _wait_for_completion(self) -> int | None: - """ - Poll for command completion using gevent.sleep for cooperative yielding. - """ while not self._cancelled: try: status = self._poll_status() @@ -192,18 +150,11 @@ class CommandFuture: if status.status == CommandStatus.Status.COMPLETED: return status.exit_code - # Use gevent.sleep for cooperative scheduling - gevent.sleep(self._poll_interval) + time.sleep(self._poll_interval) return None def _drain_transport(self, transport: TransportReadCloser, buffer: bytearray) -> None: - """ - Drain all data from a transport into a buffer. - - This runs in a greenlet, so blocking reads will yield to other greenlets - thanks to gevent's monkey-patching of socket operations. - """ try: while True: buffer.extend(transport.read(4096)) diff --git a/api/core/virtual_environment/providers/docker_daemon_sandbox.py b/api/core/virtual_environment/providers/docker_daemon_sandbox.py index d3cd769d55..fd390649ed 100644 --- a/api/core/virtual_environment/providers/docker_daemon_sandbox.py +++ b/api/core/virtual_environment/providers/docker_daemon_sandbox.py @@ -1,29 +1,18 @@ -""" -Docker Daemon Virtual Environment Provider. - -This module implements a VirtualEnvironment using Docker containers. It uses gevent -primitives for concurrency to ensure compatibility with gevent-based WSGI servers. - -IMPORTANT: This module uses gevent.queue.Queue and gevent.spawn instead of standard -library threading primitives. This is critical for proper cooperative scheduling -in gevent environments like Gunicorn with gevent workers. -""" - import logging import socket import tarfile +import threading from collections.abc import Mapping, Sequence from enum import IntEnum, StrEnum from functools import lru_cache from io import BytesIO from pathlib import PurePosixPath +from queue import Queue from typing import Any, cast from uuid import uuid4 import docker.errors -import gevent from docker.models.containers import Container -from gevent.queue import Queue import docker from configs import dify_config @@ -71,20 +60,14 @@ class DockerDemuxer: - Bytes 1-3: reserved (zeros) - Bytes 4-7: payload size (big-endian uint32) - GEVENT COMPATIBILITY: - This class uses gevent.spawn() and gevent.queue.Queue instead of native threading - to ensure proper cooperative scheduling in gevent-based WSGI servers. Native threads - can cause deadlocks in gevent because: - 1. Native threads hold the GIL during blocking I/O - 2. gevent's monkey-patching doesn't affect code running in native threads - 3. Blocking Queue.get() in native threads prevents greenlet switching - - By using gevent primitives, all I/O operations become cooperative, allowing - proper greenlet scheduling even during blocking socket reads. + THREAD SAFETY: + A single background thread reads frames from the socket and dispatches payloads + to thread-safe queues. This avoids race conditions where multiple threads + calling _read_next_frame() simultaneously caused frame header/body corruption, + resulting in incomplete stdout/stderr output. """ _HEADER_SIZE = 8 - _QUEUE_GET_TIMEOUT = 5.0 # Timeout for queue.get() to allow checking for errors def __init__(self, sock: socket.SocketIO): self._sock = sock @@ -93,16 +76,14 @@ class DockerDemuxer: self._closed = False self._error: BaseException | None = None - # Use gevent.spawn instead of threading.Thread for cooperative scheduling - self._demux_greenlet = gevent.spawn(self._demux_loop) + self._demux_thread = threading.Thread( + target=self._demux_loop, + daemon=True, + name="docker-demuxer", + ) + self._demux_thread.start() def _demux_loop(self) -> None: - """ - Read frames from socket and dispatch to appropriate queues. - - Runs in a greenlet, so socket reads will yield to other greenlets - thanks to gevent's monkey-patching. - """ try: while not self._closed: header = self._read_exact(self._HEADER_SIZE) @@ -131,11 +112,6 @@ class DockerDemuxer: self._stderr_queue.put(None) def _read_exact(self, size: int) -> bytes: - """ - Read exactly `size` bytes from socket. - - Socket reads are cooperative in gevent environment due to monkey-patching. - """ data = bytearray() remaining = size while remaining > 0: @@ -156,42 +132,19 @@ class DockerDemuxer: return self._read_from_queue(self._stderr_queue) def _read_from_queue(self, queue: Queue[bytes | None]) -> bytes: - """ - Read from queue with timeout to allow periodic error checking. - - Uses gevent.queue.Queue which cooperatively yields during blocking get(). - The timeout ensures we can detect errors and closed state even if no data arrives. - """ if self._error: raise TransportEOFError(f"Demuxer error: {self._error}") from self._error - while True: - try: - # Use timeout to periodically check for errors/closed state - chunk = queue.get(timeout=self._QUEUE_GET_TIMEOUT) - if chunk is None: - if self._error: - raise TransportEOFError(f"Demuxer error: {str(self._error)}") - raise TransportEOFError("End of demuxed stream") - return chunk - except gevent.queue.Empty: - # Timeout - check if we should continue waiting - if self._closed: - raise TransportEOFError("Demuxer closed") - if self._error: - raise TransportEOFError(f"Demuxer error: {self._error}") from self._error - # No error, continue waiting - continue + chunk = queue.get() + if chunk is None: + if self._error: + raise TransportEOFError(f"Demuxer error: {str(self._error)}") + raise TransportEOFError("End of demuxed stream") + return chunk def close(self) -> None: - """ - Close the demuxer and kill the background greenlet. - """ if not self._closed: self._closed = True - # Kill the demux greenlet to stop any blocking reads - if self._demux_greenlet is not None: - self._demux_greenlet.kill(block=False) try: self._sock.close() except Exception: