feat: add tracing metadata

Novice 2025-12-12 16:24:49 +08:00
parent 9ce48b4dc4
commit 13fa56b5b1
3 changed files with 76 additions and 0 deletions


@@ -248,6 +248,7 @@ class WorkflowNodeExecutionMetadataKey(StrEnum):
    LOOP_VARIABLE_MAP = "loop_variable_map"  # single loop variable output
    DATASOURCE_INFO = "datasource_info"
    LLM_CONTENT_SEQUENCE = "llm_content_sequence"
    LLM_TRACE = "llm_trace"


class WorkflowNodeExecutionStatus(StrEnum):
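
The new LLM_TRACE key is only an identifier; the value stored under it is written by the node changes further down. A minimal sketch of how a consumer might read it back from persisted node-execution metadata (the import path and the origin of the metadata dict are assumptions, not shown in this commit):

from core.workflow.entities import WorkflowNodeExecutionMetadataKey  # import path assumed

def llm_trace_from_metadata(metadata: dict) -> list[dict]:
    # Segments are stored via model_dump(), so they come back as plain dicts.
    return metadata.get(WorkflowNodeExecutionMetadataKey.LLM_TRACE, [])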


@@ -19,6 +19,31 @@ class ModelConfig(BaseModel):
    completion_params: dict[str, Any] = Field(default_factory=dict)


class LLMTraceSegment(BaseModel):
    """
    Streaming trace segment for LLM tool-enabled runs.
    We keep order as-is to allow direct replay: thought/content/tool_call/tool_result appear
    exactly in the sequence they were emitted.
    """

    type: Literal["thought", "content", "tool_call", "tool_result"]
    turn: int = Field(0, description="0-based turn index, increments after each tool_result")
    # Common optional fields
    text: str | None = Field(None, description="Text chunk for thought/content")
    # Tool call fields
    tool_call_id: str | None = None
    tool_name: str | None = None
    tool_arguments: str | None = None
    # Tool result fields
    tool_output: str | None = None
    tool_error: str | None = None
    files: list[str] = Field(default_factory=list, description="File IDs from tool result if any")


class LLMGenerationData(BaseModel):
    """Generation data from LLM invocation with tools.

@@ -33,6 +58,7 @@ class LLMGenerationData(BaseModel):
    usage: LLMUsage = Field(..., description="LLM usage statistics")
    finish_reason: str | None = Field(None, description="Finish reason from LLM")
    files: list[File] = Field(default_factory=list, description="Generated files")
    trace: list[LLMTraceSegment] = Field(default_factory=list, description="Streaming trace in emitted order")


class ContextConfig(BaseModel):
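
A sketch of the "direct replay" the docstring promises: one forward pass over the list, dispatching on type. The field names come from the model above; the rendering itself is illustrative:

def replay(trace: list[LLMTraceSegment]) -> None:
    # Segments are already in emitted order, so no re-sorting is needed.
    for seg in trace:
        if seg.type in ("thought", "content"):
            print(f"[turn {seg.turn}] {seg.type}: {seg.text}")
        elif seg.type == "tool_call":
            print(f"[turn {seg.turn}] call {seg.tool_name}({seg.tool_arguments}) id={seg.tool_call_id}")
        else:  # tool_result
            print(f"[turn {seg.turn}] result from {seg.tool_name}: {seg.tool_error or seg.tool_output}")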


@@ -87,6 +87,7 @@ from .entities import (
    LLMNodeChatModelMessage,
    LLMNodeCompletionModelPromptTemplate,
    LLMNodeData,
    LLMTraceSegment,
    ModelConfig,
)
from .exc import (
@@ -437,6 +438,11 @@ class LLMNode(Node[LLMNodeData]):
                WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens,
                WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price,
                WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency,
                WorkflowNodeExecutionMetadataKey.LLM_TRACE: [
                    segment.model_dump() for segment in generation_data.trace
                ]
                if generation_data
                else [],
            },
            llm_usage=usage,
        )
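
Every field on LLMTraceSegment is an int, a str-or-None, or a list[str], so model_dump() produces a JSON-serializable dict, which is what makes this value safe to persist in execution metadata. An illustrative dump, with all values invented:

segment = LLMTraceSegment(
    type="tool_call",
    turn=0,
    tool_call_id="call_1",        # invented example value
    tool_name="search",           # invented example value
    tool_arguments='{"q": "x"}',  # invented example value
)
assert segment.model_dump() == {
    "type": "tool_call",
    "turn": 0,
    "text": None,
    "tool_call_id": "call_1",
    "tool_name": "search",
    "tool_arguments": '{"q": "x"}',
    "tool_output": None,
    "tool_error": None,
    "files": [],
}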
@@ -1536,6 +1542,9 @@ class LLMNode(Node[LLMNodeData]):
        # Track reasoning per turn: each tool result marks the end of a turn
        current_turn_reasoning: list[str] = []  # Buffer for current turn's thought chunks
        reasoning_per_turn: list[str] = []  # Final list: one element per turn
        tool_call_index_map: dict[str, int] = {}  # tool_call_id -> first-seen index
        trace_segments: list[LLMTraceSegment] = []  # Ordered trace for replay
        current_turn = 0

        # Process each output from strategy
        try:
@@ -1572,6 +1581,20 @@ class LLMNode(Node[LLMNodeData]):
                tool_args = output.data.get("tool_args", {})
                tool_arguments = json.dumps(tool_args) if tool_args else ""

                if tool_call_id and tool_call_id not in tool_call_index_map:
                    tool_call_index_map[tool_call_id] = len(tool_call_index_map)

                trace_segments.append(
                    LLMTraceSegment(
                        type="tool_call",
                        turn=current_turn,
                        tool_call_id=tool_call_id,
                        tool_name=tool_name,
                        tool_arguments=tool_arguments,
                        text=None,
                    )
                )

                yield ToolCallChunkEvent(
                    selector=[self._node_id, "generation", "tool_calls"],
                    chunk=tool_arguments,
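
The guard before the append assigns each tool_call_id a first-seen index: len() of the map before insertion yields 0, 1, 2, ... and a repeat sighting of the same id is a no-op. A standalone sketch of the idiom:

tool_call_index_map: dict[str, int] = {}

def note_call(tool_call_id: str) -> None:
    if tool_call_id and tool_call_id not in tool_call_index_map:
        tool_call_index_map[tool_call_id] = len(tool_call_index_map)

for cid in ["call_a", "call_b", "call_a"]:
    note_call(cid)
assert tool_call_index_map == {"call_a": 0, "call_b": 1}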
@@ -1589,6 +1612,9 @@ class LLMNode(Node[LLMNodeData]):
                tool_files = []
                tool_error = None

                if tool_call_id and tool_call_id not in tool_call_index_map:
                    tool_call_index_map[tool_call_id] = len(tool_call_index_map)

                # Extract file IDs if present (only for success case)
                files_data = output.data.get("files")
                if files_data and isinstance(files_data, list):
@@ -1607,6 +1633,20 @@ class LLMNode(Node[LLMNodeData]):
                    if meta and isinstance(meta, dict) and meta.get("error"):
                        tool_error = meta.get("error")

                trace_segments.append(
                    LLMTraceSegment(
                        type="tool_result",
                        turn=current_turn,
                        tool_call_id=tool_call_id,
                        tool_name=tool_name,
                        tool_output=str(tool_output) if tool_output is not None else None,
                        tool_error=str(tool_error) if tool_error is not None else None,
                        files=[str(f) for f in tool_files] if tool_files else [],
                        text=None,
                    )
                )
                current_turn += 1

                yield ToolResultChunkEvent(
                    selector=[self._node_id, "generation", "tool_results"],
                    chunk=str(tool_output) if tool_output else "",
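
current_turn advances only here, after the tool_result segment is appended, matching the model's "increments after each tool_result" contract: a call and its result share a turn, and any follow-up content lands on the next one. A small checker for that invariant (assuming a trace produced by the loop above):

def check_turns(trace: list[LLMTraceSegment]) -> None:
    turn = 0
    for seg in trace:
        assert seg.turn == turn, f"expected turn {turn}, got {seg.turn}"
        if seg.type == "tool_result":
            turn += 1  # the turn closes with its tool result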
@@ -1640,6 +1680,7 @@ class LLMNode(Node[LLMNodeData]):
                    if kind == "thought":
                        current_turn_reasoning.append(segment)
                        trace_segments.append(LLMTraceSegment(type="thought", turn=current_turn, text=segment))
                        yield ThoughtChunkEvent(
                            selector=[self._node_id, "generation", "thought"],
                            chunk=segment,
@@ -1647,6 +1688,7 @@ class LLMNode(Node[LLMNodeData]):
                        )
                    else:
                        text += segment
                        trace_segments.append(LLMTraceSegment(type="content", turn=current_turn, text=segment))
                        yield StreamChunkEvent(
                            selector=[self._node_id, "text"],
                            chunk=segment,
@@ -1685,6 +1727,7 @@ class LLMNode(Node[LLMNodeData]):
                    continue
                if kind == "thought":
                    current_turn_reasoning.append(segment)
                    trace_segments.append(LLMTraceSegment(type="thought", turn=current_turn, text=segment))
                    yield ThoughtChunkEvent(
                        selector=[self._node_id, "generation", "thought"],
                        chunk=segment,
@@ -1692,6 +1735,7 @@ class LLMNode(Node[LLMNodeData]):
                    )
                else:
                    text += segment
                    trace_segments.append(LLMTraceSegment(type="content", turn=current_turn, text=segment))
                    yield StreamChunkEvent(
                        selector=[self._node_id, "text"],
                        chunk=segment,
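
These two branches, and their twins in the streaming path above, mirror every chunk into the trace: each text += segment is paired with a content append, and each reasoning chunk with a thought append. The accumulated text should therefore be recoverable from the trace alone, as this sketch suggests:

def content_text(trace: list[LLMTraceSegment]) -> str:
    # Concatenating content chunks in order reproduces the accumulated `text`.
    return "".join(seg.text or "" for seg in trace if seg.type == "content")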
@@ -1765,6 +1809,10 @@ class LLMNode(Node[LLMNodeData]):
                }
            )

        tool_calls_for_generation.sort(
            key=lambda item: tool_call_index_map.get(item.get("id", ""), len(tool_call_index_map))
        )

        # Return generation data for caller
        return LLMGenerationData(
            text=text,
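
The sort key falls back to len(tool_call_index_map) for ids that never appeared in the stream, pushing them after all known calls; since list.sort() is stable, their relative order is preserved. A self-contained demonstration:

index_map = {"call_a": 0, "call_b": 1}
calls = [{"id": "call_x"}, {"id": "call_b"}, {"id": "call_a"}]
calls.sort(key=lambda item: index_map.get(item.get("id", ""), len(index_map)))
assert [c["id"] for c in calls] == ["call_a", "call_b", "call_x"]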
@@ -1773,6 +1821,7 @@ class LLMNode(Node[LLMNodeData]):
            usage=usage,
            finish_reason=finish_reason,
            files=files,
            trace=trace_segments,
        )

    def _accumulate_usage(self, total_usage: LLMUsage, delta_usage: LLMUsage) -> None:
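
Because thought segments carry their turn index, reasoning_per_turn could in principle be rebuilt from the trace alone. A hypothetical helper, not part of this commit:

from collections import defaultdict

def thoughts_by_turn(trace: list[LLMTraceSegment]) -> list[str]:
    # Group thought chunks by turn and join them, mirroring how
    # current_turn_reasoning is flushed into reasoning_per_turn.
    grouped: defaultdict[int, list[str]] = defaultdict(list)
    for seg in trace:
        if seg.type == "thought":
            grouped[seg.turn].append(seg.text or "")
    return ["".join(grouped[t]) for t in sorted(grouped)]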