From a57c5c85d4b0d3ecb08dcfc50c615e29aee875b8 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sun, 2 Feb 2025 10:45:33 -0100 Subject: [PATCH] SecretsManager: get_secret_value() no longer errors after delayed secret rotation (#8547) --- moto/secretsmanager/models.py | 29 +++--- .../test_secretsmanager.py | 93 ++++++++++++++++++- 2 files changed, 109 insertions(+), 13 deletions(-) diff --git a/moto/secretsmanager/models.py b/moto/secretsmanager/models.py index 3f1f1ae892ef..67e6eb3d5aee 100644 --- a/moto/secretsmanager/models.py +++ b/moto/secretsmanager/models.py @@ -606,7 +606,6 @@ def _add_secret( version_stages: Optional[List[str]] = None, replica_regions: Optional[List[Dict[str, str]]] = None, force_overwrite: bool = False, - create_new_version: bool = False, ) -> Tuple[FakeSecret, bool]: if version_stages is None: version_stages = ["AWSCURRENT"] @@ -634,7 +633,7 @@ def _add_secret( secret.update(description, tags, kms_key_id, last_changed_date=update_time) - if new_version or create_new_version: + if new_version: if "AWSCURRENT" in version_stages: secret.reset_default_version(secret_version, version_id) else: @@ -769,14 +768,19 @@ def rotate_secret( # We add a "pending" stage. The previous version remains as "current" for now. # Caller is responsible for creating the new secret in the Lambda - self._add_secret( - secret_id, - description=secret.description, - tags=secret.tags, - version_id=new_version_id, - version_stages=["AWSPENDING"], - create_new_version=True, - ) + secret_version = { + "createdate": int(time.time()), + "version_id": new_version_id, + "version_stages": ["AWSPENDING"], + } + if not rotate_immediately: + if secret.secret_string is not None: + secret_version["secret_string"] = secret.secret_string + if secret.secret_binary is not None: + secret_version["secret_binary"] = secret.secret_binary + + secret.remove_version_stages_from_old_versions(["AWSPENDING"]) + secret.versions[new_version_id] = secret_version secret.rotation_requested = True secret.rotation_lambda_arn = rotation_lambda_arn or "" @@ -819,7 +823,10 @@ def rotate_secret( headers=request_headers, response_headers=response_headers, ) - secret.set_default_version_id(new_version_id) + if rotate_immediately: + # If we don't rotate, we only invoke the testSecret step + # This should be done with the existing (old) version ID + secret.set_default_version_id(new_version_id) elif secret.versions: # AWS will always require a Lambda ARN diff --git a/tests/test_secretsmanager/test_secretsmanager.py b/tests/test_secretsmanager/test_secretsmanager.py index e0c877d37892..a98e6295fac0 100644 --- a/tests/test_secretsmanager/test_secretsmanager.py +++ b/tests/test_secretsmanager/test_secretsmanager.py @@ -2,6 +2,7 @@ import re import string from datetime import datetime, timedelta, timezone +from time import sleep from unittest import SkipTest from uuid import uuid4 @@ -1309,13 +1310,15 @@ def test_rotate_secret_using_lambda(secret=None, iam_role_arn=None, table_name=N if updated_secret["SecretString"] == "UpdatedValue": secret_not_updated = False else: - from time import sleep - sleep(5) rotated_version = updated_secret["VersionId"] assert initial_version != rotated_version + u2 = secrets_conn.get_secret_value(SecretId=secret["ARN"]) + assert u2["SecretString"] == "UpdatedValue" + assert u2["VersionId"] == rotated_version + metadata = secrets_conn.describe_secret(SecretId=secret["ARN"]) assert metadata["VersionIdsToStages"][initial_version] == ["AWSPREVIOUS"] assert metadata["VersionIdsToStages"][rotated_version] == ["AWSCURRENT"] @@ -1335,6 +1338,92 @@ def test_rotate_secret_using_lambda(secret=None, iam_role_arn=None, table_name=N assert finish_secret["pending_value"]["VersionStages"] == ["AWSPENDING"] +@pytest.mark.aws_verified +@dynamodb_aws_verified() +@lambda_aws_verified +@secretsmanager_aws_verified +def test_rotate_secret_using_lambda_dont_rotate_immediately( + secret=None, iam_role_arn=None, table_name=None +): + role_name = iam_role_arn.split("/")[-1] + if not allow_aws_request() and not settings.TEST_SERVER_MODE: + raise SkipTest("Can only test this in ServerMode") + + iam = boto3.client("iam", "us-east-1") + if allow_aws_request(): + iam.attach_role_policy( + PolicyArn="arn:aws:iam::aws:policy/SecretsManagerReadWrite", + RoleName=role_name, + ) + # Testing this against AWS itself is a bit of pain + # Uncomment this to get more insights into what is happening during execution of the Lambda + # iam.attach_role_policy( + # PolicyArn="arn:aws:iam::aws:policy/CloudWatchLogsFullAccess", + # RoleName=role_name, + # ) + iam.attach_role_policy( + PolicyArn="arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess", + RoleName=role_name, + ) + + function_name = "moto_test_" + str(uuid4())[0:6] + + # Passing a `RotationLambdaARN` value to `rotate_secret` should invoke lambda + lambda_conn = boto3.client("lambda", region_name="us-east-1") + func = lambda_conn.create_function( + FunctionName=function_name, + Runtime="python3.11", + Role=iam_role_arn, + Handler="lambda_function.lambda_handler", + Code={"ZipFile": get_rotation_zip_file()}, + Publish=True, + Environment={"Variables": {"table_name": table_name}}, + ) + lambda_conn.add_permission( + FunctionName=function_name, + StatementId="allow_secrets_manager", + Action="lambda:InvokeFunction", + Principal="secretsmanager.amazonaws.com", + ) + lambda_conn.get_waiter("function_active_v2").wait(FunctionName=function_name) + + secrets_conn = boto3.client("secretsmanager", region_name="us-east-1") + + initial_version = secret["VersionId"] + + secrets_conn.rotate_secret( + SecretId=secret["ARN"], + RotationLambdaARN=func["FunctionArn"], + RotationRules={"AutomaticallyAfterDays": 30}, + RotateImmediately=False, + ) + + lambda_conn.delete_function(FunctionName=function_name) + + current_secret = secrets_conn.get_secret_value( + SecretId=secret["ARN"], VersionStage="AWSCURRENT" + ) + assert current_secret["SecretString"] == "old_secret" + assert current_secret["VersionId"] == initial_version + + secret = secrets_conn.get_secret_value(SecretId=secret["ARN"]) + assert secret["SecretString"] == "old_secret" + assert secret["VersionId"] == initial_version + + dynamodb = boto3.resource("dynamodb", region_name="us-east-1") + items = dynamodb.Table(table_name).scan()["Items"] + + attempts = 0 + while not items and attempts < 10: + sleep(5) + items = dynamodb.Table(table_name).scan()["Items"] + attempts += 1 + + assert items[0]["pending_value"]["VersionStages"] == ["AWSPENDING"] + assert items[0]["pending_value"]["SecretString"] == "old_secret" + assert items[0]["pending_value"]["VersionId"] != initial_version + + @mock_aws def test_put_secret_value_on_non_existing_secret(): conn = boto3.client("secretsmanager", region_name="us-west-2")