mirror of https://github.com/langgenius/dify.git
parent a1b7eceb60
commit 4a0ccd0823
@@ -1,12 +1,9 @@
 import json
 from dataclasses import dataclass
-from typing import Any, Generic, TypeVar
+from typing import Any
 
 from extensions.ext_redis import redis_client
 
-T = TypeVar('T')
-
-
 TASK_WRAPPER_PREFIX = "__WRAPPER__:"
 
 
@@ -23,7 +20,7 @@ class TaskWrapper:
         return cls(data)
 
 
-class TenantSelfTaskQueue(Generic[T]):
+class TenantSelfTaskQueue:
     """
     Simple queue for tenant self tasks, used for tenant self task isolation.
     It uses Redis list to store tasks, and Redis key to store task waiting flag.
@@ -47,7 +44,7 @@ class TenantSelfTaskQueue(Generic[T]):
     def delete_task_key(self):
         redis_client.delete(self.task_key)
 
-    def push_tasks(self, tasks: list[T]):
+    def push_tasks(self, tasks: list):
         serialized_tasks = []
         for task in tasks:
             # Store str list directly, maintaining full compatibility for pipeline scenarios
@@ -61,7 +58,7 @@ class TenantSelfTaskQueue(Generic[T]):
 
         redis_client.lpush(self.queue, *serialized_tasks)
 
-    def pull_tasks(self, count: int = 1) -> list[T]:
+    def pull_tasks(self, count: int = 1) -> list:
         if count <= 0:
             return []
 
@@ -87,6 +84,6 @@ class TenantSelfTaskQueue(Generic[T]):
 
         return tasks
 
-    def get_next_task(self) -> T | None:
+    def get_next_task(self):
         tasks = self.pull_tasks(1)
         return tasks[0] if tasks else None

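Note on the hunks above: dropping `Generic[T]` means the queue now hands back plain deserialized values instead of a typed `T`. A minimal round-trip sketch, assuming the `TenantSelfTaskQueue(tenant_id, key)` constructor arguments visible in `DocumentIndexingTaskProxy` further down; the import path and ids are hypothetical:

```python
# Hypothetical import path; the diff does not show the module's location.
from libs.tenant_self_task_queue import TenantSelfTaskQueue

queue = TenantSelfTaskQueue("tenant-123", "document_indexing")

# Without Generic[T], push_tasks takes a plain list; a dict payload mirrors
# the asdict(DocumentTask(...)) usage in DocumentIndexingTaskProxy below.
queue.push_tasks([{"tenant_id": "tenant-123", "dataset_id": "ds-1", "document_ids": ["doc-1"]}])

task = queue.get_next_task()  # oldest task as a plain value, or None when empty
```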
@@ -35,23 +35,6 @@ if [[ "${MODE}" == "worker" ]]; then
     -Q ${CELERY_QUEUES:-dataset,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation} \
     --prefetch-multiplier=1
 
-elif [[ "${MODE}" == "priority_worker" ]]; then
-  # Get the number of available CPU cores
-  if [ "${CELERY_AUTO_SCALE,,}" = "true" ]; then
-    # Set MAX_WORKERS to the number of available cores if not specified
-    AVAILABLE_CORES=$(nproc)
-    MAX_WORKERS=${CELERY_MAX_WORKERS:-$AVAILABLE_CORES}
-    MIN_WORKERS=${CELERY_MIN_WORKERS:-1}
-    CONCURRENCY_OPTION="--autoscale=${MAX_WORKERS},${MIN_WORKERS}"
-  else
-    CONCURRENCY_OPTION="-c ${CELERY_WORKER_AMOUNT:-1}"
-  fi
-
-  exec celery -A celery_entrypoint.celery worker -P ${CELERY_WORKER_CLASS:-gevent} $CONCURRENCY_OPTION \
-    --max-tasks-per-child ${MAX_TASKS_PER_CHILD:-50} --loglevel ${LOG_LEVEL:-INFO} \
-    -Q ${CELERY_QUEUES:-priority_dataset,priority_pipeline} \
-    --prefetch-multiplier=1
-
 elif [[ "${MODE}" == "beat" ]]; then
   exec celery -A app.celery beat --loglevel ${LOG_LEVEL:-INFO}
 else

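For context, the removed `priority_worker` mode consumed the `priority_dataset` and `priority_pipeline` queues, so after this change nothing drains them. A hypothetical producer-side sketch of how a task would have been routed there: `queue=` on `apply_async` is standard Celery, but pairing it with `normal_document_indexing_task` is an assumption, not Dify's confirmed routing:

```python
# Hypothetical: explicit routing to one of the now-unconsumed priority queues.
normal_document_indexing_task.apply_async(
    kwargs={"tenant_id": "tenant-123", "dataset_id": "ds-1", "document_ids": ["doc-1"]},
    queue="priority_dataset",  # no worker subscribes to this queue anymore
)
```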
@@ -12,9 +12,9 @@ logger = logging.getLogger(__name__)
 
 
 class DocumentIndexingTaskProxy:
-    def __init__(self, tenant_id: str, dateset_id: str, document_ids: list[str]):
+    def __init__(self, tenant_id: str, dataset_id: str, document_ids: list[str]):
         self.tenant_id = tenant_id
-        self.dateset_id = dateset_id
+        self.dataset_id = dataset_id
         self.document_ids = document_ids
         self.tenant_self_task_queue = TenantSelfTaskQueue(tenant_id, "document_indexing")
 
@@ -23,32 +23,32 @@ class DocumentIndexingTaskProxy:
         return FeatureService.get_features(self.tenant_id)
 
     def _send_to_direct_queue(self, task_func: Callable):
-        logger.info("send dataset %s to direct queue", self.dateset_id)
+        logger.info("send dataset %s to direct queue", self.dataset_id)
         task_func.delay(  # type: ignore
             tenant_id=self.tenant_id,
-            dataset_id=self.dateset_id,
+            dataset_id=self.dataset_id,
             document_ids=self.document_ids
         )
 
     def _send_to_tenant_queue(self, task_func: Callable):
-        logger.info("send dataset %s to tenant queue", self.dateset_id)
+        logger.info("send dataset %s to tenant queue", self.dataset_id)
         if self.tenant_self_task_queue.get_task_key():
             # Add to waiting queue using List operations (lpush)
             self.tenant_self_task_queue.push_tasks([
                 asdict(
-                    DocumentTask(tenant_id=self.tenant_id, dataset_id=self.dateset_id, document_ids=self.document_ids)
+                    DocumentTask(tenant_id=self.tenant_id, dataset_id=self.dataset_id, document_ids=self.document_ids)
                 )
             ])
-            logger.info("push tasks: %s - %s", self.dateset_id, self.document_ids)
+            logger.info("push tasks: %s - %s", self.dataset_id, self.document_ids)
         else:
             # Set flag and execute task
             self.tenant_self_task_queue.set_task_waiting_time()
             task_func.delay(  # type: ignore
                 tenant_id=self.tenant_id,
-                dataset_id=self.dateset_id,
+                dataset_id=self.dataset_id,
                 document_ids=self.document_ids
             )
-            logger.info("init tasks: %s - %s", self.dateset_id, self.document_ids)
+            logger.info("init tasks: %s - %s", self.dataset_id, self.document_ids)
 
     def _send_to_default_tenant_queue(self):
         self._send_to_tenant_queue(normal_document_indexing_task)

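The branch in `_send_to_tenant_queue` above carries the isolation logic: the first dispatch for a tenant finds no task key, sets the waiting flag, and fires the Celery task immediately; every later dispatch only pushes onto the tenant's Redis waiting list. A hypothetical caller-side illustration (ids are invented):

```python
# First dispatch: no task key yet, so the flag is set and delay() fires now.
DocumentIndexingTaskProxy("tenant-123", "ds-1", ["doc-1"])._send_to_default_tenant_queue()

# Second dispatch for the same tenant: the key exists, so the task is only
# pushed onto the Redis waiting list to be drained after the first finishes.
DocumentIndexingTaskProxy("tenant-123", "ds-2", ["doc-2"])._send_to_default_tenant_queue()
```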
@@ -58,7 +58,7 @@ class TestDocumentIndexingTaskProxy:
 
         # Assert
         assert proxy.tenant_id == tenant_id
-        assert proxy.dateset_id == dataset_id  # Note: typo in original code
+        assert proxy.dataset_id == dataset_id
         assert proxy.document_ids == document_ids
         assert isinstance(proxy.tenant_self_task_queue, TenantSelfTaskQueue)
         assert proxy.tenant_self_task_queue.tenant_id == tenant_id
@@ -313,7 +313,7 @@ class TestDocumentIndexingTaskProxy:
 
         # Assert
         assert proxy.tenant_id == tenant_id
-        assert proxy.dateset_id == dataset_id
+        assert proxy.dataset_id == dataset_id
         assert proxy.document_ids == document_ids
 
     def test_initialization_with_single_document_id(self):
@@ -328,5 +328,5 @@ class TestDocumentIndexingTaskProxy:
 
         # Assert
         assert proxy.tenant_id == tenant_id
-        assert proxy.dateset_id == dataset_id
+        assert proxy.dataset_id == dataset_id
         assert proxy.document_ids == document_ids

@@ -28,33 +28,6 @@ services:
       - ssrf_proxy_network
       - default
 
-  # worker service
-  # The Celery worker for processing the queue.
-  priority_worker:
-    image: langgenius/dify-api:1.9.2
-    restart: always
-    environment:
-      # Use the shared environment variables.
-      <<: *shared-api-worker-env
-      # Startup mode, 'worker' starts the Celery worker for processing the queue.
-      MODE: priority_worker
-      SENTRY_DSN: ${API_SENTRY_DSN:-}
-      SENTRY_TRACES_SAMPLE_RATE: ${API_SENTRY_TRACES_SAMPLE_RATE:-1.0}
-      SENTRY_PROFILES_SAMPLE_RATE: ${API_SENTRY_PROFILES_SAMPLE_RATE:-1.0}
-      PLUGIN_MAX_PACKAGE_SIZE: ${PLUGIN_MAX_PACKAGE_SIZE:-52428800}
-      INNER_API_KEY_FOR_PLUGIN: ${PLUGIN_DIFY_INNER_API_KEY:-QaHbTe77CtuXmsfyhR7+vRjI/+XbV1AaFy691iy+kGDv2Jvy0/eAh8Y1}
-    depends_on:
-      db:
-        condition: service_healthy
-      redis:
-        condition: service_started
-    volumes:
-      # Mount the storage directory to the container, for storing user files.
-      - ./volumes/app/storage:/app/api/storage
-    networks:
-      - ssrf_proxy_network
-      - default
-
   # worker service
   # The Celery worker for processing the queue.
   worker:

@@ -424,7 +424,6 @@ x-shared-env: &shared-api-worker-env
   WORKFLOW_LOG_CLEANUP_ENABLED: ${WORKFLOW_LOG_CLEANUP_ENABLED:-false}
   WORKFLOW_LOG_RETENTION_DAYS: ${WORKFLOW_LOG_RETENTION_DAYS:-30}
   WORKFLOW_LOG_CLEANUP_BATCH_SIZE: ${WORKFLOW_LOG_CLEANUP_BATCH_SIZE:-100}
-  TENANT_SELF_TASK_QUEUE_PULL_SIZE: ${TENANT_SELF_TASK_QUEUE_PULL_SIZE:-1}
   HTTP_REQUEST_NODE_MAX_BINARY_SIZE: ${HTTP_REQUEST_NODE_MAX_BINARY_SIZE:-10485760}
   HTTP_REQUEST_NODE_MAX_TEXT_SIZE: ${HTTP_REQUEST_NODE_MAX_TEXT_SIZE:-1048576}
   HTTP_REQUEST_NODE_SSL_VERIFY: ${HTTP_REQUEST_NODE_SSL_VERIFY:-True}
@@ -637,31 +636,6 @@ services:
     networks:
       - ssrf_proxy_network
       - default
 
-  priority_worker:
-    image: langgenius/dify-api:1.9.2
-    restart: always
-    environment:
-      # Use the shared environment variables.
-      <<: *shared-api-worker-env
-      # Startup mode, 'worker' starts the Celery worker for processing the queue.
-      MODE: priority_worker
-      SENTRY_DSN: ${API_SENTRY_DSN:-}
-      SENTRY_TRACES_SAMPLE_RATE: ${API_SENTRY_TRACES_SAMPLE_RATE:-1.0}
-      SENTRY_PROFILES_SAMPLE_RATE: ${API_SENTRY_PROFILES_SAMPLE_RATE:-1.0}
-      PLUGIN_MAX_PACKAGE_SIZE: ${PLUGIN_MAX_PACKAGE_SIZE:-52428800}
-      INNER_API_KEY_FOR_PLUGIN: ${PLUGIN_DIFY_INNER_API_KEY:-QaHbTe77CtuXmsfyhR7+vRjI/+XbV1AaFy691iy+kGDv2Jvy0/eAh8Y1}
-    depends_on:
-      db:
-        condition: service_healthy
-      redis:
-        condition: service_started
-    volumes:
-      # Mount the storage directory to the container, for storing user files.
-      - ./volumes/app/storage:/app/api/storage
-    networks:
-      - ssrf_proxy_network
-      - default
-
   # worker service
   # The Celery worker for processing the queue.