Skip to content

Commit

Permalink
Bring back better findinmap resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
kddejong committed Aug 8, 2024
1 parent ee87a1c commit 1cbb5c5
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 60 deletions.
163 changes: 118 additions & 45 deletions src/cfnlint/jsonschema/_resolvers_cfn.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from cfnlint.helpers import AVAILABILITY_ZONES, REGEX_SUB_PARAMETERS
from cfnlint.jsonschema import ValidationError, Validator
from cfnlint.jsonschema._typing import ResolutionResult
from cfnlint.jsonschema._utils import equal


def unresolvable(validator: Validator, instance: Any) -> ResolutionResult:
Expand All @@ -39,11 +40,12 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult:
if len(instance) not in [3, 4]:
return

default_value_found = None
default_value_found = False
if len(instance) == 4:
options = instance[3]
if validator.is_type(options, "object"):
if "DefaultValue" in options:
default_value_found = True
for value, v, _ in validator.resolve_value(options["DefaultValue"]):
yield value, v.evolve(
context=v.context.evolve(
Expand All @@ -52,7 +54,6 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult:
)
),
), None
default_value_found = True

if not default_value_found and not validator.context.mappings.maps:
if validator.context.mappings.is_transform:
Expand All @@ -65,54 +66,126 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult:
path=deque([0]),
)

if (
validator.is_type(instance[0], "string")
and (
validator.is_type(instance[1], "string")
or validator.is_type(instance[1], "integer")
)
and validator.is_type(instance[2], "string")
):
map = validator.context.mappings.maps.get(instance[0])
if map is None:
if not default_value_found:
yield None, validator, ValidationError(
(
f"{instance[0]!r} is not one of "
f"{list(validator.context.mappings.maps.keys())!r}"
),
path=deque([0]),
)
return
mappings = list(validator.context.mappings.maps.keys())
results = []
found_valid_combination = False
for map_name, map_v, _ in validator.resolve_value(instance[0]):
if not validator.is_type(map_name, "string"):
continue

top_key = map.keys.get(instance[1])
if top_key is None:
if map.is_transform:
return
if all(not (equal(map_name, each)) for each in mappings):
if not default_value_found:
yield None, validator, ValidationError(
results.append(
(
f"{instance[1]!r} is not one of "
f"{list(map.keys.keys())!r} for "
f"mapping {instance[0]!r}"
),
path=deque([1]),
None,
map_v,
ValidationError(
f"{map_name!r} is not one of {mappings!r}",
path=deque([0]),
),
)
)
return
continue

value = top_key.keys.get(instance[2])
if value is None:
if top_key.is_transform:
return
if not default_value_found:
yield value, validator, ValidationError(
(
f"{instance[2]!r} is not one of "
f"{list(top_key.keys.keys())!r} for mapping "
f"{instance[0]!r} and key {instance[1]!r}"
),
path=deque([2]),
)
if validator.context.mappings.maps[map_name].is_transform:
continue

for top_level_key, top_v, _ in validator.resolve_value(instance[1]):
if validator.is_type(top_level_key, "integer"):
top_level_key = str(top_level_key)
if not validator.is_type(top_level_key, "string"):
continue

top_level_keys = list(validator.context.mappings.maps[map_name].keys.keys())
if all(not (equal(top_level_key, each)) for each in top_level_keys):
if not default_value_found:
results.append(
(
None,
top_v,
ValidationError(
(
f"{top_level_key!r} is not one of "
f"{top_level_keys!r} for mapping "
f"{map_name!r}"
),
path=deque([1]),
),
)
)
continue

if (
not top_level_key
or validator.context.mappings.maps[map_name]
.keys[top_level_key]
.is_transform
):
continue

for second_level_key, second_v, err in validator.resolve_value(instance[2]):
if validator.is_type(second_level_key, "integer"):
second_level_key = str(second_level_key)
if not validator.is_type(second_level_key, "string"):
continue
try:

