From 4886ac685a4c96ff763bdc62927e786b0236aa38 Mon Sep 17 00:00:00 2001 From: Stefan Nordhausen Date: Sat, 4 Jan 2025 14:21:10 +0100 Subject: [PATCH] Refactor test_rds.py (#8461) --- tests/test_rds/test_rds.py | 1933 ++++++++++++------------------------ 1 file changed, 626 insertions(+), 1307 deletions(-) diff --git a/tests/test_rds/test_rds.py b/tests/test_rds/test_rds.py index 9003cccc7b10..1a6f415af9f1 100644 --- a/tests/test_rds/test_rds.py +++ b/tests/test_rds/test_rds.py @@ -14,10 +14,27 @@ DEFAULT_REGION = "us-west-2" +@pytest.fixture(name="client") @mock_aws -def test_create_database(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - database = conn.create_db_instance( +def get_rds_client(): + return boto3.client("rds", region_name=DEFAULT_REGION) + + +def create_db_instance(**extra_kwargs): + client = boto3.client("rds", region_name=DEFAULT_REGION) + kwargs = { + "DBInstanceIdentifier": "db-master-1", + "Engine": "postgres", + "DBName": "staging-postgres", + "DBInstanceClass": "db.m1.small", + } + kwargs.update(extra_kwargs) + return client.create_db_instance(**kwargs)["DBInstance"] + + +@mock_aws +def test_create_database(client): + database = client.create_db_instance( DBInstanceIdentifier="db-master-1", AllocatedStorage=10, Engine="postgres", @@ -56,30 +73,14 @@ def test_create_database(): @mock_aws def test_database_with_deletion_protection_cannot_be_deleted(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - database = conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - DeletionProtection=True, - ) - db_instance = database["DBInstance"] + db_instance = create_db_instance(DeletionProtection=True) assert db_instance["DBInstanceClass"] == "db.m1.small" assert db_instance["DeletionProtection"] is True @mock_aws def test_create_database_no_allocated_storage(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - database = conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - ) - db_instance = database["DBInstance"] + db_instance = create_db_instance() assert db_instance["Engine"] == "postgres" assert db_instance["StorageType"] == "gp2" assert db_instance["AllocatedStorage"] == 20 @@ -88,15 +89,8 @@ def test_create_database_no_allocated_storage(): @mock_aws def test_create_database_invalid_preferred_maintenance_window_more_24_hours(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) with pytest.raises(ClientError) as ex: - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - PreferredMaintenanceWindow="mon:16:00-tue:17:00", - ) + create_db_instance(PreferredMaintenanceWindow="mon:16:00-tue:17:00") err = ex.value.response["Error"] assert err["Code"] == "InvalidParameterValue" assert err["Message"] == "Maintenance window must be less than 24 hours." @@ -104,15 +98,8 @@ def test_create_database_invalid_preferred_maintenance_window_more_24_hours(): @mock_aws def test_create_database_invalid_preferred_maintenance_window_less_30_mins(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) with pytest.raises(ClientError) as ex: - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - PreferredMaintenanceWindow="mon:16:00-mon:16:05", - ) + create_db_instance(PreferredMaintenanceWindow="mon:16:00-mon:16:05") err = ex.value.response["Error"] assert err["Code"] == "InvalidParameterValue" assert err["Message"] == "The maintenance window must be at least 30 minutes." @@ -120,15 +107,8 @@ def test_create_database_invalid_preferred_maintenance_window_less_30_mins(): @mock_aws def test_create_database_invalid_preferred_maintenance_window_value(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) with pytest.raises(ClientError) as ex: - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - PreferredMaintenanceWindow="sim:16:00-mon:16:30", - ) + create_db_instance(PreferredMaintenanceWindow="sim:16:00-mon:16:30") err = ex.value.response["Error"] assert err["Code"] == "InvalidParameterValue" assert "Invalid day:hour:minute" in err["Message"] @@ -136,15 +116,8 @@ def test_create_database_invalid_preferred_maintenance_window_value(): @mock_aws def test_create_database_invalid_preferred_maintenance_window_format(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) with pytest.raises(ClientError) as ex: - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - PreferredMaintenanceWindow="mon:16tue:17:00", - ) + create_db_instance(PreferredMaintenanceWindow="mon:16tue:17:00") err = ex.value.response["Error"] assert err["Code"] == "InvalidParameterValue" assert ( @@ -155,13 +128,8 @@ def test_create_database_invalid_preferred_maintenance_window_format(): @mock_aws def test_create_database_preferred_backup_window_overlap_no_spill(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) with pytest.raises(ClientError) as ex: - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", + create_db_instance( PreferredMaintenanceWindow="wed:18:00-wed:22:00", PreferredBackupWindow="20:00-20:30", ) @@ -174,13 +142,8 @@ def test_create_database_preferred_backup_window_overlap_no_spill(): @mock_aws def test_create_database_preferred_backup_window_overlap_maintenance_window_spill(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) with pytest.raises(ClientError) as ex: - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", + create_db_instance( PreferredMaintenanceWindow="wed:18:00-thu:01:00", PreferredBackupWindow="00:00-00:30", ) @@ -193,13 +156,8 @@ def test_create_database_preferred_backup_window_overlap_maintenance_window_spil @mock_aws def test_create_database_preferred_backup_window_overlap_backup_window_spill(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) with pytest.raises(ClientError) as ex: - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", + create_db_instance( PreferredMaintenanceWindow="thu:00:00-thu:14:00", PreferredBackupWindow="23:50-00:20", ) @@ -212,13 +170,8 @@ def test_create_database_preferred_backup_window_overlap_backup_window_spill(): @mock_aws def test_create_database_preferred_backup_window_overlap_both_spill(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) with pytest.raises(ClientError) as ex: - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", + create_db_instance( PreferredMaintenanceWindow="wed:18:00-thu:01:00", PreferredBackupWindow="23:50-00:20", ) @@ -231,66 +184,33 @@ def test_create_database_preferred_backup_window_overlap_both_spill(): @mock_aws def test_create_database_valid_preferred_maintenance_window_format(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - database = conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - PreferredMaintenanceWindow="sun:16:00-sun:16:30", - ) - db_instance = database["DBInstance"] + db_instance = create_db_instance(PreferredMaintenanceWindow="sun:16:00-sun:16:30") assert db_instance["DBInstanceClass"] == "db.m1.small" assert db_instance["PreferredMaintenanceWindow"] == "sun:16:00-sun:16:30" @mock_aws def test_create_database_valid_preferred_maintenance_window_uppercase_format(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - database = conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - PreferredMaintenanceWindow="MON:16:00-TUE:01:30", - ) - db_instance = database["DBInstance"] + db_instance = create_db_instance(PreferredMaintenanceWindow="MON:16:00-TUE:01:30") assert db_instance["DBInstanceClass"] == "db.m1.small" assert db_instance["PreferredMaintenanceWindow"] == "mon:16:00-tue:01:30" @mock_aws def test_create_database_non_existing_option_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) with pytest.raises(ClientError): - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - OptionGroupName="non-existing", - ) + create_db_instance(OptionGroupName="non-existing") @mock_aws -def test_create_database_with_option_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_option_group( +def test_create_database_with_option_group(client): + client.create_option_group( OptionGroupName="my-og", EngineName="mysql", MajorEngineVersion="5.6", OptionGroupDescription="test option group", ) - database = conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - OptionGroupName="my-og", - ) - db_instance = database["DBInstance"] + db_instance = create_db_instance(AllocatedStorage=10, OptionGroupName="my-og") assert db_instance["AllocatedStorage"] == 10 assert db_instance["DBInstanceClass"] == "db.m1.small" assert db_instance["DBName"] == "staging-postgres" @@ -298,294 +218,212 @@ def test_create_database_with_option_group(): @mock_aws -def test_stop_database(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - database = conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - LicenseModel="license-included", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) - mydb = conn.describe_db_instances( - DBInstanceIdentifier=database["DBInstance"]["DBInstanceIdentifier"] +def test_stop_database(client): + db_instance = create_db_instance() + mydb = client.describe_db_instances( + DBInstanceIdentifier=db_instance["DBInstanceIdentifier"] )["DBInstances"][0] assert mydb["DBInstanceStatus"] == "available" # test stopping database should shutdown - response = conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) + response = client.stop_db_instance( + DBInstanceIdentifier=mydb["DBInstanceIdentifier"] + ) assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 assert response["DBInstance"]["DBInstanceStatus"] == "stopped" # test rdsclient error when trying to stop an already stopped database with pytest.raises(ClientError): - conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) + client.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) # test stopping a stopped database with snapshot should error and no # snapshot should exist for that call with pytest.raises(ClientError): - conn.stop_db_instance( + client.stop_db_instance( DBInstanceIdentifier=mydb["DBInstanceIdentifier"], DBSnapshotIdentifier="rocky4570-rds-snap", ) - response = conn.describe_db_snapshots() + response = client.describe_db_snapshots() assert response["DBSnapshots"] == [] @mock_aws -def test_start_database(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - database = conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - LicenseModel="license-included", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) - mydb = conn.describe_db_instances( - DBInstanceIdentifier=database["DBInstance"]["DBInstanceIdentifier"] +def test_start_database(client): + db_instance = create_db_instance() + mydb = client.describe_db_instances( + DBInstanceIdentifier=db_instance["DBInstanceIdentifier"] )["DBInstances"][0] assert mydb["DBInstanceStatus"] == "available" # test starting an already started database should error with pytest.raises(ClientError): - conn.start_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) + client.start_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) # stop and test start - should go from stopped to available, create # snapshot and check snapshot - response = conn.stop_db_instance( + response = client.stop_db_instance( DBInstanceIdentifier=mydb["DBInstanceIdentifier"], DBSnapshotIdentifier="rocky4570-rds-snap", ) assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 assert response["DBInstance"]["DBInstanceStatus"] == "stopped" - response = conn.describe_db_snapshots() + response = client.describe_db_snapshots() assert response["DBSnapshots"][0]["DBSnapshotIdentifier"] == "rocky4570-rds-snap" - response = conn.start_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) + response = client.start_db_instance( + DBInstanceIdentifier=mydb["DBInstanceIdentifier"] + ) assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 assert response["DBInstance"]["DBInstanceStatus"] == "available" # starting database should not remove snapshot - response = conn.describe_db_snapshots() + response = client.describe_db_snapshots() assert response["DBSnapshots"][0]["DBSnapshotIdentifier"] == "rocky4570-rds-snap" # test stopping database, create snapshot with existing snapshot already # created should throw error with pytest.raises(ClientError): - conn.stop_db_instance( + client.stop_db_instance( DBInstanceIdentifier=mydb["DBInstanceIdentifier"], DBSnapshotIdentifier="rocky4570-rds-snap", ) # test stopping database not invoking snapshot should succeed. - response = conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) + response = client.stop_db_instance( + DBInstanceIdentifier=mydb["DBInstanceIdentifier"] + ) assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 assert response["DBInstance"]["DBInstanceStatus"] == "stopped" @mock_aws -def test_fail_to_stop_multi_az_and_sqlserver(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - database = conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, +def test_fail_to_stop_multi_az_and_sqlserver(client): + db_instance = create_db_instance( Engine="sqlserver-ee", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", LicenseModel="license-included", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], MultiAZ=True, ) - mydb = conn.describe_db_instances( - DBInstanceIdentifier=database["DBInstance"]["DBInstanceIdentifier"] + mydb = client.describe_db_instances( + DBInstanceIdentifier=db_instance["DBInstanceIdentifier"] )["DBInstances"][0] assert mydb["DBInstanceStatus"] == "available" # multi-az databases arent allowed to be shutdown at this time. with pytest.raises(ClientError): - conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) + client.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) # multi-az databases arent allowed to be started up at this time. with pytest.raises(ClientError): - conn.start_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) + client.start_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) @mock_aws -def test_stop_multi_az_postgres(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - database = conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - LicenseModel="license-included", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - MultiAZ=True, - ) +def test_stop_multi_az_postgres(client): + db_instance = create_db_instance(MultiAZ=True) - mydb = conn.describe_db_instances( - DBInstanceIdentifier=database["DBInstance"]["DBInstanceIdentifier"] + mydb = client.describe_db_instances( + DBInstanceIdentifier=db_instance["DBInstanceIdentifier"] )["DBInstances"][0] assert mydb["DBInstanceStatus"] == "available" - response = conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) + response = client.stop_db_instance( + DBInstanceIdentifier=mydb["DBInstanceIdentifier"] + ) assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 assert response["DBInstance"]["DBInstanceStatus"] == "stopped" @mock_aws -def test_fail_to_stop_readreplica(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - LicenseModel="license-included", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) +def test_fail_to_stop_readreplica(client): + db_instance = create_db_instance() - replica = conn.create_db_instance_read_replica( + replica = client.create_db_instance_read_replica( DBInstanceIdentifier="db-replica-1", - SourceDBInstanceIdentifier="db-master-1", + SourceDBInstanceIdentifier=db_instance["DBInstanceIdentifier"], DBInstanceClass="db.m1.small", ) - mydb = conn.describe_db_instances( + mydb = client.describe_db_instances( DBInstanceIdentifier=replica["DBInstance"]["DBInstanceIdentifier"] )["DBInstances"][0] assert mydb["DBInstanceStatus"] == "available" # read-replicas are not allowed to be stopped at this time. with pytest.raises(ClientError): - conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) + client.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) # read-replicas are not allowed to be started at this time. with pytest.raises(ClientError): - conn.start_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) + client.start_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) @mock_aws -def test_get_databases(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - - instances = conn.describe_db_instances() - assert len(list(instances["DBInstances"])) == 0 +def test_get_databases(client): + instances = client.describe_db_instances() + assert len(instances["DBInstances"]) == 0 - conn.create_db_instance( + create_db_instance( DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], ) - conn.create_db_instance( + create_db_instance( DBInstanceIdentifier="db-master-2", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", Port=1234, - DBSecurityGroups=["my_sg"], DeletionProtection=True, ) - instances = conn.describe_db_instances() - assert len(list(instances["DBInstances"])) == 2 + instances = client.describe_db_instances() + assert len(instances["DBInstances"]) == 2 - instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - assert len(list(instances["DBInstances"])) == 1 + instances = client.describe_db_instances(DBInstanceIdentifier="db-master-1") + assert len(instances["DBInstances"]) == 1 assert instances["DBInstances"][0]["DBInstanceIdentifier"] == "db-master-1" assert instances["DBInstances"][0]["DeletionProtection"] is False assert instances["DBInstances"][0]["DBInstanceArn"] == ( - f"arn:aws:rds:us-west-2:{ACCOUNT_ID}:db:db-master-1" + f"arn:aws:rds:{DEFAULT_REGION}:{ACCOUNT_ID}:db:db-master-1" ) - instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-2") + instances = client.describe_db_instances(DBInstanceIdentifier="db-master-2") assert instances["DBInstances"][0]["DeletionProtection"] is True assert instances["DBInstances"][0]["Endpoint"]["Port"] == 1234 assert instances["DBInstances"][0]["DbInstancePort"] == 1234 @mock_aws -def test_get_databases_paginated(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - +def test_get_databases_paginated(client): for i in range(51): - conn.create_db_instance( - AllocatedStorage=5, - Port=5432, - DBInstanceIdentifier=f"rds{i}", - DBInstanceClass="db.t1.micro", - Engine="postgres", - ) + create_db_instance(DBInstanceIdentifier=f"rds{i}") - resp = conn.describe_db_instances() + resp = client.describe_db_instances() assert len(resp["DBInstances"]) == 50 assert resp["Marker"] == resp["DBInstances"][-1]["DBInstanceIdentifier"] - resp2 = conn.describe_db_instances(Marker=resp["Marker"]) + resp2 = client.describe_db_instances(Marker=resp["Marker"]) assert len(resp2["DBInstances"]) == 1 - resp3 = conn.describe_db_instances(MaxRecords=100) + resp3 = client.describe_db_instances(MaxRecords=100) assert len(resp3["DBInstances"]) == 51 @mock_aws -def test_describe_non_existent_database(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_describe_non_existent_database(client): with pytest.raises(ClientError): - conn.describe_db_instances(DBInstanceIdentifier="not-a-db") + client.describe_db_instances(DBInstanceIdentifier="not-a-db") @pytest.mark.parametrize( "custom_db_subnet_group", [True, False], ids=("custom_subnet", "default_subnet") ) @mock_aws -def test_modify_db_instance(custom_db_subnet_group: bool): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - +def test_modify_db_instance(custom_db_subnet_group: bool, client): if custom_db_subnet_group: extra_kwargs = {"DBSubnetGroupName": create_db_subnet_group()} else: extra_kwargs = {} - conn.create_db_instance( + create_db_instance( DBInstanceIdentifier="db-id", AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], **extra_kwargs, ) - inst = conn.describe_db_instances(DBInstanceIdentifier="db-id")["DBInstances"][0] + inst = client.describe_db_instances(DBInstanceIdentifier="db-id")["DBInstances"][0] assert inst["AllocatedStorage"] == 10 assert inst["EnabledCloudwatchLogsExports"] == [] - conn.modify_db_instance( + client.modify_db_instance( DBInstanceIdentifier="db-id", AllocatedStorage=20, ApplyImmediately=True, VpcSecurityGroupIds=["sg-123456"], CloudwatchLogsExportConfiguration={"EnableLogTypes": ["error"]}, ) - inst = conn.describe_db_instances(DBInstanceIdentifier="db-id")["DBInstances"][0] + inst = client.describe_db_instances(DBInstanceIdentifier="db-id")["DBInstances"][0] assert inst["AllocatedStorage"] == 20 assert inst["PreferredMaintenanceWindow"] == "wed:06:38-wed:07:08" assert inst["VpcSecurityGroups"][0]["VpcSecurityGroupId"] == "sg-123456" @@ -594,8 +432,9 @@ def test_modify_db_instance(custom_db_subnet_group: bool): @pytest.mark.parametrize("with_custom_kms_key", [True, False]) @mock_aws -def test_modify_db_instance_manage_master_user_password(with_custom_kms_key: bool): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_modify_db_instance_manage_master_user_password( + with_custom_kms_key: bool, client +): db_id = "db-id" custom_kms_key = f"arn:aws:kms:{DEFAULT_REGION}:123456789012:key/abcd1234-56ef-78gh-90ij-klmnopqrstuv" @@ -603,28 +442,22 @@ def test_modify_db_instance_manage_master_user_password(with_custom_kms_key: boo {"MasterUserSecretKmsKeyId": custom_kms_key} if with_custom_kms_key else {} ) - create_response = conn.create_db_instance( + db_instance = create_db_instance( DBInstanceIdentifier=db_id, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter21", ManageMasterUserPassword=False, ) - modify_response = conn.modify_db_instance( + modify_response = client.modify_db_instance( DBInstanceIdentifier=db_id, ManageMasterUserPassword=True, **custom_kms_key_args ) - describe_response = conn.describe_db_instances( - DBInstanceIdentifier=db_id, - ) + describe_response = client.describe_db_instances(DBInstanceIdentifier=db_id) - revert_modification_response = conn.modify_db_instance( + revert_modification_response = client.modify_db_instance( DBInstanceIdentifier=db_id, ManageMasterUserPassword=False ) - assert create_response["DBInstance"].get("MasterUserSecret") is None + assert db_instance.get("MasterUserSecret") is None master_user_secret = modify_response["DBInstance"]["MasterUserSecret"] assert len(master_user_secret.keys()) == 3 assert ( @@ -649,27 +482,21 @@ def test_modify_db_instance_manage_master_user_password(with_custom_kms_key: boo @pytest.mark.parametrize("with_apply_immediately", [True, False]) @mock_aws -def test_modify_db_instance_rotate_master_user_password(with_apply_immediately): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_modify_db_instance_rotate_master_user_password(with_apply_immediately, client): db_id = "db-id" - - conn.create_db_instance( + create_db_instance( DBInstanceIdentifier=db_id, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter21", ManageMasterUserPassword=True, ) if with_apply_immediately: - modify_response = conn.modify_db_instance( + modify_response = client.modify_db_instance( DBInstanceIdentifier=db_id, RotateMasterUserPassword=True, ApplyImmediately=True, ) - describe_response = conn.describe_db_instances( + describe_response = client.describe_db_instances( DBInstanceIdentifier=db_id, ) @@ -684,97 +511,55 @@ def test_modify_db_instance_rotate_master_user_password(with_apply_immediately): else: with pytest.raises(ClientError): - conn.modify_db_instance( + client.modify_db_instance( DBInstanceIdentifier=db_id, RotateMasterUserPassword=True, ) @mock_aws -def test_modify_db_instance_not_existent_db_parameter_group_name(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) - instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - assert instances["DBInstances"][0]["AllocatedStorage"] == 10 +def test_modify_db_instance_not_existent_db_parameter_group_name(client): + create_db_instance(DBInstanceIdentifier="db-master-1") with pytest.raises(ClientError): - conn.modify_db_instance( + client.modify_db_instance( DBInstanceIdentifier="db-master-1", DBParameterGroupName="test-sqlserver-se-2017", ) @mock_aws -def test_modify_db_instance_valid_preferred_maintenance_window(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) - instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - conn.modify_db_instance( +def test_modify_db_instance_valid_preferred_maintenance_window(client): + create_db_instance(DBInstanceIdentifier="db-master-1") + client.modify_db_instance( DBInstanceIdentifier="db-master-1", PreferredMaintenanceWindow="sun:16:00-sun:16:30", ) - instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") + instances = client.describe_db_instances(DBInstanceIdentifier="db-master-1") assert instances["DBInstances"][0]["PreferredMaintenanceWindow"] == ( "sun:16:00-sun:16:30" ) @mock_aws -def test_modify_db_instance_valid_preferred_maintenance_window_uppercase(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) - instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - conn.modify_db_instance( +def test_modify_db_instance_valid_preferred_maintenance_window_uppercase(client): + create_db_instance(DBInstanceIdentifier="db-master-1") + client.modify_db_instance( DBInstanceIdentifier="db-master-1", PreferredMaintenanceWindow="SUN:16:00-SUN:16:30", ) - instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") + instances = client.describe_db_instances(DBInstanceIdentifier="db-master-1") assert instances["DBInstances"][0]["PreferredMaintenanceWindow"] == ( "sun:16:00-sun:16:30" ) @mock_aws -def test_modify_db_instance_invalid_preferred_maintenance_window_more_than_24_hours(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) +def test_modify_db_instance_invalid_preferred_maintenance_window_more_than_24_hours( + client, +): + create_db_instance(DBInstanceIdentifier="db-master-1") with pytest.raises(ClientError) as ex: - conn.modify_db_instance( + client.modify_db_instance( DBInstanceIdentifier="db-master-1", PreferredMaintenanceWindow="sun:16:00-sat:16:30", ) @@ -784,20 +569,12 @@ def test_modify_db_instance_invalid_preferred_maintenance_window_more_than_24_ho @mock_aws -def test_modify_db_instance_invalid_preferred_maintenance_window_less_than_30_mins(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) +def test_modify_db_instance_invalid_preferred_maintenance_window_less_than_30_mins( + client, +): + create_db_instance(DBInstanceIdentifier="db-master-1") with pytest.raises(ClientError) as ex: - conn.modify_db_instance( + client.modify_db_instance( DBInstanceIdentifier="db-master-1", PreferredMaintenanceWindow="sun:16:00-sun:16:10", ) @@ -807,20 +584,10 @@ def test_modify_db_instance_invalid_preferred_maintenance_window_less_than_30_mi @mock_aws -def test_modify_db_instance_invalid_preferred_maintenance_window_value(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) +def test_modify_db_instance_invalid_preferred_maintenance_window_value(client): + create_db_instance(DBInstanceIdentifier="db-master-1") with pytest.raises(ClientError) as ex: - conn.modify_db_instance( + client.modify_db_instance( DBInstanceIdentifier="db-master-1", PreferredMaintenanceWindow="sin:16:00-sun:16:30", ) @@ -830,20 +597,10 @@ def test_modify_db_instance_invalid_preferred_maintenance_window_value(): @mock_aws -def test_modify_db_instance_invalid_preferred_maintenance_window_format(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) +def test_modify_db_instance_invalid_preferred_maintenance_window_format(client): + create_db_instance(DBInstanceIdentifier="db-master-1") with pytest.raises(ClientError) as ex: - conn.modify_db_instance( + client.modify_db_instance( DBInstanceIdentifier="db-master-1", PreferredMaintenanceWindow="sun:16:00sun:16:30", ) @@ -856,20 +613,10 @@ def test_modify_db_instance_invalid_preferred_maintenance_window_format(): @mock_aws -def test_modify_db_instance_maintenance_backup_window_no_spill(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) +def test_modify_db_instance_maintenance_backup_window_no_spill(client): + create_db_instance(DBInstanceIdentifier="db-master-1") with pytest.raises(ClientError) as ex: - conn.modify_db_instance( + client.modify_db_instance( DBInstanceIdentifier="db-master-1", PreferredMaintenanceWindow="sun:16:00-sun:16:30", PreferredBackupWindow="15:50-16:20", @@ -882,20 +629,10 @@ def test_modify_db_instance_maintenance_backup_window_no_spill(): @mock_aws -def test_modify_db_instance_maintenance_backup_window_maintenance_spill(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) +def test_modify_db_instance_maintenance_backup_window_maintenance_spill(client): + create_db_instance(DBInstanceIdentifier="db-master-1") with pytest.raises(ClientError) as ex: - conn.modify_db_instance( + client.modify_db_instance( DBInstanceIdentifier="db-master-1", PreferredMaintenanceWindow="sun:16:00-mon:15:00", PreferredBackupWindow="00:00-00:30", @@ -908,20 +645,10 @@ def test_modify_db_instance_maintenance_backup_window_maintenance_spill(): @mock_aws -def test_modify_db_instance_maintenance_backup_window_backup_spill(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) +def test_modify_db_instance_maintenance_backup_window_backup_spill(client): + create_db_instance(DBInstanceIdentifier="db-master-1") with pytest.raises(ClientError) as ex: - conn.modify_db_instance( + client.modify_db_instance( DBInstanceIdentifier="db-master-1", PreferredMaintenanceWindow="mon:00:00-mon:15:00", PreferredBackupWindow="23:50-00:20", @@ -934,20 +661,10 @@ def test_modify_db_instance_maintenance_backup_window_backup_spill(): @mock_aws -def test_modify_db_instance_maintenance_backup_window_both_spill(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) +def test_modify_db_instance_maintenance_backup_window_both_spill(client): + create_db_instance(DBInstanceIdentifier="db-master-1") with pytest.raises(ClientError) as ex: - conn.modify_db_instance( + client.modify_db_instance( DBInstanceIdentifier="db-master-1", PreferredMaintenanceWindow="sun:16:00-mon:15:00", PreferredBackupWindow="23:20-00:20", @@ -960,159 +677,109 @@ def test_modify_db_instance_maintenance_backup_window_both_spill(): @mock_aws -def test_rename_db_instance(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) - instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - assert len(list(instances["DBInstances"])) == 1 +def test_rename_db_instance(client): + create_db_instance(DBInstanceIdentifier="db-master-1") + instances = client.describe_db_instances(DBInstanceIdentifier="db-master-1") + assert len(instances["DBInstances"]) == 1 with pytest.raises(ClientError): - conn.describe_db_instances(DBInstanceIdentifier="db-master-2") - conn.modify_db_instance( + client.describe_db_instances(DBInstanceIdentifier="db-master-2") + client.modify_db_instance( DBInstanceIdentifier="db-master-1", NewDBInstanceIdentifier="db-master-2", ApplyImmediately=True, ) with pytest.raises(ClientError): - conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-2") - assert len(list(instances["DBInstances"])) == 1 + client.describe_db_instances(DBInstanceIdentifier="db-master-1") + instances = client.describe_db_instances(DBInstanceIdentifier="db-master-2") + assert len(instances["DBInstances"]) == 1 @mock_aws -def test_modify_non_existent_database(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_modify_non_existent_database(client): with pytest.raises(ClientError): - conn.modify_db_instance( + client.modify_db_instance( DBInstanceIdentifier="not-a-db", AllocatedStorage=20, ApplyImmediately=True ) @mock_aws -def test_reboot_db_instance(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) - database = conn.reboot_db_instance(DBInstanceIdentifier="db-master-1") +def test_reboot_db_instance(client): + create_db_instance(DBInstanceIdentifier="db-master-1") + database = client.reboot_db_instance(DBInstanceIdentifier="db-master-1") assert database["DBInstance"]["DBInstanceIdentifier"] == "db-master-1" @mock_aws -def test_reboot_non_existent_database(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_reboot_non_existent_database(client): with pytest.raises(ClientError): - conn.reboot_db_instance(DBInstanceIdentifier="not-a-db") + client.reboot_db_instance(DBInstanceIdentifier="not-a-db") @mock_aws -def test_delete_database(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - instances = conn.describe_db_instances() - assert len(list(instances["DBInstances"])) == 0 - conn.create_db_instance( - DBInstanceIdentifier="db-1", - AllocatedStorage=10, - Engine="postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) - instances = conn.describe_db_instances() - assert len(list(instances["DBInstances"])) == 1 +def test_delete_database(client): + instances = client.describe_db_instances() + assert len(instances["DBInstances"]) == 0 + create_db_instance(DBInstanceIdentifier="db-1") + instances = client.describe_db_instances() + assert len(instances["DBInstances"]) == 1 - conn.delete_db_instance( + client.delete_db_instance( DBInstanceIdentifier="db-1", FinalDBSnapshotIdentifier="primary-1-snapshot", ) - instances = conn.describe_db_instances() - assert len(list(instances["DBInstances"])) == 0 + instances = client.describe_db_instances() + assert len(instances["DBInstances"]) == 0 # Saved the snapshot - snapshot = conn.describe_db_snapshots(DBInstanceIdentifier="db-1")["DBSnapshots"][0] + snapshot = client.describe_db_snapshots(DBInstanceIdentifier="db-1")["DBSnapshots"][ + 0 + ] assert snapshot["Engine"] == "postgres" assert snapshot["SnapshotType"] == "automated" @mock_aws -def test_create_db_snapshots(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_create_db_snapshots(client): with pytest.raises(ClientError): - conn.create_db_snapshot( + client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" ) - conn.create_db_instance( - DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) + create_db_instance(DBInstanceIdentifier="db-primary-1") - snapshot = conn.create_db_snapshot( + snapshot = client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="g-1" - ).get("DBSnapshot") + )["DBSnapshot"] - assert snapshot.get("Engine") == "postgres" - assert snapshot.get("DBInstanceIdentifier") == "db-primary-1" - assert snapshot.get("DBSnapshotIdentifier") == "g-1" - result = conn.list_tags_for_resource(ResourceName=snapshot["DBSnapshotArn"]) + assert snapshot["Engine"] == "postgres" + assert snapshot["DBInstanceIdentifier"] == "db-primary-1" + assert snapshot["DBSnapshotIdentifier"] == "g-1" + result = client.list_tags_for_resource(ResourceName=snapshot["DBSnapshotArn"]) assert result["TagList"] == [] @mock_aws -def test_create_db_snapshots_copy_tags(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_create_db_snapshots_copy_tags(client): with pytest.raises(ClientError): - conn.create_db_snapshot( + client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" ) - conn.create_db_instance( + create_db_instance( DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], CopyTagsToSnapshot=True, Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}], ) - snapshot = conn.create_db_snapshot( + snapshot = client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="g-1" - ).get("DBSnapshot") + )["DBSnapshot"] - assert snapshot.get("Engine") == "postgres" - assert snapshot.get("DBInstanceIdentifier") == "db-primary-1" - assert snapshot.get("DBSnapshotIdentifier") == "g-1" - result = conn.list_tags_for_resource(ResourceName=snapshot["DBSnapshotArn"]) + assert snapshot["Engine"] == "postgres" + assert snapshot["DBInstanceIdentifier"] == "db-primary-1" + assert snapshot["DBSnapshotIdentifier"] == "g-1" + result = client.list_tags_for_resource(ResourceName=snapshot["DBSnapshotArn"]) assert result["TagList"] == [ {"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}, @@ -1120,31 +787,20 @@ def test_create_db_snapshots_copy_tags(): @mock_aws -def test_create_db_snapshots_with_tags(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) +def test_create_db_snapshots_with_tags(client): + create_db_instance(DBInstanceIdentifier="db-primary-1") - conn.create_db_snapshot( + client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="g-1", Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}], ) - snapshots = conn.describe_db_snapshots(DBInstanceIdentifier="db-primary-1").get( + snapshots = client.describe_db_snapshots(DBInstanceIdentifier="db-primary-1")[ "DBSnapshots" - ) - assert snapshots[0].get("DBSnapshotIdentifier") == "g-1" - assert snapshots[0].get("TagList") == [ + ] + assert snapshots[0]["DBSnapshotIdentifier"] == "g-1" + assert snapshots[0]["TagList"] == [ {"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}, ] @@ -1157,50 +813,40 @@ def test_create_db_snapshots_with_tags(): ids=("by_name", "by_arn"), ) @mock_aws -def test_copy_db_snapshots(delete_db_instance: bool, db_snapshot_identifier: str): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - - conn.create_db_instance( - DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) +def test_copy_db_snapshots( + delete_db_instance: bool, db_snapshot_identifier: str, client +): + create_db_instance(DBInstanceIdentifier="db-primary-1") - conn.create_db_snapshot( + client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" - ).get("DBSnapshot") + ) if delete_db_instance: # Delete the original instance, but the copy snapshot operation should still succeed. - conn.delete_db_instance(DBInstanceIdentifier="db-primary-1") + client.delete_db_instance(DBInstanceIdentifier="db-primary-1") - target_snapshot = conn.copy_db_snapshot( + target_snapshot = client.copy_db_snapshot( SourceDBSnapshotIdentifier=db_snapshot_identifier, TargetDBSnapshotIdentifier="snapshot-2", - ).get("DBSnapshot") + )["DBSnapshot"] - assert target_snapshot.get("Engine") == "postgres" - assert target_snapshot.get("DBInstanceIdentifier") == "db-primary-1" - assert target_snapshot.get("DBSnapshotIdentifier") == "snapshot-2" - result = conn.list_tags_for_resource(ResourceName=target_snapshot["DBSnapshotArn"]) + assert target_snapshot["Engine"] == "postgres" + assert target_snapshot["DBInstanceIdentifier"] == "db-primary-1" + assert target_snapshot["DBSnapshotIdentifier"] == "snapshot-2" + result = client.list_tags_for_resource( + ResourceName=target_snapshot["DBSnapshotArn"] + ) assert result["TagList"] == [] @mock_aws -def test_copy_db_snapshot_invalid_arns(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - +def test_copy_db_snapshot_invalid_arns(client): invalid_arn = ( f"arn:aws:rds:{DEFAULT_REGION}:123456789012:this-is-not-a-snapshot:snapshot-1" ) with pytest.raises(ClientError) as ex: - conn.copy_db_snapshot( + client.copy_db_snapshot( SourceDBSnapshotIdentifier=invalid_arn, TargetDBSnapshotIdentifier="snapshot-2", ) @@ -1234,122 +880,80 @@ def test_copy_db_snapshot_invalid_arns(): ), ) @mock_aws -def test_copy_db_snapshots_copytags_and_tags(kwargs, expected_tags): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-primary-1", - Engine="postgres", - DBInstanceClass="db.m1.small", - ) - conn.create_db_snapshot( +def test_copy_db_snapshots_copytags_and_tags(kwargs, expected_tags, client): + create_db_instance(DBInstanceIdentifier="db-primary-1") + client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot", Tags=original_snapshot_tags, ) - target_snapshot = conn.copy_db_snapshot( + target_snapshot = client.copy_db_snapshot( SourceDBSnapshotIdentifier="snapshot", TargetDBSnapshotIdentifier="snapshot-copy", **kwargs, - ).get("DBSnapshot") - result = conn.list_tags_for_resource(ResourceName=target_snapshot["DBSnapshotArn"]) + )["DBSnapshot"] + result = client.list_tags_for_resource( + ResourceName=target_snapshot["DBSnapshotArn"] + ) assert result["TagList"] == expected_tags @mock_aws -def test_describe_db_snapshots(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) +def test_describe_db_snapshots(client): + create_db_instance(DBInstanceIdentifier="db-primary-1") - created = conn.create_db_snapshot( + created = client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" - ).get("DBSnapshot") + )["DBSnapshot"] assert created["Engine"] == "postgres" assert created["SnapshotType"] == "manual" - by_database_id = conn.describe_db_snapshots( - DBInstanceIdentifier="db-primary-1" - ).get("DBSnapshots") - by_snapshot_id = conn.describe_db_snapshots(DBSnapshotIdentifier="snapshot-1").get( - "DBSnapshots" - ) - assert by_snapshot_id == by_database_id + by_database_id = client.describe_db_snapshots(DBInstanceIdentifier="db-primary-1") + by_snapshot_id = client.describe_db_snapshots(DBSnapshotIdentifier="snapshot-1") + assert by_snapshot_id["DBSnapshots"] == by_database_id["DBSnapshots"] - snapshot = by_snapshot_id[0] + snapshot = by_snapshot_id["DBSnapshots"][0] assert snapshot == created - conn.create_db_snapshot( + client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-2" ) - snapshots = conn.describe_db_snapshots(DBInstanceIdentifier="db-primary-1").get( + snapshots = client.describe_db_snapshots(DBInstanceIdentifier="db-primary-1")[ "DBSnapshots" - ) - assert len(snapshots) == 2 - - -@mock_aws -def test_promote_read_replica(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) + ] + assert len(snapshots) == 2 + + +@mock_aws +def test_promote_read_replica(client): + create_db_instance(DBInstanceIdentifier="db-primary-1") - conn.create_db_instance_read_replica( + client.create_db_instance_read_replica( DBInstanceIdentifier="db-replica-1", SourceDBInstanceIdentifier="db-primary-1", DBInstanceClass="db.m1.small", ) - conn.promote_read_replica(DBInstanceIdentifier="db-replica-1") + client.promote_read_replica(DBInstanceIdentifier="db-replica-1") - replicas = conn.describe_db_instances(DBInstanceIdentifier="db-primary-1").get( + replicas = client.describe_db_instances(DBInstanceIdentifier="db-primary-1").get( "ReadReplicaDBInstanceIdentifiers" ) assert replicas is None @mock_aws -def test_delete_db_snapshot(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) - conn.create_db_snapshot( +def test_delete_db_snapshot(client): + create_db_instance(DBInstanceIdentifier="db-primary-1") + client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" ) - _ = conn.describe_db_snapshots(DBSnapshotIdentifier="snapshot-1").get( - "DBSnapshots" - )[0] - conn.delete_db_snapshot(DBSnapshotIdentifier="snapshot-1") + client.describe_db_snapshots(DBSnapshotIdentifier="snapshot-1")["DBSnapshots"][0] + client.delete_db_snapshot(DBSnapshotIdentifier="snapshot-1") with pytest.raises(ClientError): - conn.describe_db_snapshots(DBSnapshotIdentifier="snapshot-1") + client.describe_db_snapshots(DBSnapshotIdentifier="snapshot-1") @pytest.mark.parametrize( @@ -1362,22 +966,18 @@ def test_delete_db_snapshot(): ) @mock_aws def test_restore_db_instance_from_db_snapshot( - db_snapshot_identifier: str, custom_db_subnet_group: bool + db_snapshot_identifier: str, custom_db_subnet_group: bool, client ): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( + create_db_instance( DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, Engine="postgres", DBName="staging-postgres", DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", DBSecurityGroups=["my_sg"], ) - assert len(conn.describe_db_instances()["DBInstances"]) == 1 + assert len(client.describe_db_instances()["DBInstances"]) == 1 - conn.create_db_snapshot( + client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" ) @@ -1385,7 +985,7 @@ def test_restore_db_instance_from_db_snapshot( db_subnet_group_name = create_db_subnet_group() # restore - new_instance = conn.restore_db_instance_from_db_snapshot( + new_instance = client.restore_db_instance_from_db_snapshot( DBInstanceIdentifier="db-restore-1", DBSnapshotIdentifier=db_snapshot_identifier )["DBInstance"] kwargs = { @@ -1394,7 +994,7 @@ def test_restore_db_instance_from_db_snapshot( } if custom_db_subnet_group: kwargs["DBSubnetGroupName"] = db_subnet_group_name - new_instance = conn.restore_db_instance_from_db_snapshot(**kwargs)["DBInstance"] + new_instance = client.restore_db_instance_from_db_snapshot(**kwargs)["DBInstance"] if custom_db_subnet_group: assert ( new_instance["DBSubnetGroup"]["DBSubnetGroupName"] == db_subnet_group_name @@ -1413,10 +1013,10 @@ def test_restore_db_instance_from_db_snapshot( assert new_instance["Endpoint"]["Port"] == 5432 # Verify it exists - assert len(conn.describe_db_instances()["DBInstances"]) == 2 + assert len(client.describe_db_instances()["DBInstances"]) == 2 assert ( len( - conn.describe_db_instances(DBInstanceIdentifier="db-restore-1")[ + client.describe_db_instances(DBInstanceIdentifier="db-restore-1")[ "DBInstances" ] ) @@ -1428,29 +1028,24 @@ def test_restore_db_instance_from_db_snapshot( "custom_db_subnet_group", [True, False], ids=("custom_subnet", "default_subnet") ) @mock_aws -def test_restore_db_instance_to_point_in_time(custom_db_subnet_group: bool): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - +def test_restore_db_instance_to_point_in_time(custom_db_subnet_group: bool, client): if custom_db_subnet_group: extra_kwargs = {"DBSubnetGroupName": create_db_subnet_group()} else: extra_kwargs = {} - conn.create_db_instance( + create_db_instance( DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, Engine="postgres", DBName="staging-postgres", DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", DBSecurityGroups=["my_sg"], **extra_kwargs, ) - assert len(conn.describe_db_instances()["DBInstances"]) == 1 + assert len(client.describe_db_instances()["DBInstances"]) == 1 # restore - new_instance = conn.restore_db_instance_to_point_in_time( + new_instance = client.restore_db_instance_to_point_in_time( SourceDBInstanceIdentifier="db-primary-1", TargetDBInstanceIdentifier="db-restore-1", )["DBInstance"] @@ -1468,17 +1063,17 @@ def test_restore_db_instance_to_point_in_time(custom_db_subnet_group: bool): assert new_instance["Endpoint"]["Port"] == 5432 # Verify it exists - assert len(conn.describe_db_instances()["DBInstances"]) == 2 + assert len(client.describe_db_instances()["DBInstances"]) == 2 assert ( len( - conn.describe_db_instances(DBInstanceIdentifier="db-restore-1")[ + client.describe_db_instances(DBInstanceIdentifier="db-restore-1")[ "DBInstances" ] ) == 1 ) # ensure another pit restore can be made - new_instance = conn.restore_db_instance_to_point_in_time( + new_instance = client.restore_db_instance_to_point_in_time( SourceDBInstanceIdentifier="db-primary-1", TargetDBInstanceIdentifier="db-restore-2", )["DBInstance"] @@ -1496,10 +1091,10 @@ def test_restore_db_instance_to_point_in_time(custom_db_subnet_group: bool): assert new_instance["Endpoint"]["Port"] == 5432 # Verify it exists - assert len(conn.describe_db_instances()["DBInstances"]) == 3 + assert len(client.describe_db_instances()["DBInstances"]) == 3 assert ( len( - conn.describe_db_instances(DBInstanceIdentifier="db-restore-2")[ + client.describe_db_instances(DBInstanceIdentifier="db-restore-2")[ "DBInstances" ] ) @@ -1508,26 +1103,22 @@ def test_restore_db_instance_to_point_in_time(custom_db_subnet_group: bool): @mock_aws -def test_restore_db_instance_from_db_snapshot_and_override_params(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( +def test_restore_db_instance_from_db_snapshot_and_override_params(client): + create_db_instance( DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, Engine="postgres", DBName="staging-postgres", DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", Port=1234, DBSecurityGroups=["my_sg"], ) - assert len(conn.describe_db_instances()["DBInstances"]) == 1 - conn.create_db_snapshot( + assert len(client.describe_db_instances()["DBInstances"]) == 1 + client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" ) # restore with some updated attributes - new_instance = conn.restore_db_instance_from_db_snapshot( + new_instance = client.restore_db_instance_from_db_snapshot( DBInstanceIdentifier="db-restore-1", DBSnapshotIdentifier="snapshot-1", Port=10000, @@ -1547,30 +1138,26 @@ def test_restore_db_instance_from_db_snapshot_and_override_params(): @mock_aws -def test_create_option_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - option_group = conn.create_option_group( +def test_create_option_group(client): + option_group = client.create_option_group( OptionGroupName="test", EngineName="mysql", MajorEngineVersion="5.6", OptionGroupDescription="test option group", - ) - assert option_group["OptionGroup"]["OptionGroupName"] == "test" - assert option_group["OptionGroup"]["EngineName"] == "mysql" - assert option_group["OptionGroup"]["OptionGroupDescription"] == ( - "test option group" - ) - assert option_group["OptionGroup"]["MajorEngineVersion"] == "5.6" - assert option_group["OptionGroup"]["OptionGroupArn"] == ( - f"arn:aws:rds:us-west-2:{ACCOUNT_ID}:og:test" + )["OptionGroup"] + assert option_group["OptionGroupName"] == "test" + assert option_group["EngineName"] == "mysql" + assert option_group["OptionGroupDescription"] == "test option group" + assert option_group["MajorEngineVersion"] == "5.6" + assert ( + option_group["OptionGroupArn"] == f"arn:aws:rds:us-west-2:{ACCOUNT_ID}:og:test" ) @mock_aws -def test_create_option_group_bad_engine_name(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_create_option_group_bad_engine_name(client): with pytest.raises(ClientError): - conn.create_option_group( + client.create_option_group( OptionGroupName="test", EngineName="invalid_engine", MajorEngineVersion="5.6", @@ -1579,10 +1166,9 @@ def test_create_option_group_bad_engine_name(): @mock_aws -def test_create_option_group_bad_engine_major_version(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_create_option_group_bad_engine_major_version(client): with pytest.raises(ClientError): - conn.create_option_group( + client.create_option_group( OptionGroupName="test", EngineName="mysql", MajorEngineVersion="6.6.6", @@ -1591,10 +1177,9 @@ def test_create_option_group_bad_engine_major_version(): @mock_aws -def test_create_option_group_empty_description(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_create_option_group_empty_description(client): with pytest.raises(ClientError): - conn.create_option_group( + client.create_option_group( OptionGroupName="test", EngineName="mysql", MajorEngineVersion="5.6", @@ -1603,16 +1188,15 @@ def test_create_option_group_empty_description(): @mock_aws -def test_create_option_group_duplicate(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_option_group( +def test_create_option_group_duplicate(client): + client.create_option_group( OptionGroupName="test", EngineName="mysql", MajorEngineVersion="5.6", OptionGroupDescription="test option group", ) with pytest.raises(ClientError): - conn.create_option_group( + client.create_option_group( OptionGroupName="test", EngineName="mysql", MajorEngineVersion="5.6", @@ -1621,75 +1205,71 @@ def test_create_option_group_duplicate(): @mock_aws -def test_describe_option_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_option_group( +def test_describe_option_group(client): + client.create_option_group( OptionGroupName="test", EngineName="mysql", MajorEngineVersion="5.6", OptionGroupDescription="test option group", ) - option_groups = conn.describe_option_groups(OptionGroupName="test") + option_groups = client.describe_option_groups(OptionGroupName="test") assert option_groups["OptionGroupsList"][0]["OptionGroupName"] == "test" @mock_aws -def test_describe_non_existent_option_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_describe_non_existent_option_group(client): with pytest.raises(ClientError): - conn.describe_option_groups(OptionGroupName="not-a-option-group") + client.describe_option_groups(OptionGroupName="not-a-option-group") @mock_aws -def test_delete_option_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_option_group( +def test_delete_option_group(client): + client.create_option_group( OptionGroupName="test", EngineName="mysql", MajorEngineVersion="5.6", OptionGroupDescription="test option group", ) - option_groups = conn.describe_option_groups(OptionGroupName="test") + option_groups = client.describe_option_groups(OptionGroupName="test") assert option_groups["OptionGroupsList"][0]["OptionGroupName"] == "test" - conn.delete_option_group(OptionGroupName="test") + client.delete_option_group(OptionGroupName="test") with pytest.raises(ClientError): - conn.describe_option_groups(OptionGroupName="test") + client.describe_option_groups(OptionGroupName="test") @mock_aws -def test_delete_non_existent_option_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_delete_non_existent_option_group(client): with pytest.raises(ClientError): - conn.delete_option_group(OptionGroupName="non-existent") + client.delete_option_group(OptionGroupName="non-existent") @mock_aws -def test_describe_option_group_options(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - option_group_options = conn.describe_option_group_options(EngineName="sqlserver-ee") +def test_describe_option_group_options(client): + option_group_options = client.describe_option_group_options( + EngineName="sqlserver-ee" + ) assert len(option_group_options["OptionGroupOptions"]) == 4 - option_group_options = conn.describe_option_group_options( + option_group_options = client.describe_option_group_options( EngineName="sqlserver-ee", MajorEngineVersion="11.00" ) assert len(option_group_options["OptionGroupOptions"]) == 2 - option_group_options = conn.describe_option_group_options( + option_group_options = client.describe_option_group_options( EngineName="mysql", MajorEngineVersion="5.6" ) assert len(option_group_options["OptionGroupOptions"]) == 1 with pytest.raises(ClientError): - conn.describe_option_group_options(EngineName="non-existent") + client.describe_option_group_options(EngineName="non-existent") with pytest.raises(ClientError): - conn.describe_option_group_options( + client.describe_option_group_options( EngineName="mysql", MajorEngineVersion="non-existent" ) @pytest.mark.aws_verified @aws_verified -def test_modify_option_group(): - conn = boto3.client("rds", region_name="us-east-1") +def test_modify_option_group(client): option_group_name = f"og-{str(uuid4())[0:6]}" - conn.create_option_group( + client.create_option_group( OptionGroupName=option_group_name, EngineName="mysql", MajorEngineVersion="5.6", @@ -1698,7 +1278,7 @@ def test_modify_option_group(): try: # Verify OptionsToRemove do not have to exist - option_group = conn.modify_option_group( + option_group = client.modify_option_group( OptionGroupName=option_group_name, OptionsToInclude=[], OptionsToRemove=["MEMCACHED"], @@ -1708,14 +1288,14 @@ def test_modify_option_group(): assert option_group["Options"] == [] assert option_group["OptionGroupName"] == option_group_name - option_groups = conn.describe_option_groups(OptionGroupName=option_group_name)[ - "OptionGroupsList" - ] + option_groups = client.describe_option_groups( + OptionGroupName=option_group_name + )["OptionGroupsList"] assert option_groups[0]["Options"] == [] # Include option # https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Appendix.MySQL.Options.AuditPlugin.html - conn.modify_option_group( + client.modify_option_group( OptionGroupName=option_group_name, OptionsToInclude=[{"OptionName": "MARIADB_AUDIT_PLUGIN"}], OptionsToRemove=[], @@ -1723,9 +1303,9 @@ def test_modify_option_group(): )["OptionGroup"] # Verify it was added successfully - option_groups = conn.describe_option_groups(OptionGroupName=option_group_name)[ - "OptionGroupsList" - ] + option_groups = client.describe_option_groups( + OptionGroupName=option_group_name + )["OptionGroupsList"] options = option_groups[0]["Options"] assert len(options) == 1 @@ -1733,7 +1313,7 @@ def test_modify_option_group(): # AWS automatically adds a description + default option settings, but Moto does not support that yet # Change setting for an existing option - conn.modify_option_group( + client.modify_option_group( OptionGroupName=option_group_name, OptionsToInclude=[ { @@ -1747,9 +1327,9 @@ def test_modify_option_group(): )["OptionGroup"] # Verify it was added successfully - option_groups = conn.describe_option_groups(OptionGroupName=option_group_name)[ - "OptionGroupsList" - ] + option_groups = client.describe_option_groups( + OptionGroupName=option_group_name + )["OptionGroupsList"] options = option_groups[0]["Options"] assert len(options) == 1 @@ -1763,37 +1343,35 @@ def test_modify_option_group(): audit_plugin["Value"] == "1000" # Verify option can be deleted - conn.modify_option_group( + client.modify_option_group( OptionGroupName=option_group_name, OptionsToRemove=["MARIADB_AUDIT_PLUGIN"], ApplyImmediately=True, ) - option_groups = conn.describe_option_groups(OptionGroupName=option_group_name)[ - "OptionGroupsList" - ] + option_groups = client.describe_option_groups( + OptionGroupName=option_group_name + )["OptionGroupsList"] assert option_groups[0]["Options"] == [] finally: - conn.delete_option_group(OptionGroupName=option_group_name) + client.delete_option_group(OptionGroupName=option_group_name) @mock_aws -def test_modify_option_group_no_options(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_option_group( +def test_modify_option_group_no_options(client): + client.create_option_group( OptionGroupName="test", EngineName="mysql", MajorEngineVersion="5.6", OptionGroupDescription="test option group", ) with pytest.raises(ClientError): - conn.modify_option_group(OptionGroupName="test") + client.modify_option_group(OptionGroupName="test") @mock_aws -def test_modify_non_existent_option_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_modify_non_existent_option_group(client): with pytest.raises(ClientError) as client_err: - conn.modify_option_group( + client.modify_option_group( OptionGroupName="non-existent", OptionsToInclude=[{"OptionName": "test-option"}], ) @@ -1803,59 +1381,39 @@ def test_modify_non_existent_option_group(): @mock_aws -def test_delete_database_with_protection(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, - Engine="postgres", - DBInstanceClass="db.m1.small", - DeletionProtection=True, - ) +def test_delete_database_with_protection(client): + create_db_instance(DBInstanceIdentifier="db-primary-1", DeletionProtection=True) with pytest.raises(ClientError) as exc: - conn.delete_db_instance(DBInstanceIdentifier="db-primary-1") + client.delete_db_instance(DBInstanceIdentifier="db-primary-1") err = exc.value.response["Error"] assert err["Message"] == "Can't delete Instance with protection enabled" @mock_aws -def test_delete_non_existent_database(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_delete_non_existent_database(client): with pytest.raises(ClientError) as ex: - conn.delete_db_instance(DBInstanceIdentifier="non-existent") + client.delete_db_instance(DBInstanceIdentifier="non-existent") assert ex.value.response["Error"]["Code"] == "DBInstanceNotFound" assert ex.value.response["Error"]["Message"] == "DBInstance non-existent not found." @mock_aws -def test_list_tags_invalid_arn(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_list_tags_invalid_arn(client): with pytest.raises(ClientError): - conn.list_tags_for_resource(ResourceName="arn:aws:rds:bad-arn") + client.list_tags_for_resource(ResourceName="arn:aws:rds:bad-arn") @mock_aws -def test_list_tags_db(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - result = conn.list_tags_for_resource( +def test_list_tags_db(client): + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:db:foo" ) assert result["TagList"] == [] - test_instance = conn.create_db_instance( - DBInstanceIdentifier="db-with-tags", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], + test_instance = create_db_instance( Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}], ) - result = conn.list_tags_for_resource( - ResourceName=test_instance["DBInstance"]["DBInstanceArn"] - ) + result = client.list_tags_for_resource(ResourceName=test_instance["DBInstanceArn"]) assert result["TagList"] == [ {"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}, @@ -1863,84 +1421,57 @@ def test_list_tags_db(): @mock_aws -def test_add_tags_db(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( +def test_add_tags_db(client): + create_db_instance( DBInstanceIdentifier="db-without-tags", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}], ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-without-tags" ) - assert len(list(result["TagList"])) == 2 - conn.add_tags_to_resource( + assert len(result["TagList"]) == 2 + client.add_tags_to_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-without-tags", Tags=[{"Key": "foo", "Value": "fish"}, {"Key": "foo2", "Value": "bar2"}], ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-without-tags" ) - assert len(list(result["TagList"])) == 3 + assert len(result["TagList"]) == 3 @mock_aws -def test_remove_tags_db(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( +def test_remove_tags_db(client): + create_db_instance( DBInstanceIdentifier="db-with-tags", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}], ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-with-tags" ) - assert len(list(result["TagList"])) == 2 - conn.remove_tags_from_resource( + assert len(result["TagList"]) == 2 + client.remove_tags_from_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-with-tags", TagKeys=["foo"] ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-with-tags" ) assert len(result["TagList"]) == 1 @mock_aws -def test_list_tags_snapshot(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - result = conn.list_tags_for_resource( +def test_list_tags_snapshot(client): + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:foo" ) assert result["TagList"] == [] - conn.create_db_instance( - DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) - snapshot = conn.create_db_snapshot( + create_db_instance(DBInstanceIdentifier="db-primary-1") + snapshot = client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-with-tags", Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}], ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName=snapshot["DBSnapshot"]["DBSnapshotArn"] ) assert result["TagList"] == [ @@ -1950,128 +1481,102 @@ def test_list_tags_snapshot(): @mock_aws -def test_add_tags_snapshot(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) - conn.create_db_snapshot( +def test_add_tags_snapshot(client): + create_db_instance(DBInstanceIdentifier="db-primary-1") + client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-without-tags", Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}], ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-without-tags" ) - assert len(list(result["TagList"])) == 2 - conn.add_tags_to_resource( + assert len(result["TagList"]) == 2 + client.add_tags_to_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-without-tags", Tags=[{"Key": "foo", "Value": "fish"}, {"Key": "foo2", "Value": "bar2"}], ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-without-tags" ) - assert len(list(result["TagList"])) == 3 + assert len(result["TagList"]) == 3 @mock_aws -def test_remove_tags_snapshot(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance( - DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) - conn.create_db_snapshot( +def test_remove_tags_snapshot(client): + create_db_instance(DBInstanceIdentifier="db-primary-1") + client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-with-tags", Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}], ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-with-tags" ) - assert len(list(result["TagList"])) == 2 - conn.remove_tags_from_resource( + assert len(result["TagList"]) == 2 + client.remove_tags_from_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-with-tags", TagKeys=["foo"], ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-with-tags" ) assert len(result["TagList"]) == 1 @mock_aws -def test_add_tags_option_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_option_group( +def test_add_tags_option_group(client): + client.create_option_group( OptionGroupName="test", EngineName="mysql", MajorEngineVersion="5.6", OptionGroupDescription="test option group", ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:og:test" ) - assert len(list(result["TagList"])) == 0 - conn.add_tags_to_resource( + assert len(result["TagList"]) == 0 + client.add_tags_to_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:og:test", Tags=[{"Key": "foo", "Value": "fish"}, {"Key": "foo2", "Value": "bar2"}], ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:og:test" ) - assert len(list(result["TagList"])) == 2 + assert len(result["TagList"]) == 2 @mock_aws -def test_remove_tags_option_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_option_group( +def test_remove_tags_option_group(client): + client.create_option_group( OptionGroupName="test", EngineName="mysql", MajorEngineVersion="5.6", OptionGroupDescription="test option group", ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:og:test" ) - conn.add_tags_to_resource( + client.add_tags_to_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:og:test", Tags=[{"Key": "foo", "Value": "fish"}, {"Key": "foo2", "Value": "bar2"}], ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:og:test" ) - assert len(list(result["TagList"])) == 2 - conn.remove_tags_from_resource( + assert len(result["TagList"]) == 2 + client.remove_tags_from_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:og:test", TagKeys=["foo"] ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:og:test" ) - assert len(list(result["TagList"])) == 1 + assert len(result["TagList"]) == 1 @mock_aws -def test_create_database_security_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - - result = conn.create_db_security_group( +def test_create_database_security_group(client): + result = client.create_db_security_group( DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group" ) assert result["DBSecurityGroup"]["DBSecurityGroupName"] == "db_sg" @@ -2082,78 +1587,72 @@ def test_create_database_security_group(): @mock_aws -def test_get_security_groups(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - - result = conn.describe_db_security_groups() +def test_get_security_groups(client): + result = client.describe_db_security_groups() assert len(result["DBSecurityGroups"]) == 0 - conn.create_db_security_group( + client.create_db_security_group( DBSecurityGroupName="db_sg1", DBSecurityGroupDescription="DB Security Group" ) - conn.create_db_security_group( + client.create_db_security_group( DBSecurityGroupName="db_sg2", DBSecurityGroupDescription="DB Security Group" ) - result = conn.describe_db_security_groups() + result = client.describe_db_security_groups() assert len(result["DBSecurityGroups"]) == 2 - result = conn.describe_db_security_groups(DBSecurityGroupName="db_sg1") + result = client.describe_db_security_groups(DBSecurityGroupName="db_sg1") assert len(result["DBSecurityGroups"]) == 1 assert result["DBSecurityGroups"][0]["DBSecurityGroupName"] == "db_sg1" @mock_aws -def test_get_non_existent_security_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_get_non_existent_security_group(client): with pytest.raises(ClientError): - conn.describe_db_security_groups(DBSecurityGroupName="not-a-sg") + client.describe_db_security_groups(DBSecurityGroupName="not-a-sg") @mock_aws -def test_delete_database_security_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_security_group( +def test_delete_database_security_group(client): + client.create_db_security_group( DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group" ) - result = conn.describe_db_security_groups() + result = client.describe_db_security_groups() assert len(result["DBSecurityGroups"]) == 1 - conn.delete_db_security_group(DBSecurityGroupName="db_sg") - result = conn.describe_db_security_groups() + client.delete_db_security_group(DBSecurityGroupName="db_sg") + result = client.describe_db_security_groups() assert len(result["DBSecurityGroups"]) == 0 @mock_aws -def test_delete_non_existent_security_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_delete_non_existent_security_group(client): with pytest.raises(ClientError): - conn.delete_db_security_group(DBSecurityGroupName="not-a-db") + client.delete_db_security_group(DBSecurityGroupName="not-a-db") @mock_aws -def test_security_group_authorize(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - security_group = conn.create_db_security_group( +def test_security_group_authorize(client): + security_group = client.create_db_security_group( DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group" ) assert security_group["DBSecurityGroup"]["IPRanges"] == [] - conn.authorize_db_security_group_ingress( + client.authorize_db_security_group_ingress( DBSecurityGroupName="db_sg", CIDRIP="10.3.2.45/32" ) - result = conn.describe_db_security_groups(DBSecurityGroupName="db_sg") + result = client.describe_db_security_groups(DBSecurityGroupName="db_sg") assert len(result["DBSecurityGroups"][0]["IPRanges"]) == 1 assert result["DBSecurityGroups"][0]["IPRanges"] == [ {"Status": "authorized", "CIDRIP": "10.3.2.45/32"} ] - conn.authorize_db_security_group_ingress( + client.authorize_db_security_group_ingress( DBSecurityGroupName="db_sg", CIDRIP="10.3.2.46/32" ) - result = conn.describe_db_security_groups(DBSecurityGroupName="db_sg") + result = client.describe_db_security_groups(DBSecurityGroupName="db_sg") assert len(result["DBSecurityGroups"][0]["IPRanges"]) == 2 assert result["DBSecurityGroups"][0]["IPRanges"] == [ {"Status": "authorized", "CIDRIP": "10.3.2.45/32"}, @@ -2162,28 +1661,18 @@ def test_security_group_authorize(): @mock_aws -def test_add_security_group_to_database(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - DBInstanceClass="postgres", - Engine="postgres", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - ) +def test_add_security_group_to_database(client): + create_db_instance(DBInstanceIdentifier="db-master-1") - result = conn.describe_db_instances() + result = client.describe_db_instances() assert result["DBInstances"][0]["DBSecurityGroups"] == [] - conn.create_db_security_group( + client.create_db_security_group( DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group" ) - conn.modify_db_instance( + client.modify_db_instance( DBInstanceIdentifier="db-master-1", DBSecurityGroups=["db_sg"] ) - result = conn.describe_db_instances() + result = client.describe_db_instances() assert ( result["DBInstances"][0]["DBSecurityGroups"][0]["DBSecurityGroupName"] == "db_sg" @@ -2191,18 +1680,17 @@ def test_add_security_group_to_database(): @mock_aws -def test_list_tags_security_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - result = conn.describe_db_subnet_groups() +def test_list_tags_security_group(client): + result = client.describe_db_subnet_groups() assert len(result["DBSubnetGroups"]) == 0 - security_group = conn.create_db_security_group( + security_group = client.create_db_security_group( DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group", Tags=[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}], )["DBSecurityGroup"]["DBSecurityGroupName"] resource = f"arn:aws:rds:us-west-2:1234567890:secgrp:{security_group}" - result = conn.list_tags_for_resource(ResourceName=resource) + result = client.list_tags_for_resource(ResourceName=resource) assert result["TagList"] == [ {"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}, @@ -2210,22 +1698,21 @@ def test_list_tags_security_group(): @mock_aws -def test_add_tags_security_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - result = conn.describe_db_subnet_groups() +def test_add_tags_security_group(client): + result = client.describe_db_subnet_groups() assert len(result["DBSubnetGroups"]) == 0 - security_group = conn.create_db_security_group( + security_group = client.create_db_security_group( DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group" )["DBSecurityGroup"]["DBSecurityGroupName"] resource = f"arn:aws:rds:us-west-2:1234567890:secgrp:{security_group}" - conn.add_tags_to_resource( + client.add_tags_to_resource( ResourceName=resource, Tags=[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}], ) - result = conn.list_tags_for_resource(ResourceName=resource) + result = client.list_tags_for_resource(ResourceName=resource) assert result["TagList"] == [ {"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}, @@ -2233,26 +1720,25 @@ def test_add_tags_security_group(): @mock_aws -def test_remove_tags_security_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - result = conn.describe_db_subnet_groups() +def test_remove_tags_security_group(client): + result = client.describe_db_subnet_groups() assert len(result["DBSubnetGroups"]) == 0 - security_group = conn.create_db_security_group( + security_group = client.create_db_security_group( DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group", Tags=[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}], )["DBSecurityGroup"]["DBSecurityGroupName"] resource = f"arn:aws:rds:us-west-2:1234567890:secgrp:{security_group}" - conn.remove_tags_from_resource(ResourceName=resource, TagKeys=["foo"]) + client.remove_tags_from_resource(ResourceName=resource, TagKeys=["foo"]) - result = conn.list_tags_for_resource(ResourceName=resource) + result = client.list_tags_for_resource(ResourceName=resource) assert result["TagList"] == [{"Value": "bar1", "Key": "foo1"}] @mock_aws -def test_create_database_subnet_group(): +def test_create_database_subnet_group(client): vpc_conn = boto3.client("ec2", DEFAULT_REGION) vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] subnet1 = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[ @@ -2263,21 +1749,20 @@ def test_create_database_subnet_group(): ] subnet_ids = [subnet1["SubnetId"], subnet2["SubnetId"]] - conn = boto3.client("rds", region_name=DEFAULT_REGION) - result = conn.create_db_subnet_group( + subnet_group = client.create_db_subnet_group( DBSubnetGroupName="db_subnet", DBSubnetGroupDescription="my db subnet", SubnetIds=subnet_ids, - ) - assert result["DBSubnetGroup"]["DBSubnetGroupName"] == "db_subnet" - assert result["DBSubnetGroup"]["DBSubnetGroupDescription"] == "my db subnet" - subnets = result["DBSubnetGroup"]["Subnets"] + )["DBSubnetGroup"] + assert subnet_group["DBSubnetGroupName"] == "db_subnet" + assert subnet_group["DBSubnetGroupDescription"] == "my db subnet" + subnets = subnet_group["Subnets"] subnet_group_ids = [subnets[0]["SubnetIdentifier"], subnets[1]["SubnetIdentifier"]] - assert list(subnet_group_ids) == subnet_ids + assert subnet_group_ids == subnet_ids @mock_aws -def test_modify_database_subnet_group(): +def test_modify_database_subnet_group(client): vpc_conn = boto3.client("ec2", DEFAULT_REGION) vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] subnet1 = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[ @@ -2287,49 +1772,36 @@ def test_modify_database_subnet_group(): "Subnet" ] - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_subnet_group( + client.create_db_subnet_group( DBSubnetGroupName="db_subnet", DBSubnetGroupDescription="my db subnet", SubnetIds=[subnet1["SubnetId"]], ) - conn.modify_db_subnet_group( + client.modify_db_subnet_group( DBSubnetGroupName="db_subnet", DBSubnetGroupDescription="my updated desc", SubnetIds=[subnet1["SubnetId"], subnet2["SubnetId"]], ) - _ = conn.describe_db_subnet_groups()["DBSubnetGroups"] + _ = client.describe_db_subnet_groups()["DBSubnetGroups"] # FIXME: Group is deleted atm # TODO: we should check whether all attrs are persisted @mock_aws -def test_create_database_in_subnet_group(): - vpc_conn = boto3.client("ec2", DEFAULT_REGION) - vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] - subnet = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[ - "Subnet" - ] - - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_subnet_group( +def test_create_database_in_subnet_group(client): + subnet_id = create_subnet() + client.create_db_subnet_group( DBSubnetGroupName="db_subnet1", DBSubnetGroupDescription="my db subnet", - SubnetIds=[subnet["SubnetId"]], + SubnetIds=[subnet_id], ) - conn.create_db_instance( + create_db_instance( DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - Engine="postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, DBSubnetGroupName="db_subnet1", ) - result = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") + result = client.describe_db_instances(DBInstanceIdentifier="db-master-1") assert result["DBInstances"][0]["DBSubnetGroup"]["DBSubnetGroupName"] == ( "db_subnet1" ) @@ -2349,26 +1821,20 @@ def create_db_subnet_group(db_subnet_group_name: str = "custom_db_subnet") -> st @mock_aws -def test_describe_database_subnet_group(): - vpc_conn = boto3.client("ec2", DEFAULT_REGION) - vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] - subnet = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[ - "Subnet" - ] - - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_subnet_group( +def test_describe_database_subnet_group(client): + subnet_id = create_subnet() + client.create_db_subnet_group( DBSubnetGroupName="db_subnet1", DBSubnetGroupDescription="my db subnet", - SubnetIds=[subnet["SubnetId"]], + SubnetIds=[subnet_id], ) - conn.create_db_subnet_group( + client.create_db_subnet_group( DBSubnetGroupName="db_subnet2", DBSubnetGroupDescription="my db subnet", - SubnetIds=[subnet["SubnetId"]], + SubnetIds=[subnet_id], ) - resp = conn.describe_db_subnet_groups() + resp = client.describe_db_subnet_groups() assert len(resp["DBSubnetGroups"]) == 2 subnets = resp["DBSubnetGroups"][0]["Subnets"] @@ -2376,66 +1842,59 @@ def test_describe_database_subnet_group(): assert ( len( - list( - conn.describe_db_subnet_groups(DBSubnetGroupName="db_subnet1")[ - "DBSubnetGroups" - ] - ) + client.describe_db_subnet_groups(DBSubnetGroupName="db_subnet1")[ + "DBSubnetGroups" + ] ) == 1 ) with pytest.raises(ClientError): - conn.describe_db_subnet_groups(DBSubnetGroupName="not-a-subnet") + client.describe_db_subnet_groups(DBSubnetGroupName="not-a-subnet") @mock_aws -def test_delete_database_subnet_group(): - vpc_conn = boto3.client("ec2", DEFAULT_REGION) - vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] - subnet = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[ - "Subnet" - ] - - conn = boto3.client("rds", region_name=DEFAULT_REGION) - result = conn.describe_db_subnet_groups() +def test_delete_database_subnet_group(client): + result = client.describe_db_subnet_groups() assert len(result["DBSubnetGroups"]) == 0 - conn.create_db_subnet_group( + subnet_id = create_subnet() + client.create_db_subnet_group( DBSubnetGroupName="db_subnet1", DBSubnetGroupDescription="my db subnet", - SubnetIds=[subnet["SubnetId"]], + SubnetIds=[subnet_id], ) - result = conn.describe_db_subnet_groups() + result = client.describe_db_subnet_groups() assert len(result["DBSubnetGroups"]) == 1 - conn.delete_db_subnet_group(DBSubnetGroupName="db_subnet1") - result = conn.describe_db_subnet_groups() + client.delete_db_subnet_group(DBSubnetGroupName="db_subnet1") + result = client.describe_db_subnet_groups() assert len(result["DBSubnetGroups"]) == 0 with pytest.raises(ClientError): - conn.delete_db_subnet_group(DBSubnetGroupName="db_subnet1") + client.delete_db_subnet_group(DBSubnetGroupName="db_subnet1") -@mock_aws -def test_list_tags_database_subnet_group(): - vpc_conn = boto3.client("ec2", DEFAULT_REGION) - vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] - subnet = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[ - "Subnet" - ] +def create_subnet() -> str: + ec2_client = boto3.client("ec2", DEFAULT_REGION) + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + response = ec2_client.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24") + return response["Subnet"]["SubnetId"] - conn = boto3.client("rds", region_name=DEFAULT_REGION) - result = conn.describe_db_subnet_groups() + +@mock_aws +def test_list_tags_database_subnet_group(client): + result = client.describe_db_subnet_groups() assert len(result["DBSubnetGroups"]) == 0 - subnet = conn.create_db_subnet_group( + subnet_id = create_subnet() + subnet = client.create_db_subnet_group( DBSubnetGroupName="db_subnet1", DBSubnetGroupDescription="my db subnet", - SubnetIds=[subnet["SubnetId"]], + SubnetIds=[subnet_id], Tags=[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}], )["DBSubnetGroup"]["DBSubnetGroupName"] - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName=f"arn:aws:rds:us-west-2:1234567890:subgrp:{subnet}" ) assert result["TagList"] == [ @@ -2445,36 +1904,34 @@ def test_list_tags_database_subnet_group(): @mock_aws -def test_modify_tags_parameter_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_modify_tags_parameter_group(client): client_tags = [{"Key": "character_set_client", "Value": "utf-8"}] - result = conn.create_db_parameter_group( + result = client.create_db_parameter_group( DBParameterGroupName="test-sqlserver-2017", DBParameterGroupFamily="mysql5.6", Description="MySQL Group", Tags=client_tags, ) resource = result["DBParameterGroup"]["DBParameterGroupArn"] - result = conn.list_tags_for_resource(ResourceName=resource) + result = client.list_tags_for_resource(ResourceName=resource) assert result["TagList"] == client_tags server_tags = [{"Key": "character_set_server", "Value": "utf-8"}] - conn.add_tags_to_resource(ResourceName=resource, Tags=server_tags) + client.add_tags_to_resource(ResourceName=resource, Tags=server_tags) combined_tags = client_tags + server_tags - result = conn.list_tags_for_resource(ResourceName=resource) + result = client.list_tags_for_resource(ResourceName=resource) assert result["TagList"] == combined_tags - conn.remove_tags_from_resource( + client.remove_tags_from_resource( ResourceName=resource, TagKeys=["character_set_client"] ) - result = conn.list_tags_for_resource(ResourceName=resource) + result = client.list_tags_for_resource(ResourceName=resource) assert result["TagList"] == server_tags @mock_aws -def test_modify_tags_event_subscription(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_modify_tags_event_subscription(client): tags = [{"Key": "hello", "Value": "world"}] - result = conn.create_event_subscription( + result = client.create_event_subscription( SubscriptionName="my-instance-events", SourceType="db-instance", EventCategories=["backup", "recovery"], @@ -2482,45 +1939,39 @@ def test_modify_tags_event_subscription(): Tags=tags, ) resource = result["EventSubscription"]["EventSubscriptionArn"] - result = conn.list_tags_for_resource(ResourceName=resource) + result = client.list_tags_for_resource(ResourceName=resource) assert result["TagList"] == tags new_tags = [{"Key": "new_key", "Value": "new_value"}] - conn.add_tags_to_resource(ResourceName=resource, Tags=new_tags) + client.add_tags_to_resource(ResourceName=resource, Tags=new_tags) combined_tags = tags + new_tags - result = conn.list_tags_for_resource(ResourceName=resource) + result = client.list_tags_for_resource(ResourceName=resource) assert result["TagList"] == combined_tags - conn.remove_tags_from_resource(ResourceName=resource, TagKeys=["new_key"]) - result = conn.list_tags_for_resource(ResourceName=resource) + client.remove_tags_from_resource(ResourceName=resource, TagKeys=["new_key"]) + result = client.list_tags_for_resource(ResourceName=resource) assert result["TagList"] == tags @mock_aws -def test_add_tags_database_subnet_group(): - vpc_conn = boto3.client("ec2", DEFAULT_REGION) - vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] - subnet = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[ - "Subnet" - ] - - conn = boto3.client("rds", region_name=DEFAULT_REGION) - result = conn.describe_db_subnet_groups() +def test_add_tags_database_subnet_group(client): + result = client.describe_db_subnet_groups() assert len(result["DBSubnetGroups"]) == 0 - subnet = conn.create_db_subnet_group( + subnet_id = create_subnet() + subnet = client.create_db_subnet_group( DBSubnetGroupName="db_subnet1", DBSubnetGroupDescription="my db subnet", - SubnetIds=[subnet["SubnetId"]], + SubnetIds=[subnet_id], Tags=[], )["DBSubnetGroup"]["DBSubnetGroupName"] resource = f"arn:aws:rds:us-west-2:1234567890:subgrp:{subnet}" - conn.add_tags_to_resource( + client.add_tags_to_resource( ResourceName=resource, Tags=[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}], ) - result = conn.list_tags_for_resource(ResourceName=resource) + result = client.list_tags_for_resource(ResourceName=resource) assert result["TagList"] == [ {"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}, @@ -2528,47 +1979,30 @@ def test_add_tags_database_subnet_group(): @mock_aws -def test_remove_tags_database_subnet_group(): - vpc_conn = boto3.client("ec2", DEFAULT_REGION) - vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] - subnet = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[ - "Subnet" - ] - - conn = boto3.client("rds", region_name=DEFAULT_REGION) - result = conn.describe_db_subnet_groups() +def test_remove_tags_database_subnet_group(client): + result = client.describe_db_subnet_groups() assert len(result["DBSubnetGroups"]) == 0 - subnet = conn.create_db_subnet_group( + subnet_id = create_subnet() + subnet = client.create_db_subnet_group( DBSubnetGroupName="db_subnet1", DBSubnetGroupDescription="my db subnet", - SubnetIds=[subnet["SubnetId"]], + SubnetIds=[subnet_id], Tags=[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}], )["DBSubnetGroup"]["DBSubnetGroupName"] resource = f"arn:aws:rds:us-west-2:1234567890:subgrp:{subnet}" - conn.remove_tags_from_resource(ResourceName=resource, TagKeys=["foo"]) + client.remove_tags_from_resource(ResourceName=resource, TagKeys=["foo"]) - result = conn.list_tags_for_resource(ResourceName=resource) + result = client.list_tags_for_resource(ResourceName=resource) assert result["TagList"] == [{"Value": "bar1", "Key": "foo1"}] @mock_aws -def test_create_database_replica(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - - conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - Engine="postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) +def test_create_database_replica(client): + create_db_instance(DBInstanceIdentifier="db-master-1") - replica = conn.create_db_instance_read_replica( + replica = client.create_db_instance_read_replica( DBInstanceIdentifier="db-replica-1", SourceDBInstanceIdentifier="db-master-1", DBInstanceClass="db.m1.small", @@ -2579,18 +2013,20 @@ def test_create_database_replica(): assert replica["DBInstance"]["DBInstanceClass"] == "db.m1.small" assert replica["DBInstance"]["DBInstanceIdentifier"] == "db-replica-1" - master = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") + master = client.describe_db_instances(DBInstanceIdentifier="db-master-1") assert master["DBInstances"][0]["ReadReplicaDBInstanceIdentifiers"] == ( ["db-replica-1"] ) - replica = conn.describe_db_instances(DBInstanceIdentifier="db-replica-1")[ + replica = client.describe_db_instances(DBInstanceIdentifier="db-replica-1")[ "DBInstances" ][0] assert replica["ReadReplicaSourceDBInstanceIdentifier"] == "db-master-1" - conn.delete_db_instance(DBInstanceIdentifier="db-replica-1", SkipFinalSnapshot=True) + client.delete_db_instance( + DBInstanceIdentifier="db-replica-1", SkipFinalSnapshot=True + ) - master = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") + master = client.describe_db_instances(DBInstanceIdentifier="db-master-1") assert master["DBInstances"][0]["ReadReplicaDBInstanceIdentifiers"] == [] @@ -2602,7 +2038,6 @@ def test_create_database_replica_cross_region(): source_id = "db-master-1" source_arn = us1.create_db_instance( DBInstanceIdentifier=source_id, - AllocatedStorage=10, Engine="postgres", DBInstanceClass="db.m1.small", )["DBInstance"]["DBInstanceArn"] @@ -2626,7 +2061,7 @@ def test_create_database_replica_cross_region(): @mock_aws -def test_create_database_with_encrypted_storage(): +def test_create_database_with_encrypted_storage(client): kms_conn = boto3.client("kms", region_name=DEFAULT_REGION) key = kms_conn.create_key( Policy="my RDS encryption policy", @@ -2634,30 +2069,19 @@ def test_create_database_with_encrypted_storage(): KeyUsage="ENCRYPT_DECRYPT", ) - conn = boto3.client("rds", region_name=DEFAULT_REGION) - database = conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - Engine="postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], + db_instance = create_db_instance( StorageEncrypted=True, KmsKeyId=key["KeyMetadata"]["KeyId"], ) - assert database["DBInstance"]["StorageEncrypted"] is True - assert database["DBInstance"]["KmsKeyId"] == key["KeyMetadata"]["KeyId"] + assert db_instance["StorageEncrypted"] is True + assert db_instance["KmsKeyId"] == key["KeyMetadata"]["KeyId"] @mock_aws -def test_create_db_parameter_group(): - region = DEFAULT_REGION +def test_create_db_parameter_group(client): pg_name = "test" - conn = boto3.client("rds", region_name=region) - db_parameter_group = conn.create_db_parameter_group( + db_parameter_group = client.create_db_parameter_group( DBParameterGroupName="test", DBParameterGroupFamily="mysql5.6", Description="test parameter group", @@ -2671,58 +2095,34 @@ def test_create_db_parameter_group(): "test parameter group" ) assert db_parameter_group["DBParameterGroup"]["DBParameterGroupArn"] == ( - f"arn:aws:rds:{region}:{ACCOUNT_ID}:pg:{pg_name}" + f"arn:aws:rds:{DEFAULT_REGION}:{ACCOUNT_ID}:pg:{pg_name}" ) @mock_aws -def test_create_db_instance_with_parameter_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_parameter_group( +def test_create_db_instance_with_parameter_group(client): + client.create_db_parameter_group( DBParameterGroupName="test", DBParameterGroupFamily="mysql5.6", Description="test parameter group", ) - database = conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - Engine="mysql", - DBInstanceClass="db.m1.small", - DBParameterGroupName="test", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - ) + db_instance = create_db_instance(DBParameterGroupName="test") - assert len(database["DBInstance"]["DBParameterGroups"]) == 1 - assert database["DBInstance"]["DBParameterGroups"][0]["DBParameterGroupName"] == ( - "test" - ) - assert database["DBInstance"]["DBParameterGroups"][0]["ParameterApplyStatus"] == ( - "in-sync" - ) + assert len(db_instance["DBParameterGroups"]) == 1 + assert db_instance["DBParameterGroups"][0]["DBParameterGroupName"] == "test" + assert db_instance["DBParameterGroups"][0]["ParameterApplyStatus"] == "in-sync" @mock_aws -def test_create_database_with_default_port(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - database = conn.create_db_instance( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - Engine="postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - DBSecurityGroups=["my_sg"], - ) - assert database["DBInstance"]["Endpoint"]["Port"] == 5432 +def test_create_database_with_default_port(client): + db_instance = create_db_instance() + assert db_instance["Endpoint"]["Port"] == 5432 @mock_aws -def test_modify_db_instance_with_parameter_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - database = conn.create_db_instance( +def test_modify_db_instance_with_parameter_group(client): + db_instance = create_db_instance( DBInstanceIdentifier="db-master-1", AllocatedStorage=10, Engine="mysql", @@ -2732,26 +2132,23 @@ def test_modify_db_instance_with_parameter_group(): Port=1234, ) - assert len(database["DBInstance"]["DBParameterGroups"]) == 1 - assert database["DBInstance"]["DBParameterGroups"][0]["DBParameterGroupName"] == ( - "default.mysql5.6" - ) - assert database["DBInstance"]["DBParameterGroups"][0]["ParameterApplyStatus"] == ( - "in-sync" - ) + assert len(db_instance["DBParameterGroups"]) == 1 + parameter_group = db_instance["DBParameterGroups"][0] + assert parameter_group["DBParameterGroupName"] == "default.mysql5.6" + assert parameter_group["ParameterApplyStatus"] == "in-sync" - conn.create_db_parameter_group( + client.create_db_parameter_group( DBParameterGroupName="test", DBParameterGroupFamily="mysql5.6", Description="test parameter group", ) - conn.modify_db_instance( + client.modify_db_instance( DBInstanceIdentifier="db-master-1", DBParameterGroupName="test", ApplyImmediately=True, ) - database = conn.describe_db_instances(DBInstanceIdentifier="db-master-1")[ + database = client.describe_db_instances(DBInstanceIdentifier="db-master-1")[ "DBInstances" ][0] assert len(database["DBParameterGroups"]) == 1 @@ -2760,10 +2157,9 @@ def test_modify_db_instance_with_parameter_group(): @mock_aws -def test_create_db_parameter_group_empty_description(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_create_db_parameter_group_empty_description(client): with pytest.raises(ClientError): - conn.create_db_parameter_group( + client.create_db_parameter_group( DBParameterGroupName="test", DBParameterGroupFamily="mysql5.6", Description="", @@ -2771,15 +2167,14 @@ def test_create_db_parameter_group_empty_description(): @mock_aws -def test_create_db_parameter_group_duplicate(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_parameter_group( +def test_create_db_parameter_group_duplicate(client): + client.create_db_parameter_group( DBParameterGroupName="test", DBParameterGroupFamily="mysql5.6", Description="test parameter group", ) with pytest.raises(ClientError): - conn.create_db_parameter_group( + client.create_db_parameter_group( DBParameterGroupName="test", DBParameterGroupFamily="mysql5.6", Description="test parameter group", @@ -2787,58 +2182,61 @@ def test_create_db_parameter_group_duplicate(): @mock_aws -def test_describe_db_parameter_group(): - region = DEFAULT_REGION +def test_describe_db_parameter_group(client): pg_name = "test" - conn = boto3.client("rds", region_name=region) - conn.create_db_parameter_group( + client.create_db_parameter_group( DBParameterGroupName=pg_name, DBParameterGroupFamily="mysql5.6", Description="test parameter group", ) - db_parameter_groups = conn.describe_db_parameter_groups(DBParameterGroupName="test") + db_parameter_groups = client.describe_db_parameter_groups( + DBParameterGroupName="test" + ) assert db_parameter_groups["DBParameterGroups"][0]["DBParameterGroupName"] == ( "test" ) assert db_parameter_groups["DBParameterGroups"][0]["DBParameterGroupArn"] == ( - f"arn:aws:rds:{region}:{ACCOUNT_ID}:pg:{pg_name}" + f"arn:aws:rds:{DEFAULT_REGION}:{ACCOUNT_ID}:pg:{pg_name}" ) @mock_aws -def test_describe_non_existent_db_parameter_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - db_parameter_groups = conn.describe_db_parameter_groups(DBParameterGroupName="test") +def test_describe_non_existent_db_parameter_group(client): + db_parameter_groups = client.describe_db_parameter_groups( + DBParameterGroupName="test" + ) assert len(db_parameter_groups["DBParameterGroups"]) == 0 @mock_aws -def test_delete_db_parameter_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_parameter_group( +def test_delete_db_parameter_group(client): + client.create_db_parameter_group( DBParameterGroupName="test", DBParameterGroupFamily="mysql5.6", Description="test parameter group", ) - db_parameter_groups = conn.describe_db_parameter_groups(DBParameterGroupName="test") + db_parameter_groups = client.describe_db_parameter_groups( + DBParameterGroupName="test" + ) assert db_parameter_groups["DBParameterGroups"][0]["DBParameterGroupName"] == ( "test" ) - conn.delete_db_parameter_group(DBParameterGroupName="test") - db_parameter_groups = conn.describe_db_parameter_groups(DBParameterGroupName="test") + client.delete_db_parameter_group(DBParameterGroupName="test") + db_parameter_groups = client.describe_db_parameter_groups( + DBParameterGroupName="test" + ) assert len(db_parameter_groups["DBParameterGroups"]) == 0 @mock_aws -def test_modify_db_parameter_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_parameter_group( +def test_modify_db_parameter_group(client): + client.create_db_parameter_group( DBParameterGroupName="test", DBParameterGroupFamily="mysql5.6", Description="test parameter group", ) - modify_result = conn.modify_db_parameter_group( + modify_result = client.modify_db_parameter_group( DBParameterGroupName="test", Parameters=[ { @@ -2852,7 +2250,7 @@ def test_modify_db_parameter_group(): assert modify_result["DBParameterGroupName"] == "test" - db_parameters = conn.describe_db_parameters(DBParameterGroupName="test") + db_parameters = client.describe_db_parameters(DBParameterGroupName="test") assert db_parameters["Parameters"][0]["ParameterName"] == "foo" assert db_parameters["Parameters"][0]["ParameterValue"] == "foo_val" assert db_parameters["Parameters"][0]["Description"] == "test param" @@ -2860,61 +2258,50 @@ def test_modify_db_parameter_group(): @mock_aws -def test_delete_non_existent_db_parameter_group(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) +def test_delete_non_existent_db_parameter_group(client): with pytest.raises(ClientError): - conn.delete_db_parameter_group(DBParameterGroupName="non-existent") + client.delete_db_parameter_group(DBParameterGroupName="non-existent") @mock_aws -def test_create_parameter_group_with_tags(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_parameter_group( +def test_create_parameter_group_with_tags(client): + client.create_db_parameter_group( DBParameterGroupName="test", DBParameterGroupFamily="mysql5.6", Description="test parameter group", Tags=[{"Key": "foo", "Value": "bar"}], ) - result = conn.list_tags_for_resource( + result = client.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:pg:test" ) assert result["TagList"] == [{"Value": "bar", "Key": "foo"}] @mock_aws -def test_create_db_with_iam_authentication(): - conn = boto3.client("rds", region_name=DEFAULT_REGION) - - database = conn.create_db_instance( +def test_create_db_with_iam_authentication(client): + db_instance = create_db_instance( DBInstanceIdentifier="rds", - DBInstanceClass="db.t1.micro", - Engine="postgres", EnableIAMDatabaseAuthentication=True, ) - db_instance = database["DBInstance"] assert db_instance["IAMDatabaseAuthenticationEnabled"] is True - snapshot = conn.create_db_snapshot( + snapshot = client.create_db_snapshot( DBInstanceIdentifier="rds", DBSnapshotIdentifier="snapshot" - ).get("DBSnapshot") + )["DBSnapshot"] - assert snapshot.get("IAMDatabaseAuthenticationEnabled") is True + assert snapshot["IAMDatabaseAuthenticationEnabled"] is True @mock_aws -def test_create_db_instance_with_tags(): - client = boto3.client("rds", region_name=DEFAULT_REGION) +def test_create_db_instance_with_tags(client): tags = [{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}] db_instance_identifier = "test-db-instance" - resp = client.create_db_instance( + db_instance = create_db_instance( DBInstanceIdentifier=db_instance_identifier, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", Tags=tags, ) - assert resp["DBInstance"]["TagList"] == tags + assert db_instance["TagList"] == tags resp = client.describe_db_instances(DBInstanceIdentifier=db_instance_identifier) assert resp["DBInstances"][0]["TagList"] == tags @@ -2997,17 +2384,11 @@ def test_validate_db_snapshot_identifier_backend_valid(valid_db_snapshot_identif @mock_aws -def test_validate_db_identifier(): - client = boto3.client("rds", region_name=DEFAULT_REGION) +def test_validate_db_identifier(client): invalid_db_instance_identifier = "arn:aws:rds:eu-west-1:123456789012:db:mydb" with pytest.raises(ClientError) as exc: - client.create_db_instance( - DBInstanceIdentifier=invalid_db_instance_identifier, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - ) + create_db_instance(DBInstanceIdentifier=invalid_db_instance_identifier) validation_helper(exc) with pytest.raises(ClientError) as exc: @@ -3036,18 +2417,12 @@ def test_validate_db_identifier(): @mock_aws -def test_validate_db_snapshot_identifier_different_operations(): - client = boto3.client("rds", region_name=DEFAULT_REGION) +def test_validate_db_snapshot_identifier_different_operations(client): db_instance_identifier = "valid-identifier" valid_db_snapshot_identifier = "valid" invalid_db_snapshot_identifier = "--invalid--" - client.create_db_instance( - DBInstanceIdentifier=db_instance_identifier, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - ) + create_db_instance(DBInstanceIdentifier=db_instance_identifier) expected_message = f"Invalid snapshot identifier: {invalid_db_snapshot_identifier}" with pytest.raises(ClientError) as exc: @@ -3107,16 +2482,10 @@ def test_validate_db_snapshot_identifier_different_operations(): ) @mock_aws def test_validate_db_snapshot_identifier_different_error_messages( - invalid_db_snapshot_identifier, expected_message + invalid_db_snapshot_identifier, expected_message, client ): - client = boto3.client("rds", region_name=DEFAULT_REGION) db_instance_identifier = "valid-identifier" - client.create_db_instance( - DBInstanceIdentifier=db_instance_identifier, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - ) + create_db_instance(DBInstanceIdentifier=db_instance_identifier) with pytest.raises(ClientError) as exc: client.create_db_snapshot( @@ -3127,15 +2496,9 @@ def test_validate_db_snapshot_identifier_different_error_messages( @mock_aws -def test_createdb_instance_engine_with_invalid_value(): - client = boto3.client("rds", region_name=DEFAULT_REGION) +def test_createdb_instance_engine_with_invalid_value(client): with pytest.raises(ClientError) as exc: - client.create_db_instance( - DBInstanceIdentifier="test-db-instance", - Engine="invalid-engine", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - ) + create_db_instance(Engine="invalid-engine") err = exc.value.response["Error"] @@ -3147,23 +2510,11 @@ def test_createdb_instance_engine_with_invalid_value(): @mock_aws -def test_describe_db_snapshot_attributes_default(): - client = boto3.client("rds", region_name="us-east-2") - client.create_db_instance( - DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) - +def test_describe_db_snapshot_attributes_default(client): + create_db_instance(DBInstanceIdentifier="db-primary-1") client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" - ).get("DBSnapshot") + ) resp = client.describe_db_snapshot_attributes(DBSnapshotIdentifier="snapshot-1") @@ -3172,23 +2523,11 @@ def test_describe_db_snapshot_attributes_default(): @mock_aws -def test_describe_db_snapshot_attributes(): - client = boto3.client("rds", region_name="us-east-2") - client.create_db_instance( - DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) - +def test_describe_db_snapshot_attributes(client): + create_db_instance(DBInstanceIdentifier="db-primary-1") client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" - ).get("DBSnapshot") + ) resp = client.modify_db_snapshot_attribute( DBSnapshotIdentifier="snapshot-1", @@ -3197,58 +2536,38 @@ def test_describe_db_snapshot_attributes(): ) resp = client.describe_db_snapshot_attributes(DBSnapshotIdentifier="snapshot-1") + snapshot_attributes = resp["DBSnapshotAttributesResult"]["DBSnapshotAttributes"] - assert ( - resp["DBSnapshotAttributesResult"]["DBSnapshotAttributes"][0]["AttributeName"] - == "restore" - ) - assert resp["DBSnapshotAttributesResult"]["DBSnapshotAttributes"][0][ - "AttributeValues" - ] == ["Test", "Test2"] + assert snapshot_attributes[0]["AttributeName"] == "restore" + assert snapshot_attributes[0]["AttributeValues"] == ["Test", "Test2"] @mock_aws -def test_modify_db_snapshot_attribute(): - client = boto3.client("rds", region_name="us-east-2") - client.create_db_instance( - DBInstanceIdentifier="db-primary-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - MasterUsername="root", - MasterUserPassword="hunter2", - Port=1234, - DBSecurityGroups=["my_sg"], - ) +def test_modify_db_snapshot_attribute(client): + create_db_instance(DBInstanceIdentifier="db-primary-1") client.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" - ).get("DBSnapshot") + ) - resp = client.modify_db_snapshot_attribute( + client.modify_db_snapshot_attribute( DBSnapshotIdentifier="snapshot-1", AttributeName="restore", ValuesToAdd=["Test", "Test2"], ) - resp = client.modify_db_snapshot_attribute( + client.modify_db_snapshot_attribute( DBSnapshotIdentifier="snapshot-1", AttributeName="restore", ValuesToRemove=["Test"], ) - resp = client.modify_db_snapshot_attribute( + snapshot_attributes = client.modify_db_snapshot_attribute( DBSnapshotIdentifier="snapshot-1", AttributeName="restore", ValuesToAdd=["Test3"], - ) + )["DBSnapshotAttributesResult"]["DBSnapshotAttributes"] - assert ( - resp["DBSnapshotAttributesResult"]["DBSnapshotAttributes"][0]["AttributeName"] - == "restore" - ) - assert resp["DBSnapshotAttributesResult"]["DBSnapshotAttributes"][0][ - "AttributeValues" - ] == ["Test2", "Test3"] + assert snapshot_attributes[0]["AttributeName"] == "restore" + assert snapshot_attributes[0]["AttributeValues"] == ["Test2", "Test3"] def validation_helper(exc):