Initial public interface changes (#28)
* Move Records objects exports to records package

* Define public interface for db package

* Add SqlAlchemyDbHook for more Airflow support

* Remove utility methods not used by records-mover itself

* Bump feature version

* Add MAINT.md

* Ratchet mypy coverage

* Ratchet flake8

* Export constructors for records formats

* Move integration tests to new interface

* Ratchet mypy coverage

* Explicitly offer Airflow hooks as public interface

* Explicitly offer Airflow hooks as public interface

* Add a test of public interface from internal uses
vinceatbluelabs authored Mar 3, 2020
1 parent 6bb94ac commit 972bc10
Showing 32 changed files with 230 additions and 342 deletions.
96 changes: 96 additions & 0 deletions MAINT.md
@@ -0,0 +1,96 @@
# Maintenance

Packages inside this repo include:

* [records](./records_mover/records/), which is the core API you
can use to move relational data from one place to another.
* [url](./records_mover/url/), which offers some abstractions
across different filesystem-like things (e.g., S3/HTTP/local
filesystems, maybe SFTP in the future)
* [db](./records_mover/db/), which adds some functionality on top of
SQLAlchemy for various different database types.
* [creds](./records_mover/creds/), which manages credentials and
other connection details.
* [pandas](./records_mover/pandas/), which adds functionality on top
of the Pandas data science framework.
* [airflow](./records_mover/airflow/), which helps interface parts
of this library to DAGs running under Airflow.
* [utils](./records_mover/utils/), which is the usual junk drawer of
things that haven't grown enough mass to be exported into their own
package.

Anything labeled private with a `_` prefix isn't a stable
interface - it can change rapidly.

If you need access to another function/class, please submit an issue
or a PR to make it public. That PR is a good opportunity to discuss
what changes we want to make to the public interface before we commit
to one--it's a lot harder to change later!

## Development

### Installing development tools

```bash
./deps.sh # uses pyenv and pyenv-virtualenv
```

### Unit testing

To run the tests in your local pyenv:

```bash
make test
```

### Automated integration testing

All of our integration tests use the `itest` script, which can be given
the `--docker` flag to run inside Docker.

To see details on the tests available, run:

```sh
./itest --help
```

To run the full test suite locally (this takes about 30 minutes):

```sh
./itest all
```

To run the same suite with mover itself in a Docker image:

```sh
./itest --docker all
```

### Common issues with integration tests

If Vertica runs out of memory during an unload, you'll see an error like:

```
(vertica_python.errors.InsufficientResources) Severity: b'ERROR', Message: b'Insufficient resources to execute plan on pool general [Request Too Large:Memory(KB) Exceeded: Requested = 5254281, Free = 1369370 (Limit = 1377562, Used = 8192)]', Sqlstate: b'53000', Routine: b'Exec_compilePlan', File: b'/scratch_a/release/svrtar2409/vbuild/vertica/Dist/Dist.cpp', Line: b'1540', Error Code: b'3587', SQL: " SELECT S3EXPORT( * USING PARAMETERS url='s3://vince-scratch/PA6ViIBMMWk/records.csv', chunksize=5368709120, to_charset='UTF8', delimiter='\x01', record_terminator='\x02') OVER(PARTITION BEST) FROM public.test_table1 "
```

Try expanding your Docker for Mac memory size to 8G. Vertica is
memory intensive, even under Docker.

### Manual integration testing

There's also a manual [torture test](tests/integration/table2table/TORTURE.md)
of the records schema JSON functionality available to run - it may be
handy after making large-scale refactors of the records schema JSON code
or when adding load/unload support to a new database type.

### Semantic versioning

In this house, we use [semantic versioning](http://semver.org) to indicate
when we make breaking changes to interfaces. If you don't want to live
dangerously, and you are currently using version a.x.y (see setup.py to see
what version we're at), specify your requirement like this in requirements.txt:

```
records_mover>=a.x.y,<b
```

where b is a + 1 (the next major version).

This will make sure you don't get automatically updated into the next
breaking change.
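For example, if the current release were version 1.4.2 (an invented number
for illustration), the pin would be:

```
records_mover>=1.4.2,<2
```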
3 changes: 2 additions & 1 deletion README.md
@@ -64,7 +64,8 @@ records = session.records
#
# You can instead call session.get_db_engine('cred name').
#
# On your laptop, 'cred name' is the same thing passed to dbcli (mapping to something in LastPass).
# On your laptop, 'cred name' is the same thing passed to dbcli (mapping to
# something in your db-facts config).
#
# In Airflow, 'cred name' maps to the connection ID in the admin Connections UI.
#
2 changes: 1 addition & 1 deletion metrics/coverage_high_water_mark
@@ -1 +1 @@
93.7300
93.6400
2 changes: 1 addition & 1 deletion metrics/flake8_high_water_mark
@@ -1 +1 @@
214
208
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
@@ -1 +1 @@
89.0100
89.1300
3 changes: 3 additions & 0 deletions records_mover/airflow/__init__.py
@@ -0,0 +1,3 @@
__all__ = ["hooks"]

from . import hooks
7 changes: 7 additions & 0 deletions records_mover/airflow/hooks/__init__.py
@@ -0,0 +1,7 @@
__all__ = [
"RecordsHook",
"SqlAlchemyDbHook",
]

from .sqlalchemy_db_hook import SqlAlchemyDbHook
from .records_hook import RecordsHook
3 changes: 2 additions & 1 deletion records_mover/airflow/hooks/records_hook.py
@@ -2,7 +2,8 @@
from urllib.parse import urlparse
from airflow.exceptions import AirflowException
from records_mover.records.records import Records
from records_mover.db import db_driver, DBDriver
from records_mover.db.factory import db_driver
from records_mover.db import DBDriver
from records_mover.url.resolver import UrlResolver
from airflow.hooks import BaseHook
from airflow.contrib.hooks.aws_hook import AwsHook
22 changes: 22 additions & 0 deletions records_mover/airflow/hooks/sqlalchemy_db_hook.py
@@ -0,0 +1,22 @@
import sqlalchemy as sa
from records_mover.db import create_sqlalchemy_url
from airflow.hooks import BaseHook


class SqlAlchemyDbHook(BaseHook):
def __init__(self, db_conn_id):
self.db_conn_id = db_conn_id

def get_conn(self):
conn = BaseHook.get_connection(self.db_conn_id)
db_url = create_sqlalchemy_url(
{
'host': conn.host,
'port': str(conn.port),
'database': conn.schema,
'user': conn.login,
'password': conn.password,
'type': conn.extra_dejson.get('type', conn.conn_type.lower()),
}
)
return sa.create_engine(db_url)
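As a usage sketch (the connection ID `my_database` below is invented for
illustration), the new hook hands back a SQLAlchemy engine for any database
connection configured in Airflow:

```python
# Sketch: assumes an Airflow connection named 'my_database' exists in the
# admin Connections UI.
from records_mover.airflow.hooks import SqlAlchemyDbHook

hook = SqlAlchemyDbHook(db_conn_id='my_database')
engine = hook.get_conn()  # builds a URL via create_sqlalchemy_url()

with engine.connect() as conn:
    print(conn.execute("SELECT 1").scalar())
```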
8 changes: 7 additions & 1 deletion records_mover/db/__init__.py
@@ -1,3 +1,9 @@
__all__ = [
'DBDriver',
'LoadError',
'create_sqlalchemy_url',
]

from .driver import DBDriver # noqa
from .factory import db_driver # noqa
from .errors import LoadError # noqa
from .connect import create_sqlalchemy_url
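With these exports, consumers can pull the supported names straight from the
package root instead of reaching into internal modules. A sketch of the
consumer-facing surface:

```python
# The names in __all__ are importable from the package root.
from records_mover.db import DBDriver, LoadError, create_sqlalchemy_url

# The driver factory still lives one level down, as records_hook.py now does:
from records_mover.db.factory import db_driver
```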
15 changes: 8 additions & 7 deletions records_mover/db/connect.py
@@ -22,7 +22,7 @@
}


def create_vertica_odbc_db_url(db_facts: DBFacts) -> str:
def create_vertica_odbc_sqlalchemy_url(db_facts: DBFacts) -> str:
# Vertica wants the port in its ODBC connect string as a separate
# parameter called "Port":
#
@@ -53,7 +53,7 @@ def create_vertica_odbc_db_url(db_facts: DBFacts) -> str:
return "vertica+pyodbc:///?odbc_connect={}".format(db_url)


def create_bigquery_db_url(db_facts: DBFacts) -> str:
def create_bigquery_sqlalchemy_url(db_facts: DBFacts) -> str:
"Create URL compatible with https://github.com/mxmzdlv/pybigquery"

default_project_id = db_facts.get('bq_default_project_id')
@@ -75,11 +75,12 @@ def create_bigquery_db_engine(db_facts: DBFacts) -> sa.engine.Engine:
logger.info(f"Logging into BigQuery as {credentials_info['client_email']}")
else:
logger.info("Found no service account info for BigQuery, using local creds")
url = create_bigquery_db_url(db_facts)
url = create_bigquery_sqlalchemy_url(db_facts)
return sa.engine.create_engine(url, credentials_info=credentials_info)


def create_db_url(db_facts: DBFacts, prefer_odbc: bool=False) -> Union[str, sa.engine.url.URL]:
def create_sqlalchemy_url(db_facts: DBFacts,
prefer_odbc: bool=False) -> Union[str, sa.engine.url.URL]:
db_type = canonicalize_db_type(db_facts['type'])
driver = db_driver_for_type.get(db_type, db_type)
if prefer_odbc:
@@ -88,13 +89,13 @@ def create_db_url(db_facts: DBFacts, prefer_odbc: bool=False) -> Union[str, sa.e
# still using 'username'
username = db_facts.get('username', db_facts.get('user')) # type: ignore
if driver == 'vertica+pyodbc':
return create_vertica_odbc_db_url(db_facts)
return create_vertica_odbc_sqlalchemy_url(db_facts)
elif driver == 'bigquery':
if 'bq_service_account_json' in db_facts:
raise NotImplementedError("pybigquery does not support providing credentials info "
"(service account JSON) directly")

return create_bigquery_db_url(db_facts)
return create_bigquery_sqlalchemy_url(db_facts)
else:
return sa.engine.url.URL(drivername=driver,
username=username,
@@ -118,5 +119,5 @@ def engine_from_db_facts(db_facts: DBFacts) -> sa.engine.Engine:
# use create_engine() instead of creating a URL just in case.
return create_bigquery_db_engine(db_facts)
else:
db_url = create_db_url(db_facts)
db_url = create_sqlalchemy_url(db_facts)
return sa.create_engine(db_url)
31 changes: 1 addition & 30 deletions records_mover/pandas/__init__.py
@@ -1,8 +1,7 @@
import json
import numpy as np
from ..utils.structures import map_keys, snake_to_camel, nest_dict
from pandas import DataFrame
from typing import List, Dict, Any
from typing import Any


# http://stackoverflow.com/questions/27050108/convert-numpy-type-to-python
@@ -18,34 +17,6 @@ def default(self, obj):
return super(NumPyJSONEncoder, self).default(obj)


def dataframe_to_nested_dicts(df: DataFrame,
to_camel: bool=False) -> List[Dict[str, Any]]:
"""
This turns database results (expressed as a pandas dataframe) into
potentially-nested dicts. It uses a '.' in the column names
as hints to nest.
e.g., the dataframe created from this query result:
+-----+---------+--------------+
| abc | foo.bar | foo.baz.bing |
+-----+---------+--------------+
| 1 | 5 | 'bazzle' |
+-----+---------+--------------+
results in this dict:
{'abc': 1, 'foo': {'bar': 5, 'baz': {'bing': 'bazzle'}}}
"""
# 'records' output is like:
# [{"col1": 123, "col2": "abc"}, {"col1": 456, "col2": "xyz"}]
records = df.to_dict(orient='records')
if to_camel:
records = map(lambda d: map_keys(snake_to_camel, d), records)
records = map(nest_dict, records)
return list(records)


def json_dumps(item: str) -> Any:
return json.dumps(item, cls=NumPyJSONEncoder)

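The surviving `json_dumps` helper exists because plain `json.dumps` raises
`TypeError` on NumPy scalar types; `NumPyJSONEncoder` (per the linked Stack
Overflow answer) converts them to native Python types first. A small sketch:

```python
import numpy as np
from records_mover.pandas import json_dumps

# Plain json.dumps() would raise TypeError on the np.int64 value;
# NumPyJSONEncoder converts NumPy scalars before serializing.
print(json_dumps({'count': np.int64(42)}))  # -> {"count": 42}
```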
66 changes: 0 additions & 66 deletions records_mover/pandas/sparsedf.py

This file was deleted.

17 changes: 16 additions & 1 deletion records_mover/records/__init__.py
@@ -1,4 +1,19 @@
__all__ = [
'RecordsHints',
'BootstrappingRecordsHints',
'RecordsFormatType',
'RecordsSchema',
'RecordsFormat',
'DelimitedRecordsFormat',
'ParquetRecordsFormat',
'ProcessingInstructions',
'ExistingTableHandling',
'Records',
]

from .types import RecordsHints, BootstrappingRecordsHints, RecordsFormatType # noqa
from .schema import RecordsSchema # noqa
from .records_format import RecordsFormat # noqa
from .records_format import RecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat # noqa
from .processing_instructions import ProcessingInstructions # noqa
from .existing_table_handling import ExistingTableHandling # noqa
from .records import Records # noqa
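These exports let callers build records format objects without importing from
submodules. A sketch follows; the constructor arguments are assumptions, since
the signatures aren't shown in this diff:

```python
from records_mover.records import (
    DelimitedRecordsFormat,
    ParquetRecordsFormat,
    ProcessingInstructions,
)

# Assumption: DelimitedRecordsFormat takes a 'variant' keyword naming a
# delimited dialect such as 'csv'.
delimited = DelimitedRecordsFormat(variant='csv')
parquet = ParquetRecordsFormat()     # assumed to take no required arguments
instructions = ProcessingInstructions()
```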
2 changes: 1 addition & 1 deletion records_mover/records/job/mover.py
@@ -18,7 +18,6 @@ def run_records_mover_job(source_method_name: str,
source_method = getattr(session.records.sources, source_method_name)
target_method = getattr(session.records.targets, target_method_name)
logger.info('Starting...')
records = session.records

source_kwargs = config_to_args(config=config['source'],
method=source_method,
@@ -32,6 +31,7 @@
session=session)
processing_instructions = ProcessingInstructions(**pi_kwargs)

records = session.records
source = source_method(**source_kwargs)
target = target_method(**target_kwargs)
return records.move(source, target, processing_instructions)