second_level_keys = list(
validator.context.mappings.maps[map_name]
.keys[top_level_key]
.keys.keys()
)
if all(
not (equal(second_level_key, each))
for each in second_level_keys
):
if not default_value_found:
results.append(
(
None,
second_v,
ValidationError(
(
f"{second_level_key!r} is not "
f"one of {second_level_keys!r} "
f"for mapping {map_name!r} and "
f"key {top_level_key!r}"
),
path=deque([2]),
),
)
)
continue

found_valid_combination = True

for value in validator.context.mappings.maps[map_name].find_in_map(
top_level_key,
second_level_key,
):
yield (
value,
validator.evolve(
context=validator.context.evolve(
path=validator.context.path.evolve(
value_path=deque(
[
"Mappings",
map_name,
top_level_key,
second_level_key,
]
)
)
)
),
None,
)
except KeyError:
pass

if not found_valid_combination:
yield from iter(results)


def get_azs(validator: Validator, instance: Any) -> ResolutionResult:
Expand Down
23 changes: 15 additions & 8 deletions src/cfnlint/rules/functions/_BaseFn.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ def validator(self, validator: Validator) -> Validator:
),
)

def _clean_resolve_errors(
self, err: ValidationError, value: Any, instance: Any
) -> ValidationError:
err.message = err.message.replace(f"{value!r}", f"{instance!r}")
err.message = f"{err.message} when {self.fn.name!r} is resolved"
if self.child_rules[self.resolved_rule]:
err.rule = self.child_rules[self.resolved_rule]
for i, err_ctx in enumerate(err.context):
err.context[i] = self._clean_resolve_errors(err_ctx, value, instance)
return err
return err

def resolve(
self,
validator: Validator,
Expand Down Expand Up @@ -92,14 +104,9 @@ def resolve(
return

for err in errs:
err.message = err.message.replace(f"{value!r}", f"{instance!r}")
err.message = f"{err.message} when {self.fn.name!r} is resolved"
all_errs.append(err)

for err in all_errs:
if self.child_rules[self.resolved_rule]:
err.rule = self.child_rules[self.resolved_rule]
yield err
all_errs.append(self._clean_resolve_errors(err, value, instance))

yield from iter(all_errs)

def _resolve_ref(self, validator, schema) -> Any:

Expand Down
20 changes: 13 additions & 7 deletions test/unit/module/jsonschema/test_resolvers_cfn.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@ def _resolve(name, instance, expected_results, **kwargs):

resolutions = list(validator.resolve_value(instance))

assert len(resolutions) == len(
expected_results
), f"{name!r} got {len(resolutions)!r}"

for i, (instance, v, errors) in enumerate(resolutions):
assert instance == expected_results[i][0]
assert v.context.path.value_path == expected_results[i][1]
assert errors == expected_results[i][2]
assert instance == expected_results[i][0], f"{name!r} got {instance!r}"
assert (
v.context.path.value_path == expected_results[i][1]
), f"{name!r} got {v.context.path.value_path!r}"
assert errors == expected_results[i][2], f"{name!r} got {errors!r}"


@pytest.mark.parametrize(
Expand Down Expand Up @@ -153,7 +159,7 @@ def test_resolvers_ref(name, instance, response):
),
(
"Invalid FindInMap with an invalid type for third element",
{"Fn::FindInMap": ["foo", "bar", ["value"]]},
{"Fn::FindInMap": ["foo", "first", ["value"]]},
[],
),
(
Expand Down Expand Up @@ -218,16 +224,16 @@ def test_invalid_functions(name, instance, response):
],
),
(
"Valid FindInMap with a default value",
"Valid FindInMap with bad keys and a default value",
{"Fn::FindInMap": ["foo", "bar", "value", {"DefaultValue": "default"}]},
[("default", deque([4, "DefaultValue"]), None)],
),
(
"Valid FindInMap with a default value",
"Valid FindInMap with valid keys and a default value",
{"Fn::FindInMap": ["foo", "first", "second", {"DefaultValue": "default"}]},
[
("default", deque([4, "DefaultValue"]), None),
("bar", deque([2]), None),
("bar", deque(["Mappings", "foo", "first", "second"]), None),
],
),
(
Expand Down

0 comments on commit 1cbb5c5

Please sign in to comment.