removed integration case as it needs sanbox service to be up, added unit test case for secret variables encryption

This commit is contained in:
Pankaj Kaushal 2025-10-31 22:56:39 +05:30
parent bb53760e6c
commit b848224c78
2 changed files with 54 additions and 65 deletions

View File

@ -396,68 +396,3 @@ def test_execute_code_scientific_notation(setup_code_executor_mock):
result = node._run()
assert isinstance(result, NodeRunResult)
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
def test_execute_code_obfuscate_secret_variables(monkeypatch):
monkeypatch.setenv("MOCK_SWITCH", "false")
code = """
def main(argument1: str):
return {
"result": argument1,
}
"""
# trim first 4 spaces at the beginning of each line
code = "\n".join([line[4:] for line in code.split("\n")])
variables = [
{
"variable": "argument1",
"value_selector": ["env", "secret_key"],
},
]
code_config = {
"id": "secret_environment_code",
"data": {
"type": "code",
"outputs": {
"result": {
"type": "string",
},
},
"title": "secret_obfus",
"variables": variables,
"answer": "fake-secret-key",
"code_language": "python3",
"code": code,
},
}
node = init_code_node(code_config, False)
# Variable with values replaced with env variables
replaced_variables = node.graph_runtime_state.variable_pool.variable_dictionary.get("env", {})
secret_variable_value_map = {}
for var in replaced_variables.values():
if isinstance(var, SecretVariable):
secret_variable_value_map[var.name] = var.value
input_variables_argument_map = {
var["variable"]: var["value_selector"][1] for var in variables if var.get("value_selector", [None])[0] == "env"
}
input_argument_value_map = {
k: secret_variable_value_map[v]
for k, v in input_variables_argument_map.items()
if v in secret_variable_value_map
}
# execute node
result = node._run()
assert isinstance(result, NodeRunResult)
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
for name, value in result.inputs.items():
if input_argument_value_map.get(name):
assert value == encrypter.obfuscated_token(input_argument_value_map.get(name))
assert result.error == ""

View File

@ -10,6 +10,7 @@ from core.helper.encrypter import (
encrypt_token,
get_decrypt_decoding,
obfuscated_token,
encrypt_secret_keys,
)
from libs.rsa import PrivkeyNotFoundError
@ -36,6 +37,59 @@ class TestObfuscatedToken:
assert token not in obfuscated
assert "*" * 12 in obfuscated
def test_encrypt_secret_keys_simple_dict(self):
data = {
"api_key": "fake-secret-key",
"username": "admin"
}
secret_vars = {"api_key"}
result = encrypt_secret_keys(data, secret_vars)
# api_key should be obfuscated
assert result["api_key"] == obfuscated_token("fake-secret-key")
# username should remain unchanged
assert result["username"] == "admin"
def test_encrypt_secret_keys_nested_dict(self):
data = {
"outer": {
"inner_secret": "super-secret",
"inner_public": "visible"
},
"non_secret": "plain"
}
secret_vars = {"inner_secret"}
result = encrypt_secret_keys(data, secret_vars)
assert result["outer"]["inner_secret"] == obfuscated_token("super-secret")
assert result["outer"]["inner_public"] == "visible"
assert result["non_secret"] == "plain"
def test_encrypt_secret_keys_list_of_dicts(self):
data = [
{"token1": "abc123", "id": 1},
{"token2": "xyz789", "id": 2}
]
secret_vars = {"token1","token2"}
result = encrypt_secret_keys(data, secret_vars)
assert result[0]["token1"] == obfuscated_token("abc123")
assert result[1]["token2"] == obfuscated_token("xyz789")
assert result[0]["id"] == 1
def test_encrypt_secret_keys_non_secret_scalar(self):
# When the object is just a string, it should remain unchanged
result = encrypt_secret_keys("hello-world", secret_variables={"api_key"})
assert result == "hello-world"
def test_encrypt_secret_keys_handles_empty_inputs(self):
assert encrypt_secret_keys({}, {"secret"}) == {}
assert encrypt_secret_keys([], {"secret"}) == []
assert encrypt_secret_keys(None, {"secret"}) is None
class TestEncryptToken:
@patch("models.engine.db.session.query")