From ec8d1acf0b2e14e74204c16487024c1863e0f217 Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Fri, 16 Aug 2024 14:40:09 -0700 Subject: [PATCH] Create rule E3055 to validate CreationPolicy --- src/cfnlint/rules/resources/CreationPolicy.py | 102 +++++++++++ .../rules/resources/test_creationpolicy.py | 172 ++++++++++++++++++ 2 files changed, 274 insertions(+) create mode 100644 src/cfnlint/rules/resources/CreationPolicy.py create mode 100644 test/unit/rules/resources/test_creationpolicy.py diff --git a/src/cfnlint/rules/resources/CreationPolicy.py b/src/cfnlint/rules/resources/CreationPolicy.py new file mode 100644 index 0000000000..862e61da97 --- /dev/null +++ b/src/cfnlint/rules/resources/CreationPolicy.py @@ -0,0 +1,102 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from typing import Any + +from cfnlint.jsonschema import Validator +from cfnlint.rules.jsonschema.CfnLintJsonSchema import CfnLintJsonSchema + + +class CreationPolicy(CfnLintJsonSchema): + id = "E3055" + shortdesc = "Check CreationPolicy values for Resources" + description = "Check that the CreationPolicy values are valid" + source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-attribute-creationpolicy.html" + tags = ["resources", "creationPolicy"] + + def __init__(self) -> None: + super().__init__( + keywords=["Resources/*/CreationPolicy"], + all_matches=True, + ) + + def _get_schema(self, resource_type: str) -> dict[str, Any]: + if resource_type == "AWS::AppStream::Fleet": + return { + "type": "object", + "additionalProperties": False, + "properties": { + "StartFleet": { + "additionalProperties": False, + "type": "object", + "properties": {"Type": {"type": "boolean"}}, + } + }, + } + if resource_type == "AWS::AutoScaling::AutoScalingGroup": + return { + "type": "object", + "additionalProperties": False, + "properties": { + "AutoScalingCreationPolicy": { + "type": "object", + "additionalProperties": False, + "properties": { + "MinSuccessfulInstancesPercent": {"type": "integer"} + }, + }, + "ResourceSignal": { + "additionalProperties": False, + "type": "object", + "properties": { + "Timeout": {"type": "string"}, + "Count": {"type": "integer"}, + }, + }, + }, + } + if resource_type == "AWS::CloudFormation::WaitCondition": + + return { + "type": "object", + "additionalProperties": False, + "properties": { + "ResourceSignal": { + "additionalProperties": False, + "type": "object", + "properties": { + "Timeout": {"type": "string"}, + "Count": {"type": "integer"}, + }, + } + }, + } + + return {} + + # pylint: disable=unused-argument, arguments-renamed + def validate(self, validator: Validator, dP: str, instance, schema): + resource_name = validator.context.path.path[1] + if not isinstance(resource_name, str): + return + resource_type = validator.context.resources[resource_name].type + + validator = validator.evolve( + context=validator.context.evolve( + functions=[ + "Fn::Sub", + "Fn::Select", + "Fn::FindInMap", + "Fn::If", + "Ref", + ], + strict_types=False, + ), + schema=self._get_schema(resource_type), + ) + + yield from self._iter_errors(validator, instance) diff --git a/test/unit/rules/resources/test_creationpolicy.py b/test/unit/rules/resources/test_creationpolicy.py new file mode 100644 index 0000000000..fd44c74f51 --- /dev/null +++ b/test/unit/rules/resources/test_creationpolicy.py @@ -0,0 +1,172 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from collections import deque + +import pytest + +from cfnlint.jsonschema import ValidationError +from cfnlint.rules.resources.CreationPolicy import CreationPolicy + + +@pytest.fixture +def rule(): + return CreationPolicy() + + +@pytest.fixture +def template(): + return { + "Resources": { + "MyInstance": { + "Type": "AWS::EC2::Instance", + }, + "MyWaitCondition": { + "Type": "AWS::CloudFormation::WaitCondition", + }, + "MyAutoScalingGroup": { + "Type": "AWS::AutoScaling::AutoScalingGroup", + }, + "MyAppStreamFleet": { + "Type": "AWS::AppStream::Fleet", + }, + "MyLambdaFunction": { + "Type": "AWS::Lambda::Function", + }, + } + } + + +@pytest.mark.parametrize( + "name, instance, path, expected", + [ + ( + "Correct for app stream", + {"StartFleet": {"Type": True}}, + { + "path": deque(["Resources", "MyAppStreamFleet", "CreationPolicy"]), + }, + [], + ), + ( + "Bad type for app stream", + { + "StartFleet": { + "Type": {}, + } + }, + { + "path": deque(["Resources", "MyAppStreamFleet", "CreationPolicy"]), + }, + [ + ValidationError( + "{} is not of type 'boolean'", + rule=CreationPolicy(), + path=deque(["StartFleet", "Type"]), + schema_path=deque( + ["properties", "StartFleet", "properties", "Type", "type"] + ), + validator="type", + ) + ], + ), + ( + "Valid ASG", + { + "AutoScalingCreationPolicy": {"MinSuccessfulInstancesPercent": 100}, + "ResourceSignal": {"Count": 1, "Timeout": "60"}, + }, + { + "path": deque(["Resources", "MyAutoScalingGroup", "CreationPolicy"]), + }, + [], + ), + ( + "Invalid ASG", + { + "AutoScalingCreationPolicy": {"MinSuccessfulInstancesPercent": 100}, + "ResourceSignal": {"Count": "one", "Timeout": "60"}, + }, + { + "path": deque(["Resources", "MyAutoScalingGroup", "CreationPolicy"]), + }, + [ + ValidationError( + "'one' is not of type 'integer'", + rule=CreationPolicy(), + path=deque(["ResourceSignal", "Count"]), + schema_path=deque( + ["properties", "ResourceSignal", "properties", "Count", "type"] + ), + validator="type", + ) + ], + ), + ( + "Valid Wait Condition", + {"ResourceSignal": {"Timeout": "PT15M", "Count": "5"}}, + { + "path": deque(["Resources", "MyWaitCondition", "CreationPolicy"]), + }, + [], + ), + ( + "Invalid Wait Condition", + {"ResourceSignal": {"Timeout": "PT15M", "Count": "five"}}, + { + "path": deque(["Resources", "MyWaitCondition", "CreationPolicy"]), + }, + [ + ValidationError( + "'five' is not of type 'integer'", + rule=CreationPolicy(), + path=deque(["ResourceSignal", "Count"]), + schema_path=deque( + ["properties", "ResourceSignal", "properties", "Count", "type"] + ), + validator="type", + ) + ], + ), + ( + "Valid instance", + {"ResourceSignal": {"Timeout": "PT15M", "Count": "5"}}, + { + "path": deque(["Resources", "MyWaitCondition", "CreationPolicy"]), + }, + [], + ), + ( + "Invalid Instance", + {"Foo": {"Bar"}}, + { + "path": deque(["Resources", "MyInstance", "CreationPolicy"]), + }, + [], + ), + ( + "Wait condition ignored on wrong type", + {"Foo": {"Bar"}}, + { + "path": deque(["Resources", "MyLambdaFunction", "CreationPolicy"]), + }, + [], + ), + ], + indirect=["path"], +) +def test_deletion_policy(name, instance, expected, rule, validator): + + rule = CreationPolicy() + errors = list( + rule.validate( + validator=validator, + dP="creationpolicy", + instance=instance, + schema={}, + ) + ) + + assert errors == expected, f"{name}: {errors} != {expected}"