diff --git a/compliance/utils/credentials.py b/compliance/utils/credentials.py index c6ccf70a..17821447 100644 --- a/compliance/utils/credentials.py +++ b/compliance/utils/credentials.py @@ -14,6 +14,9 @@ """Compliance credentials configuration.""" import logging +import os +import shlex +import subprocess # nosec B404 from collections import OrderedDict, namedtuple from configparser import RawConfigParser from os import environ @@ -27,7 +30,26 @@ class OnePasswordBackend: - pass + def __init__(self, **kwargs): + self._url = kwargs.get("url") + self._vault = kwargs.get("vault", "auditree") + + def get_section(self, section): + cmd = f"op item get --vault {self._vault} --format json {section}" + subprocess.run( # nosec B603 + shlex.split(cmd), + env=os.environ, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + ) + + def attribute_error_msg(self, section, attr): + return ( + f'Unable to locate field "{attr}" ' + f'in secure note "{section}" ' + f'at 1Password vault "{self._vault}"' + ) class ConfigParserBackend: @@ -36,6 +58,21 @@ def __init__(self, **kwargs): self._cfg_file = kwargs.get("cfg_file") self._cfg.read(str(Path(self._cfg_file).expanduser())) + def get_section(self, section): + params = [] + values = [] + if self._cfg.has_section(section): + params = self._cfg.options(section) + values = [self._cfg.get(section, x) for x in self._cfg.options(section)] + return OrderedDict(zip(params, values)) + + def attribute_error_msg(self, section, attr): + return ( + f'Unable to locate attribute "{attr}" ' + f'in section "{section}" ' + f'at config file "{self._cfg_file}"' + ) + class Config: """Handle credentials configuration.""" @@ -46,7 +83,8 @@ def __init__(self, cfg_file="~/.credentials", backend_cfg=None): """ Create an instance of a dictionary-like configuration object. - :param cfg_file: The path to the RawConfigParser compatible config file + :param cfg_file: The path to a config file for building a ConfigParserBackend. + :param backend_cfg: A dictionary with the backend config """ if backend_cfg is None: @@ -54,6 +92,11 @@ def __init__(self, cfg_file="~/.credentials", backend_cfg=None): self._init_backend(backend_cfg) def _init_backend(self, backend_cfg): + """ + Create an instance of a dictionary-like configuration object. + + :param cfg_file: The path to the RawConfigParser compatible config file + """ name = backend_cfg.get("name") if backend_cfg.get("name") not in Config.BACKENDS: raise ValueError(f"Invalid credentials backend name: {name}") @@ -84,13 +127,7 @@ def _getattr_wrapper(t, attr): try: return t.__getattribute__(attr) except AttributeError as exc: - exc.args = ( - ( - f'Unable to locate attribute "{attr}" ' - f'in section "{type(t).__name__}" ' - f'at config file "{self._cfg_file}"' - ), - ) + exc.args = (self._backend.attribute_error_msg(type(t).__name__, attr),) raise exc env_vars = [k for k in environ.keys() if k.startswith(f"{section.upper()}_")] @@ -98,12 +135,8 @@ def _getattr_wrapper(t, attr): env_values = [environ[e] for e in env_vars] if env_vars: logger.debug(f'Loading credentials from ENV vars: {", ".join(env_vars)}') - params = [] - if self._cfg.has_section(section): - params = self._cfg.options(section) - values = [self._cfg.get(section, x) for x in params] - d = OrderedDict(zip(params, values)) + d = self._backend.get_section(section) if env_vars: d.update(zip(env_keys, env_values))