refactor(api): reduce Dify GraphInitParams usage (#34825)

This commit is contained in:
-LAN- 2026-04-09 15:59:15 +08:00 committed by GitHub
parent 7d793e12c8
commit d1e33ba9ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 215 additions and 90 deletions

View File

@ -2,7 +2,6 @@ import logging
import time
from typing import cast
from graphon.entities import GraphInitParams
from graphon.enums import WorkflowType
from graphon.graph import Graph
from graphon.graph_events import GraphEngineEvent, GraphRunFailedEvent
@ -22,7 +21,7 @@ from core.app.entities.app_invoke_entities import (
)
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id
from core.workflow.node_factory import DifyGraphInitContext, DifyNodeFactory, get_default_root_node_id
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
from core.workflow.workflow_entry import WorkflowEntry
@ -265,22 +264,23 @@ class PipelineRunner(WorkflowBasedAppRunner):
# graph_config["nodes"] = real_run_nodes
# graph_config["edges"] = real_edges
# init graph
# Create required parameters for Graph.init
graph_init_params = GraphInitParams(
# Create explicit graph init context for Graph.init.
run_context = build_dify_run_context(
tenant_id=workflow.tenant_id,
app_id=self._app_id,
user_id=self.application_generate_entity.user_id,
user_from=user_from,
invoke_from=invoke_from,
)
graph_init_context = DifyGraphInitContext(
workflow_id=workflow.id,
graph_config=graph_config,
run_context=build_dify_run_context(
tenant_id=workflow.tenant_id,
app_id=self._app_id,
user_id=self.application_generate_entity.user_id,
user_from=user_from,
invoke_from=invoke_from,
),
run_context=run_context,
call_depth=0,
)
node_factory = DifyNodeFactory(
graph_init_params=graph_init_params,
node_factory = DifyNodeFactory.from_graph_init_context(
graph_init_context=graph_init_context,
graph_runtime_state=graph_runtime_state,
)
if start_node_id is None:

View File

@ -3,7 +3,6 @@ import time
from collections.abc import Mapping, Sequence
from typing import Any, cast
from graphon.entities import GraphInitParams
from graphon.entities.graph_config import NodeConfigDictAdapter
from graphon.entities.pause_reason import HumanInputRequired
from graphon.graph import Graph
@ -67,7 +66,12 @@ from core.app.entities.queue_entities import (
QueueWorkflowSucceededEvent,
)
from core.rag.entities import RetrievalSourceMetadata
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id, resolve_workflow_node_class
from core.workflow.node_factory import (
DifyGraphInitContext,
DifyNodeFactory,
get_default_root_node_id,
resolve_workflow_node_class,
)
from core.workflow.system_variables import (
build_bootstrap_variables,
default_system_variables,
@ -127,24 +131,25 @@ class WorkflowBasedAppRunner:
if not isinstance(graph_config.get("edges"), list):
raise ValueError("edges in workflow graph must be a list")
# Create required parameters for Graph.init
graph_init_params = GraphInitParams(
# Create explicit graph init context for Graph.init.
run_context = build_dify_run_context(
tenant_id=tenant_id or "",
app_id=self._app_id,
user_id=user_id,
user_from=user_from,
invoke_from=invoke_from,
)
graph_init_context = DifyGraphInitContext(
workflow_id=workflow_id,
graph_config=graph_config,
run_context=build_dify_run_context(
tenant_id=tenant_id or "",
app_id=self._app_id,
user_id=user_id,
user_from=user_from,
invoke_from=invoke_from,
),
run_context=run_context,
call_depth=0,
)
# Use the provided graph_runtime_state for consistent state management
node_factory = DifyNodeFactory(
graph_init_params=graph_init_params,
node_factory = DifyNodeFactory.from_graph_init_context(
graph_init_context=graph_init_context,
graph_runtime_state=graph_runtime_state,
)
@ -289,22 +294,23 @@ class WorkflowBasedAppRunner:
typed_node_configs = [NodeConfigDictAdapter.validate_python(node) for node in node_configs]
# Create required parameters for Graph.init
graph_init_params = GraphInitParams(
# Create explicit graph init context for Graph.init.
run_context = build_dify_run_context(
tenant_id=workflow.tenant_id,
app_id=self._app_id,
user_id=user_id,
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
)
graph_init_context = DifyGraphInitContext(
workflow_id=workflow.id,
graph_config=graph_config,
run_context=build_dify_run_context(
tenant_id=workflow.tenant_id,
app_id=self._app_id,
user_id=user_id,
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
),
run_context=run_context,
call_depth=0,
)
node_factory = DifyNodeFactory(
graph_init_params=graph_init_params,
node_factory = DifyNodeFactory.from_graph_init_context(
graph_init_context=graph_init_context,
graph_runtime_state=graph_runtime_state,
)

View File

@ -1,6 +1,7 @@
import importlib
import pkgutil
from collections.abc import Callable, Iterator, Mapping, MutableMapping
from dataclasses import dataclass
from functools import lru_cache
from typing import TYPE_CHECKING, Any, cast, final, override
@ -67,6 +68,31 @@ _START_NODE_TYPES: frozenset[NodeType] = frozenset(
)
@dataclass(frozen=True, slots=True)
class DifyGraphInitContext:
"""Explicit graph-init values owned by the workflow layer.
Dify is gradually removing direct `GraphInitParams` construction from its
production call sites. Keep the translation here until `graphon` exposes an
equivalent explicit API.
"""
workflow_id: str
graph_config: Mapping[str, Any]
run_context: Mapping[str, Any]
call_depth: int
def to_graph_init_params(self) -> "GraphInitParams":
from graphon.entities import GraphInitParams
return GraphInitParams(
workflow_id=self.workflow_id,
graph_config=self.graph_config,
run_context=self.run_context,
call_depth=self.call_depth,
)
def _import_node_package(package_name: str, *, excluded_modules: frozenset[str] = frozenset()) -> None:
package = importlib.import_module(package_name)
for _, module_name, _ in pkgutil.walk_packages(package.__path__, package.__name__ + "."):
@ -237,6 +263,19 @@ class DifyNodeFactory(NodeFactory):
Default implementation of NodeFactory that resolves node classes from the live registry.
"""
@classmethod
def from_graph_init_context(
cls,
*,
graph_init_context: DifyGraphInitContext,
graph_runtime_state: "GraphRuntimeState",
) -> "DifyNodeFactory":
"""Bridge Dify's explicit init context into the current `graphon` API."""
return cls(
graph_init_params=graph_init_context.to_graph_init_params(),
graph_runtime_state=graph_runtime_state,
)
def __init__(
self,
graph_init_params: "GraphInitParams",

View File

@ -29,7 +29,7 @@ class TriggerWebhookNode(Node[WebhookData]):
def post_init(self) -> None:
from core.workflow.node_runtime import DifyFileReferenceFactory
self._file_reference_factory = DifyFileReferenceFactory(self.graph_init_params.run_context)
self._file_reference_factory = DifyFileReferenceFactory(self.run_context)
@classmethod
def get_default_config(cls, filters: Mapping[str, object] | None = None) -> Mapping[str, object]:

View File

@ -24,7 +24,12 @@ from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom, build_di
from core.app.file_access import DatabaseFileAccessController
from core.app.workflow.layers.llm_quota import LLMQuotaLayer
from core.app.workflow.layers.observability import ObservabilityLayer
from core.workflow.node_factory import DifyNodeFactory, is_start_node_type, resolve_workflow_node_class
from core.workflow.node_factory import (
DifyGraphInitContext,
DifyNodeFactory,
is_start_node_type,
resolve_workflow_node_class,
)
from core.workflow.system_variables import (
default_system_variables,
get_node_creation_preload_selectors,
@ -251,17 +256,18 @@ class WorkflowEntry:
node_version = str(node_config_data.version)
node_cls = resolve_workflow_node_class(node_type=node_type, node_version=node_version)
# init graph init params and runtime state
graph_init_params = GraphInitParams(
# init graph context and runtime state
run_context = build_dify_run_context(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
user_id=user_id,
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
)
graph_init_context = DifyGraphInitContext(
workflow_id=workflow.id,
graph_config=workflow.graph_dict,
run_context=build_dify_run_context(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
user_id=user_id,
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
),
run_context=run_context,
call_depth=0,
)
graph_runtime_state = GraphRuntimeState(
@ -313,8 +319,8 @@ class WorkflowEntry:
)
# init workflow run state
node_factory = DifyNodeFactory(
graph_init_params=graph_init_params,
node_factory = DifyNodeFactory.from_graph_init_context(
graph_init_context=graph_init_context,
graph_runtime_state=graph_runtime_state,
)
node = node_factory.create_node(node_config)
@ -409,17 +415,18 @@ class WorkflowEntry:
variable_pool = VariablePool()
add_variables_to_pool(variable_pool, default_system_variables())
# init graph init params and runtime state
graph_init_params = GraphInitParams(
# init graph context and runtime state
run_context = build_dify_run_context(
tenant_id=tenant_id,
app_id="",
user_id=user_id,
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
)
graph_init_context = DifyGraphInitContext(
workflow_id="",
graph_config=graph_dict,
run_context=build_dify_run_context(
tenant_id=tenant_id,
app_id="",
user_id=user_id,
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
),
run_context=run_context,
call_depth=0,
)
graph_runtime_state = GraphRuntimeState(
@ -430,8 +437,8 @@ class WorkflowEntry:
# init workflow run state
node_config = NodeConfigDictAdapter.validate_python({"id": node_id, "data": node_data})
node_factory = DifyNodeFactory(
graph_init_params=graph_init_params,
node_factory = DifyNodeFactory.from_graph_init_context(
graph_init_context=graph_init_context,
graph_runtime_state=graph_runtime_state,
)
node = node_factory.create_node(node_config)

View File

@ -5,7 +5,7 @@ import uuid
from collections.abc import Callable, Generator, Mapping, Sequence
from typing import Any, cast
from graphon.entities import GraphInitParams, WorkflowNodeExecution
from graphon.entities import WorkflowNodeExecution
from graphon.entities.graph_config import NodeConfigDict
from graphon.entities.pause_reason import HumanInputRequired
from graphon.enums import (
@ -48,7 +48,12 @@ from core.workflow.human_input_compat import (
normalize_human_input_node_data_for_graph,
parse_human_input_delivery_methods,
)
from core.workflow.node_factory import LATEST_VERSION, get_node_type_classes_mapping, is_start_node_type
from core.workflow.node_factory import (
LATEST_VERSION,
DifyGraphInitContext,
get_node_type_classes_mapping,
is_start_node_type,
)
from core.workflow.node_runtime import DifyHumanInputNodeRuntime, apply_dify_debug_email_recipient
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables, default_system_variables
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
@ -1132,18 +1137,20 @@ class WorkflowService:
node_config: NodeConfigDict,
variable_pool: VariablePool,
) -> HumanInputNode:
graph_init_params = GraphInitParams(
run_context = build_dify_run_context(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
user_id=account.id,
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
)
graph_init_context = DifyGraphInitContext(
workflow_id=workflow.id,
graph_config=workflow.graph_dict,
run_context=build_dify_run_context(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
user_id=account.id,
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
),
run_context=run_context,
call_depth=0,
)
graph_init_params = graph_init_context.to_graph_init_params()
graph_runtime_state = GraphRuntimeState(
variable_pool=variable_pool,
start_at=time.perf_counter(),
@ -1153,7 +1160,7 @@ class WorkflowService:
config=node_config,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
runtime=DifyHumanInputNodeRuntime(graph_init_params.run_context),
runtime=DifyHumanInputNodeRuntime(run_context),
)
return node

View File

@ -110,6 +110,34 @@ class TestFetchMemory:
)
class TestDifyGraphInitContext:
def test_to_graph_init_params_preserves_explicit_values(self):
run_context = {
DIFY_RUN_CONTEXT_KEY: DifyRunContext(
tenant_id="tenant-id",
app_id="app-id",
user_id="user-id",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
),
"extra": "value",
}
graph_config = {"nodes": [], "edges": []}
graph_init_context = node_factory.DifyGraphInitContext(
workflow_id="workflow-id",
graph_config=graph_config,
run_context=run_context,
call_depth=2,
)
result = graph_init_context.to_graph_init_params()
assert result.workflow_id == "workflow-id"
assert result.graph_config == graph_config
assert result.run_context == run_context
assert result.call_depth == 2
class TestDefaultWorkflowCodeExecutor:
def test_execute_delegates_to_code_executor(self, monkeypatch):
executor = node_factory.DefaultWorkflowCodeExecutor()
@ -172,6 +200,23 @@ class TestCodeExecutorJinja2TemplateRenderer:
class TestDifyNodeFactoryInit:
def test_from_graph_init_context_translates_before_init(self):
graph_init_context = MagicMock()
graph_init_context.to_graph_init_params.return_value = sentinel.graph_init_params
with patch.object(node_factory.DifyNodeFactory, "__init__", return_value=None) as init:
factory = node_factory.DifyNodeFactory.from_graph_init_context(
graph_init_context=graph_init_context,
graph_runtime_state=sentinel.graph_runtime_state,
)
assert isinstance(factory, node_factory.DifyNodeFactory)
graph_init_context.to_graph_init_params.assert_called_once_with()
init.assert_called_once_with(
graph_init_params=sentinel.graph_init_params,
graph_runtime_state=sentinel.graph_runtime_state,
)
def test_init_builds_default_dependencies(self):
graph_init_params = SimpleNamespace(run_context={"context": "value"})
graph_runtime_state = sentinel.graph_runtime_state

View File

@ -349,7 +349,7 @@ class TestWorkflowEntrySingleStepRun:
]
with (
patch.object(workflow_entry, "GraphInitParams", return_value=sentinel.graph_init_params),
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(
workflow_entry,
"GraphRuntimeState",
@ -358,7 +358,7 @@ class TestWorkflowEntrySingleStepRun:
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeLLMNode),
patch.object(workflow_entry, "DifyNodeFactory") as dify_node_factory,
patch.object(workflow_entry.DifyNodeFactory, "from_graph_init_context") as dify_node_factory,
patch.object(workflow_entry, "load_into_variable_pool"),
patch.object(workflow_entry.WorkflowEntry, "mapping_user_inputs_to_variable_pool"),
patch.object(
@ -412,12 +412,12 @@ class TestWorkflowEntrySingleStepRun:
raise NotImplementedError
with (
patch.object(workflow_entry, "GraphInitParams", return_value=sentinel.graph_init_params),
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeNode),
patch.object(workflow_entry, "DifyNodeFactory") as dify_node_factory,
patch.object(workflow_entry.DifyNodeFactory, "from_graph_init_context") as dify_node_factory,
patch.object(workflow_entry, "add_node_inputs_to_pool") as add_node_inputs_to_pool,
patch.object(workflow_entry, "load_into_variable_pool") as load_into_variable_pool,
patch.object(
@ -481,12 +481,12 @@ class TestWorkflowEntrySingleStepRun:
return {"question": ["node", "question"]}
with (
patch.object(workflow_entry, "GraphInitParams", return_value=sentinel.graph_init_params),
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeDatasourceNode),
patch.object(workflow_entry, "DifyNodeFactory") as dify_node_factory,
patch.object(workflow_entry.DifyNodeFactory, "from_graph_init_context") as dify_node_factory,
patch.object(workflow_entry, "add_node_inputs_to_pool") as add_node_inputs_to_pool,
patch.object(workflow_entry, "load_into_variable_pool") as load_into_variable_pool,
patch.object(
@ -541,12 +541,12 @@ class TestWorkflowEntrySingleStepRun:
return "1"
with (
patch.object(workflow_entry, "GraphInitParams", return_value=sentinel.graph_init_params),
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeNode),
patch.object(workflow_entry, "DifyNodeFactory") as dify_node_factory,
patch.object(workflow_entry.DifyNodeFactory, "from_graph_init_context") as dify_node_factory,
patch.object(workflow_entry, "add_node_inputs_to_pool"),
patch.object(workflow_entry, "load_into_variable_pool"),
patch.object(workflow_entry.WorkflowEntry, "mapping_user_inputs_to_variable_pool"),
@ -651,14 +651,18 @@ class TestWorkflowEntryHelpers:
patch.object(workflow_entry, "VariablePool", return_value=sentinel.variable_pool) as variable_pool_cls,
patch.object(workflow_entry, "add_variables_to_pool") as add_variables_to_pool,
patch.object(
workflow_entry, "GraphInitParams", return_value=sentinel.graph_init_params
) as graph_init_params,
workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context
) as graph_init_context_cls,
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(
workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}
) as build_dify_run_context,
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "DifyNodeFactory", return_value=dify_node_factory) as dify_node_factory_cls,
patch.object(
workflow_entry.DifyNodeFactory,
"from_graph_init_context",
return_value=dify_node_factory,
) as dify_node_factory_cls,
patch.object(
workflow_entry.WorkflowEntry,
"mapping_user_inputs_to_variable_pool",
@ -688,7 +692,7 @@ class TestWorkflowEntryHelpers:
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
)
graph_init_params.assert_called_once_with(
graph_init_context_cls.assert_called_once_with(
workflow_id="",
graph_config=workflow_entry.WorkflowEntry._create_single_node_graph(
"node-id", {"type": BuiltinNodeTypes.PARAMETER_EXTRACTOR, "title": "Node"}
@ -697,7 +701,7 @@ class TestWorkflowEntryHelpers:
call_depth=0,
)
dify_node_factory_cls.assert_called_once_with(
graph_init_params=sentinel.graph_init_params,
graph_init_context=sentinel.graph_init_context,
graph_runtime_state=sentinel.graph_runtime_state,
)
mapping_user_inputs_to_variable_pool.assert_called_once_with(
@ -734,11 +738,15 @@ class TestWorkflowEntryHelpers:
patch.object(workflow_entry, "default_system_variables", return_value=sentinel.system_variables),
patch.object(workflow_entry, "VariablePool", return_value=sentinel.variable_pool),
patch.object(workflow_entry, "add_variables_to_pool"),
patch.object(workflow_entry, "GraphInitParams", return_value=sentinel.graph_init_params),
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "DifyNodeFactory", return_value=dify_node_factory),
patch.object(
workflow_entry.DifyNodeFactory,
"from_graph_init_context",
return_value=dify_node_factory,
),
patch.object(
workflow_entry.WorkflowEntry,
"mapping_user_inputs_to_variable_pool",

View File

@ -2753,9 +2753,9 @@ class TestWorkflowServiceFreeNodeExecution:
variable_pool = MagicMock()
with (
patch("services.workflow_service.GraphInitParams") as mock_graph_init_params,
patch("services.workflow_service.DifyGraphInitContext") as mock_graph_init_context_cls,
patch("services.workflow_service.GraphRuntimeState"),
patch("services.workflow_service.build_dify_run_context"),
patch("services.workflow_service.build_dify_run_context") as mock_build_dify_run_context,
patch("services.workflow_service.DifyHumanInputNodeRuntime") as mock_runtime_cls,
patch("services.workflow_service.HumanInputNode") as mock_node_cls,
):
@ -2764,4 +2764,17 @@ class TestWorkflowServiceFreeNodeExecution:
)
assert node == mock_node_cls.return_value
mock_node_cls.assert_called_once()
mock_runtime_cls.assert_called_once_with(mock_graph_init_params.return_value.run_context)
mock_graph_init_context_cls.assert_called_once_with(
workflow_id="wf-1",
graph_config=workflow.graph_dict,
run_context=mock_build_dify_run_context.return_value,
call_depth=0,
)
mock_runtime_cls.assert_called_once_with(mock_build_dify_run_context.return_value)
mock_node_cls.assert_called_once_with(
id="n-1",
config=node_config,
graph_init_params=mock_graph_init_context_cls.return_value.to_graph_init_params.return_value,
graph_runtime_state=ANY,
runtime=mock_runtime_cls.return_value,
)