Add new config mechanism and use for S3 scratch bucket (#47)
This is a more generic configuration mechanism for records-mover that can be set system-, user-, or session-wide as needed.  It'd replace the current `/usr/local/bin/scratch-s3-url` mechanism.
vinceatbluelabs authored May 14, 2020
1 parent ef502cb commit 6ec62d5
Showing 15 changed files with 291 additions and 15 deletions.
111 changes: 111 additions & 0 deletions docs/CONFIG.md
@@ -0,0 +1,111 @@
# Configuring records mover

There are three key areas where records mover needs configuration:

1. Database connection details
2. Temporary locations
3. Cloud credentials for object stores

## Database connection details

There are four ways to configure database connection details; some are
applicable only when using records mover as a Python library:

1. Setting environment variables (Python only)
2. Passing in pre-configured SQLAlchemy Engine objects (Python only)
3. Configuring db-facts (Python and mvrec)
4. Airflow connections (Python via Airflow)

### Setting environment variables (Python only)

The `Session` object contains a method called
`get_default_db_engine()`, which returns a database engine as
configured by a set of env variables. Note that using this method
limits you to dealing with one database at a time, and often requires
that env variables exist in your OS environment; if these trade-offs
aren't acceptable, please see the other options below.

The default environment variables match the semantics of
[sqlalchemy.engine.url.URL](https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.engine.url.URL)
and are as follows:

* `DB_HOST`
* `DB_DATABASE`
* `DB_PORT`
* `DB_USERNAME`
* `DB_PASSWORD`
* `DB_TYPE`
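Concretely, these variables map onto a SQLAlchemy-style connection URL. Here is a minimal sketch of that mapping; the `url_from_env` helper is hypothetical and not part of records mover:

```python
def url_from_env(environ) -> str:
    """Assemble a SQLAlchemy-style URL from DB_* variables (illustrative only)."""
    return "{type}://{user}:{pw}@{host}:{port}/{db}".format(
        type=environ["DB_TYPE"],
        user=environ["DB_USERNAME"],
        pw=environ["DB_PASSWORD"],
        host=environ["DB_HOST"],
        port=environ["DB_PORT"],
        db=environ["DB_DATABASE"],
    )

# Example values, as they might appear in your OS environment
env = {
    "DB_TYPE": "postgresql",
    "DB_USERNAME": "analyst",
    "DB_PASSWORD": "secret",
    "DB_HOST": "localhost",
    "DB_PORT": "5432",
    "DB_DATABASE": "warehouse",
}
print(url_from_env(env))  # postgresql://analyst:secret@localhost:5432/warehouse
```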

Redshift adds the following optional env variable:

* `REDSHIFT_SPECTRUM_BASE_URL_`: (optional) Specifies an `s3://` URL
where Redshift spectrum files should be stored when using the
'spectrum' records target.

BigQuery has an alternate set of env variables that should be used
instead of the `DB_` values above:

* `BQ_DEFAULT_PROJECT_ID`: Google Cloud project to be accessed.
* `BQ_DEFAULT_DATASET_ID`: BigQuery Dataset which should be used if
not otherwise overridden.
* `BQ_SERVICE_ACCOUNT_JSON`: (optional) JSON (not a filename)
  representing BigQuery service account credentials.

### Passing in pre-configured SQLAlchemy Engine objects (Python only)

The `database` factory methods for records sources and targets allow a
SQLAlchemy Engine to be passed in directly.

### Configuring db-facts (Python and mvrec)

[db-facts](https://github.com/bluelabsio/db-facts) is a complementary
project used to configure database credentials. Please see
[db-facts documentation](https://github.com/bluelabsio/db-facts/blob/master/CONFIGURATION.md)
for details on configuration.

### Airflow connections (Python via Airflow)

If you are running under Airflow, the
`session.creds.get_db_engine(name)` method will look up `name` in your
Airflow connections rather than use `db-facts`. This can be
configured via the `session_type` parameter passed to the `Session()`
constructor.

## Temporary locations

Cloud-based databases often export to cloud-native object stores
(e.g., S3) more efficiently than to other destinations. Indeed, some
(e.g., Redshift) *only* support exporting to and importing from an
object store. In order to support moves between such databases and
incompatible targets, records mover must first export to a temporary
location in a compatible object store.

Note that you'll need credentials with permission to write to this
object store; see below for how to configure that.

### S3 (Redshift)

To specify the temporary location for Redshift exports and imports,
you can either set the environment variable `SCRATCH_S3_URL` to your
URL or configure a TOML-style file in one of the following locations:

* `/etc/bluelabs/records_mover/app.ini`
* `/etc/xdg/bluelabs/records_mover/app.ini`
* `$HOME/.config/bluelabs/records_mover/app.ini`
* `./.bluelabs/records_mover/app.ini`

Example file:

```toml
[aws]
s3_scratch_url = "s3://mybucket/path/"
```
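Records mover reads this file via `config_resolver`; the stdlib sketch below only illustrates the file shape (the quote-stripping is an artifact of parsing the TOML-style quoted value with `configparser`, not how records mover itself parses it):

```python
import configparser

# Same content as the example app.ini above
SAMPLE = """
[aws]
s3_scratch_url = "s3://mybucket/path/"
"""

cfg = configparser.ConfigParser()
cfg.read_string(SAMPLE)
# configparser keeps the surrounding quotes; strip them to get the raw URL
url = cfg["aws"]["s3_scratch_url"].strip('"')
print(url)  # s3://mybucket/path/
```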

### Filesystem

Temporary files written to the filesystem (including large data files
downloaded for local processing) will be stored per Python's
[tempfile](https://docs.python.org/3/library/tempfile.html) defaults,
which allow for configuration via the `TMPDIR`, `TEMP` or `TMP` env
variables and generally resolve to
[something reasonable per your OS](https://docs.python.org/3/library/tempfile.html#tempfile.gettempdir).
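For instance, here is a sketch of overriding the default via `TMPDIR` on a POSIX system (the directory name is made up; note that `tempfile` caches its answer, so the cached value must be cleared for the override to take effect mid-process):

```python
import os
import tempfile

scratch = "/tmp/records_mover_scratch_demo"  # hypothetical scratch directory
os.makedirs(scratch, exist_ok=True)
os.environ["TMPDIR"] = scratch
tempfile.tempdir = None  # clear tempfile's cached directory so TMPDIR is re-read
print(tempfile.gettempdir())
```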
2 changes: 1 addition & 1 deletion metrics/bigfiles_high_water_mark
@@ -1 +1 @@
969
968
2 changes: 1 addition & 1 deletion metrics/coverage_high_water_mark
@@ -1 +1 @@
93.9600
93.9600
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
@@ -1 +1 @@
91.8600
91.8200
20 changes: 18 additions & 2 deletions records_mover/session.py
@@ -10,6 +10,7 @@
from records_mover.creds.creds_via_airflow import CredsViaAirflow
from records_mover.creds.creds_via_env import CredsViaEnv
from records_mover.logging import set_stream_logging
from config_resolver import get_config
import subprocess
import os
import sys
@@ -36,11 +37,26 @@ def _infer_session_type() -> str:


def _infer_scratch_s3_url(session_type: str) -> Optional[str]:
scratch_s3_url: Optional[str]

if "SCRATCH_S3_URL" in os.environ:
return os.environ["SCRATCH_S3_URL"]

# config_resolver logs at the WARNING level for each time it
# attempts to load a config file and doesn't find it - which given
# it searches a variety of places, is quite noisy.
#
# https://github.com/exhuma/config_resolver/blob/master/doc/intro.rst#logging
#
# https://github.com/exhuma/config_resolver/issues/69
logging.getLogger('config_resolver').setLevel(logging.ERROR)
config_result = get_config('records_mover', 'bluelabs')
cfg = config_result.config
if 'aws' in cfg:
s3_scratch_url: Optional[str] = cfg['aws'].get('s3_scratch_url')
if s3_scratch_url is not None:
return s3_scratch_url
else:
logger.debug('No config ini file found')

if session_type == 'cli':
try:
#
5 changes: 2 additions & 3 deletions setup.py
@@ -117,13 +117,11 @@ def initialize_options(self) -> None:
]

db_dependencies = [
# https://github.com/sqlalchemy-redshift/sqlalchemy-redshift/issues/195
#
# sqlalchemy 1.3.16 seems to have (accidentally?) introduced
# a breaking change that affects sqlalchemy-redshift:
#
# https://github.com/sqlalchemy-redshift/sqlalchemy-redshift/issues/195
'sqlalchemy!=1.3.16',
'sqlalchemy!=1.3.16,!=1.3.17',
]

bigquery_dependencies = [
@@ -257,6 +255,7 @@ def initialize_options(self) -> None:
'db-facts>=4,<5',
'chardet',
'tenacity>=6<7',
'config-resolver>=5,<6',
'typing_inspect',
'typing-extensions',
],
34 changes: 27 additions & 7 deletions tests/unit/test_session.py
@@ -3,14 +3,16 @@
import unittest


@patch('records_mover.session.get_config')
@patch('records_mover.session.subprocess')
@patch('records_mover.session.os')
class TestSession(unittest.TestCase):
@patch('records_mover.db.connect.engine_from_db_facts')
def test_get_db_engine(self,
mock_engine_from_db_facts,
mock_os,
mock_subprocess):
mock_subprocess,
mock_get_config):
mock_db_creds_name = Mock(name='db_creds_name')
mock_creds = Mock(name='creds')
session = Session()
@@ -24,7 +26,8 @@ def test_itest_type_uses_creds_via_env(self,
def test_itest_type_uses_creds_via_env(self,
mock_CredsViaEnv,
mock_os,
mock_subprocess):
mock_subprocess,
mock_get_config):
mock_creds = mock_CredsViaEnv.return_value
session = Session(session_type='itest')
self.assertEqual(session.creds, mock_creds)
@@ -33,7 +36,8 @@ def test_env_type_uses_creds_via_env(self,
def test_env_type_uses_creds_via_env(self,
mock_CredsViaEnv,
mock_os,
mock_subprocess):
mock_subprocess,
mock_get_config):
mock_creds = mock_CredsViaEnv.return_value
session = Session(session_type='env')
self.assertEqual(session.creds, mock_creds)
@@ -44,7 +48,8 @@ def test_get_default_db_engine_no_default(self,
mock_engine_from_db_facts,
mock_db_facts_from_env,
mock_os,
mock_subprocess):
mock_subprocess,
mock_get_config):
session = Session()
self.assertEqual(session.get_default_db_engine(), mock_engine_from_db_facts.return_value)
mock_db_facts_from_env.assert_called_with()
@@ -57,7 +62,8 @@ def test_get_default_db_facts_no_default(self,
mock_engine_from_db_facts,
mock_db_facts_from_env,
mock_os,
mock_subprocess):
mock_subprocess,
mock_get_config):
session = Session()
self.assertEqual(session.get_default_db_facts(), mock_db_facts_from_env.return_value)
mock_db_facts_from_env.assert_called_with()
@@ -66,7 +72,8 @@ def test_get_default_db_facts_no_default(self,
def test_get_default_db_facts_with_default(self,
mock_engine_from_db_facts,
mock_os,
mock_subprocess):
mock_subprocess,
mock_get_config):
mock_creds = Mock(name='creds')
mock_default_db_creds_name = Mock(name='default_db_creds_name')
session = Session(creds=mock_creds,
@@ -79,7 +86,8 @@ def test_set_stream_logging(self,
def test_set_stream_logging(self,
mock_set_stream_logging,
mock_os,
mock_subprocess):
mock_subprocess,
mock_get_config):
session = Session()
mock_name = Mock(name='name')
mock_level = Mock(name='level')
@@ -96,3 +104,15 @@ def test_set_stream_logging(self,
stream=mock_stream,
fmt=mock_fmt,
datefmt=mock_datefmt)

@patch('records_mover.session.set_stream_logging')
def test_s3_url_from_get_config(self,
mock_set_stream_logging,
mock_os,
mock_subprocess,
mock_get_config):
mock_os.environ = {}
mock_config_result = mock_get_config.return_value
mock_config_result.config = {'aws': {'s3_scratch_url': 's3://foundit/'}}
session = Session()
self.assertEqual(session._scratch_s3_url, 's3://foundit/')
2 changes: 2 additions & 0 deletions types/stubs/config_resolver/__init__.pyi
@@ -0,0 +1,2 @@
from .core import from_string as from_string, get_config as get_config # noqa
from .exc import NoVersionError as NoVersionError # noqa
63 changes: 63 additions & 0 deletions types/stubs/config_resolver/core.pyi
@@ -0,0 +1,63 @@
from .exc import NoVersionError as NoVersionError # noqa
from .handler.base import Handler as Handler
from .util import PrefixFilter as PrefixFilter # noqa
from collections import namedtuple
from config_resolver.handler.ini import IniHandler as IniHandler # noqa
from logging import Filter, Logger
from packaging.version import Version
from typing import Any, Dict, Generator, List, Optional, Tuple, Type

ConfigID = namedtuple("ConfigID", "group app")

LookupResult = namedtuple("LookupResult", "config meta")

LookupMetadata = namedtuple(
"LookupMetadata", ["active_path", "loaded_files", "config_id", "prefix_filter"]
)

FileReadability = namedtuple("FileReadability", "is_readable filename reason version")


def from_string(data: str, handler: Optional[Handler[Any]] = ...) -> LookupResult: ...


def get_config(
app_name: str,
group_name: str = ...,
lookup_options: Optional[Dict[str, Any]] = ...,
handler: Optional[Type[Handler[Any]]] = ...,
) -> LookupResult: ...


def prefixed_logger(
config_id: Optional[ConfigID],
) -> Tuple[Logger, Optional[Filter]]: ...


def get_xdg_dirs(config_id: ConfigID) -> List[str]: ...


def get_xdg_home(config_id: ConfigID) -> str: ...


def effective_path(config_id: ConfigID, search_path: str = ...) -> List[str]: ...


def find_files(
config_id: ConfigID, search_path: Optional[List[str]] = ..., filename: str = ...
) -> Generator[str, None, None]: ...


def effective_filename(config_id: ConfigID, config_filename: str) -> str: ...


def env_name(config_id: ConfigID) -> str: ...


def is_readable(
config_id: ConfigID,
filename: str,
version: Optional[Version] = ...,
secure: bool = ...,
handler: Optional[Type[Handler[Any]]] = ...,
) -> FileReadability: ...
2 changes: 2 additions & 0 deletions types/stubs/config_resolver/exc.pyi
@@ -0,0 +1,2 @@
class NoVersionError(Exception):
...
Empty file.
18 changes: 18 additions & 0 deletions types/stubs/config_resolver/handler/base.pyi
@@ -0,0 +1,18 @@
from packaging.version import Version as Version
from typing import Any, Optional, TypeVar, Generic

TConfig = TypeVar('TConfig', bound=Any)


class Handler(Generic[TConfig]):
DEFAULT_FILENAME: str = ...
@staticmethod
def empty() -> TConfig: ...
@staticmethod
def from_string(data: str) -> TConfig: ...
@staticmethod
def from_filename(filename: str) -> TConfig: ...
@staticmethod
def get_version(config: TConfig) -> Optional[Version]: ...
@staticmethod
def update_from_file(config: TConfig, filename: str) -> None: ...
18 changes: 18 additions & 0 deletions types/stubs/config_resolver/handler/ini.pyi
@@ -0,0 +1,18 @@
from .base import Handler as Handler
from configparser import ConfigParser
from packaging.version import Version
from typing import Optional


class IniHandler(Handler[ConfigParser]):
DEFAULT_FILENAME: str = ...
@staticmethod
def empty() -> ConfigParser: ...
@staticmethod
def from_string(data: str) -> ConfigParser: ...
@staticmethod
def from_filename(filename: str) -> ConfigParser: ...
@staticmethod
def get_version(config: ConfigParser) -> Optional[Version]: ...
@staticmethod
def update_from_file(config: ConfigParser, filename: str) -> None: ...
