diff --git a/api/tests/integration_tests/workflow/nodes/test_code.py b/api/tests/integration_tests/workflow/nodes/test_code.py index 8ddf09d125..8476209ce9 100644 --- a/api/tests/integration_tests/workflow/nodes/test_code.py +++ b/api/tests/integration_tests/workflow/nodes/test_code.py @@ -396,68 +396,3 @@ def test_execute_code_scientific_notation(setup_code_executor_mock): result = node._run() assert isinstance(result, NodeRunResult) assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED - - -def test_execute_code_obfuscate_secret_variables(monkeypatch): - monkeypatch.setenv("MOCK_SWITCH", "false") - code = """ - def main(argument1: str): - return { - "result": argument1, - } - """ - # trim first 4 spaces at the beginning of each line - code = "\n".join([line[4:] for line in code.split("\n")]) - - variables = [ - { - "variable": "argument1", - "value_selector": ["env", "secret_key"], - }, - ] - code_config = { - "id": "secret_environment_code", - "data": { - "type": "code", - "outputs": { - "result": { - "type": "string", - }, - }, - "title": "secret_obfus", - "variables": variables, - "answer": "fake-secret-key", - "code_language": "python3", - "code": code, - }, - } - - node = init_code_node(code_config, False) - # Variable with values replaced with env variables - replaced_variables = node.graph_runtime_state.variable_pool.variable_dictionary.get("env", {}) - - secret_variable_value_map = {} - for var in replaced_variables.values(): - if isinstance(var, SecretVariable): - secret_variable_value_map[var.name] = var.value - - input_variables_argument_map = { - var["variable"]: var["value_selector"][1] for var in variables if var.get("value_selector", [None])[0] == "env" - } - - input_argument_value_map = { - k: secret_variable_value_map[v] - for k, v in input_variables_argument_map.items() - if v in secret_variable_value_map - } - - # execute node - result = node._run() - - assert isinstance(result, NodeRunResult) - assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED - assert result.outputs is not None - for name, value in result.inputs.items(): - if input_argument_value_map.get(name): - assert value == encrypter.obfuscated_token(input_argument_value_map.get(name)) - assert result.error == "" diff --git a/api/tests/unit_tests/core/helper/test_encrypter.py b/api/tests/unit_tests/core/helper/test_encrypter.py index 5890009742..4883b9a5a4 100644 --- a/api/tests/unit_tests/core/helper/test_encrypter.py +++ b/api/tests/unit_tests/core/helper/test_encrypter.py @@ -10,6 +10,7 @@ from core.helper.encrypter import ( encrypt_token, get_decrypt_decoding, obfuscated_token, + encrypt_secret_keys, ) from libs.rsa import PrivkeyNotFoundError @@ -36,6 +37,59 @@ class TestObfuscatedToken: assert token not in obfuscated assert "*" * 12 in obfuscated + def test_encrypt_secret_keys_simple_dict(self): + data = { + "api_key": "fake-secret-key", + "username": "admin" + } + secret_vars = {"api_key"} + + result = encrypt_secret_keys(data, secret_vars) + + # api_key should be obfuscated + assert result["api_key"] == obfuscated_token("fake-secret-key") + # username should remain unchanged + assert result["username"] == "admin" + + def test_encrypt_secret_keys_nested_dict(self): + data = { + "outer": { + "inner_secret": "super-secret", + "inner_public": "visible" + }, + "non_secret": "plain" + } + secret_vars = {"inner_secret"} + + result = encrypt_secret_keys(data, secret_vars) + + assert result["outer"]["inner_secret"] == obfuscated_token("super-secret") + assert result["outer"]["inner_public"] == "visible" + assert result["non_secret"] == "plain" + + def test_encrypt_secret_keys_list_of_dicts(self): + data = [ + {"token1": "abc123", "id": 1}, + {"token2": "xyz789", "id": 2} + ] + secret_vars = {"token1","token2"} + + result = encrypt_secret_keys(data, secret_vars) + + assert result[0]["token1"] == obfuscated_token("abc123") + assert result[1]["token2"] == obfuscated_token("xyz789") + assert result[0]["id"] == 1 + + + def test_encrypt_secret_keys_non_secret_scalar(self): + # When the object is just a string, it should remain unchanged + result = encrypt_secret_keys("hello-world", secret_variables={"api_key"}) + assert result == "hello-world" + + def test_encrypt_secret_keys_handles_empty_inputs(self): + assert encrypt_secret_keys({}, {"secret"}) == {} + assert encrypt_secret_keys([], {"secret"}) == [] + assert encrypt_secret_keys(None, {"secret"}) is None class TestEncryptToken: @patch("models.engine.db.session.query")