From 13fa56b5b1bb522aeda2aa7629ef2882f1d9c43c Mon Sep 17 00:00:00 2001 From: Novice Date: Fri, 12 Dec 2025 16:24:49 +0800 Subject: [PATCH] feat: add tracing metadata --- api/core/workflow/enums.py | 1 + api/core/workflow/nodes/llm/entities.py | 26 +++++++++++++ api/core/workflow/nodes/llm/node.py | 49 +++++++++++++++++++++++++ 3 files changed, 76 insertions(+) diff --git a/api/core/workflow/enums.py b/api/core/workflow/enums.py index 3a60d34691..32c6b2d6e5 100644 --- a/api/core/workflow/enums.py +++ b/api/core/workflow/enums.py @@ -248,6 +248,7 @@ class WorkflowNodeExecutionMetadataKey(StrEnum): LOOP_VARIABLE_MAP = "loop_variable_map" # single loop variable output DATASOURCE_INFO = "datasource_info" LLM_CONTENT_SEQUENCE = "llm_content_sequence" + LLM_TRACE = "llm_trace" class WorkflowNodeExecutionStatus(StrEnum): diff --git a/api/core/workflow/nodes/llm/entities.py b/api/core/workflow/nodes/llm/entities.py index 2003820d80..8b4fa11dcf 100644 --- a/api/core/workflow/nodes/llm/entities.py +++ b/api/core/workflow/nodes/llm/entities.py @@ -19,6 +19,31 @@ class ModelConfig(BaseModel): completion_params: dict[str, Any] = Field(default_factory=dict) +class LLMTraceSegment(BaseModel): + """ + Streaming trace segment for LLM tool-enabled runs. + + We keep order as-is to allow direct replay: thought/content/tool_call/tool_result appear + exactly in the sequence they were emitted. + """ + + type: Literal["thought", "content", "tool_call", "tool_result"] + turn: int = Field(0, description="0-based turn index, increments after each tool_result") + + # Common optional fields + text: str | None = Field(None, description="Text chunk for thought/content") + + # Tool call fields + tool_call_id: str | None = None + tool_name: str | None = None + tool_arguments: str | None = None + + # Tool result fields + tool_output: str | None = None + tool_error: str | None = None + files: list[str] = Field(default_factory=list, description="File IDs from tool result if any") + + class LLMGenerationData(BaseModel): """Generation data from LLM invocation with tools. @@ -33,6 +58,7 @@ class LLMGenerationData(BaseModel): usage: LLMUsage = Field(..., description="LLM usage statistics") finish_reason: str | None = Field(None, description="Finish reason from LLM") files: list[File] = Field(default_factory=list, description="Generated files") + trace: list[LLMTraceSegment] = Field(default_factory=list, description="Streaming trace in emitted order") class ContextConfig(BaseModel): diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py index fe105c2ddb..21e8c61325 100644 --- a/api/core/workflow/nodes/llm/node.py +++ b/api/core/workflow/nodes/llm/node.py @@ -87,6 +87,7 @@ from .entities import ( LLMNodeChatModelMessage, LLMNodeCompletionModelPromptTemplate, LLMNodeData, + LLMTraceSegment, ModelConfig, ) from .exc import ( @@ -437,6 +438,11 @@ class LLMNode(Node[LLMNodeData]): WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens, WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price, WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency, + WorkflowNodeExecutionMetadataKey.LLM_TRACE: [ + segment.model_dump() for segment in generation_data.trace + ] + if generation_data + else [], }, llm_usage=usage, ) @@ -1536,6 +1542,9 @@ class LLMNode(Node[LLMNodeData]): # Track reasoning per turn: each tool_call completion marks end of a turn current_turn_reasoning: list[str] = [] # Buffer for current turn's thought chunks reasoning_per_turn: list[str] = [] # Final list: one element per turn + tool_call_index_map: dict[str, int] = {} # tool_call_id -> index + trace_segments: list[LLMTraceSegment] = [] # Ordered trace for replay + current_turn = 0 # Process each output from strategy try: @@ -1572,6 +1581,20 @@ class LLMNode(Node[LLMNodeData]): tool_args = output.data.get("tool_args", {}) tool_arguments = json.dumps(tool_args) if tool_args else "" + if tool_call_id and tool_call_id not in tool_call_index_map: + tool_call_index_map[tool_call_id] = len(tool_call_index_map) + + trace_segments.append( + LLMTraceSegment( + type="tool_call", + turn=current_turn, + tool_call_id=tool_call_id, + tool_name=tool_name, + tool_arguments=tool_arguments, + text=None, + ) + ) + yield ToolCallChunkEvent( selector=[self._node_id, "generation", "tool_calls"], chunk=tool_arguments, @@ -1589,6 +1612,9 @@ class LLMNode(Node[LLMNodeData]): tool_files = [] tool_error = None + if tool_call_id and tool_call_id not in tool_call_index_map: + tool_call_index_map[tool_call_id] = len(tool_call_index_map) + # Extract file IDs if present (only for success case) files_data = output.data.get("files") if files_data and isinstance(files_data, list): @@ -1607,6 +1633,20 @@ class LLMNode(Node[LLMNodeData]): if meta and isinstance(meta, dict) and meta.get("error"): tool_error = meta.get("error") + trace_segments.append( + LLMTraceSegment( + type="tool_result", + turn=current_turn, + tool_call_id=tool_call_id, + tool_name=tool_name, + tool_output=str(tool_output) if tool_output is not None else None, + tool_error=str(tool_error) if tool_error is not None else None, + files=[str(f) for f in tool_files] if tool_files else [], + text=None, + ) + ) + current_turn += 1 + yield ToolResultChunkEvent( selector=[self._node_id, "generation", "tool_results"], chunk=str(tool_output) if tool_output else "", @@ -1640,6 +1680,7 @@ class LLMNode(Node[LLMNodeData]): if kind == "thought": current_turn_reasoning.append(segment) + trace_segments.append(LLMTraceSegment(type="thought", turn=current_turn, text=segment)) yield ThoughtChunkEvent( selector=[self._node_id, "generation", "thought"], chunk=segment, @@ -1647,6 +1688,7 @@ class LLMNode(Node[LLMNodeData]): ) else: text += segment + trace_segments.append(LLMTraceSegment(type="content", turn=current_turn, text=segment)) yield StreamChunkEvent( selector=[self._node_id, "text"], chunk=segment, @@ -1685,6 +1727,7 @@ class LLMNode(Node[LLMNodeData]): continue if kind == "thought": current_turn_reasoning.append(segment) + trace_segments.append(LLMTraceSegment(type="thought", turn=current_turn, text=segment)) yield ThoughtChunkEvent( selector=[self._node_id, "generation", "thought"], chunk=segment, @@ -1692,6 +1735,7 @@ class LLMNode(Node[LLMNodeData]): ) else: text += segment + trace_segments.append(LLMTraceSegment(type="content", turn=current_turn, text=segment)) yield StreamChunkEvent( selector=[self._node_id, "text"], chunk=segment, @@ -1765,6 +1809,10 @@ class LLMNode(Node[LLMNodeData]): } ) + tool_calls_for_generation.sort( + key=lambda item: tool_call_index_map.get(item.get("id", ""), len(tool_call_index_map)) + ) + # Return generation data for caller return LLMGenerationData( text=text, @@ -1773,6 +1821,7 @@ class LLMNode(Node[LLMNodeData]): usage=usage, finish_reason=finish_reason, files=files, + trace=trace_segments, ) def _accumulate_usage(self, total_usage: LLMUsage, delta_usage: LLMUsage) -> None: