Skip to content

Commit

Permalink
Create rule E3055 to validate CreationPolicy
Browse files Browse the repository at this point in the history
  • Loading branch information
kddejong committed Aug 16, 2024
1 parent e500fcc commit ec8d1ac
Show file tree
Hide file tree
Showing 2 changed files with 274 additions and 0 deletions.
102 changes: 102 additions & 0 deletions src/cfnlint/rules/resources/CreationPolicy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from __future__ import annotations

from typing import Any

from cfnlint.jsonschema import Validator
from cfnlint.rules.jsonschema.CfnLintJsonSchema import CfnLintJsonSchema


class CreationPolicy(CfnLintJsonSchema):
id = "E3055"
shortdesc = "Check CreationPolicy values for Resources"
description = "Check that the CreationPolicy values are valid"
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-attribute-creationpolicy.html"
tags = ["resources", "creationPolicy"]

def __init__(self) -> None:
super().__init__(
keywords=["Resources/*/CreationPolicy"],
all_matches=True,
)

def _get_schema(self, resource_type: str) -> dict[str, Any]:
if resource_type == "AWS::AppStream::Fleet":
return {
"type": "object",
"additionalProperties": False,
"properties": {
"StartFleet": {
"additionalProperties": False,
"type": "object",
"properties": {"Type": {"type": "boolean"}},
}
},
}
if resource_type == "AWS::AutoScaling::AutoScalingGroup":
return {
"type": "object",
"additionalProperties": False,
"properties": {
"AutoScalingCreationPolicy": {
"type": "object",
"additionalProperties": False,
"properties": {
"MinSuccessfulInstancesPercent": {"type": "integer"}
},
},
"ResourceSignal": {
"additionalProperties": False,
"type": "object",
"properties": {
"Timeout": {"type": "string"},
"Count": {"type": "integer"},
},
},
},
}
if resource_type == "AWS::CloudFormation::WaitCondition":

return {
"type": "object",
"additionalProperties": False,
"properties": {
"ResourceSignal": {
"additionalProperties": False,
"type": "object",
"properties": {
"Timeout": {"type": "string"},
"Count": {"type": "integer"},
},
}
},
}

return {}

# pylint: disable=unused-argument, arguments-renamed
def validate(self, validator: Validator, dP: str, instance, schema):
resource_name = validator.context.path.path[1]
if not isinstance(resource_name, str):
return
resource_type = validator.context.resources[resource_name].type

validator = validator.evolve(
context=validator.context.evolve(
functions=[
"Fn::Sub",
"Fn::Select",
"Fn::FindInMap",
"Fn::If",
"Ref",
],
strict_types=False,
),
schema=self._get_schema(resource_type),
)

yield from self._iter_errors(validator, instance)
172 changes: 172 additions & 0 deletions test/unit/rules/resources/test_creationpolicy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from collections import deque

import pytest

from cfnlint.jsonschema import ValidationError
from cfnlint.rules.resources.CreationPolicy import CreationPolicy


@pytest.fixture
def rule():
return CreationPolicy()


@pytest.fixture
def template():
return {
"Resources": {
"MyInstance": {
"Type": "AWS::EC2::Instance",
},
"MyWaitCondition": {
"Type": "AWS::CloudFormation::WaitCondition",
},
"MyAutoScalingGroup": {
"Type": "AWS::AutoScaling::AutoScalingGroup",
},
"MyAppStreamFleet": {
"Type": "AWS::AppStream::Fleet",
},
"MyLambdaFunction": {
"Type": "AWS::Lambda::Function",
},
}
}


@pytest.mark.parametrize(
"name, instance, path, expected",
[
(
"Correct for app stream",
{"StartFleet": {"Type": True}},
{
"path": deque(["Resources", "MyAppStreamFleet", "CreationPolicy"]),
},
[],
),
(
"Bad type for app stream",
{
"StartFleet": {
"Type": {},
}
},
{
"path": deque(["Resources", "MyAppStreamFleet", "CreationPolicy"]),
},
[
ValidationError(
"{} is not of type 'boolean'",
rule=CreationPolicy(),
path=deque(["StartFleet", "Type"]),
schema_path=deque(
["properties", "StartFleet", "properties", "Type", "type"]
),
validator="type",
)
],
),
(
"Valid ASG",
{
"AutoScalingCreationPolicy": {"MinSuccessfulInstancesPercent": 100},
"ResourceSignal": {"Count": 1, "Timeout": "60"},
},
{
"path": deque(["Resources", "MyAutoScalingGroup", "CreationPolicy"]),
},
[],
),
(
"Invalid ASG",
{
"AutoScalingCreationPolicy": {"MinSuccessfulInstancesPercent": 100},
"ResourceSignal": {"Count": "one", "Timeout": "60"},
},
{
"path": deque(["Resources", "MyAutoScalingGroup", "CreationPolicy"]),
},
[
ValidationError(
"'one' is not of type 'integer'",
rule=CreationPolicy(),
path=deque(["ResourceSignal", "Count"]),
schema_path=deque(
["properties", "ResourceSignal", "properties", "Count", "type"]
),
validator="type",
)
],
),
(
"Valid Wait Condition",
{"ResourceSignal": {"Timeout": "PT15M", "Count": "5"}},
{
"path": deque(["Resources", "MyWaitCondition", "CreationPolicy"]),
},
[],
),
(
"Invalid Wait Condition",
{"ResourceSignal": {"Timeout": "PT15M", "Count": "five"}},
{
"path": deque(["Resources", "MyWaitCondition", "CreationPolicy"]),
},
[
ValidationError(
"'five' is not of type 'integer'",
rule=CreationPolicy(),
path=deque(["ResourceSignal", "Count"]),
schema_path=deque(
["properties", "ResourceSignal", "properties", "Count", "type"]
),
validator="type",
)
],
),
(
"Valid instance",
{"ResourceSignal": {"Timeout": "PT15M", "Count": "5"}},
{
"path": deque(["Resources", "MyWaitCondition", "CreationPolicy"]),
},
[],
),
(
"Invalid Instance",
{"Foo": {"Bar"}},
{
"path": deque(["Resources", "MyInstance", "CreationPolicy"]),
},
[],
),
(
"Wait condition ignored on wrong type",
{"Foo": {"Bar"}},
{
"path": deque(["Resources", "MyLambdaFunction", "CreationPolicy"]),
},
[],
),
],
indirect=["path"],
)
def test_deletion_policy(name, instance, expected, rule, validator):

rule = CreationPolicy()
errors = list(
rule.validate(
validator=validator,
dP="creationpolicy",
instance=instance,
schema={},
)
)

assert errors == expected, f"{name}: {errors} != {expected}"

0 comments on commit ec8d1ac

Please sign in to comment.