Skip to content

Commit

Permalink
Remove redundant GET and PUT secret functions
Browse files Browse the repository at this point in the history
Update README.md
  • Loading branch information
dormant-user committed Jan 19, 2025
1 parent a7dcd79 commit c6963a3
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 56 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import vaultapi


if __name__ == '__main__':
vaultapi.decrypt(get_secret="mykey", table="mytable")
vaultapi_client = vaultapi.VaultAPIClient()
vaultapi_client.get_secret(key="mykey", table_name="mytable")
```

**Initiate - CLI**
Expand Down
2 changes: 0 additions & 2 deletions vaultapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ class EndpointMapping:
get_table: str = "/get-table"
get_secret: str = "/get-secret"
put_secret: str = "/put-secret"
get_secrets: str = "/get-secrets"
put_secrets: str = "/put-secrets"
list_tables: str = "/list-tables"
create_table: str = "/create-table"
delete_secret: str = "/delete-secret"
Expand Down
61 changes: 8 additions & 53 deletions vaultapi/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,33 +52,8 @@ def _get_cipher(self, server_url: str, query_params: Dict[str, str]) -> str:
)
return process_response(response)

def update_secret(self, key: str, value: str, table_name: str) -> Dict[str, str]:
"""Update or create a new secret in the vault.
Args:
key: Key for the secret.
value: Value for the secret.
table_name: Table name.
Returns:
Dict[str, str]:
Returns the server response.
"""
url = urljoin(self.env_config.vault_server, server_map.put_secret)
response = self.SESSION.put(
url,
json={
"key": key,
"value": value,
"table_name": table_name,
},
)
return process_response(response)

def update_secrets(
self, secrets: Dict[str, str], table_name: str
) -> Dict[str, str]:
"""Update or create multiple new secrets in the vault.
def update_secret(self, secrets: Dict[str, str], table_name: str) -> Dict[str, str]:
"""Update or create secrets in the vault.
Args:
secrets: Key value pairs with multiple secrets.
Expand All @@ -88,11 +63,11 @@ def update_secrets(
Dict[str, str]:
Returns the server response.
"""
url = urljoin(self.env_config.vault_server, server_map.put_secrets)
url = urljoin(self.env_config.vault_server, server_map.put_secret)
response = self.SESSION.put(
url,
json={
"secrets": secrets,
"secrets": self.transit_shield.encrypt(payload=secrets),
"table_name": table_name,
},
)
Expand Down Expand Up @@ -145,33 +120,18 @@ def create_table(self, table_name: str) -> Dict[str, str]:
return process_response(response)

def get_secret(self, key: str, table_name: str) -> Dict[str, str]:
"""Retrieve a targeted secret from a table.
Args:
key: Name of the secret to be retrieved.
table_name: Name of the table where the secret is stored.
Returns:
Dict[str, str]:
Returns a dictionary of decrypted values.
"""
url = urljoin(self.env_config.vault_server, server_map.get_secret)
cipher_text = self._get_cipher(url, {"key": key, "table_name": table_name})
return self.transit_shield.decrypt(cipher_text)

def get_secrets(self, keys: str, table_name: str) -> Dict[str, str]:
"""Retrieves multiple secrets from a table.
Args:
keys: Comma separated list of secret names to be retrieved.
key: Comma separated list of secret names to be retrieved.
table_name: Table name.
Returns:
Dict[str, str]:
Returns a dictionary of decrypted values.
"""
url = urljoin(self.env_config.vault_server, server_map.get_secrets)
cipher_text = self._get_cipher(url, {"keys": keys, "table_name": table_name})
url = urljoin(self.env_config.vault_server, server_map.get_secret)
cipher_text = self._get_cipher(url, {"key": key, "table_name": table_name})
return self.transit_shield.decrypt(cipher_text)

def get_table(self, table_name: str) -> Dict[str, str]:
Expand All @@ -192,14 +152,12 @@ def decrypt(
self,
table: str,
get_secret: str = None,
get_secrets: str = None,
) -> Dict[str, str] | str:
"""Decrypt function.
Args:
table: Table name to retrieve.
get_secret: Secret key to retrieve.
get_secrets: Comma separated list of secret keys to retrieve.
get_secret: Comma separated list of secret keys to retrieve.
Returns:
Dict[str, str]:
Expand All @@ -212,9 +170,6 @@ def decrypt(
if get_secret:
url = urljoin(self.env_config.vault_server, server_map.get_secret)
params["key"] = get_secret
elif get_secrets:
url = urljoin(self.env_config.vault_server, server_map.get_secrets)
params["keys"] = get_secrets
else:
url = urljoin(self.env_config.vault_server, server_map.get_table)
return self.transit_shield.decrypt(self._get_cipher(url, params))

0 comments on commit c6963a3

Please sign in to comment.