Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify VaultCredentialProvider to accept non-user/pass attributes #7

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 45 additions & 20 deletions vault12factor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,17 @@ def __init__(self, vaulturl: str, vaultauth: VaultAuthentication, secretpath: st
self.pin_cacert = pin_cacert
self.ssl_verify = ssl_verify
self.debug_output = debug_output
self._cache = None # type: Dict[str, str]
self._cache = {} # type: Dict[str, str]
self._leasetime = None # type: datetime.datetime
self._updatetime = None # type: datetime.datetime
self._lease_id = None # type: str
self._refresh()

def _attach_secret_attribute(instance, attribute):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Please add PEP484 type annotations for attribute and the return value (None)
  • Please don't override the self convention (as per PEP-8)

class_name = instance.__class__.__name__ + 'Child'
child_class = type(class_name, (instance.__class__,), {attribute: property(lambda self: self._get_or_update(attribute))})
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead either shadow self here or rename this instance of the parameter name? This way this should be able to stay PEP-8 compliant, right?


instance.__class__ = child_class

def _now(self) -> datetime.datetime:
return datetime.datetime.now(pytz.timezone("UTC"))
Expand All @@ -248,41 +255,59 @@ def _refresh(self) -> None:
(self.secretpath, str(e))
) from e

if "data" not in result or "username" not in result["data"] or "password" not in result["data"]:
if not result:
try:
mountpoint, path = self.secretpath.split('/', 1)
result = vcl.secrets.kv.v2.read_secret_version(mount_point=mountpoint,path=path)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
result = vcl.secrets.kv.v2.read_secret_version(mount_point=mountpoint,path=path)
result = vcl.secrets.kv.v2.read_secret_version(mount_point=mountpoint, path=path)

except RequestException as e:
raise VaultCredentialProviderException(
"Unable to read credentials from path '%s' with request error: %s" %
(self.secretpath, str(e))
) from e

if "data" not in result:
raise VaultCredentialProviderException(
"Read dict from Vault path %s did not match expected structure (data->{username, password}): %s" %
"Read dict from Vault path %s did not match expected structure: %s" %
(self.secretpath, str(result))
)

self._cache = result["data"]
self._lease_id = result["lease_id"]
# Before updating the cache, check for vault keys that have been removed and remove the property
for old_key in list(set(self._cache.keys()) - set(result["data"].keys())):
delattr(self.__class__, old_key)

# Normalize secret data between kv engines and everything else
if "data" in result["data"]:
secret = result["data"]["data"]
else:
secret = result["data"]

self._cache = secret

# Add any keys in the to our object as a property
for attribute in secret.keys():
if not getattr(self, attribute, None):
self._attach_secret_attribute(attribute)

self._lease_id = secret.get("lease_id")
self._leasetime = self._now()
self._updatetime = self._leasetime + datetime.timedelta(seconds=int(result["lease_duration"]))
self._updatetime = self._leasetime + datetime.timedelta(
seconds=int(secret.get("lease_duration", os.getenv("VAULT_DEFAULT_CACHE_DURATION", "3600"))))

_log.debug("Loaded new Vault DB credentials from %s:\nlease_id=%s\nleasetime=%s\nduration=%s\n"
"username=%s\npassword=%s",
_log.debug("Loaded new Vault credentials from %s:\nlease_id=%s\nleasetime=%s\nduration=%s",
self.secretpath,
self._lease_id, str(self._leasetime), result["lease_duration"], self._cache["username"],
self._cache["password"] if self.debug_output else "Password withheld, debug output is disabled")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be useful to log the returned data similarly to what I did here (i.e. withhold the output if debug_output isn't set, but log it otherwise). I found that always useful for debugging interactions with secret engines etc.

self._lease_id,
str(self._leasetime),
secret.get("lease_duration", os.getenv("VAULT_DEFAULT_CACHE_DURATION", "3600")))

def _get_or_update(self, key: str) -> str:
if self._cache is None or (self._updatetime - self._now()).total_seconds() < 10:
# if we have less than 10 seconds in a lease ot no lease at all, we get new credentials
_log.info("Vault DB credential lease has expired, refreshing for %s" % key)
_log.info("Vault credential lease has expired, refreshing for %s" % key)
self._refresh()
_log.info("refresh done (%s, %s)" % (self._lease_id, str(self._updatetime)))

return self._cache[key]

@property
def username(self) -> str:
return self._get_or_update("username")

@property
def password(self) -> str:
return self._get_or_update("password")


class DjangoAutoRefreshDBCredentialsDict(dict):
def __init__(self, provider: VaultCredentialProvider, *args: Any, **kwargs: Any) -> None:
self._provider = provider
Expand Down