Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add SecretManager block #86

Merged
merged 21 commits into from
Dec 20, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `SecretBlock` block with `read_secret`, `write_secret`, and `delete_secret` methods - [#86](https://github.com/PrefectHQ/prefect-gcp/pull/86)

### Changed

- Made `GcpCredentials.get_access_token` sync compatible - [#80](https://github.com/PrefectHQ/prefect-gcp/pull/80)
Expand Down
108 changes: 105 additions & 3 deletions prefect_gcp/secret_manager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
from functools import partial
from typing import TYPE_CHECKING, Optional, Union
from typing import Optional, Union

from anyio import to_thread
from google.api_core.exceptions import NotFound
from google.cloud.secretmanager_v1.types.resources import (
Replication,
Secret,
SecretPayload,
)
from google.cloud.secretmanager_v1.types.service import (
AccessSecretVersionRequest,
AddSecretVersionRequest,
CreateSecretRequest,
DeleteSecretRequest,
)
from prefect import get_run_logger, task
from prefect.blocks.abstract import SecretBlock
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
from pydantic import Field

if TYPE_CHECKING:
from prefect_gcp.credentials import GcpCredentials
from prefect_gcp.credentials import GcpCredentials


@task
Expand Down Expand Up @@ -270,3 +284,91 @@ def example_cloud_storage_delete_secret_version_flow():
partial_destroy = partial(client.destroy_secret_version, name=name, timeout=timeout)
await to_thread.run_sync(partial_destroy)
return name


class SecretManager(SecretBlock):
ahuang11 marked this conversation as resolved.
Show resolved Hide resolved

gcp_credentials: GcpCredentials
secret_name: str = Field(default=..., description="Name of the secret to manage.")

@sync_compatible
async def read_secret(
self,
version_id: Union[str, int] = "latest",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to have the secret version be an attribute on the block, but still default to latest. Also I think we can just accept a string for the version. I can't think of any time that we'd need to do math with the version number, and we can cast it to an integer if that case does come up.

) -> bytes:
client = self.gcp_credentials.get_secret_manager_client()
project = self.gcp_credentials.project
name = f"projects/{project}/secrets/{self.secret_name}/versions/{version_id}"
request = AccessSecretVersionRequest(name=name)

self.logger.debug(f"Preparing to read secret data from {name!r}.")
response = await run_sync_in_worker_thread(
client.access_secret_version, request=request
)
secret = response.payload.data.decode("UTF-8")
self.logger.info(f"The secret {name!r} data was successfully read.")
return secret

@sync_compatible
async def write_secret(self, secret_data: bytes) -> str:
"""
Writes the secret data to the secret storage service; if it doesn't exist
it will be created.

Args:
secret_data: The secret to write.

Returns:
The path that the secret was written to.
"""
client = self.gcp_credentials.get_secret_manager_client()
project = self.gcp_credentials.project
parent = f"projects/{project}/secrets/{self.secret_name}"
payload = SecretPayload(data=secret_data)
add_request = AddSecretVersionRequest(parent=parent, payload=payload)

self.logger.debug(f"Preparing to write secret data to {parent!r}.")
try:
response = await run_sync_in_worker_thread(
client.add_secret_version, request=add_request
)
except NotFound:
self.logger.info(
f"The secret {parent!r} does not exist yet, creating it now."
)
create_parent = f"projects/{project}"
secret_id = self.secret_name
secret = Secret(replication=Replication(automatic=Replication.Automatic()))
create_request = CreateSecretRequest(
parent=create_parent, secret_id=secret_id, secret=secret
)
await run_sync_in_worker_thread(
client.create_secret, request=create_request
)

self.logger.debug(f"Preparing to write secret data to {parent!r} again.")
response = await run_sync_in_worker_thread(
client.add_secret_version, request=add_request
)

self.logger.info(f"The secret data was written successfully to {parent!r}.")
return response.name

@sync_compatible
async def delete_secret(self) -> str:
"""
Deletes the secret from the secret storage service.

Returns:
The path that the secret was deleted from.
"""
client = self.gcp_credentials.get_secret_manager_client()
project = self.gcp_credentials.project

name = f"projects/{project}/secrets/{self.secret_name}"
request = DeleteSecretRequest(name=name)

self.logger.debug(f"Preparing to delete the secret {name!r}.")
await run_sync_in_worker_thread(client.delete_secret, request=request)
self.logger.info(f"The secret {name!r} was successfully deleted.")
return name
27 changes: 20 additions & 7 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from unittest.mock import MagicMock

import pytest
from google.api_core.exceptions import NotFound as ApiCoreNotFound
from google.cloud.aiplatform_v1.types.job_state import JobState
from google.cloud.exceptions import NotFound
from prefect.testing.utilities import prefect_test_harness
Expand Down Expand Up @@ -139,25 +140,37 @@ class SecretManagerClient:
def __init__(self, credentials=None, project=None):
self.credentials = credentials
self.project = project
self._secrets = {}

def create_secret(self, parent=None, secret_id=None, **kwds):
def create_secret(self, request=None, parent=None, secret_id=None, **kwds):
response = MagicMock()
response.name = secret_id
if request:
parent = request.parent
secret_id = request.secret_id
name = f"{parent}/secrets/{secret_id}"
response.name = name
self._secrets[name] = None
return response

def add_secret_version(self, parent, payload, **kwds):
def add_secret_version(self, request=None, parent=None, payload=None, **kwds):
response = MagicMock()
response.name = payload["data"]
if request:
parent = request.parent

if parent not in self._secrets:
raise ApiCoreNotFound(f"{parent!r} does not exist.")

response.name = parent
return response

def access_secret_version(self, name, **kwds):
def access_secret_version(self, request=None, name=None, **kwds):
response = MagicMock()
payload = MagicMock()
payload.data = f"{name}".encode()
payload.data = "secret_data".encode("utf-8")
response.payload = payload
return response

def delete_secret(self, name, **kwds):
def delete_secret(self, request=None, name=None, **kwds):
return name

def destroy_secret_version(self, name, **kwds):
Expand Down
32 changes: 29 additions & 3 deletions tests/test_secret_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from prefect import flow

from prefect_gcp.secret_manager import (
SecretManager,
create_secret,
delete_secret,
delete_secret_version,
Expand All @@ -15,27 +16,28 @@ def test_create_secret(gcp_credentials):
def test_flow():
return create_secret("secret_name", gcp_credentials)

assert test_flow() == "secret_name"
assert test_flow() == "projects/gcp_credentials_project/secrets/secret_name"


@pytest.mark.parametrize("secret_value", ["secret", b"secret_byte"])
def test_update_secret(secret_value, gcp_credentials):
@flow
def test_flow():
create_secret("secret_name", gcp_credentials)
return update_secret("secret_name", secret_value, gcp_credentials)

if isinstance(secret_value, str):
secret_value = secret_value.encode("UTF-8")

assert test_flow() == secret_value
assert test_flow() == "projects/gcp_credentials_project/secrets/secret_name"


def test_read_secret(gcp_credentials):
@flow
def test_flow():
return read_secret("secret_name", gcp_credentials)

expected = "projects/gcp_credentials_project/secrets/secret_name/versions/latest"
expected = "secret_data"
assert test_flow() == expected


Expand Down Expand Up @@ -72,3 +74,27 @@ def test_flow():
project = project or gcp_credentials.project
path = f"projects/{project}/secrets/{secret_name}/versions/{version_id}"
assert test_flow() == path


class TestSecretManager:
@pytest.fixture
def secret_manager(self, gcp_credentials):
_secret_manager = SecretManager(
gcp_credentials=gcp_credentials, secret_name="my_secret_name"
)
return _secret_manager

def test_write_secret(self, secret_manager):
expected = "projects/gcp_credentials_project/secrets/my_secret_name"
actual = secret_manager.write_secret(secret_data=b"my_secret_data")
assert actual == expected

def test_read_secret(self, secret_manager):
expected = "secret_data"
actual = secret_manager.read_secret()
assert actual == expected

def test_delete_secret(self, secret_manager):
expected = "projects/gcp_credentials_project/secrets/my_secret_name"
actual = secret_manager.delete_secret()
assert actual == expected