Merge branch '1.11.4-otel-telemetry-ee' into deploy/enterprise

This commit is contained in:
GareArc 2026-02-02 18:21:18 -08:00
commit 87b65f9cf6
No known key found for this signature in database
5 changed files with 142 additions and 13 deletions

View File

@ -30,11 +30,21 @@ class EnterpriseTelemetryConfig(BaseSettings):
default=False,
)
ENTERPRISE_OTEL_LOGS_ENABLED: bool = Field(
description="Enable enterprise OTEL log export (requires enterprise telemetry enabled).",
default=False,
)
ENTERPRISE_OTLP_ENDPOINT: str = Field(
description="Enterprise OTEL collector endpoint.",
default="",
)
ENTERPRISE_OTLP_LOGS_ENDPOINT: str = Field(
description="Enterprise OTEL logs endpoint (defaults to ENTERPRISE_OTLP_ENDPOINT + /v1/logs).",
default="",
)
ENTERPRISE_OTLP_HEADERS: str = Field(
description="Auth headers for OTLP export (key=value,key2=value2).",
default="",

View File

@ -163,6 +163,7 @@ class EnterpriseOtelTrace:
attributes=log_attrs,
signal="span_detail",
trace_id_source=info.workflow_run_id,
span_id_source=info.workflow_run_id,
tenant_id=info.metadata.get("tenant_id"),
user_id=info.metadata.get("user_id"),
)
@ -287,6 +288,7 @@ class EnterpriseOtelTrace:
attributes=log_attrs,
signal="span_detail",
trace_id_source=info.workflow_run_id,
span_id_source=info.node_execution_id,
tenant_id=info.tenant_id,
user_id=info.metadata.get("user_id"),
)

View File

@ -11,16 +11,21 @@ import logging
import socket
import uuid
from datetime import datetime
from typing import cast
from typing import Any, cast
from opentelemetry import metrics, trace
from opentelemetry.context import Context
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.trace import SpanContext, TraceFlags
from opentelemetry.util.types import Attributes, AttributeValue
from configs import dify_config
from enterprise.telemetry.entities import EnterpriseTelemetryCounter, EnterpriseTelemetryHistogram
@ -56,6 +61,58 @@ def _datetime_to_ns(dt: datetime) -> int:
return int(dt.timestamp() * 1_000_000_000)
def _append_logs_path(endpoint: str) -> str:
if endpoint.endswith("/"):
return endpoint + "v1/logs"
return f"{endpoint}/v1/logs"
class EnterpriseLoggingHandler(LoggingHandler):
    """OTEL :class:`LoggingHandler` variant for enterprise log export.

    Adds two behaviors on top of the base handler:
    - flattens a nested ``attributes`` dict carried on the log record into
      the top-level attribute mapping (without overwriting existing keys);
    - honours explicit ``trace_id`` / ``span_id`` fields set on the record,
      overriding the ids derived from the active span context.
    """

    @staticmethod
    def _coerce_attribute_value(value: object) -> AttributeValue:
        """Pass OTEL-native scalar types through unchanged; stringify the rest."""
        return value if isinstance(value, (str, bool, int, float)) else str(value)

    @staticmethod
    def _parse_hex_id(value: object, expected_len: int) -> int | None:
        """Parse a hex trace/span id into an int.

        Returns ``None`` for missing, malformed, wrong-length, or all-zero
        ids (zero ids are invalid per the OTEL spec). Integer inputs are
        accepted as-is.
        """
        if isinstance(value, int):
            return value or None
        if not isinstance(value, str):
            return None
        text = value.strip().lower().removeprefix("0x")
        # expected_len == 0 disables the length check.
        if expected_len and len(text) != expected_len:
            return None
        try:
            number = int(text, 16)
        except ValueError:
            return None
        return number or None

    @staticmethod
    def _get_attributes(record: logging.LogRecord) -> Attributes:
        """Merge a record's nested ``attributes`` dict into the flat mapping
        produced by the base handler."""
        merged: dict[str, AttributeValue] = dict(LoggingHandler._get_attributes(record) or {})
        nested = merged.pop("attributes", None)
        if isinstance(nested, dict):
            for key, value in cast(dict[str, object], nested).items():
                # Never overwrite an attribute the base handler already set.
                merged.setdefault(key, EnterpriseLoggingHandler._coerce_attribute_value(value))
        return merged

    def _translate(self, record: logging.LogRecord):
        """Translate the record, then override trace/span ids when the record
        carries valid explicit ones (used for log/trace correlation)."""
        translated = super()._translate(record)
        explicit_trace = self._parse_hex_id(getattr(record, "trace_id", None), 32)
        explicit_span = self._parse_hex_id(getattr(record, "span_id", None), 16)
        if explicit_trace is not None:
            translated.trace_id = explicit_trace
        if explicit_span is not None:
            translated.span_id = explicit_span
        return translated
