Allow IAM username suffix to be added to S3 scratch URL #69

Merged: 17 commits, Jun 2, 2020. Changes shown from 14 commits.
2 changes: 1 addition & 1 deletion metrics/bigfiles_high_water_mark
@@ -1 +1 @@
981
982
Contributor Author commented:

I added a comment to setup.py to explain a new version constraint.

2 changes: 1 addition & 1 deletion metrics/coverage_high_water_mark
@@ -1 +1 @@
94.0400
94.0500
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
@@ -1 +1 @@
92.2700
92.2800
82 changes: 70 additions & 12 deletions records_mover/creds/base_creds.py
@@ -1,7 +1,9 @@
from db_facts.db_facts_types import DBFacts
from .database import db_facts_from_env
from typing import TYPE_CHECKING, Iterable, Union, Optional
from records_mover.mover_types import NotYetFetched
from records_mover.mover_types import PleaseInfer
Contributor Author commented:

I standardized on the latter (PleaseInfer) here; the difference in semantics between the two names was so subtle as to be meaningless for the uses here.

from config_resolver import get_config
import os
import logging
if TYPE_CHECKING:
# see the 'gsheets' extras_require option in setup.py - needed for this!
@@ -31,16 +33,19 @@ def __init__(self,
default_db_creds_name: Optional[str] = None,
default_aws_creds_name: Optional[str] = None,
default_gcp_creds_name: Optional[str] = None,
default_db_facts: Union[NotYetFetched, DBFacts] = NotYetFetched.token,
default_boto3_session: Union[NotYetFetched,
default_db_facts: Union[PleaseInfer, DBFacts] = PleaseInfer.token,
default_boto3_session: Union[PleaseInfer,
'boto3.session.Session',
None] = NotYetFetched.token,
default_gcp_creds: Union[NotYetFetched,
None] = PleaseInfer.token,
default_gcp_creds: Union[PleaseInfer,
'google.auth.credentials.Credentials',
None] = NotYetFetched.token,
default_gcs_client: Union[NotYetFetched,
None] = PleaseInfer.token,
default_gcs_client: Union[PleaseInfer,
'google.cloud.storage.Client',
None] = NotYetFetched.token) -> None:
None] = PleaseInfer.token,
scratch_s3_url: Union[PleaseInfer,
str,
None] = PleaseInfer.token) -> None:
self._default_db_creds_name = default_db_creds_name
self._default_aws_creds_name = default_aws_creds_name
self._default_gcp_creds_name = default_gcp_creds_name
@@ -50,6 +55,8 @@ def __init__(self,
self.__default_gcs_client = default_gcs_client
self.__default_boto3_session = default_boto3_session

self._scratch_s3_url = scratch_s3_url

def google_sheets(self, gcp_creds_name: str) -> 'google.auth.credentials.Credentials':
scopes = ('https://www.googleapis.com/auth/spreadsheets',)
return self._gcp_creds(gcp_creds_name, scopes)
@@ -71,7 +78,7 @@ def boto3_session(self, aws_creds_name: str) -> 'boto3.session.Session':
raise NotImplementedError

def default_boto3_session(self) -> Optional['boto3.session.Session']:
if self.__default_boto3_session is not NotYetFetched.token:
if self.__default_boto3_session is not PleaseInfer.token:
return self.__default_boto3_session

try:
@@ -88,7 +95,7 @@ def default_boto3_session(self) -> Optional['boto3.session.Session']:
return self.__default_boto3_session

def default_gcs_creds(self) -> Optional['google.auth.credentials.Credentials']:
if self.__default_gcs_creds is not NotYetFetched.token:
if self.__default_gcs_creds is not PleaseInfer.token:
return self.__default_gcs_creds

try:
@@ -113,7 +120,7 @@ def default_gcs_creds(self) -> Optional['google.auth.credentials.Credentials']:
return self.__default_gcs_creds

def default_gcs_client(self) -> Optional['google.cloud.storage.Client']:
if self.__default_gcs_client is not NotYetFetched.token:
if self.__default_gcs_client is not PleaseInfer.token:
return self.__default_gcs_client

gcs_creds = self.default_gcs_creds()
@@ -140,11 +147,62 @@ def default_gcs_client(self) -> Optional['google.cloud.storage.Client']:
return self.__default_gcs_client

def default_db_facts(self) -> DBFacts:
if self.__default_db_facts is not NotYetFetched.token:
if self.__default_db_facts is not PleaseInfer.token:
return self.__default_db_facts

if self._default_db_creds_name is None:
self.__default_db_facts = db_facts_from_env()
else:
self.__default_db_facts = self.db_facts(self._default_db_creds_name)
return self.__default_db_facts

def _append_aws_username_to_bucket(self,
prefix: str,
boto3_session: 'boto3.session.Session') -> Optional[str]:
sts_client = boto3_session.client('sts')
caller_identity = sts_client.get_caller_identity()
arn = caller_identity['Arn']
last_section_of_arn = arn.split(':')[-1]
# Check that this is an actual user and not, say,
# an assumed role or something else.
if last_section_of_arn.startswith('user/'):
username = last_section_of_arn.split('/')[-1]
return f"{prefix}{username}/"
else:
logger.warning('Cannot generate S3 scratch URL with IAM username, '
f'as there is no username in {arn}')
return None

def _infer_scratch_s3_url(self,
boto3_session: Optional['boto3.session.Session']) -> Optional[str]:
if "SCRATCH_S3_URL" in os.environ:
return os.environ["SCRATCH_S3_URL"]

config_result = get_config('records_mover', 'bluelabs')
cfg = config_result.config
if 'aws' in cfg:
aws_cfg = cfg['aws']
s3_scratch_url: Optional[str] = aws_cfg.get('s3_scratch_url')
if s3_scratch_url is not None:
return s3_scratch_url
else:
s3_scratch_url_prefix: Optional[str] =\
aws_cfg.get('s3_scratch_url_appended_with_iam_username')
if s3_scratch_url_prefix is not None:
if boto3_session is None:
logger.warning('Cannot generate S3 scratch URL with IAM username, '
'as I have no IAM username')
return None
return self._append_aws_username_to_bucket(s3_scratch_url_prefix,
boto3_session)
else:
logger.debug('No S3 scratch bucket config found')
return None
else:
logger.debug('No config ini file found')
return None

def default_scratch_s3_url(self) -> Optional[str]:
if self._scratch_s3_url is PleaseInfer.token:
self._scratch_s3_url = self._infer_scratch_s3_url(self.default_boto3_session())
return self._scratch_s3_url
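
As a rough illustration of the new behavior: `_infer_scratch_s3_url()` first honors the `SCRATCH_S3_URL` environment variable, then a literal `s3_scratch_url` in the `[aws]` section of the records_mover ini file, and only then falls back to `s3_scratch_url_appended_with_iam_username`, whose value is treated as a prefix and suffixed with the caller's IAM username from STS. The sketch below is not part of this PR; the bucket, prefix, and ARN are invented, and it simply mirrors the ARN parsing in `_append_aws_username_to_bucket`.

```python
# Minimal sketch (not from the diff) of how an
# 's3_scratch_url_appended_with_iam_username' prefix gets expanded.
# Bucket name, prefix, and ARN below are made up for illustration.
def append_username_to_prefix(prefix: str, caller_arn: str) -> str:
    last_section = caller_arn.split(':')[-1]      # e.g. 'user/alice'
    if not last_section.startswith('user/'):      # assumed role etc.: no username available
        raise ValueError(f"No IAM username in {caller_arn}")
    username = last_section.split('/')[-1]        # e.g. 'alice'
    return f"{prefix}{username}/"


print(append_username_to_prefix("s3://my-scratch-bucket/records-mover/",
                                "arn:aws:iam::123456789012:user/alice"))
# -> s3://my-scratch-bucket/records-mover/alice/
```
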
2 changes: 1 addition & 1 deletion records_mover/mover_types.py
@@ -11,5 +11,5 @@
# This is a mypy-friendly way of doing a singleton object:
#
# https://github.com/python/typing/issues/236
class NotYetFetched(Enum):
class PleaseInfer(Enum):
token = 1
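
For context on the rename: `PleaseInfer.token` is the mypy-friendly sentinel the linked issue describes, used to tell "the caller passed nothing, so go infer a value" apart from an explicit `None`. A minimal sketch of the pattern follows; the function name and the inferred default are invented for illustration.

```python
# Minimal sketch of the PleaseInfer sentinel pattern; the function and the
# inferred default value are illustrative, not code from this PR.
from enum import Enum
from typing import Optional, Union


class PleaseInfer(Enum):
    token = 1


def resolve_scratch_url(url: Union[PleaseInfer, str, None] = PleaseInfer.token) -> Optional[str]:
    if url is PleaseInfer.token:               # nothing passed: infer a value
        return "s3://some-default-scratch/"    # stand-in for the real inference logic
    return url                                 # caller chose a value, possibly an explicit None


assert resolve_scratch_url() == "s3://some-default-scratch/"
assert resolve_scratch_url(None) is None
assert resolve_scratch_url("s3://mine/") == "s3://mine/"
```
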
100 changes: 30 additions & 70 deletions records_mover/session.py
@@ -4,14 +4,11 @@
from .url.base import BaseFileUrl, BaseDirectoryUrl
from typing import Union, Optional, IO
from .url.resolver import UrlResolver
from enum import Enum
from records_mover.creds.creds_via_lastpass import CredsViaLastPass
from records_mover.creds.creds_via_airflow import CredsViaAirflow
from records_mover.creds.creds_via_env import CredsViaEnv
from records_mover.logging import set_stream_logging
from records_mover.mover_types import NotYetFetched
from config_resolver import get_config
import subprocess
from records_mover.mover_types import PleaseInfer
import os
import sys
import logging
@@ -37,41 +34,6 @@ def _infer_session_type() -> str:
return 'cli'


def _infer_scratch_s3_url(session_type: str) -> Optional[str]:
if "SCRATCH_S3_URL" in os.environ:
return os.environ["SCRATCH_S3_URL"]

# config_resolver logs at the WARNING level for each time it
# attempts to load a config file and doesn't find it - which given
# it searches a variety of places, is quite noisy.
#
# https://github.com/exhuma/config_resolver/blob/master/doc/intro.rst#logging
#
# https://github.com/exhuma/config_resolver/issues/69
logging.getLogger('config_resolver').setLevel(logging.ERROR)
config_result = get_config('records_mover', 'bluelabs')
cfg = config_result.config
if 'aws' in cfg:
s3_scratch_url: Optional[str] = cfg['aws'].get('s3_scratch_url')
if s3_scratch_url is not None:
return s3_scratch_url
else:
logger.debug('No config ini file found')

if session_type == 'cli':
try:
#
# https://app.asana.com/0/1128138765527694/1163219515343393
#
# This method of configuration needs to be replaced with
# something more conventional and documented.
#
return subprocess.check_output("scratch-s3-url").decode('ASCII').rstrip()
except FileNotFoundError:
pass
return None


def _infer_default_aws_creds_name(session_type: str) -> Optional[str]:
if session_type == 'airflow':
return 'aws_default'
@@ -88,24 +50,28 @@ def _infer_creds(session_type: str,
default_db_creds_name: Optional[str],
default_aws_creds_name: Optional[str],
default_gcp_creds_name: Optional[str],
default_db_facts: Union[NotYetFetched, DBFacts],
default_boto3_session: Union[NotYetFetched,
default_db_facts: Union[PleaseInfer, DBFacts],
default_boto3_session: Union[PleaseInfer,
'boto3.session.Session',
None],
default_gcp_creds: Union[NotYetFetched,
default_gcp_creds: Union[PleaseInfer,
'google.auth.credentials.Credentials',
None],
default_gcs_client: Union[NotYetFetched,
default_gcs_client: Union[PleaseInfer,
'google.cloud.storage.Client',
None]) -> BaseCreds:
None],
scratch_s3_url: Union[PleaseInfer,
str,
None]) -> BaseCreds:
if session_type == 'airflow':
return CredsViaAirflow(default_db_creds_name=default_db_creds_name,
default_aws_creds_name=default_aws_creds_name,
default_gcp_creds_name=default_gcp_creds_name,
default_db_facts=default_db_facts,
default_boto3_session=default_boto3_session,
default_gcp_creds=default_gcp_creds,
default_gcs_client=default_gcs_client)
default_gcs_client=default_gcs_client,
scratch_s3_url=scratch_s3_url)
elif session_type == 'cli':
#
# https://app.asana.com/0/1128138765527694/1163219515343393
@@ -120,36 +86,32 @@ def _infer_creds(session_type: str,
default_db_facts=default_db_facts,
default_boto3_session=default_boto3_session,
default_gcp_creds=default_gcp_creds,
default_gcs_client=default_gcs_client)
default_gcs_client=default_gcs_client,
scratch_s3_url=scratch_s3_url)
elif session_type == 'itest':
return CredsViaEnv(default_db_creds_name=default_db_creds_name,
default_aws_creds_name=default_aws_creds_name,
default_gcp_creds_name=default_gcp_creds_name,
default_db_facts=default_db_facts,
default_boto3_session=default_boto3_session,
default_gcp_creds=default_gcp_creds,
default_gcs_client=default_gcs_client)
default_gcs_client=default_gcs_client,
scratch_s3_url=scratch_s3_url)
elif session_type == 'env':
return CredsViaEnv(default_db_creds_name=default_db_creds_name,
default_aws_creds_name=default_aws_creds_name,
default_gcp_creds_name=default_gcp_creds_name,
default_db_facts=default_db_facts,
default_boto3_session=default_boto3_session,
default_gcp_creds=default_gcp_creds,
default_gcs_client=default_gcs_client)
default_gcs_client=default_gcs_client,
scratch_s3_url=scratch_s3_url)
elif session_type is not None:
raise ValueError("Valid job context types: cli, airflow, docker-itest, env - "
raise ValueError("Valid job context types: cli, airflow, itest, env - "
"consider upgrading records-mover if you're looking for "
f"{session_type}.")


# This is a mypy-friendly way of doing a singleton object:
#
# https://github.com/python/typing/issues/236
class PleaseInfer(Enum):
token = 1


class Session():
def __init__(self,
default_db_creds_name: Optional[str] = None,
@@ -158,16 +120,16 @@ def __init__(self,
session_type: Union[str, PleaseInfer] = PleaseInfer.token,
scratch_s3_url: Union[None, str, PleaseInfer] = PleaseInfer.token,
creds: Union[BaseCreds, PleaseInfer] = PleaseInfer.token,
default_db_facts: Union[NotYetFetched, DBFacts] = NotYetFetched.token,
default_boto3_session: Union[NotYetFetched,
default_db_facts: Union[PleaseInfer, DBFacts] = PleaseInfer.token,
default_boto3_session: Union[PleaseInfer,
'boto3.session.Session',
None] = NotYetFetched.token,
default_gcp_creds: Union[NotYetFetched,
None] = PleaseInfer.token,
default_gcp_creds: Union[PleaseInfer,
'google.auth.credentials.Credentials',
None] = NotYetFetched.token,
default_gcs_client: Union[NotYetFetched,
None] = PleaseInfer.token,
default_gcs_client: Union[PleaseInfer,
'google.cloud.storage.Client',
None] = NotYetFetched.token) -> None:
None] = PleaseInfer.token) -> None:
if session_type is PleaseInfer.token:
session_type = _infer_session_type()

@@ -185,12 +147,9 @@ def __init__(self,
default_db_facts=default_db_facts,
default_boto3_session=default_boto3_session,
default_gcp_creds=default_gcp_creds,
default_gcs_client=default_gcs_client)

if scratch_s3_url is PleaseInfer.token:
scratch_s3_url = _infer_scratch_s3_url(session_type)
default_gcs_client=default_gcs_client,
scratch_s3_url=scratch_s3_url)

self._scratch_s3_url = scratch_s3_url
self.creds = creds

@property
@@ -219,9 +178,10 @@ def db_driver(self, db: Union['Engine', 'Connection']) -> 'DBDriver':
from .db.factory import db_driver

kwargs = {}
if self._scratch_s3_url is not None:
scratch_s3_url = self.creds.default_scratch_s3_url()
if scratch_s3_url is not None:
try:
s3_temp_base_loc = self.directory_url(self._scratch_s3_url)
s3_temp_base_loc = self.directory_url(scratch_s3_url)
kwargs['s3_temp_base_loc'] = s3_temp_base_loc
except NotImplementedError:
logger.debug('boto3 not installed', exc_info=True)
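
With the plumbing above, the scratch location is resolved through the creds object instead of the old Session-private `_scratch_s3_url` attribute. A hedged usage sketch follows (not code from this PR; it assumes default credentials and the records_mover config are already in place):

```python
# Hedged usage sketch, not taken from the PR; assumes credentials and any
# records_mover ini configuration are already set up.
from records_mover.session import Session

session = Session()                                    # session_type and creds are inferred
scratch_url = session.creds.default_scratch_s3_url()   # env var, ini setting, or IAM-suffixed prefix
if scratch_url is not None:
    temp_base = session.directory_url(scratch_url)     # same object db_driver() passes as s3_temp_base_loc
    print(f"Scratch area: {temp_base}")
else:
    print("No scratch S3 URL configured")
```
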
3 changes: 2 additions & 1 deletion setup.py
@@ -267,7 +267,8 @@ def initialize_options(self) -> None:
'db-facts>=4,<5',
'chardet',
'tenacity>=6<7',
'config-resolver>=5,<6',
# v5.0.1 resolves https://github.com/exhuma/config_resolver/issues/69
'config-resolver>=5.0.1,<6',
'typing_inspect',
'typing-extensions',
],
2 changes: 1 addition & 1 deletion tests/integration/records/single_db/test_records_load.py
@@ -95,7 +95,7 @@ def local_source(self, filename, records_format, records_schema):

@contextmanager
def s3_url_source(self, filename, records_format, records_schema):
base_dir = self.session.directory_url(self.session._scratch_s3_url)
base_dir = self.session.directory_url(self.session.creds.default_scratch_s3_url())

with base_dir.temporary_directory() as temp_dir_loc:
file_loc = temp_dir_loc.file_in_this_directory('foo.gz')