From 6f7a3bb2f975fea573d803eac6a134eb6e567255 Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Thu, 8 Aug 2024 14:53:29 -0700 Subject: [PATCH] Bring back better findinmap resolution --- src/cfnlint/jsonschema/_resolvers_cfn.py | 163 +++++++++++++----- src/cfnlint/rules/functions/_BaseFn.py | 23 ++- .../module/jsonschema/test_resolvers_cfn.py | 20 ++- 3 files changed, 146 insertions(+), 60 deletions(-) diff --git a/src/cfnlint/jsonschema/_resolvers_cfn.py b/src/cfnlint/jsonschema/_resolvers_cfn.py index 9f890b5a20..7224b800a8 100644 --- a/src/cfnlint/jsonschema/_resolvers_cfn.py +++ b/src/cfnlint/jsonschema/_resolvers_cfn.py @@ -14,6 +14,7 @@ from cfnlint.helpers import AVAILABILITY_ZONES, REGEX_SUB_PARAMETERS from cfnlint.jsonschema import ValidationError, Validator from cfnlint.jsonschema._typing import ResolutionResult +from cfnlint.jsonschema._utils import equal def unresolvable(validator: Validator, instance: Any) -> ResolutionResult: @@ -39,11 +40,12 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult: if len(instance) not in [3, 4]: return - default_value_found = None + default_value_found = False if len(instance) == 4: options = instance[3] if validator.is_type(options, "object"): if "DefaultValue" in options: + default_value_found = True for value, v, _ in validator.resolve_value(options["DefaultValue"]): yield value, v.evolve( context=v.context.evolve( @@ -52,7 +54,6 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult: ) ), ), None - default_value_found = True if not default_value_found and not validator.context.mappings.maps: if validator.context.mappings.is_transform: @@ -65,54 +66,126 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult: path=deque([0]), ) - if ( - validator.is_type(instance[0], "string") - and ( - validator.is_type(instance[1], "string") - or validator.is_type(instance[1], "integer") - ) - and validator.is_type(instance[2], "string") - ): - map = validator.context.mappings.maps.get(instance[0]) - if map is None: - if not default_value_found: - yield None, validator, ValidationError( - ( - f"{instance[0]!r} is not one of " - f"{list(validator.context.mappings.maps.keys())!r}" - ), - path=deque([0]), - ) - return + mappings = list(validator.context.mappings.maps.keys()) + results = [] + found_valid_combination = False + for map_name, map_v, _ in validator.resolve_value(instance[0]): + if not validator.is_type(map_name, "string"): + continue - top_key = map.keys.get(instance[1]) - if top_key is None: - if map.is_transform: - return + if all(not (equal(map_name, each)) for each in mappings): if not default_value_found: - yield None, validator, ValidationError( + results.append( ( - f"{instance[1]!r} is not one of " - f"{list(map.keys.keys())!r} for " - f"mapping {instance[0]!r}" - ), - path=deque([1]), + None, + map_v, + ValidationError( + f"{map_name!r} is not one of {mappings!r}", + path=deque([0]), + ), + ) ) - return + continue - value = top_key.keys.get(instance[2]) - if value is None: - if top_key.is_transform: - return - if not default_value_found: - yield value, validator, ValidationError( - ( - f"{instance[2]!r} is not one of " - f"{list(top_key.keys.keys())!r} for mapping " - f"{instance[0]!r} and key {instance[1]!r}" - ), - path=deque([2]), - ) + if validator.context.mappings.maps[map_name].is_transform: + continue + + for top_level_key, top_v, _ in validator.resolve_value(instance[1]): + if validator.is_type(top_level_key, "integer"): + top_level_key = str(top_level_key) + if not validator.is_type(top_level_key, "string"): + continue + + top_level_keys = list(validator.context.mappings.maps[map_name].keys.keys()) + if all(not (equal(top_level_key, each)) for each in top_level_keys): + if not default_value_found: + results.append( + ( + None, + top_v, + ValidationError( + ( + f"{top_level_key!r} is not one of " + f"{top_level_keys!r} for mapping " + f"{map_name!r}" + ), + path=deque([1]), + ), + ) + ) + continue + + if ( + not top_level_key + or validator.context.mappings.maps[map_name] + .keys[top_level_key] + .is_transform + ): + continue + + for second_level_key, second_v, err in validator.resolve_value(instance[2]): + if validator.is_type(second_level_key, "integer"): + second_level_key = str(second_level_key) + if not validator.is_type(second_level_key, "string"): + continue + try: + + second_level_keys = list( + validator.context.mappings.maps[map_name] + .keys[top_level_key] + .keys.keys() + ) + if all( + not (equal(second_level_key, each)) + for each in second_level_keys + ): + if not default_value_found: + results.append( + ( + None, + second_v, + ValidationError( + ( + f"{second_level_key!r} is not " + f"one of {second_level_keys!r} " + f"for mapping {map_name!r} and " + f"key {top_level_key!r}" + ), + path=deque([2]), + ), + ) + ) + continue + + found_valid_combination = True + + for value in validator.context.mappings.maps[map_name].find_in_map( + top_level_key, + second_level_key, + ): + yield ( + value, + validator.evolve( + context=validator.context.evolve( + path=validator.context.path.evolve( + value_path=deque( + [ + "Mappings", + map_name, + top_level_key, + second_level_key, + ] + ) + ) + ) + ), + None, + ) + except KeyError: + pass + + if not found_valid_combination: + yield from iter(results) def get_azs(validator: Validator, instance: Any) -> ResolutionResult: diff --git a/src/cfnlint/rules/functions/_BaseFn.py b/src/cfnlint/rules/functions/_BaseFn.py index b2edd69fc3..b01ff2d18e 100644 --- a/src/cfnlint/rules/functions/_BaseFn.py +++ b/src/cfnlint/rules/functions/_BaseFn.py @@ -56,6 +56,18 @@ def validator(self, validator: Validator) -> Validator: ), ) + def _clean_resolve_errors( + self, err: ValidationError, value: Any, instance: Any + ) -> ValidationError: + err.message = err.message.replace(f"{value!r}", f"{instance!r}") + err.message = f"{err.message} when {self.fn.name!r} is resolved" + if self.child_rules[self.resolved_rule]: + err.rule = self.child_rules[self.resolved_rule] + for i, err_ctx in enumerate(err.context): + err.context[i] = self._clean_resolve_errors(err_ctx, value, instance) + return err + return err + def resolve( self, validator: Validator, @@ -92,14 +104,9 @@ def resolve( return for err in errs: - err.message = err.message.replace(f"{value!r}", f"{instance!r}") - err.message = f"{err.message} when {self.fn.name!r} is resolved" - all_errs.append(err) - - for err in all_errs: - if self.child_rules[self.resolved_rule]: - err.rule = self.child_rules[self.resolved_rule] - yield err + all_errs.append(self._clean_resolve_errors(err, value, instance)) + + yield from iter(all_errs) def _resolve_ref(self, validator, schema) -> Any: diff --git a/test/unit/module/jsonschema/test_resolvers_cfn.py b/test/unit/module/jsonschema/test_resolvers_cfn.py index dd647c8e37..278d551dd9 100644 --- a/test/unit/module/jsonschema/test_resolvers_cfn.py +++ b/test/unit/module/jsonschema/test_resolvers_cfn.py @@ -18,10 +18,16 @@ def _resolve(name, instance, expected_results, **kwargs): resolutions = list(validator.resolve_value(instance)) + assert len(resolutions) == len( + expected_results + ), f"{name!r} got {len(resolutions)!r}" + for i, (instance, v, errors) in enumerate(resolutions): - assert instance == expected_results[i][0] - assert v.context.path.value_path == expected_results[i][1] - assert errors == expected_results[i][2] + assert instance == expected_results[i][0], f"{name!r} got {instance!r}" + assert ( + v.context.path.value_path == expected_results[i][1] + ), f"{name!r} got {v.context.path.value_path!r}" + assert errors == expected_results[i][2], f"{name!r} got {errors!r}" @pytest.mark.parametrize( @@ -153,7 +159,7 @@ def test_resolvers_ref(name, instance, response): ), ( "Invalid FindInMap with an invalid type for third element", - {"Fn::FindInMap": ["foo", "bar", ["value"]]}, + {"Fn::FindInMap": ["foo", "first", ["value"]]}, [], ), ( @@ -218,16 +224,16 @@ def test_invalid_functions(name, instance, response): ], ), ( - "Valid FindInMap with a default value", + "Valid FindInMap with bad keys and a default value", {"Fn::FindInMap": ["foo", "bar", "value", {"DefaultValue": "default"}]}, [("default", deque([4, "DefaultValue"]), None)], ), ( - "Valid FindInMap with a default value", + "Valid FindInMap with valid keys and a default value", {"Fn::FindInMap": ["foo", "first", "second", {"DefaultValue": "default"}]}, [ ("default", deque([4, "DefaultValue"]), None), - ("bar", deque([2]), None), + ("bar", deque(["Mappings", "foo", "first", "second"]), None), ], ), (