class EnterpriseExporter:
"""Shared OTEL exporter for all enterprise telemetry.
@ -70,6 +127,10 @@ class EnterpriseExporter:
service_name: str = getattr(config, "ENTERPRISE_SERVICE_NAME", "dify")
sampling_rate: float = getattr(config, "ENTERPRISE_OTEL_SAMPLING_RATE", 1.0)
self.include_content: bool = getattr(config, "ENTERPRISE_INCLUDE_CONTENT", True)
self._logs_enabled: bool = bool(getattr(config, "ENTERPRISE_OTEL_LOGS_ENABLED", False))
self._logs_endpoint: str = getattr(config, "ENTERPRISE_OTLP_LOGS_ENDPOINT", "")
self._log_provider: LoggerProvider | None = None
self._log_handler: logging.Handler | None = None
resource = Resource(
attributes={
@ -88,6 +149,8 @@ class EnterpriseExporter:
)
self._tracer = self._tracer_provider.get_tracer("dify.enterprise")
self._init_logs_pipeline(endpoint, headers, resource)
meter = metrics.get_meter("dify.enterprise")
self._counters = {
EnterpriseTelemetryCounter.TOKENS: meter.create_counter("dify.tokens.total", unit="{token}"),
@ -108,10 +171,23 @@ class EnterpriseExporter:
EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"),
}
def _init_logs_pipeline(self, endpoint: str, headers: dict[str, str], resource: Resource) -> None:
    """Set up the OTLP log export pipeline when enterprise logs are enabled.

    Uses the dedicated logs endpoint when configured, otherwise derives one
    from the trace *endpoint* by appending ``/v1/logs``. Leaves the pipeline
    unconfigured (with a warning) when no endpoint can be determined.
    """
    if not self._logs_enabled:
        return
    derived = _append_logs_path(endpoint) if endpoint else ""
    logs_endpoint = self._logs_endpoint or derived
    if not logs_endpoint:
        logger.warning("Enterprise OTEL logs enabled but endpoint is empty")
        return
    # shutdown_on_exit=False: teardown is driven explicitly via shutdown().
    provider = LoggerProvider(resource=resource, shutdown_on_exit=False)
    provider.add_log_record_processor(
        BatchLogRecordProcessor(OTLPLogExporter(endpoint=logs_endpoint, headers=headers or None))
    )
    self._log_provider = provider
    self._log_handler = EnterpriseLoggingHandler(logger_provider=provider)
def export_span(
self,
name: str,
attributes: dict,
attributes: dict[str, Any],
correlation_id: str | None = None,
span_id_source: str | None = None,
start_time: datetime | None = None,
@ -139,7 +215,7 @@ class EnterpriseExporter:
set_span_id_source(span_id_source)
try:
parent_context = None
parent_context: Context | None = None
# A span is the "root" of its correlation group when span_id_source == correlation_id
# (i.e. a workflow root span). All other spans are children.
if parent_span_id_source:
@ -168,13 +244,15 @@ class EnterpriseExporter:
)
parent_context = trace.set_span_in_context(trace.NonRecordingSpan(parent_span_context))
span_kwargs: dict = {"context": parent_context}
if start_time is not None:
span_kwargs["start_time"] = _datetime_to_ns(start_time)
if end_time is not None:
span_kwargs["end_on_exit"] = False
span_start_time = _datetime_to_ns(start_time) if start_time is not None else None
span_end_on_exit = end_time is None
with self._tracer.start_as_current_span(name, **span_kwargs) as span:
with self._tracer.start_as_current_span(
name,
context=parent_context,
start_time=span_start_time,
end_on_exit=span_end_on_exit,
) as span:
for key, value in attributes.items():
if value is not None:
span.set_attribute(key, value)
@ -186,15 +264,36 @@ class EnterpriseExporter:
set_correlation_id(None)
set_span_id_source(None)
def increment_counter(
    self, name: EnterpriseTelemetryCounter, value: int, labels: dict[str, AttributeValue]
) -> None:
    """Add *value* to the named enterprise counter with the given labels.

    Unknown counter names are ignored silently so telemetry can never break
    the calling code path.
    """
    counter = self._counters.get(name)
    if counter:
        counter.add(value, cast(Attributes, labels))
def record_histogram(
    self, name: EnterpriseTelemetryHistogram, value: float, labels: dict[str, AttributeValue]
) -> None:
    """Record *value* on the named enterprise histogram with the given labels.

    Unknown histogram names are ignored silently so telemetry can never break
    the calling code path.
    """
    histogram = self._histograms.get(name)
    if histogram:
        histogram.record(value, cast(Attributes, labels))
def shutdown(self) -> None:
    """Flush and tear down the tracer provider and, if configured, the log provider."""
    self._tracer_provider.shutdown()
    log_provider = self._log_provider
    if log_provider:
        log_provider.shutdown()
def attach_log_handler(self) -> None:
    """Install the enterprise OTEL log handler on the root logger (idempotent).

    Context filters are attached on a best-effort basis: a failure there is
    logged but does not prevent the handler itself from being installed.
    """
    handler = self._log_handler
    if not handler:
        # Logs pipeline was never initialized (disabled or no endpoint).
        return
    root = logging.getLogger()
    if handler in root.handlers:
        return
    try:
        from core.logging.filters import IdentityContextFilter, TraceContextFilter

        for context_filter in (TraceContextFilter(), IdentityContextFilter()):
            handler.addFilter(context_filter)
    except Exception:
        logger.exception("Failed to attach log filters to enterprise handler")
    root.addHandler(handler)

View File

@ -26,12 +26,24 @@ def compute_trace_id_hex(uuid_str: str | None) -> str:
return ""
def compute_span_id_hex(uuid_str: str | None) -> str:
if not uuid_str:
return ""
try:
from enterprise.telemetry.id_generator import compute_deterministic_span_id
return f"{compute_deterministic_span_id(uuid_str):016x}"
except (ValueError, AttributeError):
return ""
def emit_telemetry_log(
*,
event_name: str,
attributes: dict[str, Any],
signal: str = "metric_only",
trace_id_source: str | None = None,
span_id_source: str | None = None,
tenant_id: str | None = None,
user_id: str | None = None,
) -> None:
@ -65,6 +77,9 @@ def emit_telemetry_log(
trace_id_hex = compute_trace_id_hex(trace_id_source)
if trace_id_hex:
extra["trace_id"] = trace_id_hex
span_id_hex = compute_span_id_hex(span_id_source)
if span_id_hex:
extra["span_id"] = span_id_hex
if tenant_id:
extra["tenant_id"] = tenant_id
if user_id:
@ -78,6 +93,7 @@ def emit_metric_only_event(
event_name: str,
attributes: dict[str, Any],
trace_id_source: str | None = None,
span_id_source: str | None = None,
tenant_id: str | None = None,
user_id: str | None = None,
) -> None:
@ -86,6 +102,7 @@ def emit_metric_only_event(
attributes=attributes,
signal="metric_only",
trace_id_source=trace_id_source,
span_id_source=span_id_source,
tenant_id=tenant_id,
user_id=user_id,
)

View File

@ -37,6 +37,7 @@ def init_app(app: DifyApp) -> None:
_exporter = EnterpriseExporter(dify_config)
atexit.register(_exporter.shutdown)
_exporter.attach_log_handler()
# Import to trigger @signal.connect decorator registration
import enterprise.telemetry.event_handlers # noqa: F401 # type: ignore[reportUnusedImport]