Skip to content

Commit

Permalink
PR Feedback #38
Browse files Browse the repository at this point in the history
- Changes configuration object to allow for empty creation, and added
  "raise_if_invalid()" function to determine if the config is good.
- Added the ability to read from ~/.aws/config cached values.
  • Loading branch information
Mark Ide committed Jan 10, 2018
1 parent 0535c37 commit 31a4932
Show file tree
Hide file tree
Showing 4 changed files with 384 additions and 269 deletions.
68 changes: 47 additions & 21 deletions aws_google_auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def parse_args(args):
parser.add_argument('-R', '--region', help='AWS region endpoint ($AWS_DEFAULT_REGION)')
parser.add_argument('-d', '--duration', type=int, help='Credential duration ($DURATION)')
parser.add_argument('-p', '--profile', help='AWS profile (defaults to value of $AWS_PROFILE, then falls back to \'sts\')')
parser.add_argument('-D', '--disable-u2f', action='store_true', help='Disable U2F functionality.')

role_group = parser.add_mutually_exclusive_group()
role_group.add_argument('-a', '--ask-role', action='store_true', help='Set true to always pick the role')
Expand All @@ -45,30 +46,55 @@ def main():
def cli(cli_args):
args = parse_args(args=cli_args)

# If there are arguments that are needed, we can interactively prompt the
# user. Note, the environment variables here are also in configuration.py
# but we need to check the presense here to know if we need to prompot.
# This is intentional. Any non-required params just get passed directly in,
# as we don't care if they were set or not.
username = args.username or os.getenv("GOOGLE_USERNAME") or util.Util.get_input("Google username: ")
idp_id = args.idp_id or os.getenv("GOOGLE_IDP_ID") or util.Util.get_input("Google IDP ID: ")
sp_id = args.sp_id or os.getenv("GOOGLE_SP_ID") or util.Util.get_input("Google SP ID: ")
# Create a blank configuration object (has the defaults pre-filled)
config = configuration.Configuration()

# Have the configuration update itself via the ~/.aws/config on disk.
config.read(args.profile)

# Override anything found in the environment variables. We only want to
# override "None" values (not "False" or "0"), hence why we're not using
# the common "or" method here. Instead we use util.Util.default_if_none().
config.ask_role = util.Util.default_if_none(os.getenv('AWS_ASK_ROLE'), config.ask_role)
config.duration = util.Util.default_if_none(os.getenv('DURATION'), config.duration)
config.idp_id = util.Util.default_if_none(os.getenv('GOOGLE_IDP_ID'), config.idp_id)
config.profile = util.Util.default_if_none(os.getenv('AWS_PROFILE'), config.profile)
config.region = util.Util.default_if_none(os.getenv('AWS_DEFAULT_REGION'), config.region)
config.role_arn = util.Util.default_if_none(os.getenv('AWS_ROLE_ARN'), config.role_arn)
config.sp_id = util.Util.default_if_none(os.getenv('GOOGLE_SP_ID'), config.sp_id)
config.u2f_disabled = util.Util.default_if_none(os.getenv('U2F_DISABLED'), config.u2f_disabled)
config.username = util.Util.default_if_none(os.getenv('GOOGLE_USERNAME'), config.username)

# Override the options (again, as this is higher priority) with anything
# the user specified on the command line.
config.ask_role = util.Util.default_if_none(args.ask_role, config.ask_role)
config.duration = util.Util.default_if_none(args.duration, config.duration)
config.idp_id = util.Util.default_if_none(args.idp_id, config.idp_id)
config.profile = util.Util.default_if_none(args.profile, config.profile)
config.region = util.Util.default_if_none(args.region, config.region)
config.role_arn = util.Util.default_if_none(args.role_arn, config.role_arn)
config.sp_id = util.Util.default_if_none(args.sp_id, config.sp_id)
config.u2f_disabled = util.Util.default_if_none(args.disable_u2f, config.u2f_disabled)
config.username = util.Util.default_if_none(args.username, config.username)

