From 27781d6b7e9d71ea6cadffe5e24091b55cfc94c3 Mon Sep 17 00:00:00 2001 From: Harry Date: Wed, 28 Jan 2026 13:29:53 +0800 Subject: [PATCH] refactor: replace threading with gevent primitives for cooperative scheduling Updated multiple modules to utilize gevent for concurrency, ensuring compatibility with gevent-based WSGI servers. This includes replacing threading.Thread and threading.Event with gevent.spawn and gevent.event.Event, respectively, to prevent blocking and improve performance during I/O operations. - Refactored SandboxBuilder, Sandbox, CommandFuture, and DockerDemuxer to use gevent. - Added detailed docstrings explaining the changes and benefits of using gevent primitives. This change enhances the responsiveness and efficiency of the application in a gevent environment. --- api/controllers/console/app/generator.py | 4 +- api/core/sandbox/builder.py | 20 ++- api/core/sandbox/sandbox.py | 21 ++- .../__base/command_future.py | 137 ++++++++++++------ .../providers/docker_daemon_sandbox.py | 85 ++++++++--- 5 files changed, 196 insertions(+), 71 deletions(-) diff --git a/api/controllers/console/app/generator.py b/api/controllers/console/app/generator.py index 63870f8038..d14dd52e4e 100644 --- a/api/controllers/console/app/generator.py +++ b/api/controllers/console/app/generator.py @@ -70,9 +70,7 @@ class ContextGeneratePayload(BaseModel): model_config_data: dict[str, Any] = Field(..., alias="model_config", description="Model configuration") available_vars: list[AvailableVarPayload] = Field(..., description="Available variables from upstream nodes") parameter_info: ParameterInfoPayload = Field(..., description="Target parameter metadata from the frontend") - code_context: CodeContextPayload = Field( - description="Existing code node context for incremental generation" - ) + code_context: CodeContextPayload = Field(description="Existing code node context for incremental generation") class SuggestedQuestionsPayload(BaseModel): diff --git a/api/core/sandbox/builder.py b/api/core/sandbox/builder.py index d035c4861b..41b0150463 100644 --- a/api/core/sandbox/builder.py +++ b/api/core/sandbox/builder.py @@ -1,10 +1,25 @@ +""" +Sandbox Builder: Factory for creating and initializing sandboxes. + +This module uses gevent.spawn instead of threading.Thread to ensure proper +cooperative scheduling in gevent-based WSGI servers like Gunicorn. + +Using native threading.Thread in a gevent environment can cause issues because: +1. Native threads hold the GIL during blocking I/O +2. gevent's monkey-patching doesn't affect code running in native threads +3. Blocking operations in native threads prevent greenlet switching + +By using gevent.spawn(), background initialization runs as a greenlet that +cooperatively yields during I/O operations. +""" + from __future__ import annotations import logging -import threading from collections.abc import Mapping, Sequence from typing import TYPE_CHECKING, Any +import gevent from flask import current_app from core.entities.provider_entities import BasicProviderConfig @@ -145,7 +160,8 @@ class SandboxBuilder: sandbox.mark_failed(exc) # Background init completes or signals failure via sandbox state. - threading.Thread(target=initialize, daemon=True).start() + # Use gevent.spawn instead of threading.Thread for cooperative scheduling + gevent.spawn(initialize) return sandbox @staticmethod diff --git a/api/core/sandbox/sandbox.py b/api/core/sandbox/sandbox.py index 24c93e4741..e654d4f3bc 100644 --- a/api/core/sandbox/sandbox.py +++ b/api/core/sandbox/sandbox.py @@ -1,9 +1,24 @@ +""" +Sandbox: A managed virtual environment instance. + +This module uses gevent.event.Event instead of threading.Event to ensure +proper cooperative scheduling in gevent-based WSGI servers like Gunicorn. + +Using native threading.Event in a gevent environment can cause issues because: +1. threading.Event.wait() blocks the entire thread, not just the greenlet +2. This prevents other greenlets from running while waiting +3. Can lead to apparent "freezes" when multiple greenlets wait on events + +By using gevent.event.Event, wait() calls cooperatively yield to other greenlets. +""" + from __future__ import annotations import logging -import threading from typing import TYPE_CHECKING +from gevent.event import Event + from libs.attr_map import AttrMap if TYPE_CHECKING: @@ -31,8 +46,8 @@ class Sandbox: self._app_id = app_id self._assets_id = assets_id self._attributes = AttrMap() - self._ready_event = threading.Event() - self._cancel_event = threading.Event() + self._ready_event: Event = Event() # gevent Event for cooperative waiting + self._cancel_event: Event = Event() # gevent Event for cooperative waiting self._init_error: Exception | None = None @property diff --git a/api/core/virtual_environment/__base/command_future.py b/api/core/virtual_environment/__base/command_future.py index 88bef76054..6c9d2736c4 100644 --- a/api/core/virtual_environment/__base/command_future.py +++ b/api/core/virtual_environment/__base/command_future.py @@ -1,9 +1,26 @@ +""" +CommandFuture: Async command execution with gevent compatibility. + +This module uses gevent primitives (greenlets, events) instead of native threads +to ensure proper cooperative scheduling in gevent-based WSGI servers like Gunicorn. + +Using native threading.Thread or concurrent.futures.ThreadPoolExecutor in a gevent +environment can cause deadlocks because: +1. Native threads hold the GIL during blocking I/O +2. gevent's monkey-patching doesn't affect code running in native threads +3. Blocking operations in native threads prevent greenlet switching + +By using gevent.spawn() and gevent.event.Event, all I/O operations become +cooperative, allowing proper greenlet scheduling even during blocking reads. +""" + import contextlib import logging -import threading -import time from collections.abc import Callable -from concurrent.futures import ThreadPoolExecutor +from typing import Any + +import gevent +from gevent.event import Event from core.virtual_environment.__base.entities import CommandResult, CommandStatus from core.virtual_environment.__base.exec import NotSupportedOperationError @@ -23,7 +40,12 @@ class CommandCancelledError(Exception): class CommandFuture: """ - Lightweight future for command execution. + Lightweight future for command execution using gevent greenlets. + + This implementation uses gevent primitives instead of native threads to ensure + proper cooperative scheduling in gevent-based WSGI servers. All blocking I/O + operations are performed in greenlets, allowing gevent to switch between them. + Mirrors concurrent.futures.Future API with 4 essential methods: result(), done(), cancel(), cancelled(). """ @@ -44,17 +66,20 @@ class CommandFuture: self._poll_status = poll_status self._poll_interval = poll_interval - self._done_event = threading.Event() - self._lock = threading.Lock() + self._done_event: Event = Event() # gevent Event for cooperative waiting self._result: CommandResult | None = None self._exception: BaseException | None = None - self._cancelled = False - self._started = False + self._cancelled: bool = False + self._started: bool = False + self._execute_greenlet: Any = None def result(self, timeout: float | None = None) -> CommandResult: """ Block until command completes and return result. + Uses gevent.event.Event.wait() for cooperative waiting, allowing other + greenlets to run while this one waits. + Args: timeout: Maximum seconds to wait. None means wait forever. @@ -64,6 +89,7 @@ class CommandFuture: """ self._ensure_started() + # gevent Event.wait() returns True if set, False on timeout if not self._done_event.wait(timeout): raise CommandTimeoutError(f"Command timed out after {timeout}s") @@ -82,65 +108,81 @@ class CommandFuture: def cancel(self) -> bool: """ - Attempt to cancel command by closing transports. + Attempt to cancel command by closing transports and killing greenlets. Returns True if cancelled, False if already completed. """ - with self._lock: - if self._done_event.is_set(): - return False - self._cancelled = True - self._close_transports() - self._done_event.set() - return True + if self._done_event.is_set(): + return False + self._cancelled = True + self._close_transports() + # Kill the execute greenlet if it's still running + if self._execute_greenlet is not None: + self._execute_greenlet.kill(block=False) + self._done_event.set() + return True def cancelled(self) -> bool: return self._cancelled def _ensure_started(self) -> None: - with self._lock: - if not self._started: - self._started = True - thread = threading.Thread(target=self._execute, daemon=True) - thread.start() + if not self._started: + self._started = True + # Use gevent.spawn instead of threading.Thread for cooperative scheduling + self._execute_greenlet = gevent.spawn(self._execute) def _execute(self) -> None: + """ + Execute command and collect output using gevent greenlets. + + Spawns separate greenlets for stdout/stderr draining to allow concurrent + reading while polling for command completion. + """ stdout_buf = bytearray() stderr_buf = bytearray() is_combined_stream = self._stdout_transport is self._stderr_transport + stdout_greenlet: Any = None + stderr_greenlet: Any = None + try: - with ThreadPoolExecutor(max_workers=2) as executor: - stdout_future = executor.submit(self._drain_transport, self._stdout_transport, stdout_buf) - stderr_future = None - if not is_combined_stream: - stderr_future = executor.submit(self._drain_transport, self._stderr_transport, stderr_buf) + # Spawn greenlets for draining transports + stdout_greenlet = gevent.spawn(self._drain_transport, self._stdout_transport, stdout_buf) + if not is_combined_stream: + stderr_greenlet = gevent.spawn(self._drain_transport, self._stderr_transport, stderr_buf) - exit_code = self._wait_for_completion() + exit_code = self._wait_for_completion() - stdout_future.result() - if stderr_future: - stderr_future.result() + # Wait for drain greenlets to complete + stdout_greenlet.join() + if stderr_greenlet is not None: + stderr_greenlet.join() - with self._lock: - if not self._cancelled: - self._result = CommandResult( - stdout=bytes(stdout_buf), - stderr=b"" if is_combined_stream else bytes(stderr_buf), - exit_code=exit_code, - pid=self._pid, - ) - self._done_event.set() + if not self._cancelled: + self._result = CommandResult( + stdout=bytes(stdout_buf), + stderr=b"" if is_combined_stream else bytes(stderr_buf), + exit_code=exit_code, + pid=self._pid, + ) + self._done_event.set() except Exception as e: logger.exception("Command execution failed for pid %s", self._pid) - with self._lock: - if not self._cancelled: - self._exception = e - self._done_event.set() + if not self._cancelled: + self._exception = e + self._done_event.set() finally: + # Kill any remaining greenlets + if stdout_greenlet is not None: + stdout_greenlet.kill(block=False) + if stderr_greenlet is not None: + stderr_greenlet.kill(block=False) self._close_transports() def _wait_for_completion(self) -> int | None: + """ + Poll for command completion using gevent.sleep for cooperative yielding. + """ while not self._cancelled: try: status = self._poll_status() @@ -150,11 +192,18 @@ class CommandFuture: if status.status == CommandStatus.Status.COMPLETED: return status.exit_code - time.sleep(self._poll_interval) + # Use gevent.sleep for cooperative scheduling + gevent.sleep(self._poll_interval) return None def _drain_transport(self, transport: TransportReadCloser, buffer: bytearray) -> None: + """ + Drain all data from a transport into a buffer. + + This runs in a greenlet, so blocking reads will yield to other greenlets + thanks to gevent's monkey-patching of socket operations. + """ try: while True: buffer.extend(transport.read(4096)) diff --git a/api/core/virtual_environment/providers/docker_daemon_sandbox.py b/api/core/virtual_environment/providers/docker_daemon_sandbox.py index fd390649ed..d3cd769d55 100644 --- a/api/core/virtual_environment/providers/docker_daemon_sandbox.py +++ b/api/core/virtual_environment/providers/docker_daemon_sandbox.py @@ -1,18 +1,29 @@ +""" +Docker Daemon Virtual Environment Provider. + +This module implements a VirtualEnvironment using Docker containers. It uses gevent +primitives for concurrency to ensure compatibility with gevent-based WSGI servers. + +IMPORTANT: This module uses gevent.queue.Queue and gevent.spawn instead of standard +library threading primitives. This is critical for proper cooperative scheduling +in gevent environments like Gunicorn with gevent workers. +""" + import logging import socket import tarfile -import threading from collections.abc import Mapping, Sequence from enum import IntEnum, StrEnum from functools import lru_cache from io import BytesIO from pathlib import PurePosixPath -from queue import Queue from typing import Any, cast from uuid import uuid4 import docker.errors +import gevent from docker.models.containers import Container +from gevent.queue import Queue import docker from configs import dify_config @@ -60,14 +71,20 @@ class DockerDemuxer: - Bytes 1-3: reserved (zeros) - Bytes 4-7: payload size (big-endian uint32) - THREAD SAFETY: - A single background thread reads frames from the socket and dispatches payloads - to thread-safe queues. This avoids race conditions where multiple threads - calling _read_next_frame() simultaneously caused frame header/body corruption, - resulting in incomplete stdout/stderr output. + GEVENT COMPATIBILITY: + This class uses gevent.spawn() and gevent.queue.Queue instead of native threading + to ensure proper cooperative scheduling in gevent-based WSGI servers. Native threads + can cause deadlocks in gevent because: + 1. Native threads hold the GIL during blocking I/O + 2. gevent's monkey-patching doesn't affect code running in native threads + 3. Blocking Queue.get() in native threads prevents greenlet switching + + By using gevent primitives, all I/O operations become cooperative, allowing + proper greenlet scheduling even during blocking socket reads. """ _HEADER_SIZE = 8 + _QUEUE_GET_TIMEOUT = 5.0 # Timeout for queue.get() to allow checking for errors def __init__(self, sock: socket.SocketIO): self._sock = sock @@ -76,14 +93,16 @@ class DockerDemuxer: self._closed = False self._error: BaseException | None = None - self._demux_thread = threading.Thread( - target=self._demux_loop, - daemon=True, - name="docker-demuxer", - ) - self._demux_thread.start() + # Use gevent.spawn instead of threading.Thread for cooperative scheduling + self._demux_greenlet = gevent.spawn(self._demux_loop) def _demux_loop(self) -> None: + """ + Read frames from socket and dispatch to appropriate queues. + + Runs in a greenlet, so socket reads will yield to other greenlets + thanks to gevent's monkey-patching. + """ try: while not self._closed: header = self._read_exact(self._HEADER_SIZE) @@ -112,6 +131,11 @@ class DockerDemuxer: self._stderr_queue.put(None) def _read_exact(self, size: int) -> bytes: + """ + Read exactly `size` bytes from socket. + + Socket reads are cooperative in gevent environment due to monkey-patching. + """ data = bytearray() remaining = size while remaining > 0: @@ -132,19 +156,42 @@ class DockerDemuxer: return self._read_from_queue(self._stderr_queue) def _read_from_queue(self, queue: Queue[bytes | None]) -> bytes: + """ + Read from queue with timeout to allow periodic error checking. + + Uses gevent.queue.Queue which cooperatively yields during blocking get(). + The timeout ensures we can detect errors and closed state even if no data arrives. + """ if self._error: raise TransportEOFError(f"Demuxer error: {self._error}") from self._error - chunk = queue.get() - if chunk is None: - if self._error: - raise TransportEOFError(f"Demuxer error: {str(self._error)}") - raise TransportEOFError("End of demuxed stream") - return chunk + while True: + try: + # Use timeout to periodically check for errors/closed state + chunk = queue.get(timeout=self._QUEUE_GET_TIMEOUT) + if chunk is None: + if self._error: + raise TransportEOFError(f"Demuxer error: {str(self._error)}") + raise TransportEOFError("End of demuxed stream") + return chunk + except gevent.queue.Empty: + # Timeout - check if we should continue waiting + if self._closed: + raise TransportEOFError("Demuxer closed") + if self._error: + raise TransportEOFError(f"Demuxer error: {self._error}") from self._error + # No error, continue waiting + continue def close(self) -> None: + """ + Close the demuxer and kill the background greenlet. + """ if not self._closed: self._closed = True + # Kill the demux greenlet to stop any blocking reads + if self._demux_greenlet is not None: + self._demux_greenlet.kill(block=False) try: self._sock.close() except Exception: