From 79433b0091aaf3a7bcbd6347e071ffb99a2d05c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yanli=20=E7=9B=90=E7=B2=92?= Date: Tue, 17 Mar 2026 19:13:12 +0800 Subject: [PATCH] Refine phase 3 typing boundaries --- api/dify_graph/nodes/base/node.py | 6 ++---- api/dify_graph/nodes/llm/node.py | 12 ++++++++++-- .../parameter_extractor/parameter_extractor_node.py | 7 +++++-- api/dify_graph/runtime/graph_runtime_state.py | 11 +++++++++++ 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/api/dify_graph/nodes/base/node.py b/api/dify_graph/nodes/base/node.py index 435c94d83b..7602aed15d 100644 --- a/api/dify_graph/nodes/base/node.py +++ b/api/dify_graph/nodes/base/node.py @@ -336,13 +336,11 @@ class Node(Generic[NodeDataT]): def _restore_execution_id_from_runtime_state(self) -> str | None: graph_execution = self.graph_runtime_state.graph_execution - node_executions = getattr(graph_execution, "node_executions", None) - if not isinstance(node_executions, dict): - return None + node_executions = graph_execution.node_executions node_execution = node_executions.get(self._node_id) if node_execution is None: return None - execution_id = getattr(node_execution, "execution_id", None) + execution_id = node_execution.execution_id if not execution_id: return None return str(execution_id) diff --git a/api/dify_graph/nodes/llm/node.py b/api/dify_graph/nodes/llm/node.py index cb002e2f6d..b476fc7a46 100644 --- a/api/dify_graph/nodes/llm/node.py +++ b/api/dify_graph/nodes/llm/node.py @@ -9,6 +9,7 @@ import time from collections.abc import Generator, Mapping, Sequence from typing import TYPE_CHECKING, Any, Literal +from pydantic import TypeAdapter from sqlalchemy import select from core.llm_generator.output_parser.errors import OutputParserError @@ -89,6 +90,7 @@ if TYPE_CHECKING: from dify_graph.runtime import GraphRuntimeState logger = logging.getLogger(__name__) +_JSON_OBJECT_ADAPTER = TypeAdapter(dict[str, object]) class LLMNode(Node[LLMNodeData]): @@ -923,6 +925,12 @@ class LLMNode(Node[LLMNodeData]): # Extract clean text and reasoning from tags clean_text, reasoning_content = LLMNode._split_reasoning(full_text, reasoning_format) + structured_output = ( + dict(invoke_result.structured_output) + if isinstance(invoke_result, LLMResultWithStructuredOutput) and invoke_result.structured_output is not None + else None + ) + event = ModelInvokeCompletedEvent( # Use clean_text for separated mode, full_text for tagged mode text=clean_text if reasoning_format == "separated" else full_text, @@ -931,7 +939,7 @@ class LLMNode(Node[LLMNodeData]): # Reasoning content for workflow variables and downstream nodes reasoning_content=reasoning_content, # Pass structured output if enabled - structured_output=getattr(invoke_result, "structured_output", None), + structured_output=structured_output, ) if request_latency is not None: event.usage.latency = round(request_latency, 3) @@ -976,7 +984,7 @@ class LLMNode(Node[LLMNodeData]): schema = structured_output.get("schema") if not schema: raise LLMNodeError("Please provide a valid structured output schema") - return dict(schema) + return _JSON_OBJECT_ADAPTER.validate_python(schema) @staticmethod def _save_multimodal_output_and_convert_result_to_markdown( diff --git a/api/dify_graph/nodes/parameter_extractor/parameter_extractor_node.py b/api/dify_graph/nodes/parameter_extractor/parameter_extractor_node.py index f9b3e9f545..106c2506e0 100644 --- a/api/dify_graph/nodes/parameter_extractor/parameter_extractor_node.py +++ b/api/dify_graph/nodes/parameter_extractor/parameter_extractor_node.py @@ -5,6 +5,8 @@ import uuid from collections.abc import Mapping, Sequence from typing import TYPE_CHECKING, Any, cast +from pydantic import TypeAdapter + from core.model_manager import ModelInstance from core.prompt.advanced_prompt_transform import AdvancedPromptTransform from core.prompt.entities.advanced_prompt_entities import ChatModelMessage, CompletionModelPromptTemplate @@ -63,6 +65,7 @@ from .prompts import ( ) logger = logging.getLogger(__name__) +_JSON_OBJECT_ADAPTER = TypeAdapter(dict[str, object]) if TYPE_CHECKING: from dify_graph.entities import GraphInitParams @@ -679,7 +682,7 @@ class ParameterExtractorNode(Node[ParameterExtractorNodeData]): json_str = extract_json(result[idx:]) if json_str: with contextlib.suppress(Exception): - return cast(dict[str, object], json.loads(json_str)) + return _JSON_OBJECT_ADAPTER.validate_python(json.loads(json_str)) logger.info("extra error: %s", result) return None @@ -697,7 +700,7 @@ class ParameterExtractorNode(Node[ParameterExtractorNodeData]): json_str = extract_json(result[idx:]) if json_str: with contextlib.suppress(Exception): - return cast(dict[str, object], json.loads(json_str)) + return _JSON_OBJECT_ADAPTER.validate_python(json.loads(json_str)) logger.info("extra error: %s", result) return None diff --git a/api/dify_graph/runtime/graph_runtime_state.py b/api/dify_graph/runtime/graph_runtime_state.py index 41acc6db35..d614c1fdd7 100644 --- a/api/dify_graph/runtime/graph_runtime_state.py +++ b/api/dify_graph/runtime/graph_runtime_state.py @@ -66,6 +66,11 @@ class GraphExecutionProtocol(Protocol): exceptions_count: int pause_reasons: list[PauseReason] + @property + def node_executions(self) -> Mapping[str, NodeExecutionProtocol]: + """Return node execution state keyed by node id for resume support.""" + ... + def start(self) -> None: """Transition execution into the running state.""" ... @@ -91,6 +96,12 @@ class GraphExecutionProtocol(Protocol): ... +class NodeExecutionProtocol(Protocol): + """Structural interface for per-node execution state used during resume.""" + + execution_id: str | None + + class ResponseStreamCoordinatorProtocol(Protocol): """Structural interface for response stream coordinator."""