diff --git a/tests/system/conftest.py b/tests/system/conftest.py index fe90ceb80..329be584f 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -17,6 +17,8 @@ import pytest +from google.api_core import exceptions +from google.cloud import kms from google.cloud.storage._helpers import _base64_md5hash from . import _helpers @@ -235,12 +237,12 @@ def file_data(): return _file_data -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def kms_bucket_name(): return _helpers.unique_name("gcp-systest-kms") -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def kms_bucket(storage_client, kms_bucket_name, no_mtls): bucket = _helpers.retry_429_503(storage_client.create_bucket)(kms_bucket_name) @@ -249,11 +251,61 @@ def kms_bucket(storage_client, kms_bucket_name, no_mtls): _helpers.delete_bucket(bucket) -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def kms_key_name(storage_client, kms_bucket): return _kms_key_name(storage_client, kms_bucket, default_key_name) -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def alt_kms_key_name(storage_client, kms_bucket): return _kms_key_name(storage_client, kms_bucket, alt_key_name) + + +@pytest.fixture(scope="session") +def kms_client(): + return kms.KeyManagementServiceClient() + + +@pytest.fixture(scope="function") +def keyring(storage_client, kms_bucket, kms_client): + project = storage_client.project + location = kms_bucket.location.lower() + purpose = kms.enums.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT + + # If the keyring doesn't exist create it. + keyring_path = kms_client.key_ring_path(project, location, keyring_name) + + try: + kms_client.get_key_ring(keyring_path) + except exceptions.NotFound: + parent = kms_client.location_path(project, location) + kms_client.create_key_ring(parent, keyring_name, {}) + + # Mark this service account as an owner of the new keyring + service_account_email = storage_client.get_service_account_email() + policy = { + "bindings": [ + { + "role": "roles/cloudkms.cryptoKeyEncrypterDecrypter", + "members": ["serviceAccount:" + service_account_email], + } + ] + } + kms_client.set_iam_policy(keyring_path, policy) + + # Populate the keyring with the keys we use in the tests + key_names = [ + "gcs-test", + "gcs-test-alternate", + "explicit-kms-key-name", + "default-kms-key-name", + "override-default-kms-key-name", + "alt-default-kms-key-name", + ] + for key_name in key_names: + key_path = kms_client.crypto_key_path(project, location, keyring_name, key_name) + try: + kms_client.get_crypto_key(key_path) + except exceptions.NotFound: + key = {"purpose": purpose} + kms_client.create_crypto_key(keyring_path, key_name, key) diff --git a/tests/system/test_kms_integration.py b/tests/system/test_kms_integration.py index f047baced..619ffe110 100644 --- a/tests/system/test_kms_integration.py +++ b/tests/system/test_kms_integration.py @@ -14,99 +14,11 @@ import os -import pytest - -from google.api_core import exceptions -from google.cloud import kms from . import _helpers keyring_name = "gcs-test" default_key_name = "gcs-test" alt_key_name = "gcs-test-alternate" -_key_name_format = "projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}" - - -def _kms_key_name(client, bucket, key_name): - return _key_name_format.format( - client.project, - bucket.location.lower(), - keyring_name, - key_name, - ) - - -@pytest.fixture(scope="session") -def kms_bucket_name(): - return _helpers.unique_name("gcp-systest-kms") - - -@pytest.fixture(scope="session") -def kms_bucket(storage_client, kms_bucket_name, no_mtls): - bucket = _helpers.retry_429_503(storage_client.create_bucket)(kms_bucket_name) - - yield bucket - - _helpers.delete_bucket(bucket) - - -@pytest.fixture(scope="session") -def kms_client(): - return kms.KeyManagementServiceClient() - - -@pytest.fixture(scope="function") -def keyring(storage_client, kms_bucket, kms_client): - project = storage_client.project - location = kms_bucket.location.lower() - purpose = kms.enums.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT - - # If the keyring doesn't exist create it. - keyring_path = kms_client.key_ring_path(project, location, keyring_name) - - try: - kms_client.get_key_ring(keyring_path) - except exceptions.NotFound: - parent = kms_client.location_path(project, location) - kms_client.create_key_ring(parent, keyring_name, {}) - - # Mark this service account as an owner of the new keyring - service_account_email = storage_client.get_service_account_email() - policy = { - "bindings": [ - { - "role": "roles/cloudkms.cryptoKeyEncrypterDecrypter", - "members": ["serviceAccount:" + service_account_email], - } - ] - } - kms_client.set_iam_policy(keyring_path, policy) - - # Populate the keyring with the keys we use in the tests - key_names = [ - "gcs-test", - "gcs-test-alternate", - "explicit-kms-key-name", - "default-kms-key-name", - "override-default-kms-key-name", - "alt-default-kms-key-name", - ] - for key_name in key_names: - key_path = kms_client.crypto_key_path(project, location, keyring_name, key_name) - try: - kms_client.get_crypto_key(key_path) - except exceptions.NotFound: - key = {"purpose": purpose} - kms_client.create_crypto_key(keyring_path, key_name, key) - - -@pytest.fixture(scope="session") -def kms_key_name(storage_client, kms_bucket): - return _kms_key_name(storage_client, kms_bucket, default_key_name) - - -@pytest.fixture(scope="session") -def alt_kms_key_name(storage_client, kms_bucket): - return _kms_key_name(storage_client, kms_bucket, alt_key_name) def test_blob_w_explicit_kms_key_name(