From f54b9b12b0646391c12d2222f74e21c77a7cbf81 Mon Sep 17 00:00:00 2001
From: Novice
Date: Wed, 17 Dec 2025 17:34:02 +0800
Subject: [PATCH] feat: add process data

---
 api/core/workflow/nodes/llm/node.py | 233 +++++++++++++++++-----------
 1 file changed, 145 insertions(+), 88 deletions(-)

diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py
index 408363d226..6be59b6ead 100644
--- a/api/core/workflow/nodes/llm/node.py
+++ b/api/core/workflow/nodes/llm/node.py
@@ -164,81 +164,6 @@ class LLMNode(Node[LLMNodeData]):
     def version(cls) -> str:
         return "1"
 
-    def _stream_llm_events(
-        self,
-        generator: Generator[NodeEventBase | LLMStructuredOutput, None, LLMGenerationData | None],
-        *,
-        model_instance: ModelInstance,
-    ) -> Generator[
-        NodeEventBase,
-        None,
-        tuple[
-            str,
-            str,
-            LLMUsage,
-            str | None,
-            LLMStructuredOutput | None,
-            LLMGenerationData | None,
-        ],
-    ]:
-        """
-        Stream events and capture generator return value in one place.
-        Uses generator delegation so _run stays concise while still emitting events.
-        """
-        clean_text = ""
-        reasoning_content = ""
-        usage = LLMUsage.empty_usage()
-        finish_reason: str | None = None
-        structured_output: LLMStructuredOutput | None = None
-        generation_data: LLMGenerationData | None = None
-        completed = False
-
-        while True:
-            try:
-                event = next(generator)
-            except StopIteration as exc:
-                if isinstance(exc.value, LLMGenerationData):
-                    generation_data = exc.value
-                break
-
-            if completed:
-                # After completion we still drain to reach StopIteration.value
-                continue
-
-            match event:
-                case StreamChunkEvent() | ThoughtChunkEvent():
-                    yield event
-
-                case ModelInvokeCompletedEvent(
-                    text=text,
-                    usage=usage_event,
-                    finish_reason=finish_reason_event,
-                    reasoning_content=reasoning_event,
-                    structured_output=structured_raw,
-                ):
-                    clean_text = text
-                    usage = usage_event
-                    finish_reason = finish_reason_event
-                    reasoning_content = reasoning_event or ""
-
-                    if self.node_data.reasoning_format != "tagged":
-                        clean_text, _ = LLMNode._split_reasoning(clean_text, self.node_data.reasoning_format)
-
-                    structured_output = (
-                        LLMStructuredOutput(structured_output=structured_raw) if structured_raw else None
-                    )
-
-                    llm_utils.deduct_llm_quota(tenant_id=self.tenant_id, model_instance=model_instance, usage=usage)
-                    completed = True
-
-                case LLMStructuredOutput():
-                    structured_output = event
-
-                case _:
-                    continue
-
-        return clean_text, reasoning_content, usage, finish_reason, structured_output, generation_data
-
     def _run(self) -> Generator:
         node_inputs: dict[str, Any] = {}
         process_data: dict[str, Any] = {}
@@ -262,15 +187,6 @@ class LLMNode(Node[LLMNodeData]):
         # merge inputs
         inputs.update(jinja_inputs)
 
-        # Add all inputs to node_inputs for logging
-        node_inputs.update(inputs)
-
-        # Add tools to inputs if configured
-        if self.tool_call_enabled:
-            node_inputs["tools"] = [
-                {"provider_id": tool.provider_name, "tool_name": tool.tool_name} for tool in self._node_data.tools
-            ]
-
         # fetch files
         files = (
             llm_utils.fetch_files(
@@ -372,6 +288,8 @@ class LLMNode(Node[LLMNodeData]):
             (
                 clean_text,
                 reasoning_content,
+                generation_reasoning_content,
+                generation_clean_content,
                 usage,
                 finish_reason,
                 structured_output,
@@ -396,6 +314,16 @@ class LLMNode(Node[LLMNodeData]):
             "model_provider": model_config.provider,
             "model_name": model_config.model,
         }
+        if self.tool_call_enabled and self._node_data.tools:
+            process_data["tools"] = [
+                {
+                    "type": tool.type.value if hasattr(tool.type, "value") else tool.type,
+                    "provider_name": tool.provider_name,
"tool_name": tool.tool_name, + } + for tool in self._node_data.tools + if tool.enabled + ] # Unified outputs building outputs = { @@ -411,17 +339,25 @@ class LLMNode(Node[LLMNodeData]): generation = { "content": generation_data.text, "reasoning_content": generation_data.reasoning_contents, # [thought1, thought2, ...] - "tool_calls": generation_data.tool_calls, + "tool_calls": [self._serialize_tool_call(item) for item in generation_data.tool_calls], "sequence": generation_data.sequence, } files_to_output = generation_data.files else: # Traditional LLM invocation + generation_reasoning = generation_reasoning_content or reasoning_content + generation_content = generation_clean_content or clean_text + sequence: list[dict[str, Any]] = [] + if generation_reasoning: + sequence = [ + {"type": "reasoning", "index": 0}, + {"type": "content", "start": 0, "end": len(generation_content)}, + ] generation = { - "content": clean_text, - "reasoning_content": [reasoning_content] if reasoning_content else [], + "content": generation_content, + "reasoning_content": [generation_reasoning] if generation_reasoning else [], "tool_calls": [], - "sequence": [], + "sequence": sequence, } files_to_output = self._file_outputs @@ -1460,6 +1396,104 @@ class LLMNode(Node[LLMNodeData]): and all(tool.enabled for tool in self.node_data.tools) ) + def _stream_llm_events( + self, + generator: Generator[NodeEventBase | LLMStructuredOutput, None, LLMGenerationData | None], + *, + model_instance: ModelInstance, + ) -> Generator[ + NodeEventBase, + None, + tuple[ + str, + str, + str, + str, + LLMUsage, + str | None, + LLMStructuredOutput | None, + LLMGenerationData | None, + ], + ]: + """ + Stream events and capture generator return value in one place. + Uses generator delegation so _run stays concise while still emitting events. 
+        """
+        clean_text = ""
+        reasoning_content = ""
+        generation_reasoning_content = ""
+        generation_clean_content = ""
+        usage = LLMUsage.empty_usage()
+        finish_reason: str | None = None
+        structured_output: LLMStructuredOutput | None = None
+        generation_data: LLMGenerationData | None = None
+        completed = False
+
+        while True:
+            try:
+                event = next(generator)
+            except StopIteration as exc:
+                if isinstance(exc.value, LLMGenerationData):
+                    generation_data = exc.value
+                break
+
+            if completed:
+                # After completion we still drain to reach StopIteration.value
+                continue
+
+            match event:
+                case StreamChunkEvent() | ThoughtChunkEvent():
+                    yield event
+
+                case ModelInvokeCompletedEvent(
+                    text=text,
+                    usage=usage_event,
+                    finish_reason=finish_reason_event,
+                    reasoning_content=reasoning_event,
+                    structured_output=structured_raw,
+                ):
+                    clean_text = text
+                    usage = usage_event
+                    finish_reason = finish_reason_event
+                    reasoning_content = reasoning_event or ""
+                    generation_reasoning_content = reasoning_content
+                    generation_clean_content = clean_text
+
+                    if self.node_data.reasoning_format == "tagged":
+                        # Keep tagged text for output; also extract reasoning for generation field
+                        generation_clean_content, generation_reasoning_content = LLMNode._split_reasoning(
+                            clean_text, reasoning_format="separated"
+                        )
+                    else:
+                        clean_text, generation_reasoning_content = LLMNode._split_reasoning(
+                            clean_text, self.node_data.reasoning_format
+                        )
+                        generation_clean_content = clean_text
+
+                    structured_output = (
+                        LLMStructuredOutput(structured_output=structured_raw) if structured_raw else None
+                    )
+
+                    llm_utils.deduct_llm_quota(tenant_id=self.tenant_id, model_instance=model_instance, usage=usage)
+                    completed = True
+
+                case LLMStructuredOutput():
+                    structured_output = event
+
+                case _:
+                    continue
+
+        return (
+            clean_text,
+            reasoning_content,
+            generation_reasoning_content,
+            generation_clean_content,
+            usage,
+            finish_reason,
+            structured_output,
+            generation_data,
+        )
+
     def _invoke_llm_with_tools(
         self,
         model_instance: ModelInstance,
@@ -1594,6 +1628,29 @@ class LLMNode(Node[LLMNodeData]):
 
         return files
 
+    @staticmethod
+    def _serialize_tool_call(tool_call: ToolCallResult) -> dict[str, Any]:
+        """Convert ToolCallResult into JSON-friendly dict."""
+
+        def _file_to_ref(file: File) -> str | None:
+            # Align with streamed tool result events which carry file IDs
+            return file.id or file.related_id
+
+        files = []
+        for file in tool_call.files or []:
+            ref = _file_to_ref(file)
+            if ref:
+                files.append(ref)
+
+        return {
+            "id": tool_call.id,
+            "name": tool_call.name,
+            "arguments": tool_call.arguments,
+            "output": tool_call.output,
+            "files": files,
+            "status": tool_call.status.value if hasattr(tool_call.status, "value") else tool_call.status,
+        }
+
     def _flush_thought_segment(self, buffers: StreamBuffers, trace_state: TraceState) -> None:
         if not buffers.pending_thought:
             return
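
For reviewers, a rough sketch (not part of the patch, not applied by it) of the shapes the new fields take: the process_data "tools" entries written when tool calling is enabled, and the generation dict built for a traditional non-tool invocation. Only the field names come from the hunks above; the concrete values below are invented for illustration.

# Illustrative sketch only; values are made up.

# process_data["tools"]: one entry per enabled tool when tool calling is on.
example_process_data_tools = [
    {
        "type": "builtin",                    # tool.type.value, or raw tool.type if it has no .value
        "provider_name": "example_provider",  # hypothetical provider
        "tool_name": "example_tool",          # hypothetical tool
    }
]

# generation dict for a traditional (non-tool) invocation that produced reasoning.
example_content = "Final answer text."
example_generation = {
    "content": example_content,
    "reasoning_content": ["model reasoning extracted by _split_reasoning"],
    "tool_calls": [],
    "sequence": [
        {"type": "reasoning", "index": 0},
        {"type": "content", "start": 0, "end": len(example_content)},
    ],
}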