From 9941d1f1609cd1a6f6a2a02ff541534eb0ca8e5b Mon Sep 17 00:00:00 2001 From: Novice Date: Mon, 15 Dec 2025 14:18:53 +0800 Subject: [PATCH] feat: add llm log metadata --- api/core/workflow/nodes/llm/entities.py | 11 +-- api/core/workflow/nodes/llm/node.py | 78 +++++++++++++------ .../graph_engine/test_response_coordinator.py | 1 + 3 files changed, 60 insertions(+), 30 deletions(-) diff --git a/api/core/workflow/nodes/llm/entities.py b/api/core/workflow/nodes/llm/entities.py index 8b4fa11dcf..068ea5ecf0 100644 --- a/api/core/workflow/nodes/llm/entities.py +++ b/api/core/workflow/nodes/llm/entities.py @@ -23,22 +23,19 @@ class LLMTraceSegment(BaseModel): """ Streaming trace segment for LLM tool-enabled runs. - We keep order as-is to allow direct replay: thought/content/tool_call/tool_result appear - exactly in the sequence they were emitted. + Order is preserved for replay. Tool calls are single entries containing both + arguments and results. """ - type: Literal["thought", "content", "tool_call", "tool_result"] - turn: int = Field(0, description="0-based turn index, increments after each tool_result") + type: Literal["thought", "content", "tool_call"] # Common optional fields text: str | None = Field(None, description="Text chunk for thought/content") - # Tool call fields + # Tool call fields (combined start + result) tool_call_id: str | None = None tool_name: str | None = None tool_arguments: str | None = None - - # Tool result fields tool_output: str | None = None tool_error: str | None = None files: list[str] = Field(default_factory=list, description="File IDs from tool result if any") diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py index 21e8c61325..5f4d938773 100644 --- a/api/core/workflow/nodes/llm/node.py +++ b/api/core/workflow/nodes/llm/node.py @@ -1544,7 +1544,22 @@ class LLMNode(Node[LLMNodeData]): reasoning_per_turn: list[str] = [] # Final list: one element per turn tool_call_index_map: dict[str, int] = {} # tool_call_id -> index trace_segments: list[LLMTraceSegment] = [] # Ordered trace for replay + tool_trace_map: dict[str, LLMTraceSegment] = {} current_turn = 0 + pending_thought: list[str] = [] + pending_content: list[str] = [] + + def _flush_thought() -> None: + if not pending_thought: + return + trace_segments.append(LLMTraceSegment(type="thought", text="".join(pending_thought))) + pending_thought.clear() + + def _flush_content() -> None: + if not pending_content: + return + trace_segments.append(LLMTraceSegment(type="content", text="".join(pending_content))) + pending_content.clear() # Process each output from strategy try: @@ -1584,16 +1599,19 @@ class LLMNode(Node[LLMNodeData]): if tool_call_id and tool_call_id not in tool_call_index_map: tool_call_index_map[tool_call_id] = len(tool_call_index_map) - trace_segments.append( - LLMTraceSegment( - type="tool_call", - turn=current_turn, - tool_call_id=tool_call_id, - tool_name=tool_name, - tool_arguments=tool_arguments, - text=None, - ) + _flush_thought() + _flush_content() + + tool_call_segment = LLMTraceSegment( + type="tool_call", + text=None, + tool_call_id=tool_call_id, + tool_name=tool_name, + tool_arguments=tool_arguments, ) + trace_segments.append(tool_call_segment) + if tool_call_id: + tool_trace_map[tool_call_id] = tool_call_segment yield ToolCallChunkEvent( selector=[self._node_id, "generation", "tool_calls"], @@ -1615,6 +1633,9 @@ class LLMNode(Node[LLMNodeData]): if tool_call_id and tool_call_id not in tool_call_index_map: tool_call_index_map[tool_call_id] = len(tool_call_index_map) + _flush_thought() + _flush_content() + # Extract file IDs if present (only for success case) files_data = output.data.get("files") if files_data and isinstance(files_data, list): @@ -1633,18 +1654,22 @@ class LLMNode(Node[LLMNodeData]): if meta and isinstance(meta, dict) and meta.get("error"): tool_error = meta.get("error") - trace_segments.append( - LLMTraceSegment( - type="tool_result", - turn=current_turn, + tool_call_segment = tool_trace_map.get(tool_call_id) + if tool_call_segment is None: + tool_call_segment = LLMTraceSegment( + type="tool_call", + text=None, tool_call_id=tool_call_id, tool_name=tool_name, - tool_output=str(tool_output) if tool_output is not None else None, - tool_error=str(tool_error) if tool_error is not None else None, - files=[str(f) for f in tool_files] if tool_files else [], - text=None, + tool_arguments=None, ) - ) + trace_segments.append(tool_call_segment) + if tool_call_id: + tool_trace_map[tool_call_id] = tool_call_segment + + tool_call_segment.tool_output = str(tool_output) if tool_output is not None else None + tool_call_segment.tool_error = str(tool_error) if tool_error is not None else None + tool_call_segment.files = [str(f) for f in tool_files] if tool_files else [] current_turn += 1 yield ToolResultChunkEvent( @@ -1679,16 +1704,18 @@ class LLMNode(Node[LLMNodeData]): continue if kind == "thought": + _flush_content() current_turn_reasoning.append(segment) - trace_segments.append(LLMTraceSegment(type="thought", turn=current_turn, text=segment)) + pending_thought.append(segment) yield ThoughtChunkEvent( selector=[self._node_id, "generation", "thought"], chunk=segment, is_final=False, ) else: + _flush_thought() text += segment - trace_segments.append(LLMTraceSegment(type="content", turn=current_turn, text=segment)) + pending_content.append(segment) yield StreamChunkEvent( selector=[self._node_id, "text"], chunk=segment, @@ -1726,16 +1753,18 @@ class LLMNode(Node[LLMNodeData]): if not segment: continue if kind == "thought": + _flush_content() current_turn_reasoning.append(segment) - trace_segments.append(LLMTraceSegment(type="thought", turn=current_turn, text=segment)) + pending_thought.append(segment) yield ThoughtChunkEvent( selector=[self._node_id, "generation", "thought"], chunk=segment, is_final=False, ) else: + _flush_thought() text += segment - trace_segments.append(LLMTraceSegment(type="content", turn=current_turn, text=segment)) + pending_content.append(segment) yield StreamChunkEvent( selector=[self._node_id, "text"], chunk=segment, @@ -1751,6 +1780,9 @@ class LLMNode(Node[LLMNodeData]): if current_turn_reasoning: reasoning_per_turn.append("".join(current_turn_reasoning)) + _flush_thought() + _flush_content() + # Send final events for all streams yield StreamChunkEvent( selector=[self._node_id, "text"], @@ -1816,7 +1848,7 @@ class LLMNode(Node[LLMNodeData]): # Return generation data for caller return LLMGenerationData( text=text, - reasoning_contents=reasoning_per_turn, # Multi-turn: [thought1, thought2, ...] + reasoning_contents=reasoning_per_turn, tool_calls=tool_calls_for_generation, usage=usage, finish_reason=finish_reason, diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_response_coordinator.py b/api/tests/unit_tests/core/workflow/graph_engine/test_response_coordinator.py index 388496ce1d..5df6bba748 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_response_coordinator.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_response_coordinator.py @@ -167,3 +167,4 @@ class TestResponseCoordinatorObjectStreaming: assert ("node1", "generation", "content") in children assert ("node1", "generation", "tool_calls") in children assert ("node1", "generation", "thought") in children +