diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py
index 31b95ad165..fdc4014caa 100644
--- a/api/core/app/entities/queue_entities.py
+++ b/api/core/app/entities/queue_entities.py
@@ -184,6 +184,8 @@ class ChunkType(StrEnum):
     TOOL_CALL = "tool_call"  # Tool call arguments streaming
     TOOL_RESULT = "tool_result"  # Tool execution result
     THOUGHT = "thought"  # Agent thinking process (ReAct)
+    THOUGHT_START = "thought_start"  # Agent thought start
+    THOUGHT_END = "thought_end"  # Agent thought end


 class QueueTextChunkEvent(AppQueueEvent):
diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py
index 31e095f83e..068856b947 100644
--- a/api/core/app/entities/task_entities.py
+++ b/api/core/app/entities/task_entities.py
@@ -617,6 +617,8 @@ class ChunkType(StrEnum):
     TOOL_CALL = "tool_call"  # Tool call arguments streaming
     TOOL_RESULT = "tool_result"  # Tool execution result
     THOUGHT = "thought"  # Agent thinking process (ReAct)
+    THOUGHT_START = "thought_start"  # Agent thought start
+    THOUGHT_END = "thought_end"  # Agent thought end


 class TextChunkStreamResponse(StreamResponse):
diff --git a/api/core/workflow/graph_events/node.py b/api/core/workflow/graph_events/node.py
index 01bc27d3e4..2ae4fcd919 100644
--- a/api/core/workflow/graph_events/node.py
+++ b/api/core/workflow/graph_events/node.py
@@ -29,6 +29,8 @@ class ChunkType(StrEnum):
     TOOL_CALL = "tool_call"  # Tool call arguments streaming
     TOOL_RESULT = "tool_result"  # Tool execution result
     THOUGHT = "thought"  # Agent thinking process (ReAct)
+    THOUGHT_START = "thought_start"  # Agent thought start
+    THOUGHT_END = "thought_end"  # Agent thought end


 class NodeRunStreamChunkEvent(GraphNodeEventBase):
diff --git a/api/core/workflow/node_events/node.py b/api/core/workflow/node_events/node.py
index 39f09d02a5..f0bbc4d96f 100644
--- a/api/core/workflow/node_events/node.py
+++ b/api/core/workflow/node_events/node.py
@@ -41,6 +41,8 @@ class ChunkType(StrEnum):
     TOOL_CALL = "tool_call"  # Tool call arguments streaming
     TOOL_RESULT = "tool_result"  # Tool execution result
     THOUGHT = "thought"  # Agent thinking process (ReAct)
+    THOUGHT_START = "thought_start"  # Agent thought start
+    THOUGHT_END = "thought_end"  # Agent thought end


 class StreamChunkEvent(NodeEventBase):
@@ -70,6 +72,18 @@ class ToolResultChunkEvent(StreamChunkEvent):
     tool_result: ToolResult | None = Field(default=None, description="structured tool result payload")


+class ThoughtStartChunkEvent(StreamChunkEvent):
+    """Agent thought start streaming event - Agent thinking process (ReAct)."""
+
+    chunk_type: ChunkType = Field(default=ChunkType.THOUGHT_START, frozen=True)
+
+
+class ThoughtEndChunkEvent(StreamChunkEvent):
+    """Agent thought end streaming event - Agent thinking process (ReAct)."""
+
+    chunk_type: ChunkType = Field(default=ChunkType.THOUGHT_END, frozen=True)
+
+
 class ThoughtChunkEvent(StreamChunkEvent):
     """Agent thought streaming event - Agent thinking process (ReAct)."""
diff --git a/api/core/workflow/nodes/llm/entities.py b/api/core/workflow/nodes/llm/entities.py
index c1938fb5e3..1dbd2408d0 100644
--- a/api/core/workflow/nodes/llm/entities.py
+++ b/api/core/workflow/nodes/llm/entities.py
@@ -163,6 +163,7 @@ class ThinkTagStreamParser:
                 thought_text = self._buffer[: end_match.start()]
                 if thought_text:
                     parts.append(("thought", thought_text))
+                parts.append(("thought_end", ""))
                 self._buffer = self._buffer[end_match.end() :]
                 self._in_think = False
                 continue
@@ -180,6 +181,7 @@ class ThinkTagStreamParser:
             if prefix:
                 parts.append(("text", prefix))
             self._buffer = self._buffer[start_match.end() :]
+            parts.append(("thought_start", ""))
             self._in_think = True
             continue

@@ -195,7 +197,7 @@ class ThinkTagStreamParser:
             # Extra safeguard: strip any stray tags that slipped through.
             content = self._START_PATTERN.sub("", content)
             content = self._END_PATTERN.sub("", content)
-            if content:
+            if content or kind in {"thought_start", "thought_end"}:
                 cleaned_parts.append((kind, content))

         return cleaned_parts
@@ -210,12 +212,19 @@ class ThinkTagStreamParser:
         if content.lower().startswith(self._START_PREFIX) or content.lower().startswith(self._END_PREFIX):
             content = ""
         self._buffer = ""
-        if not content:
+        if not content and not self._in_think:
             return []
         # Strip any complete tags that might still be present.
         content = self._START_PATTERN.sub("", content)
         content = self._END_PATTERN.sub("", content)
-        return [(kind, content)] if content else []
+
+        result: list[tuple[str, str]] = []
+        if content:
+            result.append((kind, content))
+        if self._in_think:
+            result.append(("thought_end", ""))
+            self._in_think = False
+        return result


 class StreamBuffers(BaseModel):
diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py
index ebd232c66e..798ae1376f 100644
--- a/api/core/workflow/nodes/llm/node.py
+++ b/api/core/workflow/nodes/llm/node.py
@@ -80,6 +80,7 @@ from core.workflow.node_events import (
     ToolCallChunkEvent,
     ToolResultChunkEvent,
 )
+from core.workflow.node_events.node import ThoughtEndChunkEvent, ThoughtStartChunkEvent
 from core.workflow.nodes.base.entities import VariableSelector
 from core.workflow.nodes.base.node import Node
 from core.workflow.nodes.base.variable_template_parser import VariableTemplateParser
@@ -565,15 +566,28 @@ class LLMNode(Node[LLMNodeData]):
                 # Generation output: split out thoughts, forward only non-thought content chunks
                 for kind, segment in think_parser.process(text_part):
                     if not segment:
-                        continue
+                        if kind not in {"thought_start", "thought_end"}:
+                            continue

-                    if kind == "thought":
+                    if kind == "thought_start":
+                        yield ThoughtStartChunkEvent(
+                            selector=[node_id, "generation", "thought"],
+                            chunk="",
+                            is_final=False,
+                        )
+                    elif kind == "thought":
                         reasoning_chunks.append(segment)
                         yield ThoughtChunkEvent(
                             selector=[node_id, "generation", "thought"],
                             chunk=segment,
                             is_final=False,
                         )
+                    elif kind == "thought_end":
+                        yield ThoughtEndChunkEvent(
+                            selector=[node_id, "generation", "thought"],
+                            chunk="",
+                            is_final=False,
+                        )
                     else:
                         yield StreamChunkEvent(
                             selector=[node_id, "generation", "content"],
@@ -596,15 +610,27 @@ class LLMNode(Node[LLMNodeData]):
                     raise LLMNodeError(f"Failed to parse structured output: {e}")

             for kind, segment in think_parser.flush():
-                if not segment:
+                if not segment and kind not in {"thought_start", "thought_end"}:
                     continue
-                if kind == "thought":
+                if kind == "thought_start":
+                    yield ThoughtStartChunkEvent(
+                        selector=[node_id, "generation", "thought"],
+                        chunk="",
+                        is_final=False,
+                    )
+                elif kind == "thought":
                     reasoning_chunks.append(segment)
                     yield ThoughtChunkEvent(
                         selector=[node_id, "generation", "thought"],
                         chunk=segment,
                         is_final=False,
                     )
+                elif kind == "thought_end":
+                    yield ThoughtEndChunkEvent(
+                        selector=[node_id, "generation", "thought"],
+                        chunk="",
+                        is_final=False,
+                    )
                 else:
                     yield StreamChunkEvent(
                         selector=[node_id, "generation", "content"],
@@ -1811,10 +1837,17 @@ class LLMNode(Node[LLMNodeData]):
             chunk_text = str(chunk_text)

         for kind, segment in buffers.think_parser.process(chunk_text):
-            if not segment:
+            if not segment and kind not in {"thought_start", "thought_end"}:
                 continue

-            if kind == "thought":
+            if kind == "thought_start":
+                self._flush_content_segment(buffers, trace_state)
+                yield ThoughtStartChunkEvent(
+                    selector=[self._node_id, "generation", "thought"],
+                    chunk="",
+                    is_final=False,
+                )
+            elif kind == "thought":
                 self._flush_content_segment(buffers, trace_state)
                 buffers.current_turn_reasoning.append(segment)
                 buffers.pending_thought.append(segment)
@@ -1823,6 +1856,13 @@ class LLMNode(Node[LLMNodeData]):
                     chunk=segment,
                     is_final=False,
                 )
+            elif kind == "thought_end":
+                self._flush_thought_segment(buffers, trace_state)
+                yield ThoughtEndChunkEvent(
+                    selector=[self._node_id, "generation", "thought"],
+                    chunk="",
+                    is_final=False,
+                )
             else:
                 self._flush_thought_segment(buffers, trace_state)
                 aggregate.text += segment
@@ -1848,9 +1888,16 @@ class LLMNode(Node[LLMNodeData]):
         self, buffers: StreamBuffers, trace_state: TraceState, aggregate: AggregatedResult
     ) -> Generator[NodeEventBase, None, None]:
         for kind, segment in buffers.think_parser.flush():
-            if not segment:
+            if not segment and kind not in {"thought_start", "thought_end"}:
                 continue
-            if kind == "thought":
+            if kind == "thought_start":
+                self._flush_content_segment(buffers, trace_state)
+                yield ThoughtStartChunkEvent(
+                    selector=[self._node_id, "generation", "thought"],
+                    chunk="",
+                    is_final=False,
+                )
+            elif kind == "thought":
                 self._flush_content_segment(buffers, trace_state)
                 buffers.current_turn_reasoning.append(segment)
                 buffers.pending_thought.append(segment)
@@ -1859,6 +1906,13 @@ class LLMNode(Node[LLMNodeData]):
                     chunk=segment,
                     is_final=False,
                 )
+            elif kind == "thought_end":
+                self._flush_thought_segment(buffers, trace_state)
+                yield ThoughtEndChunkEvent(
+                    selector=[self._node_id, "generation", "thought"],
+                    chunk="",
+                    is_final=False,
+                )
             else:
                 self._flush_thought_segment(buffers, trace_state)
                 aggregate.text += segment