import base64
import hashlib
import hmac
import os
import time
import urllib.parse

from configs import dify_config


def _secret_key() -> bytes:
    """Return the configured SECRET_KEY as bytes, or empty bytes when it is unset."""
    return dify_config.SECRET_KEY.encode() if dify_config.SECRET_KEY else b""


def _encode_signature(data_to_sign: str) -> str:
    """Return the URL-safe base64 text of the HMAC-SHA256 of ``data_to_sign``."""
    digest = hmac.new(_secret_key(), data_to_sign.encode(), hashlib.sha256).digest()
    return base64.urlsafe_b64encode(digest).decode()


def _signature_matches(sign: str, expected: str) -> bool:
    """Compare a caller-supplied signature with the expected one in constant time."""
    # hmac.compare_digest rejects non-ASCII ``str`` arguments with TypeError, so
    # compare bytes instead. ``surrogatepass`` keeps encoding from raising on
    # malformed input; such input simply fails to match, as it did with ``!=``.
    return hmac.compare_digest(
        sign.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )


def _is_within_access_timeout(timestamp: str) -> bool:
    """Return True when ``timestamp`` is no older than FILES_ACCESS_TIMEOUT seconds."""
    current_time = int(time.time())
    return current_time - int(timestamp) <= dify_config.FILES_ACCESS_TIMEOUT


def sign_tool_file(tool_file_id: str, extension: str, for_external: bool = True) -> str:
    """
    sign file to get a temporary url for plugin access

    :param tool_file_id: id of the tool file; part of both the URL path and the signed payload
    :param extension: suffix appended verbatim after the file id in the URL path
    :param for_external: when True use FILES_URL; otherwise prefer INTERNAL_FILES_URL
    :return: preview URL carrying ``timestamp``, ``nonce`` and ``sign`` query parameters
    """
    # Use internal URL for plugin/tool file access in Docker environments, unless for_external is True
    base_url = dify_config.FILES_URL if for_external else (dify_config.INTERNAL_FILES_URL or dify_config.FILES_URL)
    file_preview_url = f"{base_url}/files/tools/{tool_file_id}{extension}"

    timestamp = str(int(time.time()))
    nonce = os.urandom(16).hex()
    encoded_sign = _encode_signature(f"file-preview|{tool_file_id}|{timestamp}|{nonce}")

    return f"{file_preview_url}?timestamp={timestamp}&nonce={nonce}&sign={encoded_sign}"


def sign_upload_file(upload_file_id: str, extension: str) -> str:
    """
    sign file to get a temporary url for plugin access

    :param upload_file_id: id of the uploaded file; part of both the URL path and the signed payload
    :param extension: accepted for interface compatibility; not used when building the URL
    :return: image-preview URL carrying ``timestamp``, ``nonce`` and ``sign`` query parameters
    """
    # Use internal URL for plugin/tool file access in Docker environments
    base_url = dify_config.INTERNAL_FILES_URL or dify_config.FILES_URL
    file_preview_url = f"{base_url}/files/{upload_file_id}/image-preview"

    timestamp = str(int(time.time()))
    nonce = os.urandom(16).hex()
    encoded_sign = _encode_signature(f"image-preview|{upload_file_id}|{timestamp}|{nonce}")

    return f"{file_preview_url}?timestamp={timestamp}&nonce={nonce}&sign={encoded_sign}"


def verify_tool_file_signature(file_id: str, timestamp: str, nonce: str, sign: str) -> bool:
    """
    verify signature

    :param file_id: tool file id taken from the request
    :param timestamp: ``timestamp`` value from the signed URL (seconds, as text)
    :param nonce: ``nonce`` value from the signed URL
    :param sign: ``sign`` value from the signed URL
    :return: True only when the signature matches and the timestamp has not expired
    """
    expected_sign = _encode_signature(f"file-preview|{file_id}|{timestamp}|{nonce}")

    # verify signature (constant-time, so mismatches do not leak how many characters matched)
    if not _signature_matches(sign, expected_sign):
        return False

    # The timestamp is only parsed after the signature has been verified.
    return _is_within_access_timeout(timestamp)


def get_signed_file_url_for_plugin(filename: str, mimetype: str, tenant_id: str, user_id: str) -> str:
    """Build the signed upload URL used by the plugin-facing file upload endpoint.

    :param filename: file name covered by the signature (not included in the URL)
    :param mimetype: MIME type covered by the signature (not included in the URL)
    :param tenant_id: tenant id, both signed and sent as a query parameter
    :param user_id: user id, both signed and sent as a query parameter
    :return: upload URL with ``timestamp``, ``nonce``, ``sign``, ``user_id`` and ``tenant_id``
    """
    base_url = dify_config.INTERNAL_FILES_URL or dify_config.FILES_URL
    upload_url = f"{base_url}/files/upload/for-plugin"

    timestamp = str(int(time.time()))
    nonce = os.urandom(16).hex()
    encoded_sign = _encode_signature(f"upload|{filename}|{mimetype}|{tenant_id}|{user_id}|{timestamp}|{nonce}")

    query = urllib.parse.urlencode(
        {
            "timestamp": timestamp,
            "nonce": nonce,
            "sign": encoded_sign,
            "user_id": user_id,
            "tenant_id": tenant_id,
        }
    )
    return f"{upload_url}?{query}"


def verify_plugin_file_signature(
    *, filename: str, mimetype: str, tenant_id: str, user_id: str, timestamp: str, nonce: str, sign: str
) -> bool:
    """Verify the signature used by the plugin-facing file upload endpoint.

    The payload layout mirrors the one signed by ``get_signed_file_url_for_plugin``.

    :return: True only when the signature matches and the timestamp has not expired
    """
    expected_sign = _encode_signature(f"upload|{filename}|{mimetype}|{tenant_id}|{user_id}|{timestamp}|{nonce}")

    # Constant-time comparison, so mismatches do not leak how many characters matched.
    if not _signature_matches(sign, expected_sign):
        return False

    # The timestamp is only parsed after the signature has been verified.
    return _is_within_access_timeout(timestamp)