fix: isolate Langfuse v3 SDK TracerProvider to prevent cross-task interference (#36136)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Xiyuan Chen 2026-05-13 18:46:23 -07:00 committed by GitHub
parent 91251ad5a5
commit 301a470e7a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 106 additions and 5 deletions

View File

@ -13,6 +13,8 @@ from langfuse.api import (
TraceBody,
)
from langfuse.api.commons.types.usage import Usage
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from sqlalchemy.orm import sessionmaker
from core.ops.base_trace_instance import BaseTraceInstance
@ -52,13 +54,40 @@ class LangFuseDataTrace(BaseTraceInstance):
langfuse_config: LangfuseConfig,
):
super().__init__(langfuse_config)
# Isolated TracerProvider prevents the langfuse v3 SDK from attaching its
# SpanProcessor to the global OpenTelemetry TracerProvider, which would
# otherwise siphon every Flask/Celery/SQLAlchemy span in the process into
# this tenant's Langfuse project. See langfuse upgrade guide v2 -> v3.
self._tracer_provider: TracerProvider | None = TracerProvider(
resource=Resource.create({"service.name": "dify-langfuse-app-trace"}),
)
self.langfuse_client = Langfuse(
public_key=langfuse_config.public_key,
secret_key=langfuse_config.secret_key,
host=langfuse_config.host,
tracer_provider=self._tracer_provider,
)
self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
def close(self) -> None:
    """Shut down this instance's private TracerProvider, if it still has one.

    Safe to call more than once: after the first call the provider
    reference is cleared, so later calls return without doing anything.
    It also tolerates an instance on which ``_tracer_provider`` was never
    assigned. ``__del__`` calls this method as well.

    Failures raised by ``shutdown()`` (any ``Exception``) are logged at
    debug level and not propagated.
    """
    tracer_provider = getattr(self, "_tracer_provider", None)
    if tracer_provider is not None:
        try:
            tracer_provider.shutdown()
        except Exception:
            # Best-effort cleanup: record the failure, never raise to the caller.
            logger.debug("Failed to shut down Langfuse TracerProvider", exc_info=True)
        finally:
            # Drop the reference whether or not shutdown succeeded so a
            # repeated close() does not retry.
            self._tracer_provider = None
def __del__(self) -> None:
    """Finalizer hook: delegate cleanup to ``close()``."""
    self.close()
@staticmethod
def _get_completion_start_time(
start_time: datetime | None, time_to_first_token: float | int | None

View File

@ -50,20 +50,92 @@ def trace_instance(langfuse_config, monkeypatch: pytest.MonkeyPatch):
def test_init(langfuse_config, monkeypatch: pytest.MonkeyPatch):
    """Check that the constructor wires the config and tracer provider into ``Langfuse``.

    The constructor builds its own ``TracerProvider`` and passes it as
    ``tracer_provider=``, so the exact object is not known before the call.
    The test therefore inspects the recorded keyword arguments instead of
    using ``assert_called_once_with`` on a fixed argument list.
    """
    from opentelemetry.sdk.trace import TracerProvider

    mock_langfuse = MagicMock()
    monkeypatch.setattr("dify_trace_langfuse.langfuse_trace.Langfuse", mock_langfuse)
    monkeypatch.setenv("FILES_URL", "http://test.url")

    instance = LangFuseDataTrace(langfuse_config)

    mock_langfuse.assert_called_once()
    kwargs = mock_langfuse.call_args.kwargs
    # Credentials and host come straight from the config object.
    assert kwargs["public_key"] == langfuse_config.public_key
    assert kwargs["secret_key"] == langfuse_config.secret_key
    assert kwargs["host"] == langfuse_config.host
    # The provider handed to the SDK is the one the instance keeps for close().
    assert isinstance(kwargs["tracer_provider"], TracerProvider)
    assert kwargs["tracer_provider"] is instance._tracer_provider
    # FILES_URL from the environment becomes the file base URL.
    assert instance.file_base_url == "http://test.url"
def test_init_passes_isolated_tracer_provider_to_langfuse(langfuse_config, monkeypatch: pytest.MonkeyPatch):
    """Regression test: ``Langfuse`` must be given a non-global TracerProvider.

    Background (per the Langfuse v2 -> v3 upgrade notes and GitHub
    discussion #9136): when no ``tracer_provider=`` is supplied, the v3 SDK
    attaches its ``LangfuseSpanProcessor`` to the *global* OpenTelemetry
    TracerProvider, so every Flask / Celery / SQLAlchemy span in the process
    ends up in the tenant's Langfuse project.

    The constructor avoids this by creating its own ``TracerProvider`` and
    passing it explicitly; this test pins that behaviour.
    """
    from opentelemetry import trace as otel_trace_api
    from opentelemetry.sdk.trace import TracerProvider

    seen_kwargs: dict[str, object] = {}

    def recording_langfuse(**kwargs):
        # Stand-in for the Langfuse constructor that remembers its kwargs.
        seen_kwargs.update(kwargs)
        return MagicMock()

    monkeypatch.setattr("dify_trace_langfuse.langfuse_trace.Langfuse", recording_langfuse)

    trace = LangFuseDataTrace(langfuse_config)

    # 1. An explicit tracer_provider kwarg must be present (it selects the
    #    no-pollution branch in
    #    langfuse.LangfuseResourceManager._init_tracer_provider).
    assert "tracer_provider" in seen_kwargs, (
        "Langfuse() must receive an explicit tracer_provider=; without it the "
        "v3 SDK attaches its SpanProcessor to the global OTEL TracerProvider."
    )
    isolated = seen_kwargs["tracer_provider"]
    assert isinstance(isolated, TracerProvider)
    assert isolated is trace._tracer_provider

    # 2. That provider must be distinct from the process-wide one.
    assert isolated is not otel_trace_api.get_tracer_provider()
def test_close_shuts_down_tracer_provider(langfuse_config, monkeypatch: pytest.MonkeyPatch):
    """``close()`` calls ``shutdown()`` on the provider once and clears the reference."""
    monkeypatch.setattr("dify_trace_langfuse.langfuse_trace.Langfuse", lambda **kwargs: MagicMock())
    trace = LangFuseDataTrace(langfuse_config)

    # Replace the real shutdown with a spy so the call can be counted.
    shutdown_spy = MagicMock()
    monkeypatch.setattr(trace._tracer_provider, "shutdown", shutdown_spy)

    trace.close()

    shutdown_spy.assert_called_once()
    assert trace._tracer_provider is None
def test_close_is_idempotent(langfuse_config, monkeypatch: pytest.MonkeyPatch):
    """Calling ``close()`` twice triggers only a single provider shutdown."""
    monkeypatch.setattr("dify_trace_langfuse.langfuse_trace.Langfuse", lambda **kwargs: MagicMock())
    trace = LangFuseDataTrace(langfuse_config)

    shutdown_spy = MagicMock()
    monkeypatch.setattr(trace._tracer_provider, "shutdown", shutdown_spy)

    # The second call must find the provider already released.
    for _ in range(2):
        trace.close()

    shutdown_spy.assert_called_once()
def test_trace_dispatch(trace_instance, monkeypatch: pytest.MonkeyPatch):
methods = [
"workflow_trace",