Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Adds create_secret and delete_secret tasks #13

Merged
merged 4 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ jobs:
strategy:
matrix:
python-version:
- 3.7
- 3.8
- 3.9
- "3.7"
- "3.8"
- "3.9"
- "3.10"
fail-fast: false
steps:
- uses: actions/checkout@v3
Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ plugins:
show_object_full_path: False
show_category_heading: False
show_bases: False
show_signature: False
ahuang11 marked this conversation as resolved.
Show resolved Hide resolved
heading_level: 1
watch:
- prefect_aws/
Expand Down
209 changes: 199 additions & 10 deletions prefect_aws/secrets_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Tasks for interacting with AWS Secrets Manager"""
from functools import partial
from typing import Dict, Optional, Union
from typing import Dict, List, Optional, Union

from anyio import to_thread
from botocore.exceptions import ClientError
Expand Down Expand Up @@ -93,11 +93,11 @@ async def update_secret(
Returns:
Dict[str, str]: A dict containing the secret ARN (Amazon Resource Name),
name, and current version ID.
```json
```python
{
"ARN": "string",
"Name": "string",
"VersionId": "string"
"ARN": str,
"Name": str,
"VersionId": str
}
```

Expand All @@ -115,7 +115,7 @@ def example_update_secret():
aws_access_key_id="acccess_key_id",
aws_secret_access_key="secret_access_key"
)
secret_value = read_secret(
update_secret(
secret_name="life_the_universe_and_everything",
secret_value="42",
aws_credentials=aws_credentials
Expand Down Expand Up @@ -151,10 +151,199 @@ def example_update_secret():


@task
def create_secret():
raise NotImplementedError()
async def create_secret(
secret_name: str,
secret_value: Union[str, bytes],
aws_credentials: AwsCredentials,
description: Optional[str] = None,
tags: Optional[List[Dict[str, str]]] = None,
) -> Dict[str, str]:
"""
Creates a secret in AWS Secrets Manager.

Args:
secret_name: The name of the secret to create.
secret_value: The value to store in the created secret.
aws_credentials: Credentials to use for authentication with AWS.
description: A description for the created secret.
tags: A list of tags to attach to the secret. Each tag should be specified as a
dictionary in the following format:
```python
{
"Key": str,
"Value": str
}
```

Returns:
Dict[str, str]: A dict containing the secret ARN (Amazon Resource Name),
name, and current version ID.
```python
{
"ARN": str,
"Name": str,
"VersionId": str
}
```
Example:
Create a secret:

```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import create_secret

@flow
def example_create_secret():
aws_credentials = AwsCredentials(
aws_access_key_id="acccess_key_id",
aws_secret_access_key="secret_access_key"
)
create_secret(
secret_name="life_the_universe_and_everything",
secret_value="42",
aws_credentials=aws_credentials
)

example_create_secret()
```


"""
create_secret_kwargs: Dict[str, Union[str, bytes, List[Dict[str, str]]]] = dict(
Name=secret_name
)
if description is not None:
create_secret_kwargs["Description"] = description
if tags is not None:
create_secret_kwargs["Tags"] = tags
if isinstance(secret_value, bytes):
create_secret_kwargs["SecretBinary"] = secret_value
elif isinstance(secret_value, str):
create_secret_kwargs["SecretString"] = secret_value
else:
raise ValueError("Please provide a bytes or str value for secret_value")

logger = get_run_logger()
logger.info("Creating secret named %s", secret_name)

client = aws_credentials.get_boto3_session().client("secretsmanager")

try:
create_secret = partial(client.create_secret, **create_secret_kwargs)
response = await to_thread.run_sync(create_secret)
response.pop("ResponseMetadata", None)
ahuang11 marked this conversation as resolved.
Show resolved Hide resolved
return response
except ClientError:
logger.exception("Unable to create secret %s", secret_name)
raise


@task
def delete_secret():
raise NotImplementedError()
async def delete_secret(
secret_name: str,
aws_credentials: AwsCredentials,
recovery_window_in_days: int = 30,
force_delete_without_recovery: bool = False,
):
"""
Deletes a secret from AWS Secrets Manager.

Secrets can either be deleted immediately by setting `force_delete_without_recovery`
equal to `True`. Otherwise, secrets will be marked for deletion and available for
recovery for the number of days specified in `recovery_window_in_days`

Args:
secret_name: Name of the secret to be deleted.
aws_credentials: Credentials to use for authentication with AWS.
recovery_window_in_days: Number of days a secret should be recoverable for
before permenant deletion. Minium window is 7 days and maximum window
is 30 days. If `force_delete_without_recovery` is set to `True`, this
value will be ignored.
force_delete_without_recovery: If `True`, the secret will be immediately
deleted and will not be recoverable.

Returns:
Dict[str, str]: A dict containing the secret ARN (Amazon Resource Name),
name, and deletion date of the secret. DeletionDate is the date and
time of the delete request plus the number of days in
`recovery_window_in_days`.
```python
{
"ARN": str,
"Name": str,
"DeletionDate": datetime.datetime
}
```

