feat: add tool call elapsed time

This commit is contained in:
Novice 2026-01-04 10:29:02 +08:00
parent 152fd52cd7
commit f3e7fea628
No known key found for this signature in database
GPG Key ID: EE3F68E3105DAAAB
9 changed files with 104 additions and 31 deletions

View File

@ -200,7 +200,8 @@ class AgentPattern(ABC):
log.metadata = {
**log.metadata,
AgentLog.LogMetadata.FINISHED_AT: finished_at,
AgentLog.LogMetadata.ELAPSED_TIME: finished_at - started_at,
# Calculate elapsed time in seconds
AgentLog.LogMetadata.ELAPSED_TIME: round(finished_at - started_at, 4),
}
# Add usage information if provided

View File

@ -160,6 +160,7 @@ class StreamEventBuffer:
"name": tool_name or "",
"arguments": tool_arguments or "",
"result": "",
"elapsed_time": None,
}
self.tool_calls.append(tool_call)
idx = len(self.tool_calls) - 1
@ -168,13 +169,14 @@ class StreamEventBuffer:
self._last_event_type = "tool_call"
def record_tool_result(self, tool_call_id: str, result: str) -> None:
def record_tool_result(self, tool_call_id: str, result: str, tool_elapsed_time: float | None = None) -> None:
"""Record a tool result event (update existing tool call)."""
if not tool_call_id:
return
if tool_call_id in self._tool_call_id_map:
idx = self._tool_call_id_map[tool_call_id]
self.tool_calls[idx]["result"] = result
self.tool_calls[idx]["elapsed_time"] = tool_elapsed_time
def finalize(self) -> None:
"""Finalize the buffer, flushing any pending data."""
@ -523,7 +525,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
tool_name = tool_payload.name if tool_payload and tool_payload.name else ""
tool_arguments = tool_call.arguments if tool_call and tool_call.arguments else ""
tool_files = tool_result.files if tool_result else []
tool_elapsed_time = tool_result.elapsed_time if tool_result else None
# Record stream event based on chunk type
chunk_type = event.chunk_type or ChunkType.TEXT
match chunk_type:
@ -543,6 +545,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
self._stream_buffer.record_tool_result(
tool_call_id=tool_call_id,
result=delta_text,
tool_elapsed_time=tool_elapsed_time,
)
self._task_state.answer += delta_text
@ -555,6 +558,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
tool_name=tool_name or None,
tool_arguments=tool_arguments or None,
tool_files=tool_files,
tool_elapsed_time=tool_elapsed_time,
)
def _handle_iteration_start_event(

View File

@ -490,6 +490,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
tool_call_id = tool_payload.id if tool_payload and tool_payload.id else None
tool_name = tool_payload.name if tool_payload and tool_payload.name else None
tool_arguments = tool_call.arguments if tool_call else None
tool_elapsed_time = tool_result.elapsed_time if tool_result else None
tool_files = tool_result.files if tool_result else []
# only publish tts message at text chunk streaming
@ -504,6 +505,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
tool_name=tool_name,
tool_arguments=tool_arguments,
tool_files=tool_files,
tool_elapsed_time=tool_elapsed_time,
)
def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]:
@ -676,6 +678,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
tool_arguments: str | None = None,
tool_files: list[str] | None = None,
tool_error: str | None = None,
tool_elapsed_time: float | None = None,
) -> TextChunkStreamResponse:
"""
Handle completed event.
@ -684,18 +687,37 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
"""
from core.app.entities.task_entities import ChunkType as ResponseChunkType
response_chunk_type = ResponseChunkType(chunk_type.value) if chunk_type else ResponseChunkType.TEXT
data = TextChunkStreamResponse.Data(
text=text,
from_variable_selector=from_variable_selector,
chunk_type=response_chunk_type,
)
if response_chunk_type == ResponseChunkType.TOOL_CALL:
data = data.model_copy(
update={
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"tool_arguments": tool_arguments,
}
)
elif response_chunk_type == ResponseChunkType.TOOL_RESULT:
data = data.model_copy(
update={
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"tool_arguments": tool_arguments,
"tool_files": tool_files,
"tool_error": tool_error,
"tool_elapsed_time": tool_elapsed_time,
}
)
response = TextChunkStreamResponse(
task_id=self._application_generate_entity.task_id,
data=TextChunkStreamResponse.Data(
text=text,
from_variable_selector=from_variable_selector,
chunk_type=ResponseChunkType(chunk_type.value) if chunk_type else ResponseChunkType.TEXT,
tool_call_id=tool_call_id,
tool_name=tool_name,
tool_arguments=tool_arguments,
tool_files=tool_files or [],
tool_error=tool_error,
),
data=data,
)
return response

