mirror of https://github.com/langgenius/dify.git

commit 8fb50d52dc

Merge branch 'feat/agent-node-v2' into deploy/dev
@@ -200,7 +200,8 @@ class AgentPattern(ABC):
             log.metadata = {
                 **log.metadata,
                 AgentLog.LogMetadata.FINISHED_AT: finished_at,
-                AgentLog.LogMetadata.ELAPSED_TIME: finished_at - started_at,
+                # Calculate elapsed time in seconds
+                AgentLog.LogMetadata.ELAPSED_TIME: round(finished_at - started_at, 4),
             }
 
             # Add usage information if provided
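
Note: a minimal sketch of the timing arithmetic above, assuming the pattern takes its timestamps from time.perf_counter() (the clock source is not visible in this hunk):

    import time

    started_at = time.perf_counter()
    time.sleep(0.05)  # stand-in for the timed tool/LLM call
    finished_at = time.perf_counter()

    # round(..., 4) keeps the metadata readable and avoids float noise
    # such as 0.050000000000000044 leaking into ELAPSED_TIME.
    print(round(finished_at - started_at, 4))
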
@@ -160,6 +160,7 @@ class StreamEventBuffer:
             "name": tool_name or "",
             "arguments": tool_arguments or "",
             "result": "",
+            "elapsed_time": None,
         }
         self.tool_calls.append(tool_call)
         idx = len(self.tool_calls) - 1
@@ -168,13 +169,14 @@ class StreamEventBuffer:
 
         self._last_event_type = "tool_call"
 
-    def record_tool_result(self, tool_call_id: str, result: str) -> None:
+    def record_tool_result(self, tool_call_id: str, result: str, tool_elapsed_time: float | None = None) -> None:
         """Record a tool result event (update existing tool call)."""
         if not tool_call_id:
             return
         if tool_call_id in self._tool_call_id_map:
             idx = self._tool_call_id_map[tool_call_id]
             self.tool_calls[idx]["result"] = result
+            self.tool_calls[idx]["elapsed_time"] = tool_elapsed_time
 
     def finalize(self) -> None:
         """Finalize the buffer, flushing any pending data."""
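
Note: a condensed, hypothetical reduction of the StreamEventBuffer pattern above — tool calls are appended once, then patched in place through an id -> index map when the result (and now its elapsed time) arrives:

    class ToolCallBuffer:
        def __init__(self) -> None:
            self.tool_calls: list[dict] = []
            self._tool_call_id_map: dict[str, int] = {}

        def record_tool_call(self, tool_call_id: str, name: str, arguments: str) -> None:
            self.tool_calls.append(
                {"id": tool_call_id, "name": name, "arguments": arguments, "result": "", "elapsed_time": None}
            )
            self._tool_call_id_map[tool_call_id] = len(self.tool_calls) - 1

        def record_tool_result(self, tool_call_id: str, result: str, tool_elapsed_time: float | None = None) -> None:
            if tool_call_id in self._tool_call_id_map:
                idx = self._tool_call_id_map[tool_call_id]
                self.tool_calls[idx]["result"] = result
                self.tool_calls[idx]["elapsed_time"] = tool_elapsed_time

    buf = ToolCallBuffer()
    buf.record_tool_call("call-1", "search", '{"q": "dify"}')
    buf.record_tool_result("call-1", "3 hits", tool_elapsed_time=0.1234)
    print(buf.tool_calls[0]["elapsed_time"])  # 0.1234
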
@@ -523,7 +525,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
                 tool_name = tool_payload.name if tool_payload and tool_payload.name else ""
                 tool_arguments = tool_call.arguments if tool_call and tool_call.arguments else ""
                 tool_files = tool_result.files if tool_result else []
-
+                tool_elapsed_time = tool_result.elapsed_time if tool_result else None
                 # Record stream event based on chunk type
                 chunk_type = event.chunk_type or ChunkType.TEXT
                 match chunk_type:
@@ -543,6 +545,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
                         self._stream_buffer.record_tool_result(
                             tool_call_id=tool_call_id,
                             result=delta_text,
+                            tool_elapsed_time=tool_elapsed_time,
                         )
                         self._task_state.answer += delta_text
 
@@ -555,6 +558,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
                     tool_name=tool_name or None,
                     tool_arguments=tool_arguments or None,
                     tool_files=tool_files,
+                    tool_elapsed_time=tool_elapsed_time,
                 )
 
     def _handle_iteration_start_event(
@@ -490,6 +490,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
                 tool_call_id = tool_payload.id if tool_payload and tool_payload.id else None
                 tool_name = tool_payload.name if tool_payload and tool_payload.name else None
                 tool_arguments = tool_call.arguments if tool_call else None
+                tool_elapsed_time = tool_result.elapsed_time if tool_result else None
                 tool_files = tool_result.files if tool_result else []
 
                 # only publish tts message at text chunk streaming
@@ -504,6 +505,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
                     tool_name=tool_name,
                     tool_arguments=tool_arguments,
                     tool_files=tool_files,
+                    tool_elapsed_time=tool_elapsed_time,
                 )
 
     def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]:
@@ -676,6 +678,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
         tool_arguments: str | None = None,
         tool_files: list[str] | None = None,
         tool_error: str | None = None,
+        tool_elapsed_time: float | None = None,
     ) -> TextChunkStreamResponse:
         """
         Handle completed event.
@@ -684,18 +687,37 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
         """
         from core.app.entities.task_entities import ChunkType as ResponseChunkType
 
+        response_chunk_type = ResponseChunkType(chunk_type.value) if chunk_type else ResponseChunkType.TEXT
+
+        data = TextChunkStreamResponse.Data(
+            text=text,
+            from_variable_selector=from_variable_selector,
+            chunk_type=response_chunk_type,
+        )
+
+        if response_chunk_type == ResponseChunkType.TOOL_CALL:
+            data = data.model_copy(
+                update={
+                    "tool_call_id": tool_call_id,
+                    "tool_name": tool_name,
+                    "tool_arguments": tool_arguments,
+                }
+            )
+        elif response_chunk_type == ResponseChunkType.TOOL_RESULT:
+            data = data.model_copy(
+                update={
+                    "tool_call_id": tool_call_id,
+                    "tool_name": tool_name,
+                    "tool_arguments": tool_arguments,
+                    "tool_files": tool_files,
+                    "tool_error": tool_error,
+                    "tool_elapsed_time": tool_elapsed_time,
+                }
+            )
+
         response = TextChunkStreamResponse(
             task_id=self._application_generate_entity.task_id,
-            data=TextChunkStreamResponse.Data(
-                text=text,
-                from_variable_selector=from_variable_selector,
-                chunk_type=ResponseChunkType(chunk_type.value) if chunk_type else ResponseChunkType.TEXT,
-                tool_call_id=tool_call_id,
-                tool_name=tool_name,
-                tool_arguments=tool_arguments,
-                tool_files=tool_files or [],
-                tool_error=tool_error,
-            ),
+            data=data,
         )
 
         return response
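
Note: the rewrite above switches from one monolithic constructor to pydantic v2's model_copy(update=...), layering tool fields on only for tool chunks. A self-contained sketch (model shape illustrative):

    from pydantic import BaseModel

    class Data(BaseModel):
        text: str
        chunk_type: str = "text"
        tool_call_id: str | None = None
        tool_elapsed_time: float | None = None

    data = Data(text="", chunk_type="tool_result")
    data = data.model_copy(update={"tool_call_id": "call-1", "tool_elapsed_time": 0.42})
    print(data.tool_call_id, data.tool_elapsed_time)  # call-1 0.42
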
@@ -42,6 +42,7 @@ class ToolCallDetail(BaseModel):
     name: str = Field(..., description="Name of the tool")
     arguments: str = Field(default="", description="JSON string of tool arguments")
     result: str = Field(default="", description="Result from the tool execution")
+    elapsed_time: float | None = Field(default=None, description="Elapsed time in seconds")
 
 
 class LLMGenerationDetailData(BaseModel):
@@ -184,6 +184,8 @@ class ChunkType(StrEnum):
     TOOL_CALL = "tool_call"  # Tool call arguments streaming
     TOOL_RESULT = "tool_result"  # Tool execution result
     THOUGHT = "thought"  # Agent thinking process (ReAct)
+    THOUGHT_START = "thought_start"  # Agent thought start
+    THOUGHT_END = "thought_end"  # Agent thought end
 
 
 class QueueTextChunkEvent(AppQueueEvent):
@@ -130,6 +130,16 @@ class MessageStreamResponse(StreamResponse):
     """file IDs produced by tool"""
     tool_error: str | None = None
     """error message if tool failed"""
+    tool_elapsed_time: float | None = None
+    """elapsed time spent executing the tool"""
+
+    def model_dump(self, *args, **kwargs) -> dict[str, object]:
+        kwargs.setdefault("exclude_none", True)
+        return super().model_dump(*args, **kwargs)
+
+    def model_dump_json(self, *args, **kwargs) -> str:
+        kwargs.setdefault("exclude_none", True)
+        return super().model_dump_json(*args, **kwargs)
 
 
 class MessageAudioStreamResponse(StreamResponse):
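
Note: the model_dump/model_dump_json overrides default exclude_none to True so the many optional tool_* fields stay out of the serialized stream events unless they are actually set. Minimal sketch (the real classes carry more fields):

    from pydantic import BaseModel

    class MessageStreamResponse(BaseModel):
        answer: str
        tool_error: str | None = None
        tool_elapsed_time: float | None = None

        def model_dump(self, *args, **kwargs) -> dict[str, object]:
            kwargs.setdefault("exclude_none", True)
            return super().model_dump(*args, **kwargs)

    print(MessageStreamResponse(answer="hi").model_dump())  # {'answer': 'hi'}
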
@@ -607,6 +617,8 @@ class ChunkType(StrEnum):
     TOOL_CALL = "tool_call"  # Tool call arguments streaming
     TOOL_RESULT = "tool_result"  # Tool execution result
     THOUGHT = "thought"  # Agent thinking process (ReAct)
+    THOUGHT_START = "thought_start"  # Agent thought start
+    THOUGHT_END = "thought_end"  # Agent thought end
 
 
 class TextChunkStreamResponse(StreamResponse):
@@ -635,11 +647,23 @@ class TextChunkStreamResponse(StreamResponse):
         """accumulated tool arguments JSON"""
 
         # Tool result fields (when chunk_type == TOOL_RESULT)
-        tool_files: list[str] = Field(default_factory=list)
+        tool_files: list[str] | None = None
         """file IDs produced by tool"""
         tool_error: str | None = None
         """error message if tool failed"""
+
+        # Tool elapsed time fields (when chunk_type == TOOL_RESULT)
+        tool_elapsed_time: float | None = None
+        """elapsed time spent executing the tool"""
+
+        def model_dump(self, *args, **kwargs) -> dict[str, object]:
+            kwargs.setdefault("exclude_none", True)
+            return super().model_dump(*args, **kwargs)
+
+        def model_dump_json(self, *args, **kwargs) -> str:
+            kwargs.setdefault("exclude_none", True)
+            return super().model_dump_json(*args, **kwargs)
 
     event: StreamEvent = StreamEvent.TEXT_CHUNK
     data: Data
 
@@ -238,6 +238,7 @@ class MessageCycleManager:
         tool_arguments: str | None = None,
         tool_files: list[str] | None = None,
         tool_error: str | None = None,
+        tool_elapsed_time: float | None = None,
         event_type: StreamEvent | None = None,
     ) -> MessageStreamResponse:
         """
@@ -253,20 +254,39 @@ class MessageCycleManager:
         :param tool_error: error message if tool failed
         :return:
         """
-        return MessageStreamResponse(
+        response = MessageStreamResponse(
             task_id=self._application_generate_entity.task_id,
             id=message_id,
             answer=answer,
             from_variable_selector=from_variable_selector,
-            chunk_type=chunk_type,
-            tool_call_id=tool_call_id,
-            tool_name=tool_name,
-            tool_arguments=tool_arguments,
-            tool_files=tool_files,
-            tool_error=tool_error,
             event=event_type or StreamEvent.MESSAGE,
         )
+
+        if chunk_type:
+            response = response.model_copy(update={"chunk_type": chunk_type})
+
+        if chunk_type == "tool_call":
+            response = response.model_copy(
+                update={
+                    "tool_call_id": tool_call_id,
+                    "tool_name": tool_name,
+                    "tool_arguments": tool_arguments,
+                }
+            )
+        elif chunk_type == "tool_result":
+            response = response.model_copy(
+                update={
+                    "tool_call_id": tool_call_id,
+                    "tool_name": tool_name,
+                    "tool_arguments": tool_arguments,
+                    "tool_files": tool_files,
+                    "tool_error": tool_error,
+                    "tool_elapsed_time": tool_elapsed_time,
+                }
+            )
+
+        return response
 
     def message_replace_to_stream_response(self, answer: str, reason: str = "") -> MessageReplaceStreamResponse:
         """
         Message replace to stream response.
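
Note: the chunk_type == "tool_call" comparisons above rely on ChunkType being a StrEnum (as declared elsewhere in this diff), whose members compare equal to their string values — assuming chunk_type arrives as a ChunkType member:

    from enum import StrEnum

    class ChunkType(StrEnum):
        TEXT = "text"
        TOOL_CALL = "tool_call"
        TOOL_RESULT = "tool_result"

    assert ChunkType.TOOL_CALL == "tool_call"
    assert "tool_result" == ChunkType.TOOL_RESULT
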
@@ -22,6 +22,7 @@ class ToolResult(BaseModel):
     output: str | None = Field(default=None, description="Tool output text, error or success message")
     files: list[str] = Field(default_factory=list, description="File produced by tool")
     status: ToolResultStatus | None = Field(default=ToolResultStatus.SUCCESS, description="Tool execution status")
+    elapsed_time: float | None = Field(default=None, description="Elapsed seconds spent executing the tool")
 
 
 class ToolCallResult(BaseModel):
@@ -31,3 +32,4 @@ class ToolCallResult(BaseModel):
     output: str | None = Field(default=None, description="Tool output text, error or success message")
     files: list[File] = Field(default_factory=list, description="File produced by tool")
     status: ToolResultStatus = Field(default=ToolResultStatus.SUCCESS, description="Tool execution status")
+    elapsed_time: float | None = Field(default=None, description="Elapsed seconds spent executing the tool")
@@ -29,6 +29,8 @@ class ChunkType(StrEnum):
     TOOL_CALL = "tool_call"  # Tool call arguments streaming
     TOOL_RESULT = "tool_result"  # Tool execution result
     THOUGHT = "thought"  # Agent thinking process (ReAct)
+    THOUGHT_START = "thought_start"  # Agent thought start
+    THOUGHT_END = "thought_end"  # Agent thought end
 
 
 class NodeRunStreamChunkEvent(GraphNodeEventBase):
@@ -41,6 +41,8 @@ class ChunkType(StrEnum):
     TOOL_CALL = "tool_call"  # Tool call arguments streaming
     TOOL_RESULT = "tool_result"  # Tool execution result
     THOUGHT = "thought"  # Agent thinking process (ReAct)
+    THOUGHT_START = "thought_start"  # Agent thought start
+    THOUGHT_END = "thought_end"  # Agent thought end
 
 
 class StreamChunkEvent(NodeEventBase):
@@ -70,6 +72,18 @@ class ToolResultChunkEvent(StreamChunkEvent):
     tool_result: ToolResult | None = Field(default=None, description="structured tool result payload")
 
 
+class ThoughtStartChunkEvent(StreamChunkEvent):
+    """Agent thought start streaming event - Agent thinking process (ReAct)."""
+
+    chunk_type: ChunkType = Field(default=ChunkType.THOUGHT_START, frozen=True)
+
+
+class ThoughtEndChunkEvent(StreamChunkEvent):
+    """Agent thought end streaming event - Agent thinking process (ReAct)."""
+
+    chunk_type: ChunkType = Field(default=ChunkType.THOUGHT_END, frozen=True)
+
+
 class ThoughtChunkEvent(StreamChunkEvent):
     """Agent thought streaming event - Agent thinking process (ReAct)."""
 
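
Note: the new thought events pin chunk_type with Field(..., frozen=True), so an event cannot be re-labelled after construction. A minimal sketch of that pydantic v2 behavior (model shape illustrative):

    from pydantic import BaseModel, Field, ValidationError

    class ThoughtStartChunkEvent(BaseModel):
        chunk_type: str = Field(default="thought_start", frozen=True)

    event = ThoughtStartChunkEvent()
    try:
        event.chunk_type = "text"
    except ValidationError as e:
        print(e.errors()[0]["type"])  # frozen_field
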
@@ -580,9 +580,10 @@ class Node(Generic[NodeDataT]):
         from core.workflow.entities import ToolResult, ToolResultStatus
         from core.workflow.graph_events import ChunkType
 
-        tool_result = event.tool_result
-        status: ToolResultStatus = (
-            tool_result.status if tool_result and tool_result.status is not None else ToolResultStatus.SUCCESS
+        tool_result = event.tool_result or ToolResult()
+        status: ToolResultStatus = tool_result.status or ToolResultStatus.SUCCESS
+        tool_result = tool_result.model_copy(
+            update={"status": status, "files": tool_result.files or []},
         )
 
         return NodeRunStreamChunkEvent(
@@ -593,13 +594,7 @@ class Node(Generic[NodeDataT]):
             chunk=event.chunk,
             is_final=event.is_final,
             chunk_type=ChunkType.TOOL_RESULT,
-            tool_result=ToolResult(
-                id=tool_result.id if tool_result else None,
-                name=tool_result.name if tool_result else None,
-                output=tool_result.output if tool_result else None,
-                files=tool_result.files if tool_result else [],
-                status=status,
-            ),
+            tool_result=tool_result,
         )
 
     @_dispatch.register
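
Note: the two hunks above replace per-field `if tool_result else ...` guards with a single normalization step — substitute an empty ToolResult, default the status, backfill files — and then pass the payload through unchanged. Condensed sketch with an illustrative model:

    from pydantic import BaseModel, Field

    class ToolResult(BaseModel):
        id: str | None = None
        output: str | None = None
        files: list[str] = Field(default_factory=list)
        status: str | None = None

    event_tool_result: ToolResult | None = None  # what event.tool_result may be
    tool_result = event_tool_result or ToolResult()
    status = tool_result.status or "success"
    tool_result = tool_result.model_copy(update={"status": status, "files": tool_result.files or []})
    print(tool_result.status)  # success
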
@@ -163,6 +163,7 @@ class ThinkTagStreamParser:
                 thought_text = self._buffer[: end_match.start()]
                 if thought_text:
                     parts.append(("thought", thought_text))
+                parts.append(("thought_end", ""))
                 self._buffer = self._buffer[end_match.end() :]
                 self._in_think = False
                 continue
@@ -180,6 +181,7 @@ class ThinkTagStreamParser:
                 if prefix:
                     parts.append(("text", prefix))
                 self._buffer = self._buffer[start_match.end() :]
+                parts.append(("thought_start", ""))
                 self._in_think = True
                 continue
 
@@ -195,7 +197,7 @@ class ThinkTagStreamParser:
             # Extra safeguard: strip any stray tags that slipped through.
             content = self._START_PATTERN.sub("", content)
             content = self._END_PATTERN.sub("", content)
-            if content:
+            if content or kind in {"thought_start", "thought_end"}:
                 cleaned_parts.append((kind, content))
 
         return cleaned_parts
@@ -210,12 +212,19 @@ class ThinkTagStreamParser:
         if content.lower().startswith(self._START_PREFIX) or content.lower().startswith(self._END_PREFIX):
             content = ""
         self._buffer = ""
-        if not content:
+        if not content and not self._in_think:
             return []
         # Strip any complete tags that might still be present.
         content = self._START_PATTERN.sub("", content)
         content = self._END_PATTERN.sub("", content)
-        return [(kind, content)] if content else []
+
+        result: list[tuple[str, str]] = []
+        if content:
+            result.append((kind, content))
+        if self._in_think:
+            result.append(("thought_end", ""))
+            self._in_think = False
+        return result
 
 
 class StreamBuffers(BaseModel):
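
Note: the net effect of the flush() change — an open <think> block is now closed with an explicit ("thought_end", "") marker even when no residual text remains, so downstream renderers never leave the thought panel open. A condensed, standalone sketch of that logic (not the real ThinkTagStreamParser API):

    def flush(kind: str, content: str, in_think: bool) -> list[tuple[str, str]]:
        if not content and not in_think:
            return []
        result: list[tuple[str, str]] = []
        if content:
            result.append((kind, content))
        if in_think:
            result.append(("thought_end", ""))
        return result

    print(flush("thought", "", True))    # [('thought_end', '')]
    print(flush("text", "tail", False))  # [('text', 'tail')]
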
@@ -80,6 +80,7 @@ from core.workflow.node_events import (
     ToolCallChunkEvent,
     ToolResultChunkEvent,
 )
+from core.workflow.node_events.node import ThoughtEndChunkEvent, ThoughtStartChunkEvent
 from core.workflow.nodes.base.entities import VariableSelector
 from core.workflow.nodes.base.node import Node
 from core.workflow.nodes.base.variable_template_parser import VariableTemplateParser
@@ -565,15 +566,28 @@ class LLMNode(Node[LLMNodeData]):
                 # Generation output: split out thoughts, forward only non-thought content chunks
                 for kind, segment in think_parser.process(text_part):
                     if not segment:
-                        continue
+                        if kind not in {"thought_start", "thought_end"}:
+                            continue
 
-                    if kind == "thought":
+                    if kind == "thought_start":
+                        yield ThoughtStartChunkEvent(
+                            selector=[node_id, "generation", "thought"],
+                            chunk="",
+                            is_final=False,
+                        )
+                    elif kind == "thought":
                         reasoning_chunks.append(segment)
                         yield ThoughtChunkEvent(
                             selector=[node_id, "generation", "thought"],
                             chunk=segment,
                             is_final=False,
                         )
+                    elif kind == "thought_end":
+                        yield ThoughtEndChunkEvent(
+                            selector=[node_id, "generation", "thought"],
+                            chunk="",
+                            is_final=False,
+                        )
                     else:
                         yield StreamChunkEvent(
                             selector=[node_id, "generation", "content"],
@@ -596,15 +610,27 @@ class LLMNode(Node[LLMNodeData]):
                 raise LLMNodeError(f"Failed to parse structured output: {e}")
 
         for kind, segment in think_parser.flush():
-            if not segment:
+            if not segment and kind not in {"thought_start", "thought_end"}:
                 continue
-            if kind == "thought":
+            if kind == "thought_start":
+                yield ThoughtStartChunkEvent(
+                    selector=[node_id, "generation", "thought"],
+                    chunk="",
+                    is_final=False,
+                )
+            elif kind == "thought":
                 reasoning_chunks.append(segment)
                 yield ThoughtChunkEvent(
                     selector=[node_id, "generation", "thought"],
                     chunk=segment,
                     is_final=False,
                 )
+            elif kind == "thought_end":
+                yield ThoughtEndChunkEvent(
+                    selector=[node_id, "generation", "thought"],
+                    chunk="",
+                    is_final=False,
+                )
             else:
                 yield StreamChunkEvent(
                     selector=[node_id, "generation", "content"],
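
Note: the consumer-side counterpart of the parser changes — empty segments used to be dropped unconditionally, but zero-length thought_start/thought_end markers must now pass through so the UI can open and close the thinking block:

    parts = [("thought_start", ""), ("thought", "let me check"), ("thought_end", ""), ("text", "Answer")]
    for kind, segment in parts:
        if not segment and kind not in {"thought_start", "thought_end"}:
            continue  # still skip genuinely empty text/thought chunks
        print(kind, repr(segment))
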
@@ -1649,6 +1675,7 @@ class LLMNode(Node[LLMNodeData]):
             "output": tool_call.output,
             "files": files,
             "status": tool_call.status.value if hasattr(tool_call.status, "value") else tool_call.status,
+            "elapsed_time": tool_call.elapsed_time,
         }
 
     def _flush_thought_segment(self, buffers: StreamBuffers, trace_state: TraceState) -> None:
@@ -1707,6 +1734,7 @@ class LLMNode(Node[LLMNodeData]):
                 id=tool_call_id,
                 name=tool_name,
                 arguments=tool_arguments,
+                elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None,
             ),
         )
         trace_state.trace_segments.append(tool_call_segment)
@@ -1755,6 +1783,7 @@ class LLMNode(Node[LLMNodeData]):
                 id=tool_call_id,
                 name=tool_name,
                 arguments=None,
+                elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None,
             ),
         )
         if existing_tool_segment is None:
@@ -1767,6 +1796,7 @@ class LLMNode(Node[LLMNodeData]):
                 id=tool_call_id,
                 name=tool_name,
                 arguments=None,
+                elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None,
             )
         tool_call_segment.tool_call.output = (
             str(tool_output) if tool_output is not None else str(tool_error) if tool_error is not None else None
@@ -1785,6 +1815,7 @@ class LLMNode(Node[LLMNodeData]):
                 output=result_output,
                 files=tool_files,
                 status=ToolResultStatus.ERROR if tool_error else ToolResultStatus.SUCCESS,
+                elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None,
             ),
             is_final=False,
         )
@@ -1806,10 +1837,17 @@ class LLMNode(Node[LLMNodeData]):
             chunk_text = str(chunk_text)
 
         for kind, segment in buffers.think_parser.process(chunk_text):
-            if not segment:
+            if not segment and kind not in {"thought_start", "thought_end"}:
                 continue
 
-            if kind == "thought":
+            if kind == "thought_start":
+                self._flush_content_segment(buffers, trace_state)
+                yield ThoughtStartChunkEvent(
+                    selector=[self._node_id, "generation", "thought"],
+                    chunk="",
+                    is_final=False,
+                )
+            elif kind == "thought":
                 self._flush_content_segment(buffers, trace_state)
                 buffers.current_turn_reasoning.append(segment)
                 buffers.pending_thought.append(segment)
@@ -1818,6 +1856,13 @@ class LLMNode(Node[LLMNodeData]):
                     chunk=segment,
                     is_final=False,
                 )
+            elif kind == "thought_end":
+                self._flush_thought_segment(buffers, trace_state)
+                yield ThoughtEndChunkEvent(
+                    selector=[self._node_id, "generation", "thought"],
+                    chunk="",
+                    is_final=False,
+                )
             else:
                 self._flush_thought_segment(buffers, trace_state)
                 aggregate.text += segment
@@ -1843,9 +1888,16 @@ class LLMNode(Node[LLMNodeData]):
         self, buffers: StreamBuffers, trace_state: TraceState, aggregate: AggregatedResult
     ) -> Generator[NodeEventBase, None, None]:
         for kind, segment in buffers.think_parser.flush():
-            if not segment:
+            if not segment and kind not in {"thought_start", "thought_end"}:
                 continue
-            if kind == "thought":
+            if kind == "thought_start":
+                self._flush_content_segment(buffers, trace_state)
+                yield ThoughtStartChunkEvent(
+                    selector=[self._node_id, "generation", "thought"],
+                    chunk="",
+                    is_final=False,
+                )
+            elif kind == "thought":
                 self._flush_content_segment(buffers, trace_state)
                 buffers.current_turn_reasoning.append(segment)
                 buffers.pending_thought.append(segment)
@@ -1854,6 +1906,13 @@ class LLMNode(Node[LLMNodeData]):
                     chunk=segment,
                     is_final=False,
                 )
+            elif kind == "thought_end":
+                self._flush_thought_segment(buffers, trace_state)
+                yield ThoughtEndChunkEvent(
+                    selector=[self._node_id, "generation", "thought"],
+                    chunk="",
+                    is_final=False,
+                )
             else:
                 self._flush_thought_segment(buffers, trace_state)
                 aggregate.text += segment
@@ -1960,6 +2019,7 @@ class LLMNode(Node[LLMNodeData]):
                     arguments=json.dumps(tool_args) if tool_args else "",
                     output=result_text,
                     status=status,
+                    elapsed_time=log.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if log.metadata else None,
                 )
             )
 
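
Note: every elapsed_time read above uses the same None-safe pattern, because log/output metadata may be absent entirely:

    metadata: dict[str, float] | None = None
    elapsed = metadata.get("elapsed_time") if metadata else None
    print(elapsed)  # None
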
@@ -1,4 +1,3 @@
 """
 Mark agent test modules as a package to avoid import name collisions.
 """
-
@@ -45,4 +45,3 @@ def test_skip_empty_final_chunk() -> None:
     published_event, publish_from = queue_manager.published[0]
     assert publish_from == PublishFrom.APPLICATION_MANAGER
     assert published_event.text == "hi"
-
@@ -146,4 +146,3 @@ def test_serialize_tool_call_strips_files_to_ids():
     assert serialized["name"] == "do"
     assert serialized["arguments"] == '{"a":1}'
     assert serialized["output"] == "ok"
-