From a5b93e13e6c612b3444d1e38de2f2edb52309006 Mon Sep 17 00:00:00 2001 From: GareArc Date: Mon, 2 Feb 2026 21:41:16 -0800 Subject: [PATCH] feat(enterprise): Add gRPC protocol support for OTLP telemetry - Add ENTERPRISE_OTLP_PROTOCOL config (http/grpc, default: http) - Introduce _ExporterFactory class for protocol-agnostic exporter creation - Support both HTTP and gRPC OTLP endpoints for traces and logs - Refactor endpoint path handling into factory methods --- api/configs/enterprise/__init__.py | 5 ++ api/enterprise/telemetry/exporter.py | 69 +++++++++++++++++++++------- 2 files changed, 57 insertions(+), 17 deletions(-) diff --git a/api/configs/enterprise/__init__.py b/api/configs/enterprise/__init__.py index e4f08c9224..eb9dfe69b4 100644 --- a/api/configs/enterprise/__init__.py +++ b/api/configs/enterprise/__init__.py @@ -50,6 +50,11 @@ class EnterpriseTelemetryConfig(BaseSettings): default="", ) + ENTERPRISE_OTLP_PROTOCOL: str = Field( + description="OTLP protocol: 'http' or 'grpc' (default: http).", + default="http", + ) + ENTERPRISE_INCLUDE_CONTENT: bool = Field( description="Include input/output content in traces (privacy toggle).", default=True, diff --git a/api/enterprise/telemetry/exporter.py b/api/enterprise/telemetry/exporter.py index f2ea54b445..6591c2fd97 100644 --- a/api/enterprise/telemetry/exporter.py +++ b/api/enterprise/telemetry/exporter.py @@ -15,8 +15,10 @@ from typing import Any, cast from opentelemetry import metrics, trace from opentelemetry.context import Context -from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter as GRPCLogExporter +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCSpanExporter +from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter as HTTPLogExporter +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HTTPSpanExporter from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler from opentelemetry.sdk._logs.export import BatchLogRecordProcessor from opentelemetry.sdk.resources import Resource @@ -61,10 +63,42 @@ def _datetime_to_ns(dt: datetime) -> int: return int(dt.timestamp() * 1_000_000_000) -def _append_logs_path(endpoint: str) -> str: - if endpoint.endswith("/"): - return endpoint + "v1/logs" - return f"{endpoint}/v1/logs" +class _ExporterFactory: + def __init__(self, protocol: str, endpoint: str, headers: dict[str, str]): + self._protocol = protocol + self._endpoint = endpoint + self._headers = headers + self._grpc_headers = tuple(headers.items()) if headers else None + self._http_headers = headers or None + + def create_trace_exporter(self) -> HTTPSpanExporter | GRPCSpanExporter: + if self._protocol == "grpc": + return GRPCSpanExporter( + endpoint=self._endpoint or None, + headers=self._grpc_headers, + insecure=True, + ) + trace_endpoint = f"{self._endpoint}/v1/traces" if self._endpoint else "" + return HTTPSpanExporter(endpoint=trace_endpoint or None, headers=self._http_headers) + + def create_log_exporter(self, logs_endpoint_override: str) -> tuple[HTTPLogExporter | GRPCLogExporter | None, str]: + if self._protocol == "grpc": + logs_endpoint = logs_endpoint_override or self._endpoint + if not logs_endpoint: + return None, "Enterprise OTEL logs enabled but endpoint is empty" + return GRPCLogExporter(endpoint=logs_endpoint, headers=self._grpc_headers, insecure=True), "" + + logs_endpoint = logs_endpoint_override or self._append_logs_path() + if not logs_endpoint: + return None, "Enterprise OTEL logs enabled but endpoint is empty" + return HTTPLogExporter(endpoint=logs_endpoint, headers=self._http_headers), "" + + def _append_logs_path(self) -> str: + if not self._endpoint: + return "" + if self._endpoint.endswith("/"): + return f"{self._endpoint}v1/logs" + return f"{self._endpoint}/v1/logs" class EnterpriseLoggingHandler(LoggingHandler): @@ -124,6 +158,7 @@ class EnterpriseExporter: def __init__(self, config: object) -> None: endpoint: str = getattr(config, "ENTERPRISE_OTLP_ENDPOINT", "") headers_raw: str = getattr(config, "ENTERPRISE_OTLP_HEADERS", "") + protocol: str = (getattr(config, "ENTERPRISE_OTLP_PROTOCOL", "http") or "http").lower() service_name: str = getattr(config, "ENTERPRISE_SERVICE_NAME", "dify") sampling_rate: float = getattr(config, "ENTERPRISE_OTEL_SAMPLING_RATE", 1.0) self.include_content: bool = getattr(config, "ENTERPRISE_INCLUDE_CONTENT", True) @@ -143,13 +178,13 @@ class EnterpriseExporter: self._tracer_provider = TracerProvider(resource=resource, sampler=sampler, id_generator=id_generator) headers = _parse_otlp_headers(headers_raw) - trace_endpoint = f"{endpoint}/v1/traces" if endpoint else "" - self._tracer_provider.add_span_processor( - BatchSpanProcessor(OTLPSpanExporter(endpoint=trace_endpoint or None, headers=headers)) - ) + factory = _ExporterFactory(protocol, endpoint, headers) + + trace_exporter = factory.create_trace_exporter() + self._tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter)) self._tracer = self._tracer_provider.get_tracer("dify.enterprise") - self._init_logs_pipeline(endpoint, headers, resource) + self._init_logs_pipeline(factory, resource) meter = metrics.get_meter("dify.enterprise") self._counters = { @@ -171,17 +206,17 @@ class EnterpriseExporter: EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"), } - def _init_logs_pipeline(self, endpoint: str, headers: dict[str, str], resource: Resource) -> None: + def _init_logs_pipeline(self, factory: _ExporterFactory, resource: Resource) -> None: if not self._logs_enabled: return - logs_endpoint = self._logs_endpoint or (_append_logs_path(endpoint) if endpoint else "") - if not logs_endpoint: - logger.warning("Enterprise OTEL logs enabled but endpoint is empty") + + log_exporter, warning = factory.create_log_exporter(self._logs_endpoint) + if not log_exporter: + logger.warning(warning) return self._log_provider = LoggerProvider(resource=resource, shutdown_on_exit=False) - exporter = OTLPLogExporter(endpoint=logs_endpoint, headers=headers or None) - self._log_provider.add_log_record_processor(BatchLogRecordProcessor(exporter)) + self._log_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter)) self._log_handler = EnterpriseLoggingHandler(logger_provider=self._log_provider) def export_span(