Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle sensitive information being inside a list in resource_dict #2178

Merged
merged 22 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
dafe9d0
Handle sensitive information being inside a list in resource_dict
dbasunag Oct 23, 2024
462ee06
Merge branch 'main' into hash_cloudinit
dbasunag Oct 23, 2024
6527412
Update ocp_resources/resource.py
dbasunag Oct 28, 2024
8f87de4
Merge branch 'main' into hash_cloudinit
dbasunag Oct 28, 2024
2fd2caa
support lists and use of key path separator
dbasunag Oct 30, 2024
237b635
Merge remote-tracking branch 'origin/main' into hash_cloudinit
dbasunag Oct 30, 2024
62d3abc
change keypath separator and how to indicate list
dbasunag Oct 30, 2024
3bda9c7
add tests and update PR based on coderabbit's recommendation
dbasunag Oct 31, 2024
576abae
add more tests based on coderabbit's recommendation
dbasunag Oct 31, 2024
0936010
Merge branch 'main' into hash_cloudinit
dbasunag Oct 31, 2024
2d23652
Merge branch 'main' into hash_cloudinit
dbasunag Nov 4, 2024
691d3eb
address review comments
dbasunag Nov 7, 2024
f01c5c0
fix newline
dbasunag Nov 11, 2024
d00217f
Merge branch 'main' into hash_cloudinit
dbasunag Nov 13, 2024
bfbd1ca
Merge branch 'main' into hash_cloudinit
dbasunag Nov 18, 2024
957069a
Merge branch 'main' into hash_cloudinit
dbasunag Nov 20, 2024
7eefdc9
update doc string
dbasunag Nov 20, 2024
f931eeb
update tests based on review comments
dbasunag Nov 24, 2024
d6db81d
Merge branch 'main' into hash_cloudinit
dbasunag Nov 25, 2024
1ed9e74
rename test file and update tox
dbasunag Nov 25, 2024
d9bfa53
updates based on reviews
dbasunag Nov 26, 2024
9a37a9b
Merge branch 'main' into hash_cloudinit
dbasunag Nov 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 77 additions & 11 deletions ocp_resources/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,78 @@ def sub_resource_level(current_class: Any, owner_class: Any, parent_class: Any)
return None


# Exceptions classes
def replace_key_with_hashed_value(resource_dict: Dict[Any, Any], key_name: str) -> Dict[Any, Any]:
    """
    Return a copy of a nested dictionary with the value at a given key path replaced by "*******".

    Key path levels are separated by ">". The function supports two key formats:
        1. Regular dictionary path:
            A key to be hashed can be found directly in a dictionary, e.g. "a>b>c", would hash the value
            associated with key "c", where dictionary format is:
                input = {
                    "a": {
                        "b": {
                            "c": "sensitive data"
                        }
                    }
                }
                output = {
                    "a": {
                        "b": {
                            "c": "*******"
                        }
                    }
                }
        2. List path:
            A key to be hashed can be found in a dictionary that is in list somewhere in a dictionary,
            e.g. "a>b[]>c", would hash the value associated with key "c" in every dictionary of the list,
            where dictionary format is:
                input = {
                    "a": {
                        "b": [
                            {"d": "not sensitive data"},
                            {"c": "sensitive data"}
                        ]
                    }
                }
                output = {
                    "a": {
                        "b": [
                            {"d": "not sensitive data"},
                            {"c": "*******"}
                        ]
                    }
                }

    Notes:
        - The input dictionary is not modified; the replacement is applied to a deep copy.
        - A value is replaced only when the lookup returns a truthy value; missing keys and falsy values
          are left as they are.
        - Only "[]>" is treated as a list marker. A "[]" that is not followed by ">" (e.g. a trailing
          "a>b[]") is passed to the regular key lookup instead of failing while splitting the key path.

    Args:
        resource_dict: The nested dictionary to search.
        key_name: The key path to find.

    Returns:
        Dict[Any, Any]: A copy of the input dictionary with the specified key's value replaced with "*******".

    """
    result = copy.deepcopy(resource_dict)

    benedict_resource_dict = benedict(result, keypath_separator=">")

    # Test for the full "[]>" marker (not just "[]") so that the split below always yields two parts.
    if "[]>" not in key_name:
        if benedict_resource_dict.get(key_name):
            benedict_resource_dict[key_name] = "*******"
        return dict(benedict_resource_dict)

    key_prefix, remaining_key = key_name.split("[]>", 1)
    if not benedict_resource_dict.get(key_prefix):
        return dict(benedict_resource_dict)

    resource_data = benedict_resource_dict[key_prefix]
    if not isinstance(resource_data, list):
        return dict(benedict_resource_dict)

    # Apply the rest of the key path to every dictionary in the list; other element types are skipped.
    for index, element in enumerate(resource_data):
        if isinstance(element, dict):
            resource_data[index] = replace_key_with_hashed_value(resource_dict=element, key_name=remaining_key)

    return dict(benedict_resource_dict)


class KubeAPIVersion(Version):
Expand Down Expand Up @@ -1173,21 +1242,18 @@ def keys_to_hash(self) -> List[str]:

Example:
given a dict: {"spec": {"data": <value_to_hash>}}
To hash spec['data'] key pass: ["spec..data"]
To hash spec['data'] key pass: ["spec>data"]
"""
return []

def hash_resource_dict(self, resource_dict: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Replace the values of every key path listed in `keys_to_hash` inside a resource dictionary.

    Args:
        resource_dict: The resource dictionary to process.

    Returns:
        Dict[Any, Any]: `resource_dict` itself when `keys_to_hash` or `hash_log_data` is falsy, otherwise
            the dictionary obtained by applying `replace_key_with_hashed_value` once per key path.

    Raises:
        ValueError: If `resource_dict` is not a dictionary.
    """
    if not isinstance(resource_dict, dict):
        raise ValueError("Expected a dictionary as the first argument")

    if self.keys_to_hash and self.hash_log_data:
        # Each call returns a new dictionary, so the results are chained key path after key path.
        for key_name in self.keys_to_hash:
            resource_dict = replace_key_with_hashed_value(resource_dict=resource_dict, key_name=key_name)

    return resource_dict

def get_condition_message(self, condition_type: str, condition_status: str = "") -> str:
Expand Down
2 changes: 1 addition & 1 deletion ocp_resources/sealed_secret.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ def to_dict(self) -> None:

@property
def keys_to_hash(self):
    """Return the ">"-separated key paths of this resource whose values should be hashed."""
    return ["spec>data", "spec>encryptedData"]
4 changes: 4 additions & 0 deletions ocp_resources/virtual_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,7 @@ def wait_for_status_none(self, status, timeout=TIMEOUT_4MINUTES):
):
if sample is None:
return

@property
def keys_to_hash(self):
    """Return the key paths to hash: the cloud-init userData of each entry in the volumes list."""
    cloud_init_user_data_path = "spec>template>spec>volumes[]>cloudInitNoCloud>userData"
    return [cloud_init_user_data_path]
88 changes: 88 additions & 0 deletions tests/test_unittests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import pytest
from benedict import benedict
from ocp_resources.resource import replace_key_with_hashed_value


@pytest.fixture()
def vm_spec():
    """Resource dict with one sensitive value under a plain nested path and one inside a list of volumes."""
    features = {"someNewFeature": {"someNewData": "sensitive information"}}
    volumes = [
        {"name": "volume1", "userData": "sensitive-data"},
    ]
    return {"spec": {"template": {"features": features, "spec": {"volumes": volumes}}}}


@pytest.mark.parametrize(
    "key_name, expected_key",
    [
        pytest.param(
            "spec>template>features>someNewFeature>someNewData",
            "spec>template>features>someNewFeature>someNewData",
            id="test_replace_key_with_hashed_value_replace_key",
        ),
        pytest.param(
            "spec>template>spec>volumes[]>userData",
            "spec>template>spec>volumes[0]>userData",
            id="test_replace_key_with_hashed_value_replace_key_in_list",
        ),
    ],
)
def test_replace_key_with_hashed_value(vm_spec, key_name, expected_key):
    """Check that the targeted value is replaced and that everything else matches the input."""
    hashed = replace_key_with_hashed_value(resource_dict=vm_spec, key_name=key_name)
    result = benedict(hashed, keypath_separator=">")
    assert result[expected_key] == "*******"

    # Drop the targeted key from both sides; what remains must be identical.
    vm_spec_benedict = benedict(vm_spec, keypath_separator=">")
    result_benedict = benedict(result, keypath_separator=">")
    for nested_dict in (vm_spec_benedict, result_benedict):
        del nested_dict[expected_key]
    assert result_benedict == vm_spec_benedict


@pytest.mark.parametrize(
    "resource, key_name, expected_result",
    [
        # Nothing to search in
        pytest.param({}, "a>b", {}, id="test_replace_key_with_hashed_value_empty_dict"),
        # Key path that is absent from the dictionary
        pytest.param(
            {"x": {"y": "z"}},
            "a>b",
            {"x": {"y": "z"}},
            id="test_replace_key_with_hashed_value_key_doesnt_exist",
        ),
        # Key path with a trailing separator
        pytest.param(
            {"x": {"y": "z"}},
            "x>y>",
            {"x": {"y": "z"}},
            id="test_replace_key_with_hashed_value_malformed_key",
        ),
        # Key path that is an empty string
        pytest.param(
            {"x": {"y": "z"}},
            "",
            {"x": {"y": "z"}},
            id="test_replace_key_with_hashed_value_empty_key",
        ),
    ],
)
def test_replace_key_with_hashed_value_edge_cases(resource, key_name, expected_result):
    """Key paths that match nothing must leave the returned dictionary equal to the input."""
    hashed = replace_key_with_hashed_value(resource_dict=resource, key_name=key_name)
    assert hashed == expected_result


def test_replace_key_with_hashed_value_multiple_occurances(vm_spec):
    """Every dictionary in the volumes list must get its userData replaced, not only the first one."""
    second_volume = {"name": "volume2", "userData": "more sensitive-data"}
    vm_spec["spec"]["template"]["spec"]["volumes"].append(second_volume)

    result = replace_key_with_hashed_value(resource_dict=vm_spec, key_name="spec>template>spec>volumes[]>userData")

    hashed_volumes = result["spec"]["template"]["spec"]["volumes"]
    assert all(volume["userData"] == "*******" for volume in hashed_volumes)

dbasunag marked this conversation as resolved.
Show resolved Hide resolved

@pytest.mark.parametrize(
    "resource, key_name, exception_type",
    [
        pytest.param(
            None,
            "a>b",
            TypeError,
            id="test_replace_key_with_hashed_value_empty_dict_valid_key",
        ),
        pytest.param(
            {},
            None,
            TypeError,
            id="test_replace_key_with_hashed_value_empty_dict_no_key",
        ),
        pytest.param(
            {},
            123,
            TypeError,
            id="test_replace_key_with_hashed_value_empty_dict_invalid_key",
        ),
        pytest.param(
            "not_a_dict",
            "a>b",
            ValueError,
            id="test_replace_key_with_hashed_value_invalid_dict_valid_key",
        ),
    ],
)
def test_replace_key_with_hashed_value_invalid_inputs(resource, key_name, exception_type):
    """Inputs of the wrong type must raise the expected exception instead of returning a result."""
    call_arguments = {"resource_dict": resource, "key_name": key_name}
    with pytest.raises(exception_type):
        replace_key_with_hashed_value(**call_arguments)
6 changes: 6 additions & 0 deletions tox.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ env_list = [
"class-generator",
"resource-tests",
"api-group-order",
"validate-unittests"
]

[env.resource-tests]
Expand Down Expand Up @@ -57,3 +58,8 @@ commands = [
description = "Run API group order tests"
deps = ["uv"]
commands = [["uv", "run", "pytest", "tests/test_api_group_order.py"]]

# The table name must match the "validate-unittests" entry in env_list,
# otherwise the listed environment has no definition and these tests are not run by it.
[env.validate-unittests]
description = "Run unittests"
deps = ["uv"]
commands = [["uv", "run", "pytest", "tests/test_unittests.py", "-svv"]]