mirror of https://github.com/langgenius/dify.git
feat(enterprise): Add gRPC protocol support for OTLP telemetry
- Add ENTERPRISE_OTLP_PROTOCOL config (http/grpc, default: http) - Introduce _ExporterFactory class for protocol-agnostic exporter creation - Support both HTTP and gRPC OTLP endpoints for traces and logs - Refactor endpoint path handling into factory methods
This commit is contained in:
parent
232916a89d
commit
a5b93e13e6
|
|
@ -50,6 +50,11 @@ class EnterpriseTelemetryConfig(BaseSettings):
|
|||
default="",
|
||||
)
|
||||
|
||||
ENTERPRISE_OTLP_PROTOCOL: str = Field(
|
||||
description="OTLP protocol: 'http' or 'grpc' (default: http).",
|
||||
default="http",
|
||||
)
|
||||
|
||||
ENTERPRISE_INCLUDE_CONTENT: bool = Field(
|
||||
description="Include input/output content in traces (privacy toggle).",
|
||||
default=True,
|
||||
|
|
|
|||
|
|
@ -15,8 +15,10 @@ from typing import Any, cast
|
|||
|
||||
from opentelemetry import metrics, trace
|
||||
from opentelemetry.context import Context
|
||||
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter as GRPCLogExporter
|
||||
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCSpanExporter
|
||||
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter as HTTPLogExporter
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HTTPSpanExporter
|
||||
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
|
||||
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
|
||||
from opentelemetry.sdk.resources import Resource
|
||||
|
|
@ -61,10 +63,42 @@ def _datetime_to_ns(dt: datetime) -> int:
|
|||
return int(dt.timestamp() * 1_000_000_000)
|
||||
|
||||
|
||||
def _append_logs_path(endpoint: str) -> str:
|
||||
if endpoint.endswith("/"):
|
||||
return endpoint + "v1/logs"
|
||||
return f"{endpoint}/v1/logs"
|
||||
class _ExporterFactory:
|
||||
def __init__(self, protocol: str, endpoint: str, headers: dict[str, str]):
|
||||
self._protocol = protocol
|
||||
self._endpoint = endpoint
|
||||
self._headers = headers
|
||||
self._grpc_headers = tuple(headers.items()) if headers else None
|
||||
self._http_headers = headers or None
|
||||
|
||||
def create_trace_exporter(self) -> HTTPSpanExporter | GRPCSpanExporter:
|
||||
if self._protocol == "grpc":
|
||||
return GRPCSpanExporter(
|
||||
endpoint=self._endpoint or None,
|
||||
headers=self._grpc_headers,
|
||||
insecure=True,
|
||||
)
|
||||
trace_endpoint = f"{self._endpoint}/v1/traces" if self._endpoint else ""
|
||||
return HTTPSpanExporter(endpoint=trace_endpoint or None, headers=self._http_headers)
|
||||
|
||||
def create_log_exporter(self, logs_endpoint_override: str) -> tuple[HTTPLogExporter | GRPCLogExporter | None, str]:
|
||||
if self._protocol == "grpc":
|
||||
logs_endpoint = logs_endpoint_override or self._endpoint
|
||||
if not logs_endpoint:
|
||||
return None, "Enterprise OTEL logs enabled but endpoint is empty"
|
||||
return GRPCLogExporter(endpoint=logs_endpoint, headers=self._grpc_headers, insecure=True), ""
|
||||
|
||||
logs_endpoint = logs_endpoint_override or self._append_logs_path()
|
||||
if not logs_endpoint:
|
||||
return None, "Enterprise OTEL logs enabled but endpoint is empty"
|
||||
return HTTPLogExporter(endpoint=logs_endpoint, headers=self._http_headers), ""
|
||||
|
||||
def _append_logs_path(self) -> str:
|
||||
if not self._endpoint:
|
||||
return ""
|
||||
if self._endpoint.endswith("/"):
|
||||
return f"{self._endpoint}v1/logs"
|
||||
return f"{self._endpoint}/v1/logs"
|
||||
|
||||
|
||||
class EnterpriseLoggingHandler(LoggingHandler):
|
||||
|
|
@ -124,6 +158,7 @@ class EnterpriseExporter:
|
|||
def __init__(self, config: object) -> None:
|
||||
endpoint: str = getattr(config, "ENTERPRISE_OTLP_ENDPOINT", "")
|
||||
headers_raw: str = getattr(config, "ENTERPRISE_OTLP_HEADERS", "")
|
||||
protocol: str = (getattr(config, "ENTERPRISE_OTLP_PROTOCOL", "http") or "http").lower()
|
||||
service_name: str = getattr(config, "ENTERPRISE_SERVICE_NAME", "dify")
|
||||
sampling_rate: float = getattr(config, "ENTERPRISE_OTEL_SAMPLING_RATE", 1.0)
|
||||
self.include_content: bool = getattr(config, "ENTERPRISE_INCLUDE_CONTENT", True)
|
||||
|
|
@ -143,13 +178,13 @@ class EnterpriseExporter:
|
|||
self._tracer_provider = TracerProvider(resource=resource, sampler=sampler, id_generator=id_generator)
|
||||
|
||||
headers = _parse_otlp_headers(headers_raw)
|
||||
trace_endpoint = f"{endpoint}/v1/traces" if endpoint else ""
|
||||
self._tracer_provider.add_span_processor(
|
||||
BatchSpanProcessor(OTLPSpanExporter(endpoint=trace_endpoint or None, headers=headers))
|
||||
)
|
||||
factory = _ExporterFactory(protocol, endpoint, headers)
|
||||
|
||||
trace_exporter = factory.create_trace_exporter()
|
||||
self._tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter))
|
||||
self._tracer = self._tracer_provider.get_tracer("dify.enterprise")
|
||||
|
||||
self._init_logs_pipeline(endpoint, headers, resource)
|
||||
self._init_logs_pipeline(factory, resource)
|
||||
|
||||
meter = metrics.get_meter("dify.enterprise")
|
||||
self._counters = {
|
||||
|
|
@ -171,17 +206,17 @@ class EnterpriseExporter:
|
|||
EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"),
|
||||
}
|
||||
|
||||
def _init_logs_pipeline(self, endpoint: str, headers: dict[str, str], resource: Resource) -> None:
|
||||
def _init_logs_pipeline(self, factory: _ExporterFactory, resource: Resource) -> None:
|
||||
if not self._logs_enabled:
|
||||
return
|
||||
logs_endpoint = self._logs_endpoint or (_append_logs_path(endpoint) if endpoint else "")
|
||||
if not logs_endpoint:
|
||||
logger.warning("Enterprise OTEL logs enabled but endpoint is empty")
|
||||
|
||||
log_exporter, warning = factory.create_log_exporter(self._logs_endpoint)
|
||||
if not log_exporter:
|
||||
logger.warning(warning)
|
||||
return
|
||||
|
||||
self._log_provider = LoggerProvider(resource=resource, shutdown_on_exit=False)
|
||||
exporter = OTLPLogExporter(endpoint=logs_endpoint, headers=headers or None)
|
||||
self._log_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
|
||||
self._log_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
|
||||
self._log_handler = EnterpriseLoggingHandler(logger_provider=self._log_provider)
|
||||
|
||||
def export_span(
|
||||
|
|
|
|||
Loading…
Reference in New Issue