From 301a470e7abd8872db1af9124159d159b722fb73 Mon Sep 17 00:00:00 2001 From: Xiyuan Chen <52963600+GareArc@users.noreply.github.com> Date: Wed, 13 May 2026 18:46:23 -0700 Subject: [PATCH] fix: isolate Langfuse v3 SDK TracerProvider to prevent cross-task interference (#36136) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../src/dify_trace_langfuse/langfuse_trace.py | 29 +++++++ .../langfuse_trace/test_langfuse_trace.py | 82 +++++++++++++++++-- 2 files changed, 106 insertions(+), 5 deletions(-) diff --git a/api/providers/trace/trace-langfuse/src/dify_trace_langfuse/langfuse_trace.py b/api/providers/trace/trace-langfuse/src/dify_trace_langfuse/langfuse_trace.py index 68881378a7..9b19b1100c 100644 --- a/api/providers/trace/trace-langfuse/src/dify_trace_langfuse/langfuse_trace.py +++ b/api/providers/trace/trace-langfuse/src/dify_trace_langfuse/langfuse_trace.py @@ -13,6 +13,8 @@ from langfuse.api import ( TraceBody, ) from langfuse.api.commons.types.usage import Usage +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider from sqlalchemy.orm import sessionmaker from core.ops.base_trace_instance import BaseTraceInstance @@ -52,13 +54,40 @@ class LangFuseDataTrace(BaseTraceInstance): langfuse_config: LangfuseConfig, ): super().__init__(langfuse_config) + # Isolated TracerProvider prevents the langfuse v3 SDK from attaching its + # SpanProcessor to the global OpenTelemetry TracerProvider, which would + # otherwise siphon every Flask/Celery/SQLAlchemy span in the process into + # this tenant's Langfuse project. See langfuse upgrade guide v2 -> v3. + self._tracer_provider: TracerProvider | None = TracerProvider( + resource=Resource.create({"service.name": "dify-langfuse-app-trace"}), + ) self.langfuse_client = Langfuse( public_key=langfuse_config.public_key, secret_key=langfuse_config.secret_key, host=langfuse_config.host, + tracer_provider=self._tracer_provider, ) self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001") + def close(self) -> None: + """Flush and shut down the isolated TracerProvider. + + Called explicitly when the trace instance is evicted from the cache, or + implicitly via ``__del__`` on garbage collection. Idempotent. + """ + provider = getattr(self, "_tracer_provider", None) + if provider is None: + return + try: + provider.shutdown() + except Exception: + logger.debug("Failed to shut down Langfuse TracerProvider", exc_info=True) + finally: + self._tracer_provider = None + + def __del__(self) -> None: + self.close() + @staticmethod def _get_completion_start_time( start_time: datetime | None, time_to_first_token: float | int | None diff --git a/api/providers/trace/trace-langfuse/tests/unit_tests/langfuse_trace/test_langfuse_trace.py b/api/providers/trace/trace-langfuse/tests/unit_tests/langfuse_trace/test_langfuse_trace.py index 95e27c791f..0580051f54 100644 --- a/api/providers/trace/trace-langfuse/tests/unit_tests/langfuse_trace/test_langfuse_trace.py +++ b/api/providers/trace/trace-langfuse/tests/unit_tests/langfuse_trace/test_langfuse_trace.py @@ -50,20 +50,92 @@ def trace_instance(langfuse_config, monkeypatch: pytest.MonkeyPatch): def test_init(langfuse_config, monkeypatch: pytest.MonkeyPatch): + from opentelemetry.sdk.trace import TracerProvider + mock_langfuse = MagicMock() monkeypatch.setattr("dify_trace_langfuse.langfuse_trace.Langfuse", mock_langfuse) monkeypatch.setenv("FILES_URL", "http://test.url") instance = LangFuseDataTrace(langfuse_config) - mock_langfuse.assert_called_once_with( - public_key=langfuse_config.public_key, - secret_key=langfuse_config.secret_key, - host=langfuse_config.host, - ) + mock_langfuse.assert_called_once() + kwargs = mock_langfuse.call_args.kwargs + assert kwargs["public_key"] == langfuse_config.public_key + assert kwargs["secret_key"] == langfuse_config.secret_key + assert kwargs["host"] == langfuse_config.host + assert isinstance(kwargs["tracer_provider"], TracerProvider) + assert kwargs["tracer_provider"] is instance._tracer_provider assert instance.file_base_url == "http://test.url" +def test_init_passes_isolated_tracer_provider_to_langfuse(langfuse_config, monkeypatch: pytest.MonkeyPatch): + """Regression test for langfuse v3 SDK side effect. + + Without an explicit ``tracer_provider=`` kwarg, the Langfuse v3 SDK + attaches a ``LangfuseSpanProcessor`` to the *global* OpenTelemetry + TracerProvider — siphoning every Flask / Celery / SQLAlchemy span in the + process into the tenant's Langfuse project. See langfuse upgrade-path + docs (v2 -> v3) and GitHub discussion #9136. + + The fix is to construct an isolated ``TracerProvider`` and pass it via + ``tracer_provider=`` so the SDK never touches the global one. + """ + from opentelemetry import trace as otel_trace_api + from opentelemetry.sdk.trace import TracerProvider + + captured: dict[str, object] = {} + + def fake_langfuse(**kwargs): + captured.update(kwargs) + return MagicMock() + + monkeypatch.setattr("dify_trace_langfuse.langfuse_trace.Langfuse", fake_langfuse) + + instance = LangFuseDataTrace(langfuse_config) + + # 1. tracer_provider kwarg must be supplied (drives the no-pollution branch + # in langfuse.LangfuseResourceManager._init_tracer_provider). + assert "tracer_provider" in captured, ( + "Langfuse() must receive an explicit tracer_provider=; without it the " + "v3 SDK attaches its SpanProcessor to the global OTEL TracerProvider." + ) + + passed_provider = captured["tracer_provider"] + assert isinstance(passed_provider, TracerProvider) + assert passed_provider is instance._tracer_provider + + # 2. The instance's provider must not be the global one. + global_provider = otel_trace_api.get_tracer_provider() + assert passed_provider is not global_provider + + +def test_close_shuts_down_tracer_provider(langfuse_config, monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr("dify_trace_langfuse.langfuse_trace.Langfuse", lambda **kwargs: MagicMock()) + + instance = LangFuseDataTrace(langfuse_config) + provider = instance._tracer_provider + provider_shutdown = MagicMock() + monkeypatch.setattr(provider, "shutdown", provider_shutdown) + + instance.close() + + provider_shutdown.assert_called_once() + assert instance._tracer_provider is None + + +def test_close_is_idempotent(langfuse_config, monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr("dify_trace_langfuse.langfuse_trace.Langfuse", lambda **kwargs: MagicMock()) + + instance = LangFuseDataTrace(langfuse_config) + provider_shutdown = MagicMock() + monkeypatch.setattr(instance._tracer_provider, "shutdown", provider_shutdown) + + instance.close() + instance.close() + + provider_shutdown.assert_called_once() + + def test_trace_dispatch(trace_instance, monkeypatch: pytest.MonkeyPatch): methods = [ "workflow_trace",