View File

@ -42,6 +42,7 @@ class ToolCallDetail(BaseModel):
name: str = Field(..., description="Name of the tool")
arguments: str = Field(default="", description="JSON string of tool arguments")
result: str = Field(default="", description="Result from the tool execution")
elapsed_time: float | None = Field(default=None, description="Elapsed time in seconds")
class LLMGenerationDetailData(BaseModel):

View File

@ -130,6 +130,16 @@ class MessageStreamResponse(StreamResponse):
"""file IDs produced by tool"""
tool_error: str | None = None
"""error message if tool failed"""
tool_elapsed_time: float | None = None
"""elapsed time spent executing the tool"""
def model_dump(self, *args, **kwargs) -> dict[str, object]:
    """Serialize the model to a dict; None-valued fields are excluded unless the caller overrides exclude_none."""
    if "exclude_none" not in kwargs:
        kwargs["exclude_none"] = True
    return super().model_dump(*args, **kwargs)
def model_dump_json(self, *args, **kwargs) -> str:
    """Serialize the model to JSON; None-valued fields are excluded unless the caller overrides exclude_none."""
    if "exclude_none" not in kwargs:
        kwargs["exclude_none"] = True
    return super().model_dump_json(*args, **kwargs)
class MessageAudioStreamResponse(StreamResponse):
@ -635,11 +645,23 @@ class TextChunkStreamResponse(StreamResponse):
"""accumulated tool arguments JSON"""
# Tool result fields (when chunk_type == TOOL_RESULT)
tool_files: list[str] = Field(default_factory=list)
tool_files: list[str] | None = None
"""file IDs produced by tool"""
tool_error: str | None = None
"""error message if tool failed"""
# Tool elapsed time fields (when chunk_type == TOOL_RESULT)
tool_elapsed_time: float | None = None
"""elapsed time spent executing the tool"""
def model_dump(self, *args, **kwargs) -> dict[str, object]:
    """Serialize the model to a dict, defaulting exclude_none to True (caller-supplied value wins)."""
    merged = {"exclude_none": True, **kwargs}
    return super().model_dump(*args, **merged)
def model_dump_json(self, *args, **kwargs) -> str:
    """Serialize the model to JSON, defaulting exclude_none to True (caller-supplied value wins)."""
    merged = {"exclude_none": True, **kwargs}
    return super().model_dump_json(*args, **merged)
event: StreamEvent = StreamEvent.TEXT_CHUNK
data: Data

View File

