From 03a7100955dbbe51cd82cb3a3bef80adb021fb6b Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Tue, 15 Oct 2024 13:45:54 -0700 Subject: [PATCH 1/3] Dynamically determine Account ID during transform --- .../transforms/_language_extensions.py | 10 +- .../transforms/test_language_extensions.py | 108 +++++++++++++++++- 2 files changed, 116 insertions(+), 2 deletions(-) diff --git a/src/cfnlint/template/transforms/_language_extensions.py b/src/cfnlint/template/transforms/_language_extensions.py index 4e81a48d35..ddbcd89ec8 100644 --- a/src/cfnlint/template/transforms/_language_extensions.py +++ b/src/cfnlint/template/transforms/_language_extensions.py @@ -28,6 +28,8 @@ _SCALAR_TYPES = (str, int, float, bool) +_ACCOUNT_ID = None + class _ResolveError(Exception): def __init__(self, message: str, key: Any) -> None: @@ -372,6 +374,10 @@ def value( if isinstance(v, dict): if t_map[2].value(cfn, params, only_params) in v: t_map[1] = _ForEachValue.create(k) + if t_map[1].value == {"Ref": "AWS::AccountId"}: + global _ACCOUNT_ID + _ACCOUNT_ID = k + break except _ResolveError: pass @@ -439,7 +445,9 @@ def value( return region if v == "AWS::AccountId": - return account_id + if _ACCOUNT_ID is None: + raise _ResolveError("Can't resolve Fn::Ref", self._obj) + return _ACCOUNT_ID if v == "AWS::NotificationARNs": return [f"arn:{partition}:sns:{region}:{account_id}:notification"] diff --git a/test/unit/module/template/transforms/test_language_extensions.py b/test/unit/module/template/transforms/test_language_extensions.py index da4fa1871b..1acd00ab92 100644 --- a/test/unit/module/template/transforms/test_language_extensions.py +++ b/test/unit/module/template/transforms/test_language_extensions.py @@ -122,7 +122,8 @@ def test_ref(self): self.assertEqual(fe.value(self.cfn), "us-west-2") fe = _ForEachValue.create({"Ref": "AWS::AccountId"}) - self.assertEqual(fe.value(self.cfn), "123456789012") + with self.assertRaises(_ResolveError): + fe.value(self.cfn) fe = _ForEachValue.create({"Ref": "AWS::NotificationARNs"}) self.assertListEqual( @@ -832,3 +833,108 @@ def nested_set(dic, keys, value): if isinstance(key, int): dic = dic[key] dic[keys[-1]] = value + + +class TestTransformValueAccountId(TestCase): + def setUp(self) -> None: + self.template_obj = convert_dict( + { + "Transform": ["AWS::LanguageExtensions"], + "Mappings": { + "Accounts": { + "111111111111": {"AppName": ["A", "B"]}, + "222222222222": {"AppName": ["C", "D"]}, + }, + }, + "Resources": { + "Fn::ForEach::Regions": [ + "AppName", + { + "Fn::FindInMap": [ + "Accounts", + {"Ref": "AWS::AccountId"}, + "AppName", + ] + }, + { + "${AppName}Role": { + "Type": "AWS::IAM::Role", + "Properties": { + "RoleName": {"Ref": "AppName"}, + "AssumeRolePolicyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": ["ec2.amazonaws.com"] + }, + "Action": ["sts:AssumeRole"], + } + ], + }, + "Path": "/", + }, + } + }, + ], + }, + } + ) + + self.result = { + "Mappings": { + "Accounts": { + "111111111111": {"AppName": ["A", "B"]}, + "222222222222": {"AppName": ["C", "D"]}, + }, + }, + "Resources": { + "ARole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": ["sts:AssumeRole"], + "Effect": "Allow", + "Principal": {"Service": ["ec2.amazonaws.com"]}, + } + ], + "Version": "2012-10-17", + }, + "Path": "/", + "RoleName": "A", + }, + "Type": "AWS::IAM::Role", + }, + "BRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": ["sts:AssumeRole"], + "Effect": "Allow", + "Principal": {"Service": ["ec2.amazonaws.com"]}, + } + ], + "Version": "2012-10-17", + }, + "Path": "/", + "RoleName": "B", + }, + "Type": "AWS::IAM::Role", + }, + }, + "Transform": ["AWS::LanguageExtensions"], + } + + def test_transform(self): + self.maxDiff = None + cfn = Template(filename="", template=self.template_obj, regions=["us-east-1"]) + matches, template = language_extension(cfn) + self.assertListEqual(matches, []) + self.assertDictEqual( + template, + self.result, + template, + ) From 277a29cfbd319049f9ae278bbc299fdbc7fc431a Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Tue, 15 Oct 2024 14:07:34 -0700 Subject: [PATCH 2/3] Improve testing --- .../template/transforms/test_language_extensions.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/test/unit/module/template/transforms/test_language_extensions.py b/test/unit/module/template/transforms/test_language_extensions.py index 1acd00ab92..b94fc9e9ec 100644 --- a/test/unit/module/template/transforms/test_language_extensions.py +++ b/test/unit/module/template/transforms/test_language_extensions.py @@ -4,7 +4,7 @@ """ from copy import deepcopy -from unittest import TestCase +from unittest import TestCase, mock from cfnlint.decode import convert_dict from cfnlint.template import Template @@ -125,6 +125,13 @@ def test_ref(self): with self.assertRaises(_ResolveError): fe.value(self.cfn) + with mock.patch( + "cfnlint.template.transforms._language_extensions._ACCOUNT_ID", + "123456789012", + ): + fe = _ForEachValue.create({"Ref": "AWS::AccountId"}) + self.assertEqual(fe.value(self.cfn), "123456789012") + fe = _ForEachValue.create({"Ref": "AWS::NotificationARNs"}) self.assertListEqual( fe.value(self.cfn), ["arn:aws:sns:us-west-2:123456789012:notification"] From 9c0de35dc24adc4ae5bddb2c5558b1b8d3ed9b2a Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Tue, 15 Oct 2024 14:55:42 -0700 Subject: [PATCH 3/3] One more fix and cleanup tests --- .../transforms/_language_extensions.py | 7 ++--- .../transforms/test_language_extensions.py | 26 +++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/cfnlint/template/transforms/_language_extensions.py b/src/cfnlint/template/transforms/_language_extensions.py index ddbcd89ec8..6760be1b01 100644 --- a/src/cfnlint/template/transforms/_language_extensions.py +++ b/src/cfnlint/template/transforms/_language_extensions.py @@ -373,10 +373,11 @@ def value( for k, v in mapping.items(): if isinstance(v, dict): if t_map[2].value(cfn, params, only_params) in v: + if isinstance(t_map[1], _ForEachValueRef): + if t_map[1]._ref._value == "AWS::AccountId": + global _ACCOUNT_ID + _ACCOUNT_ID = k t_map[1] = _ForEachValue.create(k) - if t_map[1].value == {"Ref": "AWS::AccountId"}: - global _ACCOUNT_ID - _ACCOUNT_ID = k break except _ResolveError: pass diff --git a/test/unit/module/template/transforms/test_language_extensions.py b/test/unit/module/template/transforms/test_language_extensions.py index b94fc9e9ec..7741501121 100644 --- a/test/unit/module/template/transforms/test_language_extensions.py +++ b/test/unit/module/template/transforms/test_language_extensions.py @@ -6,6 +6,7 @@ from copy import deepcopy from unittest import TestCase, mock +import cfnlint.template.transforms._language_extensions from cfnlint.decode import convert_dict from cfnlint.template import Template from cfnlint.template.transforms._language_extensions import ( @@ -365,6 +366,31 @@ def test_two_mappings(self): with self.assertRaises(_ResolveError): fe.value(self.cfn) + def test_account_id(self): + + cfnlint.template.transforms._language_extensions._ACCOUNT_ID = None + + with mock.patch( + "cfnlint.template.transforms._language_extensions._ACCOUNT_ID", None + ): + self.assertIsNone( + cfnlint.template.transforms._language_extensions._ACCOUNT_ID + ) + fe = _ForEachValueFnFindInMap( + "a", + [ + "Bucket", + {"Ref": "AWS::AccountId"}, + "Names", + ], + ) + self.assertListEqual(fe.value(self.cfn), ["foo", "bar"]) + + self.assertEqual( + cfnlint.template.transforms._language_extensions._ACCOUNT_ID, + "Production", + ) + class TestTransform(TestCase): def setUp(self) -> None: