-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
credentials.py
2125 lines (1793 loc) · 79.7 KB
/
credentials.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import time
import datetime
import logging
import os
import getpass
import threading
import json
import subprocess
from collections import namedtuple
from copy import deepcopy
from hashlib import sha1
from dateutil.parser import parse
from dateutil.tz import tzlocal, tzutc
import botocore.configloader
import botocore.compat
from botocore import UNSIGNED
from botocore.compat import total_seconds
from botocore.compat import compat_shell_split
from botocore.config import Config
from botocore.exceptions import UnknownCredentialError
from botocore.exceptions import PartialCredentialsError
from botocore.exceptions import ConfigNotFound
from botocore.exceptions import InvalidConfigError
from botocore.exceptions import InfiniteLoopConfigError
from botocore.exceptions import RefreshWithMFAUnsupportedError
from botocore.exceptions import MetadataRetrievalError
from botocore.exceptions import CredentialRetrievalError
from botocore.exceptions import UnauthorizedSSOTokenError
from botocore.utils import InstanceMetadataFetcher, parse_key_val_file
from botocore.utils import ContainerMetadataFetcher
from botocore.utils import FileWebIdentityTokenLoader
from botocore.utils import SSOTokenLoader
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Immutable (access_key, secret_key, token) triple; this is the type
# returned by the ``get_frozen_credentials`` methods in this module.
ReadOnlyCredentials = namedtuple('ReadOnlyCredentials',
['access_key', 'secret_key', 'token'])
def create_credential_resolver(session, cache=None, region_name=None):
    """Build a ``CredentialResolver`` wired with the default provider chain.

    :param session: Object queried for configuration through
        ``get_config_variable``, ``instance_variables``, ``user_agent``
        and ``full_config``.
    :param cache: Cache handed to the assume role provider and to the
        profile based providers. A new ``dict`` is used when omitted.
    :param region_name: Region forwarded to the client creator used by
        the role assuming providers.
    :return: A ``CredentialResolver`` holding the ordered providers.
    """
    profile_name = session.get_config_variable('profile') or 'default'
    metadata_timeout = session.get_config_variable('metadata_service_timeout')
    num_attempts = session.get_config_variable('metadata_service_num_attempts')
    # A profile found among the session's instance variables is treated
    # as an explicit request to use that profile.
    profile_is_explicit = (
        session.instance_variables().get('profile') is not None
    )

    imds_config = {}
    imds_config['ec2_metadata_service_endpoint'] = (
        session.get_config_variable('ec2_metadata_service_endpoint')
    )
    imds_config['imds_use_ipv6'] = session.get_config_variable(
        'imds_use_ipv6')

    if cache is None:
        cache = {}

    env_provider = EnvProvider()
    container_provider = ContainerProvider()
    metadata_fetcher = InstanceMetadataFetcher(
        timeout=metadata_timeout,
        num_attempts=num_attempts,
        user_agent=session.user_agent(),
        config=imds_config,
    )
    instance_metadata_provider = InstanceMetadataProvider(
        iam_role_fetcher=metadata_fetcher)

    profile_provider_builder = ProfileProviderBuilder(
        session, cache=cache, region_name=region_name)

    # Providers that an assume-role profile may name as its
    # ``credential_source``.
    credential_sourcer = CanonicalNameCredentialSourcer(
        [env_provider, container_provider, instance_metadata_provider]
    )
    assume_role_provider = AssumeRoleProvider(
        load_config=lambda: session.full_config,
        client_creator=_get_client_creator(session, region_name),
        cache=cache,
        profile_name=profile_name,
        credential_sourcer=credential_sourcer,
        profile_provider_builder=profile_provider_builder,
    )

    # The chain is assembled in lookup order: environment and assume role
    # first, then the profile based providers, then the remaining ones.
    providers = [env_provider, assume_role_provider]
    providers.extend(
        profile_provider_builder.providers(
            profile_name=profile_name,
            disable_env_vars=profile_is_explicit,
        )
    )
    providers.extend([
        OriginalEC2Provider(),
        BotoProvider(),
        container_provider,
        instance_metadata_provider,
    ])

    if profile_is_explicit:
        # A profile set directly on the session negates the EnvProvider:
        # it is taken out of the chain so that credentials come from the
        # providers that understand the "profile" concept.
        #
        # When no profile is set on the session this branch is skipped,
        # the EnvProvider stays first in the chain, and any credentials
        # it returns take precedence over the profile based providers.
        providers.remove(env_provider)
        logger.debug('Skipping environment variable credential check'
                     ' because profile name was explicitly set.')

    return CredentialResolver(providers=providers)
class ProfileProviderBuilder(object):
    """Factory for the credential providers that are driven by a profile.

    NOTE: This class is only intended for internal use.

    It creates, in a fixed order, the providers that primarily read their
    settings from the shared config. Keeping this in one place lets the
    default credential chain and the source profile chain built by the
    assume role provider share the same construction logic.
    """

    def __init__(self, session, cache=None, region_name=None,
                 sso_token_cache=None):
        """Remember the collaborators handed to the created providers.

        :param session: Session used to read configuration and create
            clients.
        :param cache: Cache passed to the web identity and SSO providers.
        :param region_name: Region used by the web identity provider's
            client creator.
        :param sso_token_cache: Token cache passed to the SSO provider.
        """
        self._session = session
        self._cache = cache
        self._region_name = region_name
        self._sso_token_cache = sso_token_cache

    def providers(self, profile_name, disable_env_vars=False):
        """Return the profile based providers in lookup order.

        :param profile_name: Name of the profile every provider is bound to.
        :param disable_env_vars: Forwarded to the web identity provider.
        :return: A list with the web identity, SSO, shared credentials
            file, process and config file providers, in that order.
        """
        web_identity = self._create_web_identity_provider(
            profile_name, disable_env_vars,
        )
        sso = self._create_sso_provider(profile_name)
        shared_credentials = self._create_shared_credential_provider(
            profile_name)
        process = self._create_process_provider(profile_name)
        config = self._create_config_provider(profile_name)
        return [web_identity, sso, shared_credentials, process, config]

    def _create_process_provider(self, profile_name):
        # The config is loaded lazily, each time the provider asks for it.
        return ProcessProvider(
            profile_name=profile_name,
            load_config=lambda: self._session.full_config,
        )

    def _create_shared_credential_provider(self, profile_name):
        creds_filename = self._session.get_config_variable(
            'credentials_file')
        return SharedCredentialProvider(
            profile_name=profile_name,
            creds_filename=creds_filename,
        )

    def _create_config_provider(self, profile_name):
        config_filename = self._session.get_config_variable('config_file')
        return ConfigProvider(
            profile_name=profile_name,
            config_filename=config_filename,
        )

    def _create_web_identity_provider(self, profile_name, disable_env_vars):
        client_creator = _get_client_creator(
            self._session, self._region_name)
        return AssumeRoleWithWebIdentityProvider(
            load_config=lambda: self._session.full_config,
            client_creator=client_creator,
            cache=self._cache,
            profile_name=profile_name,
            disable_env_vars=disable_env_vars,
        )

    def _create_sso_provider(self, profile_name):
        # Unlike the web identity provider, this one receives the
        # session's own ``create_client`` rather than a region-bound
        # wrapper.
        return SSOProvider(
            load_config=lambda: self._session.full_config,
            client_creator=self._session.create_client,
            profile_name=profile_name,
            cache=self._cache,
            token_cache=self._sso_token_cache,
        )
def get_credentials(session):
    """Resolve credentials for ``session`` using the default chain.

    :param session: Session passed to ``create_credential_resolver``.
    :return: Whatever the resolver's ``load_credentials`` returns.
    """
    return create_credential_resolver(session).load_credentials()
def _local_now():
    """Return the current time as a datetime in the local time zone."""
    local_zone = tzlocal()
    return datetime.datetime.now(local_zone)
def _parse_if_needed(value):
if isinstance(value, datetime.datetime):
return value
return parse(value)
def _serialize_if_needed(value, iso=False):
if isinstance(value, datetime.datetime):
if iso:
return value.isoformat()
return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
return value
def _get_client_creator(session, region_name):
def client_creator(service_name, **kwargs):
create_client_kwargs = {
'region_name': region_name
}
create_client_kwargs.update(**kwargs)
return session.create_client(service_name, **create_client_kwargs)
return client_creator
def create_assume_role_refresher(client, params):
    """Return a zero-argument callable that re-runs ``client.assume_role``.

    :param client: Object exposing ``assume_role``.
    :param params: Keyword arguments passed to ``assume_role`` on each call.
    :return: A callable producing a dict with the ``access_key``,
        ``secret_key``, ``token`` and ``expiry_time`` keys.
    """
    def refresh():
        sts_credentials = client.assume_role(**params)['Credentials']
        # Translate the response field names into the key names the
        # refreshable credentials expect.
        expiry_time = _serialize_if_needed(sts_credentials['Expiration'])
        return {
            'access_key': sts_credentials['AccessKeyId'],
            'secret_key': sts_credentials['SecretAccessKey'],
            'token': sts_credentials['SessionToken'],
            'expiry_time': expiry_time,
        }

    return refresh
