fix: harden segment file URL signing

This commit is contained in:
Yanli 盐粒 2026-03-01 17:12:07 +08:00
parent c693cb9789
commit 0d6a4bac0f
3 changed files with 113 additions and 41 deletions

View File

@ -804,41 +804,53 @@ class DocumentSegment(Base):
def sign_content(self) -> str:
return self.get_sign_content()
@staticmethod
def _build_signed_query_params(*, sign_target: str, upload_file_id: str) -> str:
nonce = os.urandom(16).hex()
timestamp = str(int(time.time()))
data_to_sign = f"{sign_target}|{upload_file_id}|{timestamp}|{nonce}"
secret_key = dify_config.SECRET_KEY.encode() if dify_config.SECRET_KEY else b""
sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest()
encoded_sign = base64.urlsafe_b64encode(sign).decode()
return f"timestamp={timestamp}&nonce={nonce}&sign={encoded_sign}"
def _get_accessible_upload_file_ids(self, upload_file_ids: set[str]) -> set[str]:
if not upload_file_ids:
return set()
matched_upload_file_ids = db.session.scalars(
select(UploadFile.id).where(
UploadFile.tenant_id == self.tenant_id,
UploadFile.id.in_(list(upload_file_ids)),
)
).all()
return {str(upload_file_id) for upload_file_id in matched_upload_file_ids}
def get_sign_content(self) -> str:
signed_urls: list[tuple[int, int, str]] = []
text = self.content
# For data before v0.10.0
pattern = r"(?:https?://[^\s\)\"\']+)?/files/([a-f0-9\-]+)/image-preview(?:\?[^\s\)\"\']*)?"
matches = re.finditer(pattern, text)
for match in matches:
upload_file_id = match.group(1)
nonce = os.urandom(16).hex()
timestamp = str(int(time.time()))
data_to_sign = f"image-preview|{upload_file_id}|{timestamp}|{nonce}"
secret_key = dify_config.SECRET_KEY.encode() if dify_config.SECRET_KEY else b""
sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest()
encoded_sign = base64.urlsafe_b64encode(sign).decode()
upload_file_preview_patterns = {
"image-preview": r"(?:https?://[^\s\)\"\']+)?/files/([a-f0-9\-]+)/image-preview(?:\?[^\s\)\"\']*)?",
"file-preview": r"(?:https?://[^\s\)\"\']+)?/files/([a-f0-9\-]+)/file-preview(?:\?[^\s\)\"\']*)?",
}
upload_file_matches: list[tuple[re.Match[str], str, str]] = []
upload_file_ids: set[str] = set()
params = f"timestamp={timestamp}&nonce={nonce}&sign={encoded_sign}"
base_url = f"/files/{upload_file_id}/image-preview"
signed_url = f"{base_url}?{params}"
signed_urls.append((match.start(), match.end(), signed_url))
for preview_type, pattern in upload_file_preview_patterns.items():
for match in re.finditer(pattern, text):
upload_file_id = match.group(1)
upload_file_matches.append((match, preview_type, upload_file_id))
upload_file_ids.add(upload_file_id)
# For data after v0.10.0
pattern = r"(?:https?://[^\s\)\"\']+)?/files/([a-f0-9\-]+)/file-preview(?:\?[^\s\)\"\']*)?"
matches = re.finditer(pattern, text)
for match in matches:
upload_file_id = match.group(1)
nonce = os.urandom(16).hex()
timestamp = str(int(time.time()))
data_to_sign = f"file-preview|{upload_file_id}|{timestamp}|{nonce}"
secret_key = dify_config.SECRET_KEY.encode() if dify_config.SECRET_KEY else b""
sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest()
encoded_sign = base64.urlsafe_b64encode(sign).decode()
accessible_upload_file_ids = self._get_accessible_upload_file_ids(upload_file_ids)
params = f"timestamp={timestamp}&nonce={nonce}&sign={encoded_sign}"
base_url = f"/files/{upload_file_id}/file-preview"
for match, preview_type, upload_file_id in upload_file_matches:
if upload_file_id not in accessible_upload_file_ids:
continue
params = self._build_signed_query_params(sign_target=preview_type, upload_file_id=upload_file_id)
base_url = f"/files/{upload_file_id}/{preview_type}"
signed_url = f"{base_url}?{params}"
signed_urls.append((match.start(), match.end(), signed_url))
@ -849,14 +861,7 @@ class DocumentSegment(Base):
for match in matches:
upload_file_id = match.group(1)
file_extension = match.group(2)
nonce = os.urandom(16).hex()
timestamp = str(int(time.time()))
data_to_sign = f"file-preview|{upload_file_id}|{timestamp}|{nonce}"
secret_key = dify_config.SECRET_KEY.encode() if dify_config.SECRET_KEY else b""
sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest()
encoded_sign = base64.urlsafe_b64encode(sign).decode()
params = f"timestamp={timestamp}&nonce={nonce}&sign={encoded_sign}"
params = self._build_signed_query_params(sign_target="file-preview", upload_file_id=upload_file_id)
base_url = f"/files/tools/{upload_file_id}.{file_extension}"
signed_url = f"{base_url}?{params}"
signed_urls.append((match.start(), match.end(), signed_url))

