From 2d31429e4bd110363923aaf9fd669a43054f9518 Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Mon, 22 Jul 2024 07:27:37 -0700 Subject: [PATCH] Switch to helper functions --- src/cfnlint/conditions/conditions.py | 70 +++-- src/cfnlint/rules/__init__.py | 1 - src/cfnlint/rules/helpers/__init__.py | 12 + .../rules/helpers/get_resource_by_name.py | 51 ++++ .../rules/helpers/get_value_from_path.py | 121 +++++++++ .../rules/jsonschema/CfnLintRelationship.py | 166 ------------ src/cfnlint/rules/jsonschema/__init__.py | 2 - .../rules/resources/PrimaryIdentifiers.py | 2 +- .../resources/ecs/ServiceDynamicPorts.py | 149 ++++++++++ .../rules/resources/ectwo/InstanceImageId.py | 109 ++++++-- src/cfnlint/template/template.py | 2 +- .../unit/module/conditions/test_conditions.py | 12 +- test/unit/rules/helpers/__init__.py | 0 .../helpers/test_get_resource_by_name.py | 169 ++++++++++++ .../rules/helpers/test_get_value_from_path.py | 194 +++++++++++++ .../jsonschema/test_cfn_lint_relationship.py | 255 ------------------ .../test_cfn_lint_relationship_bad_path.py | 37 --- .../test_cfn_lint_relationship_list.py | 78 ------ .../resources/ec2/test_instance_image_id.py | 212 ++++++++++----- 19 files changed, 990 insertions(+), 652 deletions(-) create mode 100644 src/cfnlint/rules/helpers/__init__.py create mode 100644 src/cfnlint/rules/helpers/get_resource_by_name.py create mode 100644 src/cfnlint/rules/helpers/get_value_from_path.py delete mode 100644 src/cfnlint/rules/jsonschema/CfnLintRelationship.py create mode 100644 src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py create mode 100644 test/unit/rules/helpers/__init__.py create mode 100644 test/unit/rules/helpers/test_get_resource_by_name.py create mode 100644 test/unit/rules/helpers/test_get_value_from_path.py delete mode 100644 test/unit/rules/jsonschema/test_cfn_lint_relationship.py delete mode 100644 test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py delete mode 100644 test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py diff --git a/src/cfnlint/conditions/conditions.py b/src/cfnlint/conditions/conditions.py index 7dfe25f138..cbba5bfed2 100644 --- a/src/cfnlint/conditions/conditions.py +++ b/src/cfnlint/conditions/conditions.py @@ -240,7 +240,23 @@ def build_scenarios( # formatting or just the wrong condition name return - def implies(self, scenarios: dict[str, bool], implies: str) -> bool: + def _build_cfn_implies(self, scenarios) -> And: + conditions = [] + for condition_name, opt in scenarios.items(): + if opt: + conditions.append( + self._conditions[condition_name].build_true_cnf(self._solver_params) + ) + else: + conditions.append( + self._conditions[condition_name].build_false_cnf( + self._solver_params + ) + ) + + return And(*conditions) + + def can_imply(self, scenarios: dict[str, bool], implies: str) -> bool: """Based on a bunch of scenario conditions and their Truth/False value determine if implies condition is True any time the scenarios are satisfied solver, solver_params = self._build_solver(list(scenarios.keys()) + [implies]) @@ -260,23 +276,45 @@ def implies(self, scenarios: dict[str, bool], implies: str) -> bool: if not scenarios.get(implies, True): return False - conditions = [] - for condition_name, opt in scenarios.items(): - if opt: - conditions.append( - self._conditions[condition_name].build_true_cnf( - self._solver_params - ) - ) - else: - conditions.append( - self._conditions[condition_name].build_false_cnf( - self._solver_params - ) - ) + and_condition = self._build_cfn_implies(scenarios) + cnf.add_prop(and_condition) + implies_condition = self._conditions[implies].build_true_cnf( + self._solver_params + ) + cnf.add_prop(Implies(and_condition, implies_condition)) - and_condition = And(*conditions) + results = satisfiable(cnf) + if results: + return True + + return False + except KeyError: + # KeyError is because the listed condition doesn't exist because of bad + # formatting or just the wrong condition name + return True + + def has_to_imply(self, scenarios: dict[str, bool], implies: str) -> bool: + """Based on a bunch of scenario conditions and their Truth/False value + determine if implies condition is True any time the scenarios are satisfied + solver, solver_params = self._build_solver(list(scenarios.keys()) + [implies]) + + Args: + scenarios (dict[str, bool]): A list of condition names + and if they are True or False implies: the condition name that + we are implying will also be True + + Returns: + bool: if the implied condition will be True if the scenario is True + """ + try: + cnf = self._cnf.copy() + # if the implies condition has to be false in the scenarios we + # know it can never be true + if not scenarios.get(implies, True): + return False + and_condition = self._build_cfn_implies(scenarios) + cnf.add_prop(and_condition) implies_condition = self._conditions[implies].build_true_cnf( self._solver_params ) diff --git a/src/cfnlint/rules/__init__.py b/src/cfnlint/rules/__init__.py index 5eb87f653c..976c6058c8 100644 --- a/src/cfnlint/rules/__init__.py +++ b/src/cfnlint/rules/__init__.py @@ -8,5 +8,4 @@ from cfnlint.rules.jsonschema import CfnLintJsonSchema # type: ignore from cfnlint.rules.jsonschema import CfnLintJsonSchemaRegional # type: ignore from cfnlint.rules.jsonschema import CfnLintKeyword # type: ignore -from cfnlint.rules.jsonschema import CfnLintRelationship # type: ignore from cfnlint.rules.jsonschema import SchemaDetails diff --git a/src/cfnlint/rules/helpers/__init__.py b/src/cfnlint/rules/helpers/__init__.py new file mode 100644 index 0000000000..44d8de325f --- /dev/null +++ b/src/cfnlint/rules/helpers/__init__.py @@ -0,0 +1,12 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from cfnlint.rules.helpers.get_resource_by_name import get_resource_by_name +from cfnlint.rules.helpers.get_value_from_path import get_value_from_path + +__all__ = [ + "get_value_from_path", + "get_resource_by_name", +] diff --git a/src/cfnlint/rules/helpers/get_resource_by_name.py b/src/cfnlint/rules/helpers/get_resource_by_name.py new file mode 100644 index 0000000000..3f13415314 --- /dev/null +++ b/src/cfnlint/rules/helpers/get_resource_by_name.py @@ -0,0 +1,51 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any, Sequence + +from cfnlint.context import Path +from cfnlint.jsonschema import Validator + + +def get_resource_by_name( + validator: Validator, name: str, types: Sequence[str] | None = None +) -> tuple[Any, Validator]: + + resource = validator.cfn.template.get("Resources", {}).get(name, {}) + if not resource or not isinstance(resource, dict): + return None, validator + + if types and resource.get("Type") not in types: + return None, validator + + condition = resource.get("Condition", None) + if condition: + if not validator.cfn.conditions.can_imply( + validator.context.conditions.status, condition + ): + return None, validator + validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + { + condition: True, + } + ), + ) + ) + + validator = validator.evolve( + context=validator.context.evolve( + path=Path( + path=deque(["Resources", name]), + cfn_path=deque(["Resources", resource.get("Type")]), + ) + ) + ) + + return resource, validator diff --git a/src/cfnlint/rules/helpers/get_value_from_path.py b/src/cfnlint/rules/helpers/get_value_from_path.py new file mode 100644 index 0000000000..36911a7c12 --- /dev/null +++ b/src/cfnlint/rules/helpers/get_value_from_path.py @@ -0,0 +1,121 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any, Iterator + +from cfnlint.helpers import is_function +from cfnlint.jsonschema import Validator + + +def _get_relationship_fn_if( + validator: Validator, key: Any, value: Any, path: deque[str | int] +) -> Iterator[tuple[Any, Validator]]: + if not isinstance(value, list) or len(value) != 3: + return + condition = value[0] + # True path + status = {k: set([v]) for k, v in validator.context.conditions.status.items()} + if condition not in status: + status[condition] = set([True, False]) + for scenario in validator.cfn.conditions.build_scenarios(status): + if_validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + status=scenario, + ) + ) + ) + + i = 1 if scenario[condition] else 2 + for r, v in get_value_from_path( + validator.evolve( + context=if_validator.context.evolve( + path=if_validator.context.path.descend(path=key).descend(path=i) + ) + ), + value[i], + path.copy(), + ): + yield r, v + + +def _get_value_from_path_list( + validator: Validator, instance: Any, path: deque[str | int] +) -> Iterator[tuple[Any, Validator]]: + for i, v in enumerate(instance): + for r, v in get_value_from_path( + validator.evolve( + context=validator.context.evolve( + path=validator.context.path.descend(path=i) + ), + ), + v, + path.copy(), + ): + yield r, v + + +def get_value_from_path( + validator: Validator, instance: Any, path: deque[str | int] +) -> Iterator[tuple[Any, Validator]]: + """ + Retrieve a value from a nested dictionary or list using a path. + + Args: + validator (Validator): The validator instance + data (Any): The dictionary or list to search. + path (deque[str | int]): The path to the value. + + Returns: + The value at the specified path, or None if the key doesn't exist. + + Examples: + >>> data = {'a': {'b': {'c': 3}}} + >>> get_value_from_path(data, ['a', 'b', 'c']) + 3 + """ + + fn_k, fn_v = is_function(instance) + if fn_k is not None: + if fn_k == "Fn::If": + yield from _get_relationship_fn_if(validator, fn_k, fn_v, path) + elif fn_k == "Ref" and fn_v == "AWS::NoValue": + yield None, validator.evolve( + context=validator.context.evolve( + path=validator.context.path.descend(path=fn_k) + ) + ) + elif not path: + yield instance, validator + return + + if not path: + yield instance, validator + return + + key = path.popleft() + if isinstance(instance, list) and key == "*": + yield from _get_value_from_path_list(validator, instance, path) + return + + if not isinstance(instance, dict): + yield None, validator + return + + for r, v in get_value_from_path( + validator.evolve( + context=validator.context.evolve( + path=validator.context.path.descend(path=key) + ) + ), + instance.get(key), + path.copy(), + ): + yield r, v + + return diff --git a/src/cfnlint/rules/jsonschema/CfnLintRelationship.py b/src/cfnlint/rules/jsonschema/CfnLintRelationship.py deleted file mode 100644 index ee28ad0887..0000000000 --- a/src/cfnlint/rules/jsonschema/CfnLintRelationship.py +++ /dev/null @@ -1,166 +0,0 @@ -""" -Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 -""" - -from __future__ import annotations - -from collections import deque -from typing import Any, Iterator, Sequence - -from cfnlint.helpers import is_function -from cfnlint.jsonschema import ValidationResult, Validator -from cfnlint.rules import CloudFormationLintRule - - -class CfnLintRelationship(CloudFormationLintRule): - def __init__( - self, keywords: Sequence[str] | None = None, relationship: str | None = None - ) -> None: - super().__init__() - self.keywords = keywords or [] - self.relationship = relationship or "" - self.parent_rules = ["E1101"] - - def _get_relationship_fn_if( - self, validator: Validator, key: Any, value: Any, path: deque[str | int] - ) -> Iterator[tuple[Any, Validator]]: - if not isinstance(value, list) or len(value) != 3: - return - condition = value[0] - # True path - status = {k: set([v]) for k, v in validator.context.conditions.status.items()} - if condition not in status: - status[condition] = set([True, False]) - for scenario in validator.cfn.conditions.build_scenarios(status): - if_validator = validator.evolve( - context=validator.context.evolve( - conditions=validator.context.conditions.evolve( - status=scenario, - ) - ) - ) - - i = 1 if scenario[condition] else 2 - for r, v in self._get_relationship( - validator.evolve( - context=if_validator.context.evolve( - path=if_validator.context.path.descend(path=key).descend(path=i) - ) - ), - value[i], - path.copy(), - ): - yield r, v - - def _get_relationship_list( - self, validator: Validator, key: Any, instance: Any, path: deque[str | int] - ) -> Iterator[tuple[Any, Validator]]: - if key == "*": - for i, v in enumerate(instance): - for r, v in self._get_relationship( - validator.evolve( - context=validator.context.evolve( - path=validator.context.path.descend(path=i) - ), - ), - v, - path.copy(), - ): - yield r, v - - def _get_relationship( - self, validator: Validator, instance: Any, path: deque[str | int] - ) -> Iterator[tuple[Any, Validator]]: - fn_k, fn_v = is_function(instance) - if fn_k == "Fn::If": - yield from self._get_relationship_fn_if(validator, fn_k, fn_v, path) - return - - if fn_k == "Ref" and fn_v == "AWS::NoValue": - yield None, validator.evolve( - context=validator.context.evolve( - path=validator.context.path.descend(path=fn_k) - ) - ) - return - - if not path: - yield instance, validator - return - - key = path.popleft() - if isinstance(instance, list): - yield from self._get_relationship_list(validator, key, instance, path) - return - - if not isinstance(instance, dict): - return - - for r, v in self._get_relationship( - validator.evolve( - context=validator.context.evolve( - path=validator.context.path.descend(path=key) - ) - ), - instance.get(key), - path.copy(), - ): - yield r, v - - def _validate_get_relationship_parameters(self, validator: Validator) -> bool: - if len(validator.context.path.path) < 2: - return False - - if "Resources" != validator.context.path.path[0]: - return False - - if not validator.cfn.graph: - return False - - if len(self.relationship.split("/")) < 2: - return False - - return True - - def get_relationship(self, validator: Validator) -> Iterator[tuple[Any, Validator]]: - if not self._validate_get_relationship_parameters(validator): - return - - resource_name = validator.context.path.path[1] - path = self.relationship.split("/") - - # get related resources by ref/getatt to the source - # and relationship types - for edge in validator.cfn.graph.graph.out_edges(resource_name): # type: ignore - other_resource = validator.cfn.template.get(path[0], {}).get(edge[1], {}) - - if other_resource.get("Type") != path[1]: - continue - - condition = other_resource.get("Condition", None) - if condition: - if not validator.cfn.conditions.implies( - validator.context.conditions.status, condition - ): - continue - validator = validator.evolve( - context=validator.context.evolve( - conditions=validator.context.conditions.evolve( - { - condition: True, - } - ) - ) - ) - # validate reosurce type - - for r, v in self._get_relationship( - validator, other_resource, deque(path[2:]) - ): - yield r, v.evolve() - - def validate( - self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any] - ) -> ValidationResult: - raise NotImplementedError("validate not implemented") diff --git a/src/cfnlint/rules/jsonschema/__init__.py b/src/cfnlint/rules/jsonschema/__init__.py index c63aae1159..60f460db6f 100644 --- a/src/cfnlint/rules/jsonschema/__init__.py +++ b/src/cfnlint/rules/jsonschema/__init__.py @@ -7,7 +7,6 @@ from cfnlint.rules.jsonschema.CfnLintJsonSchema import CfnLintJsonSchema, SchemaDetails from cfnlint.rules.jsonschema.CfnLintJsonSchemaRegional import CfnLintJsonSchemaRegional from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword -from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship from cfnlint.rules.jsonschema.MaxProperties import MaxProperties from cfnlint.rules.jsonschema.PropertyNames import PropertyNames @@ -16,7 +15,6 @@ "CfnLintJsonSchema", "CfnLintJsonSchemaRegional", "CfnLintKeyword", - "CfnLintRelationship", "MaxProperties", "PropertyNames", "SchemaDetails", diff --git a/src/cfnlint/rules/resources/PrimaryIdentifiers.py b/src/cfnlint/rules/resources/PrimaryIdentifiers.py index fa0daef3fa..493dc09822 100644 --- a/src/cfnlint/rules/resources/PrimaryIdentifiers.py +++ b/src/cfnlint/rules/resources/PrimaryIdentifiers.py @@ -63,7 +63,7 @@ def _validate_resource_type_uniqueness(self, cfn, resource_type, ids): # and add it to the scenario as needed if condition: if scenario: - if not cfn.conditions.implies(scenario, condition): + if not cfn.conditions.has_to_imply(scenario, condition): continue conditions[condition] = set([True]) diff --git a/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py b/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py new file mode 100644 index 0000000000..39961e0a5a --- /dev/null +++ b/src/cfnlint/rules/resources/ecs/ServiceDynamicPorts.py @@ -0,0 +1,149 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any, Iterator + +from cfnlint.helpers import ensure_list, is_function +from cfnlint.jsonschema import ValidationError, ValidationResult +from cfnlint.jsonschema.protocols import Validator +from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path +from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword + + +class ServiceDynamicPorts(CfnLintKeyword): + id = "E3049" + shortdesc = ( + "Validate ECS tasks with dynamic host port have traffic-port ELB target groups" + ) + description = ( + "When using an ECS task definition of host port 0 and " + "associating that container to an ELB the target group " + "has to have a 'HealthCheckPort' of 'traffic-port'" + ) + tags = ["resources"] + + def __init__(self) -> None: + super().__init__( + keywords=["Resources/AWS::ECS::Service/Properties"], + ) + + def _get_service_lb_containers( + self, validator: Validator, instance: Any + ) -> Iterator[tuple[str, str, Validator]]: + + for load_balancer, load_balancer_validator in get_value_from_path( + validator, + instance, + path=deque(["LoadBalancers", "*"]), + ): + for name, name_validator in get_value_from_path( + load_balancer_validator, + load_balancer, + path=deque(["ContainerName"]), + ): + if name is None: + continue + for port, port_validator in get_value_from_path( + name_validator, + load_balancer, + path=deque(["ContainerPort"]), + ): + if port is None: + continue + yield name, port, port_validator + + def _get_task_definition_resource_name( + self, validator: Validator, instance: Any + ) -> Iterator[tuple[str, Validator]]: + for task_definition_id, task_definition_validator in get_value_from_path( + validator, instance, deque(["TaskDefinition"]) + ): + fn_k, fn_v = is_function(task_definition_id) + if fn_k == "Ref": + if validator.is_type(fn_v, "string"): + if fn_v in validator.context.resources: + yield fn_v, task_definition_validator + elif fn_k != "Fn::GetAtt": + continue + yield ensure_list(fn_v)[0].split(".")[0], task_definition_validator + + def _get_task_containers( + self, validator: Validator, resource_name: str, container_name: str + ) -> Iterator[tuple[Any, Validator]]: + task_def, task_def_validator = get_resource_by_name( + validator, resource_name, ["AWS::ECS::TaskDefinition"] + ) + + if not task_def: + return + + for container, container_validator in get_value_from_path( + task_def_validator, + task_def, + path=deque(["Properties", "ContainerDefinitions", "*"]), + ): + for name, name_validator in get_value_from_path( + container_validator, + container, + path=deque(["Name"]), + ): + if not isinstance(name, str): + continue + if name == container_name: + yield container, name_validator + + def _filter_port_mappings( + self, validator: Validator, instance: Any, port: Any + ) -> Iterator[tuple[Any, Validator]]: + for port_mapping, port_mapping_validator in get_value_from_path( + validator, + instance, + path=deque(["PortMappings", "*"]), + ): + for container_port, container_port_validator in get_value_from_path( + port_mapping_validator, + port_mapping, + path=deque(["ContainerPort"]), + ): + if not isinstance(container_port, (str, int)): + continue + if str(port) == str(container_port): + for host_port, host_part_validator in get_value_from_path( + container_port_validator, + port_mapping, + path=deque(["HostPort"]), + ): + yield host_port, host_part_validator + + def validate( + self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any] + ) -> ValidationResult: + + for ( + task_definition_resource_name, + task_definition_resource_name_validator, + ) in self._get_task_definition_resource_name(validator, instance): + + for name, port, lb_validator in self._get_service_lb_containers( + task_definition_resource_name_validator, instance + ): + for container, container_validator in self._get_task_containers( + lb_validator, task_definition_resource_name, name + ): + for host_port, host_port_validator in self._filter_port_mappings( + container_validator, container, port + ): + if host_port == 0: + yield ValidationError( + "When using an ECS task definition of host port 0 and " + "associating that container to an ELB the target group " + "has to have a 'HealthCheckPort' of 'traffic-port'", + ) + + return + yield diff --git a/src/cfnlint/rules/resources/ectwo/InstanceImageId.py b/src/cfnlint/rules/resources/ectwo/InstanceImageId.py index 24a976ab63..7219d3c10b 100644 --- a/src/cfnlint/rules/resources/ectwo/InstanceImageId.py +++ b/src/cfnlint/rules/resources/ectwo/InstanceImageId.py @@ -6,13 +6,15 @@ from __future__ import annotations from collections import deque -from typing import Any +from typing import Any, Iterator +from cfnlint.helpers import ensure_list, is_function from cfnlint.jsonschema import ValidationError, ValidationResult, Validator -from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship +from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path +from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword -class InstanceImageId(CfnLintRelationship): +class InstanceImageId(CfnLintKeyword): id = "E3673" shortdesc = "Validate if an ImageId is required" description = ( @@ -27,34 +29,99 @@ def __init__(self) -> None: keywords=[ "Resources/AWS::EC2::Instance/Properties", ], - relationship="Resources/AWS::EC2::LaunchTemplate/Properties/LaunchTemplateData/ImageId", ) + def _get_related_launch_template( + self, validator: Validator, instance: Any + ) -> Iterator[tuple[Any, Validator]]: + + for launch_template, launch_template_validator in get_value_from_path( + validator, instance, deque(["LaunchTemplate"]) + ): + if not launch_template: + continue + for id, id_validator in get_value_from_path( + launch_template_validator, launch_template, deque(["LaunchTemplateId"]) + ): + if id is None: + for name, name_validator in get_value_from_path( + id_validator, + launch_template, + deque(["LaunchTemplateName"]), + ): + yield name, name_validator + yield id, id_validator + def validate( self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any] ) -> ValidationResult: - for source_props, source_validator in self._get_relationship( - validator, instance, path=deque(["ImageId"]) + for ( + instance_image_id, + instance_image_id_validator, + ) in get_value_from_path( + validator, + instance, + path=deque(["ImageId"]), ): - relationships = list(self.get_relationship(source_validator)) - if not relationships and not source_props: + if instance_image_id: + continue + + launch_templates = list( + self._get_related_launch_template(instance_image_id_validator, instance) + ) + + if not launch_templates: yield ValidationError( "'ImageId' is a required property", - path_override=( - source_validator.context.path.path - if len(source_validator.context.path.path) > 4 - else deque([]) - ), + path_override=instance_image_id_validator.context.path.path, ) - for relationship_props, _ in relationships: - if relationship_props is None: - if not source_props: + continue + + for ( + instance_launch_template, + instance_launch_template_validator, + ) in launch_templates: + fn_k, fn_v = is_function(instance_launch_template) + + # if its not a function we can't tell from a string + # if the image ID is there or not + if fn_k is None: + continue + + launch_template, launch_template_validator = None, None + if fn_k == "Ref": + if fn_v in instance_launch_template_validator.context.parameters: + continue + elif fn_v in instance_launch_template_validator.context.resources: + launch_template, launch_template_validator = ( + get_resource_by_name( + instance_launch_template_validator, + fn_v, + ["AWS::EC2::LaunchTemplate"], + ) + ) + else: + continue + elif fn_k == "Fn::GetAtt": + launch_template, launch_template_validator = get_resource_by_name( + instance_launch_template_validator, + ensure_list(fn_v)[0].split(".")[0], + ["AWS::EC2::LaunchTemplate"], + ) + else: + continue + + for ( + launch_template_image_id, + _, + ) in get_value_from_path( + launch_template_validator, + launch_template or {}, + path=deque(["Properties", "LaunchTemplateData", "ImageId"]), + ): + if launch_template_image_id is None and instance_image_id is None: yield ValidationError( "'ImageId' is a required property", - path_override=( - source_validator.context.path.path - if len(source_validator.context.path.path) > 4 - else deque([]) - ), + path_override=instance_image_id_validator.context.path.path, ) diff --git a/src/cfnlint/template/template.py b/src/cfnlint/template/template.py index 3fa1a73433..7c39673951 100644 --- a/src/cfnlint/template/template.py +++ b/src/cfnlint/template/template.py @@ -824,7 +824,7 @@ def is_resource_available(self, path: Path, resource: str) -> list[dict[str, boo return results scenario[condition_name] = list(condition_bool)[0] - if not self.conditions.implies(scenario, resource_condition): + if not self.conditions.has_to_imply(scenario, resource_condition): return [{**{resource_condition: False}, **scenario}] # if resource condition isn't available then the resource is available diff --git a/test/unit/module/conditions/test_conditions.py b/test/unit/module/conditions/test_conditions.py index 70918a7fee..05cfb9db6e 100644 --- a/test/unit/module/conditions/test_conditions.py +++ b/test/unit/module/conditions/test_conditions.py @@ -31,7 +31,7 @@ def test_bad_condition_definition(self): ) # would be 2 but IsProd fails # test coverage for KeyErrors in the following functions self.assertTrue( - cfn.conditions.implies( + cfn.conditions.has_to_imply( { "Test": True, }, @@ -83,10 +83,12 @@ def test_check_implies(self): condition_names.append(f"{p}Condition") cfn = Template("", template) - self.assertFalse(cfn.conditions.implies({"aCondition": False}, "aCondition")) - self.assertTrue(cfn.conditions.implies({"aCondition": True}, "aCondition")) + self.assertFalse( + cfn.conditions.has_to_imply({"aCondition": False}, "aCondition") + ) + self.assertTrue(cfn.conditions.has_to_imply({"aCondition": True}, "aCondition")) self.assertTrue( - cfn.conditions.implies( + cfn.conditions.has_to_imply( {"aCondition": True, "bCondition": False}, "aCondition" ) ) @@ -108,7 +110,7 @@ def test_check_always_true_or_false(self): cfn = Template("", template) self.assertEqual(len(cfn.conditions._conditions), 2) # test coverage for KeyErrors in the following functions - self.assertTrue(cfn.conditions.implies({"IsTrue": True}, "IsFalse")) + self.assertTrue(cfn.conditions.has_to_imply({"IsTrue": True}, "IsFalse")) def test_check_never_false(self): """With allowed values two conditions can not both be false""" diff --git a/test/unit/rules/helpers/__init__.py b/test/unit/rules/helpers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/unit/rules/helpers/test_get_resource_by_name.py b/test/unit/rules/helpers/test_get_resource_by_name.py new file mode 100644 index 0000000000..408b902efd --- /dev/null +++ b/test/unit/rules/helpers/test_get_resource_by_name.py @@ -0,0 +1,169 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import pytest + +from cfnlint.rules.helpers import get_resource_by_name + + +@pytest.fixture +def template(): + return { + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": { + "ImageId": { + "Type": "String", + } + }, + "Conditions": { + "IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]}, + "IsNotUsEast1": {"Fn::Not": [{"Condition": "IsUsEast1"}]}, + "IsImageId": {"Fn::Not": {"Fn::Equals": [{"Ref": "ImageId"}, ""]}}, + }, + "Resources": { + "Foo": {"Type": "AWS::S3::Bucket"}, + "Bar": {"Type": "AWS::S3::Bucket", "Condition": "IsUsEast1"}, + "FooBar": {"Type": "AWS::S3::Bucket", "Condition": "IsNotUsEast1"}, + "BadShape": [], + "NoType": {}, + }, + } + + +@pytest.mark.parametrize( + "name,resource_name,types,status,expected", + [ + ( + "Standard get", + "Foo", + None, + {}, + ({"Type": "AWS::S3::Bucket"}, {}, deque(["Resources", "Foo"])), + ), + ( + "Doesn't exist", + "Foo2", + None, + {}, + ( + None, + {}, + deque([]), + ), + ), + ( + "The destination isn't a dict", + "BadShape", + None, + {}, + ( + None, + {}, + deque([]), + ), + ), + ( + "The destination doesn't have type", + "NoType", + None, + {}, + ( + None, + {}, + deque([]), + ), + ), + ( + "Get a valid resource with a filter", + "Foo", + ["AWS::S3::Bucket"], + {}, + ( + { + "Type": "AWS::S3::Bucket", + }, + {}, + deque(["Resources", "Foo"]), + ), + ), + ( + "No result when type filters it out", + "Foo", + ["AWS::EC2::Instance"], + {}, + ( + None, + {}, + deque([]), + ), + ), + ( + "Get a resource with a condition", + "Bar", + None, + {}, + ( + {"Type": "AWS::S3::Bucket", "Condition": "IsUsEast1"}, + {"IsUsEast1": True}, + deque(["Resources", "Bar"]), + ), + ), + ( + "No resource when conditions don't align 1", + "Bar", + None, + {"IsUsEast1": False}, + ( + None, + {"IsUsEast1": False}, + deque([]), + ), + ), + ( + "No resource when conditions don't align 2", + "FooBar", + None, + {"IsUsEast1": True}, + ( + None, + {"IsUsEast1": True}, + deque([]), + ), + ), + ( + "Unrelated conditions return full results", + "Bar", + None, + {"IsImageId": False}, + ( + {"Type": "AWS::S3::Bucket", "Condition": "IsUsEast1"}, + {"IsUsEast1": True, "IsImageId": False}, + deque(["Resources", "Bar"]), + ), + ), + ], +) +def test_get_resource_by_name(name, resource_name, types, status, expected, validator): + validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + status=status, + ), + ), + ) + + result = get_resource_by_name(validator, resource_name, types) + + assert result[0] == expected[0], f"Test {name!r} got {result[0]!r}" + assert ( + result[1].context.conditions.status == expected[1] + ), f"Test {name!r} got {result[1].context.conditions.status!r}" + assert ( + result[1].context.path.path == expected[2] + ), f"Test {name!r} got {result[1].context.path.path!r}" diff --git a/test/unit/rules/helpers/test_get_value_from_path.py b/test/unit/rules/helpers/test_get_value_from_path.py new file mode 100644 index 0000000000..bdcf72672c --- /dev/null +++ b/test/unit/rules/helpers/test_get_value_from_path.py @@ -0,0 +1,194 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import pytest + +from cfnlint.rules.helpers import get_value_from_path + + +@pytest.fixture +def template(): + return { + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": { + "ImageId": { + "Type": "AWS::SSM::Parameter::Value", + "Default": "/aws/service/ami-amazon-linux-latest/amzn-ami-hvm-x86_64-gp2", # noqa: E501 + } + }, + "Conditions": { + "IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]}, + "IsNotUsEast1": {"Fn::Not": [{"Condition": "IsUsEast1"}]}, + "IsImageIdSpecified": { + "Fn::Not": [{"Fn::Equals": [{"Ref": "ImageId"}, ""]}] + }, + }, + "Resources": {}, + } + + +@pytest.mark.parametrize( + "name,instance,v_path,status,expected", + [ + ( + "No path", + {"Foo": "Bar"}, + deque([]), + {}, + [ + ({"Foo": "Bar"}, {}, deque([])), + ], + ), + ( + "Small path with out it being found", + {"Foo": "Bar"}, + deque(["Bar"]), + {}, + [ + (None, {}, deque(["Bar"])), + ], + ), + ( + "Bad path returns an empty value with list", + {"Foo": [{"Bar": "One"}]}, + deque(["Foo", "Bar"]), + {}, + [ + ( + None, + {}, + deque(["Foo"]), + ) + ], + ), + ( + "Bad path returns an empty value with string", + {"Foo": "Misvalued"}, + deque(["Foo", "Bar"]), + {}, + [ + ( + None, + {}, + deque(["Foo"]), + ) + ], + ), + ( + "With a path and no conditions", + {"Foo": {"Bar": True}}, + deque(["Foo", "Bar"]), + {}, + [ + (True, {}, deque(["Foo", "Bar"])), + ], + ), + ( + "With a list item", + {"Foo": [{"Bar": "One"}, {"Bar": "Two"}, {"NoValue": "Three"}]}, + deque(["Foo", "*", "Bar"]), + {}, + [ + ("One", {}, deque(["Foo", 0, "Bar"])), + ("Two", {}, deque(["Foo", 1, "Bar"])), + (None, {}, deque(["Foo", 2, "Bar"])), + ], + ), + ( + "With a basic condition", + { + "Foo": { + "Fn::If": [ + "IsUsEast1", + {"Bar": "One"}, + {"Bar": "Two"}, + ] + } + }, + deque(["Foo", "Bar"]), + {}, + [ + ("One", {"IsUsEast1": True}, deque(["Foo", "Fn::If", 1, "Bar"])), + ("Two", {"IsUsEast1": False}, deque(["Foo", "Fn::If", 2, "Bar"])), + ], + ), + ( + "With a basic condition and a current status", + { + "Foo": { + "Fn::If": [ + "IsUsEast1", + {"Bar": "One"}, + {"Bar": "Two"}, + ] + } + }, + deque(["Foo", "Bar"]), + {"IsUsEast1": True}, + [ + ("One", {"IsUsEast1": True}, deque(["Foo", "Fn::If", 1, "Bar"])), + ], + ), + ( + "With a basic condition and a current status on a related condition", + { + "Foo": { + "Fn::If": [ + "IsUsEast1", + {"Bar": "One"}, + {"Bar": "Two"}, + ] + } + }, + deque(["Foo", "Bar"]), + {"IsNotUsEast1": True}, + [ + ( + "Two", + {"IsUsEast1": False, "IsNotUsEast1": True}, + deque(["Foo", "Fn::If", 2, "Bar"]), + ), + ], + ), + ( + "With a random function in the way", + {"Foo": {"Fn::FindInMap": ["A", "B", "C"]}}, + deque(["Foo", "*", "Bar"]), + {}, + [], + ), + ( + "With a ref at the desination", + {"Foo": {"Ref": "Bar"}}, + deque(["Foo"]), + {}, + [({"Ref": "Bar"}, {}, deque(["Foo"]))], + ), + ], +) +def test_get_value_from_path(name, instance, v_path, status, expected, validator): + validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + status=status, + ), + ), + ) + + results = list(get_value_from_path(validator, instance, v_path)) + + assert len(results) == len(expected), f"Test {name!r} got {len(results)!r}" + for result, exp in zip(results, expected): + assert result[0] == exp[0], f"Test {name!r} got {result[0]!r}" + assert ( + result[1].context.conditions.status == exp[1] + ), f"Test {name!r} got {result[1].context.conditions.status!r}" + assert ( + result[1].context.path.path == exp[2] + ), f"Test {name!r} got {result[1].context.path.path!r}" diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship.py deleted file mode 100644 index 6606bbf10d..0000000000 --- a/test/unit/rules/jsonschema/test_cfn_lint_relationship.py +++ /dev/null @@ -1,255 +0,0 @@ -""" -Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 -""" - -from __future__ import annotations - -from collections import deque - -import pytest - -from cfnlint.context import Path -from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship - - -@pytest.fixture -def rule(): - return CfnLintRelationship( - keywords=[], relationship="Resources/AWS::EC2::Instance/Properties/ImageId" - ) - - -@pytest.fixture -def template(): - return { - "AWSTemplateFormatVersion": "2010-09-09", - "Parameters": { - "ImageId": { - "Type": "AWS::SSM::Parameter::Value", - "Default": "/aws/service/ami-amazon-linux-latest/amzn-ami-hvm-x86_64-gp2", # noqa: E501 - } - }, - "Conditions": { - "IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]}, - "IsNotUsEast1": {"Fn::Not": [{"Condition": "IsUsEast1"}]}, - "IsImageIdSpecified": { - "Fn::Not": [{"Fn::Equals": [{"Ref": "ImageId"}, ""]}] - }, - }, - "Resources": { - "One": { - "Type": "AWS::EC2::Instance", - "Properties": { - "ImageId": { - "Fn::If": [ - "IsUsEast1", - {"Ref": "ImageId"}, - {"Ref": "AWS::NoValue"}, - ] - }, - }, - }, - "ParentOne": { - "Type": "AWS::EC2::Instance", - "Properties": {"ImageId": {"Fn::GetAtt": ["One", "ImageId"]}}, - }, - "Two": { - "Type": "AWS::EC2::Instance", - "Properties": { - "ImageId": {"Ref": "ImageId"}, - }, - }, - "ParentTwo": { - "Type": "AWS::EC2::Instance", - "Properties": {"ImageId": {"Fn::GetAtt": ["Two", "ImageId"]}}, - }, - "Three": { - "Type": "AWS::EC2::Instance", - "Condition": "IsImageIdSpecified", - "Properties": { - "ImageId": { - "Fn::If": [ - "IsUsEast1", - {"Ref": "ImageId"}, - {"Ref": "AWS::NoValue"}, - ], - }, - }, - }, - "ParentThree": { - "Type": "AWS::EC2::Instance", - "Condition": "IsImageIdSpecified", - "Properties": {"ImageId": {"Fn::GetAtt": ["Three", "ImageId"]}}, - }, - "Four": { - "Type": "AWS::EC2::Instance", - "Condition": "IsUsEast1", - "Properties": { - "ImageId": { - "Fn::If": [ - "IsImageIdSpecified", - {"Ref": "ImageId"}, - {"Ref": "AWS::NoValue"}, - ] - }, - }, - }, - "ParentFour": { - "Type": "AWS::EC2::Instance", - "Condition": "IsImageIdSpecified", - "Properties": {"ImageId": {"Fn::GetAtt": ["Four", "ImageId"]}}, - }, - "Five": { - "Type": "AWS::EC2::Instance", - "Properties": { - "ImageId": { - "Fn::If": [ - "IsImageIdSpecified", - {"Ref": "ImageId"}, - ] - }, - }, - }, - "ParentFive": { - "Type": "AWS::EC2::Instance", - "Properties": {"ImageId": {"Fn::GetAtt": ["Five", "ImageId"]}}, - }, - "Six": { - "Type": "AWS::EC2::Instance", - "Properties": "Foo", - }, - "ParentSix": { - "Type": "AWS::EC2::Instance", - "Properties": {"ImageId": {"Fn::GetAtt": ["Six", "ImageId"]}}, - }, - "Seven": { - "Type": "AWS::EC2::Instance", - "Condition": "IsNotUsEast1", - "Properties": { - "ImageId": { - "Fn::If": [ - "IsImageIdSpecified", - {"Ref": "ImageId"}, - {"Ref": "AWS::NoValue"}, - ] - }, - }, - }, - "ParentSeven": { - "Type": "AWS::EC2::Instance", - "Condition": "IsUsEast1", - "Properties": {"ImageId": {"Fn::GetAtt": ["Seven", "ImageId"]}}, - }, - "Eight": { - "Properties": { - "ImageId": { - "Fn::If": [ - "IsImageIdSpecified", - {"Ref": "ImageId"}, - {"Ref": "AWS::NoValue"}, - ] - }, - }, - }, - "ParentEight": { - "Type": "AWS::EC2::Instance", - "Properties": {"ImageId": {"Fn::GetAtt": ["Eight", "ImageId"]}}, - }, - }, - } - - -@pytest.mark.parametrize( - "name,path,status,expected", - [ - ( - "Condition for value", - deque(["Resources", "ParentOne", "Properties", "ImageId"]), - {}, - [ - ({"Ref": "ImageId"}, {"IsUsEast1": True}), - (None, {"IsUsEast1": False}), - ], - ), - ( - "Standard", - deque(["Resources", "ParentTwo", "Properties", "ImageId"]), - {}, - [({"Ref": "ImageId"}, {})], - ), - ( - "Lots of conditions along the way", - deque(["Resources", "ParentThree", "Properties", "ImageId"]), - { - "IsImageIdSpecified": True, - }, - [ - ({"Ref": "ImageId"}, {"IsUsEast1": True, "IsImageIdSpecified": True}), - (None, {"IsUsEast1": False, "IsImageIdSpecified": True}), - ], - ), - ( - "Unrelated conditions", - deque(["Resources", "ParentFour", "Properties", "ImageId"]), - { - "IsImageIdSpecified": True, - }, - [], - ), - ( - "Destiniation Fn::If isn't properly formatted", - deque(["Resources", "ParentFive", "Properties", "ImageId"]), - {}, - [], - ), - ( - "Properties isn't an object", - deque(["Resources", "ParentSix", "Properties", "ImageId"]), - {}, - [], - ), - ( - "Condition's don't align", - deque(["Resources", "ParentSeven", "Properties", "ImageId"]), - { - "IsUsEast1": True, - }, - [], - ), - ( - "Short Path", - deque(["Resources"]), - {}, - [], - ), - ( - "Not in resources", - deque(["Outputs", "MyOutput"]), - {}, - [], - ), - ( - "No type on relationshiop", - deque(["Resources", "ParentEight", "Properties", "ImageId"]), - {}, - [], - ), - ], -) -def test_get_relationships(name, path, status, expected, rule, validator): - validator = validator.evolve( - context=validator.context.evolve( - path=Path(path), - conditions=validator.context.conditions.evolve( - status=status, - ), - ), - ) - results = list(rule.get_relationship(validator)) - assert len(results) == len(expected), f"Test {name!r} got {len(results)!r}" - for result, exp in zip(results, expected): - assert result[0] == exp[0], f"Test {name!r} got {result[0]!r}" - assert ( - result[1].context.conditions.status == exp[1] - ), f"Test {name!r} got {result[1].context.conditions.status!r}" diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py deleted file mode 100644 index 71a2a1a544..0000000000 --- a/test/unit/rules/jsonschema/test_cfn_lint_relationship_bad_path.py +++ /dev/null @@ -1,37 +0,0 @@ -""" -Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 -""" - -from __future__ import annotations - -from collections import deque - -import pytest - -from cfnlint.context import Path -from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship - - -@pytest.fixture -def rule(): - return CfnLintRelationship(keywords=[], relationship="Resources") - - -@pytest.fixture -def template(): - return { - "AWSTemplateFormatVersion": "2010-09-09", - "Resources": {}, - } - - -def test_get_relationships(rule, validator): - validator = validator.evolve( - context=validator.context.evolve( - path=Path( - deque(["Resources", "ParentOne", "Properties", "ImageId"]), - ), - ), - ) - assert [] == list(rule.get_relationship(validator)) diff --git a/test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py b/test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py deleted file mode 100644 index e5aa828814..0000000000 --- a/test/unit/rules/jsonschema/test_cfn_lint_relationship_list.py +++ /dev/null @@ -1,78 +0,0 @@ -""" -Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 -""" - -from __future__ import annotations - -from collections import deque - -import pytest - -from cfnlint.context import Path -from cfnlint.rules.jsonschema.CfnLintRelationship import CfnLintRelationship - - -@pytest.fixture -def rule(): - return CfnLintRelationship( - keywords=[], relationship="Resources/AWS::EC2::Instance/Properties/Foo/*/Bar" - ) - - -@pytest.fixture -def template(): - return { - "AWSTemplateFormatVersion": "2010-09-09", - "Resources": { - "One": { - "Type": "AWS::EC2::Instance", - "Properties": { - "Foo": [ - { - "Bar": "One", - }, - { - "Bar": "Two", - }, - ] - }, - }, - "ParentOne": { - "Type": "AWS::EC2::Instance", - "Properties": {"ImageId": {"Fn::GetAtt": ["One", "ImageId"]}}, - }, - }, - } - - -@pytest.mark.parametrize( - "name,path,status,expected", - [ - ( - "One", - deque(["Resources", "ParentOne", "Properties", "ImageId"]), - {}, - [ - ("One", {}), - ("Two", {}), - ], - ), - ], -) -def test_get_relationships(name, path, status, expected, rule, validator): - validator = validator.evolve( - context=validator.context.evolve( - path=Path(path), - conditions=validator.context.conditions.evolve( - status=status, - ), - ), - ) - results = list(rule.get_relationship(validator)) - assert len(results) == len(expected), f"Test {name!r} got {len(results)!r}" - for result, exp in zip(results, expected): - assert result[0] == exp[0], f"Test {name!r} got {result[0]!r}" - assert ( - result[1].context.conditions.status == exp[1] - ), f"Test {name!r} got {result[1].context.conditions.status!r}" diff --git a/test/unit/rules/resources/ec2/test_instance_image_id.py b/test/unit/rules/resources/ec2/test_instance_image_id.py index 842f74a2ee..845d5876ae 100644 --- a/test/unit/rules/resources/ec2/test_instance_image_id.py +++ b/test/unit/rules/resources/ec2/test_instance_image_id.py @@ -33,23 +33,17 @@ def rule(): }, }, }, - "Instance": { - "Type": "AWS::EC2::Instance", - "Properties": { - "LaunchTemplate": { - "LaunchTemplateId": { - "Fn::GetAtt": [ - "LaunchTemplate", - "LaunchTemplateId", - ], - } - }, - "ImageId": "ami-12345678", - }, - }, } }, { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, "ImageId": "ami-12345678", }, { @@ -57,6 +51,70 @@ def rule(): }, [], ), + ( + "Valid with no ImageId and a string launch template", + { + "Resources": {}, + }, + { + "LaunchTemplate": {"LaunchTemplateId": "foo"}, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Valid with no ImageId and a parameter", + { + "Parameters": { + "LaunchTemplateId": { + "Type": "String", + } + }, + "Resources": {}, + }, + { + "LaunchTemplate": {"LaunchTemplateId": {"Ref": "LaunchTemplateId"}}, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Valid with no ImageId a random ref", + { + "Resources": {}, + }, + { + "LaunchTemplate": {"LaunchTemplateName": {"Ref": "AWS::StackName"}}, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Valid with no ImageId and another function", + { + "Parameters": { + "LaunchTemplateId": { + "Type": "String", + } + }, + "Resources": {}, + }, + { + "LaunchTemplate": { + "LaunchTemplateId": {"Fn::FindInMap": ["One", "Two", "Three"]} + }, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), ( "Valid with ImageId in LaunchTemplate", { @@ -71,42 +129,66 @@ def rule(): }, }, }, - "Instance": { - "Type": "AWS::EC2::Instance", - "Properties": { - "LaunchTemplate": { - "LaunchTemplateId": { - "Fn::GetAtt": [ - "LaunchTemplate", - "LaunchTemplateId", - ], - } - }, - }, - }, } }, - {}, + { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, + }, { "path": ["Resources", "Instance", "Properties"], }, [], ), ( - "Invalid with no relationship", + "Valid with ImageId in LaunchTemplate using Name", { "Resources": { - "Instance": { - "Type": "AWS::EC2::Instance", - "Properties": {}, + "LaunchTemplate": { + "Type": "AWS::EC2::LaunchTemplate", + "Properties": { + "LaunchTemplateName": "a-template", + "LaunchTemplateData": { + "Monitoring": {"Enabled": True}, + "ImageId": "ami-12345678", + }, + }, }, } }, + { + "LaunchTemplate": { + "LaunchTemplateName": { + "Ref": "LaunchTemplate", + } + }, + }, + { + "path": ["Resources", "Instance", "Properties"], + }, + [], + ), + ( + "Invalid with no relationship", + {}, {}, { "path": ["Resources", "Instance", "Properties"], }, - [ValidationError("'ImageId' is a required property")], + [ + ValidationError( + "'ImageId' is a required property", + path_override=deque( + ["Resources", "Instance", "Properties", "ImageId"] + ), + ) + ], ), ( "Invalid with no ImageId in LaunchTemplate", @@ -121,28 +203,27 @@ def rule(): }, }, }, - "Instance": { - "Type": "AWS::EC2::Instance", - "Properties": { - "LaunchTemplate": { - "LaunchTemplateId": { - "Fn::GetAtt": [ - "LaunchTemplate", - "LaunchTemplateId", - ], - } - }, - }, - }, } }, - {}, + { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, + }, { "path": ["Resources", "Instance", "Properties"], }, [ ValidationError( "'ImageId' is a required property", + path_override=deque( + ["Resources", "Instance", "Properties", "ImageId"] + ), ) ], ), @@ -162,32 +243,24 @@ def rule(): }, }, }, - "Instance": { - "Type": "AWS::EC2::Instance", - "Properties": { - "LaunchTemplate": { - "LaunchTemplateId": { - "Fn::GetAtt": [ - "LaunchTemplate", - "LaunchTemplateId", - ], - } - }, - "ImageId": { - "Fn::If": [ - "IsUsEast1", - "ami-12345678", - {"Ref": "AWS::NoValue"}, - ] - }, - }, - }, }, }, { + "LaunchTemplate": { + "LaunchTemplateId": { + "Fn::GetAtt": [ + "LaunchTemplate", + "LaunchTemplateId", + ], + } + }, "ImageId": { - "Fn::If": ["IsUsEast1", "ami-12345678", {"Ref": "AWS::NoValue"}] - } + "Fn::If": [ + "IsUsEast1", + "ami-12345678", + {"Ref": "AWS::NoValue"}, + ] + }, }, { "path": ["Resources", "Instance", "Properties"], @@ -214,6 +287,7 @@ def rule(): ) def test_validate(name, instance, expected, rule, validator): errs = list(rule.validate(validator, "", instance, {})) + assert ( errs == expected ), f"Expected test {name!r} to have {expected!r} but got {errs!r}"