def create_mfa_serial_refresher(actual_refresh):
    """Wrap ``actual_refresh`` so that it can only be invoked once.

    The first call of the returned object delegates to ``actual_refresh``.
    Every later call raises ``RefreshWithMFAUnsupportedError``.
    """
    class _Refresher(object):
        def __init__(self, refresh):
            self._has_been_called = False
            self._refresh = refresh

        def __call__(self):
            if not self._has_been_called:
                # Mark the refresher as used before delegating, so a
                # failing first call still counts as the one allowed call.
                self._has_been_called = True
                return self._refresh()
            # Re-prompting for MFA could be supported in the future; for
            # now an error is raised once the temporary credentials
            # would need a second refresh.
            raise RefreshWithMFAUnsupportedError()

    return _Refresher(actual_refresh)
class JSONFileCache(object):
    """JSON file cache.

    This provides a dict like interface that stores JSON serializable
    objects.

    The objects are serialized to JSON and stored in a file. These
    values can be retrieved at a later time. Each cache key maps to the
    file ``<working_dir>/<cache_key>.json``.
    """

    CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'boto', 'cache'))

    def __init__(self, working_dir=CACHE_DIR):
        """
        :type working_dir: str
        :param working_dir: Directory holding the cache files. It is
            created on the first write if it does not exist.
        """
        self._working_dir = working_dir

    def __contains__(self, cache_key):
        """Return True if a file exists for ``cache_key``.

        Only the existence of the file is checked, not its content.
        """
        actual_key = self._convert_cache_key(cache_key)
        return os.path.isfile(actual_key)

    def __getitem__(self, cache_key):
        """Retrieve value from a cache key.

        :raises KeyError: If the file cannot be opened or does not
            contain valid JSON.
        """
        actual_key = self._convert_cache_key(cache_key)
        try:
            with open(actual_key) as f:
                return json.load(f)
        except (OSError, ValueError, IOError):
            raise KeyError(cache_key)

    def __setitem__(self, cache_key, value):
        """Serialize ``value`` to JSON and store it under ``cache_key``.

        :raises ValueError: If ``value`` is not JSON serializable.
        """
        full_key = self._convert_cache_key(cache_key)
        try:
            file_content = json.dumps(value, default=_serialize_if_needed)
        except (TypeError, ValueError):
            # Wrap the value in a 1-tuple so that a tuple ``value`` is
            # formatted as a whole instead of being unpacked by ``%``.
            raise ValueError("Value cannot be cached, must be "
                             "JSON serializable: %s" % (value,))
        if not os.path.isdir(self._working_dir):
            try:
                os.makedirs(self._working_dir)
            except OSError:
                # Someone else may have created the directory between the
                # check above and the call; only propagate the error when
                # the directory really is missing.
                if not os.path.isdir(self._working_dir):
                    raise
        # The file is created with mode 0o600; truncate() clears any
        # content left by a previous write.
        with os.fdopen(os.open(full_key,
                               os.O_WRONLY | os.O_CREAT, 0o600), 'w') as f:
            f.truncate()
            f.write(file_content)

    def _convert_cache_key(self, cache_key):
        """Return the path of the file backing ``cache_key``."""
        full_path = os.path.join(self._working_dir, cache_key + '.json')
        return full_path
class Credentials(object):
    """
    Container for the values used to authenticate requests.

    :ivar access_key: The access key part of the credentials.
    :ivar secret_key: The secret key part of the credentials.
    :ivar token: The security token, valid only for session credentials.
    :ivar method: A string which identifies where the credentials
        were found. Defaults to ``'explicit'``.
    """

    def __init__(self, access_key, secret_key, token=None,
                 method=None):
        self.access_key = access_key
        self.secret_key = secret_key
        self.token = token
        self.method = 'explicit' if method is None else method
        self._normalize()

    def _normalize(self):
        # Keys occasionally (by accident) carry non-ascii characters,
        # which used to surface as a confusing UnicodeDecodeError on
        # Python 2. Converting them to unicode up front avoids that and
        # matches the behavior on Python 3. Whether the credential is
        # actually accepted is left for the service to decide.
        ensure_unicode = botocore.compat.ensure_unicode
        self.access_key = ensure_unicode(self.access_key)
        self.secret_key = ensure_unicode(self.secret_key)

    def get_frozen_credentials(self):
        """Return the current values as a ``ReadOnlyCredentials`` tuple."""
        return ReadOnlyCredentials(
            access_key=self.access_key,
            secret_key=self.secret_key,
            token=self.token,
        )
