diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
index 8c0102d9bd..01c377956b 100644
--- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py
+++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
@@ -1,3 +1,4 @@
+import json
 import logging
 import re
 import time
@@ -60,6 +61,7 @@ from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTas
 from core.app.task_pipeline.message_cycle_manager import MessageCycleManager
 from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
 from core.model_runtime.entities.llm_entities import LLMUsage
+from core.model_runtime.utils.encoders import jsonable_encoder
 from core.ops.ops_trace_manager import TraceQueueManager
 from core.workflow.enums import WorkflowExecutionStatus
 from core.workflow.nodes import NodeType
@@ -391,6 +393,14 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
         if should_direct_answer:
             return
 
+        current_time = time.perf_counter()
+        if self._task_state.first_token_time is None and delta_text.strip():
+            self._task_state.first_token_time = current_time
+            self._task_state.is_streaming_response = True
+
+        if delta_text.strip():
+            self._task_state.last_token_time = current_time
+
         # Only publish tts message at text chunk streaming
         if tts_publisher and queue_message:
             tts_publisher.publish(queue_message)
@@ -772,7 +782,33 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
         message.answer = answer_text
         message.updated_at = naive_utc_now()
         message.provider_response_latency = time.perf_counter() - self._base_task_pipeline.start_at
-        message.message_metadata = self._task_state.metadata.model_dump_json()
+
+        # Populate usage before dumping metadata so the streaming metrics are serialized with it
+        if graph_runtime_state and graph_runtime_state.llm_usage:
+            usage = graph_runtime_state.llm_usage
+            message.message_tokens = usage.prompt_tokens
+            message.message_unit_price = usage.prompt_unit_price
+            message.message_price_unit = usage.prompt_price_unit
+            message.answer_tokens = usage.completion_tokens
+            message.answer_unit_price = usage.completion_unit_price
+            message.answer_price_unit = usage.completion_price_unit
+            message.total_price = usage.total_price
+            message.currency = usage.currency
+            self._task_state.metadata.usage = usage
+        else:
+            usage = LLMUsage.empty_usage()
+            self._task_state.metadata.usage = usage
+
+        # Add streaming metrics to usage if available
+        if self._task_state.is_streaming_response and self._task_state.first_token_time:
+            start_time = self._base_task_pipeline.start_at
+            first_token_time = self._task_state.first_token_time
+            last_token_time = self._task_state.last_token_time or first_token_time
+            usage.time_to_first_token = round(first_token_time - start_time, 3)
+            usage.time_to_generate = round(last_token_time - first_token_time, 3)
+
+        metadata = self._task_state.metadata.model_dump()
+        message.message_metadata = json.dumps(jsonable_encoder(metadata))
         message_files = [
             MessageFile(
                 message_id=message.id,
@@ -790,20 +826,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
         ]
         session.add_all(message_files)
 
-        if graph_runtime_state and graph_runtime_state.llm_usage:
-            usage = graph_runtime_state.llm_usage
-            message.message_tokens = usage.prompt_tokens
-            message.message_unit_price = usage.prompt_unit_price
-            message.message_price_unit = usage.prompt_price_unit
-            message.answer_tokens = usage.completion_tokens
-            message.answer_unit_price = usage.completion_unit_price
-            message.answer_price_unit = usage.completion_price_unit
-            message.total_price = usage.total_price
-            message.currency = usage.currency
-            self._task_state.metadata.usage = usage
-        else:
-            self._task_state.metadata.usage = LLMUsage.empty_usage()
-
     def _seed_graph_runtime_state_from_queue_manager(self) -> None:
         """Bootstrap the cached runtime state from the queue manager when present."""
         candidate = self._base_task_pipeline.queue_manager.graph_runtime_state
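Note: both streaming counters above come from monotonic time.perf_counter() stamps — first_token_time is pinned by the first non-whitespace chunk and last_token_time advances with every visible chunk, so TTFT and time-to-generate fall out as simple differences. A minimal standalone sketch of the same arithmetic (the function name and the simulated chunk list are illustrative, not part of the patch):

import time

def measure_stream(chunks):
    """Replays chunks and returns (time_to_first_token, time_to_generate)."""
    start = time.perf_counter()
    first = last = None
    for delta_text in chunks:
        now = time.perf_counter()
        if first is None and delta_text.strip():
            first = now  # first visible token -> TTFT reference point
        if delta_text.strip():
            last = now  # keeps advancing with every visible token
    if first is None:
        return None, None  # empty or whitespace-only response
    last = last or first
    return round(first - start, 3), round(last - first, 3)

print(measure_stream(["", "Hel", "lo", " world"]))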
diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py
index 72a92add04..79a5e657b3 100644
--- a/api/core/app/entities/task_entities.py
+++ b/api/core/app/entities/task_entities.py
@@ -48,6 +48,9 @@ class WorkflowTaskState(TaskState):
     """
 
     answer: str = ""
+    first_token_time: float | None = None
+    last_token_time: float | None = None
+    is_streaming_response: bool = False
 
 
 class StreamEvent(StrEnum):
diff --git a/api/core/model_runtime/entities/llm_entities.py b/api/core/model_runtime/entities/llm_entities.py
index 17f6000d93..2c7c421eed 100644
--- a/api/core/model_runtime/entities/llm_entities.py
+++ b/api/core/model_runtime/entities/llm_entities.py
@@ -38,6 +38,8 @@ class LLMUsageMetadata(TypedDict, total=False):
     prompt_price: Union[float, str]
     completion_price: Union[float, str]
    latency: float
+    time_to_first_token: float
+    time_to_generate: float
 
 
 class LLMUsage(ModelUsage):
@@ -57,6 +59,8 @@ class LLMUsage(ModelUsage):
     total_price: Decimal
     currency: str
     latency: float
+    time_to_first_token: float | None = None
+    time_to_generate: float | None = None
 
     @classmethod
     def empty_usage(cls):
@@ -73,6 +77,8 @@ class LLMUsage(ModelUsage):
             total_price=Decimal("0.0"),
             currency="USD",
             latency=0.0,
+            time_to_first_token=None,
+            time_to_generate=None,
         )
 
     @classmethod
@@ -108,6 +114,8 @@ class LLMUsage(ModelUsage):
             prompt_price=Decimal(str(metadata.get("prompt_price", 0))),
             completion_price=Decimal(str(metadata.get("completion_price", 0))),
             latency=metadata.get("latency", 0.0),
+            time_to_first_token=metadata.get("time_to_first_token"),
+            time_to_generate=metadata.get("time_to_generate"),
         )
 
     def plus(self, other: LLMUsage) -> LLMUsage:
@@ -133,6 +141,8 @@ class LLMUsage(ModelUsage):
             total_price=self.total_price + other.total_price,
             currency=other.currency,
             latency=self.latency + other.latency,
+            time_to_first_token=other.time_to_first_token,
+            time_to_generate=other.time_to_generate,
         )
 
     def __add__(self, other: LLMUsage) -> LLMUsage:
diff --git a/api/core/ops/entities/trace_entity.py b/api/core/ops/entities/trace_entity.py
index 5b81c09a2d..50a2cdea63 100644
--- a/api/core/ops/entities/trace_entity.py
+++ b/api/core/ops/entities/trace_entity.py
@@ -62,6 +62,9 @@ class MessageTraceInfo(BaseTraceInfo):
     file_list: Union[str, dict[str, Any], list] | None = None
     message_file_data: Any | None = None
     conversation_mode: str
+    gen_ai_server_time_to_first_token: float | None = None
+    llm_streaming_time_to_generate: float | None = None
+    is_streaming_request: bool = False
 
 
 class ModerationTraceInfo(BaseTraceInfo):
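Note: with the new optional LLMUsage fields, the usage block that json.dumps(jsonable_encoder(...)) persists into message.message_metadata should carry the streaming metrics next to the existing pricing fields. An illustrative round trip (all values are made up):

import json

metadata = {
    "usage": {
        "prompt_tokens": 128,
        "completion_tokens": 256,
        "total_price": "0.0042",
        "currency": "USD",
        "latency": 2.317,
        "time_to_first_token": 0.412,  # stays None for blocking (non-streaming) calls
        "time_to_generate": 1.905,
    }
}
message_metadata = json.dumps(metadata)  # what _extract_streaming_metrics later json.loads()s
print(json.loads(message_metadata)["usage"]["time_to_first_token"])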
diff --git a/api/core/ops/ops_trace_manager.py b/api/core/ops/ops_trace_manager.py
index 6cc7859db7..5bb539b7dc 100644
--- a/api/core/ops/ops_trace_manager.py
+++ b/api/core/ops/ops_trace_manager.py
@@ -619,6 +619,8 @@ class TraceTask:
             file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else ""
             file_list.append(file_url)
 
+        streaming_metrics = self._extract_streaming_metrics(message_data)
+
         metadata = {
             "conversation_id": message_data.conversation_id,
             "ls_provider": message_data.model_provider,
@@ -651,6 +653,9 @@ class TraceTask:
             metadata=metadata,
             message_file_data=message_file_data,
             conversation_mode=conversation_mode,
+            gen_ai_server_time_to_first_token=streaming_metrics.get("gen_ai_server_time_to_first_token"),
+            llm_streaming_time_to_generate=streaming_metrics.get("llm_streaming_time_to_generate"),
+            is_streaming_request=streaming_metrics.get("is_streaming_request", False),
         )
 
         return message_trace_info
@@ -876,6 +881,24 @@ class TraceTask:
 
         return generate_name_trace_info
 
+    def _extract_streaming_metrics(self, message_data) -> dict:
+        if not message_data.message_metadata:
+            return {}
+
+        try:
+            metadata = json.loads(message_data.message_metadata)
+            usage = metadata.get("usage", {})
+            time_to_first_token = usage.get("time_to_first_token")
+            time_to_generate = usage.get("time_to_generate")
+
+            return {
+                "gen_ai_server_time_to_first_token": time_to_first_token,
+                "llm_streaming_time_to_generate": time_to_generate,
+                "is_streaming_request": time_to_first_token is not None,
+            }
+        except (json.JSONDecodeError, AttributeError):
+            return {}
+
 
 trace_manager_timer: threading.Timer | None = None
 trace_manager_queue: queue.Queue = queue.Queue()
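Note: _extract_streaming_metrics treats the mere presence of usage.time_to_first_token as the streaming marker and swallows malformed metadata. A standalone sketch of that contract (FakeMessage is hypothetical test scaffolding, not a Dify class):

import json
from dataclasses import dataclass

@dataclass
class FakeMessage:
    message_metadata: str | None

def extract(message) -> dict:
    # Mirrors TraceTask._extract_streaming_metrics: tolerate missing or invalid metadata.
    if not message.message_metadata:
        return {}
    try:
        usage = json.loads(message.message_metadata).get("usage", {})
        ttft = usage.get("time_to_first_token")
        return {
            "gen_ai_server_time_to_first_token": ttft,
            "llm_streaming_time_to_generate": usage.get("time_to_generate"),
            "is_streaming_request": ttft is not None,
        }
    except (json.JSONDecodeError, AttributeError):
        return {}

print(extract(FakeMessage('{"usage": {"time_to_first_token": 0.4, "time_to_generate": 1.9}}')))
print(extract(FakeMessage(None)))  # -> {}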
diff --git a/api/core/ops/tencent_trace/client.py b/api/core/ops/tencent_trace/client.py
index 270732aa02..733d5b8bb6 100644
--- a/api/core/ops/tencent_trace/client.py
+++ b/api/core/ops/tencent_trace/client.py
@@ -11,6 +11,11 @@ import socket
 from typing import TYPE_CHECKING
 from urllib.parse import urlparse
 
+try:
+    from importlib.metadata import version
+except ImportError:
+    from importlib_metadata import version  # type: ignore[import-not-found]
+
 if TYPE_CHECKING:
     from opentelemetry.metrics import Meter
     from opentelemetry.metrics._internal.instrument import Histogram
@@ -27,12 +32,27 @@ from opentelemetry.util.types import AttributeValue
 
 from configs import dify_config
 
-from .entities.tencent_semconv import LLM_OPERATION_DURATION
+from .entities.semconv import (
+    GEN_AI_SERVER_TIME_TO_FIRST_TOKEN,
+    GEN_AI_STREAMING_TIME_TO_GENERATE,
+    GEN_AI_TOKEN_USAGE,
+    GEN_AI_TRACE_DURATION,
+    LLM_OPERATION_DURATION,
+)
 from .entities.tencent_trace_entity import SpanData
 
 logger = logging.getLogger(__name__)
 
 
+def _get_opentelemetry_sdk_version() -> str:
+    """Get the installed OpenTelemetry SDK version dynamically."""
+    try:
+        return version("opentelemetry-sdk")
+    except Exception:
+        logger.debug("Failed to get opentelemetry-sdk version, using default")
+        return "1.27.0"  # fallback version
+
+
 class TencentTraceClient:
     """Tencent APM trace client using OpenTelemetry OTLP exporter"""
 
@@ -57,6 +77,9 @@ class TencentTraceClient:
                 ResourceAttributes.SERVICE_VERSION: f"dify-{dify_config.project.version}-{dify_config.COMMIT_SHA}",
                 ResourceAttributes.DEPLOYMENT_ENVIRONMENT: f"{dify_config.DEPLOY_ENV}-{dify_config.EDITION}",
                 ResourceAttributes.HOST_NAME: socket.gethostname(),
+                ResourceAttributes.TELEMETRY_SDK_LANGUAGE: "python",
+                ResourceAttributes.TELEMETRY_SDK_NAME: "opentelemetry",
+                ResourceAttributes.TELEMETRY_SDK_VERSION: _get_opentelemetry_sdk_version(),
             }
         )
         # Prepare gRPC endpoint/metadata
@@ -80,13 +103,18 @@ class TencentTraceClient:
         )
 
         self.tracer_provider.add_span_processor(self.span_processor)
-        self.tracer = self.tracer_provider.get_tracer("dify.tencent_apm")
+        # Use the Dify API version as the tracer version
+        self.tracer = self.tracer_provider.get_tracer("dify-sdk", dify_config.project.version)
 
         # Store span contexts for parent-child relationships
         self.span_contexts: dict[int, trace_api.SpanContext] = {}
 
         self.meter: Meter | None = None
         self.hist_llm_duration: Histogram | None = None
+        self.hist_token_usage: Histogram | None = None
+        self.hist_time_to_first_token: Histogram | None = None
+        self.hist_time_to_generate: Histogram | None = None
+        self.hist_trace_duration: Histogram | None = None
         self.metric_reader: MetricReader | None = None
 
         # Metrics exporter and instruments
@@ -99,7 +127,7 @@ class TencentTraceClient:
             use_http_protobuf = protocol in {"http/protobuf", "http-protobuf"}
             use_http_json = protocol in {"http/json", "http-json"}
 
-            # Set preferred temporality for histograms to DELTA
+            # Tencent APM works best with delta aggregation temporality
             preferred_temporality: dict[type, AggregationTemporality] = {Histogram: AggregationTemporality.DELTA}
 
             def _create_metric_exporter(exporter_cls, **kwargs):
@@ -177,20 +205,59 @@ class TencentTraceClient:
                 provider = MeterProvider(resource=self.resource, metric_readers=[metric_reader])
                 metrics.set_meter_provider(provider)
                 self.meter = metrics.get_meter("dify-sdk", dify_config.project.version)
+
+                # LLM operation duration histogram
                 self.hist_llm_duration = self.meter.create_histogram(
                     name=LLM_OPERATION_DURATION,
                     unit="s",
                     description="LLM operation duration (seconds)",
                 )
+
+                # Token usage histogram with exponential buckets
+                self.hist_token_usage = self.meter.create_histogram(
+                    name=GEN_AI_TOKEN_USAGE,
+                    unit="token",
+                    description="Number of tokens used in prompt and completions",
+                )
+
+                # Time to first token histogram
+                self.hist_time_to_first_token = self.meter.create_histogram(
+                    name=GEN_AI_SERVER_TIME_TO_FIRST_TOKEN,
+                    unit="s",
+                    description="Time to first token for streaming LLM responses (seconds)",
+                )
+
+                # Time to generate histogram
+                self.hist_time_to_generate = self.meter.create_histogram(
+                    name=GEN_AI_STREAMING_TIME_TO_GENERATE,
+                    unit="s",
+                    description="Total time to generate streaming LLM responses (seconds)",
+                )
+
+                # Trace duration histogram
+                self.hist_trace_duration = self.meter.create_histogram(
+                    name=GEN_AI_TRACE_DURATION,
+                    unit="s",
+                    description="End-to-end GenAI trace duration (seconds)",
+                )
+
                 self.metric_reader = metric_reader
             else:
                 self.meter = None
                 self.hist_llm_duration = None
+                self.hist_token_usage = None
+                self.hist_time_to_first_token = None
+                self.hist_time_to_generate = None
+                self.hist_trace_duration = None
                 self.metric_reader = None
         except Exception:
             logger.exception("[Tencent APM] Metrics initialization failed; metrics disabled")
             self.meter = None
             self.hist_llm_duration = None
+            self.hist_token_usage = None
+            self.hist_time_to_first_token = None
+            self.hist_time_to_generate = None
+            self.hist_trace_duration = None
             self.metric_reader = None
 
     def add_span(self, span_data: SpanData) -> None:
@@ -216,6 +283,117 @@ class TencentTraceClient:
         except Exception:
             logger.debug("[Tencent APM] Failed to record LLM duration", exc_info=True)
 
+    def record_token_usage(
+        self,
+        token_count: int,
+        token_type: str,
+        operation_name: str,
+        request_model: str,
+        response_model: str,
+        server_address: str,
+        provider: str,
+    ) -> None:
+        """Record the token usage histogram.
+
+        Args:
+            token_count: Number of tokens used
+            token_type: "input" or "output"
+            operation_name: Operation name (e.g., "chat")
+            request_model: Model used in the request
+            response_model: Model used in the response
+            server_address: Server address
+            provider: Model provider name
+        """
+        try:
+            if not hasattr(self, "hist_token_usage") or self.hist_token_usage is None:
+                return
+
+            attributes = {
+                "gen_ai.operation.name": operation_name,
+                "gen_ai.request.model": request_model,
+                "gen_ai.response.model": response_model,
+                "gen_ai.system": provider,
+                "gen_ai.token.type": token_type,
+                "server.address": server_address,
+            }
+
+            self.hist_token_usage.record(token_count, attributes)  # type: ignore[attr-defined]
+        except Exception:
+            logger.debug("[Tencent APM] Failed to record token usage", exc_info=True)
+
+    def record_time_to_first_token(
+        self, ttft_seconds: float, provider: str, model: str, operation_name: str = "chat"
+    ) -> None:
+        """Record the time-to-first-token histogram.
+
+        Args:
+            ttft_seconds: Time to first token in seconds
+            provider: Model provider name
+            model: Model name
+            operation_name: Operation name (default: "chat")
+        """
+        try:
+            if not hasattr(self, "hist_time_to_first_token") or self.hist_time_to_first_token is None:
+                return
+
+            attributes = {
+                "gen_ai.operation.name": operation_name,
+                "gen_ai.system": provider,
+                "gen_ai.request.model": model,
+                "gen_ai.response.model": model,
+                "stream": "true",
+            }
+
+            self.hist_time_to_first_token.record(ttft_seconds, attributes)  # type: ignore[attr-defined]
+        except Exception:
+            logger.debug("[Tencent APM] Failed to record time to first token", exc_info=True)
+
+    def record_time_to_generate(
+        self, ttg_seconds: float, provider: str, model: str, operation_name: str = "chat"
+    ) -> None:
+        """Record the time-to-generate histogram.
+
+        Args:
+            ttg_seconds: Time to generate in seconds
+            provider: Model provider name
+            model: Model name
+            operation_name: Operation name (default: "chat")
+        """
+        try:
+            if not hasattr(self, "hist_time_to_generate") or self.hist_time_to_generate is None:
+                return
+
+            attributes = {
+                "gen_ai.operation.name": operation_name,
+                "gen_ai.system": provider,
+                "gen_ai.request.model": model,
+                "gen_ai.response.model": model,
+                "stream": "true",
+            }
+
+            self.hist_time_to_generate.record(ttg_seconds, attributes)  # type: ignore[attr-defined]
+        except Exception:
+            logger.debug("[Tencent APM] Failed to record time to generate", exc_info=True)
+
+    def record_trace_duration(self, duration_seconds: float, attributes: dict[str, str] | None = None) -> None:
+        """Record the end-to-end trace duration histogram in seconds.
+
+        Args:
+            duration_seconds: Trace duration in seconds
+            attributes: Optional attributes (e.g., conversation_mode, app_id)
+        """
+        try:
+            if not hasattr(self, "hist_trace_duration") or self.hist_trace_duration is None:
+                return
+
+            attrs: dict[str, str] = {}
+            if attributes:
+                for k, v in attributes.items():
+                    attrs[k] = str(v) if not isinstance(v, (str, int, float, bool)) else v  # type: ignore[assignment]
+            self.hist_trace_duration.record(duration_seconds, attrs)  # type: ignore[attr-defined]
+        except Exception:
+            logger.debug("[Tencent APM] Failed to record trace duration", exc_info=True)
+
     def _create_and_export_span(self, span_data: SpanData) -> None:
         """Create span using OpenTelemetry Tracer API"""
         try:
diff --git a/api/core/ops/tencent_trace/entities/tencent_semconv.py b/api/core/ops/tencent_trace/entities/semconv.py
similarity index 69%
rename from api/core/ops/tencent_trace/entities/tencent_semconv.py
rename to api/core/ops/tencent_trace/entities/semconv.py
index 5ea6eeacef..cd2dbade8b 100644
--- a/api/core/ops/tencent_trace/entities/tencent_semconv.py
+++ b/api/core/ops/tencent_trace/entities/semconv.py
@@ -47,6 +47,9 @@ GEN_AI_COMPLETION = "gen_ai.completion"
 
 GEN_AI_RESPONSE_FINISH_REASON = "gen_ai.response.finish_reason"
 
+# Streaming Span Attributes
+GEN_AI_IS_STREAMING_REQUEST = "llm.is_streaming"  # Same key as the OpenLLMetry semconv
+
 # Tool
 TOOL_NAME = "tool.name"
 
@@ -62,6 +65,19 @@ INSTRUMENTATION_LANGUAGE = "python"
 
 # Metrics
 LLM_OPERATION_DURATION = "gen_ai.client.operation.duration"
+GEN_AI_TOKEN_USAGE = "gen_ai.client.token.usage"
+GEN_AI_SERVER_TIME_TO_FIRST_TOKEN = "gen_ai.server.time_to_first_token"
+GEN_AI_STREAMING_TIME_TO_GENERATE = "gen_ai.streaming.time_to_generate"
+# LLM trace duration metric, exclusive to Tencent APM
+GEN_AI_TRACE_DURATION = "gen_ai.trace.duration"
+
+# Token Usage Attributes
+GEN_AI_OPERATION_NAME = "gen_ai.operation.name"
+GEN_AI_REQUEST_MODEL = "gen_ai.request.model"
+GEN_AI_RESPONSE_MODEL = "gen_ai.response.model"
+GEN_AI_SYSTEM = "gen_ai.system"
+GEN_AI_TOKEN_TYPE = "gen_ai.token.type"
+SERVER_ADDRESS = "server.address"
 
 
 class GenAISpanKind(Enum):
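Note: each metric added above is an ordinary OpenTelemetry histogram recorded with gen_ai.* attributes; nothing Tencent-specific happens until export. A minimal sketch of the recording path against the public SDK, assuming opentelemetry-sdk is installed (the in-memory reader and all values are illustrative):

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader

reader = InMemoryMetricReader()
meter = MeterProvider(metric_readers=[reader]).get_meter("dify-sdk", "1.0.0")

ttft_hist = meter.create_histogram(
    name="gen_ai.server.time_to_first_token",
    unit="s",
    description="Time to first token for streaming LLM responses (seconds)",
)
ttft_hist.record(0.412, {
    "gen_ai.operation.name": "chat",
    "gen_ai.system": "openai",
    "gen_ai.request.model": "gpt-4o",
    "gen_ai.response.model": "gpt-4o",
    "stream": "true",
})
print(reader.get_metrics_data())  # collected histogram point with the attributes above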
diff --git a/api/core/ops/tencent_trace/span_builder.py b/api/core/ops/tencent_trace/span_builder.py
index 5ba592290d..26e8779e3e 100644
--- a/api/core/ops/tencent_trace/span_builder.py
+++ b/api/core/ops/tencent_trace/span_builder.py
@@ -14,10 +14,11 @@ from core.ops.entities.trace_entity import (
     ToolTraceInfo,
     WorkflowTraceInfo,
 )
-from core.ops.tencent_trace.entities.tencent_semconv import (
+from core.ops.tencent_trace.entities.semconv import (
     GEN_AI_COMPLETION,
     GEN_AI_FRAMEWORK,
     GEN_AI_IS_ENTRY,
+    GEN_AI_IS_STREAMING_REQUEST,
     GEN_AI_MODEL_NAME,
     GEN_AI_PROMPT,
     GEN_AI_PROVIDER,
@@ -156,6 +157,25 @@ class TencentSpanBuilder:
         outputs = node_execution.outputs or {}
         usage_data = process_data.get("usage", {}) if "usage" in process_data else outputs.get("usage", {})
 
+        attributes = {
+            GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
+            GEN_AI_SPAN_KIND: GenAISpanKind.GENERATION.value,
+            GEN_AI_FRAMEWORK: "dify",
+            GEN_AI_MODEL_NAME: process_data.get("model_name", ""),
+            GEN_AI_PROVIDER: process_data.get("model_provider", ""),
+            GEN_AI_USAGE_INPUT_TOKENS: str(usage_data.get("prompt_tokens", 0)),
+            GEN_AI_USAGE_OUTPUT_TOKENS: str(usage_data.get("completion_tokens", 0)),
+            GEN_AI_USAGE_TOTAL_TOKENS: str(usage_data.get("total_tokens", 0)),
+            GEN_AI_PROMPT: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
+            GEN_AI_COMPLETION: str(outputs.get("text", "")),
+            GEN_AI_RESPONSE_FINISH_REASON: outputs.get("finish_reason", ""),
+            INPUT_VALUE: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
+            OUTPUT_VALUE: str(outputs.get("text", "")),
+        }
+
+        if usage_data.get("time_to_first_token") is not None:
+            attributes[GEN_AI_IS_STREAMING_REQUEST] = "true"
+
         return SpanData(
             trace_id=trace_id,
             parent_span_id=workflow_span_id,
@@ -163,21 +183,7 @@ class TencentSpanBuilder:
             name="GENERATION",
             start_time=TencentSpanBuilder._get_time_nanoseconds(node_execution.created_at),
             end_time=TencentSpanBuilder._get_time_nanoseconds(node_execution.finished_at),
-            attributes={
-                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
-                GEN_AI_SPAN_KIND: GenAISpanKind.GENERATION.value,
-                GEN_AI_FRAMEWORK: "dify",
-                GEN_AI_MODEL_NAME: process_data.get("model_name", ""),
-                GEN_AI_PROVIDER: process_data.get("model_provider", ""),
-                GEN_AI_USAGE_INPUT_TOKENS: str(usage_data.get("prompt_tokens", 0)),
-                GEN_AI_USAGE_OUTPUT_TOKENS: str(usage_data.get("completion_tokens", 0)),
-                GEN_AI_USAGE_TOTAL_TOKENS: str(usage_data.get("total_tokens", 0)),
-                GEN_AI_PROMPT: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
-                GEN_AI_COMPLETION: str(outputs.get("text", "")),
-                GEN_AI_RESPONSE_FINISH_REASON: outputs.get("finish_reason", ""),
-                INPUT_VALUE: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
-                OUTPUT_VALUE: str(outputs.get("text", "")),
-            },
+            attributes=attributes,
             status=TencentSpanBuilder._get_workflow_node_status(node_execution),
         )
 
@@ -191,6 +197,19 @@ class TencentSpanBuilder:
         if trace_info.error:
             status = Status(StatusCode.ERROR, trace_info.error)
 
+        attributes = {
+            GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
+            GEN_AI_USER_ID: str(user_id),
+            GEN_AI_SPAN_KIND: GenAISpanKind.WORKFLOW.value,
+            GEN_AI_FRAMEWORK: "dify",
+            GEN_AI_IS_ENTRY: "true",
+            INPUT_VALUE: str(trace_info.inputs or ""),
+            OUTPUT_VALUE: str(trace_info.outputs or ""),
+        }
+
+        if trace_info.is_streaming_request:
+            attributes[GEN_AI_IS_STREAMING_REQUEST] = "true"
+
         return SpanData(
             trace_id=trace_id,
             parent_span_id=None,
@@ -198,15 +217,7 @@ class TencentSpanBuilder:
             name="message",
             start_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.start_time),
             end_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.end_time),
-            attributes={
-                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
-                GEN_AI_USER_ID: str(user_id),
-                GEN_AI_SPAN_KIND: GenAISpanKind.WORKFLOW.value,
-                GEN_AI_FRAMEWORK: "dify",
-                GEN_AI_IS_ENTRY: "true",
-                INPUT_VALUE: str(trace_info.inputs or ""),
-                OUTPUT_VALUE: str(trace_info.outputs or ""),
-            },
+            attributes=attributes,
             status=status,
             links=links,
         )
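Note: the builder only stamps llm.is_streaming when streaming evidence exists, so non-streaming spans carry no extra attribute. A reduced sketch of that conditional (the gen_ai.usage.* keys are abbreviated stand-ins for the semconv constants; the usage dicts are illustrative):

GEN_AI_IS_STREAMING_REQUEST = "llm.is_streaming"  # same key as in entities/semconv.py

def build_attributes(usage_data: dict) -> dict:
    attributes = {
        "gen_ai.usage.input_tokens": str(usage_data.get("prompt_tokens", 0)),
        "gen_ai.usage.output_tokens": str(usage_data.get("completion_tokens", 0)),
    }
    # Presence of TTFT is the only streaming signal available in usage data.
    if usage_data.get("time_to_first_token") is not None:
        attributes[GEN_AI_IS_STREAMING_REQUEST] = "true"
    return attributes

print(build_attributes({"prompt_tokens": 10, "completion_tokens": 5, "time_to_first_token": 0.3}))
print(build_attributes({"prompt_tokens": 10, "completion_tokens": 5}))  # no streaming flag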
diff --git a/api/core/ops/tencent_trace/tencent_trace.py b/api/core/ops/tencent_trace/tencent_trace.py
index 5ef1c61b24..9b3df86e16 100644
--- a/api/core/ops/tencent_trace/tencent_trace.py
+++ b/api/core/ops/tencent_trace/tencent_trace.py
@@ -90,6 +90,9 @@ class TencentDataTrace(BaseTraceInstance):
 
             self._process_workflow_nodes(trace_info, trace_id)
 
+            # Record trace duration for the entry span
+            self._record_workflow_trace_duration(trace_info)
+
         except Exception:
             logger.exception("[Tencent APM] Failed to process workflow trace")
 
@@ -107,6 +110,11 @@ class TencentDataTrace(BaseTraceInstance):
 
             self.trace_client.add_span(message_span)
 
+            self._record_message_llm_metrics(trace_info)
+
+            # Record trace duration for the entry span
+            self._record_message_trace_duration(trace_info)
+
         except Exception:
             logger.exception("[Tencent APM] Failed to process message trace")
 
@@ -290,24 +298,219 @@ class TencentDataTrace(BaseTraceInstance):
     def _record_llm_metrics(self, node_execution: WorkflowNodeExecution) -> None:
         """Record LLM performance metrics"""
         try:
-            if not hasattr(self.trace_client, "record_llm_duration"):
-                return
-
             process_data = node_execution.process_data or {}
-            usage = process_data.get("usage", {})
-            latency_s = float(usage.get("latency", 0.0))
+            outputs = node_execution.outputs or {}
+            usage = process_data.get("usage", {}) if "usage" in process_data else outputs.get("usage", {})
 
-            if latency_s > 0:
-                attributes = {
-                    "provider": process_data.get("model_provider", ""),
-                    "model": process_data.get("model_name", ""),
-                    "span_kind": "GENERATION",
-                }
-                self.trace_client.record_llm_duration(latency_s, attributes)
+            model_provider = process_data.get("model_provider", "unknown")
+            model_name = process_data.get("model_name", "unknown")
+            model_mode = process_data.get("model_mode", "chat")
+
+            # Record LLM duration
+            if hasattr(self.trace_client, "record_llm_duration"):
+                latency_s = float(usage.get("latency", 0.0))
+
+                if latency_s > 0:
+                    # Determine whether the call was streaming from the usage metrics
+                    is_streaming = usage.get("time_to_first_token") is not None
+
+                    attributes = {
+                        "gen_ai.system": model_provider,
+                        "gen_ai.response.model": model_name,
+                        "gen_ai.operation.name": model_mode,
+                        "stream": "true" if is_streaming else "false",
+                    }
+                    self.trace_client.record_llm_duration(latency_s, attributes)
+
+            # Record streaming metrics from usage
+            time_to_first_token = usage.get("time_to_first_token")
+            if time_to_first_token is not None and hasattr(self.trace_client, "record_time_to_first_token"):
+                ttft_seconds = float(time_to_first_token)
+                if ttft_seconds > 0:
+                    self.trace_client.record_time_to_first_token(
+                        ttft_seconds=ttft_seconds, provider=model_provider, model=model_name, operation_name=model_mode
+                    )
+
+            time_to_generate = usage.get("time_to_generate")
+            if time_to_generate is not None and hasattr(self.trace_client, "record_time_to_generate"):
+                ttg_seconds = float(time_to_generate)
+                if ttg_seconds > 0:
+                    self.trace_client.record_time_to_generate(
+                        ttg_seconds=ttg_seconds, provider=model_provider, model=model_name, operation_name=model_mode
+                    )
+
+            # Record token usage
+            if hasattr(self.trace_client, "record_token_usage"):
+                # Extract token counts
+                input_tokens = int(usage.get("prompt_tokens", 0))
+                output_tokens = int(usage.get("completion_tokens", 0))
+
+                if input_tokens > 0 or output_tokens > 0:
+                    server_address = model_provider
+
+                    # Record input tokens
+                    if input_tokens > 0:
+                        self.trace_client.record_token_usage(
+                            token_count=input_tokens,
+                            token_type="input",
+                            operation_name=model_mode,
+                            request_model=model_name,
+                            response_model=model_name,
+                            server_address=server_address,
+                            provider=model_provider,
+                        )
+
+                    # Record output tokens
+                    if output_tokens > 0:
+                        self.trace_client.record_token_usage(
+                            token_count=output_tokens,
+                            token_type="output",
+                            operation_name=model_mode,
+                            request_model=model_name,
+                            response_model=model_name,
+                            server_address=server_address,
+                            provider=model_provider,
+                        )
         except Exception:
             logger.debug("[Tencent APM] Failed to record LLM metrics")
 
+    def _record_message_llm_metrics(self, trace_info: MessageTraceInfo) -> None:
+        """Record LLM metrics for message traces"""
+        try:
+            trace_metadata = trace_info.metadata or {}
+            message_data = trace_info.message_data or {}
+
+            provider_latency = 0.0
+            if isinstance(message_data, dict):
+                provider_latency = float(message_data.get("provider_response_latency", 0.0) or 0.0)
+            else:
+                provider_latency = float(getattr(message_data, "provider_response_latency", 0.0) or 0.0)
+
+            model_provider = trace_metadata.get("ls_provider") or (
+                message_data.get("model_provider", "") if isinstance(message_data, dict) else ""
+            )
+            model_name = trace_metadata.get("ls_model_name") or (
+                message_data.get("model_id", "") if isinstance(message_data, dict) else ""
+            )
+
+            # Record LLM duration
+            if provider_latency > 0 and hasattr(self.trace_client, "record_llm_duration"):
+                is_streaming = trace_info.is_streaming_request
+
+                duration_attributes = {
+                    "gen_ai.system": model_provider,
+                    "gen_ai.response.model": model_name,
+                    "gen_ai.operation.name": "chat",  # Message traces are always chat
+                    "stream": "true" if is_streaming else "false",
+                }
+                self.trace_client.record_llm_duration(provider_latency, duration_attributes)
+
+            # Record streaming metrics for message traces
+            if trace_info.is_streaming_request:
+                # Record time to first token
+                if trace_info.gen_ai_server_time_to_first_token is not None and hasattr(
+                    self.trace_client, "record_time_to_first_token"
+                ):
+                    ttft_seconds = float(trace_info.gen_ai_server_time_to_first_token)
+                    if ttft_seconds > 0:
+                        self.trace_client.record_time_to_first_token(
+                            ttft_seconds=ttft_seconds, provider=str(model_provider or ""), model=str(model_name or "")
+                        )
+
+                # Record time to generate
+                if trace_info.llm_streaming_time_to_generate is not None and hasattr(
+                    self.trace_client, "record_time_to_generate"
+                ):
+                    ttg_seconds = float(trace_info.llm_streaming_time_to_generate)
+                    if ttg_seconds > 0:
+                        self.trace_client.record_time_to_generate(
+                            ttg_seconds=ttg_seconds, provider=str(model_provider or ""), model=str(model_name or "")
+                        )
+
+            # Record token usage
+            if hasattr(self.trace_client, "record_token_usage"):
+                input_tokens = int(trace_info.message_tokens or 0)
+                output_tokens = int(trace_info.answer_tokens or 0)
+
+                if input_tokens > 0:
+                    self.trace_client.record_token_usage(
+                        token_count=input_tokens,
+                        token_type="input",
+                        operation_name="chat",
+                        request_model=str(model_name or ""),
+                        response_model=str(model_name or ""),
+                        server_address=str(model_provider or ""),
+                        provider=str(model_provider or ""),
+                    )
+
+                if output_tokens > 0:
+                    self.trace_client.record_token_usage(
+                        token_count=output_tokens,
+                        token_type="output",
+                        operation_name="chat",
+                        request_model=str(model_name or ""),
+                        response_model=str(model_name or ""),
+                        server_address=str(model_provider or ""),
+                        provider=str(model_provider or ""),
+                    )
+
+        except Exception:
+            logger.debug("[Tencent APM] Failed to record message LLM metrics")
+
+    def _record_workflow_trace_duration(self, trace_info: WorkflowTraceInfo) -> None:
+        """Record the end-to-end workflow trace duration."""
+        try:
+            if not hasattr(self.trace_client, "record_trace_duration"):
+                return
+
+            # Calculate duration from start_time and end_time to match the span duration
+            if trace_info.start_time and trace_info.end_time:
+                duration_s = (trace_info.end_time - trace_info.start_time).total_seconds()
+            else:
+                # Fall back to workflow_run_elapsed_time if timestamps are not available
+                duration_s = float(trace_info.workflow_run_elapsed_time)
+
+            if duration_s > 0:
+                attributes = {
+                    "conversation_mode": "workflow",
+                    "workflow_status": trace_info.workflow_run_status,
+                }
+
+                # Flag whether the run belongs to a conversation
+                if trace_info.conversation_id:
+                    attributes["has_conversation"] = "true"
+                else:
+                    attributes["has_conversation"] = "false"
+
+                self.trace_client.record_trace_duration(duration_s, attributes)
+
+        except Exception:
+            logger.debug("[Tencent APM] Failed to record workflow trace duration")
+
+    def _record_message_trace_duration(self, trace_info: MessageTraceInfo) -> None:
+        """Record the end-to-end message trace duration."""
+        try:
+            if not hasattr(self.trace_client, "record_trace_duration"):
+                return
+
+            # Calculate duration from start_time and end_time
+            if trace_info.start_time and trace_info.end_time:
+                duration = (trace_info.end_time - trace_info.start_time).total_seconds()
+
+                if duration > 0:
+                    attributes = {
+                        "conversation_mode": trace_info.conversation_mode,
+                    }
+
+                    # Add the streaming flag if available
+                    if hasattr(trace_info, "is_streaming_request"):
+                        attributes["stream"] = "true" if trace_info.is_streaming_request else "false"
+
+                    self.trace_client.record_trace_duration(duration, attributes)
+
+        except Exception:
+            logger.debug("[Tencent APM] Failed to record message trace duration")
+
     def __del__(self):
         """Ensure proper cleanup on garbage collection."""
         try:
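Note: both duration helpers above derive seconds from the trace's datetime bounds rather than from a perf counter, so the metric matches the span's reported duration. A self-contained sketch of that calculation (timestamps and attributes are illustrative):

from datetime import datetime, timedelta

start_time = datetime(2025, 1, 1, 12, 0, 0)
end_time = start_time + timedelta(seconds=3, milliseconds=250)

duration_s = (end_time - start_time).total_seconds()  # 3.25, same window the span reports
if duration_s > 0:
    attributes = {"conversation_mode": "chat", "stream": "true"}
    print(f"record_trace_duration({duration_s:.3f}, {attributes})")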
diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py
index 1644f683bf..06c9beaed2 100644
--- a/api/core/workflow/nodes/llm/node.py
+++ b/api/core/workflow/nodes/llm/node.py
@@ -3,6 +3,7 @@ import io
 import json
 import logging
 import re
+import time
 from collections.abc import Generator, Mapping, Sequence
 from typing import TYPE_CHECKING, Any, Literal
 
@@ -384,6 +385,8 @@ class LLMNode(Node):
             output_schema = LLMNode.fetch_structured_output_schema(
                 structured_output=structured_output or {},
             )
+            request_start_time = time.perf_counter()
+
             invoke_result = invoke_llm_with_structured_output(
                 provider=model_instance.provider,
                 model_schema=model_schema,
@@ -396,6 +399,8 @@ class LLMNode(Node):
                 user=user_id,
             )
         else:
+            request_start_time = time.perf_counter()
+
             invoke_result = model_instance.invoke_llm(
                 prompt_messages=list(prompt_messages),
                 model_parameters=node_data_model.completion_params,
@@ -411,6 +416,7 @@ class LLMNode(Node):
             node_id=node_id,
             node_type=node_type,
             reasoning_format=reasoning_format,
+            request_start_time=request_start_time,
         )
 
     @staticmethod
@@ -422,14 +428,20 @@ class LLMNode(Node):
         node_id: str,
         node_type: NodeType,
         reasoning_format: Literal["separated", "tagged"] = "tagged",
+        request_start_time: float | None = None,
     ) -> Generator[NodeEventBase | LLMStructuredOutput, None, None]:
         # For blocking mode
         if isinstance(invoke_result, LLMResult):
+            duration = None
+            if request_start_time is not None:
+                duration = time.perf_counter() - request_start_time
+                invoke_result.usage.latency = round(duration, 3)
             event = LLMNode.handle_blocking_result(
                 invoke_result=invoke_result,
                 saver=file_saver,
                 file_outputs=file_outputs,
                 reasoning_format=reasoning_format,
+                request_latency=duration,
             )
             yield event
             return
@@ -441,6 +453,12 @@ class LLMNode(Node):
         usage = LLMUsage.empty_usage()
         finish_reason = None
         full_text_buffer = io.StringIO()
+
+        # Initialize streaming metrics tracking
+        start_time = request_start_time if request_start_time is not None else time.perf_counter()
+        first_token_time = None
+        has_content = False
+
         collected_structured_output = None  # Collect structured_output from streaming chunks
 
         # Consume the invoke result and handle generator exception
         try:
@@ -457,6 +475,11 @@ class LLMNode(Node):
                 file_saver=file_saver,
                 file_outputs=file_outputs,
             ):
+                # Detect the first token for the TTFT calculation
+                if text_part and not has_content:
+                    first_token_time = time.perf_counter()
+                    has_content = True
+
                 full_text_buffer.write(text_part)
                 yield StreamChunkEvent(
                     selector=[node_id, "text"],
@@ -489,6 +512,16 @@ class LLMNode(Node):
         # Extract clean text and reasoning from tags
         clean_text, reasoning_content = LLMNode._split_reasoning(full_text, reasoning_format)
 
+        # Calculate streaming metrics
+        end_time = time.perf_counter()
+        total_duration = end_time - start_time
+        usage.latency = round(total_duration, 3)
+        if has_content and first_token_time:
+            gen_ai_server_time_to_first_token = first_token_time - start_time
+            llm_streaming_time_to_generate = end_time - first_token_time
+            usage.time_to_first_token = round(gen_ai_server_time_to_first_token, 3)
+            usage.time_to_generate = round(llm_streaming_time_to_generate, 3)
+
         yield ModelInvokeCompletedEvent(
             # Use clean_text for separated mode, full_text for tagged mode
             text=clean_text if reasoning_format == "separated" else full_text,
@@ -1068,6 +1101,7 @@ class LLMNode(Node):
         saver: LLMFileSaver,
         file_outputs: list["File"],
         reasoning_format: Literal["separated", "tagged"] = "tagged",
+        request_latency: float | None = None,
     ) -> ModelInvokeCompletedEvent:
         buffer = io.StringIO()
         for text_part in LLMNode._save_multimodal_output_and_convert_result_to_markdown(
@@ -1088,7 +1122,7 @@ class LLMNode(Node):
         # Extract clean text and reasoning from tags
         clean_text, reasoning_content = LLMNode._split_reasoning(full_text, reasoning_format)
 
-        return ModelInvokeCompletedEvent(
+        event = ModelInvokeCompletedEvent(
             # Use clean_text for separated mode, full_text for tagged mode
             text=clean_text if reasoning_format == "separated" else full_text,
             usage=invoke_result.usage,
@@ -1098,6 +1132,9 @@ class LLMNode(Node):
             # Pass structured output if enabled
             structured_output=getattr(invoke_result, "structured_output", None),
         )
+        if request_latency is not None:
+            event.usage.latency = round(request_latency, 3)
+        return event
 
     @staticmethod
     def save_multimodal_image_output(
diff --git a/api/uv.lock b/api/uv.lock
index 7cf1e047de..9bbe392207 100644
--- a/api/uv.lock
+++ b/api/uv.lock
@@ -1,5 +1,5 @@
 version = 1
-revision = 3
+revision = 2
 requires-python = ">=3.11, <3.13"
 resolution-markers = [
     "python_full_version >= '3.12.4' and platform_python_implementation != 'PyPy' and sys_platform == 'linux'",
diff --git a/web/app/components/base/icons/assets/public/tracing/tencent-icon-big.svg b/web/app/components/base/icons/assets/public/tracing/tencent-icon-big.svg
new file mode 100644
index 0000000000..b38316f3b6
--- /dev/null
+++ b/web/app/components/base/icons/assets/public/tracing/tencent-icon-big.svg
@@ -0,0 +1,23 @@
+<svg> ... <title>logo</title> ... </svg>  (23 lines of icon markup; SVG body not recoverable from the extracted text)
diff --git a/web/app/components/base/icons/assets/public/tracing/tencent-icon.svg b/web/app/components/base/icons/assets/public/tracing/tencent-icon.svg
new file mode 100644
index 0000000000..53347bf23c
--- /dev/null
+++ b/web/app/components/base/icons/assets/public/tracing/tencent-icon.svg
@@ -0,0 +1,23 @@
+<svg> ... <title>logo</title> ... </svg>  (23 lines of icon markup; SVG body not recoverable from the extracted text)
\ No newline at end of file
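Note: taken together, the LLM-node changes capture request_start_time immediately before invocation — blocking calls get usage.latency only, while streaming calls also get TTFT/TTG anchored on the first chunk. A compressed sketch of the two timing paths (the fake invoke functions stand in for model_instance.invoke_llm; sleeps simulate model latency):

import time

def blocking_invoke() -> str:
    time.sleep(0.05)  # simulated model round trip
    return "full answer"

def streaming_invoke():
    for part in ("chunk-1", "chunk-2"):
        time.sleep(0.02)  # simulated inter-chunk delay
        yield part

# Blocking path: a single latency figure, as in _handle_invoke_result.
start = time.perf_counter()
blocking_invoke()
latency_blocking = round(time.perf_counter() - start, 3)

# Streaming path: latency plus TTFT/TTG derived from the first chunk.
start = time.perf_counter()
first_token_time = None
for _chunk in streaming_invoke():
    if first_token_time is None:
        first_token_time = time.perf_counter()
end = time.perf_counter()

print({
    "latency_blocking": latency_blocking,
    "latency_streaming": round(end - start, 3),
    "time_to_first_token": round(first_token_time - start, 3),
    "time_to_generate": round(end - first_token_time, 3),
})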