View File

@ -121,9 +121,7 @@ def test_extract_images_from_docx(monkeypatch):
db_stub = SimpleNamespace(session=DummySession())
monkeypatch.setattr(we, "db", db_stub)
# Patch config values used for URL composition and storage type
monkeypatch.setattr(we.dify_config, "FILES_URL", "http://files.local", raising=False)
monkeypatch.setattr(we.dify_config, "INTERNAL_FILES_URL", "http://internal.docker:5001", raising=False)
# Patch config value used in this code path
monkeypatch.setattr(we.dify_config, "STORAGE_TYPE", "local", raising=False)
# Patch UploadFile to avoid real DB models

View File

@ -15,6 +15,7 @@ from datetime import UTC, datetime
from unittest.mock import MagicMock, patch
from uuid import uuid4
import models.dataset as dataset_module
from models.dataset import (
AppDatasetJoin,
ChildChunk,
@ -561,12 +562,13 @@ class TestDocumentSegmentIndexing:
tokens=1,
created_by=str(uuid4()),
)
import models.dataset as dataset_module
mock_scalars_result = MagicMock()
mock_scalars_result.all.return_value = [upload_file_id]
# Act
with (
patch.object(dataset_module.dify_config, "SECRET_KEY", "secret", create=True),
patch("models.dataset.db.session.scalars", return_value=mock_scalars_result),
patch("models.dataset.time.time", return_value=1700000000),
patch("models.dataset.os.urandom", return_value=b"\x00" * 16),
):
@ -578,6 +580,73 @@ class TestDocumentSegmentIndexing:
assert "&nonce=" in signed
assert "&sign=" in signed
def test_document_segment_sign_content_strips_absolute_files_host_for_image_preview(self):
"""Test that sign_content strips scheme/host from absolute image-preview URLs."""
# Arrange
upload_file_id = "e2a4f7b1-1234-5678-9abc-def012345678"
segment = DocumentSegment(
tenant_id=str(uuid4()),
dataset_id=str(uuid4()),
document_id=str(uuid4()),
position=1,
content=f"![image](http://internal.docker:5001/files/{upload_file_id}/image-preview)",
word_count=1,
tokens=1,
created_by=str(uuid4()),
)
mock_scalars_result = MagicMock()
mock_scalars_result.all.return_value = [upload_file_id]
# Act
with (
patch.object(dataset_module.dify_config, "SECRET_KEY", "secret", create=True),
patch("models.dataset.db.session.scalars", return_value=mock_scalars_result),
patch("models.dataset.time.time", return_value=1700000000),
patch("models.dataset.os.urandom", return_value=b"\x00" * 16),
):
signed = segment.get_sign_content()
# Assert
assert "internal.docker:5001" not in signed
assert f"/files/{upload_file_id}/image-preview?timestamp=" in signed
assert "&nonce=" in signed
assert "&sign=" in signed
def test_document_segment_sign_content_skips_upload_files_outside_tenant(self):
"""Test that sign_content only signs upload files belonging to the segment tenant."""
# Arrange
allowed_upload_file_id = "1602650a-4fe4-423c-85a2-af76c083e3c4"
denied_upload_file_id = "f8f35fca-568f-4626-adf0-4f30de96aa32"
segment = DocumentSegment(
tenant_id=str(uuid4()),
dataset_id=str(uuid4()),
document_id=str(uuid4()),
position=1,
content=(
f"allowed: ![image](/files/{allowed_upload_file_id}/file-preview) "
f"denied: ![image](/files/{denied_upload_file_id}/file-preview)"
),
word_count=1,
tokens=1,
created_by=str(uuid4()),
)
mock_scalars_result = MagicMock()
mock_scalars_result.all.return_value = [allowed_upload_file_id]
# Act
with (
patch.object(dataset_module.dify_config, "SECRET_KEY", "secret", create=True),
patch("models.dataset.db.session.scalars", return_value=mock_scalars_result),
patch("models.dataset.time.time", return_value=1700000000),
patch("models.dataset.os.urandom", return_value=b"\x00" * 16),
):
signed = segment.get_sign_content()
# Assert
assert f"/files/{allowed_upload_file_id}/file-preview?timestamp=" in signed
assert f"/files/{denied_upload_file_id}/file-preview?timestamp=" not in signed
assert f"/files/{denied_upload_file_id}/file-preview)" in signed
def test_document_segment_with_answer_field(self):
"""Test creating a document segment with answer field for QA model."""
# Arrange