class RefreshableCredentials(Credentials):
    """
    Holds the credentials needed to authenticate requests. In addition, it
    knows how to refresh itself.

    :ivar access_key: The access key part of the credentials.
    :ivar secret_key: The secret key part of the credentials.
    :ivar token: The security token, valid only for session credentials.
    :ivar method: A string which identifies where the credentials
        were found.
    """
    # The time at which we'll attempt to refresh, but not
    # block if someone else is refreshing.
    _advisory_refresh_timeout = 15 * 60
    # The time at which all threads will block waiting for
    # refreshed credentials.
    _mandatory_refresh_timeout = 10 * 60

    def __init__(self, access_key, secret_key, token,
                 expiry_time, refresh_using, method,
                 time_fetcher=_local_now):
        """
        :param access_key: The initial access key.
        :param secret_key: The initial secret key.
        :param token: The initial session token.
        :param expiry_time: Datetime at which the initial credentials
            expire, or ``None`` if they never need a refresh.
        :param refresh_using: Zero-argument callable returning a dict with
            the keys ``access_key``, ``secret_key``, ``token`` and
            ``expiry_time``.
        :param method: A string which identifies where the credentials
            were found.
        :param time_fetcher: Zero-argument callable returning the current
            time; it is subtracted from ``expiry_time``.
        """
        self._refresh_using = refresh_using
        self._access_key = access_key
        self._secret_key = secret_key
        self._token = token
        self._expiry_time = expiry_time
        self._time_fetcher = time_fetcher
        self._refresh_lock = threading.Lock()
        self.method = method
        # Normalize first so that the frozen snapshot holds the same
        # values the ``access_key``/``secret_key`` properties return,
        # exactly as is done after every refresh.
        self._normalize()
        self._frozen_credentials = ReadOnlyCredentials(
            self._access_key, self._secret_key, self._token)

    def _normalize(self):
        self._access_key = botocore.compat.ensure_unicode(self._access_key)
        self._secret_key = botocore.compat.ensure_unicode(self._secret_key)

    @classmethod
    def create_from_metadata(cls, metadata, refresh_using, method):
        """Build an instance from a dict shaped like a refresh result.

        :param metadata: Dict with the keys ``access_key``, ``secret_key``,
            ``token`` and ``expiry_time`` (a time string).
        :param refresh_using: See ``__init__``.
        :param method: See ``__init__``.
        """
        instance = cls(
            access_key=metadata['access_key'],
            secret_key=metadata['secret_key'],
            token=metadata['token'],
            expiry_time=cls._expiry_datetime(metadata['expiry_time']),
            method=method,
            refresh_using=refresh_using
        )
        return instance

    @property
    def access_key(self):
        """Warning: Using this property can lead to race conditions if you
        access another property subsequently along the refresh boundary.
        Please use get_frozen_credentials instead.
        """
        self._refresh()
        return self._access_key

    @access_key.setter
    def access_key(self, value):
        self._access_key = value

    @property
    def secret_key(self):
        """Warning: Using this property can lead to race conditions if you
        access another property subsequently along the refresh boundary.
        Please use get_frozen_credentials instead.
        """
        self._refresh()
        return self._secret_key

    @secret_key.setter
    def secret_key(self, value):
        self._secret_key = value

    @property
    def token(self):
        """Warning: Using this property can lead to race conditions if you
        access another property subsequently along the refresh boundary.
        Please use get_frozen_credentials instead.
        """
        self._refresh()
        return self._token

    @token.setter
    def token(self, value):
        self._token = value

    def _seconds_remaining(self):
        delta = self._expiry_time - self._time_fetcher()
        return total_seconds(delta)

    def refresh_needed(self, refresh_in=None):
        """Check if a refresh is needed.

        A refresh is needed if the time remaining until the temporary
        credentials expire is less than the provided ``refresh_in``.
        If ``refresh_in`` is not provided,
        ``self._advisory_refresh_timeout`` will be used.

        For example, if your temporary credentials expire
        in 10 minutes and the provided ``refresh_in`` is
        ``15 * 60``, then this function will return ``True``.

        :type refresh_in: int
        :param refresh_in: The number of seconds before the
            credentials expire in which refresh attempts should
            be made.

        :return: True if refresh needed, False otherwise.

        """
        if self._expiry_time is None:
            # No expiration, so assume we don't need to refresh.
            return False

        if refresh_in is None:
            refresh_in = self._advisory_refresh_timeout
        # The credentials should be refreshed if they're going to expire
        # in less than ``refresh_in`` seconds.
        if self._seconds_remaining() >= refresh_in:
            # There's enough time left. Don't refresh.
            return False
        logger.debug("Credentials need to be refreshed.")
        return True

    def _is_expired(self):
        # Checks if the current credentials are expired.
        return self.refresh_needed(refresh_in=0)

    def _refresh(self):
        # In the common case where we don't need a refresh, we
        # can immediately exit and not require acquiring the
        # refresh lock.
        if not self.refresh_needed(self._advisory_refresh_timeout):
            return

        # acquire() doesn't accept kwargs, but False is indicating
        # that we should not block if we can't acquire the lock.
        # If we aren't able to acquire the lock, we'll trigger
        # the else clause.
        if self._refresh_lock.acquire(False):
            try:
                # Re-check under the lock: the credentials may have been
                # refreshed since the check above.
                if not self.refresh_needed(self._advisory_refresh_timeout):
                    return
                is_mandatory_refresh = self.refresh_needed(
                    self._mandatory_refresh_timeout)
                self._protected_refresh(is_mandatory=is_mandatory_refresh)
                return
            finally:
                self._refresh_lock.release()
        elif self.refresh_needed(self._mandatory_refresh_timeout):
            # If we're within the mandatory refresh window,
            # we must block until we get refreshed credentials.
            with self._refresh_lock:
                if not self.refresh_needed(self._mandatory_refresh_timeout):
                    return
                self._protected_refresh(is_mandatory=True)

    def _protected_refresh(self, is_mandatory):
        # precondition: this method should only be called if you've acquired
        # the self._refresh_lock.
        try:
            metadata = self._refresh_using()
        except Exception:
            period_name = 'mandatory' if is_mandatory else 'advisory'
            logger.warning("Refreshing temporary credentials failed "
                           "during %s refresh period.",
                           period_name, exc_info=True)
            if is_mandatory:
                # If this is a mandatory refresh, then
                # all errors that occur when we attempt to refresh
                # credentials are propagated back to the user.
                raise
            # Otherwise we'll just return.
            # The end result will be that we'll use the current
            # set of temporary credentials we have.
            return
        self._set_from_data(metadata)
        self._frozen_credentials = ReadOnlyCredentials(
            self._access_key, self._secret_key, self._token)
        if self._is_expired():
            # We successfully refreshed credentials but for whatever
            # reason, our refreshing function returned credentials
            # that are still expired. In this scenario, the only
            # thing we can do is let the user know and raise
            # an exception.
            msg = ("Credentials were refreshed, but the "
                   "refreshed credentials are still expired.")
            logger.warning(msg)
            raise RuntimeError(msg)

    @staticmethod
    def _expiry_datetime(time_str):
        return parse(time_str)

    def _set_from_data(self, data):
        """Replace the stored credentials with the values in ``data``.

        :raises CredentialRetrievalError: If ``data`` is empty or lacks
            any of the expected keys.
        """
        expected_keys = ['access_key', 'secret_key', 'token', 'expiry_time']
        if not data:
            missing_keys = expected_keys
        else:
            missing_keys = [k for k in expected_keys if k not in data]

        if missing_keys:
            message = "Credential refresh failed, response did not contain: %s"
            raise CredentialRetrievalError(
                provider=self.method,
                error_msg=message % ', '.join(missing_keys),
            )

        self.access_key = data['access_key']
        self.secret_key = data['secret_key']
        self.token = data['token']
        # Use the same parsing hook as create_from_metadata so both code
        # paths interpret the expiry string identically.
        self._expiry_time = self._expiry_datetime(data['expiry_time'])
        logger.debug("Retrieved credentials will expire at: %s",
                     self._expiry_time)
        self._normalize()

    def get_frozen_credentials(self):
        """Return immutable credentials.

        The ``access_key``, ``secret_key``, and ``token`` properties
        on this class will always check and refresh credentials if
        needed before returning the particular credentials.

        This has an edge case where you can get inconsistent
        credentials. Imagine this:

            # Current creds are "t1"
            tmp.access_key  --->  expired? no, so return t1.access_key
            # ---- time is now expired, creds need refreshing to "t2" ----
            tmp.secret_key  --->  expired? yes, refresh and return t2.secret_key

        This means we're using the access key from t1 with the secret key
        from t2. To fix this issue, you can request a frozen credential object
        which is guaranteed not to change.

        The frozen credentials returned from this method should be used
        immediately and then discarded. The typical usage pattern would
        be::

            creds = RefreshableCredentials(...)
            some_code = SomeSignerObject()
            # I'm about to sign the request.
            # The frozen credentials are only used for the
            # duration of generate_presigned_url and will be
            # immediately thrown away.
            request = some_code.sign_some_request(
                with_credentials=creds.get_frozen_credentials())
            print("Signed request:", request)

        """
        self._refresh()
        return self._frozen_credentials
