feat(telemetry): unify token metric label structure with Pydantic enforcement

- Add TokenMetricLabels BaseModel to enforce consistent label structure
- All dify.token.* metrics now use identical 6-label structure:
  * tenant_id, app_id, operation_type, model_provider, model_name, node_type
- Pydantic validation ensures runtime enforcement (extra='forbid', frozen=True)
- Enables filtering by operation_type to avoid double-counting:
  * workflow: aggregated workflow-level tokens
  * node_execution: individual node-level tokens
  * message: direct message tokens
  * rule_generate/code_generate: prompt generation tokens

Previously, inconsistent label cardinality made aggregation impossible:
- WORKFLOW: 3 labels
- NODE_EXECUTION: 6 labels
- MESSAGE: 5 labels
- PROMPT_GENERATION: 5 labels

Now all use the same 6-label structure for consistent querying.
This commit is contained in:
GareArc 2026-02-06 03:06:06 -08:00
parent 4e624af5e0
commit 944eb28486
No known key found for this signature in database
2 changed files with 121 additions and 13 deletions

View File

@ -6,6 +6,25 @@ Only requires a matching ``trace(trace_info)`` method signature.
Signal strategy:
- **Traces (spans)**: workflow run, node execution, draft node execution only.
- **Metrics + structured logs**: all other event types.
Token metric labels (unified structure):
All token metrics (dify.tokens.input, dify.tokens.output, dify.tokens.total) use the
same label set for consistent filtering and aggregation:
- tenant_id: Tenant identifier
- app_id: Application identifier
- operation_type: Source of token usage (workflow | node_execution | message | rule_generate | etc.)
- model_provider: LLM provider name (empty string if not applicable)
- model_name: LLM model name (empty string if not applicable)
- node_type: Workflow node type (empty string if not node_execution)
This unified structure allows filtering by operation_type to separate:
- Workflow-level aggregates (operation_type=workflow)
- Individual node executions (operation_type=node_execution)
- Direct message calls (operation_type=message)
- Prompt generation operations (operation_type=rule_generate, code_generate, etc.)
Without this, tokens are double-counted when querying totals (workflow totals include
node totals, since workflow.total_tokens is the sum of all node tokens).
"""
from __future__ import annotations
@ -35,6 +54,7 @@ from enterprise.telemetry.entities import (
EnterpriseTelemetryEvent,
EnterpriseTelemetryHistogram,
EnterpriseTelemetrySpan,
TokenMetricLabels,
)
from enterprise.telemetry.telemetry_log import emit_metric_only_event, emit_telemetry_log
@ -218,10 +238,14 @@ class EnterpriseOtelTrace:
tenant_id=tenant_id or "",
app_id=app_id or "",
)
token_labels = self._labels(
**labels,
token_labels = TokenMetricLabels(
tenant_id=tenant_id or "",
app_id=app_id or "",
operation_type=OperationType.WORKFLOW,
)
model_provider="",
model_name="",
node_type="",
).to_dict()
self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, token_labels)
if info.prompt_tokens is not None and info.prompt_tokens > 0:
self._exporter.increment_counter(EnterpriseTelemetryCounter.INPUT_TOKENS, info.prompt_tokens, token_labels)
@ -370,11 +394,14 @@ class EnterpriseOtelTrace:
model_provider=info.model_provider or "",
)
if info.total_tokens:
token_labels = self._labels(
**labels,
model_name=info.model_name or "",
token_labels = TokenMetricLabels(
tenant_id=tenant_id or "",
app_id=app_id or "",
operation_type=OperationType.NODE_EXECUTION,
)
model_provider=info.model_provider or "",
model_name=info.model_name or "",
node_type=info.node_type,
).to_dict()
self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, token_labels)
if info.prompt_tokens is not None and info.prompt_tokens > 0:
self._exporter.increment_counter(
@ -463,10 +490,14 @@ class EnterpriseOtelTrace:
model_provider=metadata.get("ls_provider", ""),
model_name=metadata.get("ls_model_name", ""),
)
token_labels = self._labels(
**labels,
token_labels = TokenMetricLabels(
tenant_id=tenant_id or "",
app_id=app_id or "",
operation_type=OperationType.MESSAGE,
)
model_provider=metadata.get("ls_provider", ""),
model_name=metadata.get("ls_model_name", ""),
node_type="",
).to_dict()
self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, token_labels)
if info.message_tokens > 0:
self._exporter.increment_counter(EnterpriseTelemetryCounter.INPUT_TOKENS, info.message_tokens, token_labels)
@ -819,6 +850,15 @@ class EnterpriseOtelTrace:
user_id=user_id,
)
token_labels = TokenMetricLabels(
tenant_id=tenant_id or "",
app_id=app_id or "",
operation_type=info.operation_type,
model_provider=info.model_provider,
model_name=info.model_name,
node_type="",
).to_dict()
labels = self._labels(
tenant_id=tenant_id or "",
app_id=app_id or "",
@ -827,11 +867,13 @@ class EnterpriseOtelTrace:
model_name=info.model_name,
)
self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels)
self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, token_labels)
if info.prompt_tokens > 0:
self._exporter.increment_counter(EnterpriseTelemetryCounter.INPUT_TOKENS, info.prompt_tokens, labels)
self._exporter.increment_counter(EnterpriseTelemetryCounter.INPUT_TOKENS, info.prompt_tokens, token_labels)
if info.completion_tokens > 0:
self._exporter.increment_counter(EnterpriseTelemetryCounter.OUTPUT_TOKENS, info.completion_tokens, labels)
self._exporter.increment_counter(
EnterpriseTelemetryCounter.OUTPUT_TOKENS, info.completion_tokens, token_labels
)
status = "failed" if info.error else "success"
self._exporter.increment_counter(

View File

@ -1,4 +1,8 @@
from enum import StrEnum
from typing import cast
from opentelemetry.util.types import AttributeValue
from pydantic import BaseModel, ConfigDict
class EnterpriseTelemetrySpan(StrEnum):
@ -47,9 +51,71 @@ class EnterpriseTelemetryHistogram(StrEnum):
PROMPT_GENERATION_DURATION = "prompt_generation_duration"
class TokenMetricLabels(BaseModel):
"""Unified label structure for all dify.token.* metrics.
All token counters (dify.tokens.input, dify.tokens.output, dify.tokens.total) MUST
use this exact label set to ensure consistent filtering and aggregation across
different operation types.
Attributes:
tenant_id: Tenant identifier.
app_id: Application identifier.
operation_type: Source of token usage (workflow | node_execution | message |
rule_generate | code_generate | structured_output | instruction_modify).
model_provider: LLM provider name. Empty string if not applicable (e.g., workflow-level).
model_name: LLM model name. Empty string if not applicable (e.g., workflow-level).
node_type: Workflow node type. Empty string unless operation_type=node_execution.
Usage:
labels = TokenMetricLabels(
tenant_id="tenant-123",
app_id="app-456",
operation_type=OperationType.WORKFLOW,
model_provider="",
model_name="",
node_type="",
)
exporter.increment_counter(
EnterpriseTelemetryCounter.INPUT_TOKENS,
100,
labels.to_dict()
)
Design rationale:
Without this unified structure, tokens get double-counted when querying totals
because workflow.total_tokens is already the sum of all node tokens. The
operation_type label allows filtering to separate workflow-level aggregates from
node-level detail, while keeping the same label cardinality for consistent queries.
"""
tenant_id: str
app_id: str
operation_type: str
model_provider: str
model_name: str
node_type: str
model_config = ConfigDict(extra="forbid", frozen=True)
def to_dict(self) -> dict[str, AttributeValue]:
return cast(
dict[str, AttributeValue],
{
"tenant_id": self.tenant_id,
"app_id": self.app_id,
"operation_type": self.operation_type,
"model_provider": self.model_provider,
"model_name": self.model_name,
"node_type": self.node_type,
},
)
__all__ = [
"EnterpriseTelemetryCounter",
"EnterpriseTelemetryEvent",
"EnterpriseTelemetryHistogram",
"EnterpriseTelemetrySpan",
"TokenMetricLabels",
]