From 1eadeeb683eabede8d322f6ae90857b399b73dcd Mon Sep 17 00:00:00 2001 From: Jerry Xu Date: Sat, 4 Jan 2025 07:32:51 -0500 Subject: [PATCH] Kinesis resource based policy implementation (#8463) --- moto/kinesis/models.py | 31 +++++ moto/kinesis/responses.py | 21 ++++ .../test_kinesis_resource_policy.py | 109 ++++++++++++++++++ 3 files changed, 161 insertions(+) create mode 100644 tests/test_kinesis/test_kinesis_resource_policy.py diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index 51a4a21e0b1c..f3492006b362 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -585,6 +585,7 @@ class KinesisBackend(BaseBackend): def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.streams: Dict[str, Stream] = OrderedDict() + self.resource_policies: Dict[str, str] = {} @staticmethod def default_vpc_endpoint_service( @@ -1025,5 +1026,35 @@ def send_log_event( explicit_hash_key="", ) + def put_resource_policy(self, resource_arn: str, policy_doc: str) -> None: + """ + Creates/updates resource policy and return policy object + """ + self.resource_policies[resource_arn] = policy_doc + + def delete_resource_policy(self, resource_arn: str) -> None: + """ + Remove resource policy with a matching given resource arn. + """ + stream_name = resource_arn.split("/")[-1] + if stream_name not in self.streams: + raise StreamNotFoundError(resource_arn, self.account_id) + if resource_arn not in self.resource_policies: + raise ResourceNotFoundError( + message=f"No resource policy found for resource ARN {resource_arn}." + ) + del self.resource_policies[resource_arn] + + def get_resource_policy(self, resource_arn: str) -> str: + """ + Get resource policy with a matching given resource arn. + """ + stream_name = resource_arn.split("/")[-1] + if stream_name not in self.streams: + raise StreamNotFoundError(resource_arn, self.account_id) + if resource_arn not in self.resource_policies: + return "{}" + return self.resource_policies[resource_arn] + kinesis_backends = BackendDict(KinesisBackend, "kinesis") diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index ba873a7ff38b..e4d46410a1cb 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -325,3 +325,24 @@ def update_stream_mode(self) -> str: stream_mode = self._get_param("StreamModeDetails") self.kinesis_backend.update_stream_mode(stream_arn, stream_mode) return "{}" + + def put_resource_policy(self) -> str: + resource_arn = self._get_param("ResourceARN") + policy = self._get_param("Policy") + self.kinesis_backend.put_resource_policy( + resource_arn=resource_arn, + policy_doc=policy, + ) + return json.dumps(dict()) + + def get_resource_policy(self) -> str: + resource_arn = self._get_param("ResourceARN") + policy = self.kinesis_backend.get_resource_policy(resource_arn=resource_arn) + return json.dumps({"Policy": policy}) + + def delete_resource_policy(self) -> str: + resource_arn = self._get_param("ResourceARN") + self.kinesis_backend.delete_resource_policy( + resource_arn=resource_arn, + ) + return json.dumps(dict()) diff --git a/tests/test_kinesis/test_kinesis_resource_policy.py b/tests/test_kinesis/test_kinesis_resource_policy.py new file mode 100644 index 000000000000..d7c4e6fe4933 --- /dev/null +++ b/tests/test_kinesis/test_kinesis_resource_policy.py @@ -0,0 +1,109 @@ +import json + +import boto3 +import pytest +from botocore.exceptions import ClientError + +from moto import mock_aws + + +def get_policy_doc(action, stream_arn): + sts = boto3.client("sts", "us-east-1") + account_id = sts.get_caller_identity()["Account"] + + policy_doc = { + "Version": "2012-10-17", + "Statement": [ + { + "Principal": {"AWS": account_id}, + "Effect": "Allow", + "Action": [action], + "Resource": f"{stream_arn}", + } + ], + } + return json.dumps(policy_doc) + + +@mock_aws +def test_put_resource_policy(): + client = boto3.client("kinesis", region_name="us-west-2") + client.create_stream(StreamName="my-stream", ShardCount=2) + resp = client.describe_stream(StreamName="my-stream") + desc = resp["StreamDescription"] + stream_arn = desc["StreamARN"] + + action = "kinesis:*" + json_policy_doc = get_policy_doc(action, stream_arn) + client.put_resource_policy(ResourceARN=stream_arn, Policy=json_policy_doc) + response = client.get_resource_policy(ResourceARN=stream_arn) + + assert response["Policy"] == json_policy_doc + + +@mock_aws +def test_delete_resource_policy(): + client = boto3.client("kinesis", region_name="us-west-2") + client.create_stream(StreamName="my-stream", ShardCount=2) + resp = client.describe_stream(StreamName="my-stream") + desc = resp["StreamDescription"] + stream_arn = desc["StreamARN"] + action = "kinesis:*" + json_policy_doc = get_policy_doc(action, stream_arn) + client.put_resource_policy(ResourceARN=stream_arn, Policy=json_policy_doc) + response = client.get_resource_policy(ResourceARN=stream_arn) + assert response["Policy"] == json_policy_doc + + client.delete_resource_policy(ResourceARN=stream_arn) + response = client.get_resource_policy(ResourceARN=stream_arn) + assert response["Policy"] == "{}" + + +@mock_aws +def test_get_resource_policy_from_unknown_resource(): + client = boto3.client("kinesis", region_name="us-west-2") + client.create_stream(StreamName="my-stream", ShardCount=2) + resp = client.describe_stream(StreamName="my-stream") + desc = resp["StreamDescription"] + stream_arn = desc["StreamARN"] + "unknown" + sts = boto3.client("sts", "us-east-1") + account_id = sts.get_caller_identity()["Account"] + with pytest.raises(ClientError) as exc: + client.get_resource_policy(ResourceARN=stream_arn) + err = exc.value.response["Error"] + assert err["Code"] == "ResourceNotFoundException" + assert ( + err["Message"] == f"Stream {stream_arn} under account {account_id} not found." + ) + + +@mock_aws +def test_delete_resource_policy_from_unknown_resource(): + client = boto3.client("kinesis", region_name="us-west-2") + client.create_stream(StreamName="my-stream", ShardCount=2) + resp = client.describe_stream(StreamName="my-stream") + desc = resp["StreamDescription"] + stream_arn = desc["StreamARN"] + "unknown" + sts = boto3.client("sts", "us-east-1") + account_id = sts.get_caller_identity()["Account"] + with pytest.raises(ClientError) as exc: + client.delete_resource_policy(ResourceARN=stream_arn) + err = exc.value.response["Error"] + assert err["Code"] == "ResourceNotFoundException" + assert ( + err["Message"] == f"Stream {stream_arn} under account {account_id} not found." + ) + + +@mock_aws +def test_delete_resource_policy_from_resource_without_policy(): + client = boto3.client("kinesis", region_name="us-west-2") + client.create_stream(StreamName="my-stream", ShardCount=2) + resp = client.describe_stream(StreamName="my-stream") + desc = resp["StreamDescription"] + stream_arn = desc["StreamARN"] + with pytest.raises(ClientError) as exc: + client.delete_resource_policy(ResourceARN=stream_arn) + err = exc.value.response["Error"] + assert err["Code"] == "ResourceNotFoundException" + assert err["Message"] == f"No resource policy found for resource ARN {stream_arn}."