Merge remote-tracking branch 'origin/feat/rag-2' into feat/rag-2

This commit is contained in:
jyong 2025-08-26 15:52:01 +08:00
commit 60fb242f27
5 changed files with 1103 additions and 86 deletions

View File

@ -16,6 +16,7 @@ from core.plugin.entities.plugin_daemon import (
PluginDatasourceProviderEntity,
)
from core.plugin.impl.base import BasePluginClient
from core.schemas.resolver import resolve_dify_schema_refs
from services.tools.tools_transform_service import ToolTransformService
@ -32,6 +33,9 @@ class PluginDatasourceManager(BasePluginClient):
provider_name = declaration.get("identity", {}).get("name")
for datasource in declaration.get("datasources", []):
datasource["identity"]["provider"] = provider_name
# resolve refs
if datasource.get("output_schema"):
datasource["output_schema"] = resolve_dify_schema_refs(datasource["output_schema"])
return json_response
@ -69,6 +73,9 @@ class PluginDatasourceManager(BasePluginClient):
provider_name = declaration.get("identity", {}).get("name")
for datasource in declaration.get("datasources", []):
datasource["identity"]["provider"] = provider_name
# resolve refs
if datasource.get("output_schema"):
datasource["output_schema"] = resolve_dify_schema_refs(datasource["output_schema"])
return json_response
@ -106,7 +113,8 @@ class PluginDatasourceManager(BasePluginClient):
if data:
for datasource in data.get("declaration", {}).get("datasources", []):
datasource["identity"]["provider"] = tool_provider_id.provider_name
if datasource.get("output_schema"):
datasource["output_schema"] = resolve_dify_schema_refs(datasource["output_schema"])
return json_response
response = self._request_with_plugin_daemon_response(

View File

@ -100,18 +100,8 @@ class SchemaRegistry:
def _parse_uri(self, uri: str) -> tuple[str, str]:
    """Parse a schema URI into its ``(version, schema_name)`` parts.

    The parsing itself is delegated to ``parse_dify_schema_uri`` so the URI
    pattern is defined in a single place instead of being duplicated here.

    Args:
        uri: Schema URI to parse.

    Returns:
        Whatever ``parse_dify_schema_uri`` returns for ``uri``.
    """
    # Function-scope import, presumably to avoid a circular import between the
    # registry and resolver modules — TODO confirm.
    from core.schemas.resolver import parse_dify_schema_uri

    return parse_dify_schema_uri(uri)
def list_versions(self) -> list[str]:
"""Returns all available versions"""

View File

@ -1,92 +1,281 @@
import logging
import re
from typing import Any, Optional
import threading
from collections import deque
from dataclasses import dataclass
from typing import Any, Optional, Union
from core.schemas.registry import SchemaRegistry
logger = logging.getLogger(__name__)
def resolve_dify_schema_refs(schema: Any, registry: Optional[SchemaRegistry] = None, max_depth: int = 10) -> Any:
# Type aliases for better clarity
SchemaType = Union[dict[str, Any], list[Any], str, int, float, bool, None]
SchemaDict = dict[str, Any]
# Pre-compiled pattern for better performance
_DIFY_SCHEMA_PATTERN = re.compile(r"^https://dify\.ai/schemas/(v\d+)/(.+)\.json$")
class SchemaResolutionError(Exception):
    """Root of the exception hierarchy used for schema-resolution failures."""
class CircularReferenceError(SchemaResolutionError):
    """Signals that a ``$ref`` chain loops back onto itself.

    Attributes are documented inline below.
    """

    def __init__(self, ref_uri: str, ref_path: list[str]):
        # Render the chain once so the message reads "a -> b -> c".
        chain = " -> ".join(ref_path)
        super().__init__(f"Circular reference detected: {ref_uri} in path {chain}")
        # URI whose resolution closed the cycle.
        self.ref_uri = ref_uri
        # URIs visited before reaching ``ref_uri`` again.
        self.ref_path = ref_path
class MaxDepthExceededError(SchemaResolutionError):
    """Signals that resolution went deeper than the configured limit."""

    def __init__(self, max_depth: int):
        super().__init__(f"Maximum resolution depth ({max_depth}) exceeded")
        # The limit that was hit, kept so callers can inspect it.
        self.max_depth = max_depth
class SchemaNotFoundError(SchemaResolutionError):
    """Signals that a referenced schema could not be located."""

    def __init__(self, ref_uri: str):
        super().__init__(f"Schema not found: {ref_uri}")
        # The reference URI that had no matching schema.
        self.ref_uri = ref_uri
@dataclass
class QueueItem:
    """One pending node of the breadth-first traversal over a schema."""

    # Node to inspect (a dict or list once it has been enqueued by the resolver).
    current: Any
    # Container that holds ``current``; ``None`` for the root node.
    parent: Optional[Any]
    # Dict key or list index under which ``current`` sits in ``parent``.
    key: Optional[Union[str, int]]
    # Distance from the root, compared against the resolver's depth limit.
    depth: int
    # Reference URIs already expanded on the way to this node (cycle detection).
    ref_path: set[str]
class SchemaResolver:
    """Expand Dify ``$ref`` entries inside a schema using a breadth-first walk.

    Schemas fetched from the registry are stored in a class-level cache shared
    by every resolver instance; access to it is serialised with ``_cache_lock``.
    Cached entries are never handed out directly: callers always receive a deep
    copy, so in-place expansion of nested references cannot alter the cache.

    NOTE(review): the cache is keyed by reference URI only, so resolvers created
    with different registries share entries — confirm this is intended.
    """

    _cache: dict[str, SchemaDict] = {}
    _cache_lock = threading.Lock()

    def __init__(self, registry: Optional[SchemaRegistry] = None, max_depth: int = 10):
        """
        Initialize the schema resolver

        Args:
            registry: Schema registry to use; ``SchemaRegistry.default_registry()``
                is used when omitted (or falsy).
            max_depth: Maximum depth for reference resolution
        """
        self.registry = registry or SchemaRegistry.default_registry()
        self.max_depth = max_depth

    @classmethod
    def clear_cache(cls) -> None:
        """Clear the global schema cache"""
        with cls._cache_lock:
            cls._cache.clear()

    def resolve(self, schema: SchemaType) -> SchemaType:
        """
        Resolve all Dify ``$ref`` references in the schema

        Non-container values and schemas without Dify references are returned
        as the very same object; otherwise a deep copy is resolved and returned.
        A reference that is already on the current path is flagged with
        ``"$circular_ref": True`` and a reference the registry cannot supply is
        left untouched; both cases are logged as warnings, not raised.

        Args:
            schema: Schema to resolve

        Returns:
            Schema with resolvable references expanded

        Raises:
            MaxDepthExceededError: If nesting/expansion reaches ``max_depth``
        """
        if not isinstance(schema, (dict, list)):
            return schema

        # Fast path: nothing to expand, so skip the deep copy and the traversal.
        if not _has_dify_refs(schema):
            return schema

        import copy

        # Work on a private copy so the caller's schema is never mutated.
        result = copy.deepcopy(schema)
        queue = deque([QueueItem(current=result, parent=None, key=None, depth=0, ref_path=set())])
        while queue:
            self._process_queue_item(queue, queue.popleft())
        return result

    def _process_queue_item(self, queue: deque, item: QueueItem) -> None:
        """Dispatch a queue item to the dict or list handler."""
        if isinstance(item.current, dict):
            self._process_dict(queue, item)
        elif isinstance(item.current, list):
            self._process_list(queue, item)

    def _process_dict(self, queue: deque, item: QueueItem) -> None:
        """Expand the dict if it is a Dify ``$ref``; otherwise enqueue its containers."""
        ref_uri = item.current.get("$ref")
        if ref_uri and _is_dify_schema_ref(ref_uri):
            self._resolve_ref(queue, item, ref_uri)
        else:
            self._enqueue_children(queue, item, item.current.items())

    def _process_list(self, queue: deque, item: QueueItem) -> None:
        """Enqueue every dict/list element of the list."""
        self._enqueue_children(queue, item, enumerate(item.current))

    def _enqueue_children(self, queue: deque, item: QueueItem, children) -> None:
        """Queue each container child of ``item`` one level deeper.

        ``children`` yields ``(key_or_index, value)`` pairs. Scalars are skipped.

        Raises:
            MaxDepthExceededError: If a container child would sit at ``max_depth``.
        """
        next_depth = item.depth + 1
        for key, value in children:
            if not isinstance(value, (dict, list)):
                continue
            if next_depth >= self.max_depth:
                raise MaxDepthExceededError(self.max_depth)
            queue.append(
                QueueItem(
                    current=value,
                    parent=item.current,
                    key=key,
                    depth=next_depth,
                    ref_path=item.ref_path,
                )
            )

    def _resolve_ref(self, queue: deque, item: QueueItem, ref_uri: str) -> None:
        """Replace a ``$ref`` node with the referenced schema and re-queue it."""
        # A URI already expanded on this path would loop forever: flag and stop.
        if ref_uri in item.ref_path:
            item.current["$circular_ref"] = True
            logger.warning("Circular reference detected: %s", ref_uri)
            return

        resolved_schema = self._get_resolved_schema(ref_uri)
        if not resolved_schema:
            # Leave the original $ref in place when nothing could be fetched.
            logger.warning("Schema not found: %s", ref_uri)
            return

        # New set per branch so sibling branches do not see each other's refs.
        new_ref_path = item.ref_path | {ref_uri}

        next_depth = item.depth + 1
        if next_depth >= self.max_depth:
            raise MaxDepthExceededError(self.max_depth)

        if item.parent is None:
            # Root node: mutate in place so the object returned by resolve()
            # keeps its identity.
            item.current.clear()
            item.current.update(resolved_schema)
            replacement = item.current
        else:
            # resolved_schema is already a private deep copy, safe to attach.
            item.parent[item.key] = resolved_schema
            replacement = resolved_schema

        # Re-queue the expanded node: it may itself contain further references.
        queue.append(
            QueueItem(
                current=replacement,
                parent=item.parent,
                key=item.key,
                depth=next_depth,
                ref_path=new_ref_path,
            )
        )

    def _get_resolved_schema(self, ref_uri: str) -> Optional[SchemaDict]:
        """Return a private deep copy of the cleaned schema for ``ref_uri``.

        The cache is consulted first; on a miss the registry is queried, metadata
        fields are stripped and the result is cached. ``None`` is returned when
        the registry yields nothing.
        """
        import copy

        with self._cache_lock:
            cached = self._cache.get(ref_uri)
        if cached is not None:
            # Deep copy: the traversal rewrites nested dicts in place, which
            # would otherwise leak resolved content back into the cache.
            return copy.deepcopy(cached)

        schema = self.registry.get_schema(ref_uri)
        if not schema:
            return None

        # Detach the cached entry from the registry's own nested objects too.
        cleaned = copy.deepcopy(_remove_metadata_fields(schema))
        with self._cache_lock:
            self._cache[ref_uri] = cleaned
        return copy.deepcopy(cleaned)
def resolve_dify_schema_refs(
schema: SchemaType,
registry: Optional[SchemaRegistry] = None,
max_depth: int = 30
) -> SchemaType:
"""
Resolve $ref references in Dify schema to actual schema content
This is a convenience function that creates a resolver and resolves the schema.
Performance optimization: quickly checks for $ref presence before processing.
Args:
schema: Schema object that may contain $ref references
registry: Optional schema registry, defaults to default registry
max_depth: Maximum recursion depth to prevent infinite loops (default: 10)
max_depth: Maximum depth to prevent infinite loops (default: 30)
Returns:
Schema with all $ref references resolved to actual content
Raises:
RecursionError: If maximum recursion depth is exceeded
CircularReferenceError: If circular reference detected
MaxDepthExceededError: If maximum depth exceeded
SchemaNotFoundError: If referenced schema not found
"""
if registry is None:
registry = SchemaRegistry.default_registry()
return _resolve_refs_recursive(schema, registry, max_depth, 0)
def _resolve_refs_recursive(schema: Any, registry: SchemaRegistry, max_depth: int, current_depth: int) -> Any:
"""
Recursively resolve $ref references in schema
Args:
schema: Schema object to process
registry: Schema registry for lookups
max_depth: Maximum allowed recursion depth
current_depth: Current recursion depth
Returns:
Schema with references resolved
Raises:
RecursionError: If maximum depth exceeded
"""
# Check recursion depth
if current_depth >= max_depth:
raise RecursionError(f"Maximum recursion depth ({max_depth}) exceeded while resolving schema references")
if isinstance(schema, dict):
# Check if this is a $ref reference
if "$ref" in schema:
ref_uri = schema["$ref"]
# Only resolve Dify schema references
if _is_dify_schema_ref(ref_uri):
resolved_schema = registry.get_schema(ref_uri)
if resolved_schema:
# Remove metadata fields from resolved schema
cleaned_schema = _remove_metadata_fields(resolved_schema)
# Recursively resolve the cleaned schema in case it contains more refs
return _resolve_refs_recursive(cleaned_schema, registry, max_depth, current_depth + 1)
else:
# If schema not found, return original ref (might be external or invalid)
return schema
else:
# Non-Dify reference, return as-is
return schema
else:
# Regular dict, recursively process all values
resolved_dict = {}
for key, value in schema.items():
resolved_dict[key] = _resolve_refs_recursive(value, registry, max_depth, current_depth + 1)
return resolved_dict
elif isinstance(schema, list):
# Process list items recursively
return [_resolve_refs_recursive(item, registry, max_depth, current_depth + 1) for item in schema]
else:
# Primitive value, return as-is
# Fast path: if no Dify refs found, return original schema unchanged
# This avoids expensive deepcopy and BFS traversal for schemas without refs
if not _has_dify_refs(schema):
return schema
# Slow path: schema contains refs, perform full resolution
resolver = SchemaResolver(registry, max_depth)
return resolver.resolve(schema)
def _remove_metadata_fields(schema: dict) -> dict:
"""
Remove metadata fields from schema that shouldn't be included in resolved output
"""
if not isinstance(schema, dict):
return schema
Args:
schema: Schema dictionary
Returns:
Cleaned schema without metadata fields
"""
# Create a copy and remove metadata fields
cleaned = schema.copy()
metadata_fields = ["$id", "$schema", "version"]
@ -97,13 +286,123 @@ def _remove_metadata_fields(schema: dict) -> dict:
return cleaned
def _is_dify_schema_ref(ref_uri: str) -> bool:
def _is_dify_schema_ref(ref_uri: Any) -> bool:
"""
Check if the reference URI is a Dify schema reference
Args:
ref_uri: URI to check
Returns:
True if it's a Dify schema reference
"""
if not isinstance(ref_uri, str):
return False
# Use pre-compiled pattern for better performance
return bool(_DIFY_SCHEMA_PATTERN.match(ref_uri))
def _has_dify_refs_recursive(schema: SchemaType) -> bool:
"""
Recursively check if a schema contains any Dify $ref references
This is the fallback method when string-based detection is not possible.
Args:
schema: Schema to check for references
# Match Dify schema URI pattern: https://dify.ai/schemas/v*/name.json
pattern = r"^https://dify\.ai/schemas/(v\d+)/(.+)\.json$"
return bool(re.match(pattern, ref_uri))
Returns:
True if any Dify $ref is found, False otherwise
"""
if isinstance(schema, dict):
# Check if this dict has a $ref field
ref_uri = schema.get("$ref")
if ref_uri and _is_dify_schema_ref(ref_uri):
return True
# Check nested values
for value in schema.values():
if _has_dify_refs_recursive(value):
return True
elif isinstance(schema, list):
# Check each item in the list
for item in schema:
if _has_dify_refs_recursive(item):
return True
# Primitive types don't contain refs
return False
def _has_dify_refs_hybrid(schema: SchemaType) -> bool:
"""
Hybrid detection: fast string scan followed by precise recursive check
Performance optimization using two-phase detection:
1. Fast string scan to quickly eliminate schemas without $ref
2. Precise recursive validation only for potential candidates
Args:
schema: Schema to check for references
Returns:
True if any Dify $ref is found, False otherwise
"""
# Phase 1: Fast string-based pre-filtering
try:
import json
schema_str = json.dumps(schema, separators=(',', ':'))
# Quick elimination: no $ref at all
if '"$ref"' not in schema_str:
return False
# Quick elimination: no Dify schema URLs
if 'https://dify.ai/schemas/' not in schema_str:
return False
except (TypeError, ValueError, OverflowError):
# JSON serialization failed (e.g., circular references, non-serializable objects)
# Fall back to recursive detection
logger.debug("JSON serialization failed for schema, using recursive detection")
return _has_dify_refs_recursive(schema)
# Phase 2: Precise recursive validation
# Only executed for schemas that passed string pre-filtering
return _has_dify_refs_recursive(schema)
def _has_dify_refs(schema: SchemaType) -> bool:
    """
    Report whether a schema holds at least one Dify $ref reference

    Thin entry point that forwards to the hybrid detector (string pre-filter
    followed by a recursive confirmation).

    Args:
        schema: Schema to check for references

    Returns:
        True if any Dify $ref is found, False otherwise
    """
    found = _has_dify_refs_hybrid(schema)
    return found
def parse_dify_schema_uri(uri: str) -> tuple[str, str]:
    """
    Parse a Dify schema URI to extract version and schema name

    Args:
        uri: Schema URI to parse

    Returns:
        Tuple of (version, schema_name), or ("", "") when ``uri`` is not a
        string or does not match the Dify schema pattern
    """
    # Guard against None/non-string input so callers get the documented
    # sentinel rather than a TypeError from the regex engine.
    if not isinstance(uri, str):
        return "", ""

    match = _DIFY_SCHEMA_PATTERN.match(uri)
    if match is None:
        return "", ""
    return match.group(1), match.group(2)

View File

@ -1,8 +1,21 @@
import time
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import MagicMock, patch
import pytest
from core.schemas import resolve_dify_schema_refs
from core.schemas.registry import SchemaRegistry
from core.schemas.resolver import (
MaxDepthExceededError,
SchemaResolver,
_has_dify_refs,
_has_dify_refs_hybrid,
_has_dify_refs_recursive,
_is_dify_schema_ref,
_remove_metadata_fields,
parse_dify_schema_uri,
)
class TestSchemaResolver:
@ -11,6 +24,12 @@ class TestSchemaResolver:
def setup_method(self):
    """Prepare each test: grab the default registry and start with an empty cache."""
    # The resolver cache is class-level, so it must be reset between tests.
    SchemaResolver.clear_cache()
    self.registry = SchemaRegistry.default_registry()
def teardown_method(self):
    """Leave no cached schemas behind for the next test."""
    SchemaResolver.clear_cache()
def test_simple_ref_resolution(self):
"""Test resolving a simple $ref to a complete schema"""
@ -156,5 +175,702 @@ class TestSchemaResolver:
assert resolved["type"] == "object"
# Should raise error with very low max_depth
with pytest.raises(RecursionError, match="Maximum recursion depth"):
resolve_dify_schema_refs(deep_schema, max_depth=5)
with pytest.raises(MaxDepthExceededError) as exc_info:
resolve_dify_schema_refs(deep_schema, max_depth=5)
assert exc_info.value.max_depth == 5
def test_circular_reference_detection(self):
    """A schema that refers back to itself is flagged rather than expanded forever."""
    circular_uri = "https://dify.ai/schemas/v1/circular.json"

    # Registry stub whose every answer points at the same URI again.
    mock_registry = MagicMock()
    mock_registry.get_schema.side_effect = lambda uri: {
        "$ref": "https://dify.ai/schemas/v1/circular.json",
        "type": "object",
    }

    resolved = resolve_dify_schema_refs({"$ref": circular_uri}, registry=mock_registry)

    # The resolver marks the node instead of raising.
    assert "$circular_ref" in resolved
def test_schema_not_found_handling(self):
    """An unknown reference is left in place when the registry has no match."""
    unknown_uri = "https://dify.ai/schemas/v1/unknown.json"

    # Registry stub that never finds anything.
    mock_registry = MagicMock()
    mock_registry.get_schema.return_value = None

    resolved = resolve_dify_schema_refs({"$ref": unknown_uri}, registry=mock_registry)

    assert resolved["$ref"] == "https://dify.ai/schemas/v1/unknown.json"
def test_primitive_types_unchanged(self):
    """Scalars pass straight through the resolver."""
    # Compared by equality.
    for scalar in ("string", 123, 3.14):
        assert resolve_dify_schema_refs(scalar) == scalar

    # Singletons are compared by identity.
    assert resolve_dify_schema_refs(True) is True
    assert resolve_dify_schema_refs(None) is None
def test_cache_functionality(self):
    """A second resolution is served from the cache without touching the registry."""
    schema = {"$ref": "https://dify.ai/schemas/v1/file.json"}

    # Populates the cache.
    first = resolve_dify_schema_refs(schema)

    # If the registry were consulted again it would hand back different data.
    with patch.object(self.registry, "get_schema", return_value={"type": "different"}) as patched_get:
        second = resolve_dify_schema_refs(schema)

        assert first == second
        patched_get.assert_not_called()

    # After clearing, the real registry is queried again and agrees.
    SchemaResolver.clear_cache()
    third = resolve_dify_schema_refs(schema)
    assert third == first
def test_thread_safety(self):
    """Concurrent resolutions all succeed and produce equal results."""
    properties = {}
    for i in range(10):
        properties[f"prop_{i}"] = {"$ref": "https://dify.ai/schemas/v1/file.json"}
    schema = {"type": "object", "properties": properties}

    outcomes = []

    def worker():
        # Record either the resolved schema or the exception that occurred.
        try:
            resolved = resolve_dify_schema_refs(schema)
            outcomes.append(resolved)
        except Exception as exc:
            outcomes.append(exc)
            return False
        return True

    with ThreadPoolExecutor(max_workers=10) as pool:
        pending = [pool.submit(worker) for _ in range(20)]
        all_succeeded = all(task.result() for task in pending)

    assert all_succeeded

    # Every successful outcome must equal the first one recorded.
    reference = outcomes[0]
    assert all(outcome == reference for outcome in outcomes if not isinstance(outcome, Exception))
def test_mixed_nested_structures(self):
    """References nested inside arrays and objects are all expanded."""
    general_holder = {
        "type": "object",
        "properties": {"general": {"$ref": "https://dify.ai/schemas/v1/general_structure.json"}},
    }
    nested_block = {
        "type": "object",
        "properties": {
            "qa": {"$ref": "https://dify.ai/schemas/v1/qa_structure.json"},
            "data": {"type": "array", "items": general_holder},
        },
    }
    complex_schema = {
        "type": "object",
        "properties": {
            "files": {"type": "array", "items": {"$ref": "https://dify.ai/schemas/v1/file.json"}},
            "nested": nested_block,
        },
    }

    resolved = resolve_dify_schema_refs(complex_schema, max_depth=20)
    props = resolved["properties"]

    # The surrounding structure survives.
    assert resolved["type"] == "object"
    assert "files" in props
    assert "nested" in props

    # The references were replaced by the real schemas.
    file_items = props["files"]["items"]
    assert file_items["type"] == "object"
    assert file_items["title"] == "File Schema"
    qa = props["nested"]["properties"]["qa"]
    assert qa["type"] == "object"
    assert qa["title"] == "Q&A Structure Schema"
class TestUtilityFunctions:
    """Checks for the module-level helper functions of the resolver."""

    def test_is_dify_schema_ref(self):
        """Only well-formed Dify schema URIs given as strings are accepted."""
        accepted = [
            "https://dify.ai/schemas/v1/file.json",
            "https://dify.ai/schemas/v2/complex_name.json",
            "https://dify.ai/schemas/v999/test-file.json",
        ]
        rejected = [
            "https://example.com/schema.json",
            "https://dify.ai/other/path.json",
            "not a uri",
            "",
            None,
            123,
            ["list"],
        ]
        for candidate in accepted:
            assert _is_dify_schema_ref(candidate)
        for candidate in rejected:
            assert not _is_dify_schema_ref(candidate)

    def test_has_dify_refs(self):
        """Detection is positive only when a real Dify reference is present."""
        containing_refs = [
            {"$ref": "https://dify.ai/schemas/v1/file.json"},
            {"type": "object", "properties": {"data": {"$ref": "https://dify.ai/schemas/v1/file.json"}}},
            [{"type": "string"}, {"$ref": "https://dify.ai/schemas/v1/file.json"}],
            {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {"nested": {"$ref": "https://dify.ai/schemas/v1/qa_structure.json"}},
                },
            },
        ]
        lacking_refs = [
            # Plain schemas
            {"type": "string"},
            {"type": "object", "properties": {"name": {"type": "string"}, "age": {"type": "number"}}},
            [
                {"type": "string"},
                {"type": "number"},
                {"type": "object", "properties": {"name": {"type": "string"}}},
            ],
            # References that are not Dify references
            {"$ref": "https://example.com/schema.json"},
            {"type": "object", "properties": {"external": {"$ref": "https://example.com/external.json"}}},
            # Primitive values
            "string",
            123,
            True,
            None,
        ]
        for candidate in containing_refs:
            assert _has_dify_refs(candidate)
        for candidate in lacking_refs:
            assert not _has_dify_refs(candidate)

    def test_has_dify_refs_hybrid_vs_recursive(self):
        """The hybrid and recursive detectors agree on every sample."""
        samples = [
            # No refs
            {"type": "string"},
            {"type": "object", "properties": {"name": {"type": "string"}}},
            [{"type": "string"}, {"type": "number"}],
            # With Dify refs
            {"$ref": "https://dify.ai/schemas/v1/file.json"},
            {"type": "object", "properties": {"data": {"$ref": "https://dify.ai/schemas/v1/file.json"}}},
            [{"type": "string"}, {"$ref": "https://dify.ai/schemas/v1/qa_structure.json"}],
            # With non-Dify refs
            {"$ref": "https://example.com/schema.json"},
            {"type": "object", "properties": {"external": {"$ref": "https://example.com/external.json"}}},
            # Complex nested
            {
                "type": "object",
                "properties": {
                    "level1": {
                        "type": "object",
                        "properties": {
                            "level2": {
                                "type": "array",
                                "items": {"$ref": "https://dify.ai/schemas/v1/file.json"},
                            }
                        },
                    }
                },
            },
            # Edge cases
            {"description": "This mentions $ref but is not a reference"},
            {"$ref": "not-a-url"},
            # Primitive types
            "string",
            123,
            True,
            None,
            [],
        ]
        for schema in samples:
            via_hybrid = _has_dify_refs_hybrid(schema)
            via_recursive = _has_dify_refs_recursive(schema)
            assert via_hybrid == via_recursive, f"Mismatch for schema: {schema}"

    def test_parse_dify_schema_uri(self):
        """Valid URIs split into (version, name); anything else yields empty strings."""
        expectations = {
            # Valid URIs
            "https://dify.ai/schemas/v1/file.json": ("v1", "file"),
            "https://dify.ai/schemas/v2/complex_name.json": ("v2", "complex_name"),
            "https://dify.ai/schemas/v999/test-file.json": ("v999", "test-file"),
            # Invalid URIs
            "https://example.com/schema.json": ("", ""),
            "invalid": ("", ""),
            "": ("", ""),
        }
        for uri, expected in expectations.items():
            assert parse_dify_schema_uri(uri) == expected

    def test_remove_metadata_fields(self):
        """Metadata keys are dropped from the copy while the input stays intact."""
        schema = {
            "$id": "should be removed",
            "$schema": "should be removed",
            "version": "should be removed",
            "type": "object",
            "title": "should remain",
            "properties": {},
        }

        cleaned = _remove_metadata_fields(schema)

        for dropped in ("$id", "$schema", "version"):
            assert dropped not in cleaned
        assert cleaned["type"] == "object"
        assert cleaned["title"] == "should remain"
        assert "properties" in cleaned

        # Original should be unchanged
        assert "$id" in schema
class TestSchemaResolverClass:
    """Tests for the SchemaResolver class and the speed of the detection helpers.

    Timing comparisons use the fastest of several runs (see ``_best_time``)
    because the minimum is far less sensitive to scheduler noise than a mean
    over a handful of samples.
    """

    @staticmethod
    def _best_time(func, repeats, setup=None):
        """Call ``func`` ``repeats`` times; return ``(fastest_seconds, last_result)``.

        ``setup``, when given, runs before each call and is excluded from timing.
        """
        best = float("inf")
        result = None
        for _ in range(repeats):
            if setup is not None:
                setup()
            start = time.perf_counter()
            result = func()
            best = min(best, time.perf_counter() - start)
        return best, result

    def test_resolver_initialization(self):
        """Defaults and explicit constructor arguments are stored on the instance."""
        # Default initialization
        resolver = SchemaResolver()
        assert resolver.max_depth == 10
        assert resolver.registry is not None

        # Custom initialization
        custom_registry = MagicMock()
        resolver = SchemaResolver(registry=custom_registry, max_depth=5)
        assert resolver.max_depth == 5
        assert resolver.registry is custom_registry

    def test_cache_sharing(self):
        """A schema cached by one resolver instance is reused by another."""
        SchemaResolver.clear_cache()
        schema = {"$ref": "https://dify.ai/schemas/v1/file.json"}

        # First resolver populates cache
        result1 = SchemaResolver().resolve(schema)

        # Second resolver should use the same cache
        resolver2 = SchemaResolver()
        with patch.object(resolver2.registry, "get_schema") as mock_get:
            result2 = resolver2.resolve(schema)
            # Should not call registry since it's in cache
            mock_get.assert_not_called()

        assert result1 == result2

    def test_resolver_with_list_schema(self):
        """A list at the root has each of its reference items resolved."""
        list_schema = [
            {"$ref": "https://dify.ai/schemas/v1/file.json"},
            {"type": "string"},
            {"$ref": "https://dify.ai/schemas/v1/qa_structure.json"},
        ]

        resolved = SchemaResolver().resolve(list_schema)

        assert isinstance(resolved, list)
        assert len(resolved) == 3
        assert resolved[0]["type"] == "object"
        assert resolved[0]["title"] == "File Schema"
        assert resolved[1] == {"type": "string"}
        assert resolved[2]["type"] == "object"
        assert resolved[2]["title"] == "Q&A Structure Schema"

    def test_cache_performance(self):
        """Resolving with a warm cache is not slower than with a cold one."""
        SchemaResolver.clear_cache()

        # Many references to the same schema (kept at 50 to avoid depth issues)
        schema = {
            "type": "object",
            "properties": {f"prop_{i}": {"$ref": "https://dify.ai/schemas/v1/file.json"} for i in range(50)},
        }

        def run():
            return resolve_dify_schema_refs(schema)

        # Cold: the cache is emptied before every run.
        cold_time, result_cold = self._best_time(run, 3, setup=SchemaResolver.clear_cache)
        # Warm: the last cold run left the cache populated.
        warm_time, result_warm = self._best_time(run, 3)

        assert result_cold == result_warm
        # Cache should provide some performance benefit
        assert warm_time <= cold_time

    def test_fast_path_performance_no_refs(self):
        """Schemas without $refs are returned as-is and are cheap to process."""
        # Moderately complex schema without any $refs (typical plugin output_schema)
        no_refs_schema = {
            "type": "object",
            "properties": {
                f"property_{i}": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "value": {"type": "number"},
                        "items": {"type": "array", "items": {"type": "string"}},
                    },
                }
                for i in range(50)
            },
        }

        fast_time, result_fast = self._best_time(lambda: resolve_dify_schema_refs(no_refs_schema), 10)

        # Most importantly: result should be identical to input (no copying)
        assert result_fast is no_refs_schema

        # Schema with $refs for comparison (fewer entries to avoid depth issues)
        with_refs_schema = {
            "type": "object",
            "properties": {f"property_{i}": {"$ref": "https://dify.ai/schemas/v1/file.json"} for i in range(20)},
        }

        slow_time, _ = self._best_time(
            lambda: resolve_dify_schema_refs(with_refs_schema, max_depth=50),
            10,
            setup=SchemaResolver.clear_cache,
        )

        print(f"Fast path (no refs): {fast_time:.6f}s")
        print(f"Slow path (with refs): {slow_time:.6f}s")

        # Lenient check: the fast path must not be more than 2x slower
        assert fast_time < slow_time * 2

    def test_batch_processing_performance(self):
        """Batches of ref-free schemas take the fast path and stay cheap."""
        # Simulate the plugin tool scenario: many schemas, most without refs
        schemas_without_refs = [
            {
                "type": "object",
                "properties": {f"field_{j}": {"type": "string" if j % 2 else "number"} for j in range(10)},
            }
            for _ in range(100)
        ]

        batch_time, results = self._best_time(
            lambda: [resolve_dify_schema_refs(schema) for schema in schemas_without_refs],
            3,
        )

        # Verify all results are identical to inputs (fast path used)
        for original, result in zip(schemas_without_refs, results):
            assert result is original

        # Each schema should take < 0.001 seconds on average
        assert batch_time / len(schemas_without_refs) < 0.001

    def test_has_dify_refs_performance(self):
        """_has_dify_refs stays fast on a deeply nested schema without refs."""
        large_schema = {"type": "object", "properties": {}}

        # Add many nested properties
        current = large_schema
        for i in range(100):
            current["properties"][f"level_{i}"] = {"type": "object", "properties": {}}
            current = current["properties"][f"level_{i}"]

        best, has_refs = self._best_time(lambda: _has_dify_refs(large_schema), 50)

        # Should be False and fast
        assert not has_refs
        assert best < 0.01  # Should complete in less than 10ms

    def test_hybrid_vs_recursive_performance(self):
        """Hybrid and recursive detection agree, and hybrid is not dramatically slower."""
        # Large schema without refs: 50 levels of nested objects
        large_no_refs = {"type": "object", "properties": {}}
        current = large_no_refs
        for i in range(50):
            current["properties"][f"level_{i}"] = {"type": "object", "properties": {}}
            current = current["properties"][f"level_{i}"]

        test_cases = [
            # Case 1: Small schema without refs (most common case)
            {
                "name": "small_no_refs",
                "schema": {
                    "type": "object",
                    "properties": {"name": {"type": "string"}, "value": {"type": "number"}},
                },
                "expected": False,
            },
            # Case 2: Medium schema without refs
            {
                "name": "medium_no_refs",
                "schema": {
                    "type": "object",
                    "properties": {
                        f"field_{i}": {
                            "type": "object",
                            "properties": {
                                "name": {"type": "string"},
                                "value": {"type": "number"},
                                "items": {"type": "array", "items": {"type": "string"}},
                            },
                        }
                        for i in range(20)
                    },
                },
                "expected": False,
            },
            # Case 3: Large schema without refs
            {"name": "large_no_refs", "schema": large_no_refs, "expected": False},
            # Case 4: Schema with Dify refs
            {
                "name": "with_dify_refs",
                "schema": {
                    "type": "object",
                    "properties": {
                        "file": {"$ref": "https://dify.ai/schemas/v1/file.json"},
                        "data": {"type": "string"},
                    },
                },
                "expected": True,
            },
            # Case 5: Schema with non-Dify refs
            {
                "name": "with_external_refs",
                "schema": {
                    "type": "object",
                    "properties": {
                        "external": {"$ref": "https://example.com/schema.json"},
                        "data": {"type": "string"},
                    },
                },
                "expected": False,
            },
        ]

        for test_case in test_cases:
            schema = test_case["schema"]
            expected = test_case["expected"]
            name = test_case["name"]

            # Test correctness first
            assert _has_dify_refs_hybrid(schema) == expected
            assert _has_dify_refs_recursive(schema) == expected

            # Default-argument binding keeps each lambda tied to this iteration's schema.
            best_hybrid, result_hybrid = self._best_time(lambda s=schema: _has_dify_refs_hybrid(s), 10)
            best_recursive, result_recursive = self._best_time(lambda s=schema: _has_dify_refs_recursive(s), 10)

            print(f"{name}: hybrid={best_hybrid:.6f}s, recursive={best_recursive:.6f}s")

            # Results should be identical
            assert result_hybrid == result_recursive == expected

            # Hybrid may pay JSON serialization overhead on ref-free schemas,
            # but should not be dramatically worse.
            if not expected:
                assert best_hybrid < best_recursive * 5  # At most 5x slower

    def test_string_matching_edge_cases(self):
        """String pre-filtering yields neither false positives nor false negatives."""
        # Case 1: False positive potential - $ref in description
        schema_false_positive = {
            "type": "object",
            "properties": {
                "description": {
                    "type": "string",
                    "description": "This field explains how $ref works in JSON Schema",
                }
            },
        }
        assert not _has_dify_refs_hybrid(schema_false_positive)
        assert not _has_dify_refs_recursive(schema_false_positive)

        # Case 2: Complex URL patterns
        complex_schema = {
            "type": "object",
            "properties": {
                "config": {
                    "type": "object",
                    "properties": {
                        "dify_url": {"type": "string", "default": "https://dify.ai/schemas/info"},
                        "actual_ref": {"$ref": "https://dify.ai/schemas/v1/file.json"},
                    },
                }
            },
        }
        # Both methods should return True (due to actual_ref)
        assert _has_dify_refs_hybrid(complex_schema)
        assert _has_dify_refs_recursive(complex_schema)

        # Case 3: Non-JSON serializable objects (should fall back to recursive)
        import datetime

        non_serializable = {
            "type": "object",
            "timestamp": datetime.datetime.now(),
            "data": {"$ref": "https://dify.ai/schemas/v1/file.json"},
        }
        # Hybrid should fall back to recursive and still work
        assert _has_dify_refs_hybrid(non_serializable)
        assert _has_dify_refs_recursive(non_serializable)

4
spec.http Normal file
View File

@ -0,0 +1,4 @@
GET /console/api/spec/schema-definitions
Host: cloud-rag.dify.dev
authorization: Bearer {{console_api_token}}
###