class DeferredRefreshableCredentials(RefreshableCredentials):
    """Refreshable credentials that start out without any values.

    Nothing is fetched at construction time; ``refresh_using`` is called
    the first time the credentials are accessed.
    """

    def __init__(self, refresh_using, method, time_fetcher=_local_now):
        self.method = method
        self._refresh_using = refresh_using
        self._time_fetcher = time_fetcher
        self._refresh_lock = threading.Lock()
        # No credentials are known yet; the first refresh fills these in.
        self._access_key = None
        self._secret_key = None
        self._token = None
        self._expiry_time = None
        self._frozen_credentials = None

    def refresh_needed(self, refresh_in=None):
        """Report whether a refresh is needed.

        Always ``True`` until the first refresh has produced frozen
        credentials; afterwards the parent class decides.
        """
        if self._frozen_credentials is not None:
            return super(
                DeferredRefreshableCredentials, self
            ).refresh_needed(refresh_in)
        return True
class CachedCredentialFetcher(object):
    """Base class for fetchers that cache the credentials they retrieve.

    Subclasses implement ``_create_cache_key`` and ``_get_credentials``.
    ``_get_credentials`` must return a dict whose ``'Credentials'`` entry
    has the keys ``AccessKeyId``, ``SecretAccessKey``, ``SessionToken``
    and ``Expiration``.
    """
    DEFAULT_EXPIRY_WINDOW_SECONDS = 60 * 15

    def __init__(self, cache=None, expiry_window_seconds=None):
        """
        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``. A new ``dict`` is used
            when omitted.

        :type expiry_window_seconds: int
        :param expiry_window_seconds: Cached credentials with less than
            this many seconds left before expiration are treated as
            expired. Defaults to ``DEFAULT_EXPIRY_WINDOW_SECONDS``.
        """
        if cache is None:
            cache = {}
        self._cache = cache
        # Subclasses must have set up everything _create_cache_key reads
        # before calling this constructor.
        self._cache_key = self._create_cache_key()
        if expiry_window_seconds is None:
            expiry_window_seconds = self.DEFAULT_EXPIRY_WINDOW_SECONDS
        self._expiry_window_seconds = expiry_window_seconds

    def _create_cache_key(self):
        raise NotImplementedError('_create_cache_key()')

    def _make_file_safe(self, filename):
        # Replace :, path sep, and / to make it the string filename safe.
        filename = filename.replace(':', '_').replace(os.path.sep, '_')
        return filename.replace('/', '_')

    def _get_credentials(self):
        raise NotImplementedError('_get_credentials()')

    def fetch_credentials(self):
        """Return up-to-date credentials, using the cache when possible.

        :return: A dict with the keys ``access_key``, ``secret_key``,
            ``token`` and ``expiry_time``.
        """
        return self._get_cached_credentials()

    def _get_cached_credentials(self):
        """Get up-to-date credentials.

        This will check the cache for up-to-date credentials, calling
        ``_get_credentials`` (and caching its response) if none are
        available.
        """
        response = self._load_from_cache()
        if response is None:
            response = self._get_credentials()
            self._write_to_cache(response)
        else:
            logger.debug("Credentials for role retrieved from cache.")

        creds = response['Credentials']
        expiration = _serialize_if_needed(creds['Expiration'], iso=True)
        return {
            'access_key': creds['AccessKeyId'],
            'secret_key': creds['SecretAccessKey'],
            'token': creds['SessionToken'],
            'expiry_time': expiration,
        }

    def _load_from_cache(self):
        """Return a copy of the cached response, or None on a cache miss.

        An expired entry, or one that cannot be read, counts as a miss.
        """
        if self._cache_key not in self._cache:
            return None
        try:
            cached = self._cache[self._cache_key]
        except KeyError:
            # A cache may report a key as present and still fail to
            # return it (e.g. a file backed cache whose file is corrupt
            # or was removed after the membership check). Treat that as
            # a miss so fresh credentials are fetched instead of failing.
            return None
        creds = deepcopy(cached)
        if not self._is_expired(creds):
            return creds
        logger.debug(
            "Credentials were found in cache, but they are expired."
        )
        return None

    def _write_to_cache(self, response):
        # Store a copy so later changes to ``response`` don't leak into
        # the cache.
        self._cache[self._cache_key] = deepcopy(response)

    def _is_expired(self, credentials):
        """Check if credentials are expired."""
        end_time = _parse_if_needed(credentials['Credentials']['Expiration'])
        seconds = total_seconds(end_time - _local_now())
        return seconds < self._expiry_window_seconds
