From 6956944e4116a3b177f374ee92d5c4a7a01e53c7 Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Tue, 30 Jul 2024 15:45:18 -0700 Subject: [PATCH] Add 3 new rules to validate ECS configs --- .../rules/resources/ecs/ServiceFargate.py | 138 ++++++++++++ .../ecs/ServiceNetworkConfiguration.py | 107 +++++++++ .../resources/ecs/TaskDefinitionAwsVpc.py | 81 +++++++ .../resources/ecs/test_service_fargate.py | 210 ++++++++++++++++++ .../ecs/test_service_network_configuration.py | 169 ++++++++++++++ .../ecs/test_task_definition_aws_vpc.py | 157 +++++++++++++ 6 files changed, 862 insertions(+) create mode 100644 src/cfnlint/rules/resources/ecs/ServiceFargate.py create mode 100644 src/cfnlint/rules/resources/ecs/ServiceNetworkConfiguration.py create mode 100644 src/cfnlint/rules/resources/ecs/TaskDefinitionAwsVpc.py create mode 100644 test/unit/rules/resources/ecs/test_service_fargate.py create mode 100644 test/unit/rules/resources/ecs/test_service_network_configuration.py create mode 100644 test/unit/rules/resources/ecs/test_task_definition_aws_vpc.py diff --git a/src/cfnlint/rules/resources/ecs/ServiceFargate.py b/src/cfnlint/rules/resources/ecs/ServiceFargate.py new file mode 100644 index 0000000000..1d03742e68 --- /dev/null +++ b/src/cfnlint/rules/resources/ecs/ServiceFargate.py @@ -0,0 +1,138 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any, Iterator + +from cfnlint.helpers import ensure_list, is_function +from cfnlint.jsonschema import ValidationError, ValidationResult +from cfnlint.jsonschema.protocols import Validator +from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path +from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword + + +class ServiceFargate(CfnLintKeyword): + id = "E3054" + shortdesc = ( + "Validate ECS service using Fargate uses TaskDefinition that allows Fargate" + ) + description = ( + "When using an ECS service with 'LaunchType' of 'FARGATE' " + "the associated task definition must have 'RequiresCompatibilities' " + "specified with 'FARGATE' listed" + ) + tags = ["resources", "ecs"] + + def __init__(self) -> None: + super().__init__( + keywords=["Resources/AWS::ECS::Service/Properties"], + ) + + def _filter_resource_name(self, instance: Any) -> str | None: + fn_k, fn_v = is_function(instance) + if fn_k is None: + return None + if fn_k == "Ref": + if isinstance(fn_v, str): + return fn_v + elif fn_k == "Fn::GetAtt": + name = ensure_list(fn_v)[0].split(".")[0] + if isinstance(name, str): + return name + return None + + def _get_service_properties( + self, validator: Validator, instance: Any + ) -> Iterator[tuple[str, str, Validator]]: + for task_definition_id, task_definition_validator in get_value_from_path( + validator, instance, deque(["TaskDefinition"]) + ): + task_definition_resource_name = self._filter_resource_name( + task_definition_id + ) + if task_definition_resource_name is None: + continue + + for ( + launch_type, + launch_type_validator, + ) in get_value_from_path( + task_definition_validator, instance, deque(["LaunchType"]) + ): + yield ( + task_definition_resource_name, + launch_type, + launch_type_validator, + ) + + def _get_task_definition_properties( + self, validator: Validator, resource_name: Any + ) -> Iterator[tuple[list[Any] | None, Validator]]: + task_definition, task_definition_validator = get_resource_by_name( + validator, resource_name, ["AWS::ECS::TaskDefinition"] + ) + if not task_definition: + return + + for capabilities, capabilities_validator in get_value_from_path( + task_definition_validator, + task_definition, + path=deque(["Properties", "RequiresCompatibilities"]), + ): + if capabilities is None: + yield capabilities, capabilities_validator + continue + if not isinstance(capabilities, list): + continue + for capibility, _ in get_value_from_path( + capabilities_validator, + capabilities, + path=deque(["*"]), + ): + if isinstance(capibility, dict) or capibility == "FARGATE": + break + else: + yield capabilities, capabilities_validator + + def validate( + self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any] + ) -> ValidationResult: + + for ( + task_definition_resource_name, + launch_type, + service_validator, + ) in self._get_service_properties( + validator, + instance, + ): + if launch_type != "FARGATE": + continue + for ( + capabilities, + capabilities_validator, + ) in self._get_task_definition_properties( + service_validator, + task_definition_resource_name, + ): + if capabilities is None: + yield ValidationError( + "'RequiresCompatibilities' is a required property", + validator="required", + rule=self, + path_override=deque( + list(capabilities_validator.context.path.path)[:-1] + ), + ) + continue + + yield ValidationError( + f"{capabilities!r} does not contain items matching 'FARGATE'", + validator="contains", + rule=self, + path_override=capabilities_validator.context.path.path, + ) diff --git a/src/cfnlint/rules/resources/ecs/ServiceNetworkConfiguration.py b/src/cfnlint/rules/resources/ecs/ServiceNetworkConfiguration.py new file mode 100644 index 0000000000..7c14fadd3d --- /dev/null +++ b/src/cfnlint/rules/resources/ecs/ServiceNetworkConfiguration.py @@ -0,0 +1,107 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any, Iterator + +from cfnlint.helpers import ensure_list, is_function +from cfnlint.jsonschema import ValidationError, ValidationResult +from cfnlint.jsonschema.protocols import Validator +from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path +from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword + + +class ServiceNetworkConfiguration(CfnLintKeyword): + id = "E3052" + shortdesc = "Validate ECS service requires NetworkConfiguration" + description = ( + "When using an ECS task definition has 'awsvpc' " + "then 'NetworkConfiguration' is required" + ) + tags = ["resources", "ecs"] + + def __init__(self) -> None: + super().__init__( + keywords=["Resources/AWS::ECS::Service/Properties"], + ) + + def _filter_resource_name(self, instance: Any) -> str | None: + fn_k, fn_v = is_function(instance) + if fn_k is None: + return None + if fn_k == "Ref": + if isinstance(fn_v, str): + return fn_v + elif fn_k == "Fn::GetAtt": + name = ensure_list(fn_v)[0].split(".")[0] + if isinstance(name, str): + return name + return None + + def _get_service_properties( + self, validator: Validator, instance: Any + ) -> Iterator[tuple[str, str, Validator]]: + for task_definition_id, task_definition_validator in get_value_from_path( + validator, instance, deque(["TaskDefinition"]) + ): + task_definition_resource_name = self._filter_resource_name( + task_definition_id + ) + if task_definition_resource_name is None: + continue + + for ( + network_configuration, + network_configuration_validator, + ) in get_value_from_path( + task_definition_validator, instance, deque(["NetworkConfiguration"]) + ): + yield ( + task_definition_resource_name, + network_configuration, + network_configuration_validator, + ) + + def _get_task_definition_properties( + self, validator: Validator, resource_name: Any + ) -> Iterator[tuple[str | int | None, Validator]]: + target_group, target_group_validator = get_resource_by_name( + validator, resource_name, ["AWS::ECS::TaskDefinition"] + ) + if not target_group: + return + + for network_mode, network_mode_validator in get_value_from_path( + target_group_validator, + target_group, + path=deque(["Properties", "NetworkMode"]), + ): + if network_mode == "awsvpc": + yield network_mode, network_mode_validator + + def validate( + self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any] + ) -> ValidationResult: + + for ( + task_definition_resource_name, + network_configuration, + task_definition_validator, + ) in self._get_service_properties( + validator, + instance, + ): + for _, _ in self._get_task_definition_properties( + task_definition_validator, + task_definition_resource_name, + ): + if network_configuration is None: + yield ValidationError( + "'NetworkConfiguration' is a required property", + validator="required", + rule=self, + ) diff --git a/src/cfnlint/rules/resources/ecs/TaskDefinitionAwsVpc.py b/src/cfnlint/rules/resources/ecs/TaskDefinitionAwsVpc.py new file mode 100644 index 0000000000..c120bccd27 --- /dev/null +++ b/src/cfnlint/rules/resources/ecs/TaskDefinitionAwsVpc.py @@ -0,0 +1,81 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque +from typing import Any, Iterator + +from cfnlint.jsonschema import ValidationError, ValidationResult +from cfnlint.jsonschema.protocols import Validator +from cfnlint.rules.helpers import get_value_from_path +from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword + + +class TaskDefinitionAwsVpc(CfnLintKeyword): + id = "E3053" + shortdesc = "Validate ECS task definition is has correct values for " "'HostPort'" + description = ( + "The 'HostPort' must either be undefined or equal to " + "the 'ContainerPort' value" + ) + tags = ["resources", "ecs"] + + def __init__(self) -> None: + super().__init__( + keywords=["Resources/AWS::ECS::TaskDefinition/Properties"], + ) + + def _get_port_mappings( + self, validator: Validator, instance: Any + ) -> Iterator[tuple[str | int | None, str | int | None, Validator]]: + + for container_definition, container_definition_validator in get_value_from_path( + validator, + instance, + path=deque(["ContainerDefinitions", "*", "PortMappings", "*"]), + ): + for host_port, host_port_validator in get_value_from_path( + container_definition_validator, + container_definition, + path=deque(["HostPort"]), + ): + if not isinstance(host_port, (str, int)): + continue + for container_port, _ in get_value_from_path( + host_port_validator, + container_definition, + path=deque(["ContainerPort"]), + ): + if not isinstance(host_port, (str, int)): + continue + if str(host_port) != str(container_port): + yield host_port, container_port, host_port_validator + + def validate( + self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any] + ) -> ValidationResult: + + for network_mode, _ in get_value_from_path( + validator, + instance, + path=deque(["NetworkMode"]), + ): + if network_mode != "awsvpc": + continue + for ( + host_port, + container_port, + port_mapping_validator, + ) in self._get_port_mappings( + validator, + instance, + ): + yield ValidationError( + f"{host_port!r} does not equal {container_port!r}", + validator="const", + rule=self, + path_override=port_mapping_validator.context.path.path, + ) diff --git a/test/unit/rules/resources/ecs/test_service_fargate.py b/test/unit/rules/resources/ecs/test_service_fargate.py new file mode 100644 index 0000000000..60cc6fff47 --- /dev/null +++ b/test/unit/rules/resources/ecs/test_service_fargate.py @@ -0,0 +1,210 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import jsonpatch +import pytest + +from cfnlint.jsonschema import ValidationError +from cfnlint.rules.helpers import get_value_from_path +from cfnlint.rules.resources.ecs.ServiceFargate import ServiceFargate + + +@pytest.fixture +def rule(): + rule = ServiceFargate() + yield rule + + +_task_definition = { + "Type": "AWS::ECS::TaskDefinition", + "Properties": { + "RequiresCompatibilities": ["FARGATE"], + }, +} + +_service = { + "Type": "AWS::ECS::Service", + "Properties": { + "TaskDefinition": {"Ref": "TaskDefinition"}, + "LaunchType": "FARGATE", + }, +} + + +@pytest.mark.parametrize( + "template,start_path,expected", + [ + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "remove", + "path": "/Properties/RequiresCompatibilities", + }, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [ + ValidationError( + ("'RequiresCompatibilities' is a required property"), + validator="required", + rule=ServiceFargate(), + path_override=deque(["Resources", "TaskDefinition", "Properties"]), + ) + ], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "remove", + "path": "/Properties/RequiresCompatibilities", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "replace", + "path": "/Properties/LaunchType", + "value": "EC2", + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": "/Properties/RequiresCompatibilities", + "value": [ + "EC2", + "FARGATE", + ], + }, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": "/Properties/RequiresCompatibilities", + "value": [ + "EC2", + "EXTERNAL", + ], + }, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [ + ValidationError( + ("['EC2', 'EXTERNAL'] does not contain items matching 'FARGATE'"), + validator="contains", + rule=ServiceFargate(), + path_override=deque( + [ + "Resources", + "TaskDefinition", + "Properties", + "RequiresCompatibilities", + ] + ), + ) + ], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": "/Properties/RequiresCompatibilities", + "value": {"Fn::FindInMap": ["A", "B", "C"]}, + }, + ], + ), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": {"MyLaunchType": {"Type": "String"}}, + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": "/Properties/RequiresCompatibilities", + "value": ["EC2", {"Ref": "MyLaunchType"}], + }, + ], + ), + "Service": dict(_service), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ], + indirect=["template"], +) +def test_validate(template, start_path, expected, rule, validator): + for instance, instance_validator in get_value_from_path( + validator, template, start_path + ): + errs = list(rule.validate(instance_validator, "", instance, {})) + assert errs == expected, f"Expected {expected} got {errs}" diff --git a/test/unit/rules/resources/ecs/test_service_network_configuration.py b/test/unit/rules/resources/ecs/test_service_network_configuration.py new file mode 100644 index 0000000000..dfd08490d0 --- /dev/null +++ b/test/unit/rules/resources/ecs/test_service_network_configuration.py @@ -0,0 +1,169 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import jsonpatch +import pytest + +from cfnlint.jsonschema import ValidationError +from cfnlint.rules.helpers import get_value_from_path +from cfnlint.rules.resources.ecs.ServiceNetworkConfiguration import ( + ServiceNetworkConfiguration, +) + + +@pytest.fixture +def rule(): + rule = ServiceNetworkConfiguration() + yield rule + + +_task_definition = { + "Type": "AWS::ECS::TaskDefinition", + "Properties": { + "NetworkMode": "awsvpc", + "ContainerDefinitions": [], + }, +} + +_service = { + "Type": "AWS::ECS::Service", + "Properties": { + "TaskDefinition": {"Ref": "TaskDefinition"}, + "NetworkConfiguration": {}, + }, +} + + +@pytest.mark.parametrize( + "template,start_path,expected", + [ + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "Service": dict(_service), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "remove", + "path": "/Properties/NetworkMode", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "remove", + "path": "/Properties/NetworkConfiguration", + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": "/Properties/NetworkMode", + "value": "bridge", + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "remove", + "path": "/Properties/NetworkConfiguration", + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Parameters": {"MyNetworkMode": {"Type": "String"}}, + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "replace", + "path": "/Properties/NetworkMode", + "value": {"Ref": "MyNetworkMode"}, + }, + ], + ), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "remove", + "path": "/Properties/NetworkConfiguration", + }, + ], + ), + }, + }, + deque(["Resources", "Service", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + "Service": jsonpatch.apply_patch( + dict(_service), + [ + { + "op": "remove", + "path": "/Properties/NetworkConfiguration", + }, + ], + ), + } + }, + deque(["Resources", "Service", "Properties"]), + [ + ValidationError( + ("'NetworkConfiguration' is a required property"), + validator="required", + rule=ServiceNetworkConfiguration(), + ) + ], + ), + ], + indirect=["template"], +) +def test_validate(template, start_path, expected, rule, validator): + for instance, instance_validator in get_value_from_path( + validator, template, start_path + ): + errs = list(rule.validate(instance_validator, "", instance, {})) + assert errs == expected, f"Expected {expected} got {errs}" diff --git a/test/unit/rules/resources/ecs/test_task_definition_aws_vpc.py b/test/unit/rules/resources/ecs/test_task_definition_aws_vpc.py new file mode 100644 index 0000000000..371d58ef87 --- /dev/null +++ b/test/unit/rules/resources/ecs/test_task_definition_aws_vpc.py @@ -0,0 +1,157 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from collections import deque + +import jsonpatch +import pytest + +from cfnlint.jsonschema import ValidationError +from cfnlint.rules.helpers import get_value_from_path +from cfnlint.rules.resources.ecs.TaskDefinitionAwsVpc import TaskDefinitionAwsVpc + + +@pytest.fixture +def rule(): + rule = TaskDefinitionAwsVpc() + yield rule + + +_task_definition = { + "Type": "AWS::ECS::TaskDefinition", + "Properties": { + "NetworkMode": "awsvpc", + "ContainerDefinitions": [ + { + "Name": "my-container", + "Image": "my-image", + "PortMappings": [ + { + "ContainerPort": 8080, + } + ], + } + ], + }, +} + + +@pytest.mark.parametrize( + "template,start_path,expected", + [ + ( + { + "Resources": { + "TaskDefinition": dict(_task_definition), + } + }, + deque(["Resources", "TaskDefinition", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "add", + "path": ( + "/Properties/ContainerDefinitions/" + "0/PortMappings/0/HostPort" + ), + "value": "8080", + }, + ], + ), + } + }, + deque(["Resources", "TaskDefinition", "Properties"]), + [], + ), + ( + { + "Parameters": { + "MyPort": {"Type": "String"}, + "MySecondPort": {"Type": "String"}, + }, + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "add", + "path": ( + "/Properties/ContainerDefinitions/0" + "/PortMappings/0/ContainerPort" + ), + "value": {"Ref": "MyPort"}, + }, + { + "op": "add", + "path": ( + "/Properties/ContainerDefinitions/0" + "/PortMappings/0/HostPort" + ), + "value": {"Ref": "MySecondPort"}, + }, + ], + ), + }, + }, + deque(["Resources", "TaskDefinition", "Properties"]), + [], + ), + ( + { + "Resources": { + "TaskDefinition": jsonpatch.apply_patch( + dict(_task_definition), + [ + { + "op": "add", + "path": ( + "/Properties/ContainerDefinitions/0" + "/PortMappings/0/HostPort" + ), + "value": "80", + } + ], + ), + } + }, + deque(["Resources", "TaskDefinition", "Properties"]), + [ + ValidationError( + ("'80' does not equal 8080"), + validator="const", + rule=TaskDefinitionAwsVpc(), + path_override=deque( + [ + "Resources", + "TaskDefinition", + "Properties", + "ContainerDefinitions", + 0, + "PortMappings", + 0, + "HostPort", + ] + ), + ) + ], + ), + ], + indirect=["template"], +) +def test_validate(template, start_path, expected, rule, validator): + for instance, instance_validator in get_value_from_path( + validator, template, start_path + ): + errs = list(rule.validate(instance_validator, "", instance, {})) + + assert errs == expected, f"Expected {expected} got {errs}"