From ed222945aaf253c78266090fbb0b740bb5835bca Mon Sep 17 00:00:00 2001 From: GareArc Date: Thu, 5 Feb 2026 04:23:56 -0800 Subject: [PATCH] refactor(telemetry): introduce TelemetryFacade to centralize event emission Migrate from direct TraceQueueManager.add_trace_task calls to TelemetryFacade.emit with TelemetryEvent abstraction. This reduces CE code invasion by consolidating telemetry logic in core/telemetry/ with a single guard in ops_trace_manager.py. --- .../advanced_chat/generate_task_pipeline.py | 22 +- .../easy_ui_based_generate_task_pipeline.py | 21 +- api/core/app/workflow/layers/persistence.py | 52 +- .../agent_tool_callback_handler.py | 26 +- api/core/llm_generator/llm_generator.py | 67 +- api/core/moderation/input_moderation.py | 24 +- api/core/ops/entities/trace_entity.py | 10 +- api/core/ops/ops_trace_manager.py | 4 +- api/core/rag/retrieval/dataset_retrieval.py | 23 +- api/core/telemetry/__init__.py | 4 + api/core/telemetry/events.py | 18 + api/core/telemetry/facade.py | 55 ++ api/enterprise/telemetry/draft_trace.py | 17 +- api/enterprise/telemetry/enterprise_trace.py | 595 +++++++++++------- api/enterprise/telemetry/telemetry_log.py | 15 +- api/services/message_service.py | 16 +- 16 files changed, 639 insertions(+), 330 deletions(-) create mode 100644 api/core/telemetry/__init__.py create mode 100644 api/core/telemetry/events.py create mode 100644 api/core/telemetry/facade.py diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index a707053ab0..6f734df1da 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -62,7 +62,8 @@ from core.app.task_pipeline.message_cycle_manager import MessageCycleManager from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.model_runtime.entities.llm_entities import LLMUsage from core.model_runtime.utils.encoders import jsonable_encoder -from core.ops.ops_trace_manager import TraceQueueManager, TraceTask, TraceTaskName +from core.ops.ops_trace_manager import TraceQueueManager +from core.telemetry import TelemetryContext, TelemetryEvent, TelemetryFacade from core.workflow.enums import WorkflowExecutionStatus from core.workflow.nodes import NodeType from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory @@ -831,12 +832,19 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): session.add_all(message_files) if trace_manager: - trace_manager.add_trace_task( - TraceTask( - TraceTaskName.MESSAGE_TRACE, - conversation_id=str(message.conversation_id), - message_id=str(message.id), - ) + TelemetryFacade.emit( + TelemetryEvent( + name="message", + context=TelemetryContext( + tenant_id=self._application_generate_entity.app_config.tenant_id, + app_id=self._application_generate_entity.app_config.app_id, + ), + payload={ + "conversation_id": str(message.conversation_id), + "message_id": str(message.id), + }, + ), + trace_manager=trace_manager, ) def _seed_graph_runtime_state_from_queue_manager(self) -> None: diff --git a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py index 6c997753fa..c188fe6d84 100644 --- a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py +++ b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py @@ -52,10 +52,10 @@ from core.model_runtime.entities.message_entities import ( TextPromptMessageContent, ) from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel -from core.ops.entities.trace_entity import TraceTaskName -from core.ops.ops_trace_manager import TraceQueueManager, TraceTask +from core.ops.ops_trace_manager import TraceQueueManager from core.prompt.utils.prompt_message_util import PromptMessageUtil from core.prompt.utils.prompt_template_parser import PromptTemplateParser +from core.telemetry import TelemetryContext, TelemetryEvent, TelemetryFacade from events.message_event import message_was_created from extensions.ext_database import db from libs.datetime_utils import naive_utc_now @@ -409,10 +409,19 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline): message.message_metadata = self._task_state.metadata.model_dump_json() if trace_manager: - trace_manager.add_trace_task( - TraceTask( - TraceTaskName.MESSAGE_TRACE, conversation_id=self._conversation_id, message_id=self._message_id - ) + TelemetryFacade.emit( + TelemetryEvent( + name="message", + context=TelemetryContext( + tenant_id=self._application_generate_entity.app_config.tenant_id, + app_id=self._application_generate_entity.app_config.app_id, + ), + payload={ + "conversation_id": self._conversation_id, + "message_id": self._message_id, + }, + ), + trace_manager=trace_manager, ) message_was_created.send( diff --git a/api/core/app/workflow/layers/persistence.py b/api/core/app/workflow/layers/persistence.py index f7e598a486..fc70b81cbe 100644 --- a/api/core/app/workflow/layers/persistence.py +++ b/api/core/app/workflow/layers/persistence.py @@ -15,8 +15,7 @@ from datetime import datetime from typing import Any, Union from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity -from core.ops.entities.trace_entity import TraceTaskName -from core.ops.ops_trace_manager import TraceQueueManager, TraceTask +from core.ops.ops_trace_manager import TraceQueueManager from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID from core.workflow.entities import WorkflowExecution, WorkflowNodeExecution from core.workflow.enums import ( @@ -396,25 +395,31 @@ class WorkflowPersistenceLayer(GraphEngineLayer): external_trace_id = self._application_generate_entity.extras.get("external_trace_id") parent_trace_context = self._application_generate_entity.extras.get("parent_trace_context") - trace_task = TraceTask( - TraceTaskName.WORKFLOW_TRACE, - workflow_execution=execution, - conversation_id=conversation_id, - user_id=self._trace_manager.user_id, - external_trace_id=external_trace_id, - parent_trace_context=parent_trace_context, + from core.telemetry import TelemetryContext, TelemetryEvent, TelemetryFacade + + TelemetryFacade.emit( + TelemetryEvent( + name="workflow", + context=TelemetryContext( + tenant_id=self._application_generate_entity.app_config.tenant_id, + user_id=self._trace_manager.user_id, + app_id=self._application_generate_entity.app_config.app_id, + ), + payload={ + "workflow_execution": execution, + "conversation_id": conversation_id, + "user_id": self._trace_manager.user_id, + "external_trace_id": external_trace_id, + "parent_trace_context": parent_trace_context, + }, + ), + trace_manager=self._trace_manager, ) - self._trace_manager.add_trace_task(trace_task) def _enqueue_node_trace_task(self, domain_execution: WorkflowNodeExecution) -> None: if not self._trace_manager: return - from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled - - if not is_enterprise_telemetry_enabled(): - return - execution = self._get_workflow_execution() meta = domain_execution.metadata or {} @@ -494,11 +499,20 @@ class WorkflowPersistenceLayer(GraphEngineLayer): if parent_trace_context: node_data["parent_trace_context"] = parent_trace_context - trace_task = TraceTask( - TraceTaskName.NODE_EXECUTION_TRACE, - node_execution_data=node_data, + from core.telemetry import TelemetryContext, TelemetryEvent, TelemetryFacade + + TelemetryFacade.emit( + TelemetryEvent( + name="node_execution", + context=TelemetryContext( + tenant_id=node_data.get("tenant_id"), + user_id=node_data.get("user_id"), + app_id=node_data.get("app_id"), + ), + payload={"node_execution_data": node_data}, + ), + trace_manager=self._trace_manager, ) - self._trace_manager.add_trace_task(trace_task) def _system_variables(self) -> Mapping[str, Any]: runtime_state = self.graph_runtime_state diff --git a/api/core/callback_handler/agent_tool_callback_handler.py b/api/core/callback_handler/agent_tool_callback_handler.py index 6591b08a7e..b617bd28de 100644 --- a/api/core/callback_handler/agent_tool_callback_handler.py +++ b/api/core/callback_handler/agent_tool_callback_handler.py @@ -4,8 +4,8 @@ from typing import Any, TextIO, Union from pydantic import BaseModel from configs import dify_config -from core.ops.entities.trace_entity import TraceTaskName -from core.ops.ops_trace_manager import TraceQueueManager, TraceTask +from core.ops.ops_trace_manager import TraceQueueManager +from core.telemetry import TelemetryContext, TelemetryEvent, TelemetryFacade from core.tools.entities.tool_entities import ToolInvokeMessage _TEXT_COLOR_MAPPING = { @@ -71,15 +71,19 @@ class DifyAgentCallbackHandler(BaseModel): print_text("\n") if trace_manager: - trace_manager.add_trace_task( - TraceTask( - TraceTaskName.TOOL_TRACE, - message_id=message_id, - tool_name=tool_name, - tool_inputs=tool_inputs, - tool_outputs=tool_outputs, - timer=timer, - ) + TelemetryFacade.emit( + TelemetryEvent( + name="tool", + context=TelemetryContext(app_id=trace_manager.app_id, user_id=trace_manager.user_id), + payload={ + "message_id": message_id, + "tool_name": tool_name, + "tool_inputs": tool_inputs, + "tool_outputs": tool_outputs, + "timer": timer, + }, + ), + trace_manager=trace_manager, ) def on_tool_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any): diff --git a/api/core/llm_generator/llm_generator.py b/api/core/llm_generator/llm_generator.py index e754b5bb11..b4c9471ef1 100644 --- a/api/core/llm_generator/llm_generator.py +++ b/api/core/llm_generator/llm_generator.py @@ -25,10 +25,9 @@ from core.model_runtime.entities.llm_entities import LLMResult from core.model_runtime.entities.message_entities import PromptMessage, SystemPromptMessage, UserPromptMessage from core.model_runtime.entities.model_entities import ModelType from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError -from core.ops.entities.trace_entity import TraceTaskName -from core.ops.ops_trace_manager import TraceQueueManager, TraceTask from core.ops.utils import measure_time from core.prompt.utils.prompt_template_parser import PromptTemplateParser +from core.telemetry import TelemetryContext, TelemetryEvent, TelemetryFacade from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey from extensions.ext_database import db from extensions.ext_storage import storage @@ -94,15 +93,17 @@ class LLMGenerator: name = name[:75] + "..." # get tracing instance - trace_manager = TraceQueueManager(app_id=app_id) - trace_manager.add_trace_task( - TraceTask( - TraceTaskName.GENERATE_NAME_TRACE, - conversation_id=conversation_id, - generate_conversation_name=name, - inputs=prompt, - timer=timer, - tenant_id=tenant_id, + TelemetryFacade.emit( + TelemetryEvent( + name="generate_name", + context=TelemetryContext(tenant_id=tenant_id, app_id=app_id), + payload={ + "conversation_id": conversation_id, + "generate_conversation_name": name, + "inputs": prompt, + "timer": timer, + "tenant_id": tenant_id, + }, ) ) @@ -787,25 +788,29 @@ class LLMGenerator: total_price = None currency = None - trace_manager = TraceQueueManager(app_id=app_id) - trace_manager.add_trace_task( - TraceTask( - TraceTaskName.PROMPT_GENERATION_TRACE, - tenant_id=tenant_id, - user_id=user_id, - app_id=app_id, - operation_type=operation_type, - instruction=instruction, - generated_output=generated_output, - prompt_tokens=prompt_tokens, - completion_tokens=completion_tokens, - total_tokens=total_tokens, - model_provider=model_provider, - model_name=model_name, - latency=latency, - total_price=total_price, - currency=currency, - timer=timer, - error=error, + from core.telemetry import TelemetryContext, TelemetryEvent, TelemetryFacade + + TelemetryFacade.emit( + TelemetryEvent( + name="prompt_generation", + context=TelemetryContext(tenant_id=tenant_id, user_id=user_id, app_id=app_id), + payload={ + "tenant_id": tenant_id, + "user_id": user_id, + "app_id": app_id, + "operation_type": operation_type, + "instruction": instruction, + "generated_output": generated_output, + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": total_tokens, + "model_provider": model_provider, + "model_name": model_name, + "latency": latency, + "total_price": total_price, + "currency": currency, + "timer": timer, + "error": error, + }, ) ) diff --git a/api/core/moderation/input_moderation.py b/api/core/moderation/input_moderation.py index 21dc58f16f..c73170bf12 100644 --- a/api/core/moderation/input_moderation.py +++ b/api/core/moderation/input_moderation.py @@ -5,9 +5,9 @@ from typing import Any from core.app.app_config.entities import AppConfig from core.moderation.base import ModerationAction, ModerationError from core.moderation.factory import ModerationFactory -from core.ops.entities.trace_entity import TraceTaskName -from core.ops.ops_trace_manager import TraceQueueManager, TraceTask +from core.ops.ops_trace_manager import TraceQueueManager from core.ops.utils import measure_time +from core.telemetry import TelemetryContext, TelemetryEvent, TelemetryFacade logger = logging.getLogger(__name__) @@ -49,14 +49,18 @@ class InputModeration: moderation_result = moderation_factory.moderation_for_inputs(inputs, query) if trace_manager: - trace_manager.add_trace_task( - TraceTask( - TraceTaskName.MODERATION_TRACE, - message_id=message_id, - moderation_result=moderation_result, - inputs=inputs, - timer=timer, - ) + TelemetryFacade.emit( + TelemetryEvent( + name="moderation", + context=TelemetryContext(tenant_id=tenant_id, app_id=app_id), + payload={ + "message_id": message_id, + "moderation_result": moderation_result, + "inputs": inputs, + "timer": timer, + }, + ), + trace_manager=trace_manager, ) if not moderation_result.flagged: diff --git a/api/core/ops/entities/trace_entity.py b/api/core/ops/entities/trace_entity.py index 47074da8b1..5c878281a6 100644 --- a/api/core/ops/entities/trace_entity.py +++ b/api/core/ops/entities/trace_entity.py @@ -9,8 +9,8 @@ from pydantic import BaseModel, ConfigDict, field_serializer, field_validator class BaseTraceInfo(BaseModel): message_id: str | None = None message_data: Any | None = None - inputs: Union[str, dict[str, Any], list] | None = None - outputs: Union[str, dict[str, Any], list] | None = None + inputs: Union[str, dict[str, Any], list[Any]] | None = None + outputs: Union[str, dict[str, Any], list[Any]] | None = None start_time: datetime | None = None end_time: datetime | None = None metadata: dict[str, Any] @@ -18,7 +18,7 @@ class BaseTraceInfo(BaseModel): @field_validator("inputs", "outputs") @classmethod - def ensure_type(cls, v): + def ensure_type(cls, v: str | dict[str, Any] | list[Any] | None) -> str | dict[str, Any] | list[Any] | None: if v is None: return None if isinstance(v, str | dict | list): @@ -63,7 +63,7 @@ class MessageTraceInfo(BaseTraceInfo): answer_tokens: int total_tokens: int error: str | None = None - file_list: Union[str, dict[str, Any], list] | None = None + file_list: Union[str, dict[str, Any], list[Any]] | None = None message_file_data: Any | None = None conversation_mode: str gen_ai_server_time_to_first_token: float | None = None @@ -110,7 +110,7 @@ class ToolTraceInfo(BaseTraceInfo): tool_config: dict[str, Any] time_cost: Union[int, float] tool_parameters: dict[str, Any] - file_url: Union[str, None, list] = None + file_url: Union[str, None, list[str]] = None class GenerateNameTraceInfo(BaseTraceInfo): diff --git a/api/core/ops/ops_trace_manager.py b/api/core/ops/ops_trace_manager.py index f405191b67..b91448d499 100644 --- a/api/core/ops/ops_trace_manager.py +++ b/api/core/ops/ops_trace_manager.py @@ -1272,9 +1272,9 @@ class TraceQueueManager: self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id) self.flask_app = current_app._get_current_object() # type: ignore - from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled + from core.telemetry import is_telemetry_enabled - self._enterprise_telemetry_enabled = is_enterprise_telemetry_enabled() + self._enterprise_telemetry_enabled = is_telemetry_enabled() if trace_manager_timer is None: self.start_timer() diff --git a/api/core/rag/retrieval/dataset_retrieval.py b/api/core/rag/retrieval/dataset_retrieval.py index 541c241ae5..6db9fb6c30 100644 --- a/api/core/rag/retrieval/dataset_retrieval.py +++ b/api/core/rag/retrieval/dataset_retrieval.py @@ -27,8 +27,7 @@ from core.model_runtime.entities.llm_entities import LLMResult, LLMUsage from core.model_runtime.entities.message_entities import PromptMessage, PromptMessageRole, PromptMessageTool from core.model_runtime.entities.model_entities import ModelFeature, ModelType from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel -from core.ops.entities.trace_entity import TraceTaskName -from core.ops.ops_trace_manager import TraceQueueManager, TraceTask +from core.ops.ops_trace_manager import TraceQueueManager from core.ops.utils import measure_time from core.prompt.advanced_prompt_transform import AdvancedPromptTransform from core.prompt.entities.advanced_prompt_entities import ChatModelMessage, CompletionModelPromptTemplate @@ -56,6 +55,7 @@ from core.rag.retrieval.template_prompts import ( METADATA_FILTER_USER_PROMPT_2, METADATA_FILTER_USER_PROMPT_3, ) +from core.telemetry import TelemetryContext, TelemetryEvent, TelemetryFacade from core.tools.signature import sign_upload_file from core.tools.utils.dataset_retriever.dataset_retriever_base_tool import DatasetRetrieverBaseTool from extensions.ext_database import db @@ -728,10 +728,21 @@ class DatasetRetrieval: self.application_generate_entity.trace_manager if self.application_generate_entity else None ) if trace_manager: - trace_manager.add_trace_task( - TraceTask( - TraceTaskName.DATASET_RETRIEVAL_TRACE, message_id=message_id, documents=documents, timer=timer - ) + app_config = self.application_generate_entity.app_config if self.application_generate_entity else None + TelemetryFacade.emit( + TelemetryEvent( + name="dataset_retrieval", + context=TelemetryContext( + tenant_id=app_config.tenant_id if app_config else None, + app_id=app_config.app_id if app_config else None, + ), + payload={ + "message_id": message_id, + "documents": documents, + "timer": timer, + }, + ), + trace_manager=trace_manager, ) def _on_query( diff --git a/api/core/telemetry/__init__.py b/api/core/telemetry/__init__.py new file mode 100644 index 0000000000..89b076a97f --- /dev/null +++ b/api/core/telemetry/__init__.py @@ -0,0 +1,4 @@ +from core.telemetry.events import TelemetryContext, TelemetryEvent +from core.telemetry.facade import TelemetryFacade, emit, is_telemetry_enabled + +__all__ = ["TelemetryContext", "TelemetryEvent", "TelemetryFacade", "emit", "is_telemetry_enabled"] diff --git a/api/core/telemetry/events.py b/api/core/telemetry/events.py new file mode 100644 index 0000000000..ef90368c3a --- /dev/null +++ b/api/core/telemetry/events.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + + +@dataclass(frozen=True) +class TelemetryContext: + tenant_id: str | None = None + user_id: str | None = None + app_id: str | None = None + + +@dataclass(frozen=True) +class TelemetryEvent: + name: str + context: TelemetryContext + payload: dict[str, Any] diff --git a/api/core/telemetry/facade.py b/api/core/telemetry/facade.py new file mode 100644 index 0000000000..fda1a039bb --- /dev/null +++ b/api/core/telemetry/facade.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from core.telemetry.events import TelemetryEvent + +if TYPE_CHECKING: + from core.ops.ops_trace_manager import TraceQueueManager + + +class TelemetryFacade: + @staticmethod + def emit(event: TelemetryEvent, trace_manager: TraceQueueManager | None = None) -> None: + from core.ops.ops_trace_manager import TraceQueueManager, TraceTask, TraceTaskName + + trace_task_name_map = { + "draft_node_execution": TraceTaskName.DRAFT_NODE_EXECUTION_TRACE, + "dataset_retrieval": TraceTaskName.DATASET_RETRIEVAL_TRACE, + "generate_name": TraceTaskName.GENERATE_NAME_TRACE, + "message": TraceTaskName.MESSAGE_TRACE, + "moderation": TraceTaskName.MODERATION_TRACE, + "node_execution": TraceTaskName.NODE_EXECUTION_TRACE, + "prompt_generation": TraceTaskName.PROMPT_GENERATION_TRACE, + "suggested_question": TraceTaskName.SUGGESTED_QUESTION_TRACE, + "tool": TraceTaskName.TOOL_TRACE, + "workflow": TraceTaskName.WORKFLOW_TRACE, + } + + trace_task_name = trace_task_name_map.get(event.name) + if not trace_task_name: + return + + trace_queue_manager = trace_manager or TraceQueueManager( + app_id=event.context.app_id, + user_id=event.context.user_id, + ) + trace_queue_manager.add_trace_task( + TraceTask( + trace_task_name, + **event.payload, + ) + ) + + +def is_telemetry_enabled() -> bool: + try: + from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled + except Exception: + return False + + return is_enterprise_telemetry_enabled() + + +def emit(event: TelemetryEvent, trace_manager: TraceQueueManager | None = None) -> None: + TelemetryFacade.emit(event, trace_manager=trace_manager) diff --git a/api/enterprise/telemetry/draft_trace.py b/api/enterprise/telemetry/draft_trace.py index cbc7c66cd8..a5560c7e5b 100644 --- a/api/enterprise/telemetry/draft_trace.py +++ b/api/enterprise/telemetry/draft_trace.py @@ -3,8 +3,7 @@ from __future__ import annotations from collections.abc import Mapping from typing import Any -from core.ops.entities.trace_entity import TraceTaskName -from core.ops.ops_trace_manager import TraceQueueManager, TraceTask +from core.telemetry import TelemetryContext, TelemetryEvent, TelemetryFacade from core.workflow.enums import WorkflowNodeExecutionMetadataKey from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled from models.workflow import WorkflowNodeExecutionModel @@ -20,16 +19,20 @@ def enqueue_draft_node_execution_trace( if not is_enterprise_telemetry_enabled(): return - trace_manager = TraceQueueManager(app_id=execution.app_id, user_id=user_id) node_data = _build_node_execution_data( execution=execution, outputs=outputs, workflow_execution_id=workflow_execution_id, ) - trace_manager.add_trace_task( - TraceTask( - TraceTaskName.DRAFT_NODE_EXECUTION_TRACE, - node_execution_data=node_data, + TelemetryFacade.emit( + TelemetryEvent( + name="draft_node_execution", + context=TelemetryContext( + tenant_id=execution.tenant_id, + user_id=user_id, + app_id=execution.app_id, + ), + payload={"node_execution_data": node_data}, ) ) diff --git a/api/enterprise/telemetry/enterprise_trace.py b/api/enterprise/telemetry/enterprise_trace.py index 6388356fd7..2b0c9ade7c 100644 --- a/api/enterprise/telemetry/enterprise_trace.py +++ b/api/enterprise/telemetry/enterprise_trace.py @@ -12,7 +12,9 @@ from __future__ import annotations import json import logging -from typing import Any +from typing import Any, cast + +from opentelemetry.util.types import AttributeValue from core.ops.entities.trace_entity import ( BaseTraceInfo, @@ -75,16 +77,51 @@ class EnterpriseOtelTrace: self._prompt_generation_trace(trace_info) def _common_attrs(self, trace_info: BaseTraceInfo) -> dict[str, Any]: + metadata = self._metadata(trace_info) + tenant_id, app_id, user_id = self._context_ids(trace_info, metadata) return { "dify.trace_id": trace_info.trace_id, - "dify.tenant_id": trace_info.metadata.get("tenant_id"), - "dify.app_id": trace_info.metadata.get("app_id"), - "dify.app.name": trace_info.metadata.get("app_name"), - "dify.workspace.name": trace_info.metadata.get("workspace_name"), - "gen_ai.user.id": trace_info.metadata.get("user_id"), + "dify.tenant_id": tenant_id, + "dify.app_id": app_id, + "dify.app.name": metadata.get("app_name"), + "dify.workspace.name": metadata.get("workspace_name"), + "gen_ai.user.id": user_id, "dify.message.id": trace_info.message_id, } + def _metadata(self, trace_info: BaseTraceInfo) -> dict[str, Any]: + return trace_info.metadata + + def _context_ids( + self, + trace_info: BaseTraceInfo, + metadata: dict[str, Any], + ) -> tuple[str | None, str | None, str | None]: + tenant_id = getattr(trace_info, "tenant_id", None) or metadata.get("tenant_id") + app_id = getattr(trace_info, "app_id", None) or metadata.get("app_id") + user_id = getattr(trace_info, "user_id", None) or metadata.get("user_id") + return tenant_id, app_id, user_id + + def _labels(self, **values: AttributeValue) -> dict[str, AttributeValue]: + return dict(values) + + def _safe_payload_value(self, value: Any) -> str | dict[str, Any] | list[object] | None: + if isinstance(value, str): + return value + if isinstance(value, dict): + return cast(dict[str, Any], value) + if isinstance(value, list): + items: list[object] = [] + for item in cast(list[object], value): + items.append(item) + return items + return None + + def _content_or_ref(self, value: Any, ref: str) -> Any: + if self._exporter.include_content: + return self._maybe_json(value) + return ref + def _maybe_json(self, value: Any) -> str | None: if value is None: return None @@ -100,17 +137,19 @@ class EnterpriseOtelTrace: # ------------------------------------------------------------------ def _workflow_trace(self, info: WorkflowTraceInfo) -> None: + metadata = self._metadata(info) + tenant_id, app_id, user_id = self._context_ids(info, metadata) # -- Slim span attrs: identity + structure + status + timing only -- span_attrs: dict[str, Any] = { "dify.trace_id": info.trace_id, - "dify.tenant_id": info.metadata.get("tenant_id"), - "dify.app_id": info.metadata.get("app_id"), + "dify.tenant_id": tenant_id, + "dify.app_id": app_id, "dify.workflow.id": info.workflow_id, "dify.workflow.run_id": info.workflow_run_id, "dify.workflow.status": info.workflow_run_status, "dify.workflow.error": info.error, "dify.workflow.elapsed_time": info.workflow_run_elapsed_time, - "dify.invoke_from": info.metadata.get("triggered_from"), + "dify.invoke_from": metadata.get("triggered_from"), "dify.conversation.id": info.conversation_id, "dify.message.id": info.message_id, "dify.invoked_by": info.invoked_by, @@ -119,15 +158,20 @@ class EnterpriseOtelTrace: trace_correlation_override: str | None = None parent_span_id_source: str | None = None - parent_ctx = info.metadata.get("parent_trace_context") - if parent_ctx and isinstance(parent_ctx, dict): - span_attrs["dify.parent.trace_id"] = parent_ctx.get("trace_id") - span_attrs["dify.parent.node.execution_id"] = parent_ctx.get("parent_node_execution_id") - span_attrs["dify.parent.workflow.run_id"] = parent_ctx.get("parent_workflow_run_id") - span_attrs["dify.parent.app.id"] = parent_ctx.get("parent_app_id") + parent_ctx = metadata.get("parent_trace_context") + if isinstance(parent_ctx, dict): + parent_ctx_dict = cast(dict[str, Any], parent_ctx) + span_attrs["dify.parent.trace_id"] = parent_ctx_dict.get("trace_id") + span_attrs["dify.parent.node.execution_id"] = parent_ctx_dict.get("parent_node_execution_id") + span_attrs["dify.parent.workflow.run_id"] = parent_ctx_dict.get("parent_workflow_run_id") + span_attrs["dify.parent.app.id"] = parent_ctx_dict.get("parent_app_id") - trace_correlation_override = parent_ctx.get("parent_workflow_run_id") - parent_span_id_source = parent_ctx.get("parent_node_execution_id") + trace_override_value = parent_ctx_dict.get("parent_workflow_run_id") + if isinstance(trace_override_value, str): + trace_correlation_override = trace_override_value + parent_span_value = parent_ctx_dict.get("parent_node_execution_id") + if isinstance(parent_span_value, str): + parent_span_id_source = parent_span_value self._exporter.export_span( EnterpriseTelemetrySpan.WORKFLOW_RUN, @@ -144,23 +188,18 @@ class EnterpriseOtelTrace: log_attrs: dict[str, Any] = {**span_attrs} log_attrs.update( { - "dify.app.name": info.metadata.get("app_name"), - "dify.workspace.name": info.metadata.get("workspace_name"), - "gen_ai.user.id": info.metadata.get("user_id"), + "dify.app.name": metadata.get("app_name"), + "dify.workspace.name": metadata.get("workspace_name"), + "gen_ai.user.id": user_id, "gen_ai.usage.total_tokens": info.total_tokens, "dify.workflow.version": info.workflow_run_version, } ) - if self._exporter.include_content: - log_attrs["dify.workflow.inputs"] = self._maybe_json(info.workflow_run_inputs) - log_attrs["dify.workflow.outputs"] = self._maybe_json(info.workflow_run_outputs) - log_attrs["dify.workflow.query"] = info.query - else: - ref = f"ref:workflow_run_id={info.workflow_run_id}" - log_attrs["dify.workflow.inputs"] = ref - log_attrs["dify.workflow.outputs"] = ref - log_attrs["dify.workflow.query"] = ref + ref = f"ref:workflow_run_id={info.workflow_run_id}" + log_attrs["dify.workflow.inputs"] = self._content_or_ref(info.workflow_run_inputs, ref) + log_attrs["dify.workflow.outputs"] = self._content_or_ref(info.workflow_run_outputs, ref) + log_attrs["dify.workflow.query"] = self._content_or_ref(info.query, ref) emit_telemetry_log( event_name="dify.workflow.run", @@ -168,34 +207,49 @@ class EnterpriseOtelTrace: signal="span_detail", trace_id_source=info.workflow_run_id, span_id_source=info.workflow_run_id, - tenant_id=info.metadata.get("tenant_id"), - user_id=info.metadata.get("user_id"), + tenant_id=tenant_id, + user_id=user_id, ) # -- Metrics -- - labels = { - "tenant_id": info.tenant_id, - "app_id": info.metadata.get("app_id", ""), - } + labels = self._labels( + tenant_id=tenant_id or "", + app_id=app_id or "", + ) self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels) if info.prompt_tokens is not None and info.prompt_tokens > 0: self._exporter.increment_counter(EnterpriseTelemetryCounter.INPUT_TOKENS, info.prompt_tokens, labels) if info.completion_tokens is not None and info.completion_tokens > 0: self._exporter.increment_counter(EnterpriseTelemetryCounter.OUTPUT_TOKENS, info.completion_tokens, labels) - invoke_from = info.metadata.get("triggered_from", "") + invoke_from = metadata.get("triggered_from", "") self._exporter.increment_counter( EnterpriseTelemetryCounter.REQUESTS, 1, - {**labels, "type": "workflow", "status": info.workflow_run_status, "invoke_from": invoke_from}, + self._labels( + **labels, + type="workflow", + status=info.workflow_run_status, + invoke_from=invoke_from, + ), ) self._exporter.record_histogram( EnterpriseTelemetryHistogram.WORKFLOW_DURATION, float(info.workflow_run_elapsed_time), - {**labels, "status": info.workflow_run_status}, + self._labels( + **labels, + status=info.workflow_run_status, + ), ) if info.error: - self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": "workflow"}) + self._exporter.increment_counter( + EnterpriseTelemetryCounter.ERRORS, + 1, + self._labels( + **labels, + type="workflow", + ), + ) def _node_execution_trace(self, info: WorkflowNodeTraceInfo) -> None: self._emit_node_execution_trace(info, EnterpriseTelemetrySpan.NODE_EXECUTION, "node") @@ -217,15 +271,17 @@ class EnterpriseOtelTrace: correlation_id_override: str | None = None, trace_correlation_override_param: str | None = None, ) -> None: + metadata = self._metadata(info) + tenant_id, app_id, user_id = self._context_ids(info, metadata) # -- Slim span attrs: identity + structure + status + timing -- span_attrs: dict[str, Any] = { "dify.trace_id": info.trace_id, - "dify.tenant_id": info.tenant_id, - "dify.app_id": info.metadata.get("app_id"), + "dify.tenant_id": tenant_id, + "dify.app_id": app_id, "dify.workflow.id": info.workflow_id, "dify.workflow.run_id": info.workflow_run_id, "dify.message.id": info.message_id, - "dify.conversation.id": info.metadata.get("conversation_id"), + "dify.conversation.id": metadata.get("conversation_id"), "dify.node.execution_id": info.node_execution_id, "dify.node.id": info.node_id, "dify.node.type": info.node_type, @@ -242,9 +298,12 @@ class EnterpriseOtelTrace: } trace_correlation_override = trace_correlation_override_param - parent_ctx = info.metadata.get("parent_trace_context") - if parent_ctx and isinstance(parent_ctx, dict): - trace_correlation_override = parent_ctx.get("parent_workflow_run_id") or trace_correlation_override + parent_ctx = metadata.get("parent_trace_context") + if isinstance(parent_ctx, dict): + parent_ctx_dict = cast(dict[str, Any], parent_ctx) + override_value = parent_ctx_dict.get("parent_workflow_run_id") + if isinstance(override_value, str): + trace_correlation_override = override_value effective_correlation_id = correlation_id_override or info.workflow_run_id self._exporter.export_span( @@ -261,10 +320,10 @@ class EnterpriseOtelTrace: log_attrs: dict[str, Any] = {**span_attrs} log_attrs.update( { - "dify.app.name": info.metadata.get("app_name"), - "dify.workspace.name": info.metadata.get("workspace_name"), - "dify.invoke_from": info.metadata.get("invoke_from"), - "gen_ai.user.id": info.metadata.get("user_id"), + "dify.app.name": metadata.get("app_name"), + "dify.workspace.name": metadata.get("workspace_name"), + "dify.invoke_from": metadata.get("invoke_from"), + "gen_ai.user.id": user_id, "gen_ai.usage.total_tokens": info.total_tokens, "dify.node.total_price": info.total_price, "dify.node.currency": info.currency, @@ -273,22 +332,17 @@ class EnterpriseOtelTrace: "gen_ai.tool.name": info.tool_name, "dify.node.iteration_index": info.iteration_index, "dify.node.loop_index": info.loop_index, - "dify.plugin.name": info.metadata.get("plugin_name"), - "dify.credential.name": info.metadata.get("credential_name"), - "dify.dataset.ids": self._maybe_json(info.metadata.get("dataset_ids")), - "dify.dataset.names": self._maybe_json(info.metadata.get("dataset_names")), + "dify.plugin.name": metadata.get("plugin_name"), + "dify.credential.name": metadata.get("credential_name"), + "dify.dataset.ids": self._maybe_json(metadata.get("dataset_ids")), + "dify.dataset.names": self._maybe_json(metadata.get("dataset_names")), } ) - if self._exporter.include_content: - log_attrs["dify.node.inputs"] = self._maybe_json(info.node_inputs) - log_attrs["dify.node.outputs"] = self._maybe_json(info.node_outputs) - log_attrs["dify.node.process_data"] = self._maybe_json(info.process_data) - else: - ref = f"ref:node_execution_id={info.node_execution_id}" - log_attrs["dify.node.inputs"] = ref - log_attrs["dify.node.outputs"] = ref - log_attrs["dify.node.process_data"] = ref + ref = f"ref:node_execution_id={info.node_execution_id}" + log_attrs["dify.node.inputs"] = self._content_or_ref(info.node_inputs, ref) + log_attrs["dify.node.outputs"] = self._content_or_ref(info.node_outputs, ref) + log_attrs["dify.node.process_data"] = self._content_or_ref(info.process_data, ref) emit_telemetry_log( event_name=span_name.value, @@ -296,19 +350,22 @@ class EnterpriseOtelTrace: signal="span_detail", trace_id_source=info.workflow_run_id, span_id_source=info.node_execution_id, - tenant_id=info.tenant_id, - user_id=info.metadata.get("user_id"), + tenant_id=tenant_id, + user_id=user_id, ) # -- Metrics -- - labels = { - "tenant_id": info.tenant_id, - "app_id": info.metadata.get("app_id", ""), - "node_type": info.node_type, - "model_provider": info.model_provider or "", - } + labels = self._labels( + tenant_id=tenant_id or "", + app_id=app_id or "", + node_type=info.node_type, + model_provider=info.model_provider or "", + ) if info.total_tokens: - token_labels = {**labels, "model_name": info.model_name or ""} + token_labels = self._labels( + **labels, + model_name=info.model_name or "", + ) self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, token_labels) if info.prompt_tokens is not None and info.prompt_tokens > 0: self._exporter.increment_counter( @@ -319,76 +376,95 @@ class EnterpriseOtelTrace: EnterpriseTelemetryCounter.OUTPUT_TOKENS, info.completion_tokens, token_labels ) self._exporter.increment_counter( - EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": request_type, "status": info.status} + EnterpriseTelemetryCounter.REQUESTS, + 1, + self._labels( + **labels, + type=request_type, + status=info.status, + ), ) duration_labels = dict(labels) - plugin_name = info.metadata.get("plugin_name") + plugin_name = metadata.get("plugin_name") if plugin_name and info.node_type in {"tool", "knowledge-retrieval"}: duration_labels["plugin_name"] = plugin_name self._exporter.record_histogram(EnterpriseTelemetryHistogram.NODE_DURATION, info.elapsed_time, duration_labels) if info.error: - self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": request_type}) + self._exporter.increment_counter( + EnterpriseTelemetryCounter.ERRORS, + 1, + self._labels( + **labels, + type=request_type, + ), + ) # ------------------------------------------------------------------ # METRIC-ONLY handlers (structured log + counters/histograms) # ------------------------------------------------------------------ def _message_trace(self, info: MessageTraceInfo) -> None: + metadata = self._metadata(info) + tenant_id, app_id, user_id = self._context_ids(info, metadata) attrs = self._common_attrs(info) attrs.update( { - "dify.invoke_from": info.metadata.get("from_source"), - "dify.conversation.id": info.metadata.get("conversation_id"), + "dify.invoke_from": metadata.get("from_source"), + "dify.conversation.id": metadata.get("conversation_id"), "dify.conversation.mode": info.conversation_mode, - "gen_ai.provider.name": info.metadata.get("ls_provider"), - "gen_ai.request.model": info.metadata.get("ls_model_name"), + "gen_ai.provider.name": metadata.get("ls_provider"), + "gen_ai.request.model": metadata.get("ls_model_name"), "gen_ai.usage.input_tokens": info.message_tokens, "gen_ai.usage.output_tokens": info.answer_tokens, "gen_ai.usage.total_tokens": info.total_tokens, - "dify.message.status": info.metadata.get("status"), + "dify.message.status": metadata.get("status"), "dify.message.error": info.error, - "dify.message.from_source": info.metadata.get("from_source"), - "dify.message.from_end_user_id": info.metadata.get("from_end_user_id"), - "dify.message.from_account_id": info.metadata.get("from_account_id"), + "dify.message.from_source": metadata.get("from_source"), + "dify.message.from_end_user_id": metadata.get("from_end_user_id"), + "dify.message.from_account_id": metadata.get("from_account_id"), "dify.streaming": info.is_streaming_request, "dify.message.time_to_first_token": info.gen_ai_server_time_to_first_token, "dify.message.streaming_duration": info.llm_streaming_time_to_generate, - "dify.workflow.run_id": info.metadata.get("workflow_run_id"), + "dify.workflow.run_id": metadata.get("workflow_run_id"), } ) - if info.metadata.get("node_execution_id"): - attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id") + node_execution_id = metadata.get("node_execution_id") + if node_execution_id: + attrs["dify.node.execution_id"] = node_execution_id - if self._exporter.include_content: - attrs["dify.message.inputs"] = self._maybe_json(info.inputs) - attrs["dify.message.outputs"] = self._maybe_json(info.outputs) - else: - ref = f"ref:message_id={info.message_id}" - attrs["dify.message.inputs"] = ref - attrs["dify.message.outputs"] = ref + ref = f"ref:message_id={info.message_id}" + inputs = self._safe_payload_value(info.inputs) + outputs = self._safe_payload_value(info.outputs) + attrs["dify.message.inputs"] = self._content_or_ref(inputs, ref) + attrs["dify.message.outputs"] = self._content_or_ref(outputs, ref) emit_metric_only_event( event_name="dify.message.run", attributes=attrs, - trace_id_source=info.metadata.get("workflow_run_id") or str(info.message_id) if info.message_id else None, - span_id_source=info.metadata.get("node_execution_id"), - tenant_id=info.metadata.get("tenant_id"), - user_id=info.metadata.get("user_id"), + trace_id_source=metadata.get("workflow_run_id") or str(info.message_id) if info.message_id else None, + span_id_source=node_execution_id, + tenant_id=tenant_id, + user_id=user_id, ) - labels = { - "tenant_id": info.metadata.get("tenant_id", ""), - "app_id": info.metadata.get("app_id", ""), - "model_provider": info.metadata.get("ls_provider", ""), - "model_name": info.metadata.get("ls_model_name", ""), - } + labels = self._labels( + tenant_id=tenant_id or "", + app_id=app_id or "", + model_provider=metadata.get("ls_provider", ""), + model_name=metadata.get("ls_model_name", ""), + ) self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels) - invoke_from = info.metadata.get("from_source", "") + invoke_from = metadata.get("from_source", "") self._exporter.increment_counter( EnterpriseTelemetryCounter.REQUESTS, 1, - {**labels, "type": "message", "status": info.metadata.get("status", ""), "invoke_from": invoke_from}, + self._labels( + **labels, + type="message", + status=metadata.get("status", ""), + invoke_from=invoke_from, + ), ) if info.start_time and info.end_time: @@ -401,82 +477,115 @@ class EnterpriseOtelTrace: ) if info.error: - self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": "message"}) + self._exporter.increment_counter( + EnterpriseTelemetryCounter.ERRORS, + 1, + self._labels( + **labels, + type="message", + ), + ) def _tool_trace(self, info: ToolTraceInfo) -> None: + metadata = self._metadata(info) + tenant_id, app_id, user_id = self._context_ids(info, metadata) attrs = self._common_attrs(info) attrs.update( { "gen_ai.tool.name": info.tool_name, "dify.tool.time_cost": info.time_cost, "dify.tool.error": info.error, - "dify.workflow.run_id": info.metadata.get("workflow_run_id"), + "dify.workflow.run_id": metadata.get("workflow_run_id"), } ) - if info.metadata.get("node_execution_id"): - attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id") + node_execution_id = metadata.get("node_execution_id") + if node_execution_id: + attrs["dify.node.execution_id"] = node_execution_id - if self._exporter.include_content: - attrs["dify.tool.inputs"] = self._maybe_json(info.tool_inputs) - attrs["dify.tool.outputs"] = info.tool_outputs - attrs["dify.tool.parameters"] = self._maybe_json(info.tool_parameters) - attrs["dify.tool.config"] = self._maybe_json(info.tool_config) - else: - ref = f"ref:message_id={info.message_id}" - attrs["dify.tool.inputs"] = ref - attrs["dify.tool.outputs"] = ref - attrs["dify.tool.parameters"] = ref - attrs["dify.tool.config"] = ref + ref = f"ref:message_id={info.message_id}" + attrs["dify.tool.inputs"] = self._content_or_ref(info.tool_inputs, ref) + attrs["dify.tool.outputs"] = self._content_or_ref(info.tool_outputs, ref) + attrs["dify.tool.parameters"] = self._content_or_ref(info.tool_parameters, ref) + attrs["dify.tool.config"] = self._content_or_ref(info.tool_config, ref) emit_metric_only_event( event_name="dify.tool.execution", attributes=attrs, - span_id_source=info.metadata.get("node_execution_id"), - tenant_id=info.metadata.get("tenant_id"), - user_id=info.metadata.get("user_id"), + span_id_source=node_execution_id, + tenant_id=tenant_id, + user_id=user_id, ) - labels = { - "tenant_id": info.metadata.get("tenant_id", ""), - "app_id": info.metadata.get("app_id", ""), - "tool_name": info.tool_name, - } - self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "tool"}) + labels = self._labels( + tenant_id=tenant_id or "", + app_id=app_id or "", + tool_name=info.tool_name, + ) + self._exporter.increment_counter( + EnterpriseTelemetryCounter.REQUESTS, + 1, + self._labels( + **labels, + type="tool", + ), + ) self._exporter.record_histogram(EnterpriseTelemetryHistogram.TOOL_DURATION, float(info.time_cost), labels) if info.error: - self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": "tool"}) + self._exporter.increment_counter( + EnterpriseTelemetryCounter.ERRORS, + 1, + self._labels( + **labels, + type="tool", + ), + ) def _moderation_trace(self, info: ModerationTraceInfo) -> None: + metadata = self._metadata(info) + tenant_id, app_id, user_id = self._context_ids(info, metadata) attrs = self._common_attrs(info) attrs.update( { "dify.moderation.flagged": info.flagged, "dify.moderation.action": info.action, "dify.moderation.preset_response": info.preset_response, - "dify.workflow.run_id": info.metadata.get("workflow_run_id"), + "dify.workflow.run_id": metadata.get("workflow_run_id"), } ) - if info.metadata.get("node_execution_id"): - attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id") + node_execution_id = metadata.get("node_execution_id") + if node_execution_id: + attrs["dify.node.execution_id"] = node_execution_id - if self._exporter.include_content: - attrs["dify.moderation.query"] = info.query - else: - attrs["dify.moderation.query"] = f"ref:message_id={info.message_id}" + attrs["dify.moderation.query"] = self._content_or_ref( + info.query, + f"ref:message_id={info.message_id}", + ) emit_metric_only_event( event_name="dify.moderation.check", attributes=attrs, - span_id_source=info.metadata.get("node_execution_id"), - tenant_id=info.metadata.get("tenant_id"), - user_id=info.metadata.get("user_id"), + span_id_source=node_execution_id, + tenant_id=tenant_id, + user_id=user_id, ) - labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")} - self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "moderation"}) + labels = self._labels( + tenant_id=tenant_id or "", + app_id=app_id or "", + ) + self._exporter.increment_counter( + EnterpriseTelemetryCounter.REQUESTS, + 1, + self._labels( + **labels, + type="moderation", + ), + ) def _suggested_question_trace(self, info: SuggestedQuestionTraceInfo) -> None: + metadata = self._metadata(info) + tenant_id, app_id, user_id = self._context_ids(info, metadata) attrs = self._common_attrs(info) attrs.update( { @@ -486,43 +595,62 @@ class EnterpriseOtelTrace: "gen_ai.provider.name": info.model_provider, "gen_ai.request.model": info.model_id, "dify.suggested_question.count": len(info.suggested_question), - "dify.workflow.run_id": info.metadata.get("workflow_run_id"), + "dify.workflow.run_id": metadata.get("workflow_run_id"), } ) - if info.metadata.get("node_execution_id"): - attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id") + node_execution_id = metadata.get("node_execution_id") + if node_execution_id: + attrs["dify.node.execution_id"] = node_execution_id - if self._exporter.include_content: - attrs["dify.suggested_question.questions"] = self._maybe_json(info.suggested_question) - else: - attrs["dify.suggested_question.questions"] = f"ref:message_id={info.message_id}" + attrs["dify.suggested_question.questions"] = self._content_or_ref( + info.suggested_question, + f"ref:message_id={info.message_id}", + ) emit_metric_only_event( event_name="dify.suggested_question.generation", attributes=attrs, - span_id_source=info.metadata.get("node_execution_id"), - tenant_id=info.metadata.get("tenant_id"), - user_id=info.metadata.get("user_id"), + span_id_source=node_execution_id, + tenant_id=tenant_id, + user_id=user_id, ) - labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")} + labels = self._labels( + tenant_id=tenant_id or "", + app_id=app_id or "", + ) self._exporter.increment_counter( - EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "suggested_question"} + EnterpriseTelemetryCounter.REQUESTS, + 1, + self._labels( + **labels, + type="suggested_question", + ), ) def _dataset_retrieval_trace(self, info: DatasetRetrievalTraceInfo) -> None: + metadata = self._metadata(info) + tenant_id, app_id, user_id = self._context_ids(info, metadata) attrs = self._common_attrs(info) attrs["dify.dataset.error"] = info.error - attrs["dify.workflow.run_id"] = info.metadata.get("workflow_run_id") - if info.metadata.get("node_execution_id"): - attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id") + attrs["dify.workflow.run_id"] = metadata.get("workflow_run_id") + node_execution_id = metadata.get("node_execution_id") + if node_execution_id: + attrs["dify.node.execution_id"] = node_execution_id - docs = info.documents or [] + docs: list[dict[str, Any]] = [] + documents_any: Any = info.documents + documents_list: list[Any] = cast(list[Any], documents_any) if isinstance(documents_any, list) else [] + for entry in documents_list: + if isinstance(entry, dict): + entry_dict: dict[str, Any] = cast(dict[str, Any], entry) + docs.append(entry_dict) dataset_ids: list[str] = [] dataset_names: list[str] = [] - structured_docs: list[dict] = [] + structured_docs: list[dict[str, Any]] = [] for doc in docs: - meta = doc.get("metadata", {}) if isinstance(doc, dict) else {} + meta_raw = doc.get("metadata") + meta: dict[str, Any] = cast(dict[str, Any], meta_raw) if isinstance(meta_raw, dict) else {} did = meta.get("dataset_id") dname = meta.get("dataset_name") if did and did not in dataset_ids: @@ -542,14 +670,18 @@ class EnterpriseOtelTrace: attrs["dify.dataset.names"] = self._maybe_json(dataset_names) attrs["dify.retrieval.document_count"] = len(docs) - embedding_models = info.metadata.get("embedding_models") or {} - if isinstance(embedding_models, dict): + embedding_models_raw: Any = metadata.get("embedding_models") + embedding_models: dict[str, Any] = ( + cast(dict[str, Any], embedding_models_raw) if isinstance(embedding_models_raw, dict) else {} + ) + if embedding_models: providers: list[str] = [] models: list[str] = [] for ds_info in embedding_models.values(): if isinstance(ds_info, dict): - p = ds_info.get("embedding_model_provider", "") - m = ds_info.get("embedding_model", "") + ds_info_dict: dict[str, Any] = cast(dict[str, Any], ds_info) + p = ds_info_dict.get("embedding_model_provider", "") + m = ds_info_dict.get("embedding_model", "") if p and p not in providers: providers.append(p) if m and m not in models: @@ -557,67 +689,89 @@ class EnterpriseOtelTrace: attrs["dify.dataset.embedding_providers"] = self._maybe_json(providers) attrs["dify.dataset.embedding_models"] = self._maybe_json(models) - if self._exporter.include_content: - attrs["dify.retrieval.query"] = self._maybe_json(info.inputs) - attrs["dify.dataset.documents"] = self._maybe_json(structured_docs) - else: - ref = f"ref:message_id={info.message_id}" - attrs["dify.retrieval.query"] = ref - attrs["dify.dataset.documents"] = ref + ref = f"ref:message_id={info.message_id}" + retrieval_inputs = self._safe_payload_value(info.inputs) + attrs["dify.retrieval.query"] = self._content_or_ref(retrieval_inputs, ref) + attrs["dify.dataset.documents"] = self._content_or_ref(structured_docs, ref) emit_metric_only_event( event_name="dify.dataset.retrieval", attributes=attrs, - trace_id_source=info.metadata.get("workflow_run_id") or str(info.message_id) if info.message_id else None, - span_id_source=info.metadata.get("node_execution_id") - or (str(info.message_id) if info.message_id else None), - tenant_id=info.metadata.get("tenant_id"), - user_id=info.metadata.get("user_id"), + trace_id_source=metadata.get("workflow_run_id") or str(info.message_id) if info.message_id else None, + span_id_source=node_execution_id or (str(info.message_id) if info.message_id else None), + tenant_id=tenant_id, + user_id=user_id, ) - labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")} + labels = self._labels( + tenant_id=tenant_id or "", + app_id=app_id or "", + ) self._exporter.increment_counter( - EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "dataset_retrieval"} + EnterpriseTelemetryCounter.REQUESTS, + 1, + self._labels( + **labels, + type="dataset_retrieval", + ), ) for did in dataset_ids: self._exporter.increment_counter( - EnterpriseTelemetryCounter.DATASET_RETRIEVALS, 1, {**labels, "dataset_id": did} + EnterpriseTelemetryCounter.DATASET_RETRIEVALS, + 1, + self._labels( + **labels, + dataset_id=did, + ), ) def _generate_name_trace(self, info: GenerateNameTraceInfo) -> None: + metadata = self._metadata(info) + tenant_id, app_id, user_id = self._context_ids(info, metadata) attrs = self._common_attrs(info) attrs["dify.conversation.id"] = info.conversation_id - if info.metadata.get("node_execution_id"): - attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id") + node_execution_id = metadata.get("node_execution_id") + if node_execution_id: + attrs["dify.node.execution_id"] = node_execution_id - if self._exporter.include_content: - attrs["dify.generate_name.inputs"] = self._maybe_json(info.inputs) - attrs["dify.generate_name.outputs"] = self._maybe_json(info.outputs) - else: - ref = f"ref:conversation_id={info.conversation_id}" - attrs["dify.generate_name.inputs"] = ref - attrs["dify.generate_name.outputs"] = ref + ref = f"ref:conversation_id={info.conversation_id}" + inputs = self._safe_payload_value(info.inputs) + outputs = self._safe_payload_value(info.outputs) + attrs["dify.generate_name.inputs"] = self._content_or_ref(inputs, ref) + attrs["dify.generate_name.outputs"] = self._content_or_ref(outputs, ref) emit_metric_only_event( event_name="dify.generate_name.execution", attributes=attrs, - span_id_source=info.metadata.get("node_execution_id"), - tenant_id=info.tenant_id, - user_id=info.metadata.get("user_id"), + span_id_source=node_execution_id, + tenant_id=tenant_id, + user_id=user_id, ) - labels = {"tenant_id": info.tenant_id, "app_id": info.metadata.get("app_id", "")} - self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "generate_name"}) + labels = self._labels( + tenant_id=tenant_id or "", + app_id=app_id or "", + ) + self._exporter.increment_counter( + EnterpriseTelemetryCounter.REQUESTS, + 1, + self._labels( + **labels, + type="generate_name", + ), + ) def _prompt_generation_trace(self, info: PromptGenerationTraceInfo) -> None: + metadata = self._metadata(info) + tenant_id, app_id, user_id = self._context_ids(info, metadata) attrs = { "dify.trace_id": info.trace_id, - "dify.tenant_id": info.tenant_id, - "dify.user.id": info.user_id, - "dify.app.id": info.app_id or "", - "dify.app.name": info.metadata.get("app_name"), - "dify.workspace.name": info.metadata.get("workspace_name"), + "dify.tenant_id": tenant_id, + "dify.user.id": user_id, + "dify.app.id": app_id or "", + "dify.app.name": metadata.get("app_name"), + "dify.workspace.name": metadata.get("workspace_name"), "dify.operation.type": info.operation_type, "gen_ai.provider.name": info.model_provider, "gen_ai.request.model": info.model_name, @@ -627,36 +781,34 @@ class EnterpriseOtelTrace: "dify.prompt_generation.latency": info.latency, "dify.prompt_generation.error": info.error, } - if info.metadata.get("node_execution_id"): - attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id") + node_execution_id = metadata.get("node_execution_id") + if node_execution_id: + attrs["dify.node.execution_id"] = node_execution_id if info.total_price is not None: attrs["dify.prompt_generation.total_price"] = info.total_price attrs["dify.prompt_generation.currency"] = info.currency - if self._exporter.include_content: - attrs["dify.prompt_generation.instruction"] = info.instruction - attrs["dify.prompt_generation.output"] = self._maybe_json(info.outputs) - else: - ref = f"ref:trace_id={info.trace_id}" - attrs["dify.prompt_generation.instruction"] = ref - attrs["dify.prompt_generation.output"] = ref + ref = f"ref:trace_id={info.trace_id}" + outputs = self._safe_payload_value(info.outputs) + attrs["dify.prompt_generation.instruction"] = self._content_or_ref(info.instruction, ref) + attrs["dify.prompt_generation.output"] = self._content_or_ref(outputs, ref) emit_metric_only_event( event_name="dify.prompt_generation.execution", attributes=attrs, - span_id_source=info.metadata.get("node_execution_id"), - tenant_id=info.tenant_id, - user_id=info.user_id, + span_id_source=node_execution_id, + tenant_id=tenant_id, + user_id=user_id, ) - labels: dict[str, Any] = { - "tenant_id": info.tenant_id, - "app_id": info.app_id or "", - "operation_type": info.operation_type, - "model_provider": info.model_provider, - "model_name": info.model_name, - } + labels = self._labels( + tenant_id=tenant_id or "", + app_id=app_id or "", + operation_type=info.operation_type, + model_provider=info.model_provider, + model_name=info.model_name, + ) self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels) if info.prompt_tokens > 0: @@ -668,7 +820,11 @@ class EnterpriseOtelTrace: self._exporter.increment_counter( EnterpriseTelemetryCounter.REQUESTS, 1, - {**labels, "type": "prompt_generation", "status": status}, + self._labels( + **labels, + type="prompt_generation", + status=status, + ), ) self._exporter.record_histogram( @@ -681,5 +837,8 @@ class EnterpriseOtelTrace: self._exporter.increment_counter( EnterpriseTelemetryCounter.ERRORS, 1, - {**labels, "type": "prompt_generation"}, + self._labels( + **labels, + type="prompt_generation", + ), ) diff --git a/api/enterprise/telemetry/telemetry_log.py b/api/enterprise/telemetry/telemetry_log.py index 513f6e18e1..63d79e8dc4 100644 --- a/api/enterprise/telemetry/telemetry_log.py +++ b/api/enterprise/telemetry/telemetry_log.py @@ -8,11 +8,13 @@ from __future__ import annotations import logging import uuid +from functools import lru_cache from typing import Any logger = logging.getLogger("dify.telemetry") +@lru_cache(maxsize=4096) def compute_trace_id_hex(uuid_str: str | None) -> str: """Convert a business UUID string to a 32-hex OTEL-compatible trace_id. @@ -20,19 +22,26 @@ def compute_trace_id_hex(uuid_str: str | None) -> str: """ if not uuid_str: return "" + normalized = uuid_str.strip().lower() + if len(normalized) == 32 and all(ch in "0123456789abcdef" for ch in normalized): + return normalized try: - return f"{uuid.UUID(uuid_str).int:032x}" + return f"{uuid.UUID(normalized).int:032x}" except (ValueError, AttributeError): return "" +@lru_cache(maxsize=4096) def compute_span_id_hex(uuid_str: str | None) -> str: if not uuid_str: return "" + normalized = uuid_str.strip().lower() + if len(normalized) == 16 and all(ch in "0123456789abcdef" for ch in normalized): + return normalized try: from enterprise.telemetry.id_generator import compute_deterministic_span_id - return f"{compute_deterministic_span_id(uuid_str):016x}" + return f"{compute_deterministic_span_id(normalized):016x}" except (ValueError, AttributeError): return "" @@ -66,6 +75,8 @@ def emit_telemetry_log( user_id: User identifier (for the ``IdentityContextFilter``). """ + if not logger.isEnabledFor(logging.INFO): + return attrs = { "dify.event.name": event_name, "dify.event.signal": signal, diff --git a/api/services/message_service.py b/api/services/message_service.py index 6491d1e0a2..92b9460611 100644 --- a/api/services/message_service.py +++ b/api/services/message_service.py @@ -7,9 +7,8 @@ from core.llm_generator.llm_generator import LLMGenerator from core.memory.token_buffer_memory import TokenBufferMemory from core.model_manager import ModelManager from core.model_runtime.entities.model_entities import ModelType -from core.ops.entities.trace_entity import TraceTaskName -from core.ops.ops_trace_manager import TraceQueueManager, TraceTask from core.ops.utils import measure_time +from core.telemetry import TelemetryContext, TelemetryEvent, TelemetryFacade from events.feedback_event import feedback_was_created from extensions.ext_database import db from libs.infinite_scroll_pagination import InfiniteScrollPagination @@ -298,10 +297,15 @@ class MessageService: questions: list[str] = list(questions_sequence) # get tracing instance - trace_manager = TraceQueueManager(app_id=app_model.id) - trace_manager.add_trace_task( - TraceTask( - TraceTaskName.SUGGESTED_QUESTION_TRACE, message_id=message_id, suggested_question=questions, timer=timer + TelemetryFacade.emit( + TelemetryEvent( + name="suggested_question", + context=TelemetryContext(tenant_id=app_model.tenant_id, app_id=app_model.id), + payload={ + "message_id": message_id, + "suggested_question": questions, + "timer": timer, + }, ) )