Skip to content

Commit

Permalink
SecretsManager: get_secret_value() no longer errors after delayed sec…
Browse files Browse the repository at this point in the history
…ret rotation (#8547)
  • Loading branch information
bblommers authored Feb 2, 2025
1 parent a67975b commit a57c5c8
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 13 deletions.
29 changes: 18 additions & 11 deletions moto/secretsmanager/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,6 @@ def _add_secret(
version_stages: Optional[List[str]] = None,
replica_regions: Optional[List[Dict[str, str]]] = None,
force_overwrite: bool = False,
create_new_version: bool = False,
) -> Tuple[FakeSecret, bool]:
if version_stages is None:
version_stages = ["AWSCURRENT"]
Expand Down Expand Up @@ -634,7 +633,7 @@ def _add_secret(

secret.update(description, tags, kms_key_id, last_changed_date=update_time)

if new_version or create_new_version:
if new_version:
if "AWSCURRENT" in version_stages:
secret.reset_default_version(secret_version, version_id)
else:
Expand Down Expand Up @@ -769,14 +768,19 @@ def rotate_secret(

# We add a "pending" stage. The previous version remains as "current" for now.
# Caller is responsible for creating the new secret in the Lambda
self._add_secret(
secret_id,
description=secret.description,
tags=secret.tags,
version_id=new_version_id,
version_stages=["AWSPENDING"],
create_new_version=True,
)
secret_version = {
"createdate": int(time.time()),
"version_id": new_version_id,
"version_stages": ["AWSPENDING"],
}
if not rotate_immediately:
if secret.secret_string is not None:
secret_version["secret_string"] = secret.secret_string
if secret.secret_binary is not None:
secret_version["secret_binary"] = secret.secret_binary

secret.remove_version_stages_from_old_versions(["AWSPENDING"])
secret.versions[new_version_id] = secret_version

secret.rotation_requested = True
secret.rotation_lambda_arn = rotation_lambda_arn or ""
Expand Down Expand Up @@ -819,7 +823,10 @@ def rotate_secret(
headers=request_headers,
response_headers=response_headers,
)
secret.set_default_version_id(new_version_id)
if rotate_immediately:
# If we don't rotate, we only invoke the testSecret step
# This should be done with the existing (old) version ID
secret.set_default_version_id(new_version_id)

elif secret.versions:
# AWS will always require a Lambda ARN
Expand Down
93 changes: 91 additions & 2 deletions tests/test_secretsmanager/test_secretsmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import re
import string
from datetime import datetime, timedelta, timezone
from time import sleep
from unittest import SkipTest
from uuid import uuid4

Expand Down Expand Up @@ -1309,13 +1310,15 @@ def test_rotate_secret_using_lambda(secret=None, iam_role_arn=None, table_name=N
if updated_secret["SecretString"] == "UpdatedValue":
secret_not_updated = False
else:
from time import sleep

sleep(5)
rotated_version = updated_secret["VersionId"]

assert initial_version != rotated_version

u2 = secrets_conn.get_secret_value(SecretId=secret["ARN"])
assert u2["SecretString"] == "UpdatedValue"
assert u2["VersionId"] == rotated_version

metadata = secrets_conn.describe_secret(SecretId=secret["ARN"])
assert metadata["VersionIdsToStages"][initial_version] == ["AWSPREVIOUS"]
assert metadata["VersionIdsToStages"][rotated_version] == ["AWSCURRENT"]
Expand All @@ -1335,6 +1338,92 @@ def test_rotate_secret_using_lambda(secret=None, iam_role_arn=None, table_name=N
assert finish_secret["pending_value"]["VersionStages"] == ["AWSPENDING"]


@pytest.mark.aws_verified
@dynamodb_aws_verified()
@lambda_aws_verified
@secretsmanager_aws_verified
def test_rotate_secret_using_lambda_dont_rotate_immediately(
secret=None, iam_role_arn=None, table_name=None
):
role_name = iam_role_arn.split("/")[-1]
if not allow_aws_request() and not settings.TEST_SERVER_MODE:
raise SkipTest("Can only test this in ServerMode")

iam = boto3.client("iam", "us-east-1")
if allow_aws_request():
iam.attach_role_policy(
PolicyArn="arn:aws:iam::aws:policy/SecretsManagerReadWrite",
RoleName=role_name,
)
# Testing this against AWS itself is a bit of pain
# Uncomment this to get more insights into what is happening during execution of the Lambda
# iam.attach_role_policy(
# PolicyArn="arn:aws:iam::aws:policy/CloudWatchLogsFullAccess",
# RoleName=role_name,
# )
iam.attach_role_policy(
PolicyArn="arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess",
RoleName=role_name,
)

function_name = "moto_test_" + str(uuid4())[0:6]

# Passing a `RotationLambdaARN` value to `rotate_secret` should invoke lambda
lambda_conn = boto3.client("lambda", region_name="us-east-1")
func = lambda_conn.create_function(
FunctionName=function_name,
Runtime="python3.11",
Role=iam_role_arn,
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_rotation_zip_file()},
Publish=True,
Environment={"Variables": {"table_name": table_name}},
)
lambda_conn.add_permission(
FunctionName=function_name,
StatementId="allow_secrets_manager",
Action="lambda:InvokeFunction",
Principal="secretsmanager.amazonaws.com",
)
lambda_conn.get_waiter("function_active_v2").wait(FunctionName=function_name)

secrets_conn = boto3.client("secretsmanager", region_name="us-east-1")

initial_version = secret["VersionId"]

secrets_conn.rotate_secret(
SecretId=secret["ARN"],
RotationLambdaARN=func["FunctionArn"],
RotationRules={"AutomaticallyAfterDays": 30},
RotateImmediately=False,
)

lambda_conn.delete_function(FunctionName=function_name)

current_secret = secrets_conn.get_secret_value(
SecretId=secret["ARN"], VersionStage="AWSCURRENT"
)
assert current_secret["SecretString"] == "old_secret"
assert current_secret["VersionId"] == initial_version

secret = secrets_conn.get_secret_value(SecretId=secret["ARN"])
assert secret["SecretString"] == "old_secret"
assert secret["VersionId"] == initial_version

dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
items = dynamodb.Table(table_name).scan()["Items"]

attempts = 0
while not items and attempts < 10:
sleep(5)
items = dynamodb.Table(table_name).scan()["Items"]
attempts += 1

assert items[0]["pending_value"]["VersionStages"] == ["AWSPENDING"]
assert items[0]["pending_value"]["SecretString"] == "old_secret"
assert items[0]["pending_value"]["VersionId"] != initial_version


@mock_aws
def test_put_secret_value_on_non_existing_secret():
conn = boto3.client("secretsmanager", region_name="us-west-2")
Expand Down

0 comments on commit a57c5c8

Please sign in to comment.