Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow configuration to be set for session type #70

Merged
merged 9 commits into from
Jun 2, 2020
Merged
75 changes: 72 additions & 3 deletions docs/CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ There are key areas where records mover needs configuration:

1. Database connection details
2. Temporary locations
3. Cloud credentials for object stores
3. Cloud credentials (e.g., S3/GCS/Google Sheets)

## Database connection details

Expand Down Expand Up @@ -87,7 +87,7 @@ object store - see below for how to configure that.

To specify the temporary location for Redshift exports and imports,
you can either set the environment variable `SCRATCH_S3_URL` to your
URL or configure a TOML-style file in one of the following locations:
URL or configure a INI-style file in one of the following locations:

* `/etc/bluelabs/records_mover/app.ini`
* `/etc/xdg/bluelabs/records_mover/app.ini`
Expand All @@ -96,7 +96,7 @@ URL or configure a TOML-style file in one of the following locations:

Example file:

```toml
```ini
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out config_resolver uses https://docs.python.org/3/library/configparser.html which uses a simplified INI format which is not actually TOML. TIL!

[aws]
s3_scratch_url = "s3://mybucket/path/"
```
Expand Down Expand Up @@ -124,3 +124,72 @@ downloaded for local processing) will be stored per Python's
which allow for configuration via the `TMPDIR`, `TEMP` or `TMP` env
variables, and generally default to
[something reasonable per your OS](https://docs.python.org/3/library/tempfile.html#tempfile.gettempdir).

## Cloud credentials (e.g., S3/GCS/Google Sheets)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since database credentials come out of db-facts, the major difference made in switching the default creds provider from CredsViaLastPass and CredsViaEnv is how non-default GCP credentials are passed--this text should address that and help provide guidance.


To be able to access cloud resources, including S3, GCS and Google
Sheets, Records Mover requires credentials.

There are multiple ways to configure these:

1. Vendor system configuration (Python and mvrec)
2. Setting environment variables (Python only)
3. Passing in pre-configured default credential objects (Python only)
4. Using a third-party secrets manager (Python and mvrec)
5. Airflow connections (Python via Airflow)

### Vendor system configuration (Python and mvrec)

Both AWS and GCP have Python libraries which support using credentials
you configure in different ways. Unless told otherwise, Records Mover
will use these credentials as the "default credentials" available via
the 'creds' property under the Session object.

### Setting environment variables (Python and mvrec)

AWS natively supports setting credentials using the
`AWS_ACCESS_KEY_ID`/`AWS_SECRET_ACCESS_KEY`/`AWS_SESSION_TOKEN`
environment variables.

Similarly, GCP supports pointing to a file with application
credentials via the `GOOGLE_APPLICATION_CREDENTIALS` environment
variable.

When using the default 'env' session type, Records Mover also supports
providing a base64ed version of the GCP service account credentials via
the `GCP_SERVICE_ACCOUNT_JSON_BASE64` env variable.

### Passing in pre-configured default credential objects (Python only)

You can pass in credentials objects directly to a Session() object
using the `default_gcs_client`, `default_gcp_creds` and/or
`default_boto3_session` arguments.

### Using a third-party secrets manager (Python and mvrec)

To use a secrets manager of some type, you can instruct Records Mover
to use a different instance of the 'BaseCreds' class which knows how
to use your specific type of secrets manager.

An [example implementation](https://github.com/bluelabsio/records-mover/blob/master/records_mover/creds/creds_via_lastpass.py)
ships with Records Mover to use LastPass' CLI tool to fetch (for
instance) GCP credentials via LastPass.

You can either pass in a instance of a BaseCreds subclass as the
'creds' argument to the Session() constructor in Python, pass in the
string 'lpass' as the value of the 'session_type' parameter to the
Session() constructor, or provide the following config in the `.ini`
file referenced above:

```ini
[session]
session_type = "lpass"
```

### Airflow connections (Python via Airflow)

Similarly, Records Mover ships with a BaseCreds instance which knows
how to fetch credentials using Airflow connections. While Records
Mover will attempt to auto-detect to determine if it is running under
Airflow, you can explicitly tell Records Mover to use this mode by
setting session_type to "airflow" using one of the above methods.
2 changes: 1 addition & 1 deletion metrics/coverage_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
93.6900
93.700
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
92.1900
92.2700
29 changes: 20 additions & 9 deletions records_mover/session.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from config_resolver import get_config
from db_facts.db_facts_types import DBFacts
from .creds.base_creds import BaseCreds
from .records.records import Records
Expand Down Expand Up @@ -27,11 +28,20 @@ def _infer_session_type() -> str:
if 'RECORDS_MOVER_SESSION_TYPE' in os.environ:
return os.environ['RECORDS_MOVER_SESSION_TYPE']

config_result = get_config('records_mover', 'bluelabs')
cfg = config_result.config
if 'session' in cfg:
session_cfg = cfg['session']
session_type: Optional[str] = session_cfg.get('session_type')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could creds_type be a more appropriate name here? Does this control anything about the session other than how it gets the creds?

if session_type is not None:
logger.info(f"Using session_type={session_type} from config file")
return session_type

if 'AIRFLOW__CORE__EXECUTOR' in os.environ:
# Guess based on an env variable sometimes set by Airflow
return 'airflow'

return 'cli'
return 'env'


def _infer_default_aws_creds_name(session_type: str) -> Optional[str]:
Expand Down Expand Up @@ -73,13 +83,14 @@ def _infer_creds(session_type: str,
default_gcs_client=default_gcs_client,
scratch_s3_url=scratch_s3_url)
elif session_type == 'cli':
#
# https://app.asana.com/0/1128138765527694/1163219515343393
#
# Most people don't use LastPass; other secrets managements
# should be supported and configurable at the system- and
# user- level.
#
return CredsViaEnv(default_db_creds_name=default_db_creds_name,
default_aws_creds_name=default_aws_creds_name,
default_gcp_creds_name=default_gcp_creds_name,
default_db_facts=default_db_facts,
default_boto3_session=default_boto3_session,
default_gcp_creds=default_gcp_creds,
default_gcs_client=default_gcs_client)
elif session_type == 'lpass':
return CredsViaLastPass(default_db_creds_name=default_db_creds_name,
default_aws_creds_name=default_aws_creds_name,
default_gcp_creds_name=default_gcp_creds_name,
Expand Down Expand Up @@ -107,7 +118,7 @@ def _infer_creds(session_type: str,
default_gcs_client=default_gcs_client,
scratch_s3_url=scratch_s3_url)
elif session_type is not None:
raise ValueError("Valid session types: cli, airflow, itest, env - "
raise ValueError("Valid session types: cli, lpass, airflow, itest, env - "
"consider upgrading records-mover if you're looking for "
f"{session_type}.")

Expand Down
3 changes: 2 additions & 1 deletion tests/unit/records/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

import unittest
from mock import patch, call, Mock
from records_mover.records.cli import main
Expand All @@ -11,8 +10,10 @@
@patch('records_mover.records.cli.run_records_mover_job')
@patch('records_mover.records.cli.JobConfigSchemaAsArgsParser')
@patch('records_mover.records.cli.arguments_output_to_config')
@patch('records_mover.session.get_config')
class TestCLI(unittest.TestCase):
def test_main(self,
mock_get_config,
mock_arguments_output_to_config,
mock_JobConfigSchemaAsArgsParser,
mock_run_records_mover_job,
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_cli_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class TestCLISession(unittest.TestCase):
def test_creds(self,
mock_CredsViaLastPass,
mock_os):
session = Session(session_type='cli',
session = Session(session_type='lpass',
default_db_creds_name=None,
default_aws_creds_name=None)
self.assertEqual(mock_CredsViaLastPass.return_value, session.creds)
Expand All @@ -25,7 +25,7 @@ def test_get_default_db_engine_from_name(self,
mock_engine_from_db_facts,
mock_CredsViaLastPass,
mock_os):
session = Session(session_type='cli',
session = Session(session_type='lpass',
default_db_creds_name='foo',
default_aws_creds_name=None)
mock_creds = mock_CredsViaLastPass.return_value
Expand Down
71 changes: 55 additions & 16 deletions tests/unit/test_session_choices.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@

@patch('records_mover.session.CredsViaLastPass')
@patch('records_mover.session.CredsViaAirflow')
@patch('records_mover.session.CredsViaEnv')
@patch('records_mover.session.get_config')
class TestSessionChoices(unittest.TestCase):
def mock_session(self, **kwargs):
from records_mover import session

return session.Session(scratch_s3_url='s3://foo/', **kwargs)
return session.Session(scratch_s3_url='s3://foo/',
**kwargs)

@patch.dict('os.environ', {
'AIRFLOW__CORE__EXECUTOR': 'whoop',
})
def test_select_airflow_session_by_implicit_env_variable(self,
mock_get_config,
mock_CredsViaEnv,
mock_CredsViaAirflow,
mock_CredsViaLastPass):
session = self.mock_session()
Expand All @@ -29,25 +34,29 @@ def test_select_airflow_session_by_implicit_env_variable(self,
scratch_s3_url='s3://foo/')

def test_select_cli_session_by_default(self,
mock_get_config,
mock_CredsViaEnv,
mock_CredsViaAirflow,
mock_CredsViaLastPass):
session = self.mock_session()
self.assertEqual(session.creds, mock_CredsViaLastPass.return_value)
mock_CredsViaLastPass.assert_called_with(default_db_creds_name=None,
default_aws_creds_name=None,
default_gcp_creds_name=None,
default_db_facts=PleaseInfer.token,
default_boto3_session=PleaseInfer.token,
default_gcp_creds=PleaseInfer.token,
default_gcs_client=PleaseInfer.token,
scratch_s3_url='s3://foo/')
self.assertEqual(session.creds, mock_CredsViaEnv.return_value)
mock_CredsViaEnv.assert_called_with(default_db_creds_name=None,
default_aws_creds_name=None,
default_gcp_creds_name=None,
default_db_facts=PleaseInfer.token,
default_boto3_session=PleaseInfer.token,
default_gcp_creds=PleaseInfer.token,
default_gcs_client=PleaseInfer.token,
scratch_s3_url='s3://foo/')

@patch.dict('os.environ', {
'RECORDS_MOVER_SESSION_TYPE': 'cli',
'RECORDS_MOVER_SESSION_TYPE': 'lpass',
})
def test_select_cli_session_by_explicit_env_variable(self,
mock_CredsViaAirflow,
mock_CredsViaLastPass):
def test_select_lpass_session_by_explicit_env_variable(self,
mock_get_config,
mock_CredsViaEnv,
mock_CredsViaAirflow,
mock_CredsViaLastPass):
session = self.mock_session()
self.assertEqual(session.creds, mock_CredsViaLastPass.return_value)
mock_CredsViaLastPass.assert_called_with(default_db_creds_name=None,
Expand All @@ -63,6 +72,8 @@ def test_select_cli_session_by_explicit_env_variable(self,
'RECORDS_MOVER_SESSION_TYPE': 'airflow',
})
def test_select_airflow_session_by_explicit_env_variable(self,
mock_get_config,
mock_CredsViaEnv,
mock_CredsViaAirflow,
mock_CredsViaLastPass):
session = self.mock_session()
Expand All @@ -80,16 +91,20 @@ def test_select_airflow_session_by_explicit_env_variable(self,
'RECORDS_MOVER_SESSION_TYPE': 'bogus',
})
def test_select_invalid_session_by_explicit_env_variable(self,
mock_get_config,
mock_CredsViaEnv,
mock_CredsViaAirflow,
mock_CredsViaLastPass):
with self.assertRaises(ValueError) as r:
session = self.mock_session()
print(f"Got session: {session}")
self.assertEqual(str(r.exception),
'Valid session types: cli, airflow, itest, env - '
'Valid session types: cli, lpass, airflow, itest, env - '
"consider upgrading records-mover if you're looking for bogus.")

def test_select_airflow_session_by_parameter(self,
mock_get_config,
mock_CredsViaEnv,
mock_CredsViaAirflow,
mock_CredsViaLastPass):
session = self.mock_session(session_type='airflow')
Expand All @@ -104,9 +119,31 @@ def test_select_airflow_session_by_parameter(self,
scratch_s3_url='s3://foo/')

def test_select_cli_session_by_parameter(self,
mock_get_config,
mock_CredsViaEnv,
mock_CredsViaAirflow,
mock_CredsViaLastPass):
session = self.mock_session(session_type='cli')
self.assertEqual(session.creds, mock_CredsViaEnv.return_value)
mock_CredsViaEnv.assert_called_with(default_db_creds_name=None,
default_aws_creds_name=None,
default_gcp_creds_name=None,
default_db_facts=PleaseInfer.token,
default_boto3_session=PleaseInfer.token,
default_gcp_creds=PleaseInfer.token,
default_gcs_client=PleaseInfer.token)

def test_select_lastpass_session_by_config(self,
mock_get_config,
mock_CredsViaEnv,
mock_CredsViaAirflow,
mock_CredsViaLastPass):
mock_get_config.return_value.config = {
'session': {
'session_type': 'lpass'
}
}
session = self.mock_session()
self.assertEqual(session.creds, mock_CredsViaLastPass.return_value)
mock_CredsViaLastPass.assert_called_with(default_db_creds_name=None,
default_aws_creds_name=None,
Expand All @@ -118,10 +155,12 @@ def test_select_cli_session_by_parameter(self,
scratch_s3_url='s3://foo/')

def test_select_invalid_session_by_parameter(self,
mock_get_config,
mock_CredsViaEnv,
mock_CredsViaAirflow,
mock_CredsViaLastPass):
with self.assertRaises(ValueError) as r:
self.mock_session(session_type='bogus')
self.assertEqual(str(r.exception),
"Valid session types: cli, airflow, itest, env - "
"Valid session types: cli, lpass, airflow, itest, env - "
"consider upgrading records-mover if you're looking for bogus.")
3 changes: 3 additions & 0 deletions tests/unit/test_top_level.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

@patch('google.auth.default')
@patch('google.cloud.storage.Client')
@patch('records_mover.session.get_config')
class TestTopLevel(unittest.TestCase):
def test_sources(self,
mock_get_config,
mock_Client,
mock_google_auth_default):
mock_credentials = Mock(name='credentials')
Expand All @@ -16,6 +18,7 @@ def test_sources(self,
records_mover.records.sources.factory.RecordsSources)

def test_targets(self,
mock_get_config,
mock_Client,
mock_google_auth_default):
mock_credentials = Mock(name='credentials')
Expand Down
10 changes: 7 additions & 3 deletions types/stubs/config_resolver/core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@ from collections import namedtuple
from config_resolver.handler.ini import IniHandler as IniHandler # noqa
from logging import Filter, Logger
from packaging.version import Version
from typing import Any, Dict, Generator, List, Optional, Tuple, Type
from typing import Any, Dict, Generator, List, Optional, Tuple, Type, NamedTuple

ConfigID = namedtuple("ConfigID", "group app")

LookupResult = namedtuple("LookupResult", "config meta")

LookupMetadata = namedtuple(
"LookupMetadata", ["active_path", "loaded_files", "config_id", "prefix_filter"]
)


class LookupResult(NamedTuple):
config: Dict[str, Dict[str, Any]]
meta: LookupMetadata


FileReadability = namedtuple("FileReadability", "is_readable filename reason version")


Expand Down