Skip to content

Commit

Permalink
Kinesis resource based policy implementation (#8463)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryhxu authored Jan 4, 2025
1 parent ec277aa commit 1eadeeb
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 0 deletions.
31 changes: 31 additions & 0 deletions moto/kinesis/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ class KinesisBackend(BaseBackend):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.streams: Dict[str, Stream] = OrderedDict()
self.resource_policies: Dict[str, str] = {}

@staticmethod
def default_vpc_endpoint_service(
Expand Down Expand Up @@ -1025,5 +1026,35 @@ def send_log_event(
explicit_hash_key="",
)

def put_resource_policy(self, resource_arn: str, policy_doc: str) -> None:
"""
Creates/updates resource policy and return policy object
"""
self.resource_policies[resource_arn] = policy_doc

def delete_resource_policy(self, resource_arn: str) -> None:
"""
Remove resource policy with a matching given resource arn.
"""
stream_name = resource_arn.split("/")[-1]
if stream_name not in self.streams:
raise StreamNotFoundError(resource_arn, self.account_id)
if resource_arn not in self.resource_policies:
raise ResourceNotFoundError(
message=f"No resource policy found for resource ARN {resource_arn}."
)
del self.resource_policies[resource_arn]

def get_resource_policy(self, resource_arn: str) -> str:
"""
Get resource policy with a matching given resource arn.
"""
stream_name = resource_arn.split("/")[-1]
if stream_name not in self.streams:
raise StreamNotFoundError(resource_arn, self.account_id)
if resource_arn not in self.resource_policies:
return "{}"
return self.resource_policies[resource_arn]


kinesis_backends = BackendDict(KinesisBackend, "kinesis")
21 changes: 21 additions & 0 deletions moto/kinesis/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,24 @@ def update_stream_mode(self) -> str:
stream_mode = self._get_param("StreamModeDetails")
self.kinesis_backend.update_stream_mode(stream_arn, stream_mode)
return "{}"

def put_resource_policy(self) -> str:
resource_arn = self._get_param("ResourceARN")
policy = self._get_param("Policy")
self.kinesis_backend.put_resource_policy(
resource_arn=resource_arn,
policy_doc=policy,
)
return json.dumps(dict())

def get_resource_policy(self) -> str:
resource_arn = self._get_param("ResourceARN")
policy = self.kinesis_backend.get_resource_policy(resource_arn=resource_arn)
return json.dumps({"Policy": policy})

def delete_resource_policy(self) -> str:
resource_arn = self._get_param("ResourceARN")
self.kinesis_backend.delete_resource_policy(
resource_arn=resource_arn,
)
return json.dumps(dict())
109 changes: 109 additions & 0 deletions tests/test_kinesis/test_kinesis_resource_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import json

import boto3
import pytest
from botocore.exceptions import ClientError

from moto import mock_aws


def get_policy_doc(action, stream_arn):
sts = boto3.client("sts", "us-east-1")
account_id = sts.get_caller_identity()["Account"]

policy_doc = {
"Version": "2012-10-17",
"Statement": [
{
"Principal": {"AWS": account_id},
"Effect": "Allow",
"Action": [action],
"Resource": f"{stream_arn}",
}
],
}
return json.dumps(policy_doc)


@mock_aws
def test_put_resource_policy():
client = boto3.client("kinesis", region_name="us-west-2")
client.create_stream(StreamName="my-stream", ShardCount=2)
resp = client.describe_stream(StreamName="my-stream")
desc = resp["StreamDescription"]
stream_arn = desc["StreamARN"]

action = "kinesis:*"
json_policy_doc = get_policy_doc(action, stream_arn)
client.put_resource_policy(ResourceARN=stream_arn, Policy=json_policy_doc)
response = client.get_resource_policy(ResourceARN=stream_arn)

assert response["Policy"] == json_policy_doc


@mock_aws
def test_delete_resource_policy():
client = boto3.client("kinesis", region_name="us-west-2")
client.create_stream(StreamName="my-stream", ShardCount=2)
resp = client.describe_stream(StreamName="my-stream")
desc = resp["StreamDescription"]
stream_arn = desc["StreamARN"]
action = "kinesis:*"
json_policy_doc = get_policy_doc(action, stream_arn)
client.put_resource_policy(ResourceARN=stream_arn, Policy=json_policy_doc)
response = client.get_resource_policy(ResourceARN=stream_arn)
assert response["Policy"] == json_policy_doc

client.delete_resource_policy(ResourceARN=stream_arn)
response = client.get_resource_policy(ResourceARN=stream_arn)
assert response["Policy"] == "{}"


@mock_aws
def test_get_resource_policy_from_unknown_resource():
client = boto3.client("kinesis", region_name="us-west-2")
client.create_stream(StreamName="my-stream", ShardCount=2)
resp = client.describe_stream(StreamName="my-stream")
desc = resp["StreamDescription"]
stream_arn = desc["StreamARN"] + "unknown"
sts = boto3.client("sts", "us-east-1")
account_id = sts.get_caller_identity()["Account"]
with pytest.raises(ClientError) as exc:
client.get_resource_policy(ResourceARN=stream_arn)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert (
err["Message"] == f"Stream {stream_arn} under account {account_id} not found."
)


@mock_aws
def test_delete_resource_policy_from_unknown_resource():
client = boto3.client("kinesis", region_name="us-west-2")
client.create_stream(StreamName="my-stream", ShardCount=2)
resp = client.describe_stream(StreamName="my-stream")
desc = resp["StreamDescription"]
stream_arn = desc["StreamARN"] + "unknown"
sts = boto3.client("sts", "us-east-1")
account_id = sts.get_caller_identity()["Account"]
with pytest.raises(ClientError) as exc:
client.delete_resource_policy(ResourceARN=stream_arn)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert (
err["Message"] == f"Stream {stream_arn} under account {account_id} not found."
)


@mock_aws
def test_delete_resource_policy_from_resource_without_policy():
client = boto3.client("kinesis", region_name="us-west-2")
client.create_stream(StreamName="my-stream", ShardCount=2)
resp = client.describe_stream(StreamName="my-stream")
desc = resp["StreamDescription"]
stream_arn = desc["StreamARN"]
with pytest.raises(ClientError) as exc:
client.delete_resource_policy(ResourceARN=stream_arn)
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert err["Message"] == f"No resource policy found for resource ARN {stream_arn}."

0 comments on commit 1eadeeb

Please sign in to comment.