From 152fd52cd73c496d276f03e779f86c88957d29e0 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 30 Dec 2025 02:23:25 +0000 Subject: [PATCH 1/3] [autofix.ci] apply automated fixes --- api/tests/unit_tests/core/agent/__init__.py | 1 - .../core/app/apps/test_workflow_app_runner_stream_chunk.py | 1 - .../unit_tests/core/workflow/nodes/test_llm_node_streaming.py | 1 - 3 files changed, 3 deletions(-) diff --git a/api/tests/unit_tests/core/agent/__init__.py b/api/tests/unit_tests/core/agent/__init__.py index e7c478bf83..a9ccd45f4b 100644 --- a/api/tests/unit_tests/core/agent/__init__.py +++ b/api/tests/unit_tests/core/agent/__init__.py @@ -1,4 +1,3 @@ """ Mark agent test modules as a package to avoid import name collisions. """ - diff --git a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_stream_chunk.py b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_stream_chunk.py index 6a8a691a25..8779e8c586 100644 --- a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_stream_chunk.py +++ b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_stream_chunk.py @@ -45,4 +45,3 @@ def test_skip_empty_final_chunk() -> None: published_event, publish_from = queue_manager.published[0] assert publish_from == PublishFrom.APPLICATION_MANAGER assert published_event.text == "hi" - diff --git a/api/tests/unit_tests/core/workflow/nodes/test_llm_node_streaming.py b/api/tests/unit_tests/core/workflow/nodes/test_llm_node_streaming.py index 55f6525bcc..9d793f804f 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_llm_node_streaming.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_llm_node_streaming.py @@ -146,4 +146,3 @@ def test_serialize_tool_call_strips_files_to_ids(): assert serialized["name"] == "do" assert serialized["arguments"] == '{"a":1}' assert serialized["output"] == "ok" - From f3e7fea628f6032eb2fa3d11ee1601751b992b72 Mon Sep 17 00:00:00 2001 From: Novice Date: Sun, 4 Jan 2026 10:29:02 +0800 Subject: [PATCH 2/3] feat: add tool call time --- api/core/agent/patterns/base.py | 3 +- .../advanced_chat/generate_task_pipeline.py | 8 +++- .../apps/workflow/generate_task_pipeline.py | 42 ++++++++++++++----- .../app/entities/llm_generation_entities.py | 1 + api/core/app/entities/task_entities.py | 24 ++++++++++- .../task_pipeline/message_cycle_manager.py | 34 +++++++++++---- api/core/workflow/entities/tool_entities.py | 2 + api/core/workflow/nodes/base/node.py | 15 +++---- api/core/workflow/nodes/llm/node.py | 6 +++ 9 files changed, 104 insertions(+), 31 deletions(-) diff --git a/api/core/agent/patterns/base.py b/api/core/agent/patterns/base.py index 9f010bed6a..d797586e5f 100644 --- a/api/core/agent/patterns/base.py +++ b/api/core/agent/patterns/base.py @@ -200,7 +200,8 @@ class AgentPattern(ABC): log.metadata = { **log.metadata, AgentLog.LogMetadata.FINISHED_AT: finished_at, - AgentLog.LogMetadata.ELAPSED_TIME: finished_at - started_at, + # Calculate elapsed time in seconds + AgentLog.LogMetadata.ELAPSED_TIME: round(finished_at - started_at, 4), } # Add usage information if provided diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 53fa27cca7..a7c6426dc3 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -160,6 +160,7 @@ class StreamEventBuffer: "name": tool_name or "", "arguments": tool_arguments or "", "result": "", + "elapsed_time": None, } self.tool_calls.append(tool_call) idx = len(self.tool_calls) - 1 @@ -168,13 +169,14 @@ class StreamEventBuffer: self._last_event_type = "tool_call" - def record_tool_result(self, tool_call_id: str, result: str) -> None: + def record_tool_result(self, tool_call_id: str, result: str, tool_elapsed_time: float | None = None) -> None: """Record a tool result event (update existing tool call).""" if not tool_call_id: return if tool_call_id in self._tool_call_id_map: idx = self._tool_call_id_map[tool_call_id] self.tool_calls[idx]["result"] = result + self.tool_calls[idx]["elapsed_time"] = tool_elapsed_time def finalize(self) -> None: """Finalize the buffer, flushing any pending data.""" @@ -523,7 +525,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): tool_name = tool_payload.name if tool_payload and tool_payload.name else "" tool_arguments = tool_call.arguments if tool_call and tool_call.arguments else "" tool_files = tool_result.files if tool_result else [] - + tool_elapsed_time = tool_result.elapsed_time if tool_result else None # Record stream event based on chunk type chunk_type = event.chunk_type or ChunkType.TEXT match chunk_type: @@ -543,6 +545,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): self._stream_buffer.record_tool_result( tool_call_id=tool_call_id, result=delta_text, + tool_elapsed_time=tool_elapsed_time, ) self._task_state.answer += delta_text @@ -555,6 +558,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): tool_name=tool_name or None, tool_arguments=tool_arguments or None, tool_files=tool_files, + tool_elapsed_time=tool_elapsed_time, ) def _handle_iteration_start_event( diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 1a9d09f5e7..d9e1dfb474 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -490,6 +490,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): tool_call_id = tool_payload.id if tool_payload and tool_payload.id else None tool_name = tool_payload.name if tool_payload and tool_payload.name else None tool_arguments = tool_call.arguments if tool_call else None + tool_elapsed_time = tool_result.elapsed_time if tool_result else None tool_files = tool_result.files if tool_result else [] # only publish tts message at text chunk streaming @@ -504,6 +505,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): tool_name=tool_name, tool_arguments=tool_arguments, tool_files=tool_files, + tool_elapsed_time=tool_elapsed_time, ) def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]: @@ -676,6 +678,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): tool_arguments: str | None = None, tool_files: list[str] | None = None, tool_error: str | None = None, + tool_elapsed_time: float | None = None, ) -> TextChunkStreamResponse: """ Handle completed event. @@ -684,18 +687,37 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): """ from core.app.entities.task_entities import ChunkType as ResponseChunkType + response_chunk_type = ResponseChunkType(chunk_type.value) if chunk_type else ResponseChunkType.TEXT + + data = TextChunkStreamResponse.Data( + text=text, + from_variable_selector=from_variable_selector, + chunk_type=response_chunk_type, + ) + + if response_chunk_type == ResponseChunkType.TOOL_CALL: + data = data.model_copy( + update={ + "tool_call_id": tool_call_id, + "tool_name": tool_name, + "tool_arguments": tool_arguments, + } + ) + elif response_chunk_type == ResponseChunkType.TOOL_RESULT: + data = data.model_copy( + update={ + "tool_call_id": tool_call_id, + "tool_name": tool_name, + "tool_arguments": tool_arguments, + "tool_files": tool_files, + "tool_error": tool_error, + "tool_elapsed_time": tool_elapsed_time, + } + ) + response = TextChunkStreamResponse( task_id=self._application_generate_entity.task_id, - data=TextChunkStreamResponse.Data( - text=text, - from_variable_selector=from_variable_selector, - chunk_type=ResponseChunkType(chunk_type.value) if chunk_type else ResponseChunkType.TEXT, - tool_call_id=tool_call_id, - tool_name=tool_name, - tool_arguments=tool_arguments, - tool_files=tool_files or [], - tool_error=tool_error, - ), + data=data, ) return response diff --git a/api/core/app/entities/llm_generation_entities.py b/api/core/app/entities/llm_generation_entities.py index 4e278249fe..33e97e3299 100644 --- a/api/core/app/entities/llm_generation_entities.py +++ b/api/core/app/entities/llm_generation_entities.py @@ -42,6 +42,7 @@ class ToolCallDetail(BaseModel): name: str = Field(..., description="Name of the tool") arguments: str = Field(default="", description="JSON string of tool arguments") result: str = Field(default="", description="Result from the tool execution") + elapsed_time: float | None = Field(default=None, description="Elapsed time in seconds") class LLMGenerationDetailData(BaseModel): diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 28951021b6..31e095f83e 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -130,6 +130,16 @@ class MessageStreamResponse(StreamResponse): """file IDs produced by tool""" tool_error: str | None = None """error message if tool failed""" + tool_elapsed_time: float | None = None + """elapsed time spent executing the tool""" + + def model_dump(self, *args, **kwargs) -> dict[str, object]: + kwargs.setdefault("exclude_none", True) + return super().model_dump(*args, **kwargs) + + def model_dump_json(self, *args, **kwargs) -> str: + kwargs.setdefault("exclude_none", True) + return super().model_dump_json(*args, **kwargs) class MessageAudioStreamResponse(StreamResponse): @@ -635,11 +645,23 @@ class TextChunkStreamResponse(StreamResponse): """accumulated tool arguments JSON""" # Tool result fields (when chunk_type == TOOL_RESULT) - tool_files: list[str] = Field(default_factory=list) + tool_files: list[str] | None = None """file IDs produced by tool""" tool_error: str | None = None """error message if tool failed""" + # Tool elapsed time fields (when chunk_type == TOOL_RESULT) + tool_elapsed_time: float | None = None + """elapsed time spent executing the tool""" + + def model_dump(self, *args, **kwargs) -> dict[str, object]: + kwargs.setdefault("exclude_none", True) + return super().model_dump(*args, **kwargs) + + def model_dump_json(self, *args, **kwargs) -> str: + kwargs.setdefault("exclude_none", True) + return super().model_dump_json(*args, **kwargs) + event: StreamEvent = StreamEvent.TEXT_CHUNK data: Data diff --git a/api/core/app/task_pipeline/message_cycle_manager.py b/api/core/app/task_pipeline/message_cycle_manager.py index f2fa3c064b..f07d42ebb9 100644 --- a/api/core/app/task_pipeline/message_cycle_manager.py +++ b/api/core/app/task_pipeline/message_cycle_manager.py @@ -238,6 +238,7 @@ class MessageCycleManager: tool_arguments: str | None = None, tool_files: list[str] | None = None, tool_error: str | None = None, + tool_elapsed_time: float | None = None, event_type: StreamEvent | None = None, ) -> MessageStreamResponse: """ @@ -253,20 +254,39 @@ class MessageCycleManager: :param tool_error: error message if tool failed :return: """ - return MessageStreamResponse( + response = MessageStreamResponse( task_id=self._application_generate_entity.task_id, id=message_id, answer=answer, from_variable_selector=from_variable_selector, - chunk_type=chunk_type, - tool_call_id=tool_call_id, - tool_name=tool_name, - tool_arguments=tool_arguments, - tool_files=tool_files, - tool_error=tool_error, event=event_type or StreamEvent.MESSAGE, ) + if chunk_type: + response = response.model_copy(update={"chunk_type": chunk_type}) + + if chunk_type == "tool_call": + response = response.model_copy( + update={ + "tool_call_id": tool_call_id, + "tool_name": tool_name, + "tool_arguments": tool_arguments, + } + ) + elif chunk_type == "tool_result": + response = response.model_copy( + update={ + "tool_call_id": tool_call_id, + "tool_name": tool_name, + "tool_arguments": tool_arguments, + "tool_files": tool_files, + "tool_error": tool_error, + "tool_elapsed_time": tool_elapsed_time, + } + ) + + return response + def message_replace_to_stream_response(self, answer: str, reason: str = "") -> MessageReplaceStreamResponse: """ Message replace to stream response. diff --git a/api/core/workflow/entities/tool_entities.py b/api/core/workflow/entities/tool_entities.py index f4833218c7..9fdd895517 100644 --- a/api/core/workflow/entities/tool_entities.py +++ b/api/core/workflow/entities/tool_entities.py @@ -22,6 +22,7 @@ class ToolResult(BaseModel): output: str | None = Field(default=None, description="Tool output text, error or success message") files: list[str] = Field(default_factory=list, description="File produced by tool") status: ToolResultStatus | None = Field(default=ToolResultStatus.SUCCESS, description="Tool execution status") + elapsed_time: float | None = Field(default=None, description="Elapsed seconds spent executing the tool") class ToolCallResult(BaseModel): @@ -31,3 +32,4 @@ class ToolCallResult(BaseModel): output: str | None = Field(default=None, description="Tool output text, error or success message") files: list[File] = Field(default_factory=list, description="File produced by tool") status: ToolResultStatus = Field(default=ToolResultStatus.SUCCESS, description="Tool execution status") + elapsed_time: float | None = Field(default=None, description="Elapsed seconds spent executing the tool") diff --git a/api/core/workflow/nodes/base/node.py b/api/core/workflow/nodes/base/node.py index 302d77d625..448e07e78c 100644 --- a/api/core/workflow/nodes/base/node.py +++ b/api/core/workflow/nodes/base/node.py @@ -580,9 +580,10 @@ class Node(Generic[NodeDataT]): from core.workflow.entities import ToolResult, ToolResultStatus from core.workflow.graph_events import ChunkType - tool_result = event.tool_result - status: ToolResultStatus = ( - tool_result.status if tool_result and tool_result.status is not None else ToolResultStatus.SUCCESS + tool_result = event.tool_result or ToolResult() + status: ToolResultStatus = tool_result.status or ToolResultStatus.SUCCESS + tool_result = tool_result.model_copy( + update={"status": status, "files": tool_result.files or []}, ) return NodeRunStreamChunkEvent( @@ -593,13 +594,7 @@ class Node(Generic[NodeDataT]): chunk=event.chunk, is_final=event.is_final, chunk_type=ChunkType.TOOL_RESULT, - tool_result=ToolResult( - id=tool_result.id if tool_result else None, - name=tool_result.name if tool_result else None, - output=tool_result.output if tool_result else None, - files=tool_result.files if tool_result else [], - status=status, - ), + tool_result=tool_result, ) @_dispatch.register diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py index 6be59b6ead..ebd232c66e 100644 --- a/api/core/workflow/nodes/llm/node.py +++ b/api/core/workflow/nodes/llm/node.py @@ -1649,6 +1649,7 @@ class LLMNode(Node[LLMNodeData]): "output": tool_call.output, "files": files, "status": tool_call.status.value if hasattr(tool_call.status, "value") else tool_call.status, + "elapsed_time": tool_call.elapsed_time, } def _flush_thought_segment(self, buffers: StreamBuffers, trace_state: TraceState) -> None: @@ -1707,6 +1708,7 @@ class LLMNode(Node[LLMNodeData]): id=tool_call_id, name=tool_name, arguments=tool_arguments, + elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None, ), ) trace_state.trace_segments.append(tool_call_segment) @@ -1755,6 +1757,7 @@ class LLMNode(Node[LLMNodeData]): id=tool_call_id, name=tool_name, arguments=None, + elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None, ), ) if existing_tool_segment is None: @@ -1767,6 +1770,7 @@ class LLMNode(Node[LLMNodeData]): id=tool_call_id, name=tool_name, arguments=None, + elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None, ) tool_call_segment.tool_call.output = ( str(tool_output) if tool_output is not None else str(tool_error) if tool_error is not None else None @@ -1785,6 +1789,7 @@ class LLMNode(Node[LLMNodeData]): output=result_output, files=tool_files, status=ToolResultStatus.ERROR if tool_error else ToolResultStatus.SUCCESS, + elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None, ), is_final=False, ) @@ -1960,6 +1965,7 @@ class LLMNode(Node[LLMNodeData]): arguments=json.dumps(tool_args) if tool_args else "", output=result_text, status=status, + elapsed_time=log.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if log.metadata else None, ) ) From dc8a618b6ad87291477761ffd3b7d54da26c3ec9 Mon Sep 17 00:00:00 2001 From: Novice Date: Sun, 4 Jan 2026 11:09:43 +0800 Subject: [PATCH 3/3] feat: add think start end tag --- api/core/app/entities/queue_entities.py | 2 + api/core/app/entities/task_entities.py | 2 + api/core/workflow/graph_events/node.py | 2 + api/core/workflow/node_events/node.py | 14 +++++ api/core/workflow/nodes/llm/entities.py | 15 ++++-- api/core/workflow/nodes/llm/node.py | 70 ++++++++++++++++++++++--- 6 files changed, 94 insertions(+), 11 deletions(-) diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 31b95ad165..fdc4014caa 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -184,6 +184,8 @@ class ChunkType(StrEnum): TOOL_CALL = "tool_call" # Tool call arguments streaming TOOL_RESULT = "tool_result" # Tool execution result THOUGHT = "thought" # Agent thinking process (ReAct) + THOUGHT_START = "thought_start" # Agent thought start + THOUGHT_END = "thought_end" # Agent thought end class QueueTextChunkEvent(AppQueueEvent): diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 31e095f83e..068856b947 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -617,6 +617,8 @@ class ChunkType(StrEnum): TOOL_CALL = "tool_call" # Tool call arguments streaming TOOL_RESULT = "tool_result" # Tool execution result THOUGHT = "thought" # Agent thinking process (ReAct) + THOUGHT_START = "thought_start" # Agent thought start + THOUGHT_END = "thought_end" # Agent thought end class TextChunkStreamResponse(StreamResponse): diff --git a/api/core/workflow/graph_events/node.py b/api/core/workflow/graph_events/node.py index 01bc27d3e4..2ae4fcd919 100644 --- a/api/core/workflow/graph_events/node.py +++ b/api/core/workflow/graph_events/node.py @@ -29,6 +29,8 @@ class ChunkType(StrEnum): TOOL_CALL = "tool_call" # Tool call arguments streaming TOOL_RESULT = "tool_result" # Tool execution result THOUGHT = "thought" # Agent thinking process (ReAct) + THOUGHT_START = "thought_start" # Agent thought start + THOUGHT_END = "thought_end" # Agent thought end class NodeRunStreamChunkEvent(GraphNodeEventBase): diff --git a/api/core/workflow/node_events/node.py b/api/core/workflow/node_events/node.py index 39f09d02a5..f0bbc4d96f 100644 --- a/api/core/workflow/node_events/node.py +++ b/api/core/workflow/node_events/node.py @@ -41,6 +41,8 @@ class ChunkType(StrEnum): TOOL_CALL = "tool_call" # Tool call arguments streaming TOOL_RESULT = "tool_result" # Tool execution result THOUGHT = "thought" # Agent thinking process (ReAct) + THOUGHT_START = "thought_start" # Agent thought start + THOUGHT_END = "thought_end" # Agent thought end class StreamChunkEvent(NodeEventBase): @@ -70,6 +72,18 @@ class ToolResultChunkEvent(StreamChunkEvent): tool_result: ToolResult | None = Field(default=None, description="structured tool result payload") +class ThoughtStartChunkEvent(StreamChunkEvent): + """Agent thought start streaming event - Agent thinking process (ReAct).""" + + chunk_type: ChunkType = Field(default=ChunkType.THOUGHT_START, frozen=True) + + +class ThoughtEndChunkEvent(StreamChunkEvent): + """Agent thought end streaming event - Agent thinking process (ReAct).""" + + chunk_type: ChunkType = Field(default=ChunkType.THOUGHT_END, frozen=True) + + class ThoughtChunkEvent(StreamChunkEvent): """Agent thought streaming event - Agent thinking process (ReAct).""" diff --git a/api/core/workflow/nodes/llm/entities.py b/api/core/workflow/nodes/llm/entities.py index c1938fb5e3..1dbd2408d0 100644 --- a/api/core/workflow/nodes/llm/entities.py +++ b/api/core/workflow/nodes/llm/entities.py @@ -163,6 +163,7 @@ class ThinkTagStreamParser: thought_text = self._buffer[: end_match.start()] if thought_text: parts.append(("thought", thought_text)) + parts.append(("thought_end", "")) self._buffer = self._buffer[end_match.end() :] self._in_think = False continue @@ -180,6 +181,7 @@ class ThinkTagStreamParser: if prefix: parts.append(("text", prefix)) self._buffer = self._buffer[start_match.end() :] + parts.append(("thought_start", "")) self._in_think = True continue @@ -195,7 +197,7 @@ class ThinkTagStreamParser: # Extra safeguard: strip any stray tags that slipped through. content = self._START_PATTERN.sub("", content) content = self._END_PATTERN.sub("", content) - if content: + if content or kind in {"thought_start", "thought_end"}: cleaned_parts.append((kind, content)) return cleaned_parts @@ -210,12 +212,19 @@ class ThinkTagStreamParser: if content.lower().startswith(self._START_PREFIX) or content.lower().startswith(self._END_PREFIX): content = "" self._buffer = "" - if not content: + if not content and not self._in_think: return [] # Strip any complete tags that might still be present. content = self._START_PATTERN.sub("", content) content = self._END_PATTERN.sub("", content) - return [(kind, content)] if content else [] + + result: list[tuple[str, str]] = [] + if content: + result.append((kind, content)) + if self._in_think: + result.append(("thought_end", "")) + self._in_think = False + return result class StreamBuffers(BaseModel): diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py index ebd232c66e..798ae1376f 100644 --- a/api/core/workflow/nodes/llm/node.py +++ b/api/core/workflow/nodes/llm/node.py @@ -80,6 +80,7 @@ from core.workflow.node_events import ( ToolCallChunkEvent, ToolResultChunkEvent, ) +from core.workflow.node_events.node import ThoughtEndChunkEvent, ThoughtStartChunkEvent from core.workflow.nodes.base.entities import VariableSelector from core.workflow.nodes.base.node import Node from core.workflow.nodes.base.variable_template_parser import VariableTemplateParser @@ -565,15 +566,28 @@ class LLMNode(Node[LLMNodeData]): # Generation output: split out thoughts, forward only non-thought content chunks for kind, segment in think_parser.process(text_part): if not segment: - continue + if kind not in {"thought_start", "thought_end"}: + continue - if kind == "thought": + if kind == "thought_start": + yield ThoughtStartChunkEvent( + selector=[node_id, "generation", "thought"], + chunk="", + is_final=False, + ) + elif kind == "thought": reasoning_chunks.append(segment) yield ThoughtChunkEvent( selector=[node_id, "generation", "thought"], chunk=segment, is_final=False, ) + elif kind == "thought_end": + yield ThoughtEndChunkEvent( + selector=[node_id, "generation", "thought"], + chunk="", + is_final=False, + ) else: yield StreamChunkEvent( selector=[node_id, "generation", "content"], @@ -596,15 +610,27 @@ class LLMNode(Node[LLMNodeData]): raise LLMNodeError(f"Failed to parse structured output: {e}") for kind, segment in think_parser.flush(): - if not segment: + if not segment and kind not in {"thought_start", "thought_end"}: continue - if kind == "thought": + if kind == "thought_start": + yield ThoughtStartChunkEvent( + selector=[node_id, "generation", "thought"], + chunk="", + is_final=False, + ) + elif kind == "thought": reasoning_chunks.append(segment) yield ThoughtChunkEvent( selector=[node_id, "generation", "thought"], chunk=segment, is_final=False, ) + elif kind == "thought_end": + yield ThoughtEndChunkEvent( + selector=[node_id, "generation", "thought"], + chunk="", + is_final=False, + ) else: yield StreamChunkEvent( selector=[node_id, "generation", "content"], @@ -1811,10 +1837,17 @@ class LLMNode(Node[LLMNodeData]): chunk_text = str(chunk_text) for kind, segment in buffers.think_parser.process(chunk_text): - if not segment: + if not segment and kind not in {"thought_start", "thought_end"}: continue - if kind == "thought": + if kind == "thought_start": + self._flush_content_segment(buffers, trace_state) + yield ThoughtStartChunkEvent( + selector=[self._node_id, "generation", "thought"], + chunk="", + is_final=False, + ) + elif kind == "thought": self._flush_content_segment(buffers, trace_state) buffers.current_turn_reasoning.append(segment) buffers.pending_thought.append(segment) @@ -1823,6 +1856,13 @@ class LLMNode(Node[LLMNodeData]): chunk=segment, is_final=False, ) + elif kind == "thought_end": + self._flush_thought_segment(buffers, trace_state) + yield ThoughtEndChunkEvent( + selector=[self._node_id, "generation", "thought"], + chunk="", + is_final=False, + ) else: self._flush_thought_segment(buffers, trace_state) aggregate.text += segment @@ -1848,9 +1888,16 @@ class LLMNode(Node[LLMNodeData]): self, buffers: StreamBuffers, trace_state: TraceState, aggregate: AggregatedResult ) -> Generator[NodeEventBase, None, None]: for kind, segment in buffers.think_parser.flush(): - if not segment: + if not segment and kind not in {"thought_start", "thought_end"}: continue - if kind == "thought": + if kind == "thought_start": + self._flush_content_segment(buffers, trace_state) + yield ThoughtStartChunkEvent( + selector=[self._node_id, "generation", "thought"], + chunk="", + is_final=False, + ) + elif kind == "thought": self._flush_content_segment(buffers, trace_state) buffers.current_turn_reasoning.append(segment) buffers.pending_thought.append(segment) @@ -1859,6 +1906,13 @@ class LLMNode(Node[LLMNodeData]): chunk=segment, is_final=False, ) + elif kind == "thought_end": + self._flush_thought_segment(buffers, trace_state) + yield ThoughtEndChunkEvent( + selector=[self._node_id, "generation", "thought"], + chunk="", + is_final=False, + ) else: self._flush_thought_segment(buffers, trace_state) aggregate.text += segment