Skip to content

Commit

Permalink
Create rule W3663 to validate lmbd permission account (#3523)
Browse files Browse the repository at this point in the history
* Create rule W3663 to validate lmbd permission account
  • Loading branch information
kddejong authored Jul 25, 2024
1 parent c6b8148 commit 0eeeb6b
Show file tree
Hide file tree
Showing 2 changed files with 292 additions and 0 deletions.
99 changes: 99 additions & 0 deletions src/cfnlint/rules/resources/lmbd/PermissionSourceAccount.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from __future__ import annotations

from typing import Any

import regex as re

from cfnlint.helpers import ensure_list, is_function
from cfnlint.jsonschema import ValidationError, ValidationResult
from cfnlint.jsonschema.protocols import Validator
from cfnlint.rules.jsonschema import CfnLintKeyword


class PermissionSourceAccount(CfnLintKeyword):

id = "W3663"
shortdesc = "Validate SourceAccount is required property"
description = (
"When configuration a Lambda permission with a SourceArn "
"that doesn't have an AccountId you should also specify "
"the SourceAccount"
)
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-permission.html#cfn-lambda-permission-sourceaccount"
tags = ["resources", "lambda", "permission"]

def __init__(self):
super().__init__(
keywords=["Resources/AWS::Lambda::Permission/Properties"],
)

def _validate_sub_has_account_id(self, validator: Validator, value: Any) -> bool:
value = ensure_list(value)

if isinstance(value[0], str):
if re.search(r":(\d{12}|\${AWS::AccountId}):", value[0]):
return True

return False
return True

def _validate_is_gettatt_to_bucket(self, validator: Validator, value: Any) -> bool:
value = ensure_list(value)[0].split(".")[0]

resource = validator.context.resources[value]
if resource.type == "AWS::S3::Bucket":
return True
return False

def validate(
self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any]
) -> ValidationResult:
if not isinstance(instance, dict):
return

for scenario in validator.cfn.get_object_without_conditions(
instance, ["SourceArn", "SourceAccount"]
):
if scenario.get("Scenario"):
scenario_validator = validator.evolve(
context=validator.context.evolve(
conditions=validator.context.conditions.evolve(
status=scenario.get("Scenario")
)
)
)
else:
scenario_validator = validator.evolve()

source_arn = scenario.get("Object").get("SourceArn")
source_account = scenario.get("Object").get("SourceAccount")
if not source_arn:
continue

if isinstance(source_arn, str):
if re.search(r":\d{12}:", source_arn):
continue

fn_k, fn_v = is_function(source_arn)
if fn_k is not None:
if fn_k == "Fn::Sub":
if self._validate_sub_has_account_id(scenario_validator, fn_v):
continue
elif fn_k == "Fn::GetAtt":
if not self._validate_is_gettatt_to_bucket(
scenario_validator, fn_v
):
continue
else:
continue

if not source_account:
yield ValidationError(
"'SourceAccount' is a required property",
validator="required",
)
193 changes: 193 additions & 0 deletions test/unit/rules/resources/lmbd/test_permission_source_account.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

import pytest

from cfnlint.jsonschema import ValidationError
from cfnlint.rules.resources.lmbd.PermissionSourceAccount import PermissionSourceAccount


@pytest.fixture(scope="module")
def rule():
rule = PermissionSourceAccount()
yield rule


@pytest.fixture
def template():
return {
"AWSTemplateFormatVersion": "2010-09-09",
"Conditions": {
"IsUsEast1": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]},
},
"Resources": {
"Bucket": {"Type": "AWS::S3::Bucket"},
"SQS": {"Type": "AWS::SQS::Queue"},
},
}


@pytest.mark.parametrize(
"instance,expected",
[
(
{
"SourceArn": "arn:aws:s3:::bucket_name",
"SourceAccount": "123456789012",
},
[],
),
(
{},
[],
),
(
[],
[],
),
(
{
"SourceArn": [],
},
[],
),
(
{
"SourceArn": "arn:aws:s3:::bucket_name",
},
[
ValidationError(
"'SourceAccount' is a required property",
validator="required",
)
],
),
(
{
"SourceArn": "arn:aws:sqs:us-east-1:123456789012:queue",
},
[],
),
(
{
"SourceArn": {
"Fn::Sub": (
"arn:${AWS::Partition}:sqs:"
"${AWS::Region}:${AWS::AccountId}:queue"
)
},
},
[],
),
(
{
"SourceArn": {"Fn::Sub": "arn:${AWS::Partition}:s3:::bucket"},
},
[
ValidationError(
"'SourceAccount' is a required property",
validator="required",
)
],
),
(
{
"SourceArn": {"Fn::Sub": [[], {}]},
},
[],
),
(
{
"SourceArn": {"Fn::Sub": "arn:${AWS::Partition}:s3:::bucket"},
"SourceAccount": {"Ref": "AWS::AccountId"},
},
[],
),
(
{
"SourceArn": {"Fn::GetAtt": ["Bucket", "Arn"]},
"SourceAccount": {"Ref": "AWS::AccountId"},
},
[],
),
(
{
"SourceArn": {"Fn::GetAtt": ["Bucket", "Arn"]},
},
[
ValidationError(
"'SourceAccount' is a required property",
validator="required",
)
],
),
(
{
"SourceArn": {"Fn::GetAtt": ["SQS", "Arn"]},
"SourceAccount": {"Ref": "AWS::AccountId"},
},
[],
),
(
{
"SourceArn": {"Fn::GetAtt": ["SQS", "Arn"]},
},
[],
),
(
{
"SourceArn": {"Ref": "Foo"},
},
[],
),
(
{
"SourceArn": {
"Fn::If": [
"IsUsEast1",
{"Fn::GetAtt": ["Bucket", "Arn"]},
{"Fn::GetAtt": ["SQS", "Arn"]},
]
},
"SourceAccount": {
"Fn::If": [
"IsUsEast1",
{"Ref": "AWS::AccountId"},
{"Ref": "AWS::NoValue"},
]
},
},
[],
),
(
{
"SourceArn": {
"Fn::If": [
"IsUsEast1",
{"Fn::GetAtt": ["Bucket", "Arn"]},
{"Fn::GetAtt": ["SQS", "Arn"]},
]
},
"SourceAccount": {
"Fn::If": [
"IsUsEast1",
{"Ref": "AWS::NoValue"},
{"Ref": "AWS::AccountId"},
]
},
},
[
ValidationError(
"'SourceAccount' is a required property",
validator="required",
)
],
),
],
)
def test_validate(instance, expected, rule, validator):
errs = list(rule.validate(validator, "", instance, {}))

assert errs == expected, f"Expected {expected} got {errs}"

0 comments on commit 0eeeb6b

Please sign in to comment.