From e761f38d2688381eb8da9a34cba0f39f41b0b281 Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Wed, 3 Sep 2025 03:47:08 +0800 Subject: [PATCH] fix(api): adjust gevent patching --- api/celery_entrypoint.py | 22 ++++++++++++++++++++++ api/docker/entrypoint.sh | 2 +- api/gunicorn.conf.py | 10 ++++++++++ 3 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 api/celery_entrypoint.py create mode 100644 api/gunicorn.conf.py diff --git a/api/celery_entrypoint.py b/api/celery_entrypoint.py new file mode 100644 index 0000000000..0773029775 --- /dev/null +++ b/api/celery_entrypoint.py @@ -0,0 +1,22 @@ +import logging + +import psycogreen.gevent as pscycogreen_gevent # type: ignore +from grpc.experimental import gevent as grpc_gevent # type: ignore + +_logger = logging.getLogger(__name__) + + +def _log(message: str): + print(message, flush=True) + + +# grpc gevent +grpc_gevent.init_gevent() +_log("gRPC patched with gevent.") +pscycogreen_gevent.patch_psycopg() +_log("psycopg2 patched with gevent.") + + +from app import app, celery + +__all__ = ["app", "celery"] diff --git a/api/docker/entrypoint.sh b/api/docker/entrypoint.sh index ddef26faaf..d89a35ffdc 100755 --- a/api/docker/entrypoint.sh +++ b/api/docker/entrypoint.sh @@ -30,7 +30,7 @@ if [[ "${MODE}" == "worker" ]]; then CONCURRENCY_OPTION="-c ${CELERY_WORKER_AMOUNT:-1}" fi - exec celery -A app.celery worker -P ${CELERY_WORKER_CLASS:-gevent} $CONCURRENCY_OPTION \ + exec celery -A celery_entrypoint.celery worker -P ${CELERY_WORKER_CLASS:-gevent} $CONCURRENCY_OPTION \ --max-tasks-per-child ${MAX_TASKS_PER_CHILD:-50} --loglevel ${LOG_LEVEL:-INFO} \ -Q ${CELERY_QUEUES:-dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation} diff --git a/api/gunicorn.conf.py b/api/gunicorn.conf.py new file mode 100644 index 0000000000..fc91a43670 --- /dev/null +++ b/api/gunicorn.conf.py @@ -0,0 +1,10 @@ +import psycogreen.gevent as pscycogreen_gevent # type: ignore +from grpc.experimental import gevent as grpc_gevent # type: ignore + + +def post_fork(server, worker): + # grpc gevent + grpc_gevent.init_gevent() + server.log.info("gRPC patched with gevent.") + pscycogreen_gevent.patch_psycopg() + server.log.info("psycopg2 patched with gevent.")