diff --git a/api/core/agent/patterns/base.py b/api/core/agent/patterns/base.py index 9f010bed6a..d797586e5f 100644 --- a/api/core/agent/patterns/base.py +++ b/api/core/agent/patterns/base.py @@ -200,7 +200,8 @@ class AgentPattern(ABC): log.metadata = { **log.metadata, AgentLog.LogMetadata.FINISHED_AT: finished_at, - AgentLog.LogMetadata.ELAPSED_TIME: finished_at - started_at, + # Calculate elapsed time in seconds + AgentLog.LogMetadata.ELAPSED_TIME: round(finished_at - started_at, 4), } # Add usage information if provided diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 53fa27cca7..a7c6426dc3 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -160,6 +160,7 @@ class StreamEventBuffer: "name": tool_name or "", "arguments": tool_arguments or "", "result": "", + "elapsed_time": None, } self.tool_calls.append(tool_call) idx = len(self.tool_calls) - 1 @@ -168,13 +169,14 @@ class StreamEventBuffer: self._last_event_type = "tool_call" - def record_tool_result(self, tool_call_id: str, result: str) -> None: + def record_tool_result(self, tool_call_id: str, result: str, tool_elapsed_time: float | None = None) -> None: """Record a tool result event (update existing tool call).""" if not tool_call_id: return if tool_call_id in self._tool_call_id_map: idx = self._tool_call_id_map[tool_call_id] self.tool_calls[idx]["result"] = result + self.tool_calls[idx]["elapsed_time"] = tool_elapsed_time def finalize(self) -> None: """Finalize the buffer, flushing any pending data.""" @@ -523,7 +525,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): tool_name = tool_payload.name if tool_payload and tool_payload.name else "" tool_arguments = tool_call.arguments if tool_call and tool_call.arguments else "" tool_files = tool_result.files if tool_result else [] - + tool_elapsed_time = tool_result.elapsed_time if tool_result else None # Record stream event based on chunk type chunk_type = event.chunk_type or ChunkType.TEXT match chunk_type: @@ -543,6 +545,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): self._stream_buffer.record_tool_result( tool_call_id=tool_call_id, result=delta_text, + tool_elapsed_time=tool_elapsed_time, ) self._task_state.answer += delta_text @@ -555,6 +558,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): tool_name=tool_name or None, tool_arguments=tool_arguments or None, tool_files=tool_files, + tool_elapsed_time=tool_elapsed_time, ) def _handle_iteration_start_event( diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 1a9d09f5e7..d9e1dfb474 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -490,6 +490,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): tool_call_id = tool_payload.id if tool_payload and tool_payload.id else None tool_name = tool_payload.name if tool_payload and tool_payload.name else None tool_arguments = tool_call.arguments if tool_call else None + tool_elapsed_time = tool_result.elapsed_time if tool_result else None tool_files = tool_result.files if tool_result else [] # only publish tts message at text chunk streaming @@ -504,6 +505,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): tool_name=tool_name, tool_arguments=tool_arguments, tool_files=tool_files, + tool_elapsed_time=tool_elapsed_time, ) def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]: @@ -676,6 +678,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): tool_arguments: str | None = None, tool_files: list[str] | None = None, tool_error: str | None = None, + tool_elapsed_time: float | None = None, ) -> TextChunkStreamResponse: """ Handle completed event. @@ -684,18 +687,37 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): """ from core.app.entities.task_entities import ChunkType as ResponseChunkType + response_chunk_type = ResponseChunkType(chunk_type.value) if chunk_type else ResponseChunkType.TEXT + + data = TextChunkStreamResponse.Data( + text=text, + from_variable_selector=from_variable_selector, + chunk_type=response_chunk_type, + ) + + if response_chunk_type == ResponseChunkType.TOOL_CALL: + data = data.model_copy( + update={ + "tool_call_id": tool_call_id, + "tool_name": tool_name, + "tool_arguments": tool_arguments, + } + ) + elif response_chunk_type == ResponseChunkType.TOOL_RESULT: + data = data.model_copy( + update={ + "tool_call_id": tool_call_id, + "tool_name": tool_name, + "tool_arguments": tool_arguments, + "tool_files": tool_files, + "tool_error": tool_error, + "tool_elapsed_time": tool_elapsed_time, + } + ) + response = TextChunkStreamResponse( task_id=self._application_generate_entity.task_id, - data=TextChunkStreamResponse.Data( - text=text, - from_variable_selector=from_variable_selector, - chunk_type=ResponseChunkType(chunk_type.value) if chunk_type else ResponseChunkType.TEXT, - tool_call_id=tool_call_id, - tool_name=tool_name, - tool_arguments=tool_arguments, - tool_files=tool_files or [], - tool_error=tool_error, - ), + data=data, ) return response diff --git a/api/core/app/entities/llm_generation_entities.py b/api/core/app/entities/llm_generation_entities.py index 4e278249fe..33e97e3299 100644 --- a/api/core/app/entities/llm_generation_entities.py +++ b/api/core/app/entities/llm_generation_entities.py @@ -42,6 +42,7 @@ class ToolCallDetail(BaseModel): name: str = Field(..., description="Name of the tool") arguments: str = Field(default="", description="JSON string of tool arguments") result: str = Field(default="", description="Result from the tool execution") + elapsed_time: float | None = Field(default=None, description="Elapsed time in seconds") class LLMGenerationDetailData(BaseModel): diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 28951021b6..31e095f83e 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -130,6 +130,16 @@ class MessageStreamResponse(StreamResponse): """file IDs produced by tool""" tool_error: str | None = None """error message if tool failed""" + tool_elapsed_time: float | None = None + """elapsed time spent executing the tool""" + + def model_dump(self, *args, **kwargs) -> dict[str, object]: + kwargs.setdefault("exclude_none", True) + return super().model_dump(*args, **kwargs) + + def model_dump_json(self, *args, **kwargs) -> str: + kwargs.setdefault("exclude_none", True) + return super().model_dump_json(*args, **kwargs) class MessageAudioStreamResponse(StreamResponse): @@ -635,11 +645,23 @@ class TextChunkStreamResponse(StreamResponse): """accumulated tool arguments JSON""" # Tool result fields (when chunk_type == TOOL_RESULT) - tool_files: list[str] = Field(default_factory=list) + tool_files: list[str] | None = None """file IDs produced by tool""" tool_error: str | None = None """error message if tool failed""" + # Tool elapsed time fields (when chunk_type == TOOL_RESULT) + tool_elapsed_time: float | None = None + """elapsed time spent executing the tool""" + + def model_dump(self, *args, **kwargs) -> dict[str, object]: + kwargs.setdefault("exclude_none", True) + return super().model_dump(*args, **kwargs) + + def model_dump_json(self, *args, **kwargs) -> str: + kwargs.setdefault("exclude_none", True) + return super().model_dump_json(*args, **kwargs) + event: StreamEvent = StreamEvent.TEXT_CHUNK data: Data diff --git a/api/core/app/task_pipeline/message_cycle_manager.py b/api/core/app/task_pipeline/message_cycle_manager.py index f2fa3c064b..f07d42ebb9 100644 --- a/api/core/app/task_pipeline/message_cycle_manager.py +++ b/api/core/app/task_pipeline/message_cycle_manager.py @@ -238,6 +238,7 @@ class MessageCycleManager: tool_arguments: str | None = None, tool_files: list[str] | None = None, tool_error: str | None = None, + tool_elapsed_time: float | None = None, event_type: StreamEvent | None = None, ) -> MessageStreamResponse: """ @@ -253,20 +254,39 @@ class MessageCycleManager: :param tool_error: error message if tool failed :return: """ - return MessageStreamResponse( + response = MessageStreamResponse( task_id=self._application_generate_entity.task_id, id=message_id, answer=answer, from_variable_selector=from_variable_selector, - chunk_type=chunk_type, - tool_call_id=tool_call_id, - tool_name=tool_name, - tool_arguments=tool_arguments, - tool_files=tool_files, - tool_error=tool_error, event=event_type or StreamEvent.MESSAGE, ) + if chunk_type: + response = response.model_copy(update={"chunk_type": chunk_type}) + + if chunk_type == "tool_call": + response = response.model_copy( + update={ + "tool_call_id": tool_call_id, + "tool_name": tool_name, + "tool_arguments": tool_arguments, + } + ) + elif chunk_type == "tool_result": + response = response.model_copy( + update={ + "tool_call_id": tool_call_id, + "tool_name": tool_name, + "tool_arguments": tool_arguments, + "tool_files": tool_files, + "tool_error": tool_error, + "tool_elapsed_time": tool_elapsed_time, + } + ) + + return response + def message_replace_to_stream_response(self, answer: str, reason: str = "") -> MessageReplaceStreamResponse: """ Message replace to stream response. diff --git a/api/core/workflow/entities/tool_entities.py b/api/core/workflow/entities/tool_entities.py index f4833218c7..9fdd895517 100644 --- a/api/core/workflow/entities/tool_entities.py +++ b/api/core/workflow/entities/tool_entities.py @@ -22,6 +22,7 @@ class ToolResult(BaseModel): output: str | None = Field(default=None, description="Tool output text, error or success message") files: list[str] = Field(default_factory=list, description="File produced by tool") status: ToolResultStatus | None = Field(default=ToolResultStatus.SUCCESS, description="Tool execution status") + elapsed_time: float | None = Field(default=None, description="Elapsed seconds spent executing the tool") class ToolCallResult(BaseModel): @@ -31,3 +32,4 @@ class ToolCallResult(BaseModel): output: str | None = Field(default=None, description="Tool output text, error or success message") files: list[File] = Field(default_factory=list, description="File produced by tool") status: ToolResultStatus = Field(default=ToolResultStatus.SUCCESS, description="Tool execution status") + elapsed_time: float | None = Field(default=None, description="Elapsed seconds spent executing the tool") diff --git a/api/core/workflow/nodes/base/node.py b/api/core/workflow/nodes/base/node.py index 302d77d625..448e07e78c 100644 --- a/api/core/workflow/nodes/base/node.py +++ b/api/core/workflow/nodes/base/node.py @@ -580,9 +580,10 @@ class Node(Generic[NodeDataT]): from core.workflow.entities import ToolResult, ToolResultStatus from core.workflow.graph_events import ChunkType - tool_result = event.tool_result - status: ToolResultStatus = ( - tool_result.status if tool_result and tool_result.status is not None else ToolResultStatus.SUCCESS + tool_result = event.tool_result or ToolResult() + status: ToolResultStatus = tool_result.status or ToolResultStatus.SUCCESS + tool_result = tool_result.model_copy( + update={"status": status, "files": tool_result.files or []}, ) return NodeRunStreamChunkEvent( @@ -593,13 +594,7 @@ class Node(Generic[NodeDataT]): chunk=event.chunk, is_final=event.is_final, chunk_type=ChunkType.TOOL_RESULT, - tool_result=ToolResult( - id=tool_result.id if tool_result else None, - name=tool_result.name if tool_result else None, - output=tool_result.output if tool_result else None, - files=tool_result.files if tool_result else [], - status=status, - ), + tool_result=tool_result, ) @_dispatch.register diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py index 6be59b6ead..ebd232c66e 100644 --- a/api/core/workflow/nodes/llm/node.py +++ b/api/core/workflow/nodes/llm/node.py @@ -1649,6 +1649,7 @@ class LLMNode(Node[LLMNodeData]): "output": tool_call.output, "files": files, "status": tool_call.status.value if hasattr(tool_call.status, "value") else tool_call.status, + "elapsed_time": tool_call.elapsed_time, } def _flush_thought_segment(self, buffers: StreamBuffers, trace_state: TraceState) -> None: @@ -1707,6 +1708,7 @@ class LLMNode(Node[LLMNodeData]): id=tool_call_id, name=tool_name, arguments=tool_arguments, + elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None, ), ) trace_state.trace_segments.append(tool_call_segment) @@ -1755,6 +1757,7 @@ class LLMNode(Node[LLMNodeData]): id=tool_call_id, name=tool_name, arguments=None, + elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None, ), ) if existing_tool_segment is None: @@ -1767,6 +1770,7 @@ class LLMNode(Node[LLMNodeData]): id=tool_call_id, name=tool_name, arguments=None, + elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None, ) tool_call_segment.tool_call.output = ( str(tool_output) if tool_output is not None else str(tool_error) if tool_error is not None else None @@ -1785,6 +1789,7 @@ class LLMNode(Node[LLMNodeData]): output=result_output, files=tool_files, status=ToolResultStatus.ERROR if tool_error else ToolResultStatus.SUCCESS, + elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None, ), is_final=False, ) @@ -1960,6 +1965,7 @@ class LLMNode(Node[LLMNodeData]): arguments=json.dumps(tool_args) if tool_args else "", output=result_text, status=status, + elapsed_time=log.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if log.metadata else None, ) )