diff --git a/api/enterprise/telemetry/enterprise_trace.py b/api/enterprise/telemetry/enterprise_trace.py index a500ec7424..a1d52e0441 100644 --- a/api/enterprise/telemetry/enterprise_trace.py +++ b/api/enterprise/telemetry/enterprise_trace.py @@ -145,8 +145,6 @@ class EnterpriseOtelTrace: "dify.workspace.name": info.metadata.get("workspace_name"), "gen_ai.user.id": info.metadata.get("user_id"), "gen_ai.usage.total_tokens": info.total_tokens, - "gen_ai.usage.input_tokens": info.prompt_tokens, - "gen_ai.usage.output_tokens": info.completion_tokens, "dify.workflow.version": info.workflow_run_version, } ) @@ -177,6 +175,10 @@ class EnterpriseOtelTrace: "app_id": info.metadata.get("app_id", ""), } self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels) + if info.prompt_tokens is not None and info.prompt_tokens > 0: + self._exporter.increment_counter(EnterpriseTelemetryCounter.INPUT_TOKENS, info.prompt_tokens, labels) + if info.completion_tokens is not None and info.completion_tokens > 0: + self._exporter.increment_counter(EnterpriseTelemetryCounter.OUTPUT_TOKENS, info.completion_tokens, labels) invoke_from = info.metadata.get("triggered_from", "") self._exporter.increment_counter( EnterpriseTelemetryCounter.REQUESTS, @@ -265,8 +267,6 @@ class EnterpriseOtelTrace: "dify.node.currency": info.currency, "gen_ai.provider.name": info.model_provider, "gen_ai.request.model": info.model_name, - "gen_ai.usage.input_tokens": info.prompt_tokens, - "gen_ai.usage.output_tokens": info.completion_tokens, "gen_ai.tool.name": info.tool_name, "dify.node.iteration_index": info.iteration_index, "dify.node.loop_index": info.loop_index, @@ -307,6 +307,14 @@ class EnterpriseOtelTrace: if info.total_tokens: token_labels = {**labels, "model_name": info.model_name or ""} self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, token_labels) + if info.prompt_tokens is not None and info.prompt_tokens > 0: + self._exporter.increment_counter( + EnterpriseTelemetryCounter.INPUT_TOKENS, info.prompt_tokens, token_labels + ) + if info.completion_tokens is not None and info.completion_tokens > 0: + self._exporter.increment_counter( + EnterpriseTelemetryCounter.OUTPUT_TOKENS, info.completion_tokens, token_labels + ) self._exporter.increment_counter( EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": request_type, "status": info.status} ) diff --git a/api/enterprise/telemetry/entities/__init__.py b/api/enterprise/telemetry/entities/__init__.py index 7c45bbfcbe..c7fd99bd01 100644 --- a/api/enterprise/telemetry/entities/__init__.py +++ b/api/enterprise/telemetry/entities/__init__.py @@ -9,6 +9,8 @@ class EnterpriseTelemetrySpan(StrEnum): class EnterpriseTelemetryCounter(StrEnum): TOKENS = "tokens" + INPUT_TOKENS = "input_tokens" + OUTPUT_TOKENS = "output_tokens" REQUESTS = "requests" ERRORS = "errors" FEEDBACK = "feedback" diff --git a/api/enterprise/telemetry/exporter.py b/api/enterprise/telemetry/exporter.py index a2728fb7ba..cbbdbb84bf 100644 --- a/api/enterprise/telemetry/exporter.py +++ b/api/enterprise/telemetry/exporter.py @@ -133,6 +133,8 @@ class EnterpriseExporter: meter = self._meter_provider.get_meter("dify.enterprise") self._counters = { EnterpriseTelemetryCounter.TOKENS: meter.create_counter("dify.tokens.total", unit="{token}"), + EnterpriseTelemetryCounter.INPUT_TOKENS: meter.create_counter("dify.tokens.input", unit="{token}"), + EnterpriseTelemetryCounter.OUTPUT_TOKENS: meter.create_counter("dify.tokens.output", unit="{token}"), EnterpriseTelemetryCounter.REQUESTS: meter.create_counter("dify.requests.total", unit="{request}"), EnterpriseTelemetryCounter.ERRORS: meter.create_counter("dify.errors.total", unit="{error}"), EnterpriseTelemetryCounter.FEEDBACK: meter.create_counter("dify.feedback.total", unit="{feedback}"), diff --git a/api/tasks/ops_trace_task.py b/api/tasks/ops_trace_task.py index f7d08d6207..c48e1f13e1 100644 --- a/api/tasks/ops_trace_task.py +++ b/api/tasks/ops_trace_task.py @@ -65,4 +65,12 @@ def process_trace_tasks(file_info): redis_client.incr(failed_key) logger.info("Processing trace tasks failed, app_id: %s", app_id) finally: - storage.delete(file_path) + try: + storage.delete(file_path) + except Exception as e: + logger.warning( + "Failed to delete trace file %s for app_id %s: %s", + file_path, + app_id, + e, + )