From 5f6e90e4d25befa879e92b46768068de1558d22a Mon Sep 17 00:00:00 2001 From: GareArc Date: Mon, 2 Feb 2026 18:16:58 -0800 Subject: [PATCH] feat(enterprise): Add OTEL logs export with span_id correlation - Add ENTERPRISE_OTEL_LOGS_ENABLED and ENTERPRISE_OTLP_LOGS_ENDPOINT config options - Implement EnterpriseLoggingHandler for log record translation with trace/span ID parsing - Add LoggerProvider and BatchLogRecordProcessor for OTLP log export - Correlate telemetry logs with spans via span_id_source parameter - Attach log handler during enterprise telemetry initialization --- api/configs/enterprise/__init__.py | 10 ++ api/enterprise/telemetry/enterprise_trace.py | 2 + api/enterprise/telemetry/exporter.py | 125 +++++++++++++++++-- api/enterprise/telemetry/telemetry_log.py | 17 +++ api/extensions/ext_enterprise_telemetry.py | 1 + 5 files changed, 142 insertions(+), 13 deletions(-) diff --git a/api/configs/enterprise/__init__.py b/api/configs/enterprise/__init__.py index fdd9fcfdf1..e4f08c9224 100644 --- a/api/configs/enterprise/__init__.py +++ b/api/configs/enterprise/__init__.py @@ -30,11 +30,21 @@ class EnterpriseTelemetryConfig(BaseSettings): default=False, ) + ENTERPRISE_OTEL_LOGS_ENABLED: bool = Field( + description="Enable enterprise OTEL log export (requires enterprise telemetry enabled).", + default=False, + ) + ENTERPRISE_OTLP_ENDPOINT: str = Field( description="Enterprise OTEL collector endpoint.", default="", ) + ENTERPRISE_OTLP_LOGS_ENDPOINT: str = Field( + description="Enterprise OTEL logs endpoint (defaults to ENTERPRISE_OTLP_ENDPOINT + /v1/logs).", + default="", + ) + ENTERPRISE_OTLP_HEADERS: str = Field( description="Auth headers for OTLP export (key=value,key2=value2).", default="", diff --git a/api/enterprise/telemetry/enterprise_trace.py b/api/enterprise/telemetry/enterprise_trace.py index fc45f0db6c..faf3219a6a 100644 --- a/api/enterprise/telemetry/enterprise_trace.py +++ b/api/enterprise/telemetry/enterprise_trace.py @@ -163,6 +163,7 @@ class EnterpriseOtelTrace: attributes=log_attrs, signal="span_detail", trace_id_source=info.workflow_run_id, + span_id_source=info.workflow_run_id, tenant_id=info.metadata.get("tenant_id"), user_id=info.metadata.get("user_id"), ) @@ -287,6 +288,7 @@ class EnterpriseOtelTrace: attributes=log_attrs, signal="span_detail", trace_id_source=info.workflow_run_id, + span_id_source=info.node_execution_id, tenant_id=info.tenant_id, user_id=info.metadata.get("user_id"), ) diff --git a/api/enterprise/telemetry/exporter.py b/api/enterprise/telemetry/exporter.py index 1ad2307ea7..f2ea54b445 100644 --- a/api/enterprise/telemetry/exporter.py +++ b/api/enterprise/telemetry/exporter.py @@ -11,16 +11,21 @@ import logging import socket import uuid from datetime import datetime -from typing import cast +from typing import Any, cast from opentelemetry import metrics, trace +from opentelemetry.context import Context +from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio from opentelemetry.semconv.resource import ResourceAttributes from opentelemetry.trace import SpanContext, TraceFlags +from 
opentelemetry.util.types import Attributes, AttributeValue from configs import dify_config from enterprise.telemetry.entities import EnterpriseTelemetryCounter, EnterpriseTelemetryHistogram @@ -56,6 +61,58 @@ def _datetime_to_ns(dt: datetime) -> int: return int(dt.timestamp() * 1_000_000_000) +def _append_logs_path(endpoint: str) -> str: + if endpoint.endswith("/"): + return endpoint + "v1/logs" + return f"{endpoint}/v1/logs" + + +class EnterpriseLoggingHandler(LoggingHandler): + @staticmethod + def _coerce_attribute_value(value: object) -> AttributeValue: + if isinstance(value, (str, bool, int, float)): + return value + return str(value) + + @staticmethod + def _parse_hex_id(value: object, expected_len: int) -> int | None: + if isinstance(value, int): + return value if value != 0 else None + if not isinstance(value, str): + return None + raw = value.strip().lower() + raw = raw.removeprefix("0x") + if expected_len and len(raw) != expected_len: + return None + try: + parsed = int(raw, 16) + except ValueError: + return None + return parsed if parsed != 0 else None + + @staticmethod + def _get_attributes(record: logging.LogRecord) -> Attributes: + raw_attributes = LoggingHandler._get_attributes(record) or {} + attributes: dict[str, AttributeValue] = dict(raw_attributes) + extra_attrs = attributes.pop("attributes", None) + if isinstance(extra_attrs, dict): + extra_attrs_dict = cast(dict[str, object], extra_attrs) + for key, value in extra_attrs_dict.items(): + if key not in attributes: + attributes[key] = EnterpriseLoggingHandler._coerce_attribute_value(value) + return attributes + + def _translate(self, record: logging.LogRecord): + log_record = super()._translate(record) + trace_id = self._parse_hex_id(getattr(record, "trace_id", None), 32) + span_id = self._parse_hex_id(getattr(record, "span_id", None), 16) + if trace_id is not None: + log_record.trace_id = trace_id + if span_id is not None: + log_record.span_id = span_id + return log_record + + class EnterpriseExporter: """Shared OTEL exporter for all enterprise telemetry. 
@@ -70,6 +127,10 @@ class EnterpriseExporter: service_name: str = getattr(config, "ENTERPRISE_SERVICE_NAME", "dify") sampling_rate: float = getattr(config, "ENTERPRISE_OTEL_SAMPLING_RATE", 1.0) self.include_content: bool = getattr(config, "ENTERPRISE_INCLUDE_CONTENT", True) + self._logs_enabled: bool = bool(getattr(config, "ENTERPRISE_OTEL_LOGS_ENABLED", False)) + self._logs_endpoint: str = getattr(config, "ENTERPRISE_OTLP_LOGS_ENDPOINT", "") + self._log_provider: LoggerProvider | None = None + self._log_handler: logging.Handler | None = None resource = Resource( attributes={ @@ -88,6 +149,8 @@ class EnterpriseExporter: ) self._tracer = self._tracer_provider.get_tracer("dify.enterprise") + self._init_logs_pipeline(endpoint, headers, resource) + meter = metrics.get_meter("dify.enterprise") self._counters = { EnterpriseTelemetryCounter.TOKENS: meter.create_counter("dify.tokens.total", unit="{token}"), @@ -108,10 +171,23 @@ class EnterpriseExporter: EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"), } + def _init_logs_pipeline(self, endpoint: str, headers: dict[str, str], resource: Resource) -> None: + if not self._logs_enabled: + return + logs_endpoint = self._logs_endpoint or (_append_logs_path(endpoint) if endpoint else "") + if not logs_endpoint: + logger.warning("Enterprise OTEL logs enabled but endpoint is empty") + return + + self._log_provider = LoggerProvider(resource=resource, shutdown_on_exit=False) + exporter = OTLPLogExporter(endpoint=logs_endpoint, headers=headers or None) + self._log_provider.add_log_record_processor(BatchLogRecordProcessor(exporter)) + self._log_handler = EnterpriseLoggingHandler(logger_provider=self._log_provider) + def export_span( self, name: str, - attributes: dict, + attributes: dict[str, Any], correlation_id: str | None = None, span_id_source: str | None = None, start_time: datetime | None = None, @@ -139,7 +215,7 @@ class EnterpriseExporter: set_span_id_source(span_id_source) try: - parent_context = None + parent_context: Context | None = None # A span is the "root" of its correlation group when span_id_source == correlation_id # (i.e. a workflow root span). All other spans are children. 
if parent_span_id_source: @@ -168,13 +244,15 @@ class EnterpriseExporter: ) parent_context = trace.set_span_in_context(trace.NonRecordingSpan(parent_span_context)) - span_kwargs: dict = {"context": parent_context} - if start_time is not None: - span_kwargs["start_time"] = _datetime_to_ns(start_time) - if end_time is not None: - span_kwargs["end_on_exit"] = False + span_start_time = _datetime_to_ns(start_time) if start_time is not None else None + span_end_on_exit = end_time is None - with self._tracer.start_as_current_span(name, **span_kwargs) as span: + with self._tracer.start_as_current_span( + name, + context=parent_context, + start_time=span_start_time, + end_on_exit=span_end_on_exit, + ) as span: for key, value in attributes.items(): if value is not None: span.set_attribute(key, value) @@ -186,15 +264,36 @@ class EnterpriseExporter: set_correlation_id(None) set_span_id_source(None) - def increment_counter(self, name: EnterpriseTelemetryCounter, value: int, labels: dict) -> None: + def increment_counter( + self, name: EnterpriseTelemetryCounter, value: int, labels: dict[str, AttributeValue] + ) -> None: counter = self._counters.get(name) if counter: - counter.add(value, labels) + counter.add(value, cast(Attributes, labels)) - def record_histogram(self, name: EnterpriseTelemetryHistogram, value: float, labels: dict) -> None: + def record_histogram( + self, name: EnterpriseTelemetryHistogram, value: float, labels: dict[str, AttributeValue] + ) -> None: histogram = self._histograms.get(name) if histogram: - histogram.record(value, labels) + histogram.record(value, cast(Attributes, labels)) def shutdown(self) -> None: self._tracer_provider.shutdown() + if self._log_provider: + self._log_provider.shutdown() + + def attach_log_handler(self) -> None: + if not self._log_handler: + return + root_logger = logging.getLogger() + if self._log_handler in root_logger.handlers: + return + try: + from core.logging.filters import IdentityContextFilter, TraceContextFilter + + self._log_handler.addFilter(TraceContextFilter()) + self._log_handler.addFilter(IdentityContextFilter()) + except Exception: + logger.exception("Failed to attach log filters to enterprise handler") + root_logger.addHandler(self._log_handler) diff --git a/api/enterprise/telemetry/telemetry_log.py b/api/enterprise/telemetry/telemetry_log.py index aa44ad59b9..513f6e18e1 100644 --- a/api/enterprise/telemetry/telemetry_log.py +++ b/api/enterprise/telemetry/telemetry_log.py @@ -26,12 +26,24 @@ def compute_trace_id_hex(uuid_str: str | None) -> str: return "" +def compute_span_id_hex(uuid_str: str | None) -> str: + if not uuid_str: + return "" + try: + from enterprise.telemetry.id_generator import compute_deterministic_span_id + + return f"{compute_deterministic_span_id(uuid_str):016x}" + except (ValueError, AttributeError): + return "" + + def emit_telemetry_log( *, event_name: str, attributes: dict[str, Any], signal: str = "metric_only", trace_id_source: str | None = None, + span_id_source: str | None = None, tenant_id: str | None = None, user_id: str | None = None, ) -> None: @@ -65,6 +77,9 @@ def emit_telemetry_log( trace_id_hex = compute_trace_id_hex(trace_id_source) if trace_id_hex: extra["trace_id"] = trace_id_hex + span_id_hex = compute_span_id_hex(span_id_source) + if span_id_hex: + extra["span_id"] = span_id_hex if tenant_id: extra["tenant_id"] = tenant_id if user_id: @@ -78,6 +93,7 @@ def emit_metric_only_event( event_name: str, attributes: dict[str, Any], trace_id_source: str | None = None, + span_id_source: str | None = None, 
tenant_id: str | None = None, user_id: str | None = None, ) -> None: @@ -86,6 +102,7 @@ def emit_metric_only_event( attributes=attributes, signal="metric_only", trace_id_source=trace_id_source, + span_id_source=span_id_source, tenant_id=tenant_id, user_id=user_id, ) diff --git a/api/extensions/ext_enterprise_telemetry.py b/api/extensions/ext_enterprise_telemetry.py index f2c68c479c..dd33f5e377 100644 --- a/api/extensions/ext_enterprise_telemetry.py +++ b/api/extensions/ext_enterprise_telemetry.py @@ -37,6 +37,7 @@ def init_app(app: DifyApp) -> None: _exporter = EnterpriseExporter(dify_config) atexit.register(_exporter.shutdown) + _exporter.attach_log_handler() # Import to trigger @signal.connect decorator registration import enterprise.telemetry.event_handlers # noqa: F401 # type: ignore[reportUnusedImport]
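
Reviewer sketch (not part of the diff) — logs endpoint resolution. With this patch, ENTERPRISE_OTEL_LOGS_ENABLED gates the logs pipeline, and the logs endpoint falls back to ENTERPRISE_OTLP_ENDPOINT plus "/v1/logs" when ENTERPRISE_OTLP_LOGS_ENDPOINT is empty. A minimal Python sketch of that resolution; resolve_logs_endpoint is a hypothetical stand-in that mirrors _append_logs_path and the fallback in _init_logs_pipeline:

    def resolve_logs_endpoint(otlp_endpoint: str, otlp_logs_endpoint: str) -> str:
        # An explicit ENTERPRISE_OTLP_LOGS_ENDPOINT always wins.
        if otlp_logs_endpoint:
            return otlp_logs_endpoint
        # Otherwise derive it from ENTERPRISE_OTLP_ENDPOINT, as _append_logs_path does.
        if not otlp_endpoint:
            return ""  # the exporter warns and skips the logs pipeline in this case
        return otlp_endpoint.rstrip("/") + "/v1/logs"

    assert resolve_logs_endpoint("https://collector.internal:4318", "") == "https://collector.internal:4318/v1/logs"
    assert resolve_logs_endpoint("https://collector.internal:4318", "https://logs.internal/v1/logs") == "https://logs.internal/v1/logs"
    assert resolve_logs_endpoint("", "") == ""

The endpoint values above are placeholders, not real collector addresses.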
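
Reviewer sketch (not part of the diff) — end-to-end wiring of the OTEL logs pipeline. For readers unfamiliar with the logs SDK, _init_logs_pipeline plus attach_log_handler boil down to the standalone snippet below (endpoint and service name are placeholders); the patch additionally swaps LoggingHandler for EnterpriseLoggingHandler and attaches TraceContextFilter / IdentityContextFilter:

    import logging

    from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
    from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
    from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
    from opentelemetry.sdk.resources import Resource

    # The LoggerProvider owns the export pipeline; shutdown_on_exit=False because
    # the exporter registers its own shutdown via atexit in ext_enterprise_telemetry.
    provider = LoggerProvider(resource=Resource.create({"service.name": "dify"}), shutdown_on_exit=False)
    provider.add_log_record_processor(
        BatchLogRecordProcessor(OTLPLogExporter(endpoint="https://collector.internal:4318/v1/logs"))
    )

    # Attaching the handler to the root logger mirrors attach_log_handler(): stdlib
    # log records are translated into OTLP log records and exported in batches.
    logging.getLogger().setLevel(logging.INFO)
    logging.getLogger().addHandler(LoggingHandler(logger_provider=provider))
    logging.getLogger(__name__).info("hello from dify")  # now also shipped over OTLP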
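
Reviewer sketch (not part of the diff) — how the span_id correlation travels. The correlation is plain stdlib `extra` plumbing: emit_telemetry_log attaches 32-/16-character hex strings to the log record, and EnterpriseLoggingHandler._parse_hex_id converts them back into the integer trace_id/span_id carried by the exported OTLP log record. Roughly (the hex values below are fabricated):

    import logging

    logger = logging.getLogger("dify.telemetry")
    logger.setLevel(logging.INFO)

    # Approximately what emit_telemetry_log does after deriving deterministic hex IDs
    # from workflow_run_id (trace) and node_execution_id (span).
    logger.info(
        "span_detail",
        extra={
            "trace_id": "9f86d081884c7d659a2feaa0c55ad015",  # 32 hex chars -> log record trace_id
            "span_id": "1b4f0e9851971998",                   # 16 hex chars -> log record span_id
            "attributes": {"workflow.run.id": "wf-run-1"},   # flattened by _get_attributes
        },
    )

    # EnterpriseLoggingHandler._parse_hex_id recovers the integers, rejecting zero
    # values, wrong lengths, and non-hex input; an "0x" prefix is tolerated.
    assert int("1b4f0e9851971998", 16) == 0x1B4F0E9851971998

Since compute_span_id_hex and the exporter's span_id_source path both rely on compute_deterministic_span_id, a node's log record should end up carrying the same span_id as the node span, which is what the commit message describes as correlating telemetry logs with spans.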