From 1cb5ee918f74b65a21e18119846e8d1fd3f35fac Mon Sep 17 00:00:00 2001 From: Novice Date: Thu, 5 Mar 2026 14:08:37 +0800 Subject: [PATCH] feat: enhance model event handling with new identity and metrics fields --- .../advanced_chat/generate_task_pipeline.py | 7 ++ .../apps/workflow/generate_task_pipeline.py | 36 ++++++++-- api/core/app/apps/workflow_app_runner.py | 7 ++ api/core/app/entities/queue_entities.py | 19 +++++- api/core/app/entities/task_entities.py | 26 ++++++- .../task_pipeline/message_cycle_manager.py | 37 ++++++---- api/core/workflow/entities/tool_entities.py | 1 + api/core/workflow/graph_events/node.py | 11 +++ api/core/workflow/node_events/node.py | 10 +++ api/core/workflow/nodes/llm/entities.py | 1 + api/core/workflow/nodes/llm/node.py | 67 ++++++++++++++----- 11 files changed, 185 insertions(+), 37 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index a84272cde9..e49cd869b6 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -591,6 +591,13 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): tool_elapsed_time=tool_elapsed_time, tool_icon=tool_icon, tool_icon_dark=tool_icon_dark, + node_id=event.node_id, + model_provider=event.model_provider, + model_name=event.model_name, + model_icon=event.model_icon, + model_icon_dark=event.model_icon_dark, + model_usage=event.model_usage, + model_duration=event.model_duration, ) def _handle_iteration_start_event( diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index ba0b72a63b..fe0ef138c6 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -551,6 +551,13 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): tool_elapsed_time=tool_elapsed_time, tool_icon=tool_icon, tool_icon_dark=tool_icon_dark, + node_id=event.node_id, + model_provider=event.model_provider, + model_name=event.model_name, + model_icon=event.model_icon, + model_icon_dark=event.model_icon_dark, + model_usage=event.model_usage, + model_duration=event.model_duration, ) def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]: @@ -748,12 +755,14 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): tool_elapsed_time: float | None = None, tool_icon: str | dict | None = None, tool_icon_dark: str | dict | None = None, + node_id: str | None = None, + model_provider: str | None = None, + model_name: str | None = None, + model_icon: str | dict | None = None, + model_icon_dark: str | dict | None = None, + model_usage: dict | None = None, + model_duration: float | None = None, ) -> TextChunkStreamResponse: - """ - Handle completed event. - :param text: text - :return: - """ from core.app.entities.task_entities import ChunkType as ResponseChunkType response_chunk_type = ResponseChunkType(chunk_type.value) if chunk_type else ResponseChunkType.TEXT @@ -762,6 +771,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): text=text, from_variable_selector=from_variable_selector, chunk_type=response_chunk_type, + node_id=node_id, ) if response_chunk_type == ResponseChunkType.TOOL_CALL: @@ -787,6 +797,22 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): "tool_icon_dark": tool_icon_dark, } ) + elif response_chunk_type == ResponseChunkType.MODEL_START: + data = data.model_copy( + update={ + "model_provider": model_provider, + "model_name": model_name, + "model_icon": model_icon, + "model_icon_dark": model_icon_dark, + } + ) + elif response_chunk_type == ResponseChunkType.MODEL_END: + data = data.model_copy( + update={ + "model_usage": model_usage, + "model_duration": model_duration, + } + ) response = TextChunkStreamResponse( task_id=self._application_generate_entity.task_id, diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index af7a99392e..be3c1e3025 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -494,6 +494,13 @@ class WorkflowBasedAppRunner: tool_call=event.tool_call, tool_result=event.tool_result, in_parent_node_id=event.in_parent_node_id, + node_id=event.node_id, + model_provider=event.model_provider, + model_name=event.model_name, + model_icon=event.model_icon, + model_icon_dark=event.model_icon_dark, + model_usage=event.model_usage, + model_duration=event.model_duration, ) ) elif isinstance(event, NodeRunRetrieverResourceEvent): diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index ddd6d85906..2262b571fa 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -188,9 +188,11 @@ class ChunkType(StrEnum): TEXT = "text" # Normal text streaming TOOL_CALL = "tool_call" # Tool call arguments streaming TOOL_RESULT = "tool_result" # Tool execution result - THOUGHT = "thought" # Agent thinking process (ReAct) - THOUGHT_START = "thought_start" # Agent thought start - THOUGHT_END = "thought_end" # Agent thought end + THOUGHT = "thought" # Model thinking process + THOUGHT_START = "thought_start" # Model thought start + THOUGHT_END = "thought_end" # Model thought end + MODEL_START = "model_start" # Model turn started with identity info + MODEL_END = "model_end" # Model turn completed with metrics class QueueTextChunkEvent(AppQueueEvent): @@ -208,6 +210,8 @@ class QueueTextChunkEvent(AppQueueEvent): """loop id if node is in loop""" in_parent_node_id: str | None = None """parent node id if this is an extractor node event""" + node_id: str | None = None + """workflow node id that produced this chunk""" # Extended fields for Agent/Tool streaming chunk_type: ChunkType = ChunkType.TEXT @@ -219,6 +223,15 @@ class QueueTextChunkEvent(AppQueueEvent): tool_result: ToolResult | None = None """structured tool result info""" + # Model identity (when chunk_type == MODEL_START) + model_provider: str | None = None + model_name: str | None = None + model_icon: str | dict | None = None + model_icon_dark: str | dict | None = None + # Model metrics (when chunk_type == MODEL_END) + model_usage: dict | None = None + model_duration: float | None = None + class QueueAgentMessageEvent(AppQueueEvent): """ diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 66e0e83220..a0e2488376 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -118,10 +118,12 @@ class MessageStreamResponse(StreamResponse): id: str answer: str from_variable_selector: list[str] | None = None + node_id: str | None = None + """workflow node id that produced this chunk""" # Extended fields for Agent/Tool streaming (imported at runtime to avoid circular import) chunk_type: str | None = None - """type of the chunk: text, tool_call, tool_result, thought""" + """type of the chunk: text, tool_call, tool_result, thought, model_start, model_end""" # Tool call fields (when chunk_type == "tool_call") tool_call_id: str | None = None @@ -143,6 +145,15 @@ class MessageStreamResponse(StreamResponse): tool_icon_dark: str | dict | None = None """dark theme icon of the tool""" + # Model identity fields (when chunk_type == "model_start") + model_provider: str | None = None + model_name: str | None = None + model_icon: str | dict | None = None + model_icon_dark: str | dict | None = None + # Model metrics fields (when chunk_type == "model_end") + model_usage: dict | None = None + model_duration: float | None = None + def model_dump(self, *args, **kwargs) -> dict[str, object]: kwargs.setdefault("exclude_none", True) return super().model_dump(*args, **kwargs) @@ -716,6 +727,8 @@ class ChunkType(StrEnum): THOUGHT = "thought" # Agent thinking process (ReAct) THOUGHT_START = "thought_start" # Agent thought start THOUGHT_END = "thought_end" # Agent thought end + MODEL_START = "model_start" # Model turn started with identity info + MODEL_END = "model_end" # Model turn completed with metrics class TextChunkStreamResponse(StreamResponse): @@ -730,6 +743,8 @@ class TextChunkStreamResponse(StreamResponse): text: str from_variable_selector: list[str] | None = None + node_id: str | None = None + """workflow node id that produced this chunk""" # Extended fields for Agent/Tool streaming chunk_type: ChunkType = ChunkType.TEXT @@ -753,6 +768,15 @@ class TextChunkStreamResponse(StreamResponse): tool_elapsed_time: float | None = None """elapsed time spent executing the tool""" + # Model identity fields (when chunk_type == MODEL_START) + model_provider: str | None = None + model_name: str | None = None + model_icon: str | dict | None = None + model_icon_dark: str | dict | None = None + # Model metrics fields (when chunk_type == MODEL_END) + model_usage: dict | None = None + model_duration: float | None = None + def model_dump(self, *args, **kwargs) -> dict[str, object]: kwargs.setdefault("exclude_none", True) return super().model_dump(*args, **kwargs) diff --git a/api/core/app/task_pipeline/message_cycle_manager.py b/api/core/app/task_pipeline/message_cycle_manager.py index e1e63df9d9..7d62da9e75 100644 --- a/api/core/app/task_pipeline/message_cycle_manager.py +++ b/api/core/app/task_pipeline/message_cycle_manager.py @@ -255,26 +255,21 @@ class MessageCycleManager: tool_icon: str | dict | None = None, tool_icon_dark: str | dict | None = None, event_type: StreamEvent | None = None, + node_id: str | None = None, + model_provider: str | None = None, + model_name: str | None = None, + model_icon: str | dict | None = None, + model_icon_dark: str | dict | None = None, + model_usage: dict | None = None, + model_duration: float | None = None, ) -> MessageStreamResponse: - """ - Message to stream response. - :param answer: answer - :param message_id: message id - :param from_variable_selector: from variable selector - :param chunk_type: type of the chunk (text, function_call, tool_result, thought) - :param tool_call_id: unique identifier for this tool call - :param tool_name: name of the tool being called - :param tool_arguments: accumulated tool arguments JSON - :param tool_files: file IDs produced by tool - :param tool_error: error message if tool failed - :return: - """ response = MessageStreamResponse( task_id=self._application_generate_entity.task_id, id=message_id, answer=answer, from_variable_selector=from_variable_selector, event=event_type or StreamEvent.MESSAGE, + node_id=node_id, ) if chunk_type: @@ -303,6 +298,22 @@ class MessageCycleManager: "tool_icon_dark": tool_icon_dark, } ) + elif chunk_type == "model_start": + response = response.model_copy( + update={ + "model_provider": model_provider, + "model_name": model_name, + "model_icon": model_icon, + "model_icon_dark": model_icon_dark, + } + ) + elif chunk_type == "model_end": + response = response.model_copy( + update={ + "model_usage": model_usage, + "model_duration": model_duration, + } + ) return response diff --git a/api/core/workflow/entities/tool_entities.py b/api/core/workflow/entities/tool_entities.py index 07cdbf99d9..eb5d4baca5 100644 --- a/api/core/workflow/entities/tool_entities.py +++ b/api/core/workflow/entities/tool_entities.py @@ -28,6 +28,7 @@ class ToolResult(BaseModel): elapsed_time: float | None = Field(default=None, description="Elapsed seconds spent executing the tool") icon: str | dict[str, Any] | None = Field(default=None, description="Icon of the tool") icon_dark: str | dict[str, Any] | None = Field(default=None, description="Dark theme icon of the tool") + provider: str | None = Field(default=None, description="Tool provider identifier") class ToolCallResult(BaseModel): diff --git a/api/core/workflow/graph_events/node.py b/api/core/workflow/graph_events/node.py index d26ab9d5f4..e6a392a974 100644 --- a/api/core/workflow/graph_events/node.py +++ b/api/core/workflow/graph_events/node.py @@ -31,6 +31,8 @@ class ChunkType(StrEnum): THOUGHT = "thought" # Agent thinking process (ReAct) THOUGHT_START = "thought_start" # Agent thought start THOUGHT_END = "thought_end" # Agent thought end + MODEL_START = "model_start" # Model turn started with identity info + MODEL_END = "model_end" # Model turn completed with metrics class NodeRunStreamChunkEvent(GraphNodeEventBase): @@ -56,6 +58,15 @@ class NodeRunStreamChunkEvent(GraphNodeEventBase): description="structured payload for tool_result chunks", ) + # Model identity fields (when chunk_type == MODEL_START) + model_provider: str | None = Field(default=None, description="model provider identifier") + model_name: str | None = Field(default=None, description="model name") + model_icon: str | dict | None = Field(default=None, description="model provider icon") + model_icon_dark: str | dict | None = Field(default=None, description="model provider dark icon") + # Model metrics fields (when chunk_type == MODEL_END) + model_usage: dict | None = Field(default=None, description="per-turn token usage as dict") + model_duration: float | None = Field(default=None, description="per-turn duration in seconds") + class NodeRunRetrieverResourceEvent(GraphNodeEventBase): retriever_resources: Sequence[RetrievalSourceMetadata] = Field(..., description="retriever resources") diff --git a/api/core/workflow/node_events/node.py b/api/core/workflow/node_events/node.py index b37b3cff86..371e314811 100644 --- a/api/core/workflow/node_events/node.py +++ b/api/core/workflow/node_events/node.py @@ -43,6 +43,8 @@ class ChunkType(StrEnum): THOUGHT = "thought" # Agent thinking process (ReAct) THOUGHT_START = "thought_start" # Agent thought start THOUGHT_END = "thought_end" # Agent thought end + MODEL_START = "model_start" # Model turn started with identity info + MODEL_END = "model_end" # Model turn completed with metrics class StreamChunkEvent(NodeEventBase): @@ -56,6 +58,14 @@ class StreamChunkEvent(NodeEventBase): chunk_type: ChunkType = Field(default=ChunkType.TEXT, description="type of the chunk") tool_call: ToolCall | None = Field(default=None, description="structured payload for tool_call chunks") tool_result: ToolResult | None = Field(default=None, description="structured payload for tool_result chunks") + # Model identity fields (when chunk_type == MODEL_START) + model_provider: str | None = Field(default=None, description="model provider identifier") + model_name: str | None = Field(default=None, description="model name") + model_icon: str | dict | None = Field(default=None, description="model provider icon") + model_icon_dark: str | dict | None = Field(default=None, description="model provider dark icon") + # Model metrics fields (when chunk_type == MODEL_END) + model_usage: LLMUsage | None = Field(default=None, description="per-turn token usage") + model_duration: float | None = Field(default=None, description="per-turn duration in seconds") class ToolCallChunkEvent(StreamChunkEvent): diff --git a/api/core/workflow/nodes/llm/entities.py b/api/core/workflow/nodes/llm/entities.py index 1a33d137d6..f86dcd9d95 100644 --- a/api/core/workflow/nodes/llm/entities.py +++ b/api/core/workflow/nodes/llm/entities.py @@ -274,6 +274,7 @@ class TraceState(BaseModel): tool_trace_map: dict[str, LLMTraceSegment] = Field(default_factory=dict) tool_call_index_map: dict[str, int] = Field(default_factory=dict) model_segment_start_time: float | None = Field(default=None, description="Start time for current model segment") + model_start_emitted: bool = Field(default=False, description="Whether model_start has been emitted for this turn") pending_usage: LLMUsage | None = Field(default=None, description="Pending usage for current model segment") diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py index 95e8237629..e14e86b5e7 100644 --- a/api/core/workflow/nodes/llm/node.py +++ b/api/core/workflow/nodes/llm/node.py @@ -105,7 +105,7 @@ from core.workflow.node_events import ( ToolCallChunkEvent, ToolResultChunkEvent, ) -from core.workflow.node_events.node import ThoughtEndChunkEvent, ThoughtStartChunkEvent +from core.workflow.node_events.node import ChunkType, ThoughtEndChunkEvent, ThoughtStartChunkEvent from core.workflow.nodes.base.entities import VariableSelector from core.workflow.nodes.base.node import Node from core.workflow.nodes.base.variable_template_parser import VariableTemplateParser @@ -2277,23 +2277,42 @@ class LLMNode(Node[LLMNodeData]): except Exception: return None + def _emit_model_start(self, trace_state: TraceState) -> Generator[NodeEventBase, None, None]: + """Yield a MODEL_START event with model identity info at the beginning of a model turn. + Idempotent: only emits once per turn (guarded by trace_state.model_start_emitted).""" + if trace_state.model_start_emitted: + return + trace_state.model_start_emitted = True + if trace_state.model_segment_start_time is None: + trace_state.model_segment_start_time = time.perf_counter() + provider = self._node_data.model.provider + yield StreamChunkEvent( + selector=[self._node_id, "generation", "model_start"], + chunk="", + chunk_type=ChunkType.MODEL_START, + is_final=False, + model_provider=provider, + model_name=self._node_data.model.name, + model_icon=self._generate_model_provider_icon_url(provider), + model_icon_dark=self._generate_model_provider_icon_url(provider, dark=True), + ) + def _flush_model_segment( self, buffers: StreamBuffers, trace_state: TraceState, error: str | None = None, - ) -> None: - """Flush pending thought/content buffers into a single model trace segment.""" + ) -> Generator[NodeEventBase, None, None]: + """Flush pending thought/content buffers into a single model trace segment + and yield a MODEL_END chunk event with usage/duration metrics.""" if not buffers.pending_thought and not buffers.pending_content and not buffers.pending_tool_calls: return now = time.perf_counter() duration = now - trace_state.model_segment_start_time if trace_state.model_segment_start_time else 0.0 - # Use pending_usage from trace_state (captured from THOUGHT log) usage = trace_state.pending_usage - # Generate model provider icon URL provider = self._node_data.model.provider model_name = self._node_data.model.name model_icon = self._generate_model_provider_icon_url(provider) @@ -2317,10 +2336,21 @@ class LLMNode(Node[LLMNodeData]): status="error" if error else "success", ) ) + + yield StreamChunkEvent( + selector=[self._node_id, "generation", "model_end"], + chunk="", + chunk_type=ChunkType.MODEL_END, + is_final=False, + model_usage=usage, + model_duration=duration, + ) + buffers.pending_thought.clear() buffers.pending_content.clear() buffers.pending_tool_calls.clear() trace_state.model_segment_start_time = None + trace_state.model_start_emitted = False trace_state.pending_usage = None def _handle_agent_log_output( @@ -2356,18 +2386,18 @@ class LLMNode(Node[LLMNodeData]): trace_state.pending_usage = llm_usage if output.log_type == AgentLog.LogType.TOOL_CALL and output.status == AgentLog.LogStatus.START: + yield from self._emit_model_start(trace_state) + tool_name = payload.tool_name tool_call_id = payload.tool_call_id tool_arguments = json.dumps(payload.tool_args or {}) - # Get icon from metadata (available at START) tool_icon = output.metadata.get(AgentLog.LogMetadata.ICON) if output.metadata else None tool_icon_dark = output.metadata.get(AgentLog.LogMetadata.ICON_DARK) if output.metadata else None if tool_call_id and tool_call_id not in trace_state.tool_call_index_map: trace_state.tool_call_index_map[tool_call_id] = len(trace_state.tool_call_index_map) - # Add tool call to pending list for model segment buffers.pending_tool_calls.append(ToolCall(id=tool_call_id, name=tool_name, arguments=tool_arguments)) yield ToolCallChunkEvent( @@ -2395,7 +2425,7 @@ class LLMNode(Node[LLMNodeData]): trace_state.tool_call_index_map[tool_call_id] = len(trace_state.tool_call_index_map) # Flush model segment before tool result processing - self._flush_model_segment(buffers, trace_state) + yield from self._flush_model_segment(buffers, trace_state) if output.status == AgentLog.LogStatus.ERROR: tool_error = output.error or payload.tool_error @@ -2450,6 +2480,7 @@ class LLMNode(Node[LLMNodeData]): elapsed_time=elapsed_time, icon=tool_icon, icon_dark=tool_icon_dark, + provider=tool_provider, ), is_final=False, ) @@ -2474,9 +2505,7 @@ class LLMNode(Node[LLMNodeData]): if not segment and kind not in {"thought_start", "thought_end"}: continue - # Start tracking model segment time on first output - if trace_state.model_segment_start_time is None: - trace_state.model_segment_start_time = time.perf_counter() + yield from self._emit_model_start(trace_state) if kind == "thought_start": yield ThoughtStartChunkEvent( @@ -2525,9 +2554,7 @@ class LLMNode(Node[LLMNodeData]): if not segment and kind not in {"thought_start", "thought_end"}: continue - # Start tracking model segment time on first output - if trace_state.model_segment_start_time is None: - trace_state.model_segment_start_time = time.perf_counter() + yield from self._emit_model_start(trace_state) if kind == "thought_start": yield ThoughtStartChunkEvent( @@ -2572,7 +2599,7 @@ class LLMNode(Node[LLMNodeData]): trace_state.pending_usage = aggregate.usage # Flush final model segment - self._flush_model_segment(buffers, trace_state) + yield from self._flush_model_segment(buffers, trace_state) def _close_streams(self) -> Generator[NodeEventBase, None, None]: yield StreamChunkEvent( @@ -2612,6 +2639,16 @@ class LLMNode(Node[LLMNodeData]): ), is_final=True, ) + yield StreamChunkEvent( + selector=[self._node_id, "generation", "model_start"], + chunk="", + is_final=True, + ) + yield StreamChunkEvent( + selector=[self._node_id, "generation", "model_end"], + chunk="", + is_final=True, + ) def _build_generation_data( self,