Skip to content

Commit

Permalink
Add rule W1051 to validate if dyn ref when arn
Browse files Browse the repository at this point in the history
  • Loading branch information
kddejong committed Feb 14, 2025
1 parent 31a421c commit 8e28374
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 4 deletions.
8 changes: 5 additions & 3 deletions src/cfnlint/rules/functions/DynamicReference.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def __init__(self) -> None:
super().__init__()
self.child_rules = {
"E1051": None,
"W1051": None,
"E1027": None,
}

Expand Down Expand Up @@ -125,9 +126,10 @@ def dynamicReference(
evolved = validator.evolve(schema=_secrets_manager_arn)
else: # this is secrets manager
evolved = validator.evolve(schema=_secrets_manager)
rule = self.child_rules["E1051"]
if rule and hasattr(rule, "validate"):
yield from rule.validate(validator, {}, v, schema)
for rule_id in ["E1051", "W1051"]:
rule = self.child_rules[rule_id]
if rule and hasattr(rule, "validate"):
yield from rule.validate(validator, {}, v, schema)

for err in evolved.iter_errors(parts):
yield self._clean_errors(err)
61 changes: 61 additions & 0 deletions src/cfnlint/rules/functions/DynamicReferenceSecretsManagerArn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from typing import Any

import regex as re

from cfnlint.jsonschema import ValidationError, Validator
from cfnlint.rules import CloudFormationLintRule


class DynamicReferenceSecretsManagerArn(CloudFormationLintRule):
id = "W1051"
shortdesc = (
"Validate dynamic references to secrets manager "
"are not used when a secrets manager ARN was expected"
)
description = (
"Certain properties expect a secret manager ARN. This rule "
"validates if you may be accidently using a secret in place "
"of the ARN"
)
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/dynamic-references.html#dynamic-references-secretsmanager"
tags = ["functions", "dynamic reference"]

def validate(self, validator: Validator, s: Any, instance: Any, schema: Any):

if "Fn::Sub" == validator.context.path.path[-1]:
if not re.match(r"^(\\\")?{{resolve:secretsmanager:.*}}(\\\")?$", instance):
return

if len(validator.context.path.path) < 3:
return

Check warning on line 35 in src/cfnlint/rules/functions/DynamicReferenceSecretsManagerArn.py

View check run for this annotation

Codecov / codecov/patch

src/cfnlint/rules/functions/DynamicReferenceSecretsManagerArn.py#L35

Added line #L35 was not covered by tests

if (
validator.context.path.path[0] != "Resources"
or validator.context.path.path[2] != "Properties"
):
return

Check warning on line 41 in src/cfnlint/rules/functions/DynamicReferenceSecretsManagerArn.py

View check run for this annotation

Codecov / codecov/patch

src/cfnlint/rules/functions/DynamicReferenceSecretsManagerArn.py#L41

Added line #L41 was not covered by tests

fields = [
"SecretArn",
"SecretARN",
"SecretsManagerSecretId",
"SecretsManagerOracleAsmSecretId",
"SecretsManagerSecurityDbEncryptionSecretId",
"SecretsManagerConfiguration",
]

for field in fields:
if any(field == p for p in validator.context.path.path):
yield ValidationError(
(
f"Dynamic reference {instance!r} to secrets manager when "
f"the field {field!r} expects the ARN to the secret and "
"not the secret"
),
rule=self,
)
8 changes: 7 additions & 1 deletion src/cfnlint/rules/functions/Sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(self) -> None:
{
"W1019": None,
"W1020": None,
"W1051": None,
}
)
self._functions = [
Expand Down Expand Up @@ -142,9 +143,14 @@ def fn_sub(

# we know the structure is valid at this point
# so any child rule doesn't have to revalidate it
value_validator = validator.evolve(
context=validator.context.evolve(
path=validator.context.path.descend(path=key)
)
)
for _, rule in self.child_rules.items():
if rule and hasattr(rule, "validate"):
for err in rule.validate(validator, s, instance.get("Fn::Sub"), schema):
for err in rule.validate(value_validator, s, value, schema):
err.path.append("Fn::Sub")
err.rule = rule
yield err
13 changes: 13 additions & 0 deletions test/unit/rules/functions/test_dynamic_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def context(cfn):
{
"E1051": _TestRule(),
"E1027": _TestRule(),
"W1051": _TestRule(),
},
[],
),
Expand All @@ -57,6 +58,7 @@ def context(cfn):
{
"E1051": None,
"E1027": None,
"W1051": None,
},
[],
),
Expand All @@ -67,6 +69,7 @@ def context(cfn):
{
"E1051": _TestRule(),
"E1027": _TestRule(),
"W1051": _TestRule(),
},
[],
),
Expand All @@ -77,6 +80,7 @@ def context(cfn):
{
"E1051": None,
"E1027": None,
"W1051": None,
},
[],
),
Expand All @@ -87,6 +91,7 @@ def context(cfn):
{
"E1051": _TestRule(),
"E1027": _TestRule(),
"W1051": _TestRule(),
},
[],
),
Expand All @@ -97,6 +102,7 @@ def context(cfn):
{
"E1051": _TestRule(),
"E1027": _TestRule(),
"W1051": _TestRule(),
},
[],
),
Expand All @@ -107,6 +113,7 @@ def context(cfn):
{
"E1051": _TestRule(),
"E1027": _TestRule(),
"W1051": _TestRule(),
},
[
ValidationError(
Expand All @@ -123,6 +130,7 @@ def context(cfn):
{
"E1051": _TestRule(),
"E1027": _TestRule(),
"W1051": _TestRule(),
},
[
ValidationError(
Expand All @@ -139,6 +147,7 @@ def context(cfn):
{
"E1051": _TestRule(),
"E1027": _TestRule(),
"W1051": _TestRule(),
},
[],
),
Expand All @@ -149,6 +158,7 @@ def context(cfn):
{
"E1051": None,
"E1027": None,
"W1051": None,
},
[],
),
Expand All @@ -159,6 +169,7 @@ def context(cfn):
{
"E1051": _TestRule(),
"E1027": _TestRule(),
"W1051": _TestRule(),
},
[],
),
Expand All @@ -169,6 +180,7 @@ def context(cfn):
{
"E1051": _TestRule(),
"E1027": _TestRule(),
"W1051": _TestRule(),
},
[],
),
Expand All @@ -179,6 +191,7 @@ def context(cfn):
{
"E1051": _TestRule(),
"E1027": _TestRule(),
"W1051": _TestRule(),
},
[
ValidationError(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from collections import deque

import pytest

from cfnlint.jsonschema import ValidationError
from cfnlint.rules.functions.DynamicReferenceSecretsManagerArn import (
DynamicReferenceSecretsManagerArn,
)


@pytest.fixture(scope="module")
def rule():
rule = DynamicReferenceSecretsManagerArn()
yield rule


@pytest.mark.parametrize(
"name,instance,path,expected",
[
(
"Valid secrets manager",
"{{resolve:secretsmanager:Parameter}}",
{
"path": deque(["Resources", "MyResource", "Properties", "Password"]),
},
[],
),
(
"Valid sub not to secrets manaager",
"secretsmanager",
{
"path": deque(
["Resources", "MyResource", "Properties", "Password", "Fn::Sub"]
),
},
[],
),
(
"Valid secrets manager",
"{{resolve:secretsmanager:Parameter}}",
{
"path": deque(["Resources", "MyResource", "Properties", "Password"]),
},
[],
),
(
"Invalid secrets manager when ARN was expected",
"{{resolve:secretsmanager:secret}}",
{
"path": deque(["Resources", "MyResource", "Properties", "SecretArn"]),
},
[
ValidationError(
(
"Dynamic reference '{{resolve:secretsmanager:secret}}'"
" to secrets manager when the field 'SecretArn' expects "
"the ARN to the secret and not the secret"
),
rule=DynamicReferenceSecretsManagerArn(),
)
],
),
(
"Invalid secrets manager when ARN was expected",
'\\"{{resolve:secretsmanager:${MyParameter}}}\\"',
{
"path": deque(
["Resources", "MyResource", "Properties", "SecretArn", "Fn::Sub"]
),
},
[
ValidationError(
(
"Dynamic reference "
"'\\\\\"{{resolve:secretsmanager:${MyParameter}}}\\\\\"'"
" to secrets manager when the field 'SecretArn' "
"expects the ARN to the secret and not the secret"
),
rule=DynamicReferenceSecretsManagerArn(),
)
],
),
],
indirect=["path"],
)
def test_validate(name, instance, path, expected, validator, rule):
errs = list(rule.validate(validator, {}, instance, {}))

assert errs == expected, f"Test {name!r} got {errs!r}"

0 comments on commit 8e28374

Please sign in to comment.