From 4a0ccd0823a46c4f0d29205b56cbc738e086fe60 Mon Sep 17 00:00:00 2001
From: hj24
Date: Tue, 28 Oct 2025 18:11:30 +0800
Subject: [PATCH] fix lint errors

fix: rm useless script
---
 api/core/rag/pipeline/queue.py                | 13 ++++-----
 api/docker/entrypoint.sh                      | 17 ------------
 api/services/document_indexing_task_proxy.py  | 18 ++++++-------
 .../test_document_indexing_task_proxy.py      |  6 ++---
 docker/docker-compose-template.yaml           | 27 -------------------
 docker/docker-compose.yaml                    | 26 ------------------
 6 files changed, 17 insertions(+), 90 deletions(-)

diff --git a/api/core/rag/pipeline/queue.py b/api/core/rag/pipeline/queue.py
index 36db94caa1..4c004ca89f 100644
--- a/api/core/rag/pipeline/queue.py
+++ b/api/core/rag/pipeline/queue.py
@@ -1,12 +1,9 @@
 import json
 from dataclasses import dataclass
-from typing import Any, Generic, TypeVar
+from typing import Any
 
 from extensions.ext_redis import redis_client
 
-T = TypeVar('T')
-
-
 TASK_WRAPPER_PREFIX = "__WRAPPER__:"
 
 
@@ -23,7 +20,7 @@ class TaskWrapper:
         return cls(data)
 
 
-class TenantSelfTaskQueue(Generic[T]):
+class TenantSelfTaskQueue:
     """
     Simple queue for tenant self tasks, used for tenant self task isolation.
     It uses Redis list to store tasks, and Redis key to store task waiting flag.
@@ -47,7 +44,7 @@ class TenantSelfTaskQueue(Generic[T]):
     def delete_task_key(self):
         redis_client.delete(self.task_key)
 
-    def push_tasks(self, tasks: list[T]):
+    def push_tasks(self, tasks: list):
         serialized_tasks = []
         for task in tasks:
             # Store str list directly, maintaining full compatibility for pipeline scenarios
@@ -61,7 +58,7 @@ class TenantSelfTaskQueue(Generic[T]):
 
         redis_client.lpush(self.queue, *serialized_tasks)
 
-    def pull_tasks(self, count: int = 1) -> list[T]:
+    def pull_tasks(self, count: int = 1) -> list:
         if count <= 0:
             return []
 
@@ -87,6 +84,6 @@ class TenantSelfTaskQueue(Generic[T]):
 
         return tasks
 
-    def get_next_task(self) -> T | None:
+    def get_next_task(self):
         tasks = self.pull_tasks(1)
         return tasks[0] if tasks else None
diff --git a/api/docker/entrypoint.sh b/api/docker/entrypoint.sh
index 943afc19f6..421d72a3a9 100755
--- a/api/docker/entrypoint.sh
+++ b/api/docker/entrypoint.sh
@@ -35,23 +35,6 @@ if [[ "${MODE}" == "worker" ]]; then
     -Q ${CELERY_QUEUES:-dataset,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation} \
     --prefetch-multiplier=1
 
-elif [[ "${MODE}" == "priority_worker" ]]; then
-  # Get the number of available CPU cores
-  if [ "${CELERY_AUTO_SCALE,,}" = "true" ]; then
-    # Set MAX_WORKERS to the number of available cores if not specified
-    AVAILABLE_CORES=$(nproc)
-    MAX_WORKERS=${CELERY_MAX_WORKERS:-$AVAILABLE_CORES}
-    MIN_WORKERS=${CELERY_MIN_WORKERS:-1}
-    CONCURRENCY_OPTION="--autoscale=${MAX_WORKERS},${MIN_WORKERS}"
-  else
-    CONCURRENCY_OPTION="-c ${CELERY_WORKER_AMOUNT:-1}"
-  fi
-
-  exec celery -A celery_entrypoint.celery worker -P ${CELERY_WORKER_CLASS:-gevent} $CONCURRENCY_OPTION \
-    --max-tasks-per-child ${MAX_TASKS_PER_CHILD:-50} --loglevel ${LOG_LEVEL:-INFO} \
-    -Q ${CELERY_QUEUES:-priority_dataset,priority_pipeline} \
-    --prefetch-multiplier=1
-
 elif [[ "${MODE}" == "beat" ]]; then
   exec celery -A app.celery beat --loglevel ${LOG_LEVEL:-INFO}
 else
diff --git a/api/services/document_indexing_task_proxy.py b/api/services/document_indexing_task_proxy.py
index 5cd746fa91..a86edac8a1 100644
--- a/api/services/document_indexing_task_proxy.py
+++ b/api/services/document_indexing_task_proxy.py
@@ -12,9 +12,9 @@ logger = logging.getLogger(__name__)
 
 
 class DocumentIndexingTaskProxy:
-    def __init__(self, tenant_id: str, dateset_id: str, document_ids: list[str]):
+    def __init__(self, tenant_id: str, dataset_id: str, document_ids: list[str]):
         self.tenant_id = tenant_id
-        self.dateset_id = dateset_id
+        self.dataset_id = dataset_id
         self.document_ids = document_ids
         self.tenant_self_task_queue = TenantSelfTaskQueue(tenant_id, "document_indexing")
 
@@ -23,32 +23,32 @@ class DocumentIndexingTaskProxy:
         return FeatureService.get_features(self.tenant_id)
 
     def _send_to_direct_queue(self, task_func: Callable):
-        logger.info("send dataset %s to direct queue", self.dateset_id)
+        logger.info("send dataset %s to direct queue", self.dataset_id)
         task_func.delay(  # type: ignore
             tenant_id=self.tenant_id,
-            dataset_id=self.dateset_id,
+            dataset_id=self.dataset_id,
             document_ids=self.document_ids
         )
 
     def _send_to_tenant_queue(self, task_func: Callable):
-        logger.info("send dataset %s to tenant queue", self.dateset_id)
+        logger.info("send dataset %s to tenant queue", self.dataset_id)
         if self.tenant_self_task_queue.get_task_key():
             # Add to waiting queue using List operations (lpush)
             self.tenant_self_task_queue.push_tasks([
                 asdict(
-                    DocumentTask(tenant_id=self.tenant_id, dataset_id=self.dateset_id, document_ids=self.document_ids)
+                    DocumentTask(tenant_id=self.tenant_id, dataset_id=self.dataset_id, document_ids=self.document_ids)
                 )
             ])
-            logger.info("push tasks: %s - %s", self.dateset_id, self.document_ids)
+            logger.info("push tasks: %s - %s", self.dataset_id, self.document_ids)
         else:
             # Set flag and execute task
             self.tenant_self_task_queue.set_task_waiting_time()
             task_func.delay(  # type: ignore
                 tenant_id=self.tenant_id,
-                dataset_id=self.dateset_id,
+                dataset_id=self.dataset_id,
                 document_ids=self.document_ids
             )
-            logger.info("init tasks: %s - %s", self.dateset_id, self.document_ids)
+            logger.info("init tasks: %s - %s", self.dataset_id, self.document_ids)
 
     def _send_to_default_tenant_queue(self):
         self._send_to_tenant_queue(normal_document_indexing_task)
diff --git a/api/tests/unit_tests/services/test_document_indexing_task_proxy.py b/api/tests/unit_tests/services/test_document_indexing_task_proxy.py
index 7eb7d73d51..0f3826830a 100644
--- a/api/tests/unit_tests/services/test_document_indexing_task_proxy.py
+++ b/api/tests/unit_tests/services/test_document_indexing_task_proxy.py
@@ -58,7 +58,7 @@ class TestDocumentIndexingTaskProxy:
 
         # Assert
         assert proxy.tenant_id == tenant_id
-        assert proxy.dateset_id == dataset_id  # Note: typo in original code
+        assert proxy.dataset_id == dataset_id
         assert proxy.document_ids == document_ids
         assert isinstance(proxy.tenant_self_task_queue, TenantSelfTaskQueue)
         assert proxy.tenant_self_task_queue.tenant_id == tenant_id
@@ -313,7 +313,7 @@ class TestDocumentIndexingTaskProxy:
 
         # Assert
         assert proxy.tenant_id == tenant_id
-        assert proxy.dateset_id == dataset_id
+        assert proxy.dataset_id == dataset_id
         assert proxy.document_ids == document_ids
 
     def test_initialization_with_single_document_id(self):
@@ -328,5 +328,5 @@ class TestDocumentIndexingTaskProxy:
 
         # Assert
         assert proxy.tenant_id == tenant_id
-        assert proxy.dateset_id == dataset_id
+        assert proxy.dataset_id == dataset_id
         assert proxy.document_ids == document_ids
diff --git a/docker/docker-compose-template.yaml b/docker/docker-compose-template.yaml
index daeae52f3f..9650be90db 100644
--- a/docker/docker-compose-template.yaml
+++ b/docker/docker-compose-template.yaml
@@ -28,33 +28,6 @@ services:
       - ssrf_proxy_network
       - default
 
-  # worker service
-  # The Celery worker for processing the queue.
-  priority_worker:
-    image: langgenius/dify-api:1.9.2
-    restart: always
-    environment:
-      # Use the shared environment variables.
-      <<: *shared-api-worker-env
-      # Startup mode, 'worker' starts the Celery worker for processing the queue.
-      MODE: priority_worker
-      SENTRY_DSN: ${API_SENTRY_DSN:-}
-      SENTRY_TRACES_SAMPLE_RATE: ${API_SENTRY_TRACES_SAMPLE_RATE:-1.0}
-      SENTRY_PROFILES_SAMPLE_RATE: ${API_SENTRY_PROFILES_SAMPLE_RATE:-1.0}
-      PLUGIN_MAX_PACKAGE_SIZE: ${PLUGIN_MAX_PACKAGE_SIZE:-52428800}
-      INNER_API_KEY_FOR_PLUGIN: ${PLUGIN_DIFY_INNER_API_KEY:-QaHbTe77CtuXmsfyhR7+vRjI/+XbV1AaFy691iy+kGDv2Jvy0/eAh8Y1}
-    depends_on:
-      db:
-        condition: service_healthy
-      redis:
-        condition: service_started
-    volumes:
-      # Mount the storage directory to the container, for storing user files.
-      - ./volumes/app/storage:/app/api/storage
-    networks:
-      - ssrf_proxy_network
-      - default
-
   # worker service
   # The Celery worker for processing the queue.
   worker:
diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml
index 392f739db6..d2ca6b859e 100644
--- a/docker/docker-compose.yaml
+++ b/docker/docker-compose.yaml
@@ -424,7 +424,6 @@ x-shared-env: &shared-api-worker-env
   WORKFLOW_LOG_CLEANUP_ENABLED: ${WORKFLOW_LOG_CLEANUP_ENABLED:-false}
   WORKFLOW_LOG_RETENTION_DAYS: ${WORKFLOW_LOG_RETENTION_DAYS:-30}
   WORKFLOW_LOG_CLEANUP_BATCH_SIZE: ${WORKFLOW_LOG_CLEANUP_BATCH_SIZE:-100}
-  TENANT_SELF_TASK_QUEUE_PULL_SIZE: ${TENANT_SELF_TASK_QUEUE_PULL_SIZE:-1}
   HTTP_REQUEST_NODE_MAX_BINARY_SIZE: ${HTTP_REQUEST_NODE_MAX_BINARY_SIZE:-10485760}
   HTTP_REQUEST_NODE_MAX_TEXT_SIZE: ${HTTP_REQUEST_NODE_MAX_TEXT_SIZE:-1048576}
  HTTP_REQUEST_NODE_SSL_VERIFY: ${HTTP_REQUEST_NODE_SSL_VERIFY:-True}
@@ -637,31 +636,6 @@ services:
     networks:
       - ssrf_proxy_network
       - default
-
-  priority_worker:
-    image: langgenius/dify-api:1.9.2
-    restart: always
-    environment:
-      # Use the shared environment variables.
-      <<: *shared-api-worker-env
-      # Startup mode, 'worker' starts the Celery worker for processing the queue.
-      MODE: priority_worker
-      SENTRY_DSN: ${API_SENTRY_DSN:-}
-      SENTRY_TRACES_SAMPLE_RATE: ${API_SENTRY_TRACES_SAMPLE_RATE:-1.0}
-      SENTRY_PROFILES_SAMPLE_RATE: ${API_SENTRY_PROFILES_SAMPLE_RATE:-1.0}
-      PLUGIN_MAX_PACKAGE_SIZE: ${PLUGIN_MAX_PACKAGE_SIZE:-52428800}
-      INNER_API_KEY_FOR_PLUGIN: ${PLUGIN_DIFY_INNER_API_KEY:-QaHbTe77CtuXmsfyhR7+vRjI/+XbV1AaFy691iy+kGDv2Jvy0/eAh8Y1}
-    depends_on:
-      db:
-        condition: service_healthy
-      redis:
-        condition: service_started
-    volumes:
-      # Mount the storage directory to the container, for storing user files.
-      - ./volumes/app/storage:/app/api/storage
-    networks:
-      - ssrf_proxy_network
-      - default
 
   # worker service
   # The Celery worker for processing the queue.
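Note (illustrative sketch, not part of the patch): the hunks above drop the unused Generic[T] parameterisation from TenantSelfTaskQueue, presumably because the queue round-trips everything through JSON in Redis, and fix the dateset_id -> dataset_id typo in DocumentIndexingTaskProxy. The Python sketch below mimics the dispatch path of _send_to_tenant_queue after this change; InMemoryTenantQueue, dispatch, and the lambda stand-in for Celery's delay are assumptions for illustration only and do not exist in the repository.

from dataclasses import asdict, dataclass


@dataclass
class DocumentTask:
    tenant_id: str
    dataset_id: str
    document_ids: list[str]


class InMemoryTenantQueue:
    """Stand-in for TenantSelfTaskQueue: a plain list plus a 'waiting' flag."""

    def __init__(self) -> None:
        self._tasks: list[dict] = []
        self._waiting = False

    def get_task_key(self) -> bool:
        return self._waiting

    def set_task_waiting_time(self) -> None:
        self._waiting = True

    def push_tasks(self, tasks: list) -> None:
        self._tasks.extend(tasks)

    def pull_tasks(self, count: int = 1) -> list:
        pulled, self._tasks = self._tasks[:count], self._tasks[count:]
        return pulled


def dispatch(queue: InMemoryTenantQueue, task: DocumentTask, delay) -> None:
    # Mirrors the proxy: queue the task if this tenant already has work in flight,
    # otherwise mark the tenant busy and hand the task to the worker directly.
    if queue.get_task_key():
        queue.push_tasks([asdict(task)])
    else:
        queue.set_task_waiting_time()
        delay(tenant_id=task.tenant_id, dataset_id=task.dataset_id, document_ids=task.document_ids)


if __name__ == "__main__":
    q = InMemoryTenantQueue()
    send = lambda **kwargs: print("dispatched:", kwargs)
    dispatch(q, DocumentTask("t1", "ds1", ["doc1"]), send)          # runs immediately
    dispatch(q, DocumentTask("t1", "ds2", ["doc2", "doc3"]), send)  # queued behind the first
    print("queued:", q.pull_tasks(1))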