Examples:
Delete a secret immediately:

```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import delete_secret

@flow
def example_delete_secret_immediately():
aws_credentials = AwsCredentials(
aws_access_key_id="acccess_key_id",
aws_secret_access_key="secret_access_key"
)
delete_secret(
secret_name="life_the_universe_and_everything",
aws_credentials=aws_credentials,
force_delete_without_recovery: True
)

example_delete_secret_immediately()
```

Delete a secret with a 90 day recovery window:

```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import delete_secret

@flow
def example_delete_secret_with_recovery_window():
aws_credentials = AwsCredentials(
aws_access_key_id="acccess_key_id",
aws_secret_access_key="secret_access_key"
)
delete_secret(
secret_name="life_the_universe_and_everything",
aws_credentials=aws_credentials,
recovery_window_in_days=90
)

example_delete_secret_with_recovery_window()
```


"""
if not force_delete_without_recovery and not (7 <= recovery_window_in_days <= 30):
raise ValueError("Recovery window must be between 7 and 30 days.")

delete_secret_kwargs: Dict[str, Union[str, int, bool]] = dict(SecretId=secret_name)
if force_delete_without_recovery:
delete_secret_kwargs[
"ForceDeleteWithoutRecovery"
] = force_delete_without_recovery
else:
delete_secret_kwargs["RecoveryWindowInDays"] = recovery_window_in_days

logger = get_run_logger()
logger.info("Deleting secret %s", secret_name)

client = aws_credentials.get_boto3_session().client("secretsmanager")

try:
delete_secret = partial(client.delete_secret, **delete_secret_kwargs)
response = await to_thread.run_sync(delete_secret)
response.pop("ResponseMetadata", None)
return response
except ClientError:
logger.exception("Unable to delete secret %s", secret_name)
raise
82 changes: 78 additions & 4 deletions tests/test_secrets_manager.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from datetime import date, timedelta

import boto3
import pytest
from moto import mock_secretsmanager
from prefect import flow

from prefect_aws.secrets_manager import read_secret, update_secret
from prefect_aws.secrets_manager import (
create_secret,
delete_secret,
read_secret,
update_secret,
)


@pytest.fixture
Expand Down Expand Up @@ -61,7 +68,7 @@ async def test_flow():
assert (await test_flow()).result().result() == expected_value


async def test_update_secret(secret_under_test, aws_credentials):
async def test_update_secret(secret_under_test, aws_credentials, secretsmanager_client):
current_secret_value = secret_under_test["expected_value"]
new_secret_value = (
current_secret_value + "2"
Expand All @@ -80,11 +87,78 @@ async def test_flow():
flow_state = await test_flow()
assert flow_state.result().result().get("Name") == secret_under_test["secret_name"]

sm_client = aws_credentials.get_boto3_session().client("secretsmanager")
updated_secret = sm_client.get_secret_value(
updated_secret = secretsmanager_client.get_secret_value(
SecretId=secret_under_test["secret_name"]
)
assert (
updated_secret.get("SecretString") == new_secret_value
or updated_secret.get("SecretBinary") == new_secret_value
)


@pytest.mark.parametrize(
["secret_name", "secret_value"], [["string_secret", "42"], ["binary_secret", b"42"]]
)
async def test_create_secret(
aws_credentials, secret_name, secret_value, secretsmanager_client
):
@flow
async def test_flow():
return await create_secret(
secret_name=secret_name,
secret_value=secret_value,
aws_credentials=aws_credentials,
)

flow_state = await test_flow()
assert flow_state.result().result().get("Name") == secret_name

updated_secret = secretsmanager_client.get_secret_value(SecretId=secret_name)
assert (
updated_secret.get("SecretString") == secret_value
or updated_secret.get("SecretBinary") == secret_value
)


@pytest.mark.parametrize(
["recovery_window_in_days", "force_delete_without_recovery"],
[
[30, False],
[90, False],
[7, False],
[6, False],
[10, False],
[15, True],
[31, True],
],
)
async def test_delete_secret(
aws_credentials,
secret_under_test,
recovery_window_in_days,
force_delete_without_recovery,
):
@flow
async def test_flow():
return await delete_secret(
secret_name=secret_under_test["secret_name"],
aws_credentials=aws_credentials,
recovery_window_in_days=recovery_window_in_days,
force_delete_without_recovery=force_delete_without_recovery,
)

flow_state = await test_flow()
if not force_delete_without_recovery and not 7 <= recovery_window_in_days <= 30:
with pytest.raises(ValueError):
result = flow_state.result().result()
ahuang11 marked this conversation as resolved.
Show resolved Hide resolved

else:
result = flow_state.result().result()
assert result.get("Name") == secret_under_test["secret_name"]
deletion_date = result.get("DeletionDate")
if not force_delete_without_recovery:
assert deletion_date.date() == (
date.today() + timedelta(days=recovery_window_in_days)
)
else:
assert deletion_date.date() == date.today()