From d463a450ddd96f8a9b1f73699f8cf3df78c01cb4 Mon Sep 17 00:00:00 2001 From: GareArc Date: Mon, 2 Feb 2026 21:41:16 -0800 Subject: [PATCH 1/5] feat(enterprise): Add gRPC protocol support for OTLP telemetry - Add ENTERPRISE_OTLP_PROTOCOL config (http/grpc, default: http) - Introduce _ExporterFactory class for protocol-agnostic exporter creation - Support both HTTP and gRPC OTLP endpoints for traces and logs - Refactor endpoint path handling into factory methods --- api/configs/enterprise/__init__.py | 5 ++ api/enterprise/telemetry/exporter.py | 69 +++++++++++++++++++++------- 2 files changed, 57 insertions(+), 17 deletions(-) diff --git a/api/configs/enterprise/__init__.py b/api/configs/enterprise/__init__.py index e4f08c9224..eb9dfe69b4 100644 --- a/api/configs/enterprise/__init__.py +++ b/api/configs/enterprise/__init__.py @@ -50,6 +50,11 @@ class EnterpriseTelemetryConfig(BaseSettings): default="", ) + ENTERPRISE_OTLP_PROTOCOL: str = Field( + description="OTLP protocol: 'http' or 'grpc' (default: http).", + default="http", + ) + ENTERPRISE_INCLUDE_CONTENT: bool = Field( description="Include input/output content in traces (privacy toggle).", default=True, diff --git a/api/enterprise/telemetry/exporter.py b/api/enterprise/telemetry/exporter.py index f2ea54b445..6591c2fd97 100644 --- a/api/enterprise/telemetry/exporter.py +++ b/api/enterprise/telemetry/exporter.py @@ -15,8 +15,10 @@ from typing import Any, cast from opentelemetry import metrics, trace from opentelemetry.context import Context -from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter as GRPCLogExporter +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCSpanExporter +from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter as HTTPLogExporter +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HTTPSpanExporter from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler from opentelemetry.sdk._logs.export import BatchLogRecordProcessor from opentelemetry.sdk.resources import Resource @@ -61,10 +63,42 @@ def _datetime_to_ns(dt: datetime) -> int: return int(dt.timestamp() * 1_000_000_000) -def _append_logs_path(endpoint: str) -> str: - if endpoint.endswith("/"): - return endpoint + "v1/logs" - return f"{endpoint}/v1/logs" +class _ExporterFactory: + def __init__(self, protocol: str, endpoint: str, headers: dict[str, str]): + self._protocol = protocol + self._endpoint = endpoint + self._headers = headers + self._grpc_headers = tuple(headers.items()) if headers else None + self._http_headers = headers or None + + def create_trace_exporter(self) -> HTTPSpanExporter | GRPCSpanExporter: + if self._protocol == "grpc": + return GRPCSpanExporter( + endpoint=self._endpoint or None, + headers=self._grpc_headers, + insecure=True, + ) + trace_endpoint = f"{self._endpoint}/v1/traces" if self._endpoint else "" + return HTTPSpanExporter(endpoint=trace_endpoint or None, headers=self._http_headers) + + def create_log_exporter(self, logs_endpoint_override: str) -> tuple[HTTPLogExporter | GRPCLogExporter | None, str]: + if self._protocol == "grpc": + logs_endpoint = logs_endpoint_override or self._endpoint + if not logs_endpoint: + return None, "Enterprise OTEL logs enabled but endpoint is empty" + return GRPCLogExporter(endpoint=logs_endpoint, 
headers=self._grpc_headers, insecure=True), "" + + logs_endpoint = logs_endpoint_override or self._append_logs_path() + if not logs_endpoint: + return None, "Enterprise OTEL logs enabled but endpoint is empty" + return HTTPLogExporter(endpoint=logs_endpoint, headers=self._http_headers), "" + + def _append_logs_path(self) -> str: + if not self._endpoint: + return "" + if self._endpoint.endswith("/"): + return f"{self._endpoint}v1/logs" + return f"{self._endpoint}/v1/logs" class EnterpriseLoggingHandler(LoggingHandler): @@ -124,6 +158,7 @@ class EnterpriseExporter: def __init__(self, config: object) -> None: endpoint: str = getattr(config, "ENTERPRISE_OTLP_ENDPOINT", "") headers_raw: str = getattr(config, "ENTERPRISE_OTLP_HEADERS", "") + protocol: str = (getattr(config, "ENTERPRISE_OTLP_PROTOCOL", "http") or "http").lower() service_name: str = getattr(config, "ENTERPRISE_SERVICE_NAME", "dify") sampling_rate: float = getattr(config, "ENTERPRISE_OTEL_SAMPLING_RATE", 1.0) self.include_content: bool = getattr(config, "ENTERPRISE_INCLUDE_CONTENT", True) @@ -143,13 +178,13 @@ class EnterpriseExporter: self._tracer_provider = TracerProvider(resource=resource, sampler=sampler, id_generator=id_generator) headers = _parse_otlp_headers(headers_raw) - trace_endpoint = f"{endpoint}/v1/traces" if endpoint else "" - self._tracer_provider.add_span_processor( - BatchSpanProcessor(OTLPSpanExporter(endpoint=trace_endpoint or None, headers=headers)) - ) + factory = _ExporterFactory(protocol, endpoint, headers) + + trace_exporter = factory.create_trace_exporter() + self._tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter)) self._tracer = self._tracer_provider.get_tracer("dify.enterprise") - self._init_logs_pipeline(endpoint, headers, resource) + self._init_logs_pipeline(factory, resource) meter = metrics.get_meter("dify.enterprise") self._counters = { @@ -171,17 +206,17 @@ class EnterpriseExporter: EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"), } - def _init_logs_pipeline(self, endpoint: str, headers: dict[str, str], resource: Resource) -> None: + def _init_logs_pipeline(self, factory: _ExporterFactory, resource: Resource) -> None: if not self._logs_enabled: return - logs_endpoint = self._logs_endpoint or (_append_logs_path(endpoint) if endpoint else "") - if not logs_endpoint: - logger.warning("Enterprise OTEL logs enabled but endpoint is empty") + + log_exporter, warning = factory.create_log_exporter(self._logs_endpoint) + if not log_exporter: + logger.warning(warning) return self._log_provider = LoggerProvider(resource=resource, shutdown_on_exit=False) - exporter = OTLPLogExporter(endpoint=logs_endpoint, headers=headers or None) - self._log_provider.add_log_record_processor(BatchLogRecordProcessor(exporter)) + self._log_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter)) self._log_handler = EnterpriseLoggingHandler(logger_provider=self._log_provider) def export_span( From 64e5005e3dd97d49f7184988d4560c82c886881c Mon Sep 17 00:00:00 2001 From: GareArc Date: Mon, 2 Feb 2026 22:15:11 -0800 Subject: [PATCH 2/5] fix(enterprise): Scope log handler to telemetry logger only Only export structured telemetry logs, not all application logs. The attach_log_handler method now attaches to the 'dify.telemetry' logger instead of the root logger. 
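A minimal standalone sketch of the protocol switch that PATCH 1 introduces via _ExporterFactory, assuming an OTLP collector reachable at the placeholder endpoint below (the build_span_exporter helper, endpoint value, and header value are illustrative, not part of the patch):

    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCSpanExporter
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HTTPSpanExporter


    def build_span_exporter(protocol: str, endpoint: str, headers: dict[str, str]):
        # gRPC exporters take headers as (key, value) pairs and talk to the
        # collector's gRPC port directly, with no /v1/traces path suffix.
        if protocol.lower() == "grpc":
            return GRPCSpanExporter(
                endpoint=endpoint or None,
                headers=tuple(headers.items()) or None,
                insecure=True,  # matches the factory above; assumes a plaintext collector
            )
        # HTTP/protobuf exporters expect the signal path appended to the base endpoint.
        trace_endpoint = f"{endpoint}/v1/traces" if endpoint else ""
        return HTTPSpanExporter(endpoint=trace_endpoint or None, headers=headers or None)


    exporter = build_span_exporter("grpc", "otel-collector:4317", {"x-api-key": "changeme"})

The same split drives create_log_exporter(): one base ENTERPRISE_OTLP_ENDPOINT, with /v1/logs appended only when the HTTP protocol is selected.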
--- api/enterprise/telemetry/exporter.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/enterprise/telemetry/exporter.py b/api/enterprise/telemetry/exporter.py index 6591c2fd97..54a60595b6 100644 --- a/api/enterprise/telemetry/exporter.py +++ b/api/enterprise/telemetry/exporter.py @@ -321,8 +321,8 @@ class EnterpriseExporter: def attach_log_handler(self) -> None: if not self._log_handler: return - root_logger = logging.getLogger() - if self._log_handler in root_logger.handlers: + telemetry_logger = logging.getLogger("dify.telemetry") + if self._log_handler in telemetry_logger.handlers: return try: from core.logging.filters import IdentityContextFilter, TraceContextFilter @@ -331,4 +331,4 @@ class EnterpriseExporter: self._log_handler.addFilter(IdentityContextFilter()) except Exception: logger.exception("Failed to attach log filters to enterprise handler") - root_logger.addHandler(self._log_handler) + telemetry_logger.addHandler(self._log_handler) From aaf324e1509b62b6d8fb6a55b0305c01fb66c914 Mon Sep 17 00:00:00 2001 From: GareArc Date: Mon, 2 Feb 2026 22:41:25 -0800 Subject: [PATCH 3/5] feat(enterprise): Add independent metrics export with dedicated MeterProvider - Create dedicated MeterProvider instance (independent from ext_otel.py) - Add create_metric_exporter() to _ExporterFactory with HTTP/gRPC support - Enterprise metrics now work without requiring standard OTEL to be enabled - Add MeterProvider shutdown to cleanup lifecycle - Update module docstring to reflect full independence (Tracer, Logger, Meter) --- api/enterprise/telemetry/exporter.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/api/enterprise/telemetry/exporter.py b/api/enterprise/telemetry/exporter.py index 54a60595b6..423ae20d69 100644 --- a/api/enterprise/telemetry/exporter.py +++ b/api/enterprise/telemetry/exporter.py @@ -1,7 +1,7 @@ """Enterprise OTEL exporter — shared by EnterpriseOtelTrace, event handlers, and direct instrumentation. -Uses its own TracerProvider (configurable sampling, separate from ext_otel.py infrastructure) -and the global MeterProvider (shared with ext_otel.py — both target the same collector). +Uses dedicated TracerProvider, LoggerProvider, and MeterProvider instances (configurable sampling, +independent from ext_otel.py infrastructure). Initialized once during Flask extension init (single-threaded via ext_enterprise_telemetry.py). Accessed via ``ext_enterprise_telemetry.get_enterprise_exporter()`` from any thread/process. 
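The heart of this commit is that enterprise metrics get their own pipeline instead of borrowing the global MeterProvider configured by ext_otel.py. A minimal standalone sketch of that wiring, under assumed values (the collector URL, resource and counter attributes, and sample numbers below are placeholders):

    from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
    from opentelemetry.sdk.resources import Resource

    # Dedicated pipeline: OTLP exporter -> periodic reader -> private MeterProvider.
    exporter = OTLPMetricExporter(endpoint="http://otel-collector:4318/v1/metrics")
    provider = MeterProvider(
        resource=Resource(attributes={"service.name": "dify"}),
        metric_readers=[PeriodicExportingMetricReader(exporter)],
    )

    meter = provider.get_meter("dify.enterprise")
    tokens = meter.create_counter("dify.tokens.total", unit="{token}")
    tokens.add(128, attributes={"dify.app.id": "example-app"})

    # Explicit shutdown flushes buffered metrics; the patch adds the equivalent
    # call to EnterpriseExporter.shutdown().
    provider.shutdown()

Because the provider is instance-scoped rather than registered globally, it neither requires nor interferes with the standard OTEL extension being enabled.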
@@ -13,14 +13,18 @@ import uuid from datetime import datetime from typing import Any, cast -from opentelemetry import metrics, trace +from opentelemetry import trace from opentelemetry.context import Context from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter as GRPCLogExporter +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as GRPCMetricExporter from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCSpanExporter from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter as HTTPLogExporter +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as HTTPMetricExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HTTPSpanExporter from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor @@ -93,6 +97,16 @@ class _ExporterFactory: return None, "Enterprise OTEL logs enabled but endpoint is empty" return HTTPLogExporter(endpoint=logs_endpoint, headers=self._http_headers), "" + def create_metric_exporter(self) -> HTTPMetricExporter | GRPCMetricExporter: + if self._protocol == "grpc": + return GRPCMetricExporter( + endpoint=self._endpoint or None, + headers=self._grpc_headers, + insecure=True, + ) + metric_endpoint = f"{self._endpoint}/v1/metrics" if self._endpoint else "" + return HTTPMetricExporter(endpoint=metric_endpoint or None, headers=self._http_headers) + def _append_logs_path(self) -> str: if not self._endpoint: return "" @@ -186,7 +200,12 @@ class EnterpriseExporter: self._init_logs_pipeline(factory, resource) - meter = metrics.get_meter("dify.enterprise") + metric_exporter = factory.create_metric_exporter() + self._meter_provider = MeterProvider( + resource=resource, + metric_readers=[PeriodicExportingMetricReader(metric_exporter)], + ) + meter = self._meter_provider.get_meter("dify.enterprise") self._counters = { EnterpriseTelemetryCounter.TOKENS: meter.create_counter("dify.tokens.total", unit="{token}"), EnterpriseTelemetryCounter.REQUESTS: meter.create_counter("dify.requests.total", unit="{request}"), @@ -317,6 +336,7 @@ class EnterpriseExporter: self._tracer_provider.shutdown() if self._log_provider: self._log_provider.shutdown() + self._meter_provider.shutdown() def attach_log_handler(self) -> None: if not self._log_handler: From f627c7fdabb7ad22596cdc62f538ea23d0ff4b7b Mon Sep 17 00:00:00 2001 From: GareArc Date: Mon, 2 Feb 2026 23:28:39 -0800 Subject: [PATCH 4/5] fix(enterprise): Remove OTEL log export --- api/configs/enterprise/__init__.py | 10 -- api/enterprise/telemetry/exporter.py | 113 +-------------------- api/extensions/ext_enterprise_telemetry.py | 1 - 3 files changed, 3 insertions(+), 121 deletions(-) diff --git a/api/configs/enterprise/__init__.py b/api/configs/enterprise/__init__.py index eb9dfe69b4..4920eeba07 100644 --- a/api/configs/enterprise/__init__.py +++ b/api/configs/enterprise/__init__.py @@ -30,21 +30,11 @@ class EnterpriseTelemetryConfig(BaseSettings): default=False, ) - ENTERPRISE_OTEL_LOGS_ENABLED: bool = Field( - description="Enable enterprise OTEL log export (requires enterprise telemetry enabled).", 
- default=False, - ) - ENTERPRISE_OTLP_ENDPOINT: str = Field( description="Enterprise OTEL collector endpoint.", default="", ) - ENTERPRISE_OTLP_LOGS_ENDPOINT: str = Field( - description="Enterprise OTEL logs endpoint (defaults to ENTERPRISE_OTLP_ENDPOINT + /v1/logs).", - default="", - ) - ENTERPRISE_OTLP_HEADERS: str = Field( description="Auth headers for OTLP export (key=value,key2=value2).", default="", diff --git a/api/enterprise/telemetry/exporter.py b/api/enterprise/telemetry/exporter.py index 423ae20d69..a2728fb7ba 100644 --- a/api/enterprise/telemetry/exporter.py +++ b/api/enterprise/telemetry/exporter.py @@ -1,6 +1,6 @@ """Enterprise OTEL exporter — shared by EnterpriseOtelTrace, event handlers, and direct instrumentation. -Uses dedicated TracerProvider, LoggerProvider, and MeterProvider instances (configurable sampling, +Uses dedicated TracerProvider and MeterProvider instances (configurable sampling, independent from ext_otel.py infrastructure). Initialized once during Flask extension init (single-threaded via ext_enterprise_telemetry.py). @@ -15,14 +15,10 @@ from typing import Any, cast from opentelemetry import trace from opentelemetry.context import Context -from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter as GRPCLogExporter from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as GRPCMetricExporter from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCSpanExporter -from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter as HTTPLogExporter from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as HTTPMetricExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HTTPSpanExporter -from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource @@ -85,18 +81,6 @@ class _ExporterFactory: trace_endpoint = f"{self._endpoint}/v1/traces" if self._endpoint else "" return HTTPSpanExporter(endpoint=trace_endpoint or None, headers=self._http_headers) - def create_log_exporter(self, logs_endpoint_override: str) -> tuple[HTTPLogExporter | GRPCLogExporter | None, str]: - if self._protocol == "grpc": - logs_endpoint = logs_endpoint_override or self._endpoint - if not logs_endpoint: - return None, "Enterprise OTEL logs enabled but endpoint is empty" - return GRPCLogExporter(endpoint=logs_endpoint, headers=self._grpc_headers, insecure=True), "" - - logs_endpoint = logs_endpoint_override or self._append_logs_path() - if not logs_endpoint: - return None, "Enterprise OTEL logs enabled but endpoint is empty" - return HTTPLogExporter(endpoint=logs_endpoint, headers=self._http_headers), "" - def create_metric_exporter(self) -> HTTPMetricExporter | GRPCMetricExporter: if self._protocol == "grpc": return GRPCMetricExporter( @@ -107,59 +91,6 @@ class _ExporterFactory: metric_endpoint = f"{self._endpoint}/v1/metrics" if self._endpoint else "" return HTTPMetricExporter(endpoint=metric_endpoint or None, headers=self._http_headers) - def _append_logs_path(self) -> str: - if not self._endpoint: - return "" - if self._endpoint.endswith("/"): - return f"{self._endpoint}v1/logs" - return f"{self._endpoint}/v1/logs" - - -class EnterpriseLoggingHandler(LoggingHandler): - 
@staticmethod - def _coerce_attribute_value(value: object) -> AttributeValue: - if isinstance(value, (str, bool, int, float)): - return value - return str(value) - - @staticmethod - def _parse_hex_id(value: object, expected_len: int) -> int | None: - if isinstance(value, int): - return value if value != 0 else None - if not isinstance(value, str): - return None - raw = value.strip().lower() - raw = raw.removeprefix("0x") - if expected_len and len(raw) != expected_len: - return None - try: - parsed = int(raw, 16) - except ValueError: - return None - return parsed if parsed != 0 else None - - @staticmethod - def _get_attributes(record: logging.LogRecord) -> Attributes: - raw_attributes = LoggingHandler._get_attributes(record) or {} - attributes: dict[str, AttributeValue] = dict(raw_attributes) - extra_attrs = attributes.pop("attributes", None) - if isinstance(extra_attrs, dict): - extra_attrs_dict = cast(dict[str, object], extra_attrs) - for key, value in extra_attrs_dict.items(): - if key not in attributes: - attributes[key] = EnterpriseLoggingHandler._coerce_attribute_value(value) - return attributes - - def _translate(self, record: logging.LogRecord): - log_record = super()._translate(record) - trace_id = self._parse_hex_id(getattr(record, "trace_id", None), 32) - span_id = self._parse_hex_id(getattr(record, "span_id", None), 16) - if trace_id is not None: - log_record.trace_id = trace_id - if span_id is not None: - log_record.span_id = span_id - return log_record - class EnterpriseExporter: """Shared OTEL exporter for all enterprise telemetry. @@ -176,10 +107,6 @@ class EnterpriseExporter: service_name: str = getattr(config, "ENTERPRISE_SERVICE_NAME", "dify") sampling_rate: float = getattr(config, "ENTERPRISE_OTEL_SAMPLING_RATE", 1.0) self.include_content: bool = getattr(config, "ENTERPRISE_INCLUDE_CONTENT", True) - self._logs_enabled: bool = bool(getattr(config, "ENTERPRISE_OTEL_LOGS_ENABLED", False)) - self._logs_endpoint: str = getattr(config, "ENTERPRISE_OTLP_LOGS_ENDPOINT", "") - self._log_provider: LoggerProvider | None = None - self._log_handler: logging.Handler | None = None resource = Resource( attributes={ @@ -198,8 +125,6 @@ class EnterpriseExporter: self._tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter)) self._tracer = self._tracer_provider.get_tracer("dify.enterprise") - self._init_logs_pipeline(factory, resource) - metric_exporter = factory.create_metric_exporter() self._meter_provider = MeterProvider( resource=resource, @@ -225,19 +150,6 @@ class EnterpriseExporter: EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"), } - def _init_logs_pipeline(self, factory: _ExporterFactory, resource: Resource) -> None: - if not self._logs_enabled: - return - - log_exporter, warning = factory.create_log_exporter(self._logs_endpoint) - if not log_exporter: - logger.warning(warning) - return - - self._log_provider = LoggerProvider(resource=resource, shutdown_on_exit=False) - self._log_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter)) - self._log_handler = EnterpriseLoggingHandler(logger_provider=self._log_provider) - def export_span( self, name: str, @@ -275,9 +187,7 @@ class EnterpriseExporter: if parent_span_id_source: # Cross-workflow linking: parent is an explicit span (e.g. 
tool node in outer workflow) parent_span_id = compute_deterministic_span_id(parent_span_id_source) - parent_trace_id = ( - cast(int, uuid.UUID(effective_trace_correlation).int) if effective_trace_correlation else 0 - ) + parent_trace_id = int(uuid.UUID(effective_trace_correlation)) if effective_trace_correlation else 0 if parent_trace_id: parent_span_context = SpanContext( trace_id=parent_trace_id, @@ -289,7 +199,7 @@ class EnterpriseExporter: elif correlation_id and correlation_id != span_id_source: # Child span: parent is the correlation-group root (workflow root span) parent_span_id = compute_deterministic_span_id(correlation_id) - parent_trace_id = cast(int, uuid.UUID(effective_trace_correlation or correlation_id).int) + parent_trace_id = int(uuid.UUID(effective_trace_correlation or correlation_id)) parent_span_context = SpanContext( trace_id=parent_trace_id, span_id=parent_span_id, @@ -334,21 +244,4 @@ class EnterpriseExporter: def shutdown(self) -> None: self._tracer_provider.shutdown() - if self._log_provider: - self._log_provider.shutdown() self._meter_provider.shutdown() - - def attach_log_handler(self) -> None: - if not self._log_handler: - return - telemetry_logger = logging.getLogger("dify.telemetry") - if self._log_handler in telemetry_logger.handlers: - return - try: - from core.logging.filters import IdentityContextFilter, TraceContextFilter - - self._log_handler.addFilter(TraceContextFilter()) - self._log_handler.addFilter(IdentityContextFilter()) - except Exception: - logger.exception("Failed to attach log filters to enterprise handler") - telemetry_logger.addHandler(self._log_handler) diff --git a/api/extensions/ext_enterprise_telemetry.py b/api/extensions/ext_enterprise_telemetry.py index dd33f5e377..f2c68c479c 100644 --- a/api/extensions/ext_enterprise_telemetry.py +++ b/api/extensions/ext_enterprise_telemetry.py @@ -37,7 +37,6 @@ def init_app(app: DifyApp) -> None: _exporter = EnterpriseExporter(dify_config) atexit.register(_exporter.shutdown) - _exporter.attach_log_handler() # Import to trigger @signal.connect decorator registration import enterprise.telemetry.event_handlers # noqa: F401 # type: ignore[reportUnusedImport] From ae43fd96bf9c19b4ffb218eca0ec7a3297c258a0 Mon Sep 17 00:00:00 2001 From: GareArc Date: Tue, 3 Feb 2026 01:23:15 -0800 Subject: [PATCH 5/5] feat(telemetry): add invoked_by user tracking to enterprise OTEL --- api/core/ops/entities/trace_entity.py | 4 ++++ api/core/ops/ops_trace_manager.py | 23 ++++++++++++++++++++ api/enterprise/telemetry/enterprise_trace.py | 2 ++ api/extensions/otel/semconv/dify.py | 3 +++ 4 files changed, 32 insertions(+) diff --git a/api/core/ops/entities/trace_entity.py b/api/core/ops/entities/trace_entity.py index 580540847c..2133b15663 100644 --- a/api/core/ops/entities/trace_entity.py +++ b/api/core/ops/entities/trace_entity.py @@ -52,6 +52,8 @@ class WorkflowTraceInfo(BaseTraceInfo): query: str metadata: dict[str, Any] + invoked_by: str | None = None + class MessageTraceInfo(BaseTraceInfo): conversation_model: str @@ -151,6 +153,8 @@ class WorkflowNodeTraceInfo(BaseTraceInfo): node_outputs: Mapping[str, Any] | None = None process_data: Mapping[str, Any] | None = None + invoked_by: str | None = None + model_config = ConfigDict(protected_namespaces=()) diff --git a/api/core/ops/ops_trace_manager.py b/api/core/ops/ops_trace_manager.py index c5ac634e2d..8aad70ed01 100644 --- a/api/core/ops/ops_trace_manager.py +++ b/api/core/ops/ops_trace_manager.py @@ -520,6 +520,27 @@ class TraceTask: cls._workflow_run_repo = 
DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) return cls._workflow_run_repo + @classmethod + def _get_user_id_from_metadata(cls, metadata: dict[str, Any]) -> str: + """Extract user ID from metadata, prioritizing end_user over account. + + Returns the actual user ID (end_user or account) who invoked the workflow, + regardless of invoke_from context. + """ + # Priority 1: End user (external users via API/WebApp) + if user_id := metadata.get("from_end_user_id"): + return f"end_user:{user_id}" + + # Priority 2: Account user (internal users via console/debugger) + if user_id := metadata.get("from_account_id"): + return f"account:{user_id}" + + # Priority 3: User (internal users via console/debugger) + if user_id := metadata.get("user_id"): + return f"user:{user_id}" + + return "anonymous" + def __init__( self, trace_type: Any, @@ -670,6 +691,7 @@ class TraceTask: message_id=message_id, start_time=workflow_run.created_at, end_time=workflow_run.finished_at, + invoked_by=self._get_user_id_from_metadata(metadata), ) return workflow_trace_info @@ -1081,6 +1103,7 @@ class TraceTask: node_inputs=node_data.get("node_inputs"), node_outputs=node_data.get("node_outputs"), process_data=node_data.get("process_data"), + invoked_by=self._get_user_id_from_metadata(metadata), ) def draft_node_execution_trace(self, **kwargs) -> DraftNodeExecutionTrace | dict: diff --git a/api/enterprise/telemetry/enterprise_trace.py b/api/enterprise/telemetry/enterprise_trace.py index faf3219a6a..faa2ff02b0 100644 --- a/api/enterprise/telemetry/enterprise_trace.py +++ b/api/enterprise/telemetry/enterprise_trace.py @@ -110,6 +110,7 @@ class EnterpriseOtelTrace: "dify.invoke_from": info.metadata.get("triggered_from"), "dify.conversation.id": info.conversation_id, "dify.message.id": info.message_id, + "dify.invoked_by": info.invoked_by, } trace_correlation_override: str | None = None @@ -230,6 +231,7 @@ class EnterpriseOtelTrace: "dify.node.iteration_id": info.iteration_id, "dify.node.loop_id": info.loop_id, "dify.node.parallel_id": info.parallel_id, + "dify.node.invoked_by": info.invoked_by, } trace_correlation_override = trace_correlation_override_param diff --git a/api/extensions/otel/semconv/dify.py b/api/extensions/otel/semconv/dify.py index a20b9b358d..9ffc58fde9 100644 --- a/api/extensions/otel/semconv/dify.py +++ b/api/extensions/otel/semconv/dify.py @@ -21,3 +21,6 @@ class DifySpanAttributes: INVOKE_FROM = "dify.invoke_from" """Invocation source, e.g. SERVICE_API, WEB_APP, DEBUGGER.""" + + INVOKED_BY = "dify.invoked_by" + """Invoked by, e.g. end_user, account, user."""
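To make the precedence encoded in _get_user_id_from_metadata concrete, here is a standalone restatement with a few worked inputs; the invoked_by helper name and the sample IDs are made up for illustration, while the metadata keys and the prefix:id format come straight from the patch:

    from typing import Any


    def invoked_by(metadata: dict[str, Any]) -> str:
        # Same precedence as TraceTask._get_user_id_from_metadata:
        # end user first, then account, then generic user, else anonymous.
        if user_id := metadata.get("from_end_user_id"):
            return f"end_user:{user_id}"
        if user_id := metadata.get("from_account_id"):
            return f"account:{user_id}"
        if user_id := metadata.get("user_id"):
            return f"user:{user_id}"
        return "anonymous"


    assert invoked_by({"from_end_user_id": "eu-1", "from_account_id": "acc-9"}) == "end_user:eu-1"
    assert invoked_by({"from_account_id": "acc-9"}) == "account:acc-9"
    assert invoked_by({}) == "anonymous"

The resulting string is what lands on spans as dify.invoked_by (workflow level) and dify.node.invoked_by (node level) via EnterpriseOtelTrace.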