@ -238,6 +238,7 @@ class MessageCycleManager:
tool_arguments: str | None = None,
tool_files: list[str] | None = None,
tool_error: str | None = None,
tool_elapsed_time: float | None = None,
event_type: StreamEvent | None = None,
) -> MessageStreamResponse:
"""
@ -253,20 +254,39 @@ class MessageCycleManager:
:param tool_error: error message if tool failed
:return:
"""
return MessageStreamResponse(
response = MessageStreamResponse(
task_id=self._application_generate_entity.task_id,
id=message_id,
answer=answer,
from_variable_selector=from_variable_selector,
chunk_type=chunk_type,
tool_call_id=tool_call_id,
tool_name=tool_name,
tool_arguments=tool_arguments,
tool_files=tool_files,
tool_error=tool_error,
event=event_type or StreamEvent.MESSAGE,
)
if chunk_type:
response = response.model_copy(update={"chunk_type": chunk_type})
if chunk_type == "tool_call":
response = response.model_copy(
update={
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"tool_arguments": tool_arguments,
}
)
elif chunk_type == "tool_result":
response = response.model_copy(
update={
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"tool_arguments": tool_arguments,
"tool_files": tool_files,
"tool_error": tool_error,
"tool_elapsed_time": tool_elapsed_time,
}
)
return response
def message_replace_to_stream_response(self, answer: str, reason: str = "") -> MessageReplaceStreamResponse:
"""
Message replace to stream response.

View File

@ -22,6 +22,7 @@ class ToolResult(BaseModel):
output: str | None = Field(default=None, description="Tool output text, error or success message")
files: list[str] = Field(default_factory=list, description="File produced by tool")
status: ToolResultStatus | None = Field(default=ToolResultStatus.SUCCESS, description="Tool execution status")
elapsed_time: float | None = Field(default=None, description="Elapsed seconds spent executing the tool")
class ToolCallResult(BaseModel):
@ -31,3 +32,4 @@ class ToolCallResult(BaseModel):
output: str | None = Field(default=None, description="Tool output text, error or success message")
files: list[File] = Field(default_factory=list, description="File produced by tool")
status: ToolResultStatus = Field(default=ToolResultStatus.SUCCESS, description="Tool execution status")
elapsed_time: float | None = Field(default=None, description="Elapsed seconds spent executing the tool")

View File

@ -580,9 +580,10 @@ class Node(Generic[NodeDataT]):
from core.workflow.entities import ToolResult, ToolResultStatus
from core.workflow.graph_events import ChunkType
tool_result = event.tool_result
status: ToolResultStatus = (
tool_result.status if tool_result and tool_result.status is not None else ToolResultStatus.SUCCESS
tool_result = event.tool_result or ToolResult()
status: ToolResultStatus = tool_result.status or ToolResultStatus.SUCCESS
tool_result = tool_result.model_copy(
update={"status": status, "files": tool_result.files or []},
)
return NodeRunStreamChunkEvent(
@ -593,13 +594,7 @@ class Node(Generic[NodeDataT]):
chunk=event.chunk,
is_final=event.is_final,
chunk_type=ChunkType.TOOL_RESULT,
tool_result=ToolResult(
id=tool_result.id if tool_result else None,
name=tool_result.name if tool_result else None,
output=tool_result.output if tool_result else None,
files=tool_result.files if tool_result else [],
status=status,
),
tool_result=tool_result,
)
@_dispatch.register

View File

@ -1649,6 +1649,7 @@ class LLMNode(Node[LLMNodeData]):
"output": tool_call.output,
"files": files,
"status": tool_call.status.value if hasattr(tool_call.status, "value") else tool_call.status,
"elapsed_time": tool_call.elapsed_time,
}
def _flush_thought_segment(self, buffers: StreamBuffers, trace_state: TraceState) -> None:
@ -1707,6 +1708,7 @@ class LLMNode(Node[LLMNodeData]):
id=tool_call_id,
name=tool_name,
arguments=tool_arguments,
elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None,
),
)
trace_state.trace_segments.append(tool_call_segment)
@ -1755,6 +1757,7 @@ class LLMNode(Node[LLMNodeData]):
id=tool_call_id,
name=tool_name,
arguments=None,
elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None,
),
)
if existing_tool_segment is None:
@ -1767,6 +1770,7 @@ class LLMNode(Node[LLMNodeData]):
id=tool_call_id,
name=tool_name,
arguments=None,
elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None,
)
tool_call_segment.tool_call.output = (
str(tool_output) if tool_output is not None else str(tool_error) if tool_error is not None else None
@ -1785,6 +1789,7 @@ class LLMNode(Node[LLMNodeData]):
output=result_output,
files=tool_files,
status=ToolResultStatus.ERROR if tool_error else ToolResultStatus.SUCCESS,
elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None,
),
is_final=False,
)
@ -1960,6 +1965,7 @@ class LLMNode(Node[LLMNodeData]):
arguments=json.dumps(tool_args) if tool_args else "",
output=result_text,
status=status,
elapsed_time=log.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if log.metadata else None,
)
)