feat(enterprise): Add OTEL telemetry with slim traces, metrics, and structured logs

- Add EnterpriseOtelTrace handler with span emission for workflows and nodes
- Implement minimal-span strategy: slim spans + detailed companion logs
- Add deterministic span/trace IDs for cross-workflow trace correlation
- Add metric collection at 100% accuracy (counters & histograms)
- Add event handlers for app lifecycle and feedback telemetry
- Add cross-workflow trace linking with parent context propagation
- Add OTEL exporter with configurable sampling and privacy controls
- Wire enterprise telemetry into workflow execution pipeline
- Add telemetry configuration in enterprise configs
This commit is contained in:
GareArc 2026-02-02 15:09:44 -08:00
parent 915b4ce840
commit 8fc0cbe20d
No known key found for this signature in database
29 changed files with 1804 additions and 45 deletions

View File

@ -79,6 +79,7 @@ def initialize_extensions(app: DifyApp):
ext_commands,
ext_compress,
ext_database,
ext_enterprise_telemetry,
ext_forward_refs,
ext_hosting_provider,
ext_import_modules,
@ -125,6 +126,7 @@ def initialize_extensions(app: DifyApp):
ext_blueprints,
ext_commands,
ext_otel,
ext_enterprise_telemetry,
ext_request_logging,
ext_session_factory,
]

View File

@ -8,7 +8,7 @@ from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, Settings
from libs.file_utils import search_file_upwards
from .deploy import DeploymentConfig
from .enterprise import EnterpriseFeatureConfig
from .enterprise import EnterpriseFeatureConfig, EnterpriseTelemetryConfig
from .extra import ExtraServiceConfig
from .feature import FeatureConfig
from .middleware import MiddlewareConfig
@ -73,6 +73,8 @@ class DifyConfig(
# Enterprise feature configs
# **Before using, please contact business@dify.ai by email to inquire about licensing matters.**
EnterpriseFeatureConfig,
# Enterprise telemetry configs
EnterpriseTelemetryConfig,
):
model_config = SettingsConfigDict(
# read from dotenv format config file

View File

@ -18,3 +18,39 @@ class EnterpriseFeatureConfig(BaseSettings):
description="Allow customization of the enterprise logo.",
default=False,
)
class EnterpriseTelemetryConfig(BaseSettings):
"""
Configuration for enterprise telemetry.
"""
ENTERPRISE_TELEMETRY_ENABLED: bool = Field(
description="Enable enterprise telemetry collection (also requires ENTERPRISE_ENABLED=true).",
default=False,
)
ENTERPRISE_OTLP_ENDPOINT: str = Field(
description="Enterprise OTEL collector endpoint.",
default="",
)
ENTERPRISE_OTLP_HEADERS: str = Field(
description="Auth headers for OTLP export (key=value,key2=value2).",
default="",
)
ENTERPRISE_INCLUDE_CONTENT: bool = Field(
description="Include input/output content in traces (privacy toggle).",
default=True,
)
ENTERPRISE_SERVICE_NAME: str = Field(
description="Service name for OTEL resource.",
default="dify",
)
ENTERPRISE_OTEL_SAMPLING_RATE: float = Field(
description="Sampling rate for enterprise traces (0.0 to 1.0, default 1.0 = 100%).",
default=1.0,
)

View File

@ -62,7 +62,7 @@ from core.app.task_pipeline.message_cycle_manager import MessageCycleManager
from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.model_runtime.entities.llm_entities import LLMUsage
from core.model_runtime.utils.encoders import jsonable_encoder
from core.ops.ops_trace_manager import TraceQueueManager
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask, TraceTaskName
from core.workflow.enums import WorkflowExecutionStatus
from core.workflow.nodes import NodeType
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
@ -564,7 +564,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
**kwargs,
) -> Generator[StreamResponse, None, None]:
"""Handle stop events."""
_ = trace_manager
resolved_state = None
if self._workflow_run_id:
resolved_state = self._resolve_graph_runtime_state(graph_runtime_state)
@ -579,8 +578,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
)
with self._database_session() as session:
# Save message
self._save_message(session=session, graph_runtime_state=resolved_state)
self._save_message(session=session, graph_runtime_state=resolved_state, trace_manager=trace_manager)
yield workflow_finish_resp
elif event.stopped_by in (
@ -589,8 +587,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
):
# When hitting input-moderation or annotation-reply, the workflow will not start
with self._database_session() as session:
# Save message
self._save_message(session=session)
self._save_message(session=session, trace_manager=trace_manager)
yield self._message_end_to_stream_response()
@ -599,6 +596,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
event: QueueAdvancedChatMessageEndEvent,
*,
graph_runtime_state: GraphRuntimeState | None = None,
trace_manager: TraceQueueManager | None = None,
**kwargs,
) -> Generator[StreamResponse, None, None]:
"""Handle advanced chat message end events."""
@ -616,7 +614,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
# Save message
with self._database_session() as session:
self._save_message(session=session, graph_runtime_state=resolved_state)
self._save_message(session=session, graph_runtime_state=resolved_state, trace_manager=trace_manager)
yield self._message_end_to_stream_response()
@ -770,7 +768,13 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
if self._conversation_name_generate_thread:
logger.debug("Conversation name generation running as daemon thread")
def _save_message(self, *, session: Session, graph_runtime_state: GraphRuntimeState | None = None):
def _save_message(
self,
*,
session: Session,
graph_runtime_state: GraphRuntimeState | None = None,
trace_manager: TraceQueueManager | None = None,
):
message = self._get_message(session=session)
# If there are assistant files, remove markdown image links from answer
@ -826,6 +830,15 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
]
session.add_all(message_files)
if trace_manager:
trace_manager.add_trace_task(
TraceTask(
TraceTaskName.MESSAGE_TRACE,
conversation_id=str(message.conversation_id),
message_id=str(message.id),
)
)
def _seed_graph_runtime_state_from_queue_manager(self) -> None:
"""Bootstrap the cached runtime state from the queue manager when present."""
candidate = self._base_task_pipeline.queue_manager.graph_runtime_state

View File

