dify/api/extensions/otel/instrumentation.py

126 lines
4.6 KiB
Python

import contextlib
import logging
import flask
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.metrics import get_meter, get_meter_provider
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span, get_tracer_provider
from opentelemetry.trace.status import StatusCode
from configs import dify_config
from dify_app import DifyApp
from extensions.otel.runtime import is_celery_worker
logger = logging.getLogger(__name__)
class ExceptionLoggingHandler(logging.Handler):
"""
Handler that records exceptions to the current OpenTelemetry span.
Unlike creating a new span, this records exceptions on the existing span
to maintain trace context consistency throughout the request lifecycle.
"""
def emit(self, record: logging.LogRecord):
with contextlib.suppress(Exception):
if not record.exc_info:
return
from opentelemetry.trace import get_current_span
span = get_current_span()
if not span or not span.is_recording():
return
# Record exception on the current span instead of creating a new one
span.set_status(StatusCode.ERROR, record.getMessage())
# Add log context as span events/attributes
span.add_event(
"log.exception",
attributes={
"log.level": record.levelname,
"log.message": record.getMessage(),
"log.logger": record.name,
"log.file.path": record.pathname,
"log.file.line": record.lineno,
},
)
if record.exc_info[1]:
span.record_exception(record.exc_info[1])
if record.exc_info[0]:
span.set_attribute("exception.type", record.exc_info[0].__name__)
def instrument_exception_logging() -> None:
exception_handler = ExceptionLoggingHandler()
logging.getLogger().addHandler(exception_handler)
def init_flask_instrumentor(app: DifyApp) -> None:
meter = get_meter("http_metrics", version=dify_config.project.version)
_http_response_counter = meter.create_counter(
"http.server.response.count",
description="Total number of HTTP responses by status code, method and target",
unit="{response}",
)
def response_hook(span: Span, status: str, response_headers: list) -> None:
if span and span.is_recording():
try:
if status.startswith("2"):
span.set_status(StatusCode.OK)
else:
span.set_status(StatusCode.ERROR, status)
status = status.split(" ")[0]
status_code = int(status)
status_class = f"{status_code // 100}xx"
attributes: dict[str, str | int] = {"status_code": status_code, "status_class": status_class}
request = flask.request
if request and request.url_rule:
attributes[SpanAttributes.HTTP_TARGET] = str(request.url_rule.rule)
if request and request.method:
attributes[SpanAttributes.HTTP_METHOD] = str(request.method)
_http_response_counter.add(1, attributes)
except Exception:
logger.exception("Error setting status and attributes")
from opentelemetry.instrumentation.flask import FlaskInstrumentor
instrumentor = FlaskInstrumentor()
if dify_config.DEBUG:
logger.info("Initializing Flask instrumentor")
instrumentor.instrument_app(app, response_hook=response_hook)
def init_sqlalchemy_instrumentor(app: DifyApp) -> None:
with app.app_context():
engines = list(app.extensions["sqlalchemy"].engines.values())
SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines)
def init_redis_instrumentor() -> None:
RedisInstrumentor().instrument()
def init_httpx_instrumentor() -> None:
HTTPXClientInstrumentor().instrument()
def init_instruments(app: DifyApp) -> None:
if not is_celery_worker():
init_flask_instrumentor(app)
CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument()
instrument_exception_logging()
init_sqlalchemy_instrumentor(app)
init_redis_instrumentor()
init_httpx_instrumentor()