Skip to content

Commit

Permalink
Add 3 new rules to validate ECS configs
Browse files Browse the repository at this point in the history
  • Loading branch information
kddejong committed Aug 8, 2024
1 parent ee87a1c commit 38dadef
Show file tree
Hide file tree
Showing 6 changed files with 862 additions and 0 deletions.
138 changes: 138 additions & 0 deletions src/cfnlint/rules/resources/ecs/ServiceFargate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from __future__ import annotations

from collections import deque
from typing import Any, Iterator

from cfnlint.helpers import ensure_list, is_function
from cfnlint.jsonschema import ValidationError, ValidationResult
from cfnlint.jsonschema.protocols import Validator
from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword


class ServiceFargate(CfnLintKeyword):
id = "E3054"
shortdesc = (
"Validate ECS service using Fargate uses TaskDefinition that allows Fargate"
)
description = (
"When using an ECS service with 'LaunchType' of 'FARGATE' "
"the associated task definition must have 'RequiresCompatibilities' "
"specified with 'FARGATE' listed"
)
tags = ["resources", "ecs"]

def __init__(self) -> None:
super().__init__(
keywords=["Resources/AWS::ECS::Service/Properties"],
)

def _filter_resource_name(self, instance: Any) -> str | None:
fn_k, fn_v = is_function(instance)
if fn_k is None:
return None
if fn_k == "Ref":
if isinstance(fn_v, str):
return fn_v
elif fn_k == "Fn::GetAtt":
name = ensure_list(fn_v)[0].split(".")[0]
if isinstance(name, str):
return name
return None

def _get_service_properties(
self, validator: Validator, instance: Any
) -> Iterator[tuple[str, str, Validator]]:
for task_definition_id, task_definition_validator in get_value_from_path(
validator, instance, deque(["TaskDefinition"])
):
task_definition_resource_name = self._filter_resource_name(
task_definition_id
)
if task_definition_resource_name is None:
continue

for (
launch_type,
launch_type_validator,
) in get_value_from_path(
task_definition_validator, instance, deque(["LaunchType"])
):
yield (
task_definition_resource_name,
launch_type,
launch_type_validator,
)

def _get_task_definition_properties(
self, validator: Validator, resource_name: Any
) -> Iterator[tuple[list[Any] | None, Validator]]:
task_definition, task_definition_validator = get_resource_by_name(
validator, resource_name, ["AWS::ECS::TaskDefinition"]
)
if not task_definition:
return

for capabilities, capabilities_validator in get_value_from_path(
task_definition_validator,
task_definition,
path=deque(["Properties", "RequiresCompatibilities"]),
):
if capabilities is None:
yield capabilities, capabilities_validator
continue
if not isinstance(capabilities, list):
continue
for capibility, _ in get_value_from_path(
capabilities_validator,
capabilities,
path=deque(["*"]),
):
if isinstance(capibility, dict) or capibility == "FARGATE":
break
else:
yield capabilities, capabilities_validator

def validate(
self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any]
) -> ValidationResult:

for (
task_definition_resource_name,
launch_type,
service_validator,
) in self._get_service_properties(
validator,
instance,
):
if launch_type != "FARGATE":
continue
for (
capabilities,
capabilities_validator,
) in self._get_task_definition_properties(
service_validator,
task_definition_resource_name,
):
if capabilities is None:
yield ValidationError(
"'RequiresCompatibilities' is a required property",
validator="required",
rule=self,
path_override=deque(
list(capabilities_validator.context.path.path)[:-1]
),
)
continue

yield ValidationError(
f"{capabilities!r} does not contain items matching 'FARGATE'",
validator="contains",
rule=self,
path_override=capabilities_validator.context.path.path,
)
107 changes: 107 additions & 0 deletions src/cfnlint/rules/resources/ecs/ServiceNetworkConfiguration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from __future__ import annotations

from collections import deque
from typing import Any, Iterator

from cfnlint.helpers import ensure_list, is_function
from cfnlint.jsonschema import ValidationError, ValidationResult
from cfnlint.jsonschema.protocols import Validator
from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword


class ServiceNetworkConfiguration(CfnLintKeyword):
id = "E3052"
shortdesc = "Validate ECS service requires NetworkConfiguration"
description = (
"When using an ECS task definition has 'awsvpc' "
"then 'NetworkConfiguration' is required"
)
tags = ["resources", "ecs"]

def __init__(self) -> None:
super().__init__(
keywords=["Resources/AWS::ECS::Service/Properties"],
)

def _filter_resource_name(self, instance: Any) -> str | None:
fn_k, fn_v = is_function(instance)
if fn_k is None:
return None
if fn_k == "Ref":
if isinstance(fn_v, str):
return fn_v
elif fn_k == "Fn::GetAtt":
name = ensure_list(fn_v)[0].split(".")[0]
if isinstance(name, str):
return name
return None

def _get_service_properties(
self, validator: Validator, instance: Any
) -> Iterator[tuple[str, str, Validator]]:
for task_definition_id, task_definition_validator in get_value_from_path(
validator, instance, deque(["TaskDefinition"])
):
task_definition_resource_name = self._filter_resource_name(
task_definition_id
)
if task_definition_resource_name is None:
continue

for (
network_configuration,
network_configuration_validator,
) in get_value_from_path(
task_definition_validator, instance, deque(["NetworkConfiguration"])
):
yield (
task_definition_resource_name,
network_configuration,
network_configuration_validator,
)

def _get_task_definition_properties(
self, validator: Validator, resource_name: Any
) -> Iterator[tuple[str | int | None, Validator]]:
target_group, target_group_validator = get_resource_by_name(
validator, resource_name, ["AWS::ECS::TaskDefinition"]
)
if not target_group:
return

for network_mode, network_mode_validator in get_value_from_path(
target_group_validator,
target_group,
path=deque(["Properties", "NetworkMode"]),
):
if network_mode == "awsvpc":
yield network_mode, network_mode_validator

def validate(
self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any]
) -> ValidationResult:

for (
task_definition_resource_name,
network_configuration,
task_definition_validator,
) in self._get_service_properties(
validator,
instance,
):
for _, _ in self._get_task_definition_properties(
task_definition_validator,
task_definition_resource_name,
):
if network_configuration is None:
yield ValidationError(
"'NetworkConfiguration' is a required property",
validator="required",
rule=self,
)
81 changes: 81 additions & 0 deletions src/cfnlint/rules/resources/ecs/TaskDefinitionAwsVpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from __future__ import annotations

from collections import deque
from typing import Any, Iterator

from cfnlint.jsonschema import ValidationError, ValidationResult
from cfnlint.jsonschema.protocols import Validator
from cfnlint.rules.helpers import get_value_from_path
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword


class TaskDefinitionAwsVpc(CfnLintKeyword):
id = "E3053"
shortdesc = "Validate ECS task definition is has correct values for " "'HostPort'"
description = (
"The 'HostPort' must either be undefined or equal to "
"the 'ContainerPort' value"
)
tags = ["resources", "ecs"]

def __init__(self) -> None:
super().__init__(
keywords=["Resources/AWS::ECS::TaskDefinition/Properties"],
)

def _get_port_mappings(
self, validator: Validator, instance: Any
) -> Iterator[tuple[str | int | None, str | int | None, Validator]]:

for container_definition, container_definition_validator in get_value_from_path(
validator,
instance,
path=deque(["ContainerDefinitions", "*", "PortMappings", "*"]),
):
for host_port, host_port_validator in get_value_from_path(
container_definition_validator,
container_definition,
path=deque(["HostPort"]),
):
if not isinstance(host_port, (str, int)):
continue
for container_port, _ in get_value_from_path(
host_port_validator,
container_definition,
path=deque(["ContainerPort"]),
):
if not isinstance(host_port, (str, int)):
continue
if str(host_port) != str(container_port):
yield host_port, container_port, host_port_validator

def validate(
self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any]
) -> ValidationResult:

for network_mode, _ in get_value_from_path(
validator,
instance,
path=deque(["NetworkMode"]),
):
if network_mode != "awsvpc":
continue
for (
host_port,
container_port,
port_mapping_validator,
) in self._get_port_mappings(
validator,
instance,
):
yield ValidationError(
f"{host_port!r} does not equal {container_port!r}",
validator="const",
rule=self,
path_override=port_mapping_validator.context.path.path,
)
Loading

0 comments on commit 38dadef

Please sign in to comment.