-
Notifications
You must be signed in to change notification settings - Fork 597
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add 3 new rules to validate ECS configs
- Loading branch information
Showing
6 changed files
with
862 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
""" | ||
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
SPDX-License-Identifier: MIT-0 | ||
""" | ||
|
||
from __future__ import annotations | ||
|
||
from collections import deque | ||
from typing import Any, Iterator | ||
|
||
from cfnlint.helpers import ensure_list, is_function | ||
from cfnlint.jsonschema import ValidationError, ValidationResult | ||
from cfnlint.jsonschema.protocols import Validator | ||
from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path | ||
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword | ||
|
||
|
||
class ServiceFargate(CfnLintKeyword): | ||
id = "E3054" | ||
shortdesc = ( | ||
"Validate ECS service using Fargate uses TaskDefinition that allows Fargate" | ||
) | ||
description = ( | ||
"When using an ECS service with 'LaunchType' of 'FARGATE' " | ||
"the associated task definition must have 'RequiresCompatibilities' " | ||
"specified with 'FARGATE' listed" | ||
) | ||
tags = ["resources", "ecs"] | ||
|
||
def __init__(self) -> None: | ||
super().__init__( | ||
keywords=["Resources/AWS::ECS::Service/Properties"], | ||
) | ||
|
||
def _filter_resource_name(self, instance: Any) -> str | None: | ||
fn_k, fn_v = is_function(instance) | ||
if fn_k is None: | ||
return None | ||
if fn_k == "Ref": | ||
if isinstance(fn_v, str): | ||
return fn_v | ||
elif fn_k == "Fn::GetAtt": | ||
name = ensure_list(fn_v)[0].split(".")[0] | ||
if isinstance(name, str): | ||
return name | ||
return None | ||
|
||
def _get_service_properties( | ||
self, validator: Validator, instance: Any | ||
) -> Iterator[tuple[str, str, Validator]]: | ||
for task_definition_id, task_definition_validator in get_value_from_path( | ||
validator, instance, deque(["TaskDefinition"]) | ||
): | ||
task_definition_resource_name = self._filter_resource_name( | ||
task_definition_id | ||
) | ||
if task_definition_resource_name is None: | ||
continue | ||
|
||
for ( | ||
launch_type, | ||
launch_type_validator, | ||
) in get_value_from_path( | ||
task_definition_validator, instance, deque(["LaunchType"]) | ||
): | ||
yield ( | ||
task_definition_resource_name, | ||
launch_type, | ||
launch_type_validator, | ||
) | ||
|
||
def _get_task_definition_properties( | ||
self, validator: Validator, resource_name: Any | ||
) -> Iterator[tuple[list[Any] | None, Validator]]: | ||
task_definition, task_definition_validator = get_resource_by_name( | ||
validator, resource_name, ["AWS::ECS::TaskDefinition"] | ||
) | ||
if not task_definition: | ||
return | ||
|
||
for capabilities, capabilities_validator in get_value_from_path( | ||
task_definition_validator, | ||
task_definition, | ||
path=deque(["Properties", "RequiresCompatibilities"]), | ||
): | ||
if capabilities is None: | ||
yield capabilities, capabilities_validator | ||
continue | ||
if not isinstance(capabilities, list): | ||
continue | ||
for capibility, _ in get_value_from_path( | ||
capabilities_validator, | ||
capabilities, | ||
path=deque(["*"]), | ||
): | ||
if isinstance(capibility, dict) or capibility == "FARGATE": | ||
break | ||
else: | ||
yield capabilities, capabilities_validator | ||
|
||
def validate( | ||
self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any] | ||
) -> ValidationResult: | ||
|
||
for ( | ||
task_definition_resource_name, | ||
launch_type, | ||
service_validator, | ||
) in self._get_service_properties( | ||
validator, | ||
instance, | ||
): | ||
if launch_type != "FARGATE": | ||
continue | ||
for ( | ||
capabilities, | ||
capabilities_validator, | ||
) in self._get_task_definition_properties( | ||
service_validator, | ||
task_definition_resource_name, | ||
): | ||
if capabilities is None: | ||
yield ValidationError( | ||
"'RequiresCompatibilities' is a required property", | ||
validator="required", | ||
rule=self, | ||
path_override=deque( | ||
list(capabilities_validator.context.path.path)[:-1] | ||
), | ||
) | ||
continue | ||
|
||
yield ValidationError( | ||
f"{capabilities!r} does not contain items matching 'FARGATE'", | ||
validator="contains", | ||
rule=self, | ||
path_override=capabilities_validator.context.path.path, | ||
) |
107 changes: 107 additions & 0 deletions
107
src/cfnlint/rules/resources/ecs/ServiceNetworkConfiguration.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
""" | ||
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
SPDX-License-Identifier: MIT-0 | ||
""" | ||
|
||
from __future__ import annotations | ||
|
||
from collections import deque | ||
from typing import Any, Iterator | ||
|
||
from cfnlint.helpers import ensure_list, is_function | ||
from cfnlint.jsonschema import ValidationError, ValidationResult | ||
from cfnlint.jsonschema.protocols import Validator | ||
from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path | ||
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword | ||
|
||
|
||
class ServiceNetworkConfiguration(CfnLintKeyword): | ||
id = "E3052" | ||
shortdesc = "Validate ECS service requires NetworkConfiguration" | ||
description = ( | ||
"When using an ECS task definition has 'awsvpc' " | ||
"then 'NetworkConfiguration' is required" | ||
) | ||
tags = ["resources", "ecs"] | ||
|
||
def __init__(self) -> None: | ||
super().__init__( | ||
keywords=["Resources/AWS::ECS::Service/Properties"], | ||
) | ||
|
||
def _filter_resource_name(self, instance: Any) -> str | None: | ||
fn_k, fn_v = is_function(instance) | ||
if fn_k is None: | ||
return None | ||
if fn_k == "Ref": | ||
if isinstance(fn_v, str): | ||
return fn_v | ||
elif fn_k == "Fn::GetAtt": | ||
name = ensure_list(fn_v)[0].split(".")[0] | ||
if isinstance(name, str): | ||
return name | ||
return None | ||
|
||
def _get_service_properties( | ||
self, validator: Validator, instance: Any | ||
) -> Iterator[tuple[str, str, Validator]]: | ||
for task_definition_id, task_definition_validator in get_value_from_path( | ||
validator, instance, deque(["TaskDefinition"]) | ||
): | ||
task_definition_resource_name = self._filter_resource_name( | ||
task_definition_id | ||
) | ||
if task_definition_resource_name is None: | ||
continue | ||
|
||
for ( | ||
network_configuration, | ||
network_configuration_validator, | ||
) in get_value_from_path( | ||
task_definition_validator, instance, deque(["NetworkConfiguration"]) | ||
): | ||
yield ( | ||
task_definition_resource_name, | ||
network_configuration, | ||
network_configuration_validator, | ||
) | ||
|
||
def _get_task_definition_properties( | ||
self, validator: Validator, resource_name: Any | ||
) -> Iterator[tuple[str | int | None, Validator]]: | ||
target_group, target_group_validator = get_resource_by_name( | ||
validator, resource_name, ["AWS::ECS::TaskDefinition"] | ||
) | ||
if not target_group: | ||
return | ||
|
||
for network_mode, network_mode_validator in get_value_from_path( | ||
target_group_validator, | ||
target_group, | ||
path=deque(["Properties", "NetworkMode"]), | ||
): | ||
if network_mode == "awsvpc": | ||
yield network_mode, network_mode_validator | ||
|
||
def validate( | ||
self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any] | ||
) -> ValidationResult: | ||
|
||
for ( | ||
task_definition_resource_name, | ||
network_configuration, | ||
task_definition_validator, | ||
) in self._get_service_properties( | ||
validator, | ||
instance, | ||
): | ||
for _, _ in self._get_task_definition_properties( | ||
task_definition_validator, | ||
task_definition_resource_name, | ||
): | ||
if network_configuration is None: | ||
yield ValidationError( | ||
"'NetworkConfiguration' is a required property", | ||
validator="required", | ||
rule=self, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
""" | ||
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
SPDX-License-Identifier: MIT-0 | ||
""" | ||
|
||
from __future__ import annotations | ||
|
||
from collections import deque | ||
from typing import Any, Iterator | ||
|
||
from cfnlint.jsonschema import ValidationError, ValidationResult | ||
from cfnlint.jsonschema.protocols import Validator | ||
from cfnlint.rules.helpers import get_value_from_path | ||
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword | ||
|
||
|
||
class TaskDefinitionAwsVpc(CfnLintKeyword): | ||
id = "E3053" | ||
shortdesc = "Validate ECS task definition is has correct values for " "'HostPort'" | ||
description = ( | ||
"The 'HostPort' must either be undefined or equal to " | ||
"the 'ContainerPort' value" | ||
) | ||
tags = ["resources", "ecs"] | ||
|
||
def __init__(self) -> None: | ||
super().__init__( | ||
keywords=["Resources/AWS::ECS::TaskDefinition/Properties"], | ||
) | ||
|
||
def _get_port_mappings( | ||
self, validator: Validator, instance: Any | ||
) -> Iterator[tuple[str | int | None, str | int | None, Validator]]: | ||
|
||
for container_definition, container_definition_validator in get_value_from_path( | ||
validator, | ||
instance, | ||
path=deque(["ContainerDefinitions", "*", "PortMappings", "*"]), | ||
): | ||
for host_port, host_port_validator in get_value_from_path( | ||
container_definition_validator, | ||
container_definition, | ||
path=deque(["HostPort"]), | ||
): | ||
if not isinstance(host_port, (str, int)): | ||
continue | ||
for container_port, _ in get_value_from_path( | ||
host_port_validator, | ||
container_definition, | ||
path=deque(["ContainerPort"]), | ||
): | ||
if not isinstance(host_port, (str, int)): | ||
continue | ||
if str(host_port) != str(container_port): | ||
yield host_port, container_port, host_port_validator | ||
|
||
def validate( | ||
self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any] | ||
) -> ValidationResult: | ||
|
||
for network_mode, _ in get_value_from_path( | ||
validator, | ||
instance, | ||
path=deque(["NetworkMode"]), | ||
): | ||
if network_mode != "awsvpc": | ||
continue | ||
for ( | ||
host_port, | ||
container_port, | ||
port_mapping_validator, | ||
) in self._get_port_mappings( | ||
validator, | ||
instance, | ||
): | ||
yield ValidationError( | ||
f"{host_port!r} does not equal {container_port!r}", | ||
validator="const", | ||
rule=self, | ||
path_override=port_mapping_validator.context.path.path, | ||
) |
Oops, something went wrong.