Skip to content

Commit

Permalink
test: dedup kms_bucket fixture (#1129)
Browse files Browse the repository at this point in the history
After adding back KMS permissions to the kokoro project, KMS integration tests now pass. 

However, upon investigation, I noticed that we have a duplicate set of kms pytest fixtures. This removes the duplicates and changes fixture scope to per-function.

Fixes #1128
  • Loading branch information
cojenco authored Oct 11, 2023
1 parent 28c02dd commit a455195
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 92 deletions.
60 changes: 56 additions & 4 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import pytest

from google.api_core import exceptions
from google.cloud import kms
from google.cloud.storage._helpers import _base64_md5hash
from . import _helpers

Expand Down Expand Up @@ -235,12 +237,12 @@ def file_data():
return _file_data


@pytest.fixture(scope="session")
@pytest.fixture(scope="function")
def kms_bucket_name():
return _helpers.unique_name("gcp-systest-kms")


@pytest.fixture(scope="session")
@pytest.fixture(scope="function")
def kms_bucket(storage_client, kms_bucket_name, no_mtls):
bucket = _helpers.retry_429_503(storage_client.create_bucket)(kms_bucket_name)

Expand All @@ -249,11 +251,61 @@ def kms_bucket(storage_client, kms_bucket_name, no_mtls):
_helpers.delete_bucket(bucket)


@pytest.fixture(scope="session")
@pytest.fixture(scope="function")
def kms_key_name(storage_client, kms_bucket):
return _kms_key_name(storage_client, kms_bucket, default_key_name)


@pytest.fixture(scope="session")
@pytest.fixture(scope="function")
def alt_kms_key_name(storage_client, kms_bucket):
return _kms_key_name(storage_client, kms_bucket, alt_key_name)


@pytest.fixture(scope="session")
def kms_client():
return kms.KeyManagementServiceClient()


@pytest.fixture(scope="function")
def keyring(storage_client, kms_bucket, kms_client):
project = storage_client.project
location = kms_bucket.location.lower()
purpose = kms.enums.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT

# If the keyring doesn't exist create it.
keyring_path = kms_client.key_ring_path(project, location, keyring_name)

try:
kms_client.get_key_ring(keyring_path)
except exceptions.NotFound:
parent = kms_client.location_path(project, location)
kms_client.create_key_ring(parent, keyring_name, {})

# Mark this service account as an owner of the new keyring
service_account_email = storage_client.get_service_account_email()
policy = {
"bindings": [
{
"role": "roles/cloudkms.cryptoKeyEncrypterDecrypter",
"members": ["serviceAccount:" + service_account_email],
}
]
}
kms_client.set_iam_policy(keyring_path, policy)

# Populate the keyring with the keys we use in the tests
key_names = [
"gcs-test",
"gcs-test-alternate",
"explicit-kms-key-name",
"default-kms-key-name",
"override-default-kms-key-name",
"alt-default-kms-key-name",
]
for key_name in key_names:
key_path = kms_client.crypto_key_path(project, location, keyring_name, key_name)
try:
kms_client.get_crypto_key(key_path)
except exceptions.NotFound:
key = {"purpose": purpose}
kms_client.create_crypto_key(keyring_path, key_name, key)
88 changes: 0 additions & 88 deletions tests/system/test_kms_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,99 +14,11 @@

import os

import pytest

from google.api_core import exceptions
from google.cloud import kms
from . import _helpers

keyring_name = "gcs-test"
default_key_name = "gcs-test"
alt_key_name = "gcs-test-alternate"
_key_name_format = "projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}"


def _kms_key_name(client, bucket, key_name):
return _key_name_format.format(
client.project,
bucket.location.lower(),
keyring_name,
key_name,
)


@pytest.fixture(scope="session")
def kms_bucket_name():
return _helpers.unique_name("gcp-systest-kms")


@pytest.fixture(scope="session")
def kms_bucket(storage_client, kms_bucket_name, no_mtls):
bucket = _helpers.retry_429_503(storage_client.create_bucket)(kms_bucket_name)

yield bucket

_helpers.delete_bucket(bucket)


@pytest.fixture(scope="session")
def kms_client():
return kms.KeyManagementServiceClient()


@pytest.fixture(scope="function")
def keyring(storage_client, kms_bucket, kms_client):
project = storage_client.project
location = kms_bucket.location.lower()
purpose = kms.enums.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT

# If the keyring doesn't exist create it.
keyring_path = kms_client.key_ring_path(project, location, keyring_name)

try:
kms_client.get_key_ring(keyring_path)
except exceptions.NotFound:
parent = kms_client.location_path(project, location)
kms_client.create_key_ring(parent, keyring_name, {})

# Mark this service account as an owner of the new keyring
service_account_email = storage_client.get_service_account_email()
policy = {
"bindings": [
{
"role": "roles/cloudkms.cryptoKeyEncrypterDecrypter",
"members": ["serviceAccount:" + service_account_email],
}
]
}
kms_client.set_iam_policy(keyring_path, policy)

# Populate the keyring with the keys we use in the tests
key_names = [
"gcs-test",
"gcs-test-alternate",
"explicit-kms-key-name",
"default-kms-key-name",
"override-default-kms-key-name",
"alt-default-kms-key-name",
]
for key_name in key_names:
key_path = kms_client.crypto_key_path(project, location, keyring_name, key_name)
try:
kms_client.get_crypto_key(key_path)
except exceptions.NotFound:
key = {"purpose": purpose}
kms_client.create_crypto_key(keyring_path, key_name, key)


@pytest.fixture(scope="session")
def kms_key_name(storage_client, kms_bucket):
return _kms_key_name(storage_client, kms_bucket, default_key_name)


@pytest.fixture(scope="session")
def alt_kms_key_name(storage_client, kms_bucket):
return _kms_key_name(storage_client, kms_bucket, alt_key_name)


def test_blob_w_explicit_kms_key_name(
Expand Down

0 comments on commit a455195

Please sign in to comment.