mirror of
https://github.com/langgenius/dify.git
synced 2026-05-10 05:56:31 +08:00
feat(api): integrate Sandbox Provider into Agent V2 execution pipeline
Close 3 integration gaps between the ported Sandbox system and Agent V2: 1. Fix _invoke_tool_in_sandbox to use SandboxBashSession context manager API correctly (keyword args, bash_tool, ToolReference), with graceful fallback to direct invocation when DifyCli binary is unavailable. 2. Inject sandbox into run_context via _resolve_sandbox_context() in WorkflowBasedAppRunner — automatically creates a sandbox when a tenant has an active sandbox provider configured. 3. Register SandboxLayer in both advanced_chat and workflow app runners for proper sandbox lifecycle cleanup on graph end. Also: make SkillInitializer non-fatal when no skill bundle exists, add node_id to ExecutionContext for sandbox session scoping. Made-with: Cursor
This commit is contained in:
parent
e50c36526e
commit
5cdae671d5
@ -108,6 +108,7 @@ class ExecutionContext(BaseModel):
|
||||
conversation_id: str | None = None
|
||||
message_id: str | None = None
|
||||
tenant_id: str | None = None
|
||||
node_id: str | None = None
|
||||
|
||||
@classmethod
|
||||
def create_minimal(cls, user_id: str | None = None) -> "ExecutionContext":
|
||||
|
||||
@ -246,6 +246,10 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||
for layer in self._graph_engine_layers:
|
||||
workflow_entry.graph_engine.layer(layer)
|
||||
|
||||
if hasattr(self, '_sandbox') and self._sandbox is not None:
|
||||
from core.app.layers.sandbox_layer import SandboxLayer
|
||||
workflow_entry.graph_engine.layer(SandboxLayer(self._sandbox))
|
||||
|
||||
generator = workflow_entry.run()
|
||||
|
||||
for event in generator:
|
||||
|
||||
@ -170,6 +170,10 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
|
||||
for layer in self._graph_engine_layers:
|
||||
workflow_entry.graph_engine.layer(layer)
|
||||
|
||||
if hasattr(self, '_sandbox') and self._sandbox is not None:
|
||||
from core.app.layers.sandbox_layer import SandboxLayer
|
||||
workflow_entry.graph_engine.layer(SandboxLayer(self._sandbox))
|
||||
|
||||
generator = workflow_entry.run()
|
||||
|
||||
for event in generator:
|
||||
|
||||
@ -104,6 +104,34 @@ class WorkflowBasedAppRunner:
|
||||
return UserFrom.ACCOUNT
|
||||
return UserFrom.END_USER
|
||||
|
||||
@staticmethod
|
||||
def _resolve_sandbox_context(tenant_id: str, user_id: str, app_id: str) -> dict[str, Any] | None:
|
||||
"""Create a sandbox and inject it into run_context if a provider is configured."""
|
||||
try:
|
||||
from core.app.entities.app_invoke_entities import DIFY_SANDBOX_CONTEXT_KEY
|
||||
from core.sandbox.builder import SandboxBuilder
|
||||
from core.sandbox.entities.sandbox_type import SandboxType
|
||||
from core.sandbox.storage.noop_storage import NoopSandboxStorage
|
||||
from services.sandbox.sandbox_provider_service import SandboxProviderService
|
||||
|
||||
provider = SandboxProviderService.get_sandbox_provider(tenant_id)
|
||||
sandbox = (
|
||||
SandboxBuilder(tenant_id, SandboxType(provider.provider_type))
|
||||
.user(user_id)
|
||||
.app(app_id)
|
||||
.options(provider.config or {})
|
||||
.storage(NoopSandboxStorage(), assets_id=app_id)
|
||||
.build()
|
||||
)
|
||||
logger.info("[SANDBOX] Created sandbox for tenant=%s, provider=%s", tenant_id, provider.provider_type)
|
||||
return {DIFY_SANDBOX_CONTEXT_KEY: sandbox}
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def _build_sandbox_layer(self) -> GraphEngineLayer | None:
|
||||
"""Build a SandboxLayer if sandbox exists in _graph_engine_layers context."""
|
||||
return None
|
||||
|
||||
def _init_graph(
|
||||
self,
|
||||
graph_config: Mapping[str, Any],
|
||||
@ -127,7 +155,13 @@ class WorkflowBasedAppRunner:
|
||||
if not isinstance(graph_config.get("edges"), list):
|
||||
raise ValueError("edges in workflow graph must be a list")
|
||||
|
||||
# Create required parameters for Graph.init
|
||||
extra_context = self._resolve_sandbox_context(tenant_id or "", user_id, self._app_id)
|
||||
if extra_context:
|
||||
from core.app.entities.app_invoke_entities import DIFY_SANDBOX_CONTEXT_KEY
|
||||
self._sandbox = extra_context.get(DIFY_SANDBOX_CONTEXT_KEY)
|
||||
else:
|
||||
self._sandbox = None
|
||||
|
||||
graph_init_params = GraphInitParams(
|
||||
workflow_id=workflow_id,
|
||||
graph_config=graph_config,
|
||||
@ -137,12 +171,11 @@ class WorkflowBasedAppRunner:
|
||||
user_id=user_id,
|
||||
user_from=user_from,
|
||||
invoke_from=invoke_from,
|
||||
extra_context=extra_context,
|
||||
),
|
||||
call_depth=0,
|
||||
)
|
||||
|
||||
# Use the provided graph_runtime_state for consistent state management
|
||||
|
||||
node_factory = DifyNodeFactory(
|
||||
graph_init_params=graph_init_params,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
|
||||
@ -21,14 +21,17 @@ class SkillInitializer(SyncSandboxInitializer):
|
||||
"""
|
||||
|
||||
def initialize(self, sandbox: Sandbox, ctx: SandboxInitializeContext) -> None:
|
||||
# Draft path: bundle already populated by DraftAppAssetsInitializer.
|
||||
if sandbox.attrs.has(SkillAttrs.BUNDLE):
|
||||
return
|
||||
|
||||
# Published path: load from Redis/S3.
|
||||
bundle = SkillManager.load_bundle(
|
||||
ctx.tenant_id,
|
||||
ctx.app_id,
|
||||
ctx.assets_id,
|
||||
)
|
||||
sandbox.attrs.set(SkillAttrs.BUNDLE, bundle)
|
||||
try:
|
||||
bundle = SkillManager.load_bundle(
|
||||
ctx.tenant_id,
|
||||
ctx.app_id,
|
||||
ctx.assets_id,
|
||||
)
|
||||
sandbox.attrs.set(SkillAttrs.BUNDLE, bundle)
|
||||
except FileNotFoundError:
|
||||
logger.debug("No skill bundle found for app %s, skipping skill initialization", ctx.app_id)
|
||||
except Exception:
|
||||
logger.warning("Failed to load skill bundle for app %s, skipping", ctx.app_id, exc_info=True)
|
||||
|
||||
@ -231,6 +231,7 @@ class AgentV2Node(Node[AgentV2NodeData]):
|
||||
user_id=dify_ctx.user_id,
|
||||
app_id=dify_ctx.app_id,
|
||||
tenant_id=dify_ctx.tenant_id,
|
||||
node_id=self.id,
|
||||
conversation_id=get_system_text(
|
||||
self.graph_runtime_state.variable_pool,
|
||||
SystemVariableKey.CONVERSATION_ID,
|
||||
|
||||
@ -101,14 +101,14 @@ class AgentV2ToolManager:
|
||||
tool_name: str,
|
||||
) -> tuple[str, list[str], ToolInvokeMeta]:
|
||||
if sandbox is not None:
|
||||
return self._invoke_tool_in_sandbox(sandbox, tool, tool_args, tool_name, context)
|
||||
return AgentV2ToolManager._invoke_tool_in_sandbox(sandbox, tool, tool_args, tool_name, context)
|
||||
|
||||
return self._invoke_tool_directly(tool, tool_args, tool_name, context, workflow_call_depth)
|
||||
return AgentV2ToolManager._invoke_tool_directly(tool, tool_args, tool_name, context, workflow_call_depth)
|
||||
|
||||
return hook
|
||||
|
||||
@staticmethod
|
||||
def _invoke_tool_directly(
|
||||
self,
|
||||
tool: Tool,
|
||||
tool_args: dict[str, Any],
|
||||
tool_name: str,
|
||||
@ -150,21 +150,58 @@ class AgentV2ToolManager:
|
||||
) -> tuple[str, list[str], ToolInvokeMeta]:
|
||||
"""Invoke tool inside a sandbox environment.
|
||||
|
||||
Uses the sandbox's bash session to execute the tool via DifyCli,
|
||||
which calls back to Dify's CLI API to perform the actual invocation.
|
||||
Uses SandboxBashSession to run a `dify invoke-tool` command inside
|
||||
the sandbox VM, which calls back to Dify's CLI API for actual execution.
|
||||
Falls back to direct invocation on any failure.
|
||||
"""
|
||||
try:
|
||||
from core.sandbox.bash.dify_cli import DifyCliLocator
|
||||
from core.sandbox.bash.session import SandboxBashSession
|
||||
from core.skill.entities.skill_metadata import ToolReference
|
||||
from core.skill.entities.tool_dependencies import ToolDependencies
|
||||
from core.tools.entities.tool_entities import ToolProviderType
|
||||
|
||||
session = SandboxBashSession(sandbox)
|
||||
result = session.run_tool(
|
||||
if not sandbox.is_ready():
|
||||
sandbox.wait_ready(timeout=30)
|
||||
|
||||
cli_locator = DifyCliLocator()
|
||||
cli_locator.resolve(sandbox.vm.metadata.os, sandbox.vm.metadata.arch)
|
||||
|
||||
provider_type = tool.tool_provider_type() if hasattr(tool, 'tool_provider_type') else ToolProviderType.BUILT_IN
|
||||
tool_identity = getattr(tool, 'identity', None)
|
||||
provider_name = tool_identity.provider if tool_identity else tool_name
|
||||
|
||||
tool_ref = ToolReference(
|
||||
type=provider_type,
|
||||
provider=provider_name,
|
||||
tool_name=tool_name,
|
||||
tool_args=tool_args,
|
||||
tenant_id=context.tenant_id or "",
|
||||
app_id=context.app_id or "",
|
||||
user_id=context.user_id or "",
|
||||
)
|
||||
return result.stdout.decode("utf-8", errors="replace"), [], ToolInvokeMeta.empty()
|
||||
tool_deps = ToolDependencies(references=[tool_ref])
|
||||
|
||||
with SandboxBashSession(
|
||||
sandbox=sandbox,
|
||||
node_id=context.node_id or "agent",
|
||||
tools=tool_deps,
|
||||
) as session:
|
||||
args_json = json.dumps(tool_args, ensure_ascii=False)
|
||||
cmd = f"dify invoke-tool {tool_name} '{args_json}'"
|
||||
result = list(session.bash_tool.invoke(
|
||||
user_id=context.user_id or "",
|
||||
tool_parameters={"bash": cmd},
|
||||
))
|
||||
response_text = ""
|
||||
for msg in result:
|
||||
if msg.type == ToolInvokeMessage.MessageType.TEXT:
|
||||
assert isinstance(msg.message, ToolInvokeMessage.TextMessage)
|
||||
response_text += msg.message.text
|
||||
return response_text, [], ToolInvokeMeta.empty()
|
||||
except FileNotFoundError:
|
||||
logger.info("DifyCli binary not found, falling back to direct tool invocation for %s", tool_name)
|
||||
return AgentV2ToolManager._invoke_tool_directly(
|
||||
tool, tool_args, tool_name, context, 0
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Sandbox tool invocation failed for %s, falling back to direct: %s", tool_name, e)
|
||||
return f"Sandbox execution failed: {e}", [], ToolInvokeMeta.empty()
|
||||
return AgentV2ToolManager._invoke_tool_directly(
|
||||
tool, tool_args, tool_name, context, 0
|
||||
)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user