diff --git a/src/cfnlint/rules/resources/lmbd/PermissionSourceAccount.py b/src/cfnlint/rules/resources/lmbd/PermissionSourceAccount.py new file mode 100644 index 0000000000..f1137c7ba7 --- /dev/null +++ b/src/cfnlint/rules/resources/lmbd/PermissionSourceAccount.py @@ -0,0 +1,99 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from typing import Any + +import regex as re + +from cfnlint.helpers import ensure_list, is_function +from cfnlint.jsonschema import ValidationError, ValidationResult +from cfnlint.jsonschema.protocols import Validator +from cfnlint.rules.jsonschema import CfnLintKeyword + + +class PermissionSourceAccount(CfnLintKeyword): + + id = "W3663" + shortdesc = "Validate SourceAccount is required property" + description = ( + "When configuration a Lambda permission with a SourceArn " + "that doesn't have an AccountId you should also specify " + "the SourceAccount" + ) + source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-permission.html#cfn-lambda-permission-sourceaccount" + tags = ["resources", "lambda", "permission"] + + def __init__(self): + super().__init__( + keywords=["Resources/AWS::Lambda::Permission/Properties"], + ) + + def _validate_sub_has_account_id(self, validator: Validator, value: Any) -> bool: + value = ensure_list(value) + + if isinstance(value[0], str): + if re.search(r":(\d{12}|\${AWS::AccountId}):", value[0]): + return True + + return False + return True + + def _validate_is_gettatt_to_bucket(self, validator: Validator, value: Any) -> bool: + value = ensure_list(value)[0].split(".")[0] + + resource = validator.context.resources[value] + if resource.type == "AWS::S3::Bucket": + return True + return False + + def validate( + self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any] + ) -> ValidationResult: + if not isinstance(instance, dict): + return + + for scenario in validator.cfn.get_object_without_conditions( + instance, ["SourceArn", "SourceAccount"] + ): + if scenario.get("Scenario"): + scenario_validator = validator.evolve( + context=validator.context.evolve( + conditions=validator.context.conditions.evolve( + status=scenario.get("Scenario") + ) + ) + ) + else: + scenario_validator = validator.evolve() + + source_arn = scenario.get("Object").get("SourceArn") + source_account = scenario.get("Object").get("SourceAccount") + if not source_arn: + continue + + if isinstance(source_arn, str): + if re.search(r":\d{12}:", source_arn): + continue + + fn_k, fn_v = is_function(source_arn) + if fn_k is not None: + if fn_k == "Fn::Sub": + if self._validate_sub_has_account_id(scenario_validator, fn_v): + continue + elif fn_k == "Fn::GetAtt": + if not self._validate_is_gettatt_to_bucket( + scenario_validator, fn_v + ): + continue + else: + continue + + if not source_account: + yield ValidationError( + "'SourceAccount' is a required property", + validator="required", + ) diff --git a/test/unit/rules/resources/lmbd/test_permission_source_account.py b/test/unit/rules/resources/lmbd/test_permission_source_account.py new file mode 100644 index 0000000000..3135a86ff1 --- /dev/null +++ b/test/unit/rules/resources/lmbd/test_permission_source_account.py @@ -0,0 +1,193 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest + +from cfnlint.jsonschema import ValidationError +from cfnlint.rules.resources.lmbd.PermissionSourceAccount import PermissionSourceAccount + + +@pytest.fixture(scope="module") +def rule(): + rule = PermissionSourceAccount() + yield rule + + +@pytest.fixture +def template(): + return { + "AWSTemplateFormatVersion": "2010-09-09", + "Conditions": { + "IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]}, + }, + "Resources": { + "Bucket": {"Type": "AWS::S3::Bucket"}, + "SQS": {"Type": "AWS::SQS::Queue"}, + }, + } + + +@pytest.mark.parametrize( + "instance,expected", + [ + ( + { + "SourceArn": "arn:aws:s3:::bucket_name", + "SourceAccount": "123456789012", + }, + [], + ), + ( + {}, + [], + ), + ( + [], + [], + ), + ( + { + "SourceArn": [], + }, + [], + ), + ( + { + "SourceArn": "arn:aws:s3:::bucket_name", + }, + [ + ValidationError( + "'SourceAccount' is a required property", + validator="required", + ) + ], + ), + ( + { + "SourceArn": "arn:aws:sqs:us-east-1:123456789012:queue", + }, + [], + ), + ( + { + "SourceArn": { + "Fn::Sub": ( + "arn:${AWS::Partition}:sqs:" + "${AWS::Region}:${AWS::AccountId}:queue" + ) + }, + }, + [], + ), + ( + { + "SourceArn": {"Fn::Sub": "arn:${AWS::Partition}:s3:::bucket"}, + }, + [ + ValidationError( + "'SourceAccount' is a required property", + validator="required", + ) + ], + ), + ( + { + "SourceArn": {"Fn::Sub": [[], {}]}, + }, + [], + ), + ( + { + "SourceArn": {"Fn::Sub": "arn:${AWS::Partition}:s3:::bucket"}, + "SourceAccount": {"Ref": "AWS::AccountId"}, + }, + [], + ), + ( + { + "SourceArn": {"Fn::GetAtt": ["Bucket", "Arn"]}, + "SourceAccount": {"Ref": "AWS::AccountId"}, + }, + [], + ), + ( + { + "SourceArn": {"Fn::GetAtt": ["Bucket", "Arn"]}, + }, + [ + ValidationError( + "'SourceAccount' is a required property", + validator="required", + ) + ], + ), + ( + { + "SourceArn": {"Fn::GetAtt": ["SQS", "Arn"]}, + "SourceAccount": {"Ref": "AWS::AccountId"}, + }, + [], + ), + ( + { + "SourceArn": {"Fn::GetAtt": ["SQS", "Arn"]}, + }, + [], + ), + ( + { + "SourceArn": {"Ref": "Foo"}, + }, + [], + ), + ( + { + "SourceArn": { + "Fn::If": [ + "IsUsEast1", + {"Fn::GetAtt": ["Bucket", "Arn"]}, + {"Fn::GetAtt": ["SQS", "Arn"]}, + ] + }, + "SourceAccount": { + "Fn::If": [ + "IsUsEast1", + {"Ref": "AWS::AccountId"}, + {"Ref": "AWS::NoValue"}, + ] + }, + }, + [], + ), + ( + { + "SourceArn": { + "Fn::If": [ + "IsUsEast1", + {"Fn::GetAtt": ["Bucket", "Arn"]}, + {"Fn::GetAtt": ["SQS", "Arn"]}, + ] + }, + "SourceAccount": { + "Fn::If": [ + "IsUsEast1", + {"Ref": "AWS::NoValue"}, + {"Ref": "AWS::AccountId"}, + ] + }, + }, + [ + ValidationError( + "'SourceAccount' is a required property", + validator="required", + ) + ], + ), + ], +) +def test_validate(instance, expected, rule, validator): + errs = list(rule.validate(validator, "", instance, {})) + + assert errs == expected, f"Expected {expected} got {errs}"