-
Notifications
You must be signed in to change notification settings - Fork 597
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
attempt to generalize cross-validation and user-defined rules
- Loading branch information
Showing
3 changed files
with
255 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,209 @@ | ||
""" | ||
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
SPDX-License-Identifier: MIT-0 | ||
""" | ||
import logging | ||
|
||
import regex as re | ||
import yaml | ||
from yaml import Loader | ||
|
||
import cfnlint.rules | ||
import cfnlint.rules.custom | ||
import cfnlint.rules.custom.Operators | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
RuleDefinition = """ | ||
Description: "MatchProjectValidateRule" | ||
ShortDescription: "MatchProjectValidateRule" | ||
ErrorMessage: "MatchProjectValidateRule failed" | ||
ResourceTypes: | ||
- "AWS::Lambda::Function" | ||
Conditions: | ||
- Fn::REGEX_MATCH: | ||
- Runtime | ||
- nodejs.* | ||
Projection: Tags.Value | ||
Validations: | ||
- Fn::IS: | ||
- Environment.Variables.NODE_ENV | ||
- DEFINED | ||
- Fn::IN: | ||
- Environment.Variables.NODE_ENV | ||
- Fn::Projection | ||
""" | ||
|
||
Operators = { | ||
"EQUALS": lambda value, expected_value: str(value).strip().lower() | ||
== str(expected_value).strip().lower(), | ||
"NOT_EQUALS": lambda value, expected_value: str(value).strip().lower() | ||
!= str(expected_value).strip().lower(), | ||
"==": lambda value, expected_value: str(value).strip().lower() | ||
== str(expected_value).strip().lower(), | ||
"!=": lambda value, expected_value: str(value).strip().lower() | ||
!= str(expected_value).strip().lower(), | ||
">": lambda value, expected_value: str(value).strip().isnumeric() | ||
and str(expected_value).strip().isnumeric() | ||
and float(value) > float(expected_value), | ||
">=": lambda value, expected_value: str(value).strip().isnumeric() | ||
and str(expected_value).strip().isnumeric() | ||
and float(value) >= float(expected_value), | ||
"<": lambda value, expected_value: str(value).strip().isnumeric() | ||
and str(expected_value).strip().isnumeric() | ||
and float(value) < float(expected_value), | ||
"<=": lambda value, expected_value: str(value).strip().isnumeric() | ||
and str(expected_value).strip().isnumeric() | ||
and float(value) <= float(expected_value), | ||
"IN": lambda value, expected_values: str(value).strip().lower() | ||
in [str(i).strip().lower() for i in expected_values], | ||
"NOT_IN": lambda value, expected_values: str(value).strip().lower() | ||
not in [str(i).strip().lower() for i in expected_values], | ||
"IS": lambda value, expected_value: ( | ||
expected_value == "DEFINED" and value is not None | ||
) | ||
or (expected_value == "NOT_DEFINED" and value is None), | ||
"REGEX_MATCH": lambda value, pattern: re.match(pattern, value), | ||
} | ||
|
||
CreateRuleFromOp = { | ||
"EQUALS": cfnlint.rules.custom.Operators.CreateEqualsRule, | ||
"NOT_EQUALS": cfnlint.rules.custom.Operators.CreateNotEqualsRule, | ||
"==": cfnlint.rules.custom.Operators.CreateEqualsRule, | ||
"!=": cfnlint.rules.custom.Operators.CreateNotEqualsRule, | ||
">": cfnlint.rules.custom.Operators.CreateGreaterRule, | ||
"<": cfnlint.rules.custom.Operators.CreateLesserRule, | ||
"IN": cfnlint.rules.custom.Operators.CreateInSetRule, | ||
"NOT_IN": cfnlint.rules.custom.Operators.CreateNotInSetRule, | ||
"IS": cfnlint.rules.custom.Operators.CreateCustomIsDefinedRule, | ||
} | ||
|
||
|
||
def flatten(l): | ||
for el in l: | ||
if isinstance(el, list): | ||
yield from flatten(el) | ||
else: | ||
yield el | ||
|
||
|
||
class MatchProjectValidateRule(cfnlint.rules.CloudFormationLintRule): | ||
def __init__( | ||
self, | ||
rule_id, | ||
): | ||
super().__init__() | ||
self.id = rule_id | ||
self.config = yaml.load(RuleDefinition, Loader=Loader) | ||
self.resource_types = ( | ||
self.config["ResourceTypes"] | ||
if isinstance(self.config["ResourceTypes"], list) | ||
else [self.config["ResourceTypes"]] | ||
) | ||
if isinstance(self.config["ResourceTypes"], list): | ||
self.resource_property_types.extend(self.resource_types) | ||
else: | ||
self.resource_property_types.append(self.config["ResourceTypes"]) | ||
self.error_message = self.config["ErrorMessage"] | ||
self.description = self.config["Description"] | ||
self.shortdesc = self.config["ShortDescription"] | ||
|
||
def _extract_path_value(self, resource_properties, path): | ||
if path == "": | ||
return True, resource_properties | ||
path_parts = path.split(".") | ||
current = resource_properties | ||
for part in path_parts: | ||
if part not in current: | ||
return False, None | ||
current = current[part] | ||
return True, current | ||
|
||
def _check_condition(self, resource_properties, condition): | ||
op = list(condition.keys())[0] | ||
[path, expected_value] = condition[op] | ||
op = op[4:] | ||
path_exists, value = self._extract_path_value(resource_properties, path) | ||
# behavior is same comp. to custom rule, i.e. rule does nothing if path does not exist | ||
# unless explicitly requiring it to be undefined | ||
if not path_exists and op != "IS" and expected_value != "NOT_DEFINED": | ||
return False | ||
return Operators[op](value, expected_value) | ||
|
||
def _check_conditions(self, resource_properties): | ||
if isinstance(self.config["Conditions"], list): | ||
return all( | ||
self._check_condition(resource_properties, condition) | ||
for condition in self.config["Conditions"] | ||
) | ||
|
||
return self._check_condition(resource_properties, self.config["Conditions"]) | ||
|
||
def _project_value(self, value, path_split): | ||
if (len(path_split) == 0) or (value is None): | ||
return value | ||
k, next_path_split = path_split[0], path_split[1:] | ||
v = value.get(k, None) | ||
if isinstance(v, list): | ||
return [self._project_value(item, next_path_split) for item in v] | ||
return self._project_value(v, next_path_split) | ||
|
||
def _compute_projection(self, resource_properties): | ||
if self.config["Projection"] is not None: | ||
return list( | ||
set( | ||
flatten( | ||
self._project_value( | ||
resource_properties, self.config["Projection"].split(".") | ||
) | ||
) | ||
) | ||
) | ||
return None | ||
|
||
def _match_resource_properties( | ||
self, resource_properties, property_type, _, cfn, projected_values | ||
): | ||
matches = [] | ||
validations = ( | ||
self.config["Validations"] | ||
if isinstance(self.config["Validations"], list) | ||
else [self.config["Validations"]] | ||
) | ||
for validation in validations: | ||
op = list(validation.keys())[0] | ||
[validation_path, expected_value] = validation[op] | ||
op = op[4:] | ||
if expected_value == "Fn::Projection": | ||
expected_value = projected_values | ||
# temporarily delegate to custom rule impl for validating things | ||
try: | ||
rule_instance = CreateRuleFromOp[op]( | ||
self.id, | ||
property_type, | ||
validation_path, | ||
expected_value, | ||
self.error_message, | ||
) | ||
rule_matches = rule_instance.match_resource_properties( | ||
resource_properties, property_type, validation_path.split("."), cfn | ||
) | ||
matches.extend(rule_matches) | ||
except KeyError: | ||
matches.extend( | ||
cfnlint.rules.custom.Operators.CreateInvalidRule("E9999", op).match( | ||
cfn | ||
) | ||
) | ||
return matches | ||
|
||
def match_resource_properties(self, resource_properties, property_type, path, cfn): | ||
# can be omitted | ||
if not self._check_conditions(resource_properties): | ||
return [] | ||
# can be omitted, as long as projected value is not used | ||
projected_values = self._compute_projection(resource_properties) | ||
|
||
return self._match_resource_properties( | ||
resource_properties, property_type, path, cfn, projected_values | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
--- | ||
AWSTemplateFormatVersion: "2010-09-09" | ||
Description: > | ||
Testing for custom inequality rules | ||
Resources: | ||
TimeoutInNumericsFunction: | ||
Type: AWS::Lambda::Function | ||
Properties: | ||
Handler: index.handler | ||
Role: arn:aws:iam::123456789012:role/lambda_basic_execution | ||
Code: ./ | ||
Runtime: nodejs18.x | ||
Timeout: 100 | ||
Tags: | ||
- Key: "Name" | ||
Value: "Production" | ||
- Key: "Name" | ||
Value: "Development" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
""" | ||
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
SPDX-License-Identifier: MIT-0 | ||
""" | ||
from test.unit.rules import BaseRuleTestCase | ||
|
||
from cfnlint.MatchProjectValidateRule import ( | ||
MatchProjectValidateRule, | ||
) # pylint: disable=E0401 | ||
|
||
|
||
class TestMatchProjectValidateRule(BaseRuleTestCase): | ||
"""Test template mapping configurations""" | ||
|
||
def setUp(self): | ||
"""Setup""" | ||
super(TestMatchProjectValidateRule, self).setUp() | ||
self.collection.register(MatchProjectValidateRule("E9001")) | ||
|
||
success_templates = ["test/test.yaml"] | ||
|
||
# def test_file_positive(self): | ||
# """Test Positive""" | ||
# self.helper_file_positive() | ||
|
||
def test_file_negative(self): | ||
"""Test failure""" | ||
self.helper_file_negative("test/test.yaml", 1) |