# There are some mandatory arguments. Make sure the user supplied them.
if config.username is None:
config.username = util.Util.get_input("Google username: ")
if config.idp_id is None:
config.idp_id = util.Util.get_input("Google IDP ID: ")
if config.sp_id is None:
config.sp_id = util.Util.get_input("Google SP ID: ")

# There is no way (intentional) to pass in the password via the command
# line nor environment variables. This prevents password leakage.
passwd = getpass.getpass("Google Password: ")

# Build the configuration with all the user's options
config = configuration.Configuration(
ask_role=args.ask_role,
duration=args.duration,
idp_id=idp_id,
profile=args.profile,
region=args.region,
role_arn=args.role_arn,
sp_id=sp_id,
username=username,
password=passwd)
config.password = getpass.getpass("Google Password: ")

# Validate Options
try:
config.raise_if_invalid()
except AssertionError:
print("Invalid parameters.")
raise

google_client = google.Google(config)
google_client.do_login()
Expand Down
187 changes: 82 additions & 105 deletions aws_google_auth/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,125 +10,82 @@
class Configuration:

def __init__(self, **kwargs):
self.set_ask_role(kwargs.get('ask_role', None) or os.getenv("AWS_ASK_ROLE") or False)
self.set_duration(kwargs.get('duration', None) or os.getenv("DURATION") or self.max_duration)
self.set_u2f_disabled(kwargs.get('u2f_disabled', None) or os.getenv("U2F_DISABLED") or False)
self.set_region(kwargs.get('region', None) or os.getenv("AWS_DEFAULT_REGION") or 'ap-southeast-2')
self.set_profile(kwargs.get('profile', None) or os.getenv("AWS_PROFILE") or 'sts')
self.set_role_arn(kwargs.get('role_arn', None) or os.getenv("AWS_ROLE_ARN") or None)
self.set_idp_id(kwargs.get('idp_id', None) or os.getenv("GOOGLE_IDP_ID") or None)
self.set_sp_id(kwargs.get('sp_id', None) or os.getenv("GOOGLE_SP_ID") or None)
self.set_username(kwargs.get('username', None) or os.getenv("GOOGLE_USERNAME") or None)
self.password = kwargs.get('password', None) or None
self.options = {}
self.__boto_session = botocore.session.Session()

# Set up some defaults. These can be overridden as fit.
self.ask_role = False
self.duration = self.max_duration
self.idp_id = None
self.password = None
self.profile = "sts"
self.region = "ap-southeast-2"
self.role_arn = None
self.sp_id = None
self.u2f_disabled = False
self.username = None

@property
def max_duration(self):
return 3600

def get_ask_role(self):
return self.__ask_role

def set_ask_role(self, value):
assert (value.__class__ is bool), "Expected ask_role to be a boolean. Got {}.".format(value.__class__)
self.__ask_role = value

ask_role = property(get_ask_role, set_ask_role)

def get_duration(self):
return self.__duration

def set_duration(self, duration_seconds):
assert (duration_seconds.__class__ is int), "Expected duration to be an integer. Got {}.".format(duration_seconds.__class__)
assert (duration_seconds > 0), "Expected duration to be greater than 0. Got {}.".format(duration_seconds)
assert (duration_seconds <= self.max_duration), "Expected duration to be less than or equal to max_duration ({}). Got {}.".format(self.max_duration, duration_seconds)
self.__duration = duration_seconds

duration = property(get_duration, set_duration)

def get_profile(self):
return self.__profile

def set_profile(self, profile):
assert (profile.__class__ is str), "Expected profile to be a string. Got {}.".format(profile.__class__)
self.__profile = profile

profile = property(get_profile, set_profile)

def get_region(self):
return self.__region

def set_region(self, region):
assert (region.__class__ is str), "Expected region to be a string. Got {}.".format(region.__class__)
self.__region = region

region = property(get_region, set_region)

def get_idp_id(self):
return self.__idp_id

def set_idp_id(self, idp):
assert (idp is not None), "Expected idp_id to be set to non-None value."
self.__idp_id = idp