class BaseAssumeRoleCredentialFetcher(CachedCredentialFetcher):
    """Shared state and cache key logic for the assume role fetchers."""

    def __init__(self, client_creator, role_arn, extra_args=None,
                 cache=None, expiry_window_seconds=None):
        """
        :param client_creator: Callable stored for subclasses to create
            clients with.
        :param role_arn: The ARN of the role to be assumed.
        :param extra_args: Optional dict of additional request arguments;
            it is copied, never modified.
        :param cache: Passed on to ``CachedCredentialFetcher``.
        :param expiry_window_seconds: Passed on to
            ``CachedCredentialFetcher``.
        """
        self._client_creator = client_creator
        self._role_arn = role_arn

        request_kwargs = {} if extra_args is None else deepcopy(extra_args)
        request_kwargs['RoleArn'] = self._role_arn
        self._assume_kwargs = request_kwargs

        self._role_session_name = request_kwargs.get('RoleSessionName')
        self._using_default_session_name = False
        if not self._role_session_name:
            self._generate_assume_role_name()

        # The parent constructor computes the cache key, which reads the
        # attributes set above, so it has to run last.
        super(BaseAssumeRoleCredentialFetcher, self).__init__(
            cache, expiry_window_seconds
        )

    def _generate_assume_role_name(self):
        # Derive a session name from the current time and remember that
        # it was not supplied by the caller.
        generated_name = 'botocore-session-%s' % (int(time.time()))
        self._role_session_name = generated_name
        self._assume_kwargs['RoleSessionName'] = generated_name
        self._using_default_session_name = True

    def _create_cache_key(self):
        """Create a predictable cache key for the current configuration.

        The cache key is intended to be compatible with file names.
        """
        key_material = deepcopy(self._assume_kwargs)

        # A generated session name differs between runs, so it must not
        # take part in the hash.
        if self._using_default_session_name:
            key_material.pop('RoleSessionName')

        if 'Policy' in key_material:
            # The policy is decoded here so that its keys get sorted by
            # the dump below, which keeps the hash predictable.
            key_material['Policy'] = json.loads(key_material['Policy'])

        serialized = json.dumps(key_material, sort_keys=True)
        digest = sha1(serialized.encode('utf-8')).hexdigest()
        return self._make_file_safe(digest)
