mirror of
https://github.com/langgenius/dify.git
synced 2026-04-28 20:17:29 +08:00
feat(enterprise): Add OTEL logs export with span_id correlation
- Add ENTERPRISE_OTEL_LOGS_ENABLED and ENTERPRISE_OTLP_LOGS_ENDPOINT config options - Implement EnterpriseLoggingHandler for log record translation with trace/span ID parsing - Add LoggerProvider and BatchLogRecordProcessor for OTLP log export - Correlate telemetry logs with spans via span_id_source parameter - Attach log handler during enterprise telemetry initialization
This commit is contained in:
parent
3461c3a8ef
commit
4d7ab24eb1
@ -30,11 +30,21 @@ class EnterpriseTelemetryConfig(BaseSettings):
|
|||||||
default=False,
|
default=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ENTERPRISE_OTEL_LOGS_ENABLED: bool = Field(
|
||||||
|
description="Enable enterprise OTEL log export (requires enterprise telemetry enabled).",
|
||||||
|
default=False,
|
||||||
|
)
|
||||||
|
|
||||||
ENTERPRISE_OTLP_ENDPOINT: str = Field(
|
ENTERPRISE_OTLP_ENDPOINT: str = Field(
|
||||||
description="Enterprise OTEL collector endpoint.",
|
description="Enterprise OTEL collector endpoint.",
|
||||||
default="",
|
default="",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ENTERPRISE_OTLP_LOGS_ENDPOINT: str = Field(
|
||||||
|
description="Enterprise OTEL logs endpoint (defaults to ENTERPRISE_OTLP_ENDPOINT + /v1/logs).",
|
||||||
|
default="",
|
||||||
|
)
|
||||||
|
|
||||||
ENTERPRISE_OTLP_HEADERS: str = Field(
|
ENTERPRISE_OTLP_HEADERS: str = Field(
|
||||||
description="Auth headers for OTLP export (key=value,key2=value2).",
|
description="Auth headers for OTLP export (key=value,key2=value2).",
|
||||||
default="",
|
default="",
|
||||||
|
|||||||
@ -163,6 +163,7 @@ class EnterpriseOtelTrace:
|
|||||||
attributes=log_attrs,
|
attributes=log_attrs,
|
||||||
signal="span_detail",
|
signal="span_detail",
|
||||||
trace_id_source=info.workflow_run_id,
|
trace_id_source=info.workflow_run_id,
|
||||||
|
span_id_source=info.workflow_run_id,
|
||||||
tenant_id=info.metadata.get("tenant_id"),
|
tenant_id=info.metadata.get("tenant_id"),
|
||||||
user_id=info.metadata.get("user_id"),
|
user_id=info.metadata.get("user_id"),
|
||||||
)
|
)
|
||||||
@ -287,6 +288,7 @@ class EnterpriseOtelTrace:
|
|||||||
attributes=log_attrs,
|
attributes=log_attrs,
|
||||||
signal="span_detail",
|
signal="span_detail",
|
||||||
trace_id_source=info.workflow_run_id,
|
trace_id_source=info.workflow_run_id,
|
||||||
|
span_id_source=info.node_execution_id,
|
||||||
tenant_id=info.tenant_id,
|
tenant_id=info.tenant_id,
|
||||||
user_id=info.metadata.get("user_id"),
|
user_id=info.metadata.get("user_id"),
|
||||||
)
|
)
|
||||||
|
|||||||
@ -11,16 +11,21 @@ import logging
|
|||||||
import socket
|
import socket
|
||||||
import uuid
|
import uuid
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import cast
|
from typing import Any, cast
|
||||||
|
|
||||||
from opentelemetry import metrics, trace
|
from opentelemetry import metrics, trace
|
||||||
|
from opentelemetry.context import Context
|
||||||
|
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
|
||||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||||
|
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
|
||||||
|
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
|
||||||
from opentelemetry.sdk.resources import Resource
|
from opentelemetry.sdk.resources import Resource
|
||||||
from opentelemetry.sdk.trace import TracerProvider
|
from opentelemetry.sdk.trace import TracerProvider
|
||||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||||
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
|
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
|
||||||
from opentelemetry.semconv.resource import ResourceAttributes
|
from opentelemetry.semconv.resource import ResourceAttributes
|
||||||
from opentelemetry.trace import SpanContext, TraceFlags
|
from opentelemetry.trace import SpanContext, TraceFlags
|
||||||
|
from opentelemetry.util.types import Attributes, AttributeValue
|
||||||
|
|
||||||
from configs import dify_config
|
from configs import dify_config
|
||||||
from enterprise.telemetry.entities import EnterpriseTelemetryCounter, EnterpriseTelemetryHistogram
|
from enterprise.telemetry.entities import EnterpriseTelemetryCounter, EnterpriseTelemetryHistogram
|
||||||
@ -56,6 +61,58 @@ def _datetime_to_ns(dt: datetime) -> int:
|
|||||||
return int(dt.timestamp() * 1_000_000_000)
|
return int(dt.timestamp() * 1_000_000_000)
|
||||||
|
|
||||||
|
|
||||||
|
def _append_logs_path(endpoint: str) -> str:
|
||||||
|
if endpoint.endswith("/"):
|
||||||
|
return endpoint + "v1/logs"
|
||||||
|
return f"{endpoint}/v1/logs"
|
||||||
|
|
||||||
|
|
||||||
|
class EnterpriseLoggingHandler(LoggingHandler):
|
||||||
|
@staticmethod
|
||||||
|
def _coerce_attribute_value(value: object) -> AttributeValue:
|
||||||
|
if isinstance(value, (str, bool, int, float)):
|
||||||
|
return value
|
||||||
|
return str(value)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_hex_id(value: object, expected_len: int) -> int | None:
|
||||||
|
if isinstance(value, int):
|
||||||
|
return value if value != 0 else None
|
||||||
|
if not isinstance(value, str):
|
||||||
|
return None
|
||||||
|
raw = value.strip().lower()
|
||||||
|
raw = raw.removeprefix("0x")
|
||||||
|
if expected_len and len(raw) != expected_len:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
parsed = int(raw, 16)
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
return parsed if parsed != 0 else None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_attributes(record: logging.LogRecord) -> Attributes:
|
||||||
|
raw_attributes = LoggingHandler._get_attributes(record) or {}
|
||||||
|
attributes: dict[str, AttributeValue] = dict(raw_attributes)
|
||||||
|
extra_attrs = attributes.pop("attributes", None)
|
||||||
|
if isinstance(extra_attrs, dict):
|
||||||
|
extra_attrs_dict = cast(dict[str, object], extra_attrs)
|
||||||
|
for key, value in extra_attrs_dict.items():
|
||||||
|
if key not in attributes:
|
||||||
|
attributes[key] = EnterpriseLoggingHandler._coerce_attribute_value(value)
|
||||||
|
return attributes
|
||||||
|
|
||||||
|
def _translate(self, record: logging.LogRecord):
|
||||||
|
log_record = super()._translate(record)
|
||||||
|
trace_id = self._parse_hex_id(getattr(record, "trace_id", None), 32)
|
||||||
|
span_id = self._parse_hex_id(getattr(record, "span_id", None), 16)
|
||||||
|
if trace_id is not None:
|
||||||
|
log_record.trace_id = trace_id
|
||||||
|
if span_id is not None:
|
||||||
|
log_record.span_id = span_id
|
||||||
|
return log_record
|
||||||
|
|
||||||
|
|
||||||
class EnterpriseExporter:
|
class EnterpriseExporter:
|
||||||
"""Shared OTEL exporter for all enterprise telemetry.
|
"""Shared OTEL exporter for all enterprise telemetry.
|
||||||
|
|
||||||
@ -70,6 +127,10 @@ class EnterpriseExporter:
|
|||||||
service_name: str = getattr(config, "ENTERPRISE_SERVICE_NAME", "dify")
|
service_name: str = getattr(config, "ENTERPRISE_SERVICE_NAME", "dify")
|
||||||
sampling_rate: float = getattr(config, "ENTERPRISE_OTEL_SAMPLING_RATE", 1.0)
|
sampling_rate: float = getattr(config, "ENTERPRISE_OTEL_SAMPLING_RATE", 1.0)
|
||||||
self.include_content: bool = getattr(config, "ENTERPRISE_INCLUDE_CONTENT", True)
|
self.include_content: bool = getattr(config, "ENTERPRISE_INCLUDE_CONTENT", True)
|
||||||
|
self._logs_enabled: bool = bool(getattr(config, "ENTERPRISE_OTEL_LOGS_ENABLED", False))
|
||||||
|
self._logs_endpoint: str = getattr(config, "ENTERPRISE_OTLP_LOGS_ENDPOINT", "")
|
||||||
|
self._log_provider: LoggerProvider | None = None
|
||||||
|
self._log_handler: logging.Handler | None = None
|
||||||
|
|
||||||
resource = Resource(
|
resource = Resource(
|
||||||
attributes={
|
attributes={
|
||||||
@ -88,6 +149,8 @@ class EnterpriseExporter:
|
|||||||
)
|
)
|
||||||
self._tracer = self._tracer_provider.get_tracer("dify.enterprise")
|
self._tracer = self._tracer_provider.get_tracer("dify.enterprise")
|
||||||
|
|
||||||
|
self._init_logs_pipeline(endpoint, headers, resource)
|
||||||
|
|
||||||
meter = metrics.get_meter("dify.enterprise")
|
meter = metrics.get_meter("dify.enterprise")
|
||||||
self._counters = {
|
self._counters = {
|
||||||
EnterpriseTelemetryCounter.TOKENS: meter.create_counter("dify.tokens.total", unit="{token}"),
|
EnterpriseTelemetryCounter.TOKENS: meter.create_counter("dify.tokens.total", unit="{token}"),
|
||||||
@ -108,10 +171,23 @@ class EnterpriseExporter:
|
|||||||
EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"),
|
EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def _init_logs_pipeline(self, endpoint: str, headers: dict[str, str], resource: Resource) -> None:
|
||||||
|
if not self._logs_enabled:
|
||||||
|
return
|
||||||
|
logs_endpoint = self._logs_endpoint or (_append_logs_path(endpoint) if endpoint else "")
|
||||||
|
if not logs_endpoint:
|
||||||
|
logger.warning("Enterprise OTEL logs enabled but endpoint is empty")
|
||||||
|
return
|
||||||
|
|
||||||
|
self._log_provider = LoggerProvider(resource=resource, shutdown_on_exit=False)
|
||||||
|
exporter = OTLPLogExporter(endpoint=logs_endpoint, headers=headers or None)
|
||||||
|
self._log_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
|
||||||
|
self._log_handler = EnterpriseLoggingHandler(logger_provider=self._log_provider)
|
||||||
|
|
||||||
def export_span(
|
def export_span(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
attributes: dict,
|
attributes: dict[str, Any],
|
||||||
correlation_id: str | None = None,
|
correlation_id: str | None = None,
|
||||||
span_id_source: str | None = None,
|
span_id_source: str | None = None,
|
||||||
start_time: datetime | None = None,
|
start_time: datetime | None = None,
|
||||||
@ -139,7 +215,7 @@ class EnterpriseExporter:
|
|||||||
set_span_id_source(span_id_source)
|
set_span_id_source(span_id_source)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
parent_context = None
|
parent_context: Context | None = None
|
||||||
# A span is the "root" of its correlation group when span_id_source == correlation_id
|
# A span is the "root" of its correlation group when span_id_source == correlation_id
|
||||||
# (i.e. a workflow root span). All other spans are children.
|
# (i.e. a workflow root span). All other spans are children.
|
||||||
if parent_span_id_source:
|
if parent_span_id_source:
|
||||||
@ -168,13 +244,15 @@ class EnterpriseExporter:
|
|||||||
)
|
)
|
||||||
parent_context = trace.set_span_in_context(trace.NonRecordingSpan(parent_span_context))
|
parent_context = trace.set_span_in_context(trace.NonRecordingSpan(parent_span_context))
|
||||||
|
|
||||||
span_kwargs: dict = {"context": parent_context}
|
span_start_time = _datetime_to_ns(start_time) if start_time is not None else None
|
||||||
if start_time is not None:
|
span_end_on_exit = end_time is None
|
||||||
span_kwargs["start_time"] = _datetime_to_ns(start_time)
|
|
||||||
if end_time is not None:
|
|
||||||
span_kwargs["end_on_exit"] = False
|
|
||||||
|
|
||||||
with self._tracer.start_as_current_span(name, **span_kwargs) as span:
|
with self._tracer.start_as_current_span(
|
||||||
|
name,
|
||||||
|
context=parent_context,
|
||||||
|
start_time=span_start_time,
|
||||||
|
end_on_exit=span_end_on_exit,
|
||||||
|
) as span:
|
||||||
for key, value in attributes.items():
|
for key, value in attributes.items():
|
||||||
if value is not None:
|
if value is not None:
|
||||||
span.set_attribute(key, value)
|
span.set_attribute(key, value)
|
||||||
@ -186,15 +264,36 @@ class EnterpriseExporter:
|
|||||||
set_correlation_id(None)
|
set_correlation_id(None)
|
||||||
set_span_id_source(None)
|
set_span_id_source(None)
|
||||||
|
|
||||||
def increment_counter(self, name: EnterpriseTelemetryCounter, value: int, labels: dict) -> None:
|
def increment_counter(
|
||||||
|
self, name: EnterpriseTelemetryCounter, value: int, labels: dict[str, AttributeValue]
|
||||||
|
) -> None:
|
||||||
counter = self._counters.get(name)
|
counter = self._counters.get(name)
|
||||||
if counter:
|
if counter:
|
||||||
counter.add(value, labels)
|
counter.add(value, cast(Attributes, labels))
|
||||||
|
|
||||||
def record_histogram(self, name: EnterpriseTelemetryHistogram, value: float, labels: dict) -> None:
|
def record_histogram(
|
||||||
|
self, name: EnterpriseTelemetryHistogram, value: float, labels: dict[str, AttributeValue]
|
||||||
|
) -> None:
|
||||||
histogram = self._histograms.get(name)
|
histogram = self._histograms.get(name)
|
||||||
if histogram:
|
if histogram:
|
||||||
histogram.record(value, labels)
|
histogram.record(value, cast(Attributes, labels))
|
||||||
|
|
||||||
def shutdown(self) -> None:
|
def shutdown(self) -> None:
|
||||||
self._tracer_provider.shutdown()
|
self._tracer_provider.shutdown()
|
||||||
|
if self._log_provider:
|
||||||
|
self._log_provider.shutdown()
|
||||||
|
|
||||||
|
def attach_log_handler(self) -> None:
|
||||||
|
if not self._log_handler:
|
||||||
|
return
|
||||||
|
root_logger = logging.getLogger()
|
||||||
|
if self._log_handler in root_logger.handlers:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
from core.logging.filters import IdentityContextFilter, TraceContextFilter
|
||||||
|
|
||||||
|
self._log_handler.addFilter(TraceContextFilter())
|
||||||
|
self._log_handler.addFilter(IdentityContextFilter())
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed to attach log filters to enterprise handler")
|
||||||
|
root_logger.addHandler(self._log_handler)
|
||||||
|
|||||||
@ -26,12 +26,24 @@ def compute_trace_id_hex(uuid_str: str | None) -> str:
|
|||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def compute_span_id_hex(uuid_str: str | None) -> str:
|
||||||
|
if not uuid_str:
|
||||||
|
return ""
|
||||||
|
try:
|
||||||
|
from enterprise.telemetry.id_generator import compute_deterministic_span_id
|
||||||
|
|
||||||
|
return f"{compute_deterministic_span_id(uuid_str):016x}"
|
||||||
|
except (ValueError, AttributeError):
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
def emit_telemetry_log(
|
def emit_telemetry_log(
|
||||||
*,
|
*,
|
||||||
event_name: str,
|
event_name: str,
|
||||||
attributes: dict[str, Any],
|
attributes: dict[str, Any],
|
||||||
signal: str = "metric_only",
|
signal: str = "metric_only",
|
||||||
trace_id_source: str | None = None,
|
trace_id_source: str | None = None,
|
||||||
|
span_id_source: str | None = None,
|
||||||
tenant_id: str | None = None,
|
tenant_id: str | None = None,
|
||||||
user_id: str | None = None,
|
user_id: str | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
@ -65,6 +77,9 @@ def emit_telemetry_log(
|
|||||||
trace_id_hex = compute_trace_id_hex(trace_id_source)
|
trace_id_hex = compute_trace_id_hex(trace_id_source)
|
||||||
if trace_id_hex:
|
if trace_id_hex:
|
||||||
extra["trace_id"] = trace_id_hex
|
extra["trace_id"] = trace_id_hex
|
||||||
|
span_id_hex = compute_span_id_hex(span_id_source)
|
||||||
|
if span_id_hex:
|
||||||
|
extra["span_id"] = span_id_hex
|
||||||
if tenant_id:
|
if tenant_id:
|
||||||
extra["tenant_id"] = tenant_id
|
extra["tenant_id"] = tenant_id
|
||||||
if user_id:
|
if user_id:
|
||||||
@ -78,6 +93,7 @@ def emit_metric_only_event(
|
|||||||
event_name: str,
|
event_name: str,
|
||||||
attributes: dict[str, Any],
|
attributes: dict[str, Any],
|
||||||
trace_id_source: str | None = None,
|
trace_id_source: str | None = None,
|
||||||
|
span_id_source: str | None = None,
|
||||||
tenant_id: str | None = None,
|
tenant_id: str | None = None,
|
||||||
user_id: str | None = None,
|
user_id: str | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
@ -86,6 +102,7 @@ def emit_metric_only_event(
|
|||||||
attributes=attributes,
|
attributes=attributes,
|
||||||
signal="metric_only",
|
signal="metric_only",
|
||||||
trace_id_source=trace_id_source,
|
trace_id_source=trace_id_source,
|
||||||
|
span_id_source=span_id_source,
|
||||||
tenant_id=tenant_id,
|
tenant_id=tenant_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
)
|
)
|
||||||
|
|||||||
@ -37,6 +37,7 @@ def init_app(app: DifyApp) -> None:
|
|||||||
|
|
||||||
_exporter = EnterpriseExporter(dify_config)
|
_exporter = EnterpriseExporter(dify_config)
|
||||||
atexit.register(_exporter.shutdown)
|
atexit.register(_exporter.shutdown)
|
||||||
|
_exporter.attach_log_handler()
|
||||||
|
|
||||||
# Import to trigger @signal.connect decorator registration
|
# Import to trigger @signal.connect decorator registration
|
||||||
import enterprise.telemetry.event_handlers # noqa: F401 # type: ignore[reportUnusedImport]
|
import enterprise.telemetry.event_handlers # noqa: F401 # type: ignore[reportUnusedImport]
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user