@ -15,16 +15,23 @@ class TraceContextFilter(logging.Filter):
"""
def filter(self, record: logging.LogRecord) -> bool:
# Get trace context from OpenTelemetry
trace_id, span_id = self._get_otel_context()
# Preserve explicit trace_id set by the caller (e.g. emit_metric_only_event)
existing_trace_id = getattr(record, "trace_id", "")
if not existing_trace_id:
# Get trace context from OpenTelemetry
trace_id, span_id = self._get_otel_context()
# Set trace_id (fallback to ContextVar if no OTEL context)
if trace_id:
record.trace_id = trace_id
# Set trace_id (fallback to ContextVar if no OTEL context)
if trace_id:
record.trace_id = trace_id
else:
record.trace_id = get_trace_id()
record.span_id = span_id or ""
else:
record.trace_id = get_trace_id()
record.span_id = span_id or ""
# Keep existing trace_id; only fill span_id if missing
if not getattr(record, "span_id", ""):
record.span_id = ""
# For backward compatibility, also set req_id
record.req_id = get_request_id()
@ -55,9 +62,12 @@ class IdentityContextFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
identity = self._extract_identity()
record.tenant_id = identity.get("tenant_id", "")
record.user_id = identity.get("user_id", "")
record.user_type = identity.get("user_type", "")
if not getattr(record, "tenant_id", ""):
record.tenant_id = identity.get("tenant_id", "")
if not getattr(record, "user_id", ""):
record.user_id = identity.get("user_id", "")
if not getattr(record, "user_type", ""):
record.user_type = identity.get("user_type", "")
return True
def _extract_identity(self) -> dict[str, str]:

View File

@ -114,6 +114,50 @@ class GenerateNameTraceInfo(BaseTraceInfo):
tenant_id: str
class WorkflowNodeTraceInfo(BaseTraceInfo):
workflow_id: str
workflow_run_id: str
tenant_id: str
node_execution_id: str
node_id: str
node_type: str
title: str
status: str
error: str | None = None
elapsed_time: float
index: int
predecessor_node_id: str | None = None
total_tokens: int = 0
total_price: float = 0.0
currency: str | None = None
model_provider: str | None = None
model_name: str | None = None
prompt_tokens: int | None = None
completion_tokens: int | None = None
tool_name: str | None = None
iteration_id: str | None = None
iteration_index: int | None = None
loop_id: str | None = None
loop_index: int | None = None
parallel_id: str | None = None
node_inputs: Mapping[str, Any] | None = None
node_outputs: Mapping[str, Any] | None = None
process_data: Mapping[str, Any] | None = None
model_config = ConfigDict(protected_namespaces=())
class DraftNodeExecutionTrace(WorkflowNodeTraceInfo):
pass
class TaskData(BaseModel):
app_id: str
trace_info_type: str
@ -128,12 +172,15 @@ trace_info_info_map = {
"DatasetRetrievalTraceInfo": DatasetRetrievalTraceInfo,
"ToolTraceInfo": ToolTraceInfo,
"GenerateNameTraceInfo": GenerateNameTraceInfo,
"WorkflowNodeTraceInfo": WorkflowNodeTraceInfo,
"DraftNodeExecutionTrace": DraftNodeExecutionTrace,
}
class TraceTaskName(StrEnum):
CONVERSATION_TRACE = "conversation"
WORKFLOW_TRACE = "workflow"
DRAFT_NODE_EXECUTION_TRACE = "draft_node_execution"
MESSAGE_TRACE = "message"
MODERATION_TRACE = "moderation"
SUGGESTED_QUESTION_TRACE = "suggested_question"
@ -141,3 +188,4 @@ class TraceTaskName(StrEnum):
TOOL_TRACE = "tool"
GENERATE_NAME_TRACE = "generate_conversation_name"
DATASOURCE_TRACE = "datasource"
NODE_EXECUTION_TRACE = "node_execution"

View File

@ -3,6 +3,7 @@ import os
from datetime import datetime, timedelta
from langfuse import Langfuse
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
from core.ops.base_trace_instance import BaseTraceInstance
@ -30,7 +31,7 @@ from core.ops.utils import filter_none_values
from core.repositories import DifyCoreRepositoryFactory
from core.workflow.enums import NodeType
from extensions.ext_database import db
from models import EndUser, WorkflowNodeExecutionTriggeredFrom
from models import EndUser, Message, WorkflowNodeExecutionTriggeredFrom
from models.enums import MessageStatus
logger = logging.getLogger(__name__)
@ -71,7 +72,50 @@ class LangFuseDataTrace(BaseTraceInstance):
metadata = trace_info.metadata
metadata["workflow_app_log_id"] = trace_info.workflow_app_log_id
if trace_info.message_id:
# Check for parent_trace_context to detect nested workflow
parent_trace_context = trace_info.metadata.get("parent_trace_context")
if parent_trace_context:
# Nested workflow: create span under outer trace
outer_trace_id = parent_trace_context.get("trace_id")
parent_node_execution_id = parent_trace_context.get("parent_node_execution_id")
parent_conversation_id = parent_trace_context.get("parent_conversation_id")
parent_workflow_run_id = parent_trace_context.get("parent_workflow_run_id")
# Resolve outer trace_id: try message_id lookup first, fallback to workflow_run_id
if parent_conversation_id:
session_factory = sessionmaker(bind=db.engine)
with session_factory() as session:
message_data_stmt = select(Message.id).where(
Message.conversation_id == parent_conversation_id,
Message.workflow_run_id == parent_workflow_run_id,
)
resolved_message_id = session.scalar(message_data_stmt)
if resolved_message_id:
outer_trace_id = resolved_message_id
else:
outer_trace_id = parent_workflow_run_id
else:
outer_trace_id = parent_workflow_run_id
# Create inner workflow span under outer trace
workflow_span_data = LangfuseSpan(
id=trace_info.workflow_run_id,
name=TraceTaskName.WORKFLOW_TRACE,
input=dict(trace_info.workflow_run_inputs),
output=dict(trace_info.workflow_run_outputs),
trace_id=outer_trace_id,
parent_observation_id=parent_node_execution_id,
start_time=trace_info.start_time,
end_time=trace_info.end_time,
metadata=metadata,
level=LevelEnum.DEFAULT if trace_info.error == "" else LevelEnum.ERROR,
status_message=trace_info.error or "",
)
self.add_span(langfuse_span_data=workflow_span_data)
# Use outer_trace_id for all node spans/generations
trace_id = outer_trace_id
elif trace_info.message_id:
trace_id = trace_info.trace_id or trace_info.message_id
name = TraceTaskName.MESSAGE_TRACE
trace_data = LangfuseTrace(
@ -174,6 +218,11 @@ class LangFuseDataTrace(BaseTraceInstance):
}
)
# Determine parent_observation_id for nested workflows
node_parent_observation_id = None
if parent_trace_context or trace_info.message_id:
node_parent_observation_id = trace_info.workflow_run_id
# add generation span
if process_data and process_data.get("model_mode") == "chat":
total_token = metadata.get("total_tokens", 0)
@ -206,7 +255,7 @@ class LangFuseDataTrace(BaseTraceInstance):
metadata=metadata,
level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
status_message=trace_info.error or "",
parent_observation_id=trace_info.workflow_run_id if trace_info.message_id else None,
parent_observation_id=node_parent_observation_id,
usage=generation_usage,
)
@ -225,7 +274,7 @@ class LangFuseDataTrace(BaseTraceInstance):
metadata=metadata,
level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
status_message=trace_info.error or "",
parent_observation_id=trace_info.workflow_run_id if trace_info.message_id else None,
parent_observation_id=node_parent_observation_id,
)
self.add_span(langfuse_span_data=span_data)

View File

@ -6,6 +6,7 @@ from typing import cast
from langsmith import Client
from langsmith.schemas import RunBase
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
from core.ops.base_trace_instance import BaseTraceInstance
@ -30,7 +31,7 @@ from core.ops.utils import filter_none_values, generate_dotted_order
from core.repositories import DifyCoreRepositoryFactory
from core.workflow.enums import NodeType, WorkflowNodeExecutionMetadataKey
from extensions.ext_database import db
from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
from models import EndUser, Message, MessageFile, WorkflowNodeExecutionTriggeredFrom
logger = logging.getLogger(__name__)
@ -64,7 +65,35 @@ class LangSmithDataTrace(BaseTraceInstance):
self.generate_name_trace(trace_info)
def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = trace_info.trace_id or trace_info.message_id or trace_info.workflow_run_id
# Check for parent_trace_context for cross-workflow linking
parent_trace_context = trace_info.metadata.get("parent_trace_context")
if parent_trace_context:
# Inner workflow: resolve outer trace_id and link to parent node
outer_trace_id = parent_trace_context.get("parent_workflow_run_id")
# Try to resolve message_id from conversation_id if available
if parent_trace_context.get("parent_conversation_id"):
try:
session_factory = sessionmaker(bind=db.engine)
with session_factory() as session:
message_data_stmt = select(Message.id).where(
Message.conversation_id == parent_trace_context["parent_conversation_id"],
Message.workflow_run_id == parent_trace_context["parent_workflow_run_id"],
)
resolved_message_id = session.scalar(message_data_stmt)
if resolved_message_id:
outer_trace_id = resolved_message_id
except Exception as e:
logger.debug("Failed to resolve message_id from conversation_id: %s", str(e))
trace_id = outer_trace_id
parent_run_id = parent_trace_context.get("parent_node_execution_id")
else:
# Outer workflow: existing behavior
trace_id = trace_info.trace_id or trace_info.message_id or trace_info.workflow_run_id
parent_run_id = trace_info.message_id or None
if trace_info.start_time is None:
trace_info.start_time = datetime.now()
message_dotted_order = (
@ -78,7 +107,8 @@ class LangSmithDataTrace(BaseTraceInstance):
metadata = trace_info.metadata
metadata["workflow_app_log_id"] = trace_info.workflow_app_log_id
if trace_info.message_id:
# Only create message_run for outer workflows (no parent_trace_context)
if trace_info.message_id and not parent_trace_context:
message_run = LangSmithRunModel(
id=trace_info.message_id,
name=TraceTaskName.MESSAGE_TRACE,
@ -121,9 +151,9 @@ class LangSmithDataTrace(BaseTraceInstance):
},
error=trace_info.error,
tags=["workflow"],
parent_run_id=trace_info.message_id or None,
parent_run_id=parent_run_id,
trace_id=trace_id,
dotted_order=workflow_dotted_order,
dotted_order=None if parent_trace_context else workflow_dotted_order,
serialized=None,
events=[],
session_id=None,

View File

@ -21,6 +21,7 @@ from core.ops.entities.config_entity import (
)
from core.ops.entities.trace_entity import (
DatasetRetrievalTraceInfo,
DraftNodeExecutionTrace,
GenerateNameTraceInfo,
MessageTraceInfo,
ModerationTraceInfo,
@ -28,12 +29,16 @@ from core.ops.entities.trace_entity import (
TaskData,
ToolTraceInfo,
TraceTaskName,
WorkflowNodeTraceInfo,
WorkflowTraceInfo,
)
from core.ops.utils import get_message_data
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.account import Tenant
from models.dataset import Dataset
from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig
from models.tools import ApiToolProvider, BuiltinToolProvider, MCPToolProvider, WorkflowToolProvider
from models.workflow import WorkflowAppLog
from repositories.factory import DifyAPIRepositoryFactory
from tasks.ops_trace_task import process_trace_tasks
@ -44,6 +49,44 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
def _lookup_app_and_workspace_names(app_id: str | None, tenant_id: str | None) -> tuple[str, str]:
"""Return (app_name, workspace_name) for the given IDs. Falls back to empty strings."""
app_name = ""
workspace_name = ""
if not app_id and not tenant_id:
return app_name, workspace_name
with Session(db.engine) as session:
if app_id:
name = session.scalar(select(App.name).where(App.id == app_id))
if name:
app_name = name
if tenant_id:
name = session.scalar(select(Tenant.name).where(Tenant.id == tenant_id))
if name:
workspace_name = name
return app_name, workspace_name
_PROVIDER_TYPE_TO_MODEL: dict[str, type] = {
"builtin": BuiltinToolProvider,
"plugin": BuiltinToolProvider,
"api": ApiToolProvider,
"workflow": WorkflowToolProvider,
"mcp": MCPToolProvider,
}
def _lookup_credential_name(credential_id: str | None, provider_type: str | None) -> str:
if not credential_id:
return ""
model_cls = _PROVIDER_TYPE_TO_MODEL.get(provider_type or "")
if not model_cls:
return ""
with Session(db.engine) as session:
name = session.scalar(select(model_cls.name).where(model_cls.id == credential_id))
return str(name) if name else ""
class OpsTraceProviderConfigMap(collections.UserDict[str, dict[str, Any]]):
def __getitem__(self, provider: str) -> dict[str, Any]:
match provider:
@ -526,6 +569,8 @@ class TraceTask:
TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace(
conversation_id=self.conversation_id, timer=self.timer, **self.kwargs
),
TraceTaskName.NODE_EXECUTION_TRACE: lambda: self.node_execution_trace(**self.kwargs),
TraceTaskName.DRAFT_NODE_EXECUTION_TRACE: lambda: self.draft_node_execution_trace(**self.kwargs),
}
return preprocess_map.get(self.trace_type, lambda: None)()
@ -581,7 +626,9 @@ class TraceTask:
)
message_id = session.scalar(message_data_stmt)
metadata = {
app_name, workspace_name = _lookup_app_and_workspace_names(workflow_run.app_id, tenant_id)
metadata: dict[str, Any] = {
"workflow_id": workflow_id,
"conversation_id": conversation_id,
"workflow_run_id": workflow_run_id,
@ -594,8 +641,14 @@ class TraceTask:
"triggered_from": workflow_run.triggered_from,
"user_id": user_id,
"app_id": workflow_run.app_id,
"app_name": app_name,
"workspace_name": workspace_name,
}
parent_trace_context = self.kwargs.get("parent_trace_context")
if parent_trace_context:
metadata["parent_trace_context"] = parent_trace_context
workflow_trace_info = WorkflowTraceInfo(
trace_id=self.trace_id,
workflow_data=workflow_run.to_dict(),
@ -643,6 +696,14 @@ class TraceTask:
streaming_metrics = self._extract_streaming_metrics(message_data)
tenant_id = ""
with Session(db.engine) as session:
tid = session.scalar(select(App.tenant_id).where(App.id == message_data.app_id))
if tid:
tenant_id = str(tid)
app_name, workspace_name = _lookup_app_and_workspace_names(message_data.app_id, tenant_id)
metadata = {
"conversation_id": message_data.conversation_id,
"ls_provider": message_data.model_provider,
@ -654,6 +715,11 @@ class TraceTask:
"workflow_run_id": message_data.workflow_run_id,
"from_source": message_data.from_source,
"message_id": message_id,
"tenant_id": tenant_id,
"app_id": message_data.app_id,
"user_id": message_data.from_end_user_id or message_data.from_account_id,
"app_name": app_name,
"workspace_name": workspace_name,
}
message_tokens = message_data.message_tokens
@ -776,6 +842,36 @@ class TraceTask:
if not message_data:
return {}
tenant_id = ""
with Session(db.engine) as session:
tid = session.scalar(select(App.tenant_id).where(App.id == message_data.app_id))
if tid:
tenant_id = str(tid)
app_name, workspace_name = _lookup_app_and_workspace_names(message_data.app_id, tenant_id)
doc_list = [doc.model_dump() for doc in documents] if documents else []
dataset_ids: set[str] = set()
for doc in doc_list:
doc_meta = doc.get("metadata") or {}
did = doc_meta.get("dataset_id")
if did:
dataset_ids.add(did)
embedding_models: dict[str, dict[str, str]] = {}
if dataset_ids:
with Session(db.engine) as session:
rows = session.execute(
select(Dataset.id, Dataset.embedding_model, Dataset.embedding_model_provider).where(
Dataset.id.in_(list(dataset_ids))
)
).all()
for row in rows:
embedding_models[str(row[0])] = {
"embedding_model": row[1] or "",
"embedding_model_provider": row[2] or "",
}
metadata = {
"message_id": message_id,
"ls_provider": message_data.model_provider,
@ -786,13 +882,19 @@ class TraceTask:
"agent_based": message_data.agent_based,
"workflow_run_id": message_data.workflow_run_id,
"from_source": message_data.from_source,
"tenant_id": tenant_id,
"app_id": message_data.app_id,
"user_id": message_data.from_end_user_id or message_data.from_account_id,
"app_name": app_name,
"workspace_name": workspace_name,
"embedding_models": embedding_models,
}
dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
trace_id=self.trace_id,
message_id=message_id,
inputs=message_data.query or message_data.inputs,
documents=[doc.model_dump() for doc in documents] if documents else [],
documents=doc_list,
start_time=timer.get("start"),
end_time=timer.get("end"),
metadata=metadata,
@ -903,6 +1005,90 @@ class TraceTask:
return generate_name_trace_info
def node_execution_trace(self, **kwargs) -> WorkflowNodeTraceInfo | dict:
node_data: dict = kwargs.get("node_execution_data", {})
if not node_data:
return {}
app_name, workspace_name = _lookup_app_and_workspace_names(node_data.get("app_id"), node_data.get("tenant_id"))
credential_name = _lookup_credential_name(
node_data.get("credential_id"), node_data.get("credential_provider_type")
)
metadata: dict[str, Any] = {
"tenant_id": node_data.get("tenant_id"),
"app_id": node_data.get("app_id"),
"app_name": app_name,
"workspace_name": workspace_name,
"user_id": node_data.get("user_id"),
"dataset_ids": node_data.get("dataset_ids"),
"dataset_names": node_data.get("dataset_names"),
"plugin_name": node_data.get("plugin_name"),
"credential_name": credential_name,
}
parent_trace_context = node_data.get("parent_trace_context")
if parent_trace_context:
metadata["parent_trace_context"] = parent_trace_context
message_id: str | None = None
conversation_id = node_data.get("conversation_id")
workflow_execution_id = node_data.get("workflow_execution_id")
if conversation_id and workflow_execution_id and not parent_trace_context:
with Session(db.engine) as session:
msg_id = session.scalar(
select(Message.id).where(
Message.conversation_id == conversation_id,
Message.workflow_run_id == workflow_execution_id,
)
)
if msg_id:
message_id = str(msg_id)
metadata["message_id"] = message_id
return WorkflowNodeTraceInfo(
trace_id=self.trace_id,
message_id=message_id,
start_time=node_data.get("created_at"),
end_time=node_data.get("finished_at"),
metadata=metadata,
workflow_id=node_data.get("workflow_id", ""),
workflow_run_id=node_data.get("workflow_execution_id", ""),
tenant_id=node_data.get("tenant_id", ""),
node_execution_id=node_data.get("node_execution_id", ""),
node_id=node_data.get("node_id", ""),
node_type=node_data.get("node_type", ""),
title=node_data.get("title", ""),
status=node_data.get("status", ""),
error=node_data.get("error"),
elapsed_time=node_data.get("elapsed_time", 0.0),
index=node_data.get("index", 0),
predecessor_node_id=node_data.get("predecessor_node_id"),
total_tokens=node_data.get("total_tokens", 0),
total_price=node_data.get("total_price", 0.0),
currency=node_data.get("currency"),
model_provider=node_data.get("model_provider"),
model_name=node_data.get("model_name"),
prompt_tokens=node_data.get("prompt_tokens"),
completion_tokens=node_data.get("completion_tokens"),
tool_name=node_data.get("tool_name"),
iteration_id=node_data.get("iteration_id"),
iteration_index=node_data.get("iteration_index"),
loop_id=node_data.get("loop_id"),
loop_index=node_data.get("loop_index"),
parallel_id=node_data.get("parallel_id"),
node_inputs=node_data.get("node_inputs"),
node_outputs=node_data.get("node_outputs"),
process_data=node_data.get("process_data"),
)
def draft_node_execution_trace(self, **kwargs) -> DraftNodeExecutionTrace | dict:
node_trace = self.node_execution_trace(**kwargs)
if not node_trace or not isinstance(node_trace, WorkflowNodeTraceInfo):
return node_trace
return DraftNodeExecutionTrace(**node_trace.model_dump())
def _extract_streaming_metrics(self, message_data) -> dict:
if not message_data.message_metadata:
return {}
@ -936,13 +1122,17 @@ class TraceQueueManager:
self.user_id = user_id
self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
self.flask_app = current_app._get_current_object() # type: ignore
from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
self._enterprise_telemetry_enabled = is_enterprise_telemetry_enabled()
if trace_manager_timer is None:
self.start_timer()
def add_trace_task(self, trace_task: TraceTask):
global trace_manager_timer, trace_manager_queue
try:
if self.trace_instance:
if self._enterprise_telemetry_enabled or self.trace_instance:
trace_task.app_id = self.app_id
trace_manager_queue.put(trace_task)
except Exception:

View File

@ -5,7 +5,6 @@ import logging
from collections.abc import Generator, Mapping, Sequence
from typing import Any, cast
from flask import has_request_context
from sqlalchemy import select
from core.db.session_factory import session_factory

View File

@ -371,6 +371,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
self._workflow_node_execution_repository.save(domain_execution)
self._workflow_node_execution_repository.save_execution_data(domain_execution)
self._enqueue_node_trace_task(domain_execution)
def _fail_running_node_executions(self, *, error_message: str) -> None:
now = naive_utc_now()
@ -388,8 +389,10 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
conversation_id = self._system_variables().get(SystemVariableKey.CONVERSATION_ID.value)
external_trace_id = None
parent_trace_context = None
if isinstance(self._application_generate_entity, (WorkflowAppGenerateEntity, AdvancedChatAppGenerateEntity)):
external_trace_id = self._application_generate_entity.extras.get("external_trace_id")
parent_trace_context = self._application_generate_entity.extras.get("parent_trace_context")
trace_task = TraceTask(
TraceTaskName.WORKFLOW_TRACE,
@ -397,6 +400,99 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
conversation_id=conversation_id,
user_id=self._trace_manager.user_id,
external_trace_id=external_trace_id,
parent_trace_context=parent_trace_context,
)
self._trace_manager.add_trace_task(trace_task)
def _enqueue_node_trace_task(self, domain_execution: WorkflowNodeExecution) -> None:
if not self._trace_manager:
return
from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
if not is_enterprise_telemetry_enabled():
return
execution = self._get_workflow_execution()
meta = domain_execution.metadata or {}
parent_trace_context = None
if isinstance(self._application_generate_entity, (WorkflowAppGenerateEntity, AdvancedChatAppGenerateEntity)):
parent_trace_context = self._application_generate_entity.extras.get("parent_trace_context")
node_data: dict[str, Any] = {
"workflow_id": domain_execution.workflow_id,
"workflow_execution_id": execution.id_,
"tenant_id": self._application_generate_entity.app_config.tenant_id,
"app_id": self._application_generate_entity.app_config.app_id,
"node_execution_id": domain_execution.id,
"node_id": domain_execution.node_id,
"node_type": str(domain_execution.node_type.value),
"title": domain_execution.title,
"status": str(domain_execution.status.value),
"error": domain_execution.error,
"elapsed_time": domain_execution.elapsed_time,
"index": domain_execution.index,
"predecessor_node_id": domain_execution.predecessor_node_id,
"created_at": domain_execution.created_at,
"finished_at": domain_execution.finished_at,
"total_tokens": meta.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS, 0),
"total_price": meta.get(WorkflowNodeExecutionMetadataKey.TOTAL_PRICE, 0.0),
"currency": meta.get(WorkflowNodeExecutionMetadataKey.CURRENCY),
"tool_name": (meta.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO) or {}).get("tool_name")
if isinstance(meta.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO), dict)
else None,
"iteration_id": meta.get(WorkflowNodeExecutionMetadataKey.ITERATION_ID),
"iteration_index": meta.get(WorkflowNodeExecutionMetadataKey.ITERATION_INDEX),
"loop_id": meta.get(WorkflowNodeExecutionMetadataKey.LOOP_ID),
"loop_index": meta.get(WorkflowNodeExecutionMetadataKey.LOOP_INDEX),
"parallel_id": meta.get(WorkflowNodeExecutionMetadataKey.PARALLEL_ID),
"node_inputs": dict(domain_execution.inputs) if domain_execution.inputs else None,
"node_outputs": dict(domain_execution.outputs) if domain_execution.outputs else None,
"process_data": dict(domain_execution.process_data) if domain_execution.process_data else None,
}
node_data["invoke_from"] = self._application_generate_entity.invoke_from.value
node_data["user_id"] = self._system_variables().get(SystemVariableKey.USER_ID.value)
if domain_execution.node_type.value == "knowledge-retrieval" and domain_execution.outputs:
results = domain_execution.outputs.get("result") or []
dataset_ids: list[str] = []
dataset_names: list[str] = []
for doc in results:
if not isinstance(doc, dict):
continue
doc_meta = doc.get("metadata") or {}
did = doc_meta.get("dataset_id")
dname = doc_meta.get("dataset_name")
if did and did not in dataset_ids:
dataset_ids.append(did)
if dname and dname not in dataset_names:
dataset_names.append(dname)
if dataset_ids:
node_data["dataset_ids"] = dataset_ids
if dataset_names:
node_data["dataset_names"] = dataset_names
tool_info = meta.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO)
if isinstance(tool_info, dict):
plugin_id = tool_info.get("plugin_unique_identifier")
if plugin_id:
node_data["plugin_name"] = plugin_id
credential_id = tool_info.get("credential_id")
if credential_id:
node_data["credential_id"] = credential_id
node_data["credential_provider_type"] = tool_info.get("provider_type")
conversation_id = self._system_variables().get(SystemVariableKey.CONVERSATION_ID.value)
if conversation_id:
node_data["conversation_id"] = conversation_id
if parent_trace_context:
node_data["parent_trace_context"] = parent_trace_context
trace_task = TraceTask(
TraceTaskName.NODE_EXECUTION_TRACE,
node_execution_data=node_data,
)
self._trace_manager.add_trace_task(trace_task)

View File

@ -61,6 +61,7 @@ class ToolNode(Node[ToolNodeData]):
"provider_type": self.node_data.provider_type.value,
"provider_id": self.node_data.provider_id,
"plugin_unique_identifier": self.node_data.plugin_unique_identifier,
"credential_id": self.node_data.credential_id,
}
# get tool runtime
@ -108,12 +109,15 @@ class ToolNode(Node[ToolNodeData]):
from core.tools.workflow_as_tool.tool import WorkflowTool
if isinstance(tool_runtime, WorkflowTool):
workflow_run_id_var = self.graph_runtime_state.variable_pool.get(["sys", SystemVariableKey.WORKFLOW_RUN_ID])
workflow_run_id_var = self.graph_runtime_state.variable_pool.get(
["sys", SystemVariableKey.WORKFLOW_EXECUTION_ID]
)
tool_runtime.parent_trace_context = {
"trace_id": str(workflow_run_id_var.text) if workflow_run_id_var else "",
"parent_node_execution_id": self.execution_id,
"parent_workflow_run_id": str(workflow_run_id_var.text) if workflow_run_id_var else "",
"parent_app_id": self.app_id,
"parent_conversation_id": conversation_id.text if conversation_id else None,
}
try:

View File

View File

View File

@ -0,0 +1,77 @@
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.workflow.enums import WorkflowNodeExecutionMetadataKey
from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
from models.workflow import WorkflowNodeExecutionModel
def enqueue_draft_node_execution_trace(
*,
execution: WorkflowNodeExecutionModel,
outputs: Mapping[str, Any] | None,
workflow_execution_id: str | None,
user_id: str,
) -> None:
if not is_enterprise_telemetry_enabled():
return
trace_manager = TraceQueueManager(app_id=execution.app_id, user_id=user_id)
node_data = _build_node_execution_data(
execution=execution,
outputs=outputs,
workflow_execution_id=workflow_execution_id,
)
trace_manager.add_trace_task(
TraceTask(
TraceTaskName.DRAFT_NODE_EXECUTION_TRACE,
node_execution_data=node_data,
)
)
def _build_node_execution_data(
*,
execution: WorkflowNodeExecutionModel,
outputs: Mapping[str, Any] | None,
workflow_execution_id: str | None,
) -> dict[str, Any]:
metadata = execution.execution_metadata_dict
node_outputs = outputs if outputs is not None else execution.outputs_dict
execution_id = workflow_execution_id or execution.workflow_run_id or execution.id
return {
"workflow_id": execution.workflow_id,
"workflow_execution_id": execution_id,
"tenant_id": execution.tenant_id,
"app_id": execution.app_id,
"node_execution_id": execution.id,
"node_id": execution.node_id,
"node_type": execution.node_type,
"title": execution.title,
"status": execution.status,
"error": execution.error,
"elapsed_time": execution.elapsed_time,
"index": execution.index,
"predecessor_node_id": execution.predecessor_node_id,
"created_at": execution.created_at,
"finished_at": execution.finished_at,
"total_tokens": metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS, 0),
"total_price": metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_PRICE, 0.0),
"currency": metadata.get(WorkflowNodeExecutionMetadataKey.CURRENCY),
"tool_name": (metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO) or {}).get("tool_name")
if isinstance(metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO), dict)
else None,
"iteration_id": metadata.get(WorkflowNodeExecutionMetadataKey.ITERATION_ID),
"iteration_index": metadata.get(WorkflowNodeExecutionMetadataKey.ITERATION_INDEX),
"loop_id": metadata.get(WorkflowNodeExecutionMetadataKey.LOOP_ID),
"loop_index": metadata.get(WorkflowNodeExecutionMetadataKey.LOOP_INDEX),
"parallel_id": metadata.get(WorkflowNodeExecutionMetadataKey.PARALLEL_ID),
"node_inputs": execution.inputs_dict,
"node_outputs": node_outputs,
"process_data": execution.process_data_dict,
}

View File

@ -0,0 +1,570 @@
"""Enterprise trace handler — duck-typed, NOT a BaseTraceInstance subclass.
Invoked directly in the Celery task, not through OpsTraceManager dispatch.
Only requires a matching ``trace(trace_info)`` method signature.
Signal strategy:
- **Traces (spans)**: workflow run, node execution, draft node execution only.
- **Metrics + structured logs**: all other event types.
"""
from __future__ import annotations
import json
import logging
from typing import Any
from core.ops.entities.trace_entity import (
BaseTraceInfo,
DatasetRetrievalTraceInfo,
DraftNodeExecutionTrace,
GenerateNameTraceInfo,
MessageTraceInfo,
ModerationTraceInfo,
SuggestedQuestionTraceInfo,
ToolTraceInfo,
WorkflowNodeTraceInfo,
WorkflowTraceInfo,
)
from enterprise.telemetry.entities import (
EnterpriseTelemetryCounter,
EnterpriseTelemetryHistogram,
EnterpriseTelemetrySpan,
)
from enterprise.telemetry.telemetry_log import emit_metric_only_event, emit_telemetry_log
logger = logging.getLogger(__name__)
class EnterpriseOtelTrace:
"""Duck-typed enterprise trace handler.
``*_trace`` methods emit spans (workflow/node only) or structured logs
(all other events), plus metrics at 100 % accuracy.
"""
def __init__(self) -> None:
from extensions.ext_enterprise_telemetry import get_enterprise_exporter
exporter = get_enterprise_exporter()
if exporter is None:
raise RuntimeError("EnterpriseOtelTrace instantiated but exporter is not initialized")
self._exporter = exporter
def trace(self, trace_info: BaseTraceInfo) -> None:
if isinstance(trace_info, WorkflowTraceInfo):
self._workflow_trace(trace_info)
elif isinstance(trace_info, MessageTraceInfo):
self._message_trace(trace_info)
elif isinstance(trace_info, ToolTraceInfo):
self._tool_trace(trace_info)
elif isinstance(trace_info, DraftNodeExecutionTrace):
self._draft_node_execution_trace(trace_info)
elif isinstance(trace_info, WorkflowNodeTraceInfo):
self._node_execution_trace(trace_info)
elif isinstance(trace_info, ModerationTraceInfo):
self._moderation_trace(trace_info)
elif isinstance(trace_info, SuggestedQuestionTraceInfo):
self._suggested_question_trace(trace_info)
elif isinstance(trace_info, DatasetRetrievalTraceInfo):
self._dataset_retrieval_trace(trace_info)
elif isinstance(trace_info, GenerateNameTraceInfo):
self._generate_name_trace(trace_info)
def _common_attrs(self, trace_info: BaseTraceInfo) -> dict[str, Any]:
return {
"dify.trace_id": trace_info.trace_id,
"dify.tenant_id": trace_info.metadata.get("tenant_id"),
"dify.app_id": trace_info.metadata.get("app_id"),
"dify.app.name": trace_info.metadata.get("app_name"),
"dify.workspace.name": trace_info.metadata.get("workspace_name"),
"gen_ai.user.id": trace_info.metadata.get("user_id"),
"dify.message.id": trace_info.message_id,
}
def _maybe_json(self, value: Any) -> str | None:
if value is None:
return None
if isinstance(value, str):
return value
try:
return json.dumps(value, default=str)
except (TypeError, ValueError):
return str(value)
# ------------------------------------------------------------------
# SPAN-emitting handlers (workflow, node execution, draft node)
# ------------------------------------------------------------------
def _workflow_trace(self, info: WorkflowTraceInfo) -> None:
# -- Slim span attrs: identity + structure + status + timing only --
span_attrs: dict[str, Any] = {
"dify.trace_id": info.trace_id,
"dify.tenant_id": info.metadata.get("tenant_id"),
"dify.app_id": info.metadata.get("app_id"),
"dify.workflow.id": info.workflow_id,
"dify.workflow.run_id": info.workflow_run_id,
"dify.workflow.status": info.workflow_run_status,
"dify.workflow.error": info.error,
"dify.workflow.elapsed_time": info.workflow_run_elapsed_time,
"dify.invoke_from": info.metadata.get("triggered_from"),
"dify.conversation.id": info.conversation_id,
"dify.message.id": info.message_id,
}
trace_correlation_override: str | None = None
parent_span_id_source: str | None = None
parent_ctx = info.metadata.get("parent_trace_context")
if parent_ctx and isinstance(parent_ctx, dict):
span_attrs["dify.parent.trace_id"] = parent_ctx.get("trace_id")
span_attrs["dify.parent.node.execution_id"] = parent_ctx.get("parent_node_execution_id")
span_attrs["dify.parent.workflow.run_id"] = parent_ctx.get("parent_workflow_run_id")
span_attrs["dify.parent.app.id"] = parent_ctx.get("parent_app_id")
trace_correlation_override = parent_ctx.get("parent_workflow_run_id")
parent_span_id_source = parent_ctx.get("parent_node_execution_id")
self._exporter.export_span(
EnterpriseTelemetrySpan.WORKFLOW_RUN,
span_attrs,
correlation_id=info.workflow_run_id,
span_id_source=info.workflow_run_id,
start_time=info.start_time,
end_time=info.end_time,
trace_correlation_override=trace_correlation_override,
parent_span_id_source=parent_span_id_source,
)
# -- Companion log: ALL attrs (span + detail) for full picture --
log_attrs: dict[str, Any] = {**span_attrs}
log_attrs.update(
{
"dify.app.name": info.metadata.get("app_name"),
"dify.workspace.name": info.metadata.get("workspace_name"),
"gen_ai.user.id": info.metadata.get("user_id"),
"gen_ai.usage.total_tokens": info.total_tokens,
"dify.workflow.version": info.workflow_run_version,
}
)
if self._exporter.include_content:
log_attrs["dify.workflow.inputs"] = self._maybe_json(info.workflow_run_inputs)
log_attrs["dify.workflow.outputs"] = self._maybe_json(info.workflow_run_outputs)
log_attrs["dify.workflow.query"] = info.query
else:
ref = f"ref:workflow_run_id={info.workflow_run_id}"
log_attrs["dify.workflow.inputs"] = ref
log_attrs["dify.workflow.outputs"] = ref
log_attrs["dify.workflow.query"] = ref
emit_telemetry_log(
event_name="dify.workflow.run",
attributes=log_attrs,
signal="span_detail",
trace_id_source=info.workflow_run_id,
tenant_id=info.metadata.get("tenant_id"),
user_id=info.metadata.get("user_id"),
)
# -- Metrics --
labels = {
"tenant_id": info.tenant_id,
"app_id": info.metadata.get("app_id", ""),
}
self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels)
invoke_from = info.metadata.get("triggered_from", "")
self._exporter.increment_counter(
EnterpriseTelemetryCounter.REQUESTS,
1,
{**labels, "type": "workflow", "status": info.workflow_run_status, "invoke_from": invoke_from},
)
self._exporter.record_histogram(
EnterpriseTelemetryHistogram.WORKFLOW_DURATION,
float(info.workflow_run_elapsed_time),
{**labels, "status": info.workflow_run_status},
)
if info.error:
self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": "workflow"})
def _node_execution_trace(self, info: WorkflowNodeTraceInfo) -> None:
self._emit_node_execution_trace(info, EnterpriseTelemetrySpan.NODE_EXECUTION, "node")
def _draft_node_execution_trace(self, info: DraftNodeExecutionTrace) -> None:
self._emit_node_execution_trace(
info,
EnterpriseTelemetrySpan.DRAFT_NODE_EXECUTION,
"draft_node",
correlation_id_override=info.node_execution_id,
trace_correlation_override_param=info.workflow_run_id,
)
def _emit_node_execution_trace(
self,
info: WorkflowNodeTraceInfo,
span_name: EnterpriseTelemetrySpan,
request_type: str,
correlation_id_override: str | None = None,
trace_correlation_override_param: str | None = None,
) -> None:
# -- Slim span attrs: identity + structure + status + timing --
span_attrs: dict[str, Any] = {
"dify.trace_id": info.trace_id,
"dify.tenant_id": info.tenant_id,
"dify.app_id": info.metadata.get("app_id"),
"dify.workflow.id": info.workflow_id,
"dify.workflow.run_id": info.workflow_run_id,
"dify.message.id": info.message_id,
"dify.conversation.id": info.metadata.get("conversation_id"),
"dify.node.execution_id": info.node_execution_id,
"dify.node.id": info.node_id,
"dify.node.type": info.node_type,
"dify.node.title": info.title,
"dify.node.status": info.status,
"dify.node.error": info.error,
"dify.node.elapsed_time": info.elapsed_time,
"dify.node.index": info.index,
"dify.node.predecessor_node_id": info.predecessor_node_id,
"dify.node.iteration_id": info.iteration_id,
"dify.node.loop_id": info.loop_id,
"dify.node.parallel_id": info.parallel_id,
}
trace_correlation_override = trace_correlation_override_param
parent_ctx = info.metadata.get("parent_trace_context")
if parent_ctx and isinstance(parent_ctx, dict):
trace_correlation_override = parent_ctx.get("parent_workflow_run_id") or trace_correlation_override
effective_correlation_id = correlation_id_override or info.workflow_run_id
self._exporter.export_span(
span_name,
span_attrs,
correlation_id=effective_correlation_id,
span_id_source=info.node_execution_id,
start_time=info.start_time,
end_time=info.end_time,
trace_correlation_override=trace_correlation_override,
)
# -- Companion log: ALL attrs (span + detail) --
log_attrs: dict[str, Any] = {**span_attrs}
log_attrs.update(
{
"dify.app.name": info.metadata.get("app_name"),
"dify.workspace.name": info.metadata.get("workspace_name"),
"dify.invoke_from": info.metadata.get("invoke_from"),
"gen_ai.user.id": info.metadata.get("user_id"),
"gen_ai.usage.total_tokens": info.total_tokens,
"dify.node.total_price": info.total_price,
"dify.node.currency": info.currency,
"gen_ai.provider.name": info.model_provider,
"gen_ai.request.model": info.model_name,
"gen_ai.usage.input_tokens": info.prompt_tokens,
"gen_ai.usage.output_tokens": info.completion_tokens,
"gen_ai.tool.name": info.tool_name,
"dify.node.iteration_index": info.iteration_index,
"dify.node.loop_index": info.loop_index,
"dify.plugin.name": info.metadata.get("plugin_name"),
"dify.credential.name": info.metadata.get("credential_name"),
"dify.dataset.ids": self._maybe_json(info.metadata.get("dataset_ids")),
"dify.dataset.names": self._maybe_json(info.metadata.get("dataset_names")),
}
)
if self._exporter.include_content:
log_attrs["dify.node.inputs"] = self._maybe_json(info.node_inputs)
log_attrs["dify.node.outputs"] = self._maybe_json(info.node_outputs)
log_attrs["dify.node.process_data"] = self._maybe_json(info.process_data)
else:
ref = f"ref:node_execution_id={info.node_execution_id}"
log_attrs["dify.node.inputs"] = ref
log_attrs["dify.node.outputs"] = ref
log_attrs["dify.node.process_data"] = ref
emit_telemetry_log(
event_name=span_name.value,
attributes=log_attrs,
signal="span_detail",
trace_id_source=info.workflow_run_id,
tenant_id=info.tenant_id,
user_id=info.metadata.get("user_id"),
)
# -- Metrics --
labels = {
"tenant_id": info.tenant_id,
"app_id": info.metadata.get("app_id", ""),
"node_type": info.node_type,
"model_provider": info.model_provider or "",
}
if info.total_tokens:
token_labels = {**labels, "model_name": info.model_name or ""}
self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, token_labels)
self._exporter.increment_counter(
EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": request_type, "status": info.status}
)
duration_labels = dict(labels)
plugin_name = info.metadata.get("plugin_name")
if plugin_name and info.node_type in {"tool", "knowledge-retrieval"}:
duration_labels["plugin_name"] = plugin_name
self._exporter.record_histogram(EnterpriseTelemetryHistogram.NODE_DURATION, info.elapsed_time, duration_labels)
if info.error:
self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": request_type})
# ------------------------------------------------------------------
# METRIC-ONLY handlers (structured log + counters/histograms)
# ------------------------------------------------------------------
def _message_trace(self, info: MessageTraceInfo) -> None:
attrs = self._common_attrs(info)
attrs.update(
{
"dify.invoke_from": info.metadata.get("from_source"),
"dify.conversation.id": info.metadata.get("conversation_id"),
"dify.conversation.mode": info.conversation_mode,
"gen_ai.provider.name": info.metadata.get("ls_provider"),
"gen_ai.request.model": info.metadata.get("ls_model_name"),
"gen_ai.usage.input_tokens": info.message_tokens,
"gen_ai.usage.output_tokens": info.answer_tokens,
"gen_ai.usage.total_tokens": info.total_tokens,
"dify.message.status": info.metadata.get("status"),
"dify.message.error": info.error,
"dify.message.from_source": info.metadata.get("from_source"),
"dify.message.from_end_user_id": info.metadata.get("from_end_user_id"),
"dify.message.from_account_id": info.metadata.get("from_account_id"),
"dify.streaming": info.is_streaming_request,
"dify.message.time_to_first_token": info.gen_ai_server_time_to_first_token,
"dify.message.streaming_duration": info.llm_streaming_time_to_generate,
"dify.workflow.run_id": info.metadata.get("workflow_run_id"),
}
)
if self._exporter.include_content:
attrs["dify.message.inputs"] = self._maybe_json(info.inputs)
attrs["dify.message.outputs"] = self._maybe_json(info.outputs)
else:
ref = f"ref:message_id={info.message_id}"
attrs["dify.message.inputs"] = ref
attrs["dify.message.outputs"] = ref
emit_metric_only_event(
event_name="dify.message.run",
attributes=attrs,
trace_id_source=info.metadata.get("workflow_run_id") or str(info.message_id) if info.message_id else None,
tenant_id=info.metadata.get("tenant_id"),
user_id=info.metadata.get("user_id"),
)
labels = {
"tenant_id": info.metadata.get("tenant_id", ""),
"app_id": info.metadata.get("app_id", ""),
"model_provider": info.metadata.get("ls_provider", ""),
"model_name": info.metadata.get("ls_model_name", ""),
}
self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels)
invoke_from = info.metadata.get("from_source", "")
self._exporter.increment_counter(
EnterpriseTelemetryCounter.REQUESTS,
1,
{**labels, "type": "message", "status": info.metadata.get("status", ""), "invoke_from": invoke_from},
)
if info.start_time and info.end_time:
duration = (info.end_time - info.start_time).total_seconds()
self._exporter.record_histogram(EnterpriseTelemetryHistogram.MESSAGE_DURATION, duration, labels)
if info.gen_ai_server_time_to_first_token is not None:
self._exporter.record_histogram(
EnterpriseTelemetryHistogram.MESSAGE_TTFT, info.gen_ai_server_time_to_first_token, labels
)
if info.error:
self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": "message"})
def _tool_trace(self, info: ToolTraceInfo) -> None:
attrs = self._common_attrs(info)
attrs.update(
{
"gen_ai.tool.name": info.tool_name,
"dify.tool.time_cost": info.time_cost,
"dify.tool.error": info.error,
}
)
if self._exporter.include_content:
attrs["dify.tool.inputs"] = self._maybe_json(info.tool_inputs)
attrs["dify.tool.outputs"] = info.tool_outputs
attrs["dify.tool.parameters"] = self._maybe_json(info.tool_parameters)
attrs["dify.tool.config"] = self._maybe_json(info.tool_config)
else:
ref = f"ref:message_id={info.message_id}"
attrs["dify.tool.inputs"] = ref
attrs["dify.tool.outputs"] = ref
attrs["dify.tool.parameters"] = ref
attrs["dify.tool.config"] = ref
emit_metric_only_event(
event_name="dify.tool.execution",
attributes=attrs,
tenant_id=info.metadata.get("tenant_id"),
user_id=info.metadata.get("user_id"),
)
labels = {
"tenant_id": info.metadata.get("tenant_id", ""),
"app_id": info.metadata.get("app_id", ""),
"tool_name": info.tool_name,
}
self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "tool"})
self._exporter.record_histogram(EnterpriseTelemetryHistogram.TOOL_DURATION, float(info.time_cost), labels)
if info.error:
self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": "tool"})
def _moderation_trace(self, info: ModerationTraceInfo) -> None:
attrs = self._common_attrs(info)
attrs.update(
{
"dify.moderation.flagged": info.flagged,
"dify.moderation.action": info.action,
"dify.moderation.preset_response": info.preset_response,
}
)
if self._exporter.include_content:
attrs["dify.moderation.query"] = info.query
else:
attrs["dify.moderation.query"] = f"ref:message_id={info.message_id}"
emit_metric_only_event(
event_name="dify.moderation.check",
attributes=attrs,
tenant_id=info.metadata.get("tenant_id"),
user_id=info.metadata.get("user_id"),
)
labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")}
self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "moderation"})
def _suggested_question_trace(self, info: SuggestedQuestionTraceInfo) -> None:
attrs = self._common_attrs(info)
attrs.update(
{
"gen_ai.usage.total_tokens": info.total_tokens,
"dify.suggested_question.status": info.status,
"dify.suggested_question.error": info.error,
"gen_ai.provider.name": info.model_provider,
"gen_ai.request.model": info.model_id,
"dify.suggested_question.count": len(info.suggested_question),
}
)
if self._exporter.include_content:
attrs["dify.suggested_question.questions"] = self._maybe_json(info.suggested_question)
else:
attrs["dify.suggested_question.questions"] = f"ref:message_id={info.message_id}"
emit_metric_only_event(
event_name="dify.suggested_question.generation",
attributes=attrs,
tenant_id=info.metadata.get("tenant_id"),
user_id=info.metadata.get("user_id"),
)
labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")}
self._exporter.increment_counter(
EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "suggested_question"}
)
def _dataset_retrieval_trace(self, info: DatasetRetrievalTraceInfo) -> None:
attrs = self._common_attrs(info)
attrs["dify.dataset.error"] = info.error
docs = info.documents or []
dataset_ids: list[str] = []
dataset_names: list[str] = []
structured_docs: list[dict] = []
for doc in docs:
meta = doc.get("metadata", {}) if isinstance(doc, dict) else {}
did = meta.get("dataset_id")
dname = meta.get("dataset_name")
if did and did not in dataset_ids:
dataset_ids.append(did)
if dname and dname not in dataset_names:
dataset_names.append(dname)
structured_docs.append(
{
"dataset_id": did,
"document_id": meta.get("document_id"),
"segment_id": meta.get("segment_id"),
"score": meta.get("score"),
}
)
attrs["dify.dataset.ids"] = self._maybe_json(dataset_ids)
attrs["dify.dataset.names"] = self._maybe_json(dataset_names)
attrs["dify.retrieval.document_count"] = len(docs)
embedding_models = info.metadata.get("embedding_models") or {}
if isinstance(embedding_models, dict):
providers: list[str] = []
models: list[str] = []
for ds_info in embedding_models.values():
if isinstance(ds_info, dict):
p = ds_info.get("embedding_model_provider", "")
m = ds_info.get("embedding_model", "")
if p and p not in providers:
providers.append(p)
if m and m not in models:
models.append(m)
attrs["dify.dataset.embedding_providers"] = self._maybe_json(providers)
attrs["dify.dataset.embedding_models"] = self._maybe_json(models)
if self._exporter.include_content:
attrs["dify.retrieval.query"] = self._maybe_json(info.inputs)
attrs["dify.dataset.documents"] = self._maybe_json(structured_docs)
else:
ref = f"ref:message_id={info.message_id}"
attrs["dify.retrieval.query"] = ref
attrs["dify.dataset.documents"] = ref
emit_metric_only_event(
event_name="dify.dataset.retrieval",
attributes=attrs,
tenant_id=info.metadata.get("tenant_id"),
user_id=info.metadata.get("user_id"),
)
labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")}
self._exporter.increment_counter(
EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "dataset_retrieval"}
)
for did in dataset_ids:
self._exporter.increment_counter(
EnterpriseTelemetryCounter.DATASET_RETRIEVALS, 1, {**labels, "dataset_id": did}
)
def _generate_name_trace(self, info: GenerateNameTraceInfo) -> None:
attrs = self._common_attrs(info)
attrs["dify.conversation.id"] = info.conversation_id
if self._exporter.include_content:
attrs["dify.generate_name.inputs"] = self._maybe_json(info.inputs)
attrs["dify.generate_name.outputs"] = self._maybe_json(info.outputs)
else:
ref = f"ref:conversation_id={info.conversation_id}"
attrs["dify.generate_name.inputs"] = ref
attrs["dify.generate_name.outputs"] = ref
emit_metric_only_event(
event_name="dify.generate_name.execution",
attributes=attrs,
tenant_id=info.tenant_id,
user_id=info.metadata.get("user_id"),
)
labels = {"tenant_id": info.tenant_id, "app_id": info.metadata.get("app_id", "")}
self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "generate_name"})

View File

@ -0,0 +1,30 @@
from enum import StrEnum
class EnterpriseTelemetrySpan(StrEnum):
WORKFLOW_RUN = "dify.workflow.run"
NODE_EXECUTION = "dify.node.execution"
DRAFT_NODE_EXECUTION = "dify.node.execution.draft"
class EnterpriseTelemetryCounter(StrEnum):
TOKENS = "tokens"
REQUESTS = "requests"
ERRORS = "errors"
FEEDBACK = "feedback"
DATASET_RETRIEVALS = "dataset_retrievals"
class EnterpriseTelemetryHistogram(StrEnum):
WORKFLOW_DURATION = "workflow_duration"
NODE_DURATION = "node_duration"
MESSAGE_DURATION = "message_duration"
MESSAGE_TTFT = "message_ttft"
TOOL_DURATION = "tool_duration"
__all__ = [
"EnterpriseTelemetryCounter",
"EnterpriseTelemetryHistogram",
"EnterpriseTelemetrySpan",
]

View File

@ -0,0 +1,146 @@
"""Blinker signal handlers for enterprise telemetry.
Registered at import time via ``@signal.connect`` decorators.
Import must happen during ``ext_enterprise_telemetry.init_app()`` to ensure handlers fire.
"""
from __future__ import annotations
import logging
from enterprise.telemetry.entities import EnterpriseTelemetryCounter
from enterprise.telemetry.telemetry_log import emit_metric_only_event
from events.app_event import app_was_created, app_was_deleted, app_was_updated
from events.feedback_event import feedback_was_created
logger = logging.getLogger(__name__)
__all__ = [
"_handle_app_created",
"_handle_app_deleted",
"_handle_app_updated",
"_handle_feedback_created",
]
@app_was_created.connect
def _handle_app_created(sender: object, **kwargs: object) -> None:
from extensions.ext_enterprise_telemetry import get_enterprise_exporter
exporter = get_enterprise_exporter()
if not exporter:
return
attrs = {
"dify.app.id": getattr(sender, "id", None),
"dify.tenant_id": getattr(sender, "tenant_id", None),
"dify.app.mode": getattr(sender, "mode", None),
}
emit_metric_only_event(
event_name="dify.app.created",
attributes=attrs,
tenant_id=str(getattr(sender, "tenant_id", "") or ""),
)
exporter.increment_counter(
EnterpriseTelemetryCounter.REQUESTS,
1,
{
"type": "app.created",
"tenant_id": getattr(sender, "tenant_id", ""),
},
)
@app_was_deleted.connect
def _handle_app_deleted(sender: object, **kwargs: object) -> None:
from extensions.ext_enterprise_telemetry import get_enterprise_exporter
exporter = get_enterprise_exporter()
if not exporter:
return
attrs = {
"dify.app.id": getattr(sender, "id", None),
"dify.tenant_id": getattr(sender, "tenant_id", None),
}
emit_metric_only_event(
event_name="dify.app.deleted",
attributes=attrs,
tenant_id=str(getattr(sender, "tenant_id", "") or ""),
)
exporter.increment_counter(
EnterpriseTelemetryCounter.REQUESTS,
1,
{
"type": "app.deleted",
"tenant_id": getattr(sender, "tenant_id", ""),
},
)
@app_was_updated.connect
def _handle_app_updated(sender: object, **kwargs: object) -> None:
from extensions.ext_enterprise_telemetry import get_enterprise_exporter
exporter = get_enterprise_exporter()
if not exporter:
return
attrs = {
"dify.app.id": getattr(sender, "id", None),
"dify.tenant_id": getattr(sender, "tenant_id", None),
}
emit_metric_only_event(
event_name="dify.app.updated",
attributes=attrs,
tenant_id=str(getattr(sender, "tenant_id", "") or ""),
)
exporter.increment_counter(
EnterpriseTelemetryCounter.REQUESTS,
1,
{
"type": "app.updated",
"tenant_id": getattr(sender, "tenant_id", ""),
},
)
@feedback_was_created.connect
def _handle_feedback_created(sender: object, **kwargs: object) -> None:
from extensions.ext_enterprise_telemetry import get_enterprise_exporter
exporter = get_enterprise_exporter()
if not exporter:
return
include_content = exporter.include_content
attrs: dict = {
"dify.message.id": getattr(sender, "message_id", None),
"dify.tenant_id": kwargs.get("tenant_id"),
"dify.app_id": getattr(sender, "app_id", None),
"dify.conversation.id": getattr(sender, "conversation_id", None),
"gen_ai.user.id": getattr(sender, "from_end_user_id", None) or getattr(sender, "from_account_id", None),
"dify.feedback.rating": getattr(sender, "rating", None),
"dify.feedback.from_source": getattr(sender, "from_source", None),
}
if include_content:
attrs["dify.feedback.content"] = getattr(sender, "content", None)
emit_metric_only_event(
event_name="dify.feedback.created",
attributes=attrs,
tenant_id=str(kwargs.get("tenant_id", "") or ""),
user_id=str(getattr(sender, "from_end_user_id", None) or getattr(sender, "from_account_id", None) or ""),
)
exporter.increment_counter(
EnterpriseTelemetryCounter.FEEDBACK,
1,
{
"tenant_id": str(kwargs.get("tenant_id", "")),
"app_id": str(getattr(sender, "app_id", "")),
"rating": str(getattr(sender, "rating", "")),
},
)

View File

@ -0,0 +1,200 @@
"""Enterprise OTEL exporter — shared by EnterpriseOtelTrace, event handlers, and direct instrumentation.
Uses its own TracerProvider (configurable sampling, separate from ext_otel.py infrastructure)
and the global MeterProvider (shared with ext_otel.py both target the same collector).
Initialized once during Flask extension init (single-threaded via ext_enterprise_telemetry.py).
Accessed via ``ext_enterprise_telemetry.get_enterprise_exporter()`` from any thread/process.
"""
import logging
import socket
import uuid
from datetime import datetime
from typing import cast
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.trace import SpanContext, TraceFlags
from configs import dify_config
from enterprise.telemetry.entities import EnterpriseTelemetryCounter, EnterpriseTelemetryHistogram
from enterprise.telemetry.id_generator import (
CorrelationIdGenerator,
compute_deterministic_span_id,
set_correlation_id,
set_span_id_source,
)
logger = logging.getLogger(__name__)
def is_enterprise_telemetry_enabled() -> bool:
return bool(dify_config.ENTERPRISE_ENABLED and dify_config.ENTERPRISE_TELEMETRY_ENABLED)
def _parse_otlp_headers(raw: str) -> dict[str, str]:
"""Parse ``key=value,key2=value2`` into a dict."""
if not raw:
return {}
headers: dict[str, str] = {}
for pair in raw.split(","):
if "=" not in pair:
continue
k, v = pair.split("=", 1)
headers[k.strip()] = v.strip()
return headers
def _datetime_to_ns(dt: datetime) -> int:
"""Convert a datetime to nanoseconds since epoch (OTEL convention)."""
return int(dt.timestamp() * 1_000_000_000)
class EnterpriseExporter:
"""Shared OTEL exporter for all enterprise telemetry.
``export_span`` creates spans with optional real timestamps, deterministic
span/trace IDs, and cross-workflow parent linking.
``increment_counter`` / ``record_histogram`` emit OTEL metrics at 100% accuracy.
"""
def __init__(self, config: object) -> None:
endpoint: str = getattr(config, "ENTERPRISE_OTLP_ENDPOINT", "")
headers_raw: str = getattr(config, "ENTERPRISE_OTLP_HEADERS", "")
service_name: str = getattr(config, "ENTERPRISE_SERVICE_NAME", "dify")
sampling_rate: float = getattr(config, "ENTERPRISE_OTEL_SAMPLING_RATE", 1.0)
self.include_content: bool = getattr(config, "ENTERPRISE_INCLUDE_CONTENT", True)
resource = Resource(
attributes={
ResourceAttributes.SERVICE_NAME: service_name,
ResourceAttributes.HOST_NAME: socket.gethostname(),
}
)
sampler = ParentBasedTraceIdRatio(sampling_rate)
id_generator = CorrelationIdGenerator()
self._tracer_provider = TracerProvider(resource=resource, sampler=sampler, id_generator=id_generator)
headers = _parse_otlp_headers(headers_raw)
trace_endpoint = f"{endpoint}/v1/traces" if endpoint else ""
self._tracer_provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint=trace_endpoint or None, headers=headers))
)
self._tracer = self._tracer_provider.get_tracer("dify.enterprise")
meter = metrics.get_meter("dify.enterprise")
self._counters = {
EnterpriseTelemetryCounter.TOKENS: meter.create_counter("dify.tokens.total", unit="{token}"),
EnterpriseTelemetryCounter.REQUESTS: meter.create_counter("dify.requests.total", unit="{request}"),
EnterpriseTelemetryCounter.ERRORS: meter.create_counter("dify.errors.total", unit="{error}"),
EnterpriseTelemetryCounter.FEEDBACK: meter.create_counter("dify.feedback.total", unit="{feedback}"),
EnterpriseTelemetryCounter.DATASET_RETRIEVALS: meter.create_counter(
"dify.dataset.retrievals.total", unit="{retrieval}"
),
}
self._histograms = {
EnterpriseTelemetryHistogram.WORKFLOW_DURATION: meter.create_histogram("dify.workflow.duration", unit="s"),
EnterpriseTelemetryHistogram.NODE_DURATION: meter.create_histogram("dify.node.duration", unit="s"),
EnterpriseTelemetryHistogram.MESSAGE_DURATION: meter.create_histogram("dify.message.duration", unit="s"),
EnterpriseTelemetryHistogram.MESSAGE_TTFT: meter.create_histogram(
"dify.message.time_to_first_token", unit="s"
),
EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"),
}
def export_span(
self,
name: str,
attributes: dict,
correlation_id: str | None = None,
span_id_source: str | None = None,
start_time: datetime | None = None,
end_time: datetime | None = None,
trace_correlation_override: str | None = None,
parent_span_id_source: str | None = None,
) -> None:
"""Export an OTEL span with optional deterministic IDs and real timestamps.
Args:
name: Span operation name.
attributes: Span attributes dict.
correlation_id: Source for trace_id derivation (groups spans in one trace).
span_id_source: Source for deterministic span_id (e.g. workflow_run_id or node_execution_id).
start_time: Real span start time. When None, uses current time.
end_time: Real span end time. When None, span ends immediately.
trace_correlation_override: Override trace_id source (for cross-workflow linking).
When set, trace_id is derived from this instead of ``correlation_id``.
parent_span_id_source: Override parent span_id source (for cross-workflow linking).
When set, parent span_id is derived from this value. When None and
``correlation_id`` is set, parent is the workflow root span.
"""
effective_trace_correlation = trace_correlation_override or correlation_id
set_correlation_id(effective_trace_correlation)
set_span_id_source(span_id_source)
try:
parent_context = None
# A span is the "root" of its correlation group when span_id_source == correlation_id
# (i.e. a workflow root span). All other spans are children.
if parent_span_id_source:
# Cross-workflow linking: parent is an explicit span (e.g. tool node in outer workflow)
parent_span_id = compute_deterministic_span_id(parent_span_id_source)
parent_trace_id = (
cast(int, uuid.UUID(effective_trace_correlation).int) if effective_trace_correlation else 0
)
if parent_trace_id:
parent_span_context = SpanContext(
trace_id=parent_trace_id,
span_id=parent_span_id,
is_remote=True,
trace_flags=TraceFlags(TraceFlags.SAMPLED),
)
parent_context = trace.set_span_in_context(trace.NonRecordingSpan(parent_span_context))
elif correlation_id and correlation_id != span_id_source:
# Child span: parent is the correlation-group root (workflow root span)
parent_span_id = compute_deterministic_span_id(correlation_id)
parent_trace_id = cast(int, uuid.UUID(effective_trace_correlation or correlation_id).int)
parent_span_context = SpanContext(
trace_id=parent_trace_id,
span_id=parent_span_id,
is_remote=True,
trace_flags=TraceFlags(TraceFlags.SAMPLED),
)
parent_context = trace.set_span_in_context(trace.NonRecordingSpan(parent_span_context))
span_kwargs: dict = {"context": parent_context}
if start_time is not None:
span_kwargs["start_time"] = _datetime_to_ns(start_time)
if end_time is not None:
span_kwargs["end_on_exit"] = False
with self._tracer.start_as_current_span(name, **span_kwargs) as span:
for key, value in attributes.items():
if value is not None:
span.set_attribute(key, value)
if end_time is not None:
span.end(end_time=_datetime_to_ns(end_time))
except Exception:
logger.exception("Failed to export span %s", name)
finally:
set_correlation_id(None)
set_span_id_source(None)
def increment_counter(self, name: EnterpriseTelemetryCounter, value: int, labels: dict) -> None:
counter = self._counters.get(name)
if counter:
counter.add(value, labels)
def record_histogram(self, name: EnterpriseTelemetryHistogram, value: float, labels: dict) -> None:
histogram = self._histograms.get(name)
if histogram:
histogram.record(value, labels)
def shutdown(self) -> None:
self._tracer_provider.shutdown()

View File

@ -0,0 +1,76 @@
"""Custom OTEL ID Generator for correlation-based trace/span ID derivation.
Uses contextvars for thread-safe correlation_id -> trace_id mapping.
When a span_id_source is set, the span_id is derived deterministically
from that value, enabling any span to reference another as parent
without depending on span creation order.
"""
import random
import uuid
from contextvars import ContextVar
from typing import cast
from opentelemetry.sdk.trace.id_generator import IdGenerator
_correlation_id_context: ContextVar[str | None] = ContextVar("correlation_id", default=None)
_span_id_source_context: ContextVar[str | None] = ContextVar("span_id_source", default=None)
def set_correlation_id(correlation_id: str | None) -> None:
_correlation_id_context.set(correlation_id)
def get_correlation_id() -> str | None:
return _correlation_id_context.get()
def set_span_id_source(source_id: str | None) -> None:
"""Set the source for deterministic span_id generation.
When set, ``generate_span_id()`` derives the span_id from this value
(lower 64 bits of the UUID). Pass the ``workflow_run_id`` for workflow
root spans or ``node_execution_id`` for node spans.
"""
_span_id_source_context.set(source_id)
def compute_deterministic_span_id(source_id: str) -> int:
"""Derive a deterministic span_id from any UUID string.
Uses the lower 64 bits of the UUID, guaranteeing non-zero output
(OTEL requires span_id != 0).
"""
span_id = cast(int, uuid.UUID(source_id).int) & ((1 << 64) - 1)
return span_id if span_id != 0 else 1
class CorrelationIdGenerator(IdGenerator):
"""ID generator that derives trace_id and optionally span_id from context.
- trace_id: always derived from correlation_id (groups all spans in one trace)
- span_id: derived from span_id_source when set (enables deterministic
parent-child linking), otherwise random
"""
def generate_trace_id(self) -> int:
correlation_id = _correlation_id_context.get()
if correlation_id:
try:
return cast(int, uuid.UUID(correlation_id).int)
except (ValueError, AttributeError):
pass
return random.getrandbits(128)
def generate_span_id(self) -> int:
source = _span_id_source_context.get()
if source:
try:
return compute_deterministic_span_id(source)
except (ValueError, AttributeError):
pass
span_id = random.getrandbits(64)
while span_id == 0:
span_id = random.getrandbits(64)
return span_id

View File

@ -0,0 +1,91 @@
"""Structured-log emitter for enterprise telemetry events.
Emits structured JSON log lines correlated with OTEL traces via trace_id.
Picked up by ``StructuredJSONFormatter`` stdout/Loki/Elastic.
"""
from __future__ import annotations
import logging
import uuid
from typing import Any
logger = logging.getLogger("dify.telemetry")
def compute_trace_id_hex(uuid_str: str | None) -> str:
"""Convert a business UUID string to a 32-hex OTEL-compatible trace_id.
Returns empty string when *uuid_str* is ``None`` or invalid.
"""
if not uuid_str:
return ""
try:
return f"{uuid.UUID(uuid_str).int:032x}"
except (ValueError, AttributeError):
return ""
def emit_telemetry_log(
*,
event_name: str,
attributes: dict[str, Any],
signal: str = "metric_only",
trace_id_source: str | None = None,
tenant_id: str | None = None,
user_id: str | None = None,
) -> None:
"""Emit a structured log line for a telemetry event.
Parameters
----------
event_name:
Canonical event name, e.g. ``"dify.workflow.run"``.
attributes:
All event-specific attributes (already built by the caller).
signal:
``"metric_only"`` for events with no span, ``"span_detail"``
for detail logs accompanying a slim span.
trace_id_source:
A UUID string (e.g. ``workflow_run_id``) used to derive a 32-hex
trace_id for cross-signal correlation.
tenant_id:
Tenant identifier (for the ``IdentityContextFilter``).
user_id:
User identifier (for the ``IdentityContextFilter``).
"""
attrs = {
"dify.event.name": event_name,
"dify.event.signal": signal,
**attributes,
}
extra: dict[str, Any] = {"attributes": attrs}
trace_id_hex = compute_trace_id_hex(trace_id_source)
if trace_id_hex:
extra["trace_id"] = trace_id_hex
if tenant_id:
extra["tenant_id"] = tenant_id
if user_id:
extra["user_id"] = user_id
logger.info("telemetry.%s", signal, extra=extra)
def emit_metric_only_event(
*,
event_name: str,
attributes: dict[str, Any],
trace_id_source: str | None = None,
tenant_id: str | None = None,
user_id: str | None = None,
) -> None:
emit_telemetry_log(
event_name=event_name,
attributes=attributes,
signal="metric_only",
trace_id_source=trace_id_source,
tenant_id=tenant_id,
user_id=user_id,
)

View File

@ -3,6 +3,12 @@ from blinker import signal
# sender: app
app_was_created = signal("app-was-created")
# sender: app
app_was_deleted = signal("app-was-deleted")
# sender: app
app_was_updated = signal("app-was-updated")
# sender: app, kwargs: app_model_config
app_model_config_was_updated = signal("app-model-config-was-updated")

View File

@ -0,0 +1,4 @@
from blinker import signal
# sender: MessageFeedback, kwargs: tenant_id
feedback_was_created = signal("feedback-was-created")

View File

@ -0,0 +1,48 @@
"""Flask extension for enterprise telemetry lifecycle management.
Initializes the EnterpriseExporter singleton during ``create_app()`` (single-threaded),
registers blinker event handlers, and hooks atexit for graceful shutdown.
Skipped entirely when ``ENTERPRISE_ENABLED`` and ``ENTERPRISE_TELEMETRY_ENABLED`` are false (``is_enabled()`` gate).
"""
from __future__ import annotations
import atexit
import logging
from typing import TYPE_CHECKING
from configs import dify_config
if TYPE_CHECKING:
from dify_app import DifyApp
from enterprise.telemetry.exporter import EnterpriseExporter
logger = logging.getLogger(__name__)
_exporter: EnterpriseExporter | None = None
def is_enabled() -> bool:
return bool(dify_config.ENTERPRISE_ENABLED and dify_config.ENTERPRISE_TELEMETRY_ENABLED)
def init_app(app: DifyApp) -> None:
global _exporter
if not is_enabled():
return
from enterprise.telemetry.exporter import EnterpriseExporter
_exporter = EnterpriseExporter(dify_config)
atexit.register(_exporter.shutdown)
# Import to trigger @signal.connect decorator registration
import enterprise.telemetry.event_handlers # noqa: F401 # type: ignore[reportUnusedImport]
logger.info("Enterprise telemetry initialized")
def get_enterprise_exporter() -> EnterpriseExporter | None:
return _exporter

View File

@ -14,7 +14,7 @@ from core.model_runtime.entities.model_entities import ModelPropertyKey, ModelTy
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from core.tools.tool_manager import ToolManager
from core.tools.utils.configuration import ToolParameterConfigurationManager
from events.app_event import app_was_created
from events.app_event import app_was_created, app_was_deleted
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from libs.login import current_user
@ -341,6 +341,8 @@ class AppService:
db.session.delete(app)
db.session.commit()
app_was_deleted.send(app)
# clean up web app settings
if FeatureService.get_system_features().webapp_auth.enabled:
EnterpriseService.WebAppAuth.cleanup_webapp(app.id)

View File

@ -81,7 +81,7 @@ def sync_workspace_member_removal(workspace_id: str, member_id: str, *, source:
bool: True if task was queued (or skipped in community), False if queueing failed
"""
if not dify_config.ENTERPRISE_ENABLED:
return True
return True
return _queue_task(workspace_id=workspace_id, member_id=member_id, source=source)
@ -101,7 +101,7 @@ def sync_account_deletion(account_id: str, *, source: str) -> bool:
bool: True if all tasks were queued (or skipped in community), False if any queueing failed
"""
if not dify_config.ENTERPRISE_ENABLED:
return True
return True
# Fetch all workspaces the account belongs to
workspace_joins = db.session.query(TenantAccountJoin).filter_by(account_id=account_id).all()

View File

@ -10,6 +10,7 @@ from core.model_runtime.entities.model_entities import ModelType
from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.ops.utils import measure_time
from events.feedback_event import feedback_was_created
from extensions.ext_database import db
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models import Account
@ -179,6 +180,9 @@ class MessageService:
db.session.commit()
if feedback and rating:
feedback_was_created.send(feedback, tenant_id=app_model.tenant_id)
return feedback
@classmethod

View File

@ -27,6 +27,7 @@ from core.workflow.nodes.start.entities import StartNodeData
from core.workflow.runtime import VariablePool
from core.workflow.system_variable import SystemVariable
from core.workflow.workflow_entry import WorkflowEntry
from enterprise.telemetry.draft_trace import enqueue_draft_node_execution_trace
from enums.cloud_plan import CloudPlan
from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
from extensions.ext_database import db
@ -647,6 +648,7 @@ class WorkflowService:
node_config = draft_workflow.get_node_config_by_id(node_id)
node_type = Workflow.get_node_type_from_node_config(node_config)
node_data = node_config.get("data", {})
workflow_execution_id: str | None = None
if node_type.is_start_node:
with Session(bind=db.engine) as session, session.begin():
draft_var_srv = WorkflowDraftVariableService(session)
@ -672,10 +674,13 @@ class WorkflowService:
node_type=node_type,
conversation_id=conversation_id,
)
workflow_execution_id = variable_pool.system_variables.workflow_execution_id
else:
workflow_execution_id = str(uuid.uuid4())
system_variable = SystemVariable(workflow_execution_id=workflow_execution_id)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=system_variable,
user_inputs=user_inputs,
environment_variables=draft_workflow.environment_variables,
conversation_variables=[],
@ -729,6 +734,13 @@ class WorkflowService:
with Session(db.engine) as session:
outputs = workflow_node_execution.load_full_outputs(session, storage)
enqueue_draft_node_execution_trace(
execution=workflow_node_execution,
outputs=outputs,
workflow_execution_id=workflow_execution_id,
user_id=account.id,
)
with Session(bind=db.engine) as session, session.begin():
draft_var_saver = DraftVariableSaver(
session=session,
@ -784,19 +796,20 @@ class WorkflowService:
Returns:
WorkflowNodeExecution: The execution result
"""
created_at = naive_utc_now()
node, node_run_result, run_succeeded, error = self._execute_node_safely(invoke_node_fn)
finished_at = naive_utc_now()
# Create base node execution
node_execution = WorkflowNodeExecution(
id=str(uuid.uuid4()),
id=node.execution_id or str(uuid.uuid4()),
workflow_id="", # Single-step execution has no workflow ID
index=1,
node_id=node_id,
node_type=node.node_type,
title=node.title,
elapsed_time=time.perf_counter() - start_at,
created_at=naive_utc_now(),
finished_at=naive_utc_now(),
created_at=created_at,
finished_at=finished_at,
)
# Populate execution result data

View File

@ -39,12 +39,25 @@ def process_trace_tasks(file_info):
trace_info["documents"] = [Document.model_validate(doc) for doc in trace_info["documents"]]
try:
trace_type = trace_info_info_map.get(trace_info_type)
if trace_type:
trace_info = trace_type(**trace_info)
# process enterprise trace separately
from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
if is_enterprise_telemetry_enabled():
from enterprise.telemetry.enterprise_trace import EnterpriseOtelTrace
try:
EnterpriseOtelTrace().trace(trace_info)
except Exception:
logger.warning("Enterprise trace failed for app_id: %s", app_id, exc_info=True)
if trace_instance:
with current_app.app_context():
trace_type = trace_info_info_map.get(trace_info_type)
if trace_type:
trace_info = trace_type(**trace_info)
trace_instance.trace(trace_info)
logger.info("Processing trace tasks success, app_id: %s", app_id)
except Exception as e:
logger.info("error:\n\n\n%s\n\n\n\n", e)