class AssumeRoleCredentialFetcher(BaseAssumeRoleCredentialFetcher):
    """Fetches (and caches) credentials by calling ``assume_role``."""

    def __init__(self, client_creator, source_credentials, role_arn,
                 extra_args=None, mfa_prompter=None, cache=None,
                 expiry_window_seconds=None):
        """
        :type client_creator: callable
        :param client_creator: A callable that creates a client taking
            arguments like ``Session.create_client``.

        :type source_credentials: Credentials
        :param source_credentials: The credentials to use to create the
            client for the call to AssumeRole.

        :type role_arn: str
        :param role_arn: The ARN of the role to be assumed.

        :type extra_args: dict
        :param extra_args: Any additional arguments to add to the assume
            role request using the format of the botocore operation.
            Possible keys include, but may not be limited to,
            DurationSeconds, Policy, SerialNumber, ExternalId and
            RoleSessionName.

        :type mfa_prompter: callable
        :param mfa_prompter: A callable that returns input provided by the
            user (i.e raw_input, getpass.getpass, etc.). Defaults to
            ``getpass.getpass``.

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``.  An example of this is
            the ``JSONFileCache`` class in aws-cli.

        :type expiry_window_seconds: int
        :param expiry_window_seconds: The amount of time, in seconds,
            before their expiration at which cached credentials are
            treated as expired and fetched again.
        """
        self._source_credentials = source_credentials
        self._mfa_prompter = mfa_prompter
        if self._mfa_prompter is None:
            self._mfa_prompter = getpass.getpass

        super(AssumeRoleCredentialFetcher, self).__init__(
            client_creator, role_arn, extra_args=extra_args,
            cache=cache, expiry_window_seconds=expiry_window_seconds
        )

    def _get_credentials(self):
        """Get credentials by calling assume role."""
        kwargs = self._assume_role_kwargs()
        client = self._create_client()
        return client.assume_role(**kwargs)

    def _assume_role_kwargs(self):
        """Get the arguments for assume role based on current configuration.

        When an MFA serial number is configured, the user is prompted for
        the current code, which is added as ``TokenCode``.
        """
        # Work on a copy so the prompted token code is never stored on
        # the fetcher. Every other argument, including DurationSeconds,
        # is already part of the copied dict.
        assume_role_kwargs = deepcopy(self._assume_kwargs)

        mfa_serial = assume_role_kwargs.get('SerialNumber')

        if mfa_serial is not None:
            prompt = 'Enter MFA code for %s: ' % mfa_serial
            token_code = self._mfa_prompter(prompt)
            assume_role_kwargs['TokenCode'] = token_code

        return assume_role_kwargs

    def _create_client(self):
        """Create an STS client using the source credentials."""
        # A frozen snapshot keeps the three values consistent with each
        # other even if the source credentials refresh in between.
        frozen_credentials = self._source_credentials.get_frozen_credentials()
        return self._client_creator(
            'sts',
            aws_access_key_id=frozen_credentials.access_key,
            aws_secret_access_key=frozen_credentials.secret_key,
            aws_session_token=frozen_credentials.token,
        )
