From 956ed5a48ca98ddae4772badc69df6787301bf88 Mon Sep 17 00:00:00 2001 From: GareArc Date: Mon, 2 Feb 2026 23:28:39 -0800 Subject: [PATCH] fix(enterprise): Remove OTEL log export --- api/configs/enterprise/__init__.py | 10 -- api/enterprise/telemetry/exporter.py | 113 +-------------------- api/extensions/ext_enterprise_telemetry.py | 1 - 3 files changed, 3 insertions(+), 121 deletions(-) diff --git a/api/configs/enterprise/__init__.py b/api/configs/enterprise/__init__.py index eb9dfe69b4..4920eeba07 100644 --- a/api/configs/enterprise/__init__.py +++ b/api/configs/enterprise/__init__.py @@ -30,21 +30,11 @@ class EnterpriseTelemetryConfig(BaseSettings): default=False, ) - ENTERPRISE_OTEL_LOGS_ENABLED: bool = Field( - description="Enable enterprise OTEL log export (requires enterprise telemetry enabled).", - default=False, - ) - ENTERPRISE_OTLP_ENDPOINT: str = Field( description="Enterprise OTEL collector endpoint.", default="", ) - ENTERPRISE_OTLP_LOGS_ENDPOINT: str = Field( - description="Enterprise OTEL logs endpoint (defaults to ENTERPRISE_OTLP_ENDPOINT + /v1/logs).", - default="", - ) - ENTERPRISE_OTLP_HEADERS: str = Field( description="Auth headers for OTLP export (key=value,key2=value2).", default="", diff --git a/api/enterprise/telemetry/exporter.py b/api/enterprise/telemetry/exporter.py index 423ae20d69..a2728fb7ba 100644 --- a/api/enterprise/telemetry/exporter.py +++ b/api/enterprise/telemetry/exporter.py @@ -1,6 +1,6 @@ """Enterprise OTEL exporter — shared by EnterpriseOtelTrace, event handlers, and direct instrumentation. -Uses dedicated TracerProvider, LoggerProvider, and MeterProvider instances (configurable sampling, +Uses dedicated TracerProvider and MeterProvider instances (configurable sampling, independent from ext_otel.py infrastructure). Initialized once during Flask extension init (single-threaded via ext_enterprise_telemetry.py). @@ -15,14 +15,10 @@ from typing import Any, cast from opentelemetry import trace from opentelemetry.context import Context -from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter as GRPCLogExporter from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as GRPCMetricExporter from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCSpanExporter -from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter as HTTPLogExporter from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as HTTPMetricExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HTTPSpanExporter -from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource @@ -85,18 +81,6 @@ class _ExporterFactory: trace_endpoint = f"{self._endpoint}/v1/traces" if self._endpoint else "" return HTTPSpanExporter(endpoint=trace_endpoint or None, headers=self._http_headers) - def create_log_exporter(self, logs_endpoint_override: str) -> tuple[HTTPLogExporter | GRPCLogExporter | None, str]: - if self._protocol == "grpc": - logs_endpoint = logs_endpoint_override or self._endpoint - if not logs_endpoint: - return None, "Enterprise OTEL logs enabled but endpoint is empty" - return GRPCLogExporter(endpoint=logs_endpoint, headers=self._grpc_headers, insecure=True), "" - - logs_endpoint = logs_endpoint_override or self._append_logs_path() - if not logs_endpoint: - return None, "Enterprise OTEL logs enabled but endpoint is empty" - return HTTPLogExporter(endpoint=logs_endpoint, headers=self._http_headers), "" - def create_metric_exporter(self) -> HTTPMetricExporter | GRPCMetricExporter: if self._protocol == "grpc": return GRPCMetricExporter( @@ -107,59 +91,6 @@ class _ExporterFactory: metric_endpoint = f"{self._endpoint}/v1/metrics" if self._endpoint else "" return HTTPMetricExporter(endpoint=metric_endpoint or None, headers=self._http_headers) - def _append_logs_path(self) -> str: - if not self._endpoint: - return "" - if self._endpoint.endswith("/"): - return f"{self._endpoint}v1/logs" - return f"{self._endpoint}/v1/logs" - - -class EnterpriseLoggingHandler(LoggingHandler): - @staticmethod - def _coerce_attribute_value(value: object) -> AttributeValue: - if isinstance(value, (str, bool, int, float)): - return value - return str(value) - - @staticmethod - def _parse_hex_id(value: object, expected_len: int) -> int | None: - if isinstance(value, int): - return value if value != 0 else None - if not isinstance(value, str): - return None - raw = value.strip().lower() - raw = raw.removeprefix("0x") - if expected_len and len(raw) != expected_len: - return None - try: - parsed = int(raw, 16) - except ValueError: - return None - return parsed if parsed != 0 else None - - @staticmethod - def _get_attributes(record: logging.LogRecord) -> Attributes: - raw_attributes = LoggingHandler._get_attributes(record) or {} - attributes: dict[str, AttributeValue] = dict(raw_attributes) - extra_attrs = attributes.pop("attributes", None) - if isinstance(extra_attrs, dict): - extra_attrs_dict = cast(dict[str, object], extra_attrs) - for key, value in extra_attrs_dict.items(): - if key not in attributes: - attributes[key] = EnterpriseLoggingHandler._coerce_attribute_value(value) - return attributes - - def _translate(self, record: logging.LogRecord): - log_record = super()._translate(record) - trace_id = self._parse_hex_id(getattr(record, "trace_id", None), 32) - span_id = self._parse_hex_id(getattr(record, "span_id", None), 16) - if trace_id is not None: - log_record.trace_id = trace_id - if span_id is not None: - log_record.span_id = span_id - return log_record - class EnterpriseExporter: """Shared OTEL exporter for all enterprise telemetry. @@ -176,10 +107,6 @@ class EnterpriseExporter: service_name: str = getattr(config, "ENTERPRISE_SERVICE_NAME", "dify") sampling_rate: float = getattr(config, "ENTERPRISE_OTEL_SAMPLING_RATE", 1.0) self.include_content: bool = getattr(config, "ENTERPRISE_INCLUDE_CONTENT", True) - self._logs_enabled: bool = bool(getattr(config, "ENTERPRISE_OTEL_LOGS_ENABLED", False)) - self._logs_endpoint: str = getattr(config, "ENTERPRISE_OTLP_LOGS_ENDPOINT", "") - self._log_provider: LoggerProvider | None = None - self._log_handler: logging.Handler | None = None resource = Resource( attributes={ @@ -198,8 +125,6 @@ class EnterpriseExporter: self._tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter)) self._tracer = self._tracer_provider.get_tracer("dify.enterprise") - self._init_logs_pipeline(factory, resource) - metric_exporter = factory.create_metric_exporter() self._meter_provider = MeterProvider( resource=resource, @@ -225,19 +150,6 @@ class EnterpriseExporter: EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"), } - def _init_logs_pipeline(self, factory: _ExporterFactory, resource: Resource) -> None: - if not self._logs_enabled: - return - - log_exporter, warning = factory.create_log_exporter(self._logs_endpoint) - if not log_exporter: - logger.warning(warning) - return - - self._log_provider = LoggerProvider(resource=resource, shutdown_on_exit=False) - self._log_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter)) - self._log_handler = EnterpriseLoggingHandler(logger_provider=self._log_provider) - def export_span( self, name: str, @@ -275,9 +187,7 @@ class EnterpriseExporter: if parent_span_id_source: # Cross-workflow linking: parent is an explicit span (e.g. tool node in outer workflow) parent_span_id = compute_deterministic_span_id(parent_span_id_source) - parent_trace_id = ( - cast(int, uuid.UUID(effective_trace_correlation).int) if effective_trace_correlation else 0 - ) + parent_trace_id = int(uuid.UUID(effective_trace_correlation)) if effective_trace_correlation else 0 if parent_trace_id: parent_span_context = SpanContext( trace_id=parent_trace_id, @@ -289,7 +199,7 @@ class EnterpriseExporter: elif correlation_id and correlation_id != span_id_source: # Child span: parent is the correlation-group root (workflow root span) parent_span_id = compute_deterministic_span_id(correlation_id) - parent_trace_id = cast(int, uuid.UUID(effective_trace_correlation or correlation_id).int) + parent_trace_id = int(uuid.UUID(effective_trace_correlation or correlation_id)) parent_span_context = SpanContext( trace_id=parent_trace_id, span_id=parent_span_id, @@ -334,21 +244,4 @@ class EnterpriseExporter: def shutdown(self) -> None: self._tracer_provider.shutdown() - if self._log_provider: - self._log_provider.shutdown() self._meter_provider.shutdown() - - def attach_log_handler(self) -> None: - if not self._log_handler: - return - telemetry_logger = logging.getLogger("dify.telemetry") - if self._log_handler in telemetry_logger.handlers: - return - try: - from core.logging.filters import IdentityContextFilter, TraceContextFilter - - self._log_handler.addFilter(TraceContextFilter()) - self._log_handler.addFilter(IdentityContextFilter()) - except Exception: - logger.exception("Failed to attach log filters to enterprise handler") - telemetry_logger.addHandler(self._log_handler) diff --git a/api/extensions/ext_enterprise_telemetry.py b/api/extensions/ext_enterprise_telemetry.py index dd33f5e377..f2c68c479c 100644 --- a/api/extensions/ext_enterprise_telemetry.py +++ b/api/extensions/ext_enterprise_telemetry.py @@ -37,7 +37,6 @@ def init_app(app: DifyApp) -> None: _exporter = EnterpriseExporter(dify_config) atexit.register(_exporter.shutdown) - _exporter.attach_log_handler() # Import to trigger @signal.connect decorator registration import enterprise.telemetry.event_handlers # noqa: F401 # type: ignore[reportUnusedImport]