feat(telemetry): add input/output token metrics and fix trace cleanup

- Add dify.tokens.input and dify.tokens.output OTEL metrics
- Remove token split from trace log attributes (keep metrics only)
- Emit split token metrics for workflows and node executions
- Gracefully handle trace file deletion failures to prevent task crashes

BREAKING: None
MIGRATION: None
This commit is contained in:
GareArc 2026-02-03 21:05:10 -08:00
parent d858cbdc0c
commit bada2d18a1
No known key found for this signature in database
4 changed files with 25 additions and 5 deletions

View File

@@ -145,8 +145,6 @@ class EnterpriseOtelTrace:
"dify.workspace.name": info.metadata.get("workspace_name"),
"gen_ai.user.id": info.metadata.get("user_id"),
"gen_ai.usage.total_tokens": info.total_tokens,
"gen_ai.usage.input_tokens": info.prompt_tokens,
"gen_ai.usage.output_tokens": info.completion_tokens,
"dify.workflow.version": info.workflow_run_version,
}
)
@@ -177,6 +175,10 @@ class EnterpriseOtelTrace:
"app_id": info.metadata.get("app_id", ""),
}
self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels)
if info.prompt_tokens is not None and info.prompt_tokens > 0:
self._exporter.increment_counter(EnterpriseTelemetryCounter.INPUT_TOKENS, info.prompt_tokens, labels)
if info.completion_tokens is not None and info.completion_tokens > 0:
self._exporter.increment_counter(EnterpriseTelemetryCounter.OUTPUT_TOKENS, info.completion_tokens, labels)
invoke_from = info.metadata.get("triggered_from", "")
self._exporter.increment_counter(
EnterpriseTelemetryCounter.REQUESTS,
@@ -265,8 +267,6 @@ class EnterpriseOtelTrace:
"dify.node.currency": info.currency,
"gen_ai.provider.name": info.model_provider,
"gen_ai.request.model": info.model_name,
"gen_ai.usage.input_tokens": info.prompt_tokens,
"gen_ai.usage.output_tokens": info.completion_tokens,
"gen_ai.tool.name": info.tool_name,
"dify.node.iteration_index": info.iteration_index,
"dify.node.loop_index": info.loop_index,
@@ -307,6 +307,14 @@ class EnterpriseOtelTrace:
if info.total_tokens:
token_labels = {**labels, "model_name": info.model_name or ""}
self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, token_labels)
if info.prompt_tokens is not None and info.prompt_tokens > 0:
self._exporter.increment_counter(
EnterpriseTelemetryCounter.INPUT_TOKENS, info.prompt_tokens, token_labels
)
if info.completion_tokens is not None and info.completion_tokens > 0:
self._exporter.increment_counter(
EnterpriseTelemetryCounter.OUTPUT_TOKENS, info.completion_tokens, token_labels
)
self._exporter.increment_counter(
EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": request_type, "status": info.status}
)

View File

@@ -9,6 +9,8 @@ class EnterpriseTelemetrySpan(StrEnum):
class EnterpriseTelemetryCounter(StrEnum):
TOKENS = "tokens"
INPUT_TOKENS = "input_tokens"
OUTPUT_TOKENS = "output_tokens"
REQUESTS = "requests"
ERRORS = "errors"
FEEDBACK = "feedback"

View File

@@ -133,6 +133,8 @@ class EnterpriseExporter:
meter = self._meter_provider.get_meter("dify.enterprise")
self._counters = {
EnterpriseTelemetryCounter.TOKENS: meter.create_counter("dify.tokens.total", unit="{token}"),
EnterpriseTelemetryCounter.INPUT_TOKENS: meter.create_counter("dify.tokens.input", unit="{token}"),
EnterpriseTelemetryCounter.OUTPUT_TOKENS: meter.create_counter("dify.tokens.output", unit="{token}"),
EnterpriseTelemetryCounter.REQUESTS: meter.create_counter("dify.requests.total", unit="{request}"),
EnterpriseTelemetryCounter.ERRORS: meter.create_counter("dify.errors.total", unit="{error}"),
EnterpriseTelemetryCounter.FEEDBACK: meter.create_counter("dify.feedback.total", unit="{feedback}"),

View File

@@ -65,4 +65,12 @@ def process_trace_tasks(file_info):
redis_client.incr(failed_key)
logger.info("Processing trace tasks failed, app_id: %s", app_id)
finally:
storage.delete(file_path)
try:
storage.delete(file_path)
except Exception as e:
logger.warning(
"Failed to delete trace file %s for app_id %s: %s",
file_path,
app_id,
e,
)