idp_id = property(get_idp_id, set_idp_id)

def get_sp_id(self):
return self.__sp_id

def set_sp_id(self, sp_id):
assert (sp_id is not None), "Expected sp_id to be set to non-None value."
self.__sp_id = sp_id

sp_id = property(get_sp_id, set_sp_id)

def get_username(self):
return self.__username

def set_username(self, username):
assert (username.__class__ is str), "Expected username to be a string. Got {}.".format(username.__class__)
self.__username = username
@property
def credentials_file(self):
return os.path.expanduser(self.__boto_session.get_config_variable('credentials_file'))

username = property(get_username, set_username)
@property
def config_file(self):
return os.path.expanduser(self.__boto_session.get_config_variable('config_file'))

def get_role_arn(self):
return self.__role_arn
def ensure_config_files_exist(self):
for file in [self.config_file, self.credentials_file]:
directory = os.path.dirname(file)
if not os.path.exists(directory):
os.mkdir(directory, 0o700)
if not os.path.exists(file):
util.Util.touch(file)

def set_role_arn(self, arn):
if arn is not None:
assert (arn.__class__ is str), "Expected role_arn to be None or a string. Got {}.".format(arn.__class__)
assert ("arn:aws:iam::" in arn), "Expected role_arn to contain 'arn:aws:iam::'. Got '{}'.".format(arn)
self.__role_arn = arn
# Will raise exeptions if the configuration is invalid, otherwise returns
# None. Use this at any point to validate the conifguration is in a good
# state.
def raise_if_invalid(self):
# ask_role
assert (self.ask_role.__class__ is bool), "Expected ask_role to be a boolean. Got {}.".format(self.ask_role.__class__)

role_arn = property(get_role_arn, set_role_arn)
# duration
assert (self.duration.__class__ is int), "Expected duration to be an integer. Got {}.".format(self.duration.__class__)
assert (self.duration > 0), "Expected duration to be greater than 0. Got {}.".format(self.duration)
assert (self.duration <= self.max_duration), "Expected duration to be less than or equal to max_duration ({}). Got {}.".format(self.max_duration, self.duration)

def get_u2f_disabled(self):
return self.__u2f_disabled
# profile
assert (self.profile.__class__ is str), "Expected profile to be a string. Got {}.".format(self.profile.__class__)

def set_u2f_disabled(self, u2f_disabled):
assert (u2f_disabled.__class__ is bool), "Expected u2f_disabled to be a boolean. Got {}.".format(u2f_disabled.__class__)
self.__u2f_disabled = u2f_disabled
# region
assert (self.region.__class__ is str), "Expected region to be a string. Got {}.".format(self.region.__class__)

u2f_disabled = property(get_u2f_disabled, set_u2f_disabled)
# idp_id
assert (self.idp_id is not None), "Expected idp_id to be set to non-None value."

def get_credentials_file(self):
return os.path.expanduser(self.__boto_session.get_config_variable('credentials_file'))
# sp_id
assert (self.sp_id is not None), "Expected sp_id to be set to non-None value."

credentials_file = property(get_credentials_file)
# username
assert (self.username.__class__ is str), "Expected username to be a string. Got {}.".format(self.username.__class__)

def get_config_file(self):
return os.path.expanduser(self.__boto_session.get_config_variable('config_file'))
# password
assert (self.password.__class__ is str), "Expected password to be a string. Got {}.".format(self.password.__class__)

config_file = property(get_config_file)
# role_arn (Can be blank, we'll just prompt)
if self.role_arn is not None:
assert (self.role_arn.__class__ is str), "Expected role_arn to be None or a string. Got {}.".format(self.role_arn.__class__)
assert ("arn:aws:iam::" in self.role_arn), "Expected role_arn to contain 'arn:aws:iam::'. Got '{}'.".format(self.role_arn)

