Skip to content

Commit

Permalink
sync
Browse files Browse the repository at this point in the history
  • Loading branch information
cletomartin committed Jan 30, 2023
1 parent 0311f27 commit e338ebf
Showing 1 changed file with 47 additions and 14 deletions.
61 changes: 47 additions & 14 deletions compliance/utils/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
"""Compliance credentials configuration."""

import logging
import os
import shlex
import subprocess # nosec B404
from collections import OrderedDict, namedtuple
from configparser import RawConfigParser
from os import environ
Expand All @@ -27,7 +30,26 @@


class OnePasswordBackend:
pass
def __init__(self, **kwargs):
self._url = kwargs.get("url")
self._vault = kwargs.get("vault", "auditree")

def get_section(self, section):
cmd = f"op item get --vault {self._vault} --format json {section}"
subprocess.run( # nosec B603
shlex.split(cmd),
env=os.environ,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
)

def attribute_error_msg(self, section, attr):
return (
f'Unable to locate field "{attr}" '
f'in secure note "{section}" '
f'at 1Password vault "{self._vault}"'
)


class ConfigParserBackend:
Expand All @@ -36,6 +58,21 @@ def __init__(self, **kwargs):
self._cfg_file = kwargs.get("cfg_file")
self._cfg.read(str(Path(self._cfg_file).expanduser()))

def get_section(self, section):
params = []
values = []
if self._cfg.has_section(section):
params = self._cfg.options(section)
values = [self._cfg.get(section, x) for x in self._cfg.options(section)]
return OrderedDict(zip(params, values))

def attribute_error_msg(self, section, attr):
return (
f'Unable to locate attribute "{attr}" '
f'in section "{section}" '
f'at config file "{self._cfg_file}"'
)


class Config:
"""Handle credentials configuration."""
Expand All @@ -46,14 +83,20 @@ def __init__(self, cfg_file="~/.credentials", backend_cfg=None):
"""
Create an instance of a dictionary-like configuration object.
:param cfg_file: The path to the RawConfigParser compatible config file
:param cfg_file: The path to a config file for building a ConfigParserBackend.
:param backend_cfg: A dictionary with the backend config
"""

if backend_cfg is None:
backend_cfg = {"name": "configparser", "cfg_file": cfg_file}
self._init_backend(backend_cfg)

def _init_backend(self, backend_cfg):
"""
Create an instance of a dictionary-like configuration object.
:param cfg_file: The path to the RawConfigParser compatible config file
"""
name = backend_cfg.get("name")
if backend_cfg.get("name") not in Config.BACKENDS:
raise ValueError(f"Invalid credentials backend name: {name}")
Expand Down Expand Up @@ -84,26 +127,16 @@ def _getattr_wrapper(t, attr):
try:
return t.__getattribute__(attr)
except AttributeError as exc:
exc.args = (
(
f'Unable to locate attribute "{attr}" '
f'in section "{type(t).__name__}" '
f'at config file "{self._cfg_file}"'
),
)
exc.args = (self._backend.attribute_error_msg(type(t).__name__, attr),)
raise exc

env_vars = [k for k in environ.keys() if k.startswith(f"{section.upper()}_")]
env_keys = [k.split(section.upper())[1].lstrip("_").lower() for k in env_vars]
env_values = [environ[e] for e in env_vars]
if env_vars:
logger.debug(f'Loading credentials from ENV vars: {", ".join(env_vars)}')
params = []
if self._cfg.has_section(section):
params = self._cfg.options(section)
values = [self._cfg.get(section, x) for x in params]

d = OrderedDict(zip(params, values))
d = self._backend.get_section(section)

if env_vars:
d.update(zip(env_keys, env_values))
Expand Down

0 comments on commit e338ebf

Please sign in to comment.