This commit is contained in:
L1nSn0w 2026-05-09 11:57:33 +08:00 committed by GitHub
commit 66a25e96c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 244 additions and 18 deletions

View File

@ -532,7 +532,6 @@ class BaseAgentRunner(AppRunner):
file_objs = file_factory.build_from_message_files(
message_files=files,
tenant_id=self.tenant_id,
config=file_extra_config,
access_controller=_file_access_controller,
)
if not file_objs:

View File

@ -86,12 +86,10 @@ class TokenBufferMemory:
detail = ImagePromptMessageContent.DETAIL.HIGH
if file_extra_config and app_record:
# Build files directly without filtering by belongs_to
file_objs = [
file_factory.build_from_message_file(
message_file=message_file,
tenant_id=app_record.tenant_id,
config=file_extra_config,
access_controller=_file_access_controller,
)
for message_file in message_files

View File

@ -1,11 +1,18 @@
"""Adapters from persisted message files to graph-layer file values."""
"""Adapters from persisted message files to graph-layer file values.
Replay paths only: files in conversation history were validated at upload time,
so these helpers deliberately do not accept (or forward) a ``FileUploadConfig``;
re-validation here would break replays whenever workflow ``file_upload`` config
drifts between rounds. Mirrors ``build_file_from_stored_mapping`` in
``models/utils/file_input_compat.py``.
"""
from __future__ import annotations
from collections.abc import Sequence
from core.app.file_access import FileAccessControllerProtocol
from graphon.file import File, FileBelongsTo, FileTransferMethod, FileUploadConfig
from graphon.file import File, FileBelongsTo, FileTransferMethod
from models import MessageFile
from .builders import build_from_mapping
@ -15,14 +22,12 @@ def build_from_message_files(
*,
message_files: Sequence[MessageFile],
tenant_id: str,
config: FileUploadConfig | None = None,
access_controller: FileAccessControllerProtocol,
) -> Sequence[File]:
return [
build_from_message_file(
message_file=message_file,
tenant_id=tenant_id,
config=config,
access_controller=access_controller,
)
for message_file in message_files
@ -34,7 +39,6 @@ def build_from_message_file(
*,
message_file: MessageFile,
tenant_id: str,
config: FileUploadConfig | None,
access_controller: FileAccessControllerProtocol,
) -> File:
mapping = {
@ -54,6 +58,5 @@ def build_from_message_file(
return build_from_mapping(
mapping=mapping,
tenant_id=tenant_id,
config=config,
access_controller=access_controller,
)

View File

@ -2,9 +2,25 @@
from __future__ import annotations
from collections.abc import Iterable
from graphon.file import FileTransferMethod, FileType, FileUploadConfig
def _normalize_extension(extension: str) -> str:
s = extension.strip().lower()
if not s:
return ""
return s if s.startswith(".") else "." + s
def _extension_matches(extension: str, whitelist: Iterable[str]) -> bool:
    """Report whether *extension* equals any whitelist entry after normalization.

    Both sides are passed through ``_normalize_extension`` before comparing.
    An extension that normalizes to the empty string never matches, and in
    that case the whitelist is not iterated at all.
    """
    candidate = _normalize_extension(extension)
    if candidate == "":
        return False
    allowed = set(map(_normalize_extension, whitelist))
    return candidate in allowed
def is_file_valid_with_config(
*,
input_file_type: str,
@ -12,22 +28,31 @@ def is_file_valid_with_config(
file_transfer_method: FileTransferMethod,
config: FileUploadConfig,
) -> bool:
# FIXME(QIN2DIM): Always allow tool files (files generated by the assistant/model)
# These are internally generated and should bypass user upload restrictions
"""Return whether the file is allowed by the upload config.
``allowed_file_types`` lists the buckets a file may fall into; ``CUSTOM`` is
a fallback bucket gated by ``allowed_file_extensions`` (case- and
dot-insensitive). Tool-generated files bypass user-facing config.
"""
if file_transfer_method == FileTransferMethod.TOOL_FILE:
return True
if (
config.allowed_file_types
and input_file_type not in config.allowed_file_types
and input_file_type != FileType.CUSTOM
):
allowed_types = config.allowed_file_types or []
custom_allowed = FileType.CUSTOM in allowed_types
type_allowed = not allowed_types or input_file_type in allowed_types
if not type_allowed and not custom_allowed:
return False
# When the file is in the CUSTOM bucket, the extension whitelist is authoritative.
# An explicitly set whitelist (including the empty list) is enforced; empty == deny —
# the UI never submits an empty list, so this guards against DSL/API paths that
# bypass the UI from accidentally widening the allowlist.
in_custom_bucket = input_file_type == FileType.CUSTOM or not type_allowed
if (
input_file_type == FileType.CUSTOM
in_custom_bucket
and config.allowed_file_extensions is not None
and file_extension not in config.allowed_file_extensions
and not _extension_matches(file_extension, config.allowed_file_extensions)
):
return False

View File

@ -198,6 +198,48 @@ class TestBuildPromptMessageWithFiles:
assert isinstance(result.content[-1], TextPromptMessageContent)
assert result.content[-1].data == "user text"
def test_replay_does_not_pass_config_to_file_factory(self):
    """Replay contract: the history path must not forward a FileUploadConfig.

    History files were validated on upload and the factory's signature no
    longer accepts ``config``; this guards against a future regression that
    re-introduces the keyword.
    """
    conversation = _make_conversation(AppMode.CHAT)
    memory = TokenBufferMemory(conversation=conversation, model_instance=_make_model_instance())

    upload_config = MagicMock()
    upload_config.image_config = None
    image_content = ImagePromptMessageContent(
        url="http://example.com/img.png", format="png", mime_type="image/png"
    )
    app_record = MagicMock()
    app_record.tenant_id = "tenant-1"

    convert_patch = patch(
        "core.memory.token_buffer_memory.FileUploadConfigManager.convert",
        return_value=upload_config,
    )
    build_patch = patch(
        "core.memory.token_buffer_memory.file_factory.build_from_message_file",
        return_value=MagicMock(),
    )
    content_patch = patch(
        "core.memory.token_buffer_memory.file_manager.to_prompt_message_content",
        return_value=image_content,
    )

    with convert_patch, build_patch as build_spy, content_patch:
        memory._build_prompt_message_with_files(
            message_files=[MagicMock()],
            text_content="user text",
            message=_make_message(),
            app_record=app_record,
            is_user_message=True,
        )

    # Exactly one file was replayed, and it was built without a ``config`` kwarg.
    build_spy.assert_called_once()
    assert "config" not in build_spy.call_args.kwargs
@pytest.mark.parametrize("mode", [AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.COMPLETION])
def test_chat_mode_with_files_assistant_message(self, mode):
"""When files are present, returns AssistantPromptMessage with list content."""

View File

@ -0,0 +1,159 @@
"""Unit tests for is_file_valid_with_config."""
from __future__ import annotations
import pytest
from factories.file_factory.validation import is_file_valid_with_config
from graphon.file import FileTransferMethod, FileType, FileUploadConfig
def _validate(
    *,
    input_file_type: str,
    file_extension: str = ".png",
    file_transfer_method: FileTransferMethod = FileTransferMethod.LOCAL_FILE,
    config: FileUploadConfig,
) -> bool:
    """Call ``is_file_valid_with_config`` with test-friendly defaults.

    The extension defaults to ``".png"`` and the transfer method to
    ``LOCAL_FILE``, so each test only spells out what it varies.
    """
    call_kwargs = {
        "input_file_type": input_file_type,
        "file_extension": file_extension,
        "file_transfer_method": file_transfer_method,
        "config": config,
    }
    return is_file_valid_with_config(**call_kwargs)
@pytest.mark.parametrize(
    ("input_file_type", "file_extension", "allowed_file_types", "allowed_file_extensions", "expected"),
    [
        # Round 1: the file arrives typed as literal "custom" and its extension is whitelisted.
        ("custom", ".png", [FileType.CUSTOM], [".png"], True),
        # Round 2 (replay): the stored type is the resolved one, while the config still allows CUSTOM.
        ("image", ".png", [FileType.CUSTOM], [".png"], True),
        ("document", ".pdf", [FileType.CUSTOM], [".pdf"], True),
        # [IMAGE, CUSTOM]: a document is not a listed type, so the extension whitelist decides.
        ("document", ".pdf", [FileType.IMAGE, FileType.CUSTOM], [".pdf"], True),
        ("document", ".exe", [FileType.IMAGE, FileType.CUSTOM], [".pdf"], False),
        ("image", ".jpg", [FileType.IMAGE], [], True),
        ("video", ".mp4", [FileType.IMAGE, FileType.DOCUMENT], [], False),
        ("custom", ".exe", [FileType.CUSTOM], [".png"], False),
        # An empty allowed_file_types list means there is no type restriction.
        ("video", ".mp4", [], [], True),
    ],
)
def test_bucket_semantics(input_file_type, file_extension, allowed_file_types, allowed_file_extensions, expected):
    """Check the validator's verdict for each type / extension / config combination."""
    upload_config = FileUploadConfig(
        allowed_file_types=allowed_file_types,
        allowed_file_extensions=allowed_file_extensions,
    )
    verdict = _validate(
        input_file_type=input_file_type,
        file_extension=file_extension,
        config=upload_config,
    )
    assert verdict is expected
@pytest.mark.parametrize("whitelist_entry", [".png", ".PNG", "png", "PNG", " .Png ", "PnG"])
def test_extension_match_is_case_and_dot_insensitive(whitelist_entry):
    """Every spelling of the png whitelist entry must accept a ``.png`` file."""
    upload_config = FileUploadConfig(
        allowed_file_types=[FileType.CUSTOM],
        allowed_file_extensions=[whitelist_entry],
    )
    accepted = _validate(input_file_type="custom", file_extension=".png", config=upload_config)
    assert accepted is True
def test_extension_mismatch_still_rejected_after_normalization():
    """A ``.pdf`` file is rejected when only ``.png`` and ``.jpg`` are whitelisted."""
    image_only_extensions = [".png", ".jpg"]
    upload_config = FileUploadConfig(
        allowed_file_types=[FileType.CUSTOM],
        allowed_file_extensions=image_only_extensions,
    )
    accepted = _validate(input_file_type="custom", file_extension=".pdf", config=upload_config)
    assert accepted is False
def test_mixed_case_whitelist_replicating_real_user_config():
    """A whitelist mixing cases and dotted/undotted entries accepts the lower-case dotted forms."""
    upload_config = FileUploadConfig(
        allowed_file_types=[FileType.CUSTOM],
        allowed_file_extensions=[".PNG", "png", "JPG", ".WEBP", "SVG", "GIF"],
    )
    uploaded_extensions = (".png", ".jpg", ".webp", ".svg", ".gif")
    not_accepted = [
        ext
        for ext in uploaded_extensions
        if _validate(input_file_type="custom", file_extension=ext, config=upload_config) is not True
    ]
    assert not_accepted == []
def test_tool_file_always_passes():
    """A TOOL_FILE transfer is accepted even though ``.png`` is absent from the whitelist."""
    pdf_only_config = FileUploadConfig(
        allowed_file_types=[FileType.CUSTOM],
        allowed_file_extensions=[".pdf"],
    )
    accepted = _validate(
        input_file_type="image",
        file_extension=".png",
        file_transfer_method=FileTransferMethod.TOOL_FILE,
        config=pdf_only_config,
    )
    assert accepted is True
def test_transfer_method_gate_for_non_image():
    """A document passes via the listed upload method and fails via an unlisted one."""
    upload_config = FileUploadConfig(
        allowed_file_types=[FileType.DOCUMENT],
        allowed_file_upload_methods=[FileTransferMethod.LOCAL_FILE],
    )
    pdf_document = {"input_file_type": "document", "file_extension": ".pdf", "config": upload_config}

    via_local_file = _validate(file_transfer_method=FileTransferMethod.LOCAL_FILE, **pdf_document)
    assert via_local_file is True

    via_remote_url = _validate(file_transfer_method=FileTransferMethod.REMOTE_URL, **pdf_document)
    assert via_remote_url is False
def test_history_replay_matches_round_1_outcome_under_unchanged_config():
    """A file that passes round 1 must also pass history replay under the same config."""
    upload_config = FileUploadConfig(
        allowed_file_types=[FileType.CUSTOM],
        allowed_file_extensions=[".png"],
    )
    # Round 1 presents the file as "custom"; the replay call presents it as "image".
    first_round = _validate(input_file_type="custom", file_extension=".png", config=upload_config)
    replayed = _validate(input_file_type="image", file_extension=".png", config=upload_config)
    assert first_round is True
    assert replayed is True
def test_empty_whitelist_in_custom_bucket_denies_by_default():
    """Defensive: an empty ``allowed_file_extensions`` list rejects CUSTOM-bucket files.

    The UI never submits an empty list; this guards DSL / API paths that
    bypass the UI from accidentally widening what is accepted.
    """
    deny_all_config = FileUploadConfig(
        allowed_file_types=[FileType.CUSTOM],
        allowed_file_extensions=[],
    )
    as_custom = _validate(input_file_type="custom", file_extension=".png", config=deny_all_config)
    as_image = _validate(input_file_type="image", file_extension=".png", config=deny_all_config)
    assert as_custom is False
    assert as_image is False
def test_normalize_handles_whitespace_and_empty_consistently():
    """Empty or whitespace-only whitelist entries must not match a real extension."""
    blank_entries = ("", " ", "\t")
    verdicts = []
    for blank in blank_entries:
        upload_config = FileUploadConfig(
            allowed_file_types=[FileType.CUSTOM],
            allowed_file_extensions=[blank],
        )
        verdicts.append(_validate(input_file_type="custom", file_extension=".png", config=upload_config))
    assert all(verdict is False for verdict in verdicts)
def test_empty_extension_does_not_spuriously_match_empty_whitelist_entry():
    """Defensive: an extensionless file must not pass via an empty whitelist entry.

    The whitelist holds an empty entry (e.g. a stray comma in DSL) next to a
    real one; the real entry still matches, while the empty extension does not.
    """
    upload_config = FileUploadConfig(
        allowed_file_types=[FileType.CUSTOM],
        allowed_file_extensions=["", ".png"],
    )
    with_real_extension = _validate(input_file_type="custom", file_extension=".png", config=upload_config)
    without_extension = _validate(input_file_type="custom", file_extension="", config=upload_config)
    assert with_real_extension is True
    assert without_extension is False