def ensure_config_files_exist(self):
for file in [self.config_file, self.credentials_file]:
directory = os.path.dirname(file)
if not os.path.exists(directory):
os.mkdir(directory, 0o700)
if not os.path.exists(file):
util.Util.touch(file)
# u2f_disabled
assert (self.u2f_disabled.__class__ is bool), "Expected u2f_disabled to be a boolean. Got {}.".format(self.u2f_disabled.__class__)

# Write the configuration (and credentials) out to disk. This allows for
# regular AWS tooling (aws cli and boto) to use the credentials in the
# profile the user specified.
def write(self, amazon_object):
self.ensure_config_files_exist()

Expand All @@ -142,12 +99,10 @@ def write(self, amazon_object):
config_parser.set(self.profile, 'region', self.region)
config_parser.set(self.profile, 'aws_google_auth_ask_role', self.ask_role)
config_parser.set(self.profile, 'aws_google_auth_duration', self.duration)
config_parser.set(self.profile, 'aws_google_auth_u2f_disabled', self.u2f_disabled)
config_parser.set(self.profile, 'aws_google_auth_region', self.region)
config_parser.set(self.profile, 'aws_google_auth_profile', self.profile)
config_parser.set(self.profile, 'aws_google_auth_role_arn', self.role_arn)
config_parser.set(self.profile, 'aws_google_auth_idp_id', self.idp_id)
config_parser.set(self.profile, 'aws_google_auth_role_arn', self.role_arn)
config_parser.set(self.profile, 'aws_google_auth_sp_id', self.sp_id)
config_parser.set(self.profile, 'aws_google_auth_u2f_disabled', self.u2f_disabled)
config_parser.set(self.profile, 'aws_google_auth_username', self.username)
with open(self.config_file, 'w+') as f:
config_parser.write(f)
Expand All @@ -159,8 +114,30 @@ def write(self, amazon_object):
credentials_parser.add_section(self.profile)
credentials_parser.set(self.profile, 'aws_access_key_id', amazon_object.access_key_id)
credentials_parser.set(self.profile, 'aws_secret_access_key', amazon_object.secret_access_key)
credentials_parser.set(self.profile, 'aws_session_token', amazon_object.session_token)
credentials_parser.set(self.profile, 'aws_security_token', amazon_object.session_token)
credentials_parser.set(self.profile, 'aws_session_expiration', amazon_object.expiration.strftime('%Y-%m-%dT%H:%M:%S%z'))
credentials_parser.set(self.profile, 'aws_session_token', amazon_object.session_token)
with open(self.credentials_file, 'w+') as f:
credentials_parser.write(f)

# Read from the configuration file and override ALL values currently stored
# in the configuration object. As this is potentially destructive, it's
# important to only run this in the beginning of the object initialization.
# We do not read AWS credentials, as this tool's use case is to obtain
# them.
def read(self, profile):
self.ensure_config_files_exist()

profile = util.Util.default_if_none(profile, self.profile)
config_parser = configparser.RawConfigParser()
config_parser.read(self.config_file)
if config_parser.has_section(profile):
self.profile = profile
self.ask_role = util.Util.default_if_none(config_parser[profile].getboolean('aws_google_auth_ask_role', None), self.ask_role)
self.duration = util.Util.default_if_none(config_parser[profile].getint('aws_google_auth_duration', None), self.duration)
self.idp_id = util.Util.default_if_none(config_parser[profile].get('aws_google_auth_idp_id', None), self.idp_id)
self.region = util.Util.default_if_none(config_parser[profile].get('region', None), self.region)
self.role_arn = util.Util.default_if_none(config_parser[profile].get('aws_google_auth_role_arn', None), self.role_arn)
self.sp_id = util.Util.default_if_none(config_parser[profile].get('aws_google_auth_sp_id', None), self.sp_id)
self.u2f_disabled = util.Util.default_if_none(config_parser[profile].getboolean('aws_google_auth_u2f_disabled', None), self.u2f_disabled)
self.username = util.Util.default_if_none(config_parser[profile].get('aws_google_auth_username', None), self.username)
Loading

0 comments on commit 31a4932

Please sign in to comment.