class AssumeRoleWithWebIdentityCredentialFetcher(
        BaseAssumeRoleCredentialFetcher
):
    """Fetches credentials by calling ``assume_role_with_web_identity``."""

    def __init__(self, client_creator, web_identity_token_loader, role_arn,
                 extra_args=None, cache=None, expiry_window_seconds=None):
        """
        :type client_creator: callable
        :param client_creator: A callable that creates a client taking
            arguments like ``Session.create_client``.

        :type web_identity_token_loader: callable
        :param web_identity_token_loader: A callable that takes no arguments
            and returns a web identity token str.

        :type role_arn: str
        :param role_arn: The ARN of the role to be assumed.

        :type extra_args: dict
        :param extra_args: Any additional arguments to add to the assume
            role request using the format of the botocore operation,
            for example DurationSeconds, Policy or RoleSessionName.

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``.  An example of this is
            the ``JSONFileCache`` class in aws-cli.

        :type expiry_window_seconds: int
        :param expiry_window_seconds: The amount of time, in seconds,
            before their expiration at which cached credentials are
            treated as expired and fetched again.
        """
        self._web_identity_token_loader = web_identity_token_loader

        super(AssumeRoleWithWebIdentityCredentialFetcher, self).__init__(
            client_creator,
            role_arn,
            extra_args=extra_args,
            cache=cache,
            expiry_window_seconds=expiry_window_seconds,
        )

    def _get_credentials(self):
        """Get credentials by calling assume role with web identity."""
        request_kwargs = self._assume_role_kwargs()
        # The web identity token is the only credential this call needs,
        # so the client is explicitly configured not to sign requests.
        unsigned_config = Config(signature_version=UNSIGNED)
        sts_client = self._client_creator('sts', config=unsigned_config)
        return sts_client.assume_role_with_web_identity(**request_kwargs)

    def _assume_role_kwargs(self):
        """Return the request arguments, including a freshly loaded token."""
        request_kwargs = deepcopy(self._assume_kwargs)
        request_kwargs['WebIdentityToken'] = self._web_identity_token_loader()
        return request_kwargs
class CredentialProvider(object):
    """Base class for the credential providers in this module."""

    # A short name to identify the provider within botocore.
    METHOD = None

    # A name to identify the provider for use in cross-sdk features like
    # assume role's `credential_source` configuration option. These names
    # are to be treated in a case-insensitive way. NOTE: any providers not
    # implemented in botocore MUST prefix their canonical names with
    # 'custom' or we DO NOT guarantee that it will work with any features
    # that this provides.
    CANONICAL_NAME = None

    def __init__(self, session=None):
        self.session = session

    def load(self):
        """
        Load the credentials from their source and set them on the object.

        Subclasses should implement this method (by reading from disk, the
        environment, the network or wherever), returning ``True`` if they
        were found and loaded.

        If not found, this method should return ``False``, indicating that
        the ``CredentialResolver`` should fall back to the next available
        method.

        The default implementation does nothing and returns ``True``,
        assuming the user has set the ``access_key/secret_key/token``
        themselves.

        :returns: Whether credentials were found & set
        """
        return True

    def _extract_creds_from_mapping(self, mapping, *key_names):
        """Return the values of ``key_names`` from ``mapping``, in order.

        :raises PartialCredentialsError: For the first name that is
            missing from ``mapping``.
        """
        values = []
        for name in key_names:
            try:
                value = mapping[name]
            except KeyError:
                raise PartialCredentialsError(provider=self.METHOD,
                                              cred_var=name)
            else:
                values.append(value)
        return values
class ProcessProvider(CredentialProvider):
METHOD = 'custom-process'
def __init__(self, profile_name, load_config, popen=subprocess.Popen):
self._profile_name = profile_name
self._load_config = load_config
self._loaded_config = None
self._popen = popen
def load(self):
credential_process = self._credential_process
if credential_process is None:
return
creds_dict = self._retrieve_credentials_using(credential_process)
if creds_dict.get('expiry_time') is not None:
return RefreshableCredentials.create_from_metadata(
creds_dict,
lambda: self._retrieve_credentials_using(credential_process),
self.METHOD
)
return Credentials(
access_key=creds_dict['access_key'],
secret_key=creds_dict['secret_key'],
token=creds_dict.get('token'),
method=self.METHOD
)
def _retrieve_credentials_using(self, credential_process):
# We're not using shell=True, so we need to pass the
# command and all arguments as a list.
process_list = compat_shell_split(credential_process)
p = self._popen(process_list,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode != 0:
raise CredentialRetrievalError(
provider=self.METHOD, error_msg=stderr.decode('utf-8'))
parsed = botocore.compat.json.loads(stdout.decode('utf-8'))
version = parsed.get('Version', '<Version key not provided>')
if version != 1:
raise CredentialRetrievalError(
provider=self.METHOD,
error_msg=("Unsupported version '%s' for credential process "
"provider, supported versions: 1" % version))
try:
return {
'access_key': parsed['AccessKeyId'],
'secret_key': parsed['SecretAccessKey'],
'token': parsed.get('SessionToken'),
'expiry_time': parsed.get('Expiration'),
}
except KeyError as e:
raise CredentialRetrievalError(
provider=self.METHOD,
error_msg="Missing required key in response: %s" % e
)