mirror of
https://github.com/langgenius/dify.git
synced 2026-04-28 11:56:55 +08:00
feat(enterprise): Add OTEL telemetry with slim traces, metrics, and structured logs
- Add EnterpriseOtelTrace handler with span emission for workflows and nodes - Implement minimal-span strategy: slim spans + detailed companion logs - Add deterministic span/trace IDs for cross-workflow trace correlation - Add metric collection at 100% accuracy (counters & histograms) - Add event handlers for app lifecycle and feedback telemetry - Add cross-workflow trace linking with parent context propagation - Add OTEL exporter with configurable sampling and privacy controls - Wire enterprise telemetry into workflow execution pipeline - Add telemetry configuration in enterprise configs
This commit is contained in:
parent
d8402f686e
commit
3461c3a8ef
@ -81,6 +81,7 @@ def initialize_extensions(app: DifyApp):
|
|||||||
ext_commands,
|
ext_commands,
|
||||||
ext_compress,
|
ext_compress,
|
||||||
ext_database,
|
ext_database,
|
||||||
|
ext_enterprise_telemetry,
|
||||||
ext_fastopenapi,
|
ext_fastopenapi,
|
||||||
ext_forward_refs,
|
ext_forward_refs,
|
||||||
ext_hosting_provider,
|
ext_hosting_provider,
|
||||||
@ -131,6 +132,7 @@ def initialize_extensions(app: DifyApp):
|
|||||||
ext_commands,
|
ext_commands,
|
||||||
ext_fastopenapi,
|
ext_fastopenapi,
|
||||||
ext_otel,
|
ext_otel,
|
||||||
|
ext_enterprise_telemetry,
|
||||||
ext_request_logging,
|
ext_request_logging,
|
||||||
ext_session_factory,
|
ext_session_factory,
|
||||||
]
|
]
|
||||||
|
|||||||
@ -8,7 +8,7 @@ from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, Settings
|
|||||||
from libs.file_utils import search_file_upwards
|
from libs.file_utils import search_file_upwards
|
||||||
|
|
||||||
from .deploy import DeploymentConfig
|
from .deploy import DeploymentConfig
|
||||||
from .enterprise import EnterpriseFeatureConfig
|
from .enterprise import EnterpriseFeatureConfig, EnterpriseTelemetryConfig
|
||||||
from .extra import ExtraServiceConfig
|
from .extra import ExtraServiceConfig
|
||||||
from .feature import FeatureConfig
|
from .feature import FeatureConfig
|
||||||
from .middleware import MiddlewareConfig
|
from .middleware import MiddlewareConfig
|
||||||
@ -73,6 +73,8 @@ class DifyConfig(
|
|||||||
# Enterprise feature configs
|
# Enterprise feature configs
|
||||||
# **Before using, please contact business@dify.ai by email to inquire about licensing matters.**
|
# **Before using, please contact business@dify.ai by email to inquire about licensing matters.**
|
||||||
EnterpriseFeatureConfig,
|
EnterpriseFeatureConfig,
|
||||||
|
# Enterprise telemetry configs
|
||||||
|
EnterpriseTelemetryConfig,
|
||||||
):
|
):
|
||||||
model_config = SettingsConfigDict(
|
model_config = SettingsConfigDict(
|
||||||
# read from dotenv format config file
|
# read from dotenv format config file
|
||||||
|
|||||||
@ -18,3 +18,39 @@ class EnterpriseFeatureConfig(BaseSettings):
|
|||||||
description="Allow customization of the enterprise logo.",
|
description="Allow customization of the enterprise logo.",
|
||||||
default=False,
|
default=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class EnterpriseTelemetryConfig(BaseSettings):
|
||||||
|
"""
|
||||||
|
Configuration for enterprise telemetry.
|
||||||
|
"""
|
||||||
|
|
||||||
|
ENTERPRISE_TELEMETRY_ENABLED: bool = Field(
|
||||||
|
description="Enable enterprise telemetry collection (also requires ENTERPRISE_ENABLED=true).",
|
||||||
|
default=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
ENTERPRISE_OTLP_ENDPOINT: str = Field(
|
||||||
|
description="Enterprise OTEL collector endpoint.",
|
||||||
|
default="",
|
||||||
|
)
|
||||||
|
|
||||||
|
ENTERPRISE_OTLP_HEADERS: str = Field(
|
||||||
|
description="Auth headers for OTLP export (key=value,key2=value2).",
|
||||||
|
default="",
|
||||||
|
)
|
||||||
|
|
||||||
|
ENTERPRISE_INCLUDE_CONTENT: bool = Field(
|
||||||
|
description="Include input/output content in traces (privacy toggle).",
|
||||||
|
default=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
ENTERPRISE_SERVICE_NAME: str = Field(
|
||||||
|
description="Service name for OTEL resource.",
|
||||||
|
default="dify",
|
||||||
|
)
|
||||||
|
|
||||||
|
ENTERPRISE_OTEL_SAMPLING_RATE: float = Field(
|
||||||
|
description="Sampling rate for enterprise traces (0.0 to 1.0, default 1.0 = 100%).",
|
||||||
|
default=1.0,
|
||||||
|
)
|
||||||
|
|||||||
@ -62,7 +62,7 @@ from core.app.task_pipeline.message_cycle_manager import MessageCycleManager
|
|||||||
from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
|
from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
|
||||||
from core.model_runtime.entities.llm_entities import LLMUsage
|
from core.model_runtime.entities.llm_entities import LLMUsage
|
||||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||||
from core.ops.ops_trace_manager import TraceQueueManager
|
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask, TraceTaskName
|
||||||
from core.workflow.enums import WorkflowExecutionStatus
|
from core.workflow.enums import WorkflowExecutionStatus
|
||||||
from core.workflow.nodes import NodeType
|
from core.workflow.nodes import NodeType
|
||||||
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
|
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
|
||||||
@ -564,7 +564,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
|
|||||||
**kwargs,
|
**kwargs,
|
||||||
) -> Generator[StreamResponse, None, None]:
|
) -> Generator[StreamResponse, None, None]:
|
||||||
"""Handle stop events."""
|
"""Handle stop events."""
|
||||||
_ = trace_manager
|
|
||||||
resolved_state = None
|
resolved_state = None
|
||||||
if self._workflow_run_id:
|
if self._workflow_run_id:
|
||||||
resolved_state = self._resolve_graph_runtime_state(graph_runtime_state)
|
resolved_state = self._resolve_graph_runtime_state(graph_runtime_state)
|
||||||
@ -579,8 +578,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
|
|||||||
)
|
)
|
||||||
|
|
||||||
with self._database_session() as session:
|
with self._database_session() as session:
|
||||||
# Save message
|
self._save_message(session=session, graph_runtime_state=resolved_state, trace_manager=trace_manager)
|
||||||
self._save_message(session=session, graph_runtime_state=resolved_state)
|
|
||||||
|
|
||||||
yield workflow_finish_resp
|
yield workflow_finish_resp
|
||||||
elif event.stopped_by in (
|
elif event.stopped_by in (
|
||||||
@ -589,8 +587,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
|
|||||||
):
|
):
|
||||||
# When hitting input-moderation or annotation-reply, the workflow will not start
|
# When hitting input-moderation or annotation-reply, the workflow will not start
|
||||||
with self._database_session() as session:
|
with self._database_session() as session:
|
||||||
# Save message
|
self._save_message(session=session, trace_manager=trace_manager)
|
||||||
self._save_message(session=session)
|
|
||||||
|
|
||||||
yield self._message_end_to_stream_response()
|
yield self._message_end_to_stream_response()
|
||||||
|
|
||||||
@ -599,6 +596,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
|
|||||||
event: QueueAdvancedChatMessageEndEvent,
|
event: QueueAdvancedChatMessageEndEvent,
|
||||||
*,
|
*,
|
||||||
graph_runtime_state: GraphRuntimeState | None = None,
|
graph_runtime_state: GraphRuntimeState | None = None,
|
||||||
|
trace_manager: TraceQueueManager | None = None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) -> Generator[StreamResponse, None, None]:
|
) -> Generator[StreamResponse, None, None]:
|
||||||
"""Handle advanced chat message end events."""
|
"""Handle advanced chat message end events."""
|
||||||
@ -616,7 +614,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
|
|||||||
|
|
||||||
# Save message
|
# Save message
|
||||||
with self._database_session() as session:
|
with self._database_session() as session:
|
||||||
self._save_message(session=session, graph_runtime_state=resolved_state)
|
self._save_message(session=session, graph_runtime_state=resolved_state, trace_manager=trace_manager)
|
||||||
|
|
||||||
yield self._message_end_to_stream_response()
|
yield self._message_end_to_stream_response()
|
||||||
|
|
||||||
@ -770,7 +768,13 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
|
|||||||
if self._conversation_name_generate_thread:
|
if self._conversation_name_generate_thread:
|
||||||
logger.debug("Conversation name generation running as daemon thread")
|
logger.debug("Conversation name generation running as daemon thread")
|
||||||
|
|
||||||
def _save_message(self, *, session: Session, graph_runtime_state: GraphRuntimeState | None = None):
|
def _save_message(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
session: Session,
|
||||||
|
graph_runtime_state: GraphRuntimeState | None = None,
|
||||||
|
trace_manager: TraceQueueManager | None = None,
|
||||||
|
):
|
||||||
message = self._get_message(session=session)
|
message = self._get_message(session=session)
|
||||||
|
|
||||||
# If there are assistant files, remove markdown image links from answer
|
# If there are assistant files, remove markdown image links from answer
|
||||||
@ -826,6 +830,15 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
|
|||||||
]
|
]
|
||||||
session.add_all(message_files)
|
session.add_all(message_files)
|
||||||
|
|
||||||
|
if trace_manager:
|
||||||
|
trace_manager.add_trace_task(
|
||||||
|
TraceTask(
|
||||||
|
TraceTaskName.MESSAGE_TRACE,
|
||||||
|
conversation_id=str(message.conversation_id),
|
||||||
|
message_id=str(message.id),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
def _seed_graph_runtime_state_from_queue_manager(self) -> None:
|
def _seed_graph_runtime_state_from_queue_manager(self) -> None:
|
||||||
"""Bootstrap the cached runtime state from the queue manager when present."""
|
"""Bootstrap the cached runtime state from the queue manager when present."""
|
||||||
candidate = self._base_task_pipeline.queue_manager.graph_runtime_state
|
candidate = self._base_task_pipeline.queue_manager.graph_runtime_state
|
||||||
|
|||||||
@ -373,6 +373,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
|
|||||||
|
|
||||||
self._workflow_node_execution_repository.save(domain_execution)
|
self._workflow_node_execution_repository.save(domain_execution)
|
||||||
self._workflow_node_execution_repository.save_execution_data(domain_execution)
|
self._workflow_node_execution_repository.save_execution_data(domain_execution)
|
||||||
|
self._enqueue_node_trace_task(domain_execution)
|
||||||
|
|
||||||
def _fail_running_node_executions(self, *, error_message: str) -> None:
|
def _fail_running_node_executions(self, *, error_message: str) -> None:
|
||||||
now = naive_utc_now()
|
now = naive_utc_now()
|
||||||
@ -390,8 +391,10 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
|
|||||||
|
|
||||||
conversation_id = self._system_variables().get(SystemVariableKey.CONVERSATION_ID.value)
|
conversation_id = self._system_variables().get(SystemVariableKey.CONVERSATION_ID.value)
|
||||||
external_trace_id = None
|
external_trace_id = None
|
||||||
|
parent_trace_context = None
|
||||||
if isinstance(self._application_generate_entity, (WorkflowAppGenerateEntity, AdvancedChatAppGenerateEntity)):
|
if isinstance(self._application_generate_entity, (WorkflowAppGenerateEntity, AdvancedChatAppGenerateEntity)):
|
||||||
external_trace_id = self._application_generate_entity.extras.get("external_trace_id")
|
external_trace_id = self._application_generate_entity.extras.get("external_trace_id")
|
||||||
|
parent_trace_context = self._application_generate_entity.extras.get("parent_trace_context")
|
||||||
|
|
||||||
trace_task = TraceTask(
|
trace_task = TraceTask(
|
||||||
TraceTaskName.WORKFLOW_TRACE,
|
TraceTaskName.WORKFLOW_TRACE,
|
||||||
@ -399,6 +402,99 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
|
|||||||
conversation_id=conversation_id,
|
conversation_id=conversation_id,
|
||||||
user_id=self._trace_manager.user_id,
|
user_id=self._trace_manager.user_id,
|
||||||
external_trace_id=external_trace_id,
|
external_trace_id=external_trace_id,
|
||||||
|
parent_trace_context=parent_trace_context,
|
||||||
|
)
|
||||||
|
self._trace_manager.add_trace_task(trace_task)
|
||||||
|
|
||||||
|
def _enqueue_node_trace_task(self, domain_execution: WorkflowNodeExecution) -> None:
|
||||||
|
if not self._trace_manager:
|
||||||
|
return
|
||||||
|
|
||||||
|
from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
|
||||||
|
|
||||||
|
if not is_enterprise_telemetry_enabled():
|
||||||
|
return
|
||||||
|
|
||||||
|
execution = self._get_workflow_execution()
|
||||||
|
meta = domain_execution.metadata or {}
|
||||||
|
|
||||||
|
parent_trace_context = None
|
||||||
|
if isinstance(self._application_generate_entity, (WorkflowAppGenerateEntity, AdvancedChatAppGenerateEntity)):
|
||||||
|
parent_trace_context = self._application_generate_entity.extras.get("parent_trace_context")
|
||||||
|
|
||||||
|
node_data: dict[str, Any] = {
|
||||||
|
"workflow_id": domain_execution.workflow_id,
|
||||||
|
"workflow_execution_id": execution.id_,
|
||||||
|
"tenant_id": self._application_generate_entity.app_config.tenant_id,
|
||||||
|
"app_id": self._application_generate_entity.app_config.app_id,
|
||||||
|
"node_execution_id": domain_execution.id,
|
||||||
|
"node_id": domain_execution.node_id,
|
||||||
|
"node_type": str(domain_execution.node_type.value),
|
||||||
|
"title": domain_execution.title,
|
||||||
|
"status": str(domain_execution.status.value),
|
||||||
|
"error": domain_execution.error,
|
||||||
|
"elapsed_time": domain_execution.elapsed_time,
|
||||||
|
"index": domain_execution.index,
|
||||||
|
"predecessor_node_id": domain_execution.predecessor_node_id,
|
||||||
|
"created_at": domain_execution.created_at,
|
||||||
|
"finished_at": domain_execution.finished_at,
|
||||||
|
"total_tokens": meta.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS, 0),
|
||||||
|
"total_price": meta.get(WorkflowNodeExecutionMetadataKey.TOTAL_PRICE, 0.0),
|
||||||
|
"currency": meta.get(WorkflowNodeExecutionMetadataKey.CURRENCY),
|
||||||
|
"tool_name": (meta.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO) or {}).get("tool_name")
|
||||||
|
if isinstance(meta.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO), dict)
|
||||||
|
else None,
|
||||||
|
"iteration_id": meta.get(WorkflowNodeExecutionMetadataKey.ITERATION_ID),
|
||||||
|
"iteration_index": meta.get(WorkflowNodeExecutionMetadataKey.ITERATION_INDEX),
|
||||||
|
"loop_id": meta.get(WorkflowNodeExecutionMetadataKey.LOOP_ID),
|
||||||
|
"loop_index": meta.get(WorkflowNodeExecutionMetadataKey.LOOP_INDEX),
|
||||||
|
"parallel_id": meta.get(WorkflowNodeExecutionMetadataKey.PARALLEL_ID),
|
||||||
|
"node_inputs": dict(domain_execution.inputs) if domain_execution.inputs else None,
|
||||||
|
"node_outputs": dict(domain_execution.outputs) if domain_execution.outputs else None,
|
||||||
|
"process_data": dict(domain_execution.process_data) if domain_execution.process_data else None,
|
||||||
|
}
|
||||||
|
node_data["invoke_from"] = self._application_generate_entity.invoke_from.value
|
||||||
|
node_data["user_id"] = self._system_variables().get(SystemVariableKey.USER_ID.value)
|
||||||
|
|
||||||
|
if domain_execution.node_type.value == "knowledge-retrieval" and domain_execution.outputs:
|
||||||
|
results = domain_execution.outputs.get("result") or []
|
||||||
|
dataset_ids: list[str] = []
|
||||||
|
dataset_names: list[str] = []
|
||||||
|
for doc in results:
|
||||||
|
if not isinstance(doc, dict):
|
||||||
|
continue
|
||||||
|
doc_meta = doc.get("metadata") or {}
|
||||||
|
did = doc_meta.get("dataset_id")
|
||||||
|
dname = doc_meta.get("dataset_name")
|
||||||
|
if did and did not in dataset_ids:
|
||||||
|
dataset_ids.append(did)
|
||||||
|
if dname and dname not in dataset_names:
|
||||||
|
dataset_names.append(dname)
|
||||||
|
if dataset_ids:
|
||||||
|
node_data["dataset_ids"] = dataset_ids
|
||||||
|
if dataset_names:
|
||||||
|
node_data["dataset_names"] = dataset_names
|
||||||
|
|
||||||
|
tool_info = meta.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO)
|
||||||
|
if isinstance(tool_info, dict):
|
||||||
|
plugin_id = tool_info.get("plugin_unique_identifier")
|
||||||
|
if plugin_id:
|
||||||
|
node_data["plugin_name"] = plugin_id
|
||||||
|
credential_id = tool_info.get("credential_id")
|
||||||
|
if credential_id:
|
||||||
|
node_data["credential_id"] = credential_id
|
||||||
|
node_data["credential_provider_type"] = tool_info.get("provider_type")
|
||||||
|
|
||||||
|
conversation_id = self._system_variables().get(SystemVariableKey.CONVERSATION_ID.value)
|
||||||
|
if conversation_id:
|
||||||
|
node_data["conversation_id"] = conversation_id
|
||||||
|
|
||||||
|
if parent_trace_context:
|
||||||
|
node_data["parent_trace_context"] = parent_trace_context
|
||||||
|
|
||||||
|
trace_task = TraceTask(
|
||||||
|
TraceTaskName.NODE_EXECUTION_TRACE,
|
||||||
|
node_execution_data=node_data,
|
||||||
)
|
)
|
||||||
self._trace_manager.add_trace_task(trace_task)
|
self._trace_manager.add_trace_task(trace_task)
|
||||||
|
|
||||||
|
|||||||
@ -15,16 +15,23 @@ class TraceContextFilter(logging.Filter):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def filter(self, record: logging.LogRecord) -> bool:
|
def filter(self, record: logging.LogRecord) -> bool:
|
||||||
# Get trace context from OpenTelemetry
|
# Preserve explicit trace_id set by the caller (e.g. emit_metric_only_event)
|
||||||
trace_id, span_id = self._get_otel_context()
|
existing_trace_id = getattr(record, "trace_id", "")
|
||||||
|
if not existing_trace_id:
|
||||||
|
# Get trace context from OpenTelemetry
|
||||||
|
trace_id, span_id = self._get_otel_context()
|
||||||
|
|
||||||
# Set trace_id (fallback to ContextVar if no OTEL context)
|
# Set trace_id (fallback to ContextVar if no OTEL context)
|
||||||
if trace_id:
|
if trace_id:
|
||||||
record.trace_id = trace_id
|
record.trace_id = trace_id
|
||||||
|
else:
|
||||||
|
record.trace_id = get_trace_id()
|
||||||
|
|
||||||
|
record.span_id = span_id or ""
|
||||||
else:
|
else:
|
||||||
record.trace_id = get_trace_id()
|
# Keep existing trace_id; only fill span_id if missing
|
||||||
|
if not getattr(record, "span_id", ""):
|
||||||
record.span_id = span_id or ""
|
record.span_id = ""
|
||||||
|
|
||||||
# For backward compatibility, also set req_id
|
# For backward compatibility, also set req_id
|
||||||
record.req_id = get_request_id()
|
record.req_id = get_request_id()
|
||||||
@ -55,9 +62,12 @@ class IdentityContextFilter(logging.Filter):
|
|||||||
|
|
||||||
def filter(self, record: logging.LogRecord) -> bool:
|
def filter(self, record: logging.LogRecord) -> bool:
|
||||||
identity = self._extract_identity()
|
identity = self._extract_identity()
|
||||||
record.tenant_id = identity.get("tenant_id", "")
|
if not getattr(record, "tenant_id", ""):
|
||||||
record.user_id = identity.get("user_id", "")
|
record.tenant_id = identity.get("tenant_id", "")
|
||||||
record.user_type = identity.get("user_type", "")
|
if not getattr(record, "user_id", ""):
|
||||||
|
record.user_id = identity.get("user_id", "")
|
||||||
|
if not getattr(record, "user_type", ""):
|
||||||
|
record.user_type = identity.get("user_type", "")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _extract_identity(self) -> dict[str, str]:
|
def _extract_identity(self) -> dict[str, str]:
|
||||||
|
|||||||
@ -114,6 +114,50 @@ class GenerateNameTraceInfo(BaseTraceInfo):
|
|||||||
tenant_id: str
|
tenant_id: str
|
||||||
|
|
||||||
|
|
||||||
|
class WorkflowNodeTraceInfo(BaseTraceInfo):
|
||||||
|
workflow_id: str
|
||||||
|
workflow_run_id: str
|
||||||
|
tenant_id: str
|
||||||
|
node_execution_id: str
|
||||||
|
node_id: str
|
||||||
|
node_type: str
|
||||||
|
title: str
|
||||||
|
|
||||||
|
status: str
|
||||||
|
error: str | None = None
|
||||||
|
elapsed_time: float
|
||||||
|
|
||||||
|
index: int
|
||||||
|
predecessor_node_id: str | None = None
|
||||||
|
|
||||||
|
total_tokens: int = 0
|
||||||
|
total_price: float = 0.0
|
||||||
|
currency: str | None = None
|
||||||
|
|
||||||
|
model_provider: str | None = None
|
||||||
|
model_name: str | None = None
|
||||||
|
prompt_tokens: int | None = None
|
||||||
|
completion_tokens: int | None = None
|
||||||
|
|
||||||
|
tool_name: str | None = None
|
||||||
|
|
||||||
|
iteration_id: str | None = None
|
||||||
|
iteration_index: int | None = None
|
||||||
|
loop_id: str | None = None
|
||||||
|
loop_index: int | None = None
|
||||||
|
parallel_id: str | None = None
|
||||||
|
|
||||||
|
node_inputs: Mapping[str, Any] | None = None
|
||||||
|
node_outputs: Mapping[str, Any] | None = None
|
||||||
|
process_data: Mapping[str, Any] | None = None
|
||||||
|
|
||||||
|
model_config = ConfigDict(protected_namespaces=())
|
||||||
|
|
||||||
|
|
||||||
|
class DraftNodeExecutionTrace(WorkflowNodeTraceInfo):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class TaskData(BaseModel):
|
class TaskData(BaseModel):
|
||||||
app_id: str
|
app_id: str
|
||||||
trace_info_type: str
|
trace_info_type: str
|
||||||
@ -128,12 +172,15 @@ trace_info_info_map = {
|
|||||||
"DatasetRetrievalTraceInfo": DatasetRetrievalTraceInfo,
|
"DatasetRetrievalTraceInfo": DatasetRetrievalTraceInfo,
|
||||||
"ToolTraceInfo": ToolTraceInfo,
|
"ToolTraceInfo": ToolTraceInfo,
|
||||||
"GenerateNameTraceInfo": GenerateNameTraceInfo,
|
"GenerateNameTraceInfo": GenerateNameTraceInfo,
|
||||||
|
"WorkflowNodeTraceInfo": WorkflowNodeTraceInfo,
|
||||||
|
"DraftNodeExecutionTrace": DraftNodeExecutionTrace,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class TraceTaskName(StrEnum):
|
class TraceTaskName(StrEnum):
|
||||||
CONVERSATION_TRACE = "conversation"
|
CONVERSATION_TRACE = "conversation"
|
||||||
WORKFLOW_TRACE = "workflow"
|
WORKFLOW_TRACE = "workflow"
|
||||||
|
DRAFT_NODE_EXECUTION_TRACE = "draft_node_execution"
|
||||||
MESSAGE_TRACE = "message"
|
MESSAGE_TRACE = "message"
|
||||||
MODERATION_TRACE = "moderation"
|
MODERATION_TRACE = "moderation"
|
||||||
SUGGESTED_QUESTION_TRACE = "suggested_question"
|
SUGGESTED_QUESTION_TRACE = "suggested_question"
|
||||||
@ -141,3 +188,4 @@ class TraceTaskName(StrEnum):
|
|||||||
TOOL_TRACE = "tool"
|
TOOL_TRACE = "tool"
|
||||||
GENERATE_NAME_TRACE = "generate_conversation_name"
|
GENERATE_NAME_TRACE = "generate_conversation_name"
|
||||||
DATASOURCE_TRACE = "datasource"
|
DATASOURCE_TRACE = "datasource"
|
||||||
|
NODE_EXECUTION_TRACE = "node_execution"
|
||||||
|
|||||||
@ -3,6 +3,7 @@ import os
|
|||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from langfuse import Langfuse
|
from langfuse import Langfuse
|
||||||
|
from sqlalchemy import select
|
||||||
from sqlalchemy.orm import sessionmaker
|
from sqlalchemy.orm import sessionmaker
|
||||||
|
|
||||||
from core.ops.base_trace_instance import BaseTraceInstance
|
from core.ops.base_trace_instance import BaseTraceInstance
|
||||||
@ -30,7 +31,7 @@ from core.ops.utils import filter_none_values
|
|||||||
from core.repositories import DifyCoreRepositoryFactory
|
from core.repositories import DifyCoreRepositoryFactory
|
||||||
from core.workflow.enums import NodeType
|
from core.workflow.enums import NodeType
|
||||||
from extensions.ext_database import db
|
from extensions.ext_database import db
|
||||||
from models import EndUser, WorkflowNodeExecutionTriggeredFrom
|
from models import EndUser, Message, WorkflowNodeExecutionTriggeredFrom
|
||||||
from models.enums import MessageStatus
|
from models.enums import MessageStatus
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -71,7 +72,50 @@ class LangFuseDataTrace(BaseTraceInstance):
|
|||||||
metadata = trace_info.metadata
|
metadata = trace_info.metadata
|
||||||
metadata["workflow_app_log_id"] = trace_info.workflow_app_log_id
|
metadata["workflow_app_log_id"] = trace_info.workflow_app_log_id
|
||||||
|
|
||||||
if trace_info.message_id:
|
# Check for parent_trace_context to detect nested workflow
|
||||||
|
parent_trace_context = trace_info.metadata.get("parent_trace_context")
|
||||||
|
|
||||||
|
if parent_trace_context:
|
||||||
|
# Nested workflow: create span under outer trace
|
||||||
|
outer_trace_id = parent_trace_context.get("trace_id")
|
||||||
|
parent_node_execution_id = parent_trace_context.get("parent_node_execution_id")
|
||||||
|
parent_conversation_id = parent_trace_context.get("parent_conversation_id")
|
||||||
|
parent_workflow_run_id = parent_trace_context.get("parent_workflow_run_id")
|
||||||
|
|
||||||
|
# Resolve outer trace_id: try message_id lookup first, fallback to workflow_run_id
|
||||||
|
if parent_conversation_id:
|
||||||
|
session_factory = sessionmaker(bind=db.engine)
|
||||||
|
with session_factory() as session:
|
||||||
|
message_data_stmt = select(Message.id).where(
|
||||||
|
Message.conversation_id == parent_conversation_id,
|
||||||
|
Message.workflow_run_id == parent_workflow_run_id,
|
||||||
|
)
|
||||||
|
resolved_message_id = session.scalar(message_data_stmt)
|
||||||
|
if resolved_message_id:
|
||||||
|
outer_trace_id = resolved_message_id
|
||||||
|
else:
|
||||||
|
outer_trace_id = parent_workflow_run_id
|
||||||
|
else:
|
||||||
|
outer_trace_id = parent_workflow_run_id
|
||||||
|
|
||||||
|
# Create inner workflow span under outer trace
|
||||||
|
workflow_span_data = LangfuseSpan(
|
||||||
|
id=trace_info.workflow_run_id,
|
||||||
|
name=TraceTaskName.WORKFLOW_TRACE,
|
||||||
|
input=dict(trace_info.workflow_run_inputs),
|
||||||
|
output=dict(trace_info.workflow_run_outputs),
|
||||||
|
trace_id=outer_trace_id,
|
||||||
|
parent_observation_id=parent_node_execution_id,
|
||||||
|
start_time=trace_info.start_time,
|
||||||
|
end_time=trace_info.end_time,
|
||||||
|
metadata=metadata,
|
||||||
|
level=LevelEnum.DEFAULT if trace_info.error == "" else LevelEnum.ERROR,
|
||||||
|
status_message=trace_info.error or "",
|
||||||
|
)
|
||||||
|
self.add_span(langfuse_span_data=workflow_span_data)
|
||||||
|
# Use outer_trace_id for all node spans/generations
|
||||||
|
trace_id = outer_trace_id
|
||||||
|
elif trace_info.message_id:
|
||||||
trace_id = trace_info.trace_id or trace_info.message_id
|
trace_id = trace_info.trace_id or trace_info.message_id
|
||||||
name = TraceTaskName.MESSAGE_TRACE
|
name = TraceTaskName.MESSAGE_TRACE
|
||||||
trace_data = LangfuseTrace(
|
trace_data = LangfuseTrace(
|
||||||
@ -174,6 +218,11 @@ class LangFuseDataTrace(BaseTraceInstance):
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Determine parent_observation_id for nested workflows
|
||||||
|
node_parent_observation_id = None
|
||||||
|
if parent_trace_context or trace_info.message_id:
|
||||||
|
node_parent_observation_id = trace_info.workflow_run_id
|
||||||
|
|
||||||
# add generation span
|
# add generation span
|
||||||
if process_data and process_data.get("model_mode") == "chat":
|
if process_data and process_data.get("model_mode") == "chat":
|
||||||
total_token = metadata.get("total_tokens", 0)
|
total_token = metadata.get("total_tokens", 0)
|
||||||
@ -206,7 +255,7 @@ class LangFuseDataTrace(BaseTraceInstance):
|
|||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
|
level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
|
||||||
status_message=trace_info.error or "",
|
status_message=trace_info.error or "",
|
||||||
parent_observation_id=trace_info.workflow_run_id if trace_info.message_id else None,
|
parent_observation_id=node_parent_observation_id,
|
||||||
usage=generation_usage,
|
usage=generation_usage,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -225,7 +274,7 @@ class LangFuseDataTrace(BaseTraceInstance):
|
|||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
|
level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
|
||||||
status_message=trace_info.error or "",
|
status_message=trace_info.error or "",
|
||||||
parent_observation_id=trace_info.workflow_run_id if trace_info.message_id else None,
|
parent_observation_id=node_parent_observation_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.add_span(langfuse_span_data=span_data)
|
self.add_span(langfuse_span_data=span_data)
|
||||||
|
|||||||
@ -6,6 +6,7 @@ from typing import cast
|
|||||||
|
|
||||||
from langsmith import Client
|
from langsmith import Client
|
||||||
from langsmith.schemas import RunBase
|
from langsmith.schemas import RunBase
|
||||||
|
from sqlalchemy import select
|
||||||
from sqlalchemy.orm import sessionmaker
|
from sqlalchemy.orm import sessionmaker
|
||||||
|
|
||||||
from core.ops.base_trace_instance import BaseTraceInstance
|
from core.ops.base_trace_instance import BaseTraceInstance
|
||||||
@ -30,7 +31,7 @@ from core.ops.utils import filter_none_values, generate_dotted_order
|
|||||||
from core.repositories import DifyCoreRepositoryFactory
|
from core.repositories import DifyCoreRepositoryFactory
|
||||||
from core.workflow.enums import NodeType, WorkflowNodeExecutionMetadataKey
|
from core.workflow.enums import NodeType, WorkflowNodeExecutionMetadataKey
|
||||||
from extensions.ext_database import db
|
from extensions.ext_database import db
|
||||||
from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
|
from models import EndUser, Message, MessageFile, WorkflowNodeExecutionTriggeredFrom
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -64,7 +65,35 @@ class LangSmithDataTrace(BaseTraceInstance):
|
|||||||
self.generate_name_trace(trace_info)
|
self.generate_name_trace(trace_info)
|
||||||
|
|
||||||
def workflow_trace(self, trace_info: WorkflowTraceInfo):
|
def workflow_trace(self, trace_info: WorkflowTraceInfo):
|
||||||
trace_id = trace_info.trace_id or trace_info.message_id or trace_info.workflow_run_id
|
# Check for parent_trace_context for cross-workflow linking
|
||||||
|
parent_trace_context = trace_info.metadata.get("parent_trace_context")
|
||||||
|
|
||||||
|
if parent_trace_context:
|
||||||
|
# Inner workflow: resolve outer trace_id and link to parent node
|
||||||
|
outer_trace_id = parent_trace_context.get("parent_workflow_run_id")
|
||||||
|
|
||||||
|
# Try to resolve message_id from conversation_id if available
|
||||||
|
if parent_trace_context.get("parent_conversation_id"):
|
||||||
|
try:
|
||||||
|
session_factory = sessionmaker(bind=db.engine)
|
||||||
|
with session_factory() as session:
|
||||||
|
message_data_stmt = select(Message.id).where(
|
||||||
|
Message.conversation_id == parent_trace_context["parent_conversation_id"],
|
||||||
|
Message.workflow_run_id == parent_trace_context["parent_workflow_run_id"],
|
||||||
|
)
|
||||||
|
resolved_message_id = session.scalar(message_data_stmt)
|
||||||
|
if resolved_message_id:
|
||||||
|
outer_trace_id = resolved_message_id
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug("Failed to resolve message_id from conversation_id: %s", str(e))
|
||||||
|
|
||||||
|
trace_id = outer_trace_id
|
||||||
|
parent_run_id = parent_trace_context.get("parent_node_execution_id")
|
||||||
|
else:
|
||||||
|
# Outer workflow: existing behavior
|
||||||
|
trace_id = trace_info.trace_id or trace_info.message_id or trace_info.workflow_run_id
|
||||||
|
parent_run_id = trace_info.message_id or None
|
||||||
|
|
||||||
if trace_info.start_time is None:
|
if trace_info.start_time is None:
|
||||||
trace_info.start_time = datetime.now()
|
trace_info.start_time = datetime.now()
|
||||||
message_dotted_order = (
|
message_dotted_order = (
|
||||||
@ -78,7 +107,8 @@ class LangSmithDataTrace(BaseTraceInstance):
|
|||||||
metadata = trace_info.metadata
|
metadata = trace_info.metadata
|
||||||
metadata["workflow_app_log_id"] = trace_info.workflow_app_log_id
|
metadata["workflow_app_log_id"] = trace_info.workflow_app_log_id
|
||||||
|
|
||||||
if trace_info.message_id:
|
# Only create message_run for outer workflows (no parent_trace_context)
|
||||||
|
if trace_info.message_id and not parent_trace_context:
|
||||||
message_run = LangSmithRunModel(
|
message_run = LangSmithRunModel(
|
||||||
id=trace_info.message_id,
|
id=trace_info.message_id,
|
||||||
name=TraceTaskName.MESSAGE_TRACE,
|
name=TraceTaskName.MESSAGE_TRACE,
|
||||||
@ -121,9 +151,9 @@ class LangSmithDataTrace(BaseTraceInstance):
|
|||||||
},
|
},
|
||||||
error=trace_info.error,
|
error=trace_info.error,
|
||||||
tags=["workflow"],
|
tags=["workflow"],
|
||||||
parent_run_id=trace_info.message_id or None,
|
parent_run_id=parent_run_id,
|
||||||
trace_id=trace_id,
|
trace_id=trace_id,
|
||||||
dotted_order=workflow_dotted_order,
|
dotted_order=None if parent_trace_context else workflow_dotted_order,
|
||||||
serialized=None,
|
serialized=None,
|
||||||
events=[],
|
events=[],
|
||||||
session_id=None,
|
session_id=None,
|
||||||
|
|||||||
@ -21,6 +21,7 @@ from core.ops.entities.config_entity import (
|
|||||||
)
|
)
|
||||||
from core.ops.entities.trace_entity import (
|
from core.ops.entities.trace_entity import (
|
||||||
DatasetRetrievalTraceInfo,
|
DatasetRetrievalTraceInfo,
|
||||||
|
DraftNodeExecutionTrace,
|
||||||
GenerateNameTraceInfo,
|
GenerateNameTraceInfo,
|
||||||
MessageTraceInfo,
|
MessageTraceInfo,
|
||||||
ModerationTraceInfo,
|
ModerationTraceInfo,
|
||||||
@ -28,12 +29,16 @@ from core.ops.entities.trace_entity import (
|
|||||||
TaskData,
|
TaskData,
|
||||||
ToolTraceInfo,
|
ToolTraceInfo,
|
||||||
TraceTaskName,
|
TraceTaskName,
|
||||||
|
WorkflowNodeTraceInfo,
|
||||||
WorkflowTraceInfo,
|
WorkflowTraceInfo,
|
||||||
)
|
)
|
||||||
from core.ops.utils import get_message_data
|
from core.ops.utils import get_message_data
|
||||||
from extensions.ext_database import db
|
from extensions.ext_database import db
|
||||||
from extensions.ext_storage import storage
|
from extensions.ext_storage import storage
|
||||||
|
from models.account import Tenant
|
||||||
|
from models.dataset import Dataset
|
||||||
from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig
|
from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig
|
||||||
|
from models.tools import ApiToolProvider, BuiltinToolProvider, MCPToolProvider, WorkflowToolProvider
|
||||||
from models.workflow import WorkflowAppLog
|
from models.workflow import WorkflowAppLog
|
||||||
from tasks.ops_trace_task import process_trace_tasks
|
from tasks.ops_trace_task import process_trace_tasks
|
||||||
|
|
||||||
@ -43,6 +48,44 @@ if TYPE_CHECKING:
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _lookup_app_and_workspace_names(app_id: str | None, tenant_id: str | None) -> tuple[str, str]:
|
||||||
|
"""Return (app_name, workspace_name) for the given IDs. Falls back to empty strings."""
|
||||||
|
app_name = ""
|
||||||
|
workspace_name = ""
|
||||||
|
if not app_id and not tenant_id:
|
||||||
|
return app_name, workspace_name
|
||||||
|
with Session(db.engine) as session:
|
||||||
|
if app_id:
|
||||||
|
name = session.scalar(select(App.name).where(App.id == app_id))
|
||||||
|
if name:
|
||||||
|
app_name = name
|
||||||
|
if tenant_id:
|
||||||
|
name = session.scalar(select(Tenant.name).where(Tenant.id == tenant_id))
|
||||||
|
if name:
|
||||||
|
workspace_name = name
|
||||||
|
return app_name, workspace_name
|
||||||
|
|
||||||
|
|
||||||
|
_PROVIDER_TYPE_TO_MODEL: dict[str, type] = {
|
||||||
|
"builtin": BuiltinToolProvider,
|
||||||
|
"plugin": BuiltinToolProvider,
|
||||||
|
"api": ApiToolProvider,
|
||||||
|
"workflow": WorkflowToolProvider,
|
||||||
|
"mcp": MCPToolProvider,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _lookup_credential_name(credential_id: str | None, provider_type: str | None) -> str:
|
||||||
|
if not credential_id:
|
||||||
|
return ""
|
||||||
|
model_cls = _PROVIDER_TYPE_TO_MODEL.get(provider_type or "")
|
||||||
|
if not model_cls:
|
||||||
|
return ""
|
||||||
|
with Session(db.engine) as session:
|
||||||
|
name = session.scalar(select(model_cls.name).where(model_cls.id == credential_id))
|
||||||
|
return str(name) if name else ""
|
||||||
|
|
||||||
|
|
||||||
class OpsTraceProviderConfigMap(collections.UserDict[str, dict[str, Any]]):
|
class OpsTraceProviderConfigMap(collections.UserDict[str, dict[str, Any]]):
|
||||||
def __getitem__(self, provider: str) -> dict[str, Any]:
|
def __getitem__(self, provider: str) -> dict[str, Any]:
|
||||||
match provider:
|
match provider:
|
||||||
@ -528,6 +571,8 @@ class TraceTask:
|
|||||||
TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace(
|
TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace(
|
||||||
conversation_id=self.conversation_id, timer=self.timer, **self.kwargs
|
conversation_id=self.conversation_id, timer=self.timer, **self.kwargs
|
||||||
),
|
),
|
||||||
|
TraceTaskName.NODE_EXECUTION_TRACE: lambda: self.node_execution_trace(**self.kwargs),
|
||||||
|
TraceTaskName.DRAFT_NODE_EXECUTION_TRACE: lambda: self.draft_node_execution_trace(**self.kwargs),
|
||||||
}
|
}
|
||||||
|
|
||||||
return preprocess_map.get(self.trace_type, lambda: None)()
|
return preprocess_map.get(self.trace_type, lambda: None)()
|
||||||
@ -583,7 +628,9 @@ class TraceTask:
|
|||||||
)
|
)
|
||||||
message_id = session.scalar(message_data_stmt)
|
message_id = session.scalar(message_data_stmt)
|
||||||
|
|
||||||
metadata = {
|
app_name, workspace_name = _lookup_app_and_workspace_names(workflow_run.app_id, tenant_id)
|
||||||
|
|
||||||
|
metadata: dict[str, Any] = {
|
||||||
"workflow_id": workflow_id,
|
"workflow_id": workflow_id,
|
||||||
"conversation_id": conversation_id,
|
"conversation_id": conversation_id,
|
||||||
"workflow_run_id": workflow_run_id,
|
"workflow_run_id": workflow_run_id,
|
||||||
@ -596,8 +643,14 @@ class TraceTask:
|
|||||||
"triggered_from": workflow_run.triggered_from,
|
"triggered_from": workflow_run.triggered_from,
|
||||||
"user_id": user_id,
|
"user_id": user_id,
|
||||||
"app_id": workflow_run.app_id,
|
"app_id": workflow_run.app_id,
|
||||||
|
"app_name": app_name,
|
||||||
|
"workspace_name": workspace_name,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
parent_trace_context = self.kwargs.get("parent_trace_context")
|
||||||
|
if parent_trace_context:
|
||||||
|
metadata["parent_trace_context"] = parent_trace_context
|
||||||
|
|
||||||
workflow_trace_info = WorkflowTraceInfo(
|
workflow_trace_info = WorkflowTraceInfo(
|
||||||
trace_id=self.trace_id,
|
trace_id=self.trace_id,
|
||||||
workflow_data=workflow_run.to_dict(),
|
workflow_data=workflow_run.to_dict(),
|
||||||
@ -645,6 +698,14 @@ class TraceTask:
|
|||||||
|
|
||||||
streaming_metrics = self._extract_streaming_metrics(message_data)
|
streaming_metrics = self._extract_streaming_metrics(message_data)
|
||||||
|
|
||||||
|
tenant_id = ""
|
||||||
|
with Session(db.engine) as session:
|
||||||
|
tid = session.scalar(select(App.tenant_id).where(App.id == message_data.app_id))
|
||||||
|
if tid:
|
||||||
|
tenant_id = str(tid)
|
||||||
|
|
||||||
|
app_name, workspace_name = _lookup_app_and_workspace_names(message_data.app_id, tenant_id)
|
||||||
|
|
||||||
metadata = {
|
metadata = {
|
||||||
"conversation_id": message_data.conversation_id,
|
"conversation_id": message_data.conversation_id,
|
||||||
"ls_provider": message_data.model_provider,
|
"ls_provider": message_data.model_provider,
|
||||||
@ -656,6 +717,11 @@ class TraceTask:
|
|||||||
"workflow_run_id": message_data.workflow_run_id,
|
"workflow_run_id": message_data.workflow_run_id,
|
||||||
"from_source": message_data.from_source,
|
"from_source": message_data.from_source,
|
||||||
"message_id": message_id,
|
"message_id": message_id,
|
||||||
|
"tenant_id": tenant_id,
|
||||||
|
"app_id": message_data.app_id,
|
||||||
|
"user_id": message_data.from_end_user_id or message_data.from_account_id,
|
||||||
|
"app_name": app_name,
|
||||||
|
"workspace_name": workspace_name,
|
||||||
}
|
}
|
||||||
|
|
||||||
message_tokens = message_data.message_tokens
|
message_tokens = message_data.message_tokens
|
||||||
@ -778,6 +844,36 @@ class TraceTask:
|
|||||||
if not message_data:
|
if not message_data:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
tenant_id = ""
|
||||||
|
with Session(db.engine) as session:
|
||||||
|
tid = session.scalar(select(App.tenant_id).where(App.id == message_data.app_id))
|
||||||
|
if tid:
|
||||||
|
tenant_id = str(tid)
|
||||||
|
|
||||||
|
app_name, workspace_name = _lookup_app_and_workspace_names(message_data.app_id, tenant_id)
|
||||||
|
|
||||||
|
doc_list = [doc.model_dump() for doc in documents] if documents else []
|
||||||
|
dataset_ids: set[str] = set()
|
||||||
|
for doc in doc_list:
|
||||||
|
doc_meta = doc.get("metadata") or {}
|
||||||
|
did = doc_meta.get("dataset_id")
|
||||||
|
if did:
|
||||||
|
dataset_ids.add(did)
|
||||||
|
|
||||||
|
embedding_models: dict[str, dict[str, str]] = {}
|
||||||
|
if dataset_ids:
|
||||||
|
with Session(db.engine) as session:
|
||||||
|
rows = session.execute(
|
||||||
|
select(Dataset.id, Dataset.embedding_model, Dataset.embedding_model_provider).where(
|
||||||
|
Dataset.id.in_(list(dataset_ids))
|
||||||
|
)
|
||||||
|
).all()
|
||||||
|
for row in rows:
|
||||||
|
embedding_models[str(row[0])] = {
|
||||||
|
"embedding_model": row[1] or "",
|
||||||
|
"embedding_model_provider": row[2] or "",
|
||||||
|
}
|
||||||
|
|
||||||
metadata = {
|
metadata = {
|
||||||
"message_id": message_id,
|
"message_id": message_id,
|
||||||
"ls_provider": message_data.model_provider,
|
"ls_provider": message_data.model_provider,
|
||||||
@ -788,13 +884,19 @@ class TraceTask:
|
|||||||
"agent_based": message_data.agent_based,
|
"agent_based": message_data.agent_based,
|
||||||
"workflow_run_id": message_data.workflow_run_id,
|
"workflow_run_id": message_data.workflow_run_id,
|
||||||
"from_source": message_data.from_source,
|
"from_source": message_data.from_source,
|
||||||
|
"tenant_id": tenant_id,
|
||||||
|
"app_id": message_data.app_id,
|
||||||
|
"user_id": message_data.from_end_user_id or message_data.from_account_id,
|
||||||
|
"app_name": app_name,
|
||||||
|
"workspace_name": workspace_name,
|
||||||
|
"embedding_models": embedding_models,
|
||||||
}
|
}
|
||||||
|
|
||||||
dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
|
dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
|
||||||
trace_id=self.trace_id,
|
trace_id=self.trace_id,
|
||||||
message_id=message_id,
|
message_id=message_id,
|
||||||
inputs=message_data.query or message_data.inputs,
|
inputs=message_data.query or message_data.inputs,
|
||||||
documents=[doc.model_dump() for doc in documents] if documents else [],
|
documents=doc_list,
|
||||||
start_time=timer.get("start"),
|
start_time=timer.get("start"),
|
||||||
end_time=timer.get("end"),
|
end_time=timer.get("end"),
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
@ -905,6 +1007,90 @@ class TraceTask:
|
|||||||
|
|
||||||
return generate_name_trace_info
|
return generate_name_trace_info
|
||||||
|
|
||||||
|
def node_execution_trace(self, **kwargs) -> WorkflowNodeTraceInfo | dict:
|
||||||
|
node_data: dict = kwargs.get("node_execution_data", {})
|
||||||
|
if not node_data:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
app_name, workspace_name = _lookup_app_and_workspace_names(node_data.get("app_id"), node_data.get("tenant_id"))
|
||||||
|
|
||||||
|
credential_name = _lookup_credential_name(
|
||||||
|
node_data.get("credential_id"), node_data.get("credential_provider_type")
|
||||||
|
)
|
||||||
|
|
||||||
|
metadata: dict[str, Any] = {
|
||||||
|
"tenant_id": node_data.get("tenant_id"),
|
||||||
|
"app_id": node_data.get("app_id"),
|
||||||
|
"app_name": app_name,
|
||||||
|
"workspace_name": workspace_name,
|
||||||
|
"user_id": node_data.get("user_id"),
|
||||||
|
"dataset_ids": node_data.get("dataset_ids"),
|
||||||
|
"dataset_names": node_data.get("dataset_names"),
|
||||||
|
"plugin_name": node_data.get("plugin_name"),
|
||||||
|
"credential_name": credential_name,
|
||||||
|
}
|
||||||
|
|
||||||
|
parent_trace_context = node_data.get("parent_trace_context")
|
||||||
|
if parent_trace_context:
|
||||||
|
metadata["parent_trace_context"] = parent_trace_context
|
||||||
|
|
||||||
|
message_id: str | None = None
|
||||||
|
conversation_id = node_data.get("conversation_id")
|
||||||
|
workflow_execution_id = node_data.get("workflow_execution_id")
|
||||||
|
if conversation_id and workflow_execution_id and not parent_trace_context:
|
||||||
|
with Session(db.engine) as session:
|
||||||
|
msg_id = session.scalar(
|
||||||
|
select(Message.id).where(
|
||||||
|
Message.conversation_id == conversation_id,
|
||||||
|
Message.workflow_run_id == workflow_execution_id,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if msg_id:
|
||||||
|
message_id = str(msg_id)
|
||||||
|
metadata["message_id"] = message_id
|
||||||
|
|
||||||
|
return WorkflowNodeTraceInfo(
|
||||||
|
trace_id=self.trace_id,
|
||||||
|
message_id=message_id,
|
||||||
|
start_time=node_data.get("created_at"),
|
||||||
|
end_time=node_data.get("finished_at"),
|
||||||
|
metadata=metadata,
|
||||||
|
workflow_id=node_data.get("workflow_id", ""),
|
||||||
|
workflow_run_id=node_data.get("workflow_execution_id", ""),
|
||||||
|
tenant_id=node_data.get("tenant_id", ""),
|
||||||
|
node_execution_id=node_data.get("node_execution_id", ""),
|
||||||
|
node_id=node_data.get("node_id", ""),
|
||||||
|
node_type=node_data.get("node_type", ""),
|
||||||
|
title=node_data.get("title", ""),
|
||||||
|
status=node_data.get("status", ""),
|
||||||
|
error=node_data.get("error"),
|
||||||
|
elapsed_time=node_data.get("elapsed_time", 0.0),
|
||||||
|
index=node_data.get("index", 0),
|
||||||
|
predecessor_node_id=node_data.get("predecessor_node_id"),
|
||||||
|
total_tokens=node_data.get("total_tokens", 0),
|
||||||
|
total_price=node_data.get("total_price", 0.0),
|
||||||
|
currency=node_data.get("currency"),
|
||||||
|
model_provider=node_data.get("model_provider"),
|
||||||
|
model_name=node_data.get("model_name"),
|
||||||
|
prompt_tokens=node_data.get("prompt_tokens"),
|
||||||
|
completion_tokens=node_data.get("completion_tokens"),
|
||||||
|
tool_name=node_data.get("tool_name"),
|
||||||
|
iteration_id=node_data.get("iteration_id"),
|
||||||
|
iteration_index=node_data.get("iteration_index"),
|
||||||
|
loop_id=node_data.get("loop_id"),
|
||||||
|
loop_index=node_data.get("loop_index"),
|
||||||
|
parallel_id=node_data.get("parallel_id"),
|
||||||
|
node_inputs=node_data.get("node_inputs"),
|
||||||
|
node_outputs=node_data.get("node_outputs"),
|
||||||
|
process_data=node_data.get("process_data"),
|
||||||
|
)
|
||||||
|
|
||||||
|
def draft_node_execution_trace(self, **kwargs) -> DraftNodeExecutionTrace | dict:
|
||||||
|
node_trace = self.node_execution_trace(**kwargs)
|
||||||
|
if not node_trace or not isinstance(node_trace, WorkflowNodeTraceInfo):
|
||||||
|
return node_trace
|
||||||
|
return DraftNodeExecutionTrace(**node_trace.model_dump())
|
||||||
|
|
||||||
def _extract_streaming_metrics(self, message_data) -> dict:
|
def _extract_streaming_metrics(self, message_data) -> dict:
|
||||||
if not message_data.message_metadata:
|
if not message_data.message_metadata:
|
||||||
return {}
|
return {}
|
||||||
@ -938,13 +1124,17 @@ class TraceQueueManager:
|
|||||||
self.user_id = user_id
|
self.user_id = user_id
|
||||||
self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
|
self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
|
||||||
self.flask_app = current_app._get_current_object() # type: ignore
|
self.flask_app = current_app._get_current_object() # type: ignore
|
||||||
|
|
||||||
|
from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
|
||||||
|
|
||||||
|
self._enterprise_telemetry_enabled = is_enterprise_telemetry_enabled()
|
||||||
if trace_manager_timer is None:
|
if trace_manager_timer is None:
|
||||||
self.start_timer()
|
self.start_timer()
|
||||||
|
|
||||||
def add_trace_task(self, trace_task: TraceTask):
|
def add_trace_task(self, trace_task: TraceTask):
|
||||||
global trace_manager_timer, trace_manager_queue
|
global trace_manager_timer, trace_manager_queue
|
||||||
try:
|
try:
|
||||||
if self.trace_instance:
|
if self._enterprise_telemetry_enabled or self.trace_instance:
|
||||||
trace_task.app_id = self.app_id
|
trace_task.app_id = self.app_id
|
||||||
trace_manager_queue.put(trace_task)
|
trace_manager_queue.put(trace_task)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
|||||||
@ -61,6 +61,7 @@ class ToolNode(Node[ToolNodeData]):
|
|||||||
"provider_type": self.node_data.provider_type.value,
|
"provider_type": self.node_data.provider_type.value,
|
||||||
"provider_id": self.node_data.provider_id,
|
"provider_id": self.node_data.provider_id,
|
||||||
"plugin_unique_identifier": self.node_data.plugin_unique_identifier,
|
"plugin_unique_identifier": self.node_data.plugin_unique_identifier,
|
||||||
|
"credential_id": self.node_data.credential_id,
|
||||||
}
|
}
|
||||||
|
|
||||||
# get tool runtime
|
# get tool runtime
|
||||||
@ -105,6 +106,20 @@ class ToolNode(Node[ToolNodeData]):
|
|||||||
# get conversation id
|
# get conversation id
|
||||||
conversation_id = self.graph_runtime_state.variable_pool.get(["sys", SystemVariableKey.CONVERSATION_ID])
|
conversation_id = self.graph_runtime_state.variable_pool.get(["sys", SystemVariableKey.CONVERSATION_ID])
|
||||||
|
|
||||||
|
from core.tools.workflow_as_tool.tool import WorkflowTool
|
||||||
|
|
||||||
|
if isinstance(tool_runtime, WorkflowTool):
|
||||||
|
workflow_run_id_var = self.graph_runtime_state.variable_pool.get(
|
||||||
|
["sys", SystemVariableKey.WORKFLOW_EXECUTION_ID]
|
||||||
|
)
|
||||||
|
tool_runtime.parent_trace_context = {
|
||||||
|
"trace_id": str(workflow_run_id_var.text) if workflow_run_id_var else "",
|
||||||
|
"parent_node_execution_id": self.execution_id,
|
||||||
|
"parent_workflow_run_id": str(workflow_run_id_var.text) if workflow_run_id_var else "",
|
||||||
|
"parent_app_id": self.app_id,
|
||||||
|
"parent_conversation_id": conversation_id.text if conversation_id else None,
|
||||||
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
message_stream = ToolEngine.generic_invoke(
|
message_stream = ToolEngine.generic_invoke(
|
||||||
tool=tool_runtime,
|
tool=tool_runtime,
|
||||||
|
|||||||
0
api/enterprise/__init__.py
Normal file
0
api/enterprise/__init__.py
Normal file
0
api/enterprise/telemetry/__init__.py
Normal file
0
api/enterprise/telemetry/__init__.py
Normal file
77
api/enterprise/telemetry/draft_trace.py
Normal file
77
api/enterprise/telemetry/draft_trace.py
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections.abc import Mapping
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from core.ops.entities.trace_entity import TraceTaskName
|
||||||
|
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
|
||||||
|
from core.workflow.enums import WorkflowNodeExecutionMetadataKey
|
||||||
|
from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
|
||||||
|
from models.workflow import WorkflowNodeExecutionModel
|
||||||
|
|
||||||
|
|
||||||
|
def enqueue_draft_node_execution_trace(
|
||||||
|
*,
|
||||||
|
execution: WorkflowNodeExecutionModel,
|
||||||
|
outputs: Mapping[str, Any] | None,
|
||||||
|
workflow_execution_id: str | None,
|
||||||
|
user_id: str,
|
||||||
|
) -> None:
|
||||||
|
if not is_enterprise_telemetry_enabled():
|
||||||
|
return
|
||||||
|
|
||||||
|
trace_manager = TraceQueueManager(app_id=execution.app_id, user_id=user_id)
|
||||||
|
node_data = _build_node_execution_data(
|
||||||
|
execution=execution,
|
||||||
|
outputs=outputs,
|
||||||
|
workflow_execution_id=workflow_execution_id,
|
||||||
|
)
|
||||||
|
trace_manager.add_trace_task(
|
||||||
|
TraceTask(
|
||||||
|
TraceTaskName.DRAFT_NODE_EXECUTION_TRACE,
|
||||||
|
node_execution_data=node_data,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_node_execution_data(
|
||||||
|
*,
|
||||||
|
execution: WorkflowNodeExecutionModel,
|
||||||
|
outputs: Mapping[str, Any] | None,
|
||||||
|
workflow_execution_id: str | None,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
metadata = execution.execution_metadata_dict
|
||||||
|
node_outputs = outputs if outputs is not None else execution.outputs_dict
|
||||||
|
execution_id = workflow_execution_id or execution.workflow_run_id or execution.id
|
||||||
|
|
||||||
|
return {
|
||||||
|
"workflow_id": execution.workflow_id,
|
||||||
|
"workflow_execution_id": execution_id,
|
||||||
|
"tenant_id": execution.tenant_id,
|
||||||
|
"app_id": execution.app_id,
|
||||||
|
"node_execution_id": execution.id,
|
||||||
|
"node_id": execution.node_id,
|
||||||
|
"node_type": execution.node_type,
|
||||||
|
"title": execution.title,
|
||||||
|
"status": execution.status,
|
||||||
|
"error": execution.error,
|
||||||
|
"elapsed_time": execution.elapsed_time,
|
||||||
|
"index": execution.index,
|
||||||
|
"predecessor_node_id": execution.predecessor_node_id,
|
||||||
|
"created_at": execution.created_at,
|
||||||
|
"finished_at": execution.finished_at,
|
||||||
|
"total_tokens": metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS, 0),
|
||||||
|
"total_price": metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_PRICE, 0.0),
|
||||||
|
"currency": metadata.get(WorkflowNodeExecutionMetadataKey.CURRENCY),
|
||||||
|
"tool_name": (metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO) or {}).get("tool_name")
|
||||||
|
if isinstance(metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO), dict)
|
||||||
|
else None,
|
||||||
|
"iteration_id": metadata.get(WorkflowNodeExecutionMetadataKey.ITERATION_ID),
|
||||||
|
"iteration_index": metadata.get(WorkflowNodeExecutionMetadataKey.ITERATION_INDEX),
|
||||||
|
"loop_id": metadata.get(WorkflowNodeExecutionMetadataKey.LOOP_ID),
|
||||||
|
"loop_index": metadata.get(WorkflowNodeExecutionMetadataKey.LOOP_INDEX),
|
||||||
|
"parallel_id": metadata.get(WorkflowNodeExecutionMetadataKey.PARALLEL_ID),
|
||||||
|
"node_inputs": execution.inputs_dict,
|
||||||
|
"node_outputs": node_outputs,
|
||||||
|
"process_data": execution.process_data_dict,
|
||||||
|
}
|
||||||
570
api/enterprise/telemetry/enterprise_trace.py
Normal file
570
api/enterprise/telemetry/enterprise_trace.py
Normal file
@ -0,0 +1,570 @@
|
|||||||
|
"""Enterprise trace handler — duck-typed, NOT a BaseTraceInstance subclass.
|
||||||
|
|
||||||
|
Invoked directly in the Celery task, not through OpsTraceManager dispatch.
|
||||||
|
Only requires a matching ``trace(trace_info)`` method signature.
|
||||||
|
|
||||||
|
Signal strategy:
|
||||||
|
- **Traces (spans)**: workflow run, node execution, draft node execution only.
|
||||||
|
- **Metrics + structured logs**: all other event types.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from core.ops.entities.trace_entity import (
|
||||||
|
BaseTraceInfo,
|
||||||
|
DatasetRetrievalTraceInfo,
|
||||||
|
DraftNodeExecutionTrace,
|
||||||
|
GenerateNameTraceInfo,
|
||||||
|
MessageTraceInfo,
|
||||||
|
ModerationTraceInfo,
|
||||||
|
SuggestedQuestionTraceInfo,
|
||||||
|
ToolTraceInfo,
|
||||||
|
WorkflowNodeTraceInfo,
|
||||||
|
WorkflowTraceInfo,
|
||||||
|
)
|
||||||
|
from enterprise.telemetry.entities import (
|
||||||
|
EnterpriseTelemetryCounter,
|
||||||
|
EnterpriseTelemetryHistogram,
|
||||||
|
EnterpriseTelemetrySpan,
|
||||||
|
)
|
||||||
|
from enterprise.telemetry.telemetry_log import emit_metric_only_event, emit_telemetry_log
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class EnterpriseOtelTrace:
|
||||||
|
"""Duck-typed enterprise trace handler.
|
||||||
|
|
||||||
|
``*_trace`` methods emit spans (workflow/node only) or structured logs
|
||||||
|
(all other events), plus metrics at 100 % accuracy.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
from extensions.ext_enterprise_telemetry import get_enterprise_exporter
|
||||||
|
|
||||||
|
exporter = get_enterprise_exporter()
|
||||||
|
if exporter is None:
|
||||||
|
raise RuntimeError("EnterpriseOtelTrace instantiated but exporter is not initialized")
|
||||||
|
self._exporter = exporter
|
||||||
|
|
||||||
|
def trace(self, trace_info: BaseTraceInfo) -> None:
|
||||||
|
if isinstance(trace_info, WorkflowTraceInfo):
|
||||||
|
self._workflow_trace(trace_info)
|
||||||
|
elif isinstance(trace_info, MessageTraceInfo):
|
||||||
|
self._message_trace(trace_info)
|
||||||
|
elif isinstance(trace_info, ToolTraceInfo):
|
||||||
|
self._tool_trace(trace_info)
|
||||||
|
elif isinstance(trace_info, DraftNodeExecutionTrace):
|
||||||
|
self._draft_node_execution_trace(trace_info)
|
||||||
|
elif isinstance(trace_info, WorkflowNodeTraceInfo):
|
||||||
|
self._node_execution_trace(trace_info)
|
||||||
|
elif isinstance(trace_info, ModerationTraceInfo):
|
||||||
|
self._moderation_trace(trace_info)
|
||||||
|
elif isinstance(trace_info, SuggestedQuestionTraceInfo):
|
||||||
|
self._suggested_question_trace(trace_info)
|
||||||
|
elif isinstance(trace_info, DatasetRetrievalTraceInfo):
|
||||||
|
self._dataset_retrieval_trace(trace_info)
|
||||||
|
elif isinstance(trace_info, GenerateNameTraceInfo):
|
||||||
|
self._generate_name_trace(trace_info)
|
||||||
|
|
||||||
|
def _common_attrs(self, trace_info: BaseTraceInfo) -> dict[str, Any]:
|
||||||
|
return {
|
||||||
|
"dify.trace_id": trace_info.trace_id,
|
||||||
|
"dify.tenant_id": trace_info.metadata.get("tenant_id"),
|
||||||
|
"dify.app_id": trace_info.metadata.get("app_id"),
|
||||||
|
"dify.app.name": trace_info.metadata.get("app_name"),
|
||||||
|
"dify.workspace.name": trace_info.metadata.get("workspace_name"),
|
||||||
|
"gen_ai.user.id": trace_info.metadata.get("user_id"),
|
||||||
|
"dify.message.id": trace_info.message_id,
|
||||||
|
}
|
||||||
|
|
||||||
|
def _maybe_json(self, value: Any) -> str | None:
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
if isinstance(value, str):
|
||||||
|
return value
|
||||||
|
try:
|
||||||
|
return json.dumps(value, default=str)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return str(value)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# SPAN-emitting handlers (workflow, node execution, draft node)
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _workflow_trace(self, info: WorkflowTraceInfo) -> None:
|
||||||
|
# -- Slim span attrs: identity + structure + status + timing only --
|
||||||
|
span_attrs: dict[str, Any] = {
|
||||||
|
"dify.trace_id": info.trace_id,
|
||||||
|
"dify.tenant_id": info.metadata.get("tenant_id"),
|
||||||
|
"dify.app_id": info.metadata.get("app_id"),
|
||||||
|
"dify.workflow.id": info.workflow_id,
|
||||||
|
"dify.workflow.run_id": info.workflow_run_id,
|
||||||
|
"dify.workflow.status": info.workflow_run_status,
|
||||||
|
"dify.workflow.error": info.error,
|
||||||
|
"dify.workflow.elapsed_time": info.workflow_run_elapsed_time,
|
||||||
|
"dify.invoke_from": info.metadata.get("triggered_from"),
|
||||||
|
"dify.conversation.id": info.conversation_id,
|
||||||
|
"dify.message.id": info.message_id,
|
||||||
|
}
|
||||||
|
|
||||||
|
trace_correlation_override: str | None = None
|
||||||
|
parent_span_id_source: str | None = None
|
||||||
|
|
||||||
|
parent_ctx = info.metadata.get("parent_trace_context")
|
||||||
|
if parent_ctx and isinstance(parent_ctx, dict):
|
||||||
|
span_attrs["dify.parent.trace_id"] = parent_ctx.get("trace_id")
|
||||||
|
span_attrs["dify.parent.node.execution_id"] = parent_ctx.get("parent_node_execution_id")
|
||||||
|
span_attrs["dify.parent.workflow.run_id"] = parent_ctx.get("parent_workflow_run_id")
|
||||||
|
span_attrs["dify.parent.app.id"] = parent_ctx.get("parent_app_id")
|
||||||
|
|
||||||
|
trace_correlation_override = parent_ctx.get("parent_workflow_run_id")
|
||||||
|
parent_span_id_source = parent_ctx.get("parent_node_execution_id")
|
||||||
|
|
||||||
|
self._exporter.export_span(
|
||||||
|
EnterpriseTelemetrySpan.WORKFLOW_RUN,
|
||||||
|
span_attrs,
|
||||||
|
correlation_id=info.workflow_run_id,
|
||||||
|
span_id_source=info.workflow_run_id,
|
||||||
|
start_time=info.start_time,
|
||||||
|
end_time=info.end_time,
|
||||||
|
trace_correlation_override=trace_correlation_override,
|
||||||
|
parent_span_id_source=parent_span_id_source,
|
||||||
|
)
|
||||||
|
|
||||||
|
# -- Companion log: ALL attrs (span + detail) for full picture --
|
||||||
|
log_attrs: dict[str, Any] = {**span_attrs}
|
||||||
|
log_attrs.update(
|
||||||
|
{
|
||||||
|
"dify.app.name": info.metadata.get("app_name"),
|
||||||
|
"dify.workspace.name": info.metadata.get("workspace_name"),
|
||||||
|
"gen_ai.user.id": info.metadata.get("user_id"),
|
||||||
|
"gen_ai.usage.total_tokens": info.total_tokens,
|
||||||
|
"dify.workflow.version": info.workflow_run_version,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if self._exporter.include_content:
|
||||||
|
log_attrs["dify.workflow.inputs"] = self._maybe_json(info.workflow_run_inputs)
|
||||||
|
log_attrs["dify.workflow.outputs"] = self._maybe_json(info.workflow_run_outputs)
|
||||||
|
log_attrs["dify.workflow.query"] = info.query
|
||||||
|
else:
|
||||||
|
ref = f"ref:workflow_run_id={info.workflow_run_id}"
|
||||||
|
log_attrs["dify.workflow.inputs"] = ref
|
||||||
|
log_attrs["dify.workflow.outputs"] = ref
|
||||||
|
log_attrs["dify.workflow.query"] = ref
|
||||||
|
|
||||||
|
emit_telemetry_log(
|
||||||
|
event_name="dify.workflow.run",
|
||||||
|
attributes=log_attrs,
|
||||||
|
signal="span_detail",
|
||||||
|
trace_id_source=info.workflow_run_id,
|
||||||
|
tenant_id=info.metadata.get("tenant_id"),
|
||||||
|
user_id=info.metadata.get("user_id"),
|
||||||
|
)
|
||||||
|
|
||||||
|
# -- Metrics --
|
||||||
|
labels = {
|
||||||
|
"tenant_id": info.tenant_id,
|
||||||
|
"app_id": info.metadata.get("app_id", ""),
|
||||||
|
}
|
||||||
|
self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels)
|
||||||
|
invoke_from = info.metadata.get("triggered_from", "")
|
||||||
|
self._exporter.increment_counter(
|
||||||
|
EnterpriseTelemetryCounter.REQUESTS,
|
||||||
|
1,
|
||||||
|
{**labels, "type": "workflow", "status": info.workflow_run_status, "invoke_from": invoke_from},
|
||||||
|
)
|
||||||
|
self._exporter.record_histogram(
|
||||||
|
EnterpriseTelemetryHistogram.WORKFLOW_DURATION,
|
||||||
|
float(info.workflow_run_elapsed_time),
|
||||||
|
{**labels, "status": info.workflow_run_status},
|
||||||
|
)
|
||||||
|
|
||||||
|
if info.error:
|
||||||
|
self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": "workflow"})
|
||||||
|
|
||||||
|
def _node_execution_trace(self, info: WorkflowNodeTraceInfo) -> None:
|
||||||
|
self._emit_node_execution_trace(info, EnterpriseTelemetrySpan.NODE_EXECUTION, "node")
|
||||||
|
|
||||||
|
def _draft_node_execution_trace(self, info: DraftNodeExecutionTrace) -> None:
|
||||||
|
self._emit_node_execution_trace(
|
||||||
|
info,
|
||||||
|
EnterpriseTelemetrySpan.DRAFT_NODE_EXECUTION,
|
||||||
|
"draft_node",
|
||||||
|
correlation_id_override=info.node_execution_id,
|
||||||
|
trace_correlation_override_param=info.workflow_run_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _emit_node_execution_trace(
    self,
    info: WorkflowNodeTraceInfo,
    span_name: EnterpriseTelemetrySpan,
    request_type: str,
    correlation_id_override: str | None = None,
    trace_correlation_override_param: str | None = None,
) -> None:
    """Emit the slim span, detailed companion log, and metrics for one node execution.

    Args:
        info: Node trace info published by the workflow engine.
        span_name: Span name to emit (regular vs. draft node execution).
        request_type: Value of the ``type`` label on request/error counters.
        correlation_id_override: Span correlation group; defaults to the
            workflow run id when not given.
        trace_correlation_override_param: Trace-id source override; may itself
            be superseded by a parent trace context found in ``info.metadata``
            (cross-workflow linking).
    """
    # -- Slim span attrs: identity + structure + status + timing --
    span_attrs: dict[str, Any] = {
        "dify.trace_id": info.trace_id,
        "dify.tenant_id": info.tenant_id,
        "dify.app_id": info.metadata.get("app_id"),
        "dify.workflow.id": info.workflow_id,
        "dify.workflow.run_id": info.workflow_run_id,
        "dify.message.id": info.message_id,
        "dify.conversation.id": info.metadata.get("conversation_id"),
        "dify.node.execution_id": info.node_execution_id,
        "dify.node.id": info.node_id,
        "dify.node.type": info.node_type,
        "dify.node.title": info.title,
        "dify.node.status": info.status,
        "dify.node.error": info.error,
        "dify.node.elapsed_time": info.elapsed_time,
        "dify.node.index": info.index,
        "dify.node.predecessor_node_id": info.predecessor_node_id,
        "dify.node.iteration_id": info.iteration_id,
        "dify.node.loop_id": info.loop_id,
        "dify.node.parallel_id": info.parallel_id,
    }

    # Cross-workflow linking: a parent trace context placed in metadata by an
    # invoking workflow wins over the explicit override parameter.
    trace_correlation_override = trace_correlation_override_param
    parent_ctx = info.metadata.get("parent_trace_context")
    if parent_ctx and isinstance(parent_ctx, dict):
        trace_correlation_override = parent_ctx.get("parent_workflow_run_id") or trace_correlation_override

    effective_correlation_id = correlation_id_override or info.workflow_run_id
    self._exporter.export_span(
        span_name,
        span_attrs,
        correlation_id=effective_correlation_id,
        span_id_source=info.node_execution_id,
        start_time=info.start_time,
        end_time=info.end_time,
        trace_correlation_override=trace_correlation_override,
    )

    # -- Companion log: ALL attrs (span + detail) --
    log_attrs: dict[str, Any] = {**span_attrs}
    log_attrs.update(
        {
            "dify.app.name": info.metadata.get("app_name"),
            "dify.workspace.name": info.metadata.get("workspace_name"),
            "dify.invoke_from": info.metadata.get("invoke_from"),
            "gen_ai.user.id": info.metadata.get("user_id"),
            "gen_ai.usage.total_tokens": info.total_tokens,
            "dify.node.total_price": info.total_price,
            "dify.node.currency": info.currency,
            "gen_ai.provider.name": info.model_provider,
            "gen_ai.request.model": info.model_name,
            "gen_ai.usage.input_tokens": info.prompt_tokens,
            "gen_ai.usage.output_tokens": info.completion_tokens,
            "gen_ai.tool.name": info.tool_name,
            "dify.node.iteration_index": info.iteration_index,
            "dify.node.loop_index": info.loop_index,
            "dify.plugin.name": info.metadata.get("plugin_name"),
            "dify.credential.name": info.metadata.get("credential_name"),
            "dify.dataset.ids": self._maybe_json(info.metadata.get("dataset_ids")),
            "dify.dataset.names": self._maybe_json(info.metadata.get("dataset_names")),
        }
    )

    # Content fields honor the privacy switch: either inline JSON or an
    # opaque "ref:..." placeholder the consumer can resolve from the database.
    if self._exporter.include_content:
        log_attrs["dify.node.inputs"] = self._maybe_json(info.node_inputs)
        log_attrs["dify.node.outputs"] = self._maybe_json(info.node_outputs)
        log_attrs["dify.node.process_data"] = self._maybe_json(info.process_data)
    else:
        ref = f"ref:node_execution_id={info.node_execution_id}"
        log_attrs["dify.node.inputs"] = ref
        log_attrs["dify.node.outputs"] = ref
        log_attrs["dify.node.process_data"] = ref

    emit_telemetry_log(
        event_name=span_name.value,
        attributes=log_attrs,
        signal="span_detail",
        trace_id_source=info.workflow_run_id,
        tenant_id=info.tenant_id,
        user_id=info.metadata.get("user_id"),
    )

    # -- Metrics --
    labels = {
        "tenant_id": info.tenant_id,
        "app_id": info.metadata.get("app_id", ""),
        "node_type": info.node_type,
        "model_provider": info.model_provider or "",
    }
    # Token counter only fires for token-consuming nodes (LLM-backed ones).
    if info.total_tokens:
        token_labels = {**labels, "model_name": info.model_name or ""}
        self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, token_labels)
    self._exporter.increment_counter(
        EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": request_type, "status": info.status}
    )
    duration_labels = dict(labels)
    plugin_name = info.metadata.get("plugin_name")
    # Plugin dimension is only attached for plugin-backed node types to keep
    # histogram cardinality bounded.
    if plugin_name and info.node_type in {"tool", "knowledge-retrieval"}:
        duration_labels["plugin_name"] = plugin_name
    self._exporter.record_histogram(EnterpriseTelemetryHistogram.NODE_DURATION, info.elapsed_time, duration_labels)

    if info.error:
        self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": request_type})
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# METRIC-ONLY handlers (structured log + counters/histograms)
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _message_trace(self, info: MessageTraceInfo) -> None:
    """Emit a metric-only structured event plus counters/histograms for a chat message.

    No span is emitted for messages (minimal-span strategy); the structured
    log carries the detail while counters/histograms stay at 100% accuracy.
    """
    attrs = self._common_attrs(info)
    attrs.update(
        {
            "dify.invoke_from": info.metadata.get("from_source"),
            "dify.conversation.id": info.metadata.get("conversation_id"),
            "dify.conversation.mode": info.conversation_mode,
            "gen_ai.provider.name": info.metadata.get("ls_provider"),
            "gen_ai.request.model": info.metadata.get("ls_model_name"),
            "gen_ai.usage.input_tokens": info.message_tokens,
            "gen_ai.usage.output_tokens": info.answer_tokens,
            "gen_ai.usage.total_tokens": info.total_tokens,
            "dify.message.status": info.metadata.get("status"),
            "dify.message.error": info.error,
            "dify.message.from_source": info.metadata.get("from_source"),
            "dify.message.from_end_user_id": info.metadata.get("from_end_user_id"),
            "dify.message.from_account_id": info.metadata.get("from_account_id"),
            "dify.streaming": info.is_streaming_request,
            "dify.message.time_to_first_token": info.gen_ai_server_time_to_first_token,
            "dify.message.streaming_duration": info.llm_streaming_time_to_generate,
            "dify.workflow.run_id": info.metadata.get("workflow_run_id"),
        }
    )

    # Message content honors the privacy switch.
    if self._exporter.include_content:
        attrs["dify.message.inputs"] = self._maybe_json(info.inputs)
        attrs["dify.message.outputs"] = self._maybe_json(info.outputs)
    else:
        ref = f"ref:message_id={info.message_id}"
        attrs["dify.message.inputs"] = ref
        attrs["dify.message.outputs"] = ref

    # FIX: the previous expression `a or str(b) if b else None` parsed as
    # `(a or str(b)) if b else None`, so the workflow_run_id was dropped
    # whenever message_id was falsy. Parenthesize so workflow_run_id wins
    # and message_id is only the fallback.
    emit_metric_only_event(
        event_name="dify.message.run",
        attributes=attrs,
        trace_id_source=info.metadata.get("workflow_run_id")
        or (str(info.message_id) if info.message_id else None),
        tenant_id=info.metadata.get("tenant_id"),
        user_id=info.metadata.get("user_id"),
    )

    labels = {
        "tenant_id": info.metadata.get("tenant_id", ""),
        "app_id": info.metadata.get("app_id", ""),
        "model_provider": info.metadata.get("ls_provider", ""),
        "model_name": info.metadata.get("ls_model_name", ""),
    }
    self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels)
    invoke_from = info.metadata.get("from_source", "")
    self._exporter.increment_counter(
        EnterpriseTelemetryCounter.REQUESTS,
        1,
        {**labels, "type": "message", "status": info.metadata.get("status", ""), "invoke_from": invoke_from},
    )

    # Duration is only recordable when both timestamps are present.
    if info.start_time and info.end_time:
        duration = (info.end_time - info.start_time).total_seconds()
        self._exporter.record_histogram(EnterpriseTelemetryHistogram.MESSAGE_DURATION, duration, labels)

    if info.gen_ai_server_time_to_first_token is not None:
        self._exporter.record_histogram(
            EnterpriseTelemetryHistogram.MESSAGE_TTFT, info.gen_ai_server_time_to_first_token, labels
        )

    if info.error:
        self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": "message"})
|
||||||
|
|
||||||
|
def _tool_trace(self, info: ToolTraceInfo) -> None:
    """Record a tool execution as a metric-only event plus request counter and duration histogram."""
    attrs = self._common_attrs(info)
    attrs["gen_ai.tool.name"] = info.tool_name
    attrs["dify.tool.time_cost"] = info.time_cost
    attrs["dify.tool.error"] = info.error

    # Tool payloads honor the privacy switch.
    if self._exporter.include_content:
        content_attrs = {
            "dify.tool.inputs": self._maybe_json(info.tool_inputs),
            "dify.tool.outputs": info.tool_outputs,
            "dify.tool.parameters": self._maybe_json(info.tool_parameters),
            "dify.tool.config": self._maybe_json(info.tool_config),
        }
    else:
        redacted = f"ref:message_id={info.message_id}"
        content_attrs = {
            "dify.tool.inputs": redacted,
            "dify.tool.outputs": redacted,
            "dify.tool.parameters": redacted,
            "dify.tool.config": redacted,
        }
    attrs.update(content_attrs)

    emit_metric_only_event(
        event_name="dify.tool.execution",
        attributes=attrs,
        tenant_id=info.metadata.get("tenant_id"),
        user_id=info.metadata.get("user_id"),
    )

    metric_labels = {
        "tenant_id": info.metadata.get("tenant_id", ""),
        "app_id": info.metadata.get("app_id", ""),
        "tool_name": info.tool_name,
    }
    self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**metric_labels, "type": "tool"})
    self._exporter.record_histogram(
        EnterpriseTelemetryHistogram.TOOL_DURATION, float(info.time_cost), metric_labels
    )

    if info.error:
        self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**metric_labels, "type": "tool"})
|
||||||
|
|
||||||
|
def _moderation_trace(self, info: ModerationTraceInfo) -> None:
    """Record a moderation check as a metric-only event plus a request counter."""
    attrs = self._common_attrs(info)
    attrs["dify.moderation.flagged"] = info.flagged
    attrs["dify.moderation.action"] = info.action
    attrs["dify.moderation.preset_response"] = info.preset_response
    # The moderated query honors the privacy switch.
    attrs["dify.moderation.query"] = (
        info.query if self._exporter.include_content else f"ref:message_id={info.message_id}"
    )

    emit_metric_only_event(
        event_name="dify.moderation.check",
        attributes=attrs,
        tenant_id=info.metadata.get("tenant_id"),
        user_id=info.metadata.get("user_id"),
    )

    metric_labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")}
    self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**metric_labels, "type": "moderation"})
|
||||||
|
|
||||||
|
def _suggested_question_trace(self, info: SuggestedQuestionTraceInfo) -> None:
    """Record suggested-question generation as a metric-only event plus a request counter."""
    questions = info.suggested_question
    attrs = self._common_attrs(info)
    attrs["gen_ai.usage.total_tokens"] = info.total_tokens
    attrs["dify.suggested_question.status"] = info.status
    attrs["dify.suggested_question.error"] = info.error
    attrs["gen_ai.provider.name"] = info.model_provider
    attrs["gen_ai.request.model"] = info.model_id
    attrs["dify.suggested_question.count"] = len(questions)
    # Question text honors the privacy switch.
    attrs["dify.suggested_question.questions"] = (
        self._maybe_json(questions) if self._exporter.include_content else f"ref:message_id={info.message_id}"
    )

    emit_metric_only_event(
        event_name="dify.suggested_question.generation",
        attributes=attrs,
        tenant_id=info.metadata.get("tenant_id"),
        user_id=info.metadata.get("user_id"),
    )

    counter_labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")}
    self._exporter.increment_counter(
        EnterpriseTelemetryCounter.REQUESTS, 1, {**counter_labels, "type": "suggested_question"}
    )
|
||||||
|
|
||||||
|
def _dataset_retrieval_trace(self, info: DatasetRetrievalTraceInfo) -> None:
    """Record a dataset retrieval: structured event, request counter, and per-dataset counters."""
    attrs = self._common_attrs(info)
    attrs["dify.dataset.error"] = info.error

    documents = info.documents or []
    seen_ids: list[str] = []
    seen_names: list[str] = []
    doc_summaries: list[dict] = []
    for document in documents:
        # Non-dict documents contribute an all-None summary row.
        doc_meta = document.get("metadata", {}) if isinstance(document, dict) else {}
        dataset_id = doc_meta.get("dataset_id")
        dataset_name = doc_meta.get("dataset_name")
        # Deduplicate while preserving first-seen order.
        if dataset_id and dataset_id not in seen_ids:
            seen_ids.append(dataset_id)
        if dataset_name and dataset_name not in seen_names:
            seen_names.append(dataset_name)
        doc_summaries.append(
            {
                "dataset_id": dataset_id,
                "document_id": doc_meta.get("document_id"),
                "segment_id": doc_meta.get("segment_id"),
                "score": doc_meta.get("score"),
            }
        )

    attrs["dify.dataset.ids"] = self._maybe_json(seen_ids)
    attrs["dify.dataset.names"] = self._maybe_json(seen_names)
    attrs["dify.retrieval.document_count"] = len(documents)

    embedding_models = info.metadata.get("embedding_models") or {}
    if isinstance(embedding_models, dict):
        provider_list: list[str] = []
        model_list: list[str] = []
        for dataset_info in embedding_models.values():
            if not isinstance(dataset_info, dict):
                continue
            provider = dataset_info.get("embedding_model_provider", "")
            model = dataset_info.get("embedding_model", "")
            if provider and provider not in provider_list:
                provider_list.append(provider)
            if model and model not in model_list:
                model_list.append(model)
        attrs["dify.dataset.embedding_providers"] = self._maybe_json(provider_list)
        attrs["dify.dataset.embedding_models"] = self._maybe_json(model_list)

    # Query and retrieved documents honor the privacy switch.
    if self._exporter.include_content:
        attrs["dify.retrieval.query"] = self._maybe_json(info.inputs)
        attrs["dify.dataset.documents"] = self._maybe_json(doc_summaries)
    else:
        redacted = f"ref:message_id={info.message_id}"
        attrs["dify.retrieval.query"] = redacted
        attrs["dify.dataset.documents"] = redacted

    emit_metric_only_event(
        event_name="dify.dataset.retrieval",
        attributes=attrs,
        tenant_id=info.metadata.get("tenant_id"),
        user_id=info.metadata.get("user_id"),
    )

    metric_labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")}
    self._exporter.increment_counter(
        EnterpriseTelemetryCounter.REQUESTS, 1, {**metric_labels, "type": "dataset_retrieval"}
    )

    for dataset_id in seen_ids:
        self._exporter.increment_counter(
            EnterpriseTelemetryCounter.DATASET_RETRIEVALS, 1, {**metric_labels, "dataset_id": dataset_id}
        )
|
||||||
|
|
||||||
|
def _generate_name_trace(self, info: GenerateNameTraceInfo) -> None:
    """Record conversation-name generation as a metric-only event plus a request counter."""
    attrs = self._common_attrs(info)
    attrs["dify.conversation.id"] = info.conversation_id

    # Inputs/outputs honor the privacy switch.
    if self._exporter.include_content:
        content_attrs = {
            "dify.generate_name.inputs": self._maybe_json(info.inputs),
            "dify.generate_name.outputs": self._maybe_json(info.outputs),
        }
    else:
        redacted = f"ref:conversation_id={info.conversation_id}"
        content_attrs = {
            "dify.generate_name.inputs": redacted,
            "dify.generate_name.outputs": redacted,
        }
    attrs.update(content_attrs)

    emit_metric_only_event(
        event_name="dify.generate_name.execution",
        attributes=attrs,
        tenant_id=info.tenant_id,
        user_id=info.metadata.get("user_id"),
    )

    counter_labels = {"tenant_id": info.tenant_id, "app_id": info.metadata.get("app_id", "")}
    self._exporter.increment_counter(
        EnterpriseTelemetryCounter.REQUESTS, 1, {**counter_labels, "type": "generate_name"}
    )
|
||||||
30
api/enterprise/telemetry/entities/__init__.py
Normal file
30
api/enterprise/telemetry/entities/__init__.py
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
from enum import StrEnum


class EnterpriseTelemetrySpan(StrEnum):
    """Names of the slim spans emitted by enterprise telemetry."""

    WORKFLOW_RUN = "dify.workflow.run"
    NODE_EXECUTION = "dify.node.execution"
    DRAFT_NODE_EXECUTION = "dify.node.execution.draft"


class EnterpriseTelemetryCounter(StrEnum):
    """Logical counter names; the exporter maps these to OTEL counters."""

    TOKENS = "tokens"
    REQUESTS = "requests"
    ERRORS = "errors"
    FEEDBACK = "feedback"
    DATASET_RETRIEVALS = "dataset_retrievals"


class EnterpriseTelemetryHistogram(StrEnum):
    """Logical histogram names; the exporter maps these to OTEL histograms."""

    WORKFLOW_DURATION = "workflow_duration"
    NODE_DURATION = "node_duration"
    MESSAGE_DURATION = "message_duration"
    MESSAGE_TTFT = "message_ttft"
    TOOL_DURATION = "tool_duration"


__all__ = [
    "EnterpriseTelemetryCounter",
    "EnterpriseTelemetryHistogram",
    "EnterpriseTelemetrySpan",
]
|
||||||
146
api/enterprise/telemetry/event_handlers.py
Normal file
146
api/enterprise/telemetry/event_handlers.py
Normal file
@ -0,0 +1,146 @@
|
|||||||
|
"""Blinker signal handlers for enterprise telemetry.
|
||||||
|
|
||||||
|
Registered at import time via ``@signal.connect`` decorators.
|
||||||
|
Import must happen during ``ext_enterprise_telemetry.init_app()`` to ensure handlers fire.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from enterprise.telemetry.entities import EnterpriseTelemetryCounter
|
||||||
|
from enterprise.telemetry.telemetry_log import emit_metric_only_event
|
||||||
|
from events.app_event import app_was_created, app_was_deleted, app_was_updated
|
||||||
|
from events.feedback_event import feedback_was_created
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"_handle_app_created",
|
||||||
|
"_handle_app_deleted",
|
||||||
|
"_handle_app_updated",
|
||||||
|
"_handle_feedback_created",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _emit_app_lifecycle_event(sender: object, event_name: str, counter_type: str, *, include_mode: bool = False) -> None:
    """Shared emitter for app created/updated/deleted lifecycle telemetry.

    The three lifecycle handlers previously duplicated this body verbatim;
    centralizing it keeps attribute/label shapes consistent. No-op when the
    enterprise exporter is not initialized.

    Args:
        sender: The app model instance delivered by the blinker signal.
        event_name: Structured-log event name (e.g. ``"dify.app.created"``).
        counter_type: Value of the ``type`` label on the request counter.
        include_mode: Whether to attach the app mode (only set on creation).
    """
    # Imported lazily to avoid a circular import at module load time.
    from extensions.ext_enterprise_telemetry import get_enterprise_exporter

    exporter = get_enterprise_exporter()
    if not exporter:
        return

    attrs = {
        "dify.app.id": getattr(sender, "id", None),
        "dify.tenant_id": getattr(sender, "tenant_id", None),
    }
    if include_mode:
        attrs["dify.app.mode"] = getattr(sender, "mode", None)

    emit_metric_only_event(
        event_name=event_name,
        attributes=attrs,
        tenant_id=str(getattr(sender, "tenant_id", "") or ""),
    )
    exporter.increment_counter(
        EnterpriseTelemetryCounter.REQUESTS,
        1,
        {
            "type": counter_type,
            "tenant_id": getattr(sender, "tenant_id", ""),
        },
    )


@app_was_created.connect
def _handle_app_created(sender: object, **kwargs: object) -> None:
    """Emit telemetry when an app is created (includes the app mode)."""
    _emit_app_lifecycle_event(sender, "dify.app.created", "app.created", include_mode=True)


@app_was_deleted.connect
def _handle_app_deleted(sender: object, **kwargs: object) -> None:
    """Emit telemetry when an app is deleted."""
    _emit_app_lifecycle_event(sender, "dify.app.deleted", "app.deleted")


@app_was_updated.connect
def _handle_app_updated(sender: object, **kwargs: object) -> None:
    """Emit telemetry when an app is updated."""
    _emit_app_lifecycle_event(sender, "dify.app.updated", "app.updated")
|
||||||
|
|
||||||
|
|
||||||
|
@feedback_was_created.connect
def _handle_feedback_created(sender: object, **kwargs: object) -> None:
    """Emit a feedback telemetry event and bump the feedback counter."""
    # Imported lazily to avoid a circular import at module load time.
    from extensions.ext_enterprise_telemetry import get_enterprise_exporter

    exporter = get_enterprise_exporter()
    if not exporter:
        return

    # End-user id takes precedence over account id for the user reference.
    user_ref = getattr(sender, "from_end_user_id", None) or getattr(sender, "from_account_id", None)
    attrs: dict = {
        "dify.message.id": getattr(sender, "message_id", None),
        "dify.tenant_id": kwargs.get("tenant_id"),
        "dify.app_id": getattr(sender, "app_id", None),
        "dify.conversation.id": getattr(sender, "conversation_id", None),
        "gen_ai.user.id": user_ref,
        "dify.feedback.rating": getattr(sender, "rating", None),
        "dify.feedback.from_source": getattr(sender, "from_source", None),
    }
    # Feedback text honors the privacy switch.
    if exporter.include_content:
        attrs["dify.feedback.content"] = getattr(sender, "content", None)

    emit_metric_only_event(
        event_name="dify.feedback.created",
        attributes=attrs,
        tenant_id=str(kwargs.get("tenant_id", "") or ""),
        user_id=str(user_ref or ""),
    )
    exporter.increment_counter(
        EnterpriseTelemetryCounter.FEEDBACK,
        1,
        {
            "tenant_id": str(kwargs.get("tenant_id", "")),
            "app_id": str(getattr(sender, "app_id", "")),
            "rating": str(getattr(sender, "rating", "")),
        },
    )
|
||||||
200
api/enterprise/telemetry/exporter.py
Normal file
200
api/enterprise/telemetry/exporter.py
Normal file
@ -0,0 +1,200 @@
|
|||||||
|
"""Enterprise OTEL exporter — shared by EnterpriseOtelTrace, event handlers, and direct instrumentation.
|
||||||
|
|
||||||
|
Uses its own TracerProvider (configurable sampling, separate from ext_otel.py infrastructure)
|
||||||
|
and the global MeterProvider (shared with ext_otel.py — both target the same collector).
|
||||||
|
|
||||||
|
Initialized once during Flask extension init (single-threaded via ext_enterprise_telemetry.py).
|
||||||
|
Accessed via ``ext_enterprise_telemetry.get_enterprise_exporter()`` from any thread/process.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import socket
|
||||||
|
import uuid
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import cast
|
||||||
|
|
||||||
|
from opentelemetry import metrics, trace
|
||||||
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||||
|
from opentelemetry.sdk.resources import Resource
|
||||||
|
from opentelemetry.sdk.trace import TracerProvider
|
||||||
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||||
|
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
|
||||||
|
from opentelemetry.semconv.resource import ResourceAttributes
|
||||||
|
from opentelemetry.trace import SpanContext, TraceFlags
|
||||||
|
|
||||||
|
from configs import dify_config
|
||||||
|
from enterprise.telemetry.entities import EnterpriseTelemetryCounter, EnterpriseTelemetryHistogram
|
||||||
|
from enterprise.telemetry.id_generator import (
|
||||||
|
CorrelationIdGenerator,
|
||||||
|
compute_deterministic_span_id,
|
||||||
|
set_correlation_id,
|
||||||
|
set_span_id_source,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def is_enterprise_telemetry_enabled() -> bool:
    """Return True when both the enterprise edition and its telemetry flag are enabled."""
    return bool(dify_config.ENTERPRISE_ENABLED and dify_config.ENTERPRISE_TELEMETRY_ENABLED)
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_otlp_headers(raw: str) -> dict[str, str]:
|
||||||
|
"""Parse ``key=value,key2=value2`` into a dict."""
|
||||||
|
if not raw:
|
||||||
|
return {}
|
||||||
|
headers: dict[str, str] = {}
|
||||||
|
for pair in raw.split(","):
|
||||||
|
if "=" not in pair:
|
||||||
|
continue
|
||||||
|
k, v = pair.split("=", 1)
|
||||||
|
headers[k.strip()] = v.strip()
|
||||||
|
return headers
|
||||||
|
|
||||||
|
|
||||||
|
def _datetime_to_ns(dt: datetime) -> int:
|
||||||
|
"""Convert a datetime to nanoseconds since epoch (OTEL convention)."""
|
||||||
|
return int(dt.timestamp() * 1_000_000_000)
|
||||||
|
|
||||||
|
|
||||||
|
class EnterpriseExporter:
|
||||||
|
"""Shared OTEL exporter for all enterprise telemetry.
|
||||||
|
|
||||||
|
``export_span`` creates spans with optional real timestamps, deterministic
|
||||||
|
span/trace IDs, and cross-workflow parent linking.
|
||||||
|
``increment_counter`` / ``record_histogram`` emit OTEL metrics at 100% accuracy.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, config: object) -> None:
|
||||||
|
endpoint: str = getattr(config, "ENTERPRISE_OTLP_ENDPOINT", "")
|
||||||
|
headers_raw: str = getattr(config, "ENTERPRISE_OTLP_HEADERS", "")
|
||||||
|
service_name: str = getattr(config, "ENTERPRISE_SERVICE_NAME", "dify")
|
||||||
|
sampling_rate: float = getattr(config, "ENTERPRISE_OTEL_SAMPLING_RATE", 1.0)
|
||||||
|
self.include_content: bool = getattr(config, "ENTERPRISE_INCLUDE_CONTENT", True)
|
||||||
|
|
||||||
|
resource = Resource(
|
||||||
|
attributes={
|
||||||
|
ResourceAttributes.SERVICE_NAME: service_name,
|
||||||
|
ResourceAttributes.HOST_NAME: socket.gethostname(),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
sampler = ParentBasedTraceIdRatio(sampling_rate)
|
||||||
|
id_generator = CorrelationIdGenerator()
|
||||||
|
self._tracer_provider = TracerProvider(resource=resource, sampler=sampler, id_generator=id_generator)
|
||||||
|
|
||||||
|
headers = _parse_otlp_headers(headers_raw)
|
||||||
|
trace_endpoint = f"{endpoint}/v1/traces" if endpoint else ""
|
||||||
|
self._tracer_provider.add_span_processor(
|
||||||
|
BatchSpanProcessor(OTLPSpanExporter(endpoint=trace_endpoint or None, headers=headers))
|
||||||
|
)
|
||||||
|
self._tracer = self._tracer_provider.get_tracer("dify.enterprise")
|
||||||
|
|
||||||
|
meter = metrics.get_meter("dify.enterprise")
|
||||||
|
self._counters = {
|
||||||
|
EnterpriseTelemetryCounter.TOKENS: meter.create_counter("dify.tokens.total", unit="{token}"),
|
||||||
|
EnterpriseTelemetryCounter.REQUESTS: meter.create_counter("dify.requests.total", unit="{request}"),
|
||||||
|
EnterpriseTelemetryCounter.ERRORS: meter.create_counter("dify.errors.total", unit="{error}"),
|
||||||
|
EnterpriseTelemetryCounter.FEEDBACK: meter.create_counter("dify.feedback.total", unit="{feedback}"),
|
||||||
|
EnterpriseTelemetryCounter.DATASET_RETRIEVALS: meter.create_counter(
|
||||||
|
"dify.dataset.retrievals.total", unit="{retrieval}"
|
||||||
|
),
|
||||||
|
}
|
||||||
|
self._histograms = {
|
||||||
|
EnterpriseTelemetryHistogram.WORKFLOW_DURATION: meter.create_histogram("dify.workflow.duration", unit="s"),
|
||||||
|
EnterpriseTelemetryHistogram.NODE_DURATION: meter.create_histogram("dify.node.duration", unit="s"),
|
||||||
|
EnterpriseTelemetryHistogram.MESSAGE_DURATION: meter.create_histogram("dify.message.duration", unit="s"),
|
||||||
|
EnterpriseTelemetryHistogram.MESSAGE_TTFT: meter.create_histogram(
|
||||||
|
"dify.message.time_to_first_token", unit="s"
|
||||||
|
),
|
||||||
|
EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"),
|
||||||
|
}
|
||||||
|
|
||||||
|
def export_span(
|
||||||
|
self,
|
||||||
|
name: str,
|
||||||
|
attributes: dict,
|
||||||
|
correlation_id: str | None = None,
|
||||||
|
span_id_source: str | None = None,
|
||||||
|
start_time: datetime | None = None,
|
||||||
|
end_time: datetime | None = None,
|
||||||
|
trace_correlation_override: str | None = None,
|
||||||
|
parent_span_id_source: str | None = None,
|
||||||
|
) -> None:
|
||||||
|
"""Export an OTEL span with optional deterministic IDs and real timestamps.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: Span operation name.
|
||||||
|
attributes: Span attributes dict.
|
||||||
|
correlation_id: Source for trace_id derivation (groups spans in one trace).
|
||||||
|
span_id_source: Source for deterministic span_id (e.g. workflow_run_id or node_execution_id).
|
||||||
|
start_time: Real span start time. When None, uses current time.
|
||||||
|
end_time: Real span end time. When None, span ends immediately.
|
||||||
|
trace_correlation_override: Override trace_id source (for cross-workflow linking).
|
||||||
|
When set, trace_id is derived from this instead of ``correlation_id``.
|
||||||
|
parent_span_id_source: Override parent span_id source (for cross-workflow linking).
|
||||||
|
When set, parent span_id is derived from this value. When None and
|
||||||
|
``correlation_id`` is set, parent is the workflow root span.
|
||||||
|
"""
|
||||||
|
effective_trace_correlation = trace_correlation_override or correlation_id
|
||||||
|
set_correlation_id(effective_trace_correlation)
|
||||||
|
set_span_id_source(span_id_source)
|
||||||
|
|
||||||
|
try:
|
||||||
|
parent_context = None
|
||||||
|
# A span is the "root" of its correlation group when span_id_source == correlation_id
|
||||||
|
# (i.e. a workflow root span). All other spans are children.
|
||||||
|
if parent_span_id_source:
|
||||||
|
# Cross-workflow linking: parent is an explicit span (e.g. tool node in outer workflow)
|
||||||
|
parent_span_id = compute_deterministic_span_id(parent_span_id_source)
|
||||||
|
parent_trace_id = (
|
||||||
|
cast(int, uuid.UUID(effective_trace_correlation).int) if effective_trace_correlation else 0
|
||||||
|
)
|
||||||
|
if parent_trace_id:
|
||||||
|
parent_span_context = SpanContext(
|
||||||
|
trace_id=parent_trace_id,
|
||||||
|
span_id=parent_span_id,
|
||||||
|
is_remote=True,
|
||||||
|
trace_flags=TraceFlags(TraceFlags.SAMPLED),
|
||||||
|
)
|
||||||
|
parent_context = trace.set_span_in_context(trace.NonRecordingSpan(parent_span_context))
|
||||||
|
elif correlation_id and correlation_id != span_id_source:
|
||||||
|
# Child span: parent is the correlation-group root (workflow root span)
|
||||||
|
parent_span_id = compute_deterministic_span_id(correlation_id)
|
||||||
|
parent_trace_id = cast(int, uuid.UUID(effective_trace_correlation or correlation_id).int)
|
||||||
|
parent_span_context = SpanContext(
|
||||||
|
trace_id=parent_trace_id,
|
||||||
|
span_id=parent_span_id,
|
||||||
|
is_remote=True,
|
||||||
|
trace_flags=TraceFlags(TraceFlags.SAMPLED),
|
||||||
|
)
|
||||||
|
parent_context = trace.set_span_in_context(trace.NonRecordingSpan(parent_span_context))
|
||||||
|
|
||||||
|
span_kwargs: dict = {"context": parent_context}
|
||||||
|
if start_time is not None:
|
||||||
|
span_kwargs["start_time"] = _datetime_to_ns(start_time)
|
||||||
|
if end_time is not None:
|
||||||
|
span_kwargs["end_on_exit"] = False
|
||||||
|
|
||||||
|
with self._tracer.start_as_current_span(name, **span_kwargs) as span:
|
||||||
|
for key, value in attributes.items():
|
||||||
|
if value is not None:
|
||||||
|
span.set_attribute(key, value)
|
||||||
|
if end_time is not None:
|
||||||
|
span.end(end_time=_datetime_to_ns(end_time))
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed to export span %s", name)
|
||||||
|
finally:
|
||||||
|
set_correlation_id(None)
|
||||||
|
set_span_id_source(None)
|
||||||
|
|
||||||
|
def increment_counter(self, name: EnterpriseTelemetryCounter, value: int, labels: dict) -> None:
    """Add *value* (with *labels*) to the counter registered under *name*.

    Silently a no-op when no counter was registered for *name*.
    """
    if counter := self._counters.get(name):
        counter.add(value, labels)
|
||||||
|
|
||||||
|
def record_histogram(self, name: EnterpriseTelemetryHistogram, value: float, labels: dict) -> None:
    """Record *value* (with *labels*) into the histogram registered under *name*.

    Silently a no-op when no histogram was registered for *name*.
    """
    if histogram := self._histograms.get(name):
        histogram.record(value, labels)
|
||||||
|
|
||||||
|
def shutdown(self) -> None:
    """Shut down the underlying tracer provider (flushes pending spans per OTEL SDK semantics)."""
    self._tracer_provider.shutdown()
|
||||||
76
api/enterprise/telemetry/id_generator.py
Normal file
76
api/enterprise/telemetry/id_generator.py
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
"""Custom OTEL ID Generator for correlation-based trace/span ID derivation.
|
||||||
|
|
||||||
|
Uses contextvars for thread-safe correlation_id -> trace_id mapping.
|
||||||
|
When a span_id_source is set, the span_id is derived deterministically
|
||||||
|
from that value, enabling any span to reference another as parent
|
||||||
|
without depending on span creation order.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import random
|
||||||
|
import uuid
|
||||||
|
from contextvars import ContextVar
|
||||||
|
from typing import cast
|
||||||
|
|
||||||
|
from opentelemetry.sdk.trace.id_generator import IdGenerator
|
||||||
|
|
||||||
|
_correlation_id_context: ContextVar[str | None] = ContextVar("correlation_id", default=None)
|
||||||
|
_span_id_source_context: ContextVar[str | None] = ContextVar("span_id_source", default=None)
|
||||||
|
|
||||||
|
|
||||||
|
def set_correlation_id(correlation_id: str | None) -> None:
    """Bind *correlation_id* in the current context; pass ``None`` to clear it."""
    _correlation_id_context.set(correlation_id)
|
||||||
|
|
||||||
|
|
||||||
|
def get_correlation_id() -> str | None:
    """Return the correlation id bound in the current context, or ``None``."""
    return _correlation_id_context.get()
|
||||||
|
|
||||||
|
|
||||||
|
def set_span_id_source(source_id: str | None) -> None:
    """Set the source for deterministic span_id generation.

    When set, ``generate_span_id()`` derives the span_id from this value
    (lower 64 bits of the UUID). Pass the ``workflow_run_id`` for workflow
    root spans or ``node_execution_id`` for node spans.

    Pass ``None`` to restore random span-id generation in this context.
    """
    _span_id_source_context.set(source_id)
|
||||||
|
|
||||||
|
|
||||||
|
def compute_deterministic_span_id(source_id: str) -> int:
    """Derive a deterministic, non-zero span_id from a UUID string.

    Keeps only the lower 64 bits of the UUID's 128-bit integer value; a
    result of zero is remapped to 1 because OTEL forbids span_id == 0.

    Propagates ``ValueError``/``AttributeError`` when *source_id* is not a
    parseable UUID.
    """
    lower_64_bits = uuid.UUID(source_id).int % (1 << 64)
    return lower_64_bits or 1
|
||||||
|
|
||||||
|
|
||||||
|
class CorrelationIdGenerator(IdGenerator):
    """OTEL id generator that derives ids from ambient context, not pure randomness.

    - trace_id: always derived from the correlation_id contextvar, so every
      span of one logical run lands in the same trace.
    - span_id: derived from the span_id_source contextvar when one is set
      (deterministic parent/child linking regardless of span creation order),
      otherwise random.
    """

    def generate_trace_id(self) -> int:
        """Return a 128-bit trace id: the UUID int of the correlation id, else random."""
        correlation_id = _correlation_id_context.get()
        if not correlation_id:
            return random.getrandbits(128)
        try:
            return cast(int, uuid.UUID(correlation_id).int)
        except (ValueError, AttributeError):
            # Malformed correlation id: fall back to a random trace id.
            return random.getrandbits(128)

    def generate_span_id(self) -> int:
        """Return a 64-bit non-zero span id: deterministic when a source is set."""
        source = _span_id_source_context.get()
        if source:
            try:
                return compute_deterministic_span_id(source)
            except (ValueError, AttributeError):
                # Invalid source UUID: fall through to random generation.
                pass
        while True:
            candidate = random.getrandbits(64)
            if candidate:  # OTEL forbids span_id == 0
                return candidate
|
||||||
91
api/enterprise/telemetry/telemetry_log.py
Normal file
91
api/enterprise/telemetry/telemetry_log.py
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
"""Structured-log emitter for enterprise telemetry events.
|
||||||
|
|
||||||
|
Emits structured JSON log lines correlated with OTEL traces via trace_id.
|
||||||
|
Picked up by ``StructuredJSONFormatter`` → stdout/Loki/Elastic.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import uuid
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
logger = logging.getLogger("dify.telemetry")
|
||||||
|
|
||||||
|
|
||||||
|
def compute_trace_id_hex(uuid_str: str | None) -> str:
    """Convert a business UUID string to a 32-hex OTEL-compatible trace_id.

    Returns the empty string when *uuid_str* is ``None``, empty, or not a
    parseable UUID.
    """
    if not uuid_str:
        return ""
    try:
        parsed = uuid.UUID(uuid_str)
    except (ValueError, AttributeError):
        return ""
    return format(parsed.int, "032x")
|
||||||
|
|
||||||
|
|
||||||
|
def emit_telemetry_log(
    *,
    event_name: str,
    attributes: dict[str, Any],
    signal: str = "metric_only",
    trace_id_source: str | None = None,
    tenant_id: str | None = None,
    user_id: str | None = None,
) -> None:
    """Emit a structured log line for a telemetry event.

    Parameters
    ----------
    event_name:
        Canonical event name, e.g. ``"dify.workflow.run"``.
    attributes:
        All event-specific attributes (already built by the caller); they
        may override the ``dify.event.*`` keys set here.
    signal:
        ``"metric_only"`` for events with no span, ``"span_detail"``
        for detail logs accompanying a slim span.
    trace_id_source:
        A UUID string (e.g. ``workflow_run_id``) used to derive a 32-hex
        trace_id for cross-signal correlation.
    tenant_id:
        Tenant identifier (for the ``IdentityContextFilter``).
    user_id:
        User identifier (for the ``IdentityContextFilter``).
    """
    merged_attrs: dict[str, Any] = {
        "dify.event.name": event_name,
        "dify.event.signal": signal,
    }
    merged_attrs.update(attributes)

    log_extra: dict[str, Any] = {"attributes": merged_attrs}

    derived_trace_id = compute_trace_id_hex(trace_id_source)
    if derived_trace_id:
        log_extra["trace_id"] = derived_trace_id
    if tenant_id:
        log_extra["tenant_id"] = tenant_id
    if user_id:
        log_extra["user_id"] = user_id

    logger.info("telemetry.%s", signal, extra=log_extra)
|
||||||
|
|
||||||
|
|
||||||
|
def emit_metric_only_event(
    *,
    event_name: str,
    attributes: dict[str, Any],
    trace_id_source: str | None = None,
    tenant_id: str | None = None,
    user_id: str | None = None,
) -> None:
    """Convenience wrapper: emit a telemetry log line with ``signal="metric_only"``.

    See :func:`emit_telemetry_log` for parameter semantics.
    """
    emit_telemetry_log(
        event_name=event_name,
        attributes=attributes,
        signal="metric_only",
        trace_id_source=trace_id_source,
        tenant_id=tenant_id,
        user_id=user_id,
    )
|
||||||
@ -3,6 +3,12 @@ from blinker import signal
|
|||||||
# sender: app
|
# sender: app
|
||||||
app_was_created = signal("app-was-created")
|
app_was_created = signal("app-was-created")
|
||||||
|
|
||||||
|
# sender: app
|
||||||
|
app_was_deleted = signal("app-was-deleted")
|
||||||
|
|
||||||
|
# sender: app
|
||||||
|
app_was_updated = signal("app-was-updated")
|
||||||
|
|
||||||
# sender: app, kwargs: app_model_config
|
# sender: app, kwargs: app_model_config
|
||||||
app_model_config_was_updated = signal("app-model-config-was-updated")
|
app_model_config_was_updated = signal("app-model-config-was-updated")
|
||||||
|
|
||||||
|
|||||||
4
api/events/feedback_event.py
Normal file
4
api/events/feedback_event.py
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
from blinker import signal

# Emitted after a message feedback is committed (see MessageService).
# sender: MessageFeedback, kwargs: tenant_id
feedback_was_created = signal("feedback-was-created")
|
||||||
48
api/extensions/ext_enterprise_telemetry.py
Normal file
48
api/extensions/ext_enterprise_telemetry.py
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
"""Flask extension for enterprise telemetry lifecycle management.
|
||||||
|
|
||||||
|
Initializes the EnterpriseExporter singleton during ``create_app()`` (single-threaded),
|
||||||
|
registers blinker event handlers, and hooks atexit for graceful shutdown.
|
||||||
|
|
||||||
|
Skipped entirely when ``ENTERPRISE_ENABLED`` and ``ENTERPRISE_TELEMETRY_ENABLED`` are false (``is_enabled()`` gate).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import atexit
|
||||||
|
import logging
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from configs import dify_config
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from dify_app import DifyApp
|
||||||
|
from enterprise.telemetry.exporter import EnterpriseExporter
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_exporter: EnterpriseExporter | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def is_enabled() -> bool:
    """True only when both the enterprise edition and its telemetry flag are on."""
    if not dify_config.ENTERPRISE_ENABLED:
        return False
    return bool(dify_config.ENTERPRISE_TELEMETRY_ENABLED)
|
||||||
|
|
||||||
|
|
||||||
|
def init_app(app: DifyApp) -> None:
    """Initialize enterprise telemetry during ``create_app()``.

    Creates the module-level ``EnterpriseExporter`` singleton, registers its
    shutdown with ``atexit``, and imports the event-handler module so its
    ``@signal.connect`` decorators run. No-op when telemetry is disabled.
    """
    global _exporter

    if not is_enabled():
        return

    # Imported lazily so disabled deployments never load the OTEL machinery.
    from enterprise.telemetry.exporter import EnterpriseExporter

    _exporter = EnterpriseExporter(dify_config)
    atexit.register(_exporter.shutdown)

    # Import to trigger @signal.connect decorator registration
    import enterprise.telemetry.event_handlers  # noqa: F401 # type: ignore[reportUnusedImport]

    logger.info("Enterprise telemetry initialized")
|
||||||
|
|
||||||
|
|
||||||
|
def get_enterprise_exporter() -> EnterpriseExporter | None:
    """Return the process-wide exporter, or ``None`` when telemetry is disabled
    or ``init_app`` has not run yet."""
    return _exporter
|
||||||
@ -14,7 +14,7 @@ from core.model_runtime.entities.model_entities import ModelPropertyKey, ModelTy
|
|||||||
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
|
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
|
||||||
from core.tools.tool_manager import ToolManager
|
from core.tools.tool_manager import ToolManager
|
||||||
from core.tools.utils.configuration import ToolParameterConfigurationManager
|
from core.tools.utils.configuration import ToolParameterConfigurationManager
|
||||||
from events.app_event import app_was_created
|
from events.app_event import app_was_created, app_was_deleted
|
||||||
from extensions.ext_database import db
|
from extensions.ext_database import db
|
||||||
from libs.datetime_utils import naive_utc_now
|
from libs.datetime_utils import naive_utc_now
|
||||||
from libs.login import current_user
|
from libs.login import current_user
|
||||||
@ -340,6 +340,8 @@ class AppService:
|
|||||||
db.session.delete(app)
|
db.session.delete(app)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
app_was_deleted.send(app)
|
||||||
|
|
||||||
# clean up web app settings
|
# clean up web app settings
|
||||||
if FeatureService.get_system_features().webapp_auth.enabled:
|
if FeatureService.get_system_features().webapp_auth.enabled:
|
||||||
EnterpriseService.WebAppAuth.cleanup_webapp(app.id)
|
EnterpriseService.WebAppAuth.cleanup_webapp(app.id)
|
||||||
|
|||||||
@ -10,6 +10,7 @@ from core.model_runtime.entities.model_entities import ModelType
|
|||||||
from core.ops.entities.trace_entity import TraceTaskName
|
from core.ops.entities.trace_entity import TraceTaskName
|
||||||
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
|
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
|
||||||
from core.ops.utils import measure_time
|
from core.ops.utils import measure_time
|
||||||
|
from events.feedback_event import feedback_was_created
|
||||||
from extensions.ext_database import db
|
from extensions.ext_database import db
|
||||||
from libs.infinite_scroll_pagination import InfiniteScrollPagination
|
from libs.infinite_scroll_pagination import InfiniteScrollPagination
|
||||||
from models import Account
|
from models import Account
|
||||||
@ -179,6 +180,9 @@ class MessageService:
|
|||||||
|
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
if feedback and rating:
|
||||||
|
feedback_was_created.send(feedback, tenant_id=app_model.tenant_id)
|
||||||
|
|
||||||
return feedback
|
return feedback
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@ -27,6 +27,7 @@ from core.workflow.nodes.start.entities import StartNodeData
|
|||||||
from core.workflow.runtime import VariablePool
|
from core.workflow.runtime import VariablePool
|
||||||
from core.workflow.system_variable import SystemVariable
|
from core.workflow.system_variable import SystemVariable
|
||||||
from core.workflow.workflow_entry import WorkflowEntry
|
from core.workflow.workflow_entry import WorkflowEntry
|
||||||
|
from enterprise.telemetry.draft_trace import enqueue_draft_node_execution_trace
|
||||||
from enums.cloud_plan import CloudPlan
|
from enums.cloud_plan import CloudPlan
|
||||||
from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
|
from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
|
||||||
from extensions.ext_database import db
|
from extensions.ext_database import db
|
||||||
@ -647,6 +648,7 @@ class WorkflowService:
|
|||||||
node_config = draft_workflow.get_node_config_by_id(node_id)
|
node_config = draft_workflow.get_node_config_by_id(node_id)
|
||||||
node_type = Workflow.get_node_type_from_node_config(node_config)
|
node_type = Workflow.get_node_type_from_node_config(node_config)
|
||||||
node_data = node_config.get("data", {})
|
node_data = node_config.get("data", {})
|
||||||
|
workflow_execution_id: str | None = None
|
||||||
if node_type.is_start_node:
|
if node_type.is_start_node:
|
||||||
with Session(bind=db.engine) as session, session.begin():
|
with Session(bind=db.engine) as session, session.begin():
|
||||||
draft_var_srv = WorkflowDraftVariableService(session)
|
draft_var_srv = WorkflowDraftVariableService(session)
|
||||||
@ -672,10 +674,13 @@ class WorkflowService:
|
|||||||
node_type=node_type,
|
node_type=node_type,
|
||||||
conversation_id=conversation_id,
|
conversation_id=conversation_id,
|
||||||
)
|
)
|
||||||
|
workflow_execution_id = variable_pool.system_variables.workflow_execution_id
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
workflow_execution_id = str(uuid.uuid4())
|
||||||
|
system_variable = SystemVariable(workflow_execution_id=workflow_execution_id)
|
||||||
variable_pool = VariablePool(
|
variable_pool = VariablePool(
|
||||||
system_variables=SystemVariable.default(),
|
system_variables=system_variable,
|
||||||
user_inputs=user_inputs,
|
user_inputs=user_inputs,
|
||||||
environment_variables=draft_workflow.environment_variables,
|
environment_variables=draft_workflow.environment_variables,
|
||||||
conversation_variables=[],
|
conversation_variables=[],
|
||||||
@ -729,6 +734,13 @@ class WorkflowService:
|
|||||||
with Session(db.engine) as session:
|
with Session(db.engine) as session:
|
||||||
outputs = workflow_node_execution.load_full_outputs(session, storage)
|
outputs = workflow_node_execution.load_full_outputs(session, storage)
|
||||||
|
|
||||||
|
enqueue_draft_node_execution_trace(
|
||||||
|
execution=workflow_node_execution,
|
||||||
|
outputs=outputs,
|
||||||
|
workflow_execution_id=workflow_execution_id,
|
||||||
|
user_id=account.id,
|
||||||
|
)
|
||||||
|
|
||||||
with Session(bind=db.engine) as session, session.begin():
|
with Session(bind=db.engine) as session, session.begin():
|
||||||
draft_var_saver = DraftVariableSaver(
|
draft_var_saver = DraftVariableSaver(
|
||||||
session=session,
|
session=session,
|
||||||
@ -784,19 +796,20 @@ class WorkflowService:
|
|||||||
Returns:
|
Returns:
|
||||||
WorkflowNodeExecution: The execution result
|
WorkflowNodeExecution: The execution result
|
||||||
"""
|
"""
|
||||||
|
created_at = naive_utc_now()
|
||||||
node, node_run_result, run_succeeded, error = self._execute_node_safely(invoke_node_fn)
|
node, node_run_result, run_succeeded, error = self._execute_node_safely(invoke_node_fn)
|
||||||
|
finished_at = naive_utc_now()
|
||||||
|
|
||||||
# Create base node execution
|
|
||||||
node_execution = WorkflowNodeExecution(
|
node_execution = WorkflowNodeExecution(
|
||||||
id=str(uuid.uuid4()),
|
id=node.execution_id or str(uuid.uuid4()),
|
||||||
workflow_id="", # Single-step execution has no workflow ID
|
workflow_id="", # Single-step execution has no workflow ID
|
||||||
index=1,
|
index=1,
|
||||||
node_id=node_id,
|
node_id=node_id,
|
||||||
node_type=node.node_type,
|
node_type=node.node_type,
|
||||||
title=node.title,
|
title=node.title,
|
||||||
elapsed_time=time.perf_counter() - start_at,
|
elapsed_time=time.perf_counter() - start_at,
|
||||||
created_at=naive_utc_now(),
|
created_at=created_at,
|
||||||
finished_at=naive_utc_now(),
|
finished_at=finished_at,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Populate execution result data
|
# Populate execution result data
|
||||||
|
|||||||
@ -39,12 +39,25 @@ def process_trace_tasks(file_info):
|
|||||||
trace_info["documents"] = [Document.model_validate(doc) for doc in trace_info["documents"]]
|
trace_info["documents"] = [Document.model_validate(doc) for doc in trace_info["documents"]]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
trace_type = trace_info_info_map.get(trace_info_type)
|
||||||
|
if trace_type:
|
||||||
|
trace_info = trace_type(**trace_info)
|
||||||
|
|
||||||
|
# process enterprise trace separately
|
||||||
|
from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
|
||||||
|
|
||||||
|
if is_enterprise_telemetry_enabled():
|
||||||
|
from enterprise.telemetry.enterprise_trace import EnterpriseOtelTrace
|
||||||
|
|
||||||
|
try:
|
||||||
|
EnterpriseOtelTrace().trace(trace_info)
|
||||||
|
except Exception:
|
||||||
|
logger.warning("Enterprise trace failed for app_id: %s", app_id, exc_info=True)
|
||||||
|
|
||||||
if trace_instance:
|
if trace_instance:
|
||||||
with current_app.app_context():
|
with current_app.app_context():
|
||||||
trace_type = trace_info_info_map.get(trace_info_type)
|
|
||||||
if trace_type:
|
|
||||||
trace_info = trace_type(**trace_info)
|
|
||||||
trace_instance.trace(trace_info)
|
trace_instance.trace(trace_info)
|
||||||
|
|
||||||
logger.info("Processing trace tasks success, app_id: %s", app_id)
|
logger.info("Processing trace tasks success, app_id: %s", app_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.info("error:\n\n\n%s\n\n\n\n", e)
|
logger.info("error:\n\n\n%s\n\n\n\n", e)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user