diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index 43dddbd011..15acd4aee3 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -63,6 +63,11 @@ class SecurityConfig(BaseSettings): default=None, ) + PASSWORD_HASH_ITERATIONS: PositiveInt = Field( + description="Number of PBKDF2 iterations for password hashing. Recommended: 600000+ for 2025 standards", + default=10000, + ) + class AppExecutionConfig(BaseSettings): """ diff --git a/api/libs/password.py b/api/libs/password.py index cdf55c57e5..9a41c7f379 100644 --- a/api/libs/password.py +++ b/api/libs/password.py @@ -3,6 +3,8 @@ import binascii import hashlib import re +from configs import dify_config + password_pattern = r"^(?=.*[a-zA-Z])(?=.*\d).{8,}$" @@ -16,11 +18,61 @@ def valid_password(password): raise ValueError("Password must contain letters and numbers, and the length must be greater than 8.") -def hash_password(password_str, salt_byte): - dk = hashlib.pbkdf2_hmac("sha256", password_str.encode("utf-8"), salt_byte, 10000) +def hash_password(password_str, salt_byte, iterations=None): + """ + Hash a password using PBKDF2-HMAC-SHA256. + + Args: + password_str: The plaintext password to hash + salt_byte: Cryptographic salt as bytes + iterations: Number of PBKDF2 iterations. If None, uses dify_config.PASSWORD_HASH_ITERATIONS + + Returns: + Hexadecimal-encoded hash as bytes + """ + if iterations is None: + iterations = dify_config.PASSWORD_HASH_ITERATIONS + + dk = hashlib.pbkdf2_hmac( + "sha256", + password_str.encode("utf-8"), + salt_byte, + iterations, + ) return binascii.hexlify(dk) -def compare_password(password_str, password_hashed_base64, salt_base64): - # compare password for login - return hash_password(password_str, base64.b64decode(salt_base64)) == base64.b64decode(password_hashed_base64) +def compare_password(password_str, password_hashed_base64, salt_base64, iterations=None): + """ + Compare a plaintext password against a stored hash. + + Supports backward compatibility by trying the configured iteration count first, + then falling back to the legacy default (10000) if verification fails. + + Args: + password_str: The plaintext password to verify + password_hashed_base64: Base64-encoded stored password hash + salt_base64: Base64-encoded salt + iterations: Number of PBKDF2 iterations. If None, tries current config then legacy default. + + Returns: + True if password matches, False otherwise + """ + password_hash_bytes = base64.b64decode(password_hashed_base64) + salt_bytes = base64.b64decode(salt_base64) + + if iterations is not None: + # Explicit iteration count provided - use it directly + return hash_password(password_str, salt_bytes, iterations) == password_hash_bytes + + # Try current configured iteration count first + if hash_password(password_str, salt_bytes, dify_config.PASSWORD_HASH_ITERATIONS) == password_hash_bytes: + return True + + # Fallback: Try legacy default (10000) for backward compatibility + # This allows passwords hashed with old iteration count to still work + if dify_config.PASSWORD_HASH_ITERATIONS != 10000: + if hash_password(password_str, salt_bytes, 10000) == password_hash_bytes: + return True + + return False diff --git a/api/tests/unit_tests/libs/test_password.py b/api/tests/unit_tests/libs/test_password.py index 79fc792cc5..c9ee558196 100644 --- a/api/tests/unit_tests/libs/test_password.py +++ b/api/tests/unit_tests/libs/test_password.py @@ -72,3 +72,80 @@ class TestPasswordHashing: """Test password case sensitivity""" result = compare_password(self.password.upper(), self.password_hash_base64, self.salt_base64) assert result is False + + +class TestPasswordHashIterations: + """Test password hashing with configurable iterations""" + + def setup_method(self): + """Setup test data""" + self.password = "secure123password" + self.salt = os.urandom(16) + self.salt_base64 = base64.b64encode(self.salt).decode() + + def test_should_hash_with_custom_iterations(self): + """Test hashing with custom iteration count""" + iterations = 50000 + password_hash = hash_password(self.password, self.salt, iterations) + password_hash_base64 = base64.b64encode(password_hash).decode() + + result = compare_password(self.password, password_hash_base64, self.salt_base64, iterations) + assert result is True + + def test_should_produce_different_hashes_for_different_iterations(self): + """Test that different iteration counts produce different hashes""" + hash_10k = hash_password(self.password, self.salt, 10000) + hash_50k = hash_password(self.password, self.salt, 50000) + + assert hash_10k != hash_50k + + def test_should_use_default_iterations_when_none_specified(self): + """Test that default iterations are used when not specified""" + hash_default = hash_password(self.password, self.salt) + hash_default_base64 = base64.b64encode(hash_default).decode() + + # Should verify with default config iterations + result = compare_password(self.password, hash_default_base64, self.salt_base64) + assert result is True + + def test_should_fail_verification_with_wrong_iterations(self): + """Test that verification fails with incorrect iteration count""" + hash_10k = hash_password(self.password, self.salt, 10000) + hash_10k_base64 = base64.b64encode(hash_10k).decode() + + # Try to verify with different iteration count + result = compare_password(self.password, hash_10k_base64, self.salt_base64, 50000) + assert result is False + + def test_backward_compatibility_with_legacy_iterations(self): + """Test backward compatibility with legacy 10000 iterations""" + # Hash with legacy iteration count + legacy_hash = hash_password(self.password, self.salt, 10000) + legacy_hash_base64 = base64.b64encode(legacy_hash).decode() + + # Should verify without specifying iterations (fallback to legacy) + result = compare_password(self.password, legacy_hash_base64, self.salt_base64) + assert result is True + + def test_should_support_high_iteration_counts(self): + """Test support for modern high iteration counts""" + # Test with OWASP recommended iteration count for 2024 + high_iterations = 600000 + password_hash = hash_password(self.password, self.salt, high_iterations) + password_hash_base64 = base64.b64encode(password_hash).decode() + + result = compare_password(self.password, password_hash_base64, self.salt_base64, high_iterations) + assert result is True + + def test_should_handle_explicit_iterations_without_fallback(self): + """Test that explicit iterations bypass fallback logic""" + hash_50k = hash_password(self.password, self.salt, 50000) + hash_50k_base64 = base64.b64encode(hash_50k).decode() + + # With explicit iterations, should not fallback + result = compare_password(self.password, hash_50k_base64, self.salt_base64, 50000) + assert result is True + + # Wrong explicit iterations should fail + result = compare_password(self.password, hash_50k_base64, self.salt_base64, 10000) + assert result is False