Refine phase 3 typing boundaries

This commit is contained in:
Yanli 盐粒 2026-03-17 19:13:12 +08:00
parent c4aeaa35d4
commit 79433b0091
4 changed files with 28 additions and 8 deletions

View File

@@ -336,13 +336,11 @@ class Node(Generic[NodeDataT]):
def _restore_execution_id_from_runtime_state(self) -> str | None:
graph_execution = self.graph_runtime_state.graph_execution
node_executions = getattr(graph_execution, "node_executions", None)
if not isinstance(node_executions, dict):
return None
node_executions = graph_execution.node_executions
node_execution = node_executions.get(self._node_id)
if node_execution is None:
return None
execution_id = getattr(node_execution, "execution_id", None)
execution_id = node_execution.execution_id
if not execution_id:
return None
return str(execution_id)

View File

@@ -9,6 +9,7 @@ import time
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any, Literal
from pydantic import TypeAdapter
from sqlalchemy import select
from core.llm_generator.output_parser.errors import OutputParserError
@@ -89,6 +90,7 @@ if TYPE_CHECKING:
from dify_graph.runtime import GraphRuntimeState
logger = logging.getLogger(__name__)
_JSON_OBJECT_ADAPTER = TypeAdapter(dict[str, object])
class LLMNode(Node[LLMNodeData]):
@@ -923,6 +925,12 @@ class LLMNode(Node[LLMNodeData]):
# Extract clean text and reasoning from <think> tags
clean_text, reasoning_content = LLMNode._split_reasoning(full_text, reasoning_format)
structured_output = (
dict(invoke_result.structured_output)
if isinstance(invoke_result, LLMResultWithStructuredOutput) and invoke_result.structured_output is not None
else None
)
event = ModelInvokeCompletedEvent(
# Use clean_text for separated mode, full_text for tagged mode
text=clean_text if reasoning_format == "separated" else full_text,
@@ -931,7 +939,7 @@ class LLMNode(Node[LLMNodeData]):
# Reasoning content for workflow variables and downstream nodes
reasoning_content=reasoning_content,
# Pass structured output if enabled
structured_output=getattr(invoke_result, "structured_output", None),
structured_output=structured_output,
)
if request_latency is not None:
event.usage.latency = round(request_latency, 3)
@@ -976,7 +984,7 @@ class LLMNode(Node[LLMNodeData]):
schema = structured_output.get("schema")
if not schema:
raise LLMNodeError("Please provide a valid structured output schema")
return dict(schema)
return _JSON_OBJECT_ADAPTER.validate_python(schema)
@staticmethod
def _save_multimodal_output_and_convert_result_to_markdown(

View File

@@ -5,6 +5,8 @@ import uuid
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any, cast
from pydantic import TypeAdapter
from core.model_manager import ModelInstance
from core.prompt.advanced_prompt_transform import AdvancedPromptTransform
from core.prompt.entities.advanced_prompt_entities import ChatModelMessage, CompletionModelPromptTemplate
@@ -63,6 +65,7 @@ from .prompts import (
)
logger = logging.getLogger(__name__)
_JSON_OBJECT_ADAPTER = TypeAdapter(dict[str, object])
if TYPE_CHECKING:
from dify_graph.entities import GraphInitParams
@@ -679,7 +682,7 @@ class ParameterExtractorNode(Node[ParameterExtractorNodeData]):
json_str = extract_json(result[idx:])
if json_str:
with contextlib.suppress(Exception):
return cast(dict[str, object], json.loads(json_str))
return _JSON_OBJECT_ADAPTER.validate_python(json.loads(json_str))
logger.info("extra error: %s", result)
return None
@@ -697,7 +700,7 @@ class ParameterExtractorNode(Node[ParameterExtractorNodeData]):
json_str = extract_json(result[idx:])
if json_str:
with contextlib.suppress(Exception):
return cast(dict[str, object], json.loads(json_str))
return _JSON_OBJECT_ADAPTER.validate_python(json.loads(json_str))
logger.info("extra error: %s", result)
return None

View File

@@ -66,6 +66,11 @@ class GraphExecutionProtocol(Protocol):
exceptions_count: int
pause_reasons: list[PauseReason]
@property
def node_executions(self) -> Mapping[str, NodeExecutionProtocol]:
"""Return node execution state keyed by node id for resume support."""
...
def start(self) -> None:
"""Transition execution into the running state."""
...
@@ -91,6 +96,12 @@ class GraphExecutionProtocol(Protocol):
...
class NodeExecutionProtocol(Protocol):
"""Structural interface for per-node execution state used during resume."""
execution_id: str | None
class ResponseStreamCoordinatorProtocol(Protocol):
"""Structural interface for response stream coordinator."""