dify/api/core/tools/tool_file_manager.py
Luyu Zhang acd6942d21 feat(storage): redirect signed file previews to S3 public base URL
Add an optional S3_PUBLIC_BASE_URL setting that, when configured, lets
file controllers 302-redirect signed previews to the object store / CDN
instead of streaming bytes through the Dify API. Works with any
S3-compatible backend exposing a public domain (Cloudflare R2 custom
domain, MinIO public endpoint, Aliyun OSS public domain, etc.) so that
egress and request handling for images, attachments, tool outputs, and
webapp logos no longer go through the API container.

Signature verification is preserved: the API still validates the HMAC
before issuing the redirect. When S3_PUBLIC_BASE_URL is unset the
behavior is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 17:12:00 -07:00

255 lines
8.6 KiB
Python

import base64
import hashlib
import hmac
import logging
import os
import time
from collections.abc import Generator
from mimetypes import guess_extension, guess_type
from uuid import uuid4
import httpx
from sqlalchemy import select
from configs import dify_config
from core.db.session_factory import session_factory
from core.helper import ssrf_proxy
from core.workflow.file_reference import build_file_reference
from extensions.ext_storage import storage
from graphon.file import File, FileTransferMethod, get_file_type_by_mime_type
from models.model import MessageFile
from models.tools import ToolFile
logger = logging.getLogger(__name__)
class ToolFileManager:
    """Create, sign, verify, and load files produced by tools.

    File bytes are written through ``storage`` under ``tools/{tenant_id}/`` and
    every stored object is tracked by a ``ToolFile`` row.
    """

    @staticmethod
    def _build_graph_file_reference(tool_file: ToolFile) -> File:
        """Build the graph ``File`` that describes a stored ``ToolFile`` row.

        The extension is guessed from the row's MIME type and falls back to
        ``.bin`` when no extension is known for that type.
        """
        extension = guess_extension(tool_file.mimetype) or ".bin"
        return File(
            file_type=get_file_type_by_mime_type(tool_file.mimetype),
            transfer_method=FileTransferMethod.TOOL_FILE,
            remote_url=tool_file.original_url,
            reference=build_file_reference(record_id=str(tool_file.id)),
            filename=tool_file.name,
            extension=extension,
            mime_type=tool_file.mimetype,
            size=tool_file.size,
            storage_key=tool_file.file_key,
        )

    @staticmethod
    def sign_file(tool_file_id: str, extension: str) -> str:
        """
        sign file to get a temporary url for plugin access

        :param tool_file_id: the id of the tool file
        :param extension: the extension appended to the id in the URL path
        :return: the preview URL carrying ``timestamp``, ``nonce`` and ``sign``
            query parameters
        """
        # Use internal URL for plugin/tool file access in Docker environments
        base_url = dify_config.INTERNAL_FILES_URL or dify_config.FILES_URL
        file_preview_url = f"{base_url}/files/tools/{tool_file_id}{extension}"

        timestamp = str(int(time.time()))
        nonce = os.urandom(16).hex()
        data_to_sign = f"file-preview|{tool_file_id}|{timestamp}|{nonce}"
        secret_key = dify_config.SECRET_KEY.encode() if dify_config.SECRET_KEY else b""
        sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest()
        encoded_sign = base64.urlsafe_b64encode(sign).decode()

        return f"{file_preview_url}?timestamp={timestamp}&nonce={nonce}&sign={encoded_sign}"

    @staticmethod
    def verify_file(file_id: str, timestamp: str, nonce: str, sign: str) -> bool:
        """
        verify signature

        :param file_id: the id of the tool file the URL was signed for
        :param timestamp: the ``timestamp`` query parameter of the signed URL
        :param nonce: the ``nonce`` query parameter of the signed URL
        :param sign: the ``sign`` query parameter of the signed URL
        :return: True when the signature matches and the timestamp is no older
            than ``dify_config.FILES_ACCESS_TIMEOUT``; False otherwise
        """
        data_to_sign = f"file-preview|{file_id}|{timestamp}|{nonce}"
        secret_key = dify_config.SECRET_KEY.encode() if dify_config.SECRET_KEY else b""
        recalculated_sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest()
        recalculated_encoded_sign = base64.urlsafe_b64encode(recalculated_sign).decode()

        # hmac.compare_digest only accepts ASCII str operands; anything else
        # cannot equal the base64 signature, so it is rejected up front.
        if not (isinstance(sign, str) and sign.isascii()):
            return False

        # Constant-time comparison so response timing does not reveal how much
        # of a forged signature matched.
        if not hmac.compare_digest(sign, recalculated_encoded_sign):
            return False

        # The timestamp is covered by the signature checked above.
        current_time = int(time.time())
        return current_time - int(timestamp) <= dify_config.FILES_ACCESS_TIMEOUT

    def create_file_by_raw(
        self,
        *,
        user_id: str,
        tenant_id: str,
        conversation_id: str | None,
        file_binary: bytes,
        mimetype: str,
        filename: str | None = None,
    ) -> ToolFile:
        """Store raw bytes as a tool file and persist its ``ToolFile`` row.

        :param user_id: value stored in the row's ``user_id``
        :param tenant_id: value stored in the row's ``tenant_id``; also part of
            the storage key
        :param conversation_id: value stored in the row's ``conversation_id``
        :param file_binary: the content to store
        :param mimetype: MIME type of the content; used to guess the extension
        :param filename: optional display name; the guessed extension is
            appended when the name contains no ``.``
        :return: the committed and refreshed ``ToolFile``
        """
        extension = guess_extension(mimetype) or ".bin"
        unique_name = uuid4().hex
        unique_filename = f"{unique_name}{extension}"

        # Without a caller-supplied name the generated one is also shown.
        present_filename = unique_filename
        if filename is not None:
            has_extension = "." in filename
            # Add extension flexibly
            present_filename = filename if has_extension else f"{filename}{extension}"

        # The storage key always uses the generated name, never the display name.
        filepath = f"tools/{tenant_id}/{unique_filename}"
        storage.save(filepath, file_binary)

        with session_factory.create_session() as session:
            tool_file = ToolFile(
                user_id=user_id,
                tenant_id=tenant_id,
                conversation_id=conversation_id,
                file_key=filepath,
                mimetype=mimetype,
                name=present_filename,
                size=len(file_binary),
                original_url=None,
            )
            session.add(tool_file)
            session.commit()
            session.refresh(tool_file)
            return tool_file

    def create_file_by_url(
        self,
        user_id: str,
        tenant_id: str,
        file_url: str,
        conversation_id: str | None = None,
    ) -> ToolFile:
        """Download ``file_url`` and store the response body as a tool file.

        The MIME type is taken from the URL when it can be guessed, otherwise
        from the response ``Content-Type`` header, otherwise
        ``application/octet-stream``.

        :param user_id: value stored in the row's ``user_id``
        :param tenant_id: value stored in the row's ``tenant_id``; also part of
            the storage key
        :param file_url: the URL to download; kept as the row's ``original_url``
        :param conversation_id: value stored in the row's ``conversation_id``
        :return: the committed and refreshed ``ToolFile``
        :raises ValueError: when the download times out
        """
        # try to download file
        try:
            response = ssrf_proxy.get(file_url)
            response.raise_for_status()
            blob = response.content
        except httpx.TimeoutException as exc:
            # Chain the cause so the underlying timeout stays in the traceback.
            raise ValueError(f"timeout when downloading file from {file_url}") from exc

        mimetype = (
            guess_type(file_url)[0]
            or response.headers.get("Content-Type", "").split(";")[0].strip()
            or "application/octet-stream"
        )
        extension = guess_extension(mimetype) or ".bin"
        unique_name = uuid4().hex
        filename = f"{unique_name}{extension}"

        filepath = f"tools/{tenant_id}/{filename}"
        storage.save(filepath, blob)

        with session_factory.create_session() as session:
            tool_file = ToolFile(
                user_id=user_id,
                tenant_id=tenant_id,
                conversation_id=conversation_id,
                file_key=filepath,
                mimetype=mimetype,
                original_url=file_url,
                name=filename,
                size=len(blob),
            )
            session.add(tool_file)
            session.commit()
            session.refresh(tool_file)
            return tool_file

    def get_file_binary(self, id: str) -> tuple[bytes, str] | None:
        """
        get file binary

        :param id: the id of the file
        :return: the binary of the file, mime type; None when no tool file has
            this id
        """
        with session_factory.create_session() as session:
            tool_file: ToolFile | None = session.scalar(
                select(ToolFile).where(ToolFile.id == id).limit(1)
            )
            if not tool_file:
                return None

            blob = storage.load_once(tool_file.file_key)
            return blob, tool_file.mimetype

    def get_file_binary_by_message_file_id(self, id: str) -> tuple[bytes, str] | None:
        """
        get file binary of the tool file referenced by a message file

        :param id: the id of the message file
        :return: the binary of the file, mime type; None when the message file,
            its url, or the referenced tool file is missing
        """
        with session_factory.create_session() as session:
            message_file: MessageFile | None = session.scalar(
                select(MessageFile).where(MessageFile.id == id).limit(1)
            )
            # Without a message file or a url there is no tool file id to look
            # up, so skip the second query entirely.
            if message_file is None or message_file.url is None:
                return None

            # The tool file id is the last path segment of the url, up to the
            # first dot (i.e. with the extension trimmed).
            tool_file_id = message_file.url.split("/")[-1].split(".")[0]

            tool_file: ToolFile | None = session.scalar(
                select(ToolFile).where(ToolFile.id == tool_file_id).limit(1)
            )
            if not tool_file:
                return None

            blob = storage.load_once(tool_file.file_key)
            return blob, tool_file.mimetype

    def get_file_generator_by_tool_file_id(self, tool_file_id: str) -> tuple[Generator | None, File | None]:
        """
        get file stream

        :param tool_file_id: the id of the tool file
        :return: (stream from ``storage.load_stream``, file reference), or
            (None, None) when no tool file has this id
        """
        with session_factory.create_session() as session:
            tool_file: ToolFile | None = session.scalar(
                select(ToolFile).where(ToolFile.id == tool_file_id).limit(1)
            )
            if not tool_file:
                return None, None

            stream = storage.load_stream(tool_file.file_key)
            return stream, self._build_graph_file_reference(tool_file)

    def get_public_url_and_file_by_tool_file_id(self, tool_file_id: str) -> tuple[str | None, File | None]:
        """
        Resolve a tool file to a public URL when the storage backend exposes one.

        Returns (public_url, file_reference). If the backend has no public URL
        configured, returns (None, file_reference) and callers should fall back
        to the streaming path. Returns (None, None) when no tool file has this
        id.
        """
        with session_factory.create_session() as session:
            tool_file: ToolFile | None = session.scalar(
                select(ToolFile).where(ToolFile.id == tool_file_id).limit(1)
            )
            if not tool_file:
                return None, None

            public_url = storage.get_public_url(tool_file.file_key)
            return public_url, self._build_graph_file_reference(tool_file)
# Register a ToolFileManager factory with graphon's tool_file_parser
from graphon.file.tool_file_parser import set_tool_file_manager_factory
def _factory() -> ToolFileManager:
    """Return a newly constructed ToolFileManager."""
    manager = ToolFileManager()
    return manager


# Hand the factory to the tool-file parser imported above.
set_tool_file_manager_factory(_factory)