From 972bc10b35d67a18538412174478c407cf049fc1 Mon Sep 17 00:00:00 2001 From: vinceatbluelabs Date: Mon, 2 Mar 2020 20:35:47 -0500 Subject: [PATCH] Initial public interface changes (#28) * Move Records objects exports to records package * Define public interface for db package * Add SqlAlchemyDbHook for more Airflow support * Remove utility methods not used by records-mover itself * Bump feature version * Add MAINT.md * Ratchet mypy coverage * Ratchet flake8 * Export constructors for records formats * Move integration tests to new interface * Ratchet mypy coverage * Explicitly offer Airflow hooks as public interface * Explicitly offer Airflow hooks as public interface * Add a test of public interface from internal uses --- MAINT.md | 96 +++++++++++++++++++ README.md | 3 +- metrics/coverage_high_water_mark | 2 +- metrics/flake8_high_water_mark | 2 +- metrics/mypy_high_water_mark | 2 +- records_mover/airflow/__init__.py | 3 + records_mover/airflow/hooks/__init__.py | 7 ++ records_mover/airflow/hooks/records_hook.py | 3 +- .../airflow/hooks/sqlalchemy_db_hook.py | 22 +++++ records_mover/db/__init__.py | 8 +- records_mover/db/connect.py | 15 +-- records_mover/pandas/__init__.py | 31 +----- records_mover/pandas/sparsedf.py | 66 ------------- records_mover/records/__init__.py | 17 +++- records_mover/records/job/mover.py | 2 +- records_mover/records/records.py | 29 +----- records_mover/session.py | 3 +- records_mover/utils/structures.py | 19 +--- records_mover/version.py | 2 +- .../multi_db/test_records_table2table.py | 5 +- .../records/single_db/test_records_load.py | 11 +-- .../records/single_db/test_records_numeric.py | 15 +-- .../records/single_db/test_records_save_df.py | 12 ++- .../records/single_db/test_records_unload.py | 10 +- tests/unit/db/test_connect.py | 4 +- .../unit/db/test_sqlalchemy_driver_picking.py | 16 ++-- tests/unit/pandas/test_pandas.py | 42 -------- tests/unit/pandas/test_pandas_sparse_df.py | 55 ----------- tests/unit/records/test_records.py | 13 --- tests/unit/records/test_records_infer.py | 16 +++- tests/unit/resources/test_config.yml | 36 ------- tests/unit/test_public_interface.py | 5 + 32 files changed, 230 insertions(+), 342 deletions(-) create mode 100644 MAINT.md create mode 100644 records_mover/airflow/hooks/sqlalchemy_db_hook.py delete mode 100644 records_mover/pandas/sparsedf.py delete mode 100644 tests/unit/pandas/test_pandas_sparse_df.py delete mode 100644 tests/unit/records/test_records.py delete mode 100644 tests/unit/resources/test_config.yml create mode 100644 tests/unit/test_public_interface.py diff --git a/MAINT.md b/MAINT.md new file mode 100644 index 000000000..7ee141550 --- /dev/null +++ b/MAINT.md @@ -0,0 +1,96 @@ +# Maintenance + +Packages inside include: + +* [records](./records_mover/records/), which is the core API you + can use to move relational data from one place to another. +* [url](./records_mover/url/), which offers some abstractions + across different filesystem-like things (e.g., S3/HTTP/local + filesystems, maybe SFTP in the future) +* [db](./records_mover/db/), which adds some functionality on top of + SQLAlchemy for various different database types. +* [creds](./records_mover/creds/), which manages credentials and + other connection details. +* [pandas](./records_mover/pandas/), which adds functionality on top + of the Pandas data science framework. +* [airflow](./records_mover/airflow/), which helps interface parts + of this library to DAGS running under Airflow. 
+* [utils](./records_mover/utils/), which is the usual junk drawer of
+  things that haven't grown enough mass to be exported into their own
+  package.
+
+Things labeled private with a prefix of `_` aren't stable
+interfaces - they can change rapidly.
+
+If you need access to another function/class, please submit an issue
+or a PR to make it public. That PR is a good opportunity to talk about
+what changes we want to make to the public interface before we make
+one--it's a lot harder to change later!
+
+## Development
+
+### Installing development tools
+
+```bash
+./deps.sh # uses pyenv and pyenv-virtualenv
+```
+
+### Unit testing
+
+To run the tests in your local pyenv:
+
+```bash
+make test
+```
+
+### Automated integration testing
+
+All of our integration tests use the `itest` script, which can be provided
+with the `--docker` flag to run inside Docker.
+
+To see details on the tests available, run:
+
+  ```sh
+  ./itest --help
+  ```
+
+To run all of the test suite locally (takes about 30 minutes):
+
+  ```sh
+  ./itest all
+  ```
+
+To run the same suite with mover itself in a Docker image:
+
+  ```sh
+  ./itest --docker all
+  ```
+
+### Common issues with integration tests
+
+```vertica
+(vertica_python.errors.InsufficientResources) Severity: b'ERROR', Message: b'Insufficient resources to execute plan on pool general [Request Too Large:Memory(KB) Exceeded: Requested = 5254281, Free = 1369370 (Limit = 1377562, Used = 8192)]', Sqlstate: b'53000', Routine: b'Exec_compilePlan', File: b'/scratch_a/release/svrtar2409/vbuild/vertica/Dist/Dist.cpp', Line: b'1540', Error Code: b'3587', SQL: " SELECT S3EXPORT( * USING PARAMETERS url='s3://vince-scratch/PA6ViIBMMWk/records.csv', chunksize=5368709120, to_charset='UTF8', delimiter='\x01', record_terminator='\x02') OVER(PARTITION BEST) FROM public.test_table1 "
+```
+
+Try expanding your Docker for Mac memory size to 8G. Vertica is
+memory-intensive, even under Docker.
+
+### Manual integration testing
+
+There's also a manual [torture test](tests/integration/table2table/TORTURE.md)
+of the records schema JSON functionality available to run - this may be handy
+after making large-scale refactors of the records schema JSON code or when
+adding load/unload support to a new database type.
+
+### Semantic versioning
+
+In this house, we use [semantic versioning](http://semver.org) to indicate
+when we make breaking changes to interfaces.
If you don't want to live +dangerously, and you are currently using version a.y.z (see setup.py to see +what version we're at) specify your requirement like this in requirements.txt: + +records_mover>=a.x.y, str: +def create_vertica_odbc_sqlalchemy_url(db_facts: DBFacts) -> str: # Vertica wants the port in its ODBC connect string as a separate # parameter called "Port": # @@ -53,7 +53,7 @@ def create_vertica_odbc_db_url(db_facts: DBFacts) -> str: return "vertica+pyodbc:///?odbc_connect={}".format(db_url) -def create_bigquery_db_url(db_facts: DBFacts) -> str: +def create_bigquery_sqlalchemy_url(db_facts: DBFacts) -> str: "Create URL compatible with https://github.com/mxmzdlv/pybigquery" default_project_id = db_facts.get('bq_default_project_id') @@ -75,11 +75,12 @@ def create_bigquery_db_engine(db_facts: DBFacts) -> sa.engine.Engine: logger.info(f"Logging into BigQuery as {credentials_info['client_email']}") else: logger.info("Found no service account info for BigQuery, using local creds") - url = create_bigquery_db_url(db_facts) + url = create_bigquery_sqlalchemy_url(db_facts) return sa.engine.create_engine(url, credentials_info=credentials_info) -def create_db_url(db_facts: DBFacts, prefer_odbc: bool=False) -> Union[str, sa.engine.url.URL]: +def create_sqlalchemy_url(db_facts: DBFacts, + prefer_odbc: bool=False) -> Union[str, sa.engine.url.URL]: db_type = canonicalize_db_type(db_facts['type']) driver = db_driver_for_type.get(db_type, db_type) if prefer_odbc: @@ -88,13 +89,13 @@ def create_db_url(db_facts: DBFacts, prefer_odbc: bool=False) -> Union[str, sa.e # still using 'username' username = db_facts.get('username', db_facts.get('user')) # type: ignore if driver == 'vertica+pyodbc': - return create_vertica_odbc_db_url(db_facts) + return create_vertica_odbc_sqlalchemy_url(db_facts) elif driver == 'bigquery': if 'bq_service_account_json' in db_facts: raise NotImplementedError("pybigquery does not support providing credentials info " "(service account JSON) directly") - return create_bigquery_db_url(db_facts) + return create_bigquery_sqlalchemy_url(db_facts) else: return sa.engine.url.URL(drivername=driver, username=username, @@ -118,5 +119,5 @@ def engine_from_db_facts(db_facts: DBFacts) -> sa.engine.Engine: # use create_engine() instead of creating a URL just in case. return create_bigquery_db_engine(db_facts) else: - db_url = create_db_url(db_facts) + db_url = create_sqlalchemy_url(db_facts) return sa.create_engine(db_url) diff --git a/records_mover/pandas/__init__.py b/records_mover/pandas/__init__.py index 6921d53a4..8133a99ff 100644 --- a/records_mover/pandas/__init__.py +++ b/records_mover/pandas/__init__.py @@ -1,8 +1,7 @@ import json import numpy as np -from ..utils.structures import map_keys, snake_to_camel, nest_dict from pandas import DataFrame -from typing import List, Dict, Any +from typing import Any # http://stackoverflow.com/questions/27050108/convert-numpy-type-to-python @@ -18,34 +17,6 @@ def default(self, obj): return super(NumPyJSONEncoder, self).default(obj) -def dataframe_to_nested_dicts(df: DataFrame, - to_camel: bool=False) -> List[Dict[str, Any]]: - """ - This turns database results (expressed as a pandas dataframe) into - potentially-nested dicts. It uses a '.' in the column names - as hints to nest. 
- - e.g., the dataframe created from this query result: - - +-----+---------+--------------+ - | abc | foo.bar | foo.baz.bing | - +-----+---------+--------------+ - | 1 | 5 | 'bazzle' | - +-----+---------+--------------+ - - results in this dict: - - {'abc': 1, 'foo': {'bar': 5, 'baz': {'bing': 'bazzle'}}} - """ - # 'records' output is like: - # [{"col1": 123, "col2": "abc"}, {"col1": 456, "col2": "xyz"}] - records = df.to_dict(orient='records') - if to_camel: - records = map(lambda d: map_keys(snake_to_camel, d), records) - records = map(nest_dict, records) - return list(records) - - def json_dumps(item: str) -> Any: return json.dumps(item, cls=NumPyJSONEncoder) diff --git a/records_mover/pandas/sparsedf.py b/records_mover/pandas/sparsedf.py deleted file mode 100644 index 8214b9755..000000000 --- a/records_mover/pandas/sparsedf.py +++ /dev/null @@ -1,66 +0,0 @@ -from pandas import Series, isnull, DataFrame -from math import ceil -import json -from typing import List - - -def compress_sparse_dataframe_columns(df: DataFrame, - min_required_populated_ratio: float=0.25, - mandatory: List[str]=[]): - """Compresses a sparse dataframe by throwing sparse columns into a - single column containing a dict representing the sparse values. - - min_required_populated_ratio: What ratio of rows must provide this - data element for us to grace it with - a column automatically. - - mandatory: Which columns should be provided always, even if - there's almost no data for it. - - df: - a b c d e f - 0 0 NaN NaN NaN NaN NaN - 1 1 1.0 NaN NaN NaN NaN - 2 2 NaN 2.0 NaN NaN NaN - 3 3 NaN NaN 3.0 NaN NaN - 4 4 NaN NaN NaN 4.0 NaN - 5 5 NaN NaN NaN NaN 5.0 - - compressed_df: - a compressed - 0 0 {} - 1 1 {'b': 1.0} - 2 2 {'c': 2.0} - 3 3 {'d': 3.0} - 4 4 {'e': 4.0} - 5 5 {'f': 5.0} - - - After saving dataframe to Redshift: - - analytics=> select * from vbroz.compressed_test - where json_extract_path_text(compressed, 'c') <> ''; - a | compressed - ---+------------ - 2 | {"c": 2.0} - (1 row) - - """ - min_rows_to_save = ceil(min_required_populated_ratio * len(df.index)) - core_df = df.dropna(axis=1, how='all', thresh=min_rows_to_save) - save_these_columns = set(mandatory) - set(core_df) - save_these_series = {col: df[col] for col in save_these_columns} - core_df = core_df.assign(**save_these_series) - - kept_columns = set(core_df) - compressed_df = df.drop(kept_columns, 1) - wordy_compressed_dicts = compressed_df.to_dict(orient='records') - - def remove_null_keys(d): - return {k: v for k, v in d.items() if not isnull(v)} - - compressed_dicts = list(map(remove_null_keys, wordy_compressed_dicts)) - compressed_json = list(map(lambda d: json.dumps(d), compressed_dicts)) - - return core_df.assign(compressed=Series(compressed_json, - index=core_df.index)) diff --git a/records_mover/records/__init__.py b/records_mover/records/__init__.py index 2beb36adf..a7d444dca 100644 --- a/records_mover/records/__init__.py +++ b/records_mover/records/__init__.py @@ -1,4 +1,19 @@ +__all__ = [ + 'RecordsHints', + 'BootstrappingRecordsHints', + 'RecordsFormatType', + 'RecordsSchema', + 'RecordsFormat', + 'DelimitedRecordsFormat', + 'ParquetRecordsFormat', + 'ProcessingInstructions', + 'ExistingTableHandling', + 'Records', +] + from .types import RecordsHints, BootstrappingRecordsHints, RecordsFormatType # noqa from .schema import RecordsSchema # noqa -from .records_format import RecordsFormat # noqa +from .records_format import RecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat # noqa +from .processing_instructions import 
ProcessingInstructions # noqa +from .existing_table_handling import ExistingTableHandling # noqa from .records import Records # noqa diff --git a/records_mover/records/job/mover.py b/records_mover/records/job/mover.py index 4237b93cd..88041dd10 100644 --- a/records_mover/records/job/mover.py +++ b/records_mover/records/job/mover.py @@ -18,7 +18,6 @@ def run_records_mover_job(source_method_name: str, source_method = getattr(session.records.sources, source_method_name) target_method = getattr(session.records.targets, target_method_name) logger.info('Starting...') - records = session.records source_kwargs = config_to_args(config=config['source'], method=source_method, @@ -32,6 +31,7 @@ def run_records_mover_job(source_method_name: str, session=session) processing_instructions = ProcessingInstructions(**pi_kwargs) + records = session.records source = source_method(**source_kwargs) target = target_method(**target_kwargs) return records.move(source, target, processing_instructions) diff --git a/records_mover/records/records.py b/records_mover/records/records.py index ce28c4da7..3f6d2d65f 100644 --- a/records_mover/records/records.py +++ b/records_mover/records/records.py @@ -1,22 +1,15 @@ -from sqlalchemy import MetaData from sqlalchemy.engine import Engine, Connection -from .processing_instructions import ProcessingInstructions -from .existing_table_handling import ExistingTableHandling import logging -from .unload_plan import RecordsUnloadPlan -from .records_format import RecordsFormat -from ..db import DBDriver, LoadError +from ..db import DBDriver from ..url.resolver import UrlResolver -from enum import Enum -from .mover import move -from .types import RecordsFormatType from .sources import RecordsSources from .targets import RecordsTargets -from typing import Callable, Optional, Union, TYPE_CHECKING +from .mover import move +from enum import Enum +from typing import Callable, Union, TYPE_CHECKING if TYPE_CHECKING: from records_mover import Session # noqa - logger = logging.getLogger(__name__) @@ -42,15 +35,7 @@ def __init__(self, db_driver = session.db_driver if url_resolver is PleaseInfer.token: url_resolver = session.url_resolver - self.meta = MetaData() - self.db_driver = db_driver - self.url_resolver = url_resolver self.move = move - self.RecordsFormat = RecordsFormat - self.RecordsUnloadPlan = RecordsUnloadPlan - self.ProcessingInstructions = ProcessingInstructions - self.ExistingTableHandling = ExistingTableHandling - self.LoadError = LoadError self.sources = RecordsSources(db_driver=db_driver, url_resolver=url_resolver) self.targets = RecordsTargets(url_resolver=url_resolver, @@ -71,9 +56,3 @@ def __init__(self, db_engine=db_engine) results = records.move(source, target) """ - - def best_records_format_variant(self, - records_format_type: RecordsFormatType, - db_engine: Engine) -> Optional[str]: - driver = self.db_driver(db_engine) - return driver.best_records_format_variant(records_format_type) diff --git a/records_mover/session.py b/records_mover/session.py index 04777d409..d39a08624 100644 --- a/records_mover/session.py +++ b/records_mover/session.py @@ -2,7 +2,8 @@ from sqlalchemy.engine import Engine import boto3 from .records.records import Records -from .db import db_driver, DBDriver +from .db.factory import db_driver +from .db import DBDriver from .url.base import BaseFileUrl, BaseDirectoryUrl import sqlalchemy from typing import Union, Optional diff --git a/records_mover/utils/structures.py b/records_mover/utils/structures.py index 2206e08b8..4b242798d 100644 --- 
a/records_mover/utils/structures.py +++ b/records_mover/utils/structures.py @@ -1,21 +1,4 @@ -from typing import Dict, Any, TypeVar, Callable, List, Union - - -# Thank you, StackOverflow! -def snake_to_camel(snake_str: str) -> str: - components = snake_str.split('_') - # We capitalize the first letter of each component except the first one - # with the 'capitalize' method and join them together. - return components[0] + "".join(x.capitalize() for x in components[1:]) - - -A = TypeVar('A') -B = TypeVar('B') -C = TypeVar('C') - - -def map_keys(f: Callable[[A], B], dict_to_convert: Dict[A, C]) -> Dict[B, C]: - return {f(name): val for name, val in dict_to_convert.items()} +from typing import Dict, Any, TypeVar, List, Union V = TypeVar('V') diff --git a/records_mover/version.py b/records_mover/version.py index 020ed73d7..0404d8103 100644 --- a/records_mover/version.py +++ b/records_mover/version.py @@ -1 +1 @@ -__version__ = '0.2.2' +__version__ = '0.3.0' diff --git a/tests/integration/records/multi_db/test_records_table2table.py b/tests/integration/records/multi_db/test_records_table2table.py index 9cf348d60..1feb6eab6 100644 --- a/tests/integration/records/multi_db/test_records_table2table.py +++ b/tests/integration/records/multi_db/test_records_table2table.py @@ -1,6 +1,6 @@ from records_mover.db.quoting import quote_schema_and_table from records_mover import Session -from records_mover.records.existing_table_handling import ExistingTableHandling +from records_mover.records import ExistingTableHandling import logging import time import unittest @@ -58,7 +58,6 @@ def move_and_verify(self, source_dbname, target_dbname, variant_used_internally) records = session.records targets = records.targets sources = records.sources - move = records.move source_engine = session.get_db_engine(source_dbname) target_engine = session.get_db_engine(target_dbname) source_schema_name = schema_name(source_dbname) @@ -78,7 +77,7 @@ def move_and_verify(self, source_dbname, target_dbname, variant_used_internally) table_name=TARGET_TABLE_NAME, db_engine=target_engine, existing_table_handling=existing) - out = move(source, target) + out = records.move(source, target) # redshift doesn't give reliable info on load results, so this # will be None or 1 self.assertNotEqual(0, out.move_count) diff --git a/tests/integration/records/single_db/test_records_load.py b/tests/integration/records/single_db/test_records_load.py index 56963e63b..611dbca8c 100644 --- a/tests/integration/records/single_db/test_records_load.py +++ b/tests/integration/records/single_db/test_records_load.py @@ -3,7 +3,7 @@ from .base_records_test import BaseRecordsIntegrationTest from ..table_validator import RecordsTableValidator -from records_mover.records.schema import RecordsSchema +from records_mover.records import RecordsSchema, RecordsFormat logger = logging.getLogger(__name__) @@ -103,9 +103,9 @@ def load(self, format_type, variant, hints={}, broken=False, sourcefn=None): if sourcefn is None: sourcefn = self.local_source - records_format = self.records.RecordsFormat(format_type=format_type, - variant=variant, - hints=hints) + records_format = RecordsFormat(format_type=format_type, + variant=variant, + hints=hints) # # CSV type inference is not smart enough to identify the # date/time columns as anything but strings yet. 
@@ -130,7 +130,6 @@ def load(self, format_type, variant, hints={}, broken=False, sourcefn=None): records_schema = RecordsSchema.from_json(f.read()) targets = self.records.targets - move = self.records.move filename = self.records_filename(format_type, variant, hints, broken=broken) logger.info(f"Testing load from {filename}") @@ -140,7 +139,7 @@ def load(self, format_type, variant, hints={}, broken=False, sourcefn=None): targets.table(schema_name=self.schema_name, table_name=self.table_name, db_engine=self.engine) as target: - out = move(source, target) + out = self.records.move(source, target) if not self.gives_exact_load_count(): self.assertIsNone(out.move_count) else: diff --git a/tests/integration/records/single_db/test_records_numeric.py b/tests/integration/records/single_db/test_records_numeric.py index 4864e48ef..84485d482 100644 --- a/tests/integration/records/single_db/test_records_numeric.py +++ b/tests/integration/records/single_db/test_records_numeric.py @@ -6,7 +6,9 @@ import json import pathlib from .numeric_expectations import expected_field_info, expected_column_types -from records_mover.records.schema import RecordsSchema +from records_mover.records import ( + RecordsSchema, DelimitedRecordsFormat, ProcessingInstructions +) from ..records_numeric_database_fixture import RecordsNumericDatabaseFixture logger = logging.getLogger(__name__) @@ -53,8 +55,8 @@ def test_numeric_schema_fields_created(self): self.numeric_fixture.bring_up() with tempfile.TemporaryDirectory(prefix='test_records_numeric_schema') as tempdir: output_url = pathlib.Path(tempdir).resolve().as_uri() + '/' - records_format = self.records.RecordsFormat() - processing_instructions = self.records.ProcessingInstructions() + records_format = DelimitedRecordsFormat() + processing_instructions = ProcessingInstructions() source = self.records.sources.table(schema_name=self.schema_name, table_name=self.table_name, db_engine=self.engine) @@ -81,18 +83,17 @@ def validate_table(self): def test_numeric_database_columns_created(self): records_schema = RecordsSchema.from_data(example_numeric_records_schema) - processing_instructions = self.records.ProcessingInstructions() + processing_instructions = ProcessingInstructions() preferred_records_format = { 'redshift': 'bluelabs', 'bigquery': 'bigquery', 'vertica': 'vertica', 'postgresql': 'bluelabs', } + records_format = DelimitedRecordsFormat(variant=preferred_records_format[self.engine.name]) source = self.records.sources.\ local_file('/dev/null', - records_format=self.records. 
- RecordsFormat(format_type='delimited', - variant=preferred_records_format[self.engine.name]), + records_format=records_format, records_schema=records_schema) target = self.records.targets.table(schema_name=self.schema_name, table_name=self.table_name, diff --git a/tests/integration/records/single_db/test_records_save_df.py b/tests/integration/records/single_db/test_records_save_df.py index ace07202d..ea5027b99 100644 --- a/tests/integration/records/single_db/test_records_save_df.py +++ b/tests/integration/records/single_db/test_records_save_df.py @@ -2,7 +2,9 @@ import logging from .base_records_test import BaseRecordsIntegrationTest from ..directory_validator import RecordsDirectoryValidator -from records_mover.records.schema import RecordsSchema +from records_mover.records import ( + RecordsSchema, DelimitedRecordsFormat, ProcessingInstructions +) import tempfile import pathlib import datetime @@ -21,7 +23,7 @@ def save_and_verify(self, records_format, processing_instructions=None): from pandas import DataFrame if processing_instructions is None: - processing_instructions = self.records.ProcessingInstructions() + processing_instructions = ProcessingInstructions() us_eastern = pytz.timezone('US/Eastern') df = DataFrame.from_dict([odict[ 'num': 123, @@ -64,15 +66,15 @@ def verify_records_directory(self, format_type, variant, tempdir, hints={}): def test_save_with_defaults(self): hints = {} - self.save_and_verify(records_format=self.records.RecordsFormat(hints=hints)) + self.save_and_verify(records_format=DelimitedRecordsFormat(hints=hints)) def test_save_csv_variant(self): - records_format = self.records.RecordsFormat(variant='csv') + records_format = DelimitedRecordsFormat(variant='csv') self.save_and_verify(records_format=records_format) def test_save_with_no_compression(self): hints = { 'compression': None, } - records_format = self.records.RecordsFormat(hints=hints) + records_format = DelimitedRecordsFormat(hints=hints) self.save_and_verify(records_format=records_format) diff --git a/tests/integration/records/single_db/test_records_unload.py b/tests/integration/records/single_db/test_records_unload.py index 7ccd032f7..f693e6371 100644 --- a/tests/integration/records/single_db/test_records_unload.py +++ b/tests/integration/records/single_db/test_records_unload.py @@ -4,6 +4,7 @@ from ..directory_validator import RecordsDirectoryValidator from ..records_database_fixture import RecordsDatabaseFixture from .base_records_test import BaseRecordsIntegrationTest +from records_mover.records import DelimitedRecordsFormat logger = logging.getLogger(__name__) @@ -56,21 +57,18 @@ def unload_and_verify(self, format_type, variant, hints={}): self.verify_records_directory(format_type, variant, tempdir, hints=hints) def unload(self, variant, directory, hints={}): - records_format = self.records.RecordsFormat(format_type='delimited', - variant=variant, - - hints=hints) + records_format = DelimitedRecordsFormat(variant=variant, + hints=hints) directory_url = pathlib.Path(directory).resolve().as_uri() + '/' targets = self.records.targets sources = self.records.sources - move = self.records.move source = sources.table(schema_name=self.schema_name, table_name=self.table_name, db_engine=self.engine) target = targets.directory_from_url(output_url=directory_url, records_format=records_format) - out = move(source, target) + out = self.records.move(source, target) self.assertTrue(out.move_count in [1, None]) def verify_records_directory(self, format_type, variant, tempdir, hints={}): diff --git 
a/tests/unit/db/test_connect.py b/tests/unit/db/test_connect.py index 73823ba55..c7e5d0ce1 100644 --- a/tests/unit/db/test_connect.py +++ b/tests/unit/db/test_connect.py @@ -42,7 +42,7 @@ def test_creating_bigquery_url(self, 'type': 'bigquery', 'bq_default_project_id': 'bluelabs-tools-dev', } - url = connect.create_db_url(db_facts) + url = connect.create_sqlalchemy_url(db_facts) self.assertEqual(url, 'bigquery://bluelabs-tools-dev') @patch('records_mover.db.connect.db_facts_from_lpass') @@ -57,7 +57,7 @@ def test_creating_bigquery_url_with_dataset(self, 'bq_default_project_id': 'bluelabs-tools-dev', 'bq_default_dataset_id': 'myfancydataset', } - url = connect.create_db_url(db_facts) + url = connect.create_sqlalchemy_url(db_facts) self.assertEqual(url, 'bigquery://bluelabs-tools-dev/myfancydataset') @patch('records_mover.db.connect.db_facts_from_lpass') diff --git a/tests/unit/db/test_sqlalchemy_driver_picking.py b/tests/unit/db/test_sqlalchemy_driver_picking.py index 6a43b4f1b..721d05d27 100644 --- a/tests/unit/db/test_sqlalchemy_driver_picking.py +++ b/tests/unit/db/test_sqlalchemy_driver_picking.py @@ -6,9 +6,9 @@ class TestSQLAlchemyDriverPicking(unittest.TestCase): @patch('records_mover.db.connect.db_facts_from_lpass') @patch('records_mover.db.connect.sa.create_engine') - def test_create_db_url(self, - mock_create_engine, - mock_db_facts_from_lpass): + def test_create_sqlalchemy_url(self, + mock_create_engine, + mock_db_facts_from_lpass): expected_mappings = { 'psql (redshift)': 'redshift://myuser:hunter1@myhost:123/analyticsdb', @@ -35,14 +35,14 @@ def test_create_db_url(self, 'port': 123, 'database': 'analyticsdb' } - actual_url = connect.create_db_url(db_facts) + actual_url = connect.create_sqlalchemy_url(db_facts) assert str(actual_url) == expected_url @patch('records_mover.db.connect.db_facts_from_lpass') @patch('records_mover.db.connect.sa.create_engine') - def test_create_db_url_odbc_preferred(self, - mock_create_engine, - mock_db_facts_from_lpass): + def test_create_sqlalchemy_url_odbc_preferred(self, + mock_create_engine, + mock_db_facts_from_lpass): expected_mappings = { 'psql (redshift)': 'redshift://myuser:hunter1@myhost:123/analyticsdb', @@ -71,5 +71,5 @@ def test_create_db_url_odbc_preferred(self, 'port': 123, 'database': 'analyticsdb' } - actual_url = connect.create_db_url(db_facts, prefer_odbc=True) + actual_url = connect.create_sqlalchemy_url(db_facts, prefer_odbc=True) assert str(actual_url) == expected_url diff --git a/tests/unit/pandas/test_pandas.py b/tests/unit/pandas/test_pandas.py index 6be969584..847d12ad9 100644 --- a/tests/unit/pandas/test_pandas.py +++ b/tests/unit/pandas/test_pandas.py @@ -1,51 +1,9 @@ -import pandas as pd import numpy as np import unittest from records_mover import pandas class TestPandas(unittest.TestCase): - def test_dataframe_to_nested_dicts(self): - data = np.array([(1, 5, 'bazzle')], - dtype=[ - ('abc', 'i4'), - ('foo.bar', 'i4'), - ('foo.baz.bing', 'U10') - ]) - - mock_dataframe = pd.DataFrame(data) - - expected_dicts = [ - {'abc': 1, 'foo': {'bar': 5, 'baz': {'bing': 'bazzle'}}} - ] - - self.assertEqual(pandas.dataframe_to_nested_dicts(mock_dataframe), - expected_dicts) - - def test_dataframe_to_nested_dicts_snake_to_camel(self): - data = np.array([(1, 5, 'bazzle', 'whatevs')], - dtype=[ - ('abc_def', 'i4'), - ('foo.bar', 'i4'), - ('foo.baz.bing_blong', 'U10'), - ('test_first.test_second.test_last_in_chain', - 'U10'), - ]) - - mock_dataframe = pd.DataFrame(data) - - expected_dicts = [ - { - 'abcDef': 1, - 'foo': {'bar': 5, 'baz': 
{'bingBlong': 'bazzle'}}, - 'testFirst': {'testSecond': {'testLastInChain': 'whatevs'}}, - }, - ] - - self.assertEqual(pandas.dataframe_to_nested_dicts(mock_dataframe, - to_camel=True), - expected_dicts) - def test_json_dumps(self): data = [ 'a', diff --git a/tests/unit/pandas/test_pandas_sparse_df.py b/tests/unit/pandas/test_pandas_sparse_df.py deleted file mode 100644 index bb91dbd40..000000000 --- a/tests/unit/pandas/test_pandas_sparse_df.py +++ /dev/null @@ -1,55 +0,0 @@ -from numpy import NaN -import unittest -from pandas import DataFrame -from pandas.testing import assert_frame_equal -from records_mover.pandas import sparsedf - - -class TestPandasSparseDataFrames(unittest.TestCase): - maxDiff = None - - def test_compress_sparse_dataframe_columns(self): - test_1 = [ - {"a": 0}, - {"a": 1, "b": 1}, - {"a": 2, "c": 2}, - {"a": 3, "d": 3}, - {"a": 4, "e": 4}, - {"a": 5, "f": 5}, - ] - - df = DataFrame.from_dict(test_1) - compressed_df = sparsedf.compress_sparse_dataframe_columns(df) - dicts = compressed_df.to_dict(orient='records') - self.maxDiff = None - self.assertListEqual(dicts, - [{"a": 0, "compressed": '{}'}, - {"a": 1, "compressed": '{"b": 1.0}'}, - {"a": 2, "compressed": '{"c": 2.0}'}, - {"a": 3, "compressed": '{"d": 3.0}'}, - {"a": 4, "compressed": '{"e": 4.0}'}, - {"a": 5, "compressed": '{"f": 5.0}'}]) - - def test_compress_sparse_dataframe_columns_with_mandatory(self): - test_1 = [ - {"a": 0}, - {"a": 1, "b": 1}, - {"a": 2, "c": 2}, - {"a": 3, "d": 3}, - {"a": 4, "e": 4}, - {"a": 5, "f": 5}, - ] - - df = DataFrame.from_dict(test_1) - compressed_df =\ - sparsedf.compress_sparse_dataframe_columns(df, - mandatory=["d", "e"]) - expected_dicts = \ - [{"a": 0, "d": NaN, "e": NaN, "compressed": '{}'}, - {"a": 1, "d": NaN, "e": NaN, "compressed": '{"b": 1.0}'}, - {"a": 2, "d": NaN, "e": NaN, "compressed": '{"c": 2.0}'}, - {"a": 3, "d": 3.0, "e": NaN, "compressed": '{}'}, - {"a": 4, "d": NaN, "e": 4.0, "compressed": '{}'}, - {"a": 5, "d": NaN, "e": NaN, "compressed": '{"f": 5.0}'}] - expected = DataFrame.from_dict(expected_dicts) - assert_frame_equal(compressed_df, expected, check_like=True) diff --git a/tests/unit/records/test_records.py b/tests/unit/records/test_records.py deleted file mode 100644 index 785def5a4..000000000 --- a/tests/unit/records/test_records.py +++ /dev/null @@ -1,13 +0,0 @@ -from mock import Mock, patch -from .base_test_records import BaseTestRecords - - -@patch('records_mover.records.sources.directory.RecordsDirectory') -class TestRecords(BaseTestRecords): - def test_best_records_format_variant(self, mock_RecordsDirectory): - mock_records_format_type = Mock(name='mock_records_format_type') - mock_db_engine = Mock(name='db_engine') - out = self.records.best_records_format_variant(mock_records_format_type, - mock_db_engine) - self.mock_db_driver.best_records_format_variant.assert_called_with(mock_records_format_type) - self.assertEqual(out, self.mock_db_driver.best_records_format_variant.return_value) diff --git a/tests/unit/records/test_records_infer.py b/tests/unit/records/test_records_infer.py index 4537a8bd3..3f43a3b08 100644 --- a/tests/unit/records/test_records_infer.py +++ b/tests/unit/records/test_records_infer.py @@ -6,8 +6,18 @@ class TestRecordsInfer(unittest.TestCase): @patch('records_mover.Session') @patch('records_mover.records.records.Records') - def test_record_infer(self, mock_Records, mock_Session): + @patch('records_mover.records.records.RecordsSources') + @patch('records_mover.records.records.RecordsTargets') + def test_record_infer(self, + 
mock_RecordsTargets, + mock_RecordsSources, + mock_Records, + mock_Session): mock_session = mock_Session.return_value records = Records() - self.assertEqual(records.db_driver, mock_session.db_driver) - self.assertEqual(records.url_resolver, mock_session.url_resolver) + self.assertEqual(records.sources, mock_RecordsSources.return_value) + self.assertEqual(records.targets, mock_RecordsTargets.return_value) + mock_RecordsTargets.assert_called_with(db_driver=mock_session.db_driver, + url_resolver=mock_session.url_resolver) + mock_RecordsSources.assert_called_with(db_driver=mock_session.db_driver, + url_resolver=mock_session.url_resolver) diff --git a/tests/unit/resources/test_config.yml b/tests/unit/resources/test_config.yml deleted file mode 100644 index e051177d7..000000000 --- a/tests/unit/resources/test_config.yml +++ /dev/null @@ -1,36 +0,0 @@ -file_available_pub: - arn: "arn:fake:file-available" - service_name: "sftp-poller" - method_name: "poll" - -sync_success_pub: - arn: "arn:fake:sync-success" - service_name: "sftp-poller" - method_name: "poll" - -sync_failure_pub: - arn: "arn:fake:sync-failure" - service_name: "sftp-poller" - method_name: "poll" - -hosts: - fake_ftp: - host: "ftp.example.com" - port: 666 - username: "jdoe" - password: "best_password1" - path: "/outgoing" - type: "sftp" - -db: - host: "sql.example.com" - port: "5439" - user: "dbadmin" - password: "tribute_to_best_password1" - database: "analytics" - table: "schema.manifest" - type: "redshift" - -urn: - sync: "urn:cms:sftp-poller" - fake_ftp: "urn:fake:fake_ftp" diff --git a/tests/unit/test_public_interface.py b/tests/unit/test_public_interface.py new file mode 100644 index 000000000..833815050 --- /dev/null +++ b/tests/unit/test_public_interface.py @@ -0,0 +1,5 @@ +from records_mover.airflow.hooks import records_hook # noqa +from records_mover import Session # noqa +from records_mover.records import ExistingTableHandling # noqa +from records_mover.records import DelimitedRecordsFormat # noqa +from records_mover.records import RecordsFormat # noqa
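For reference, the end-to-end pattern these public exports are meant to support (mirroring the integration tests touched above) looks roughly like the sketch below. The schema name, table name, database nickname, and output URL are placeholders, and it assumes a no-argument `Session()` can infer connection details from the environment:

```python
# Minimal sketch of the public interface exercised by the patch; all concrete
# names (schema, table, 'my_db', output path) are placeholders.
from records_mover import Session
from records_mover.records import DelimitedRecordsFormat, ProcessingInstructions

session = Session()
records = session.records

# Unload a database table into a local directory of CSV-variant delimited files.
source = records.sources.table(schema_name='myschema',
                               table_name='mytable',
                               db_engine=session.get_db_engine('my_db'))
target = records.targets.directory_from_url(
    output_url='file:///tmp/records_out/',
    records_format=DelimitedRecordsFormat(variant='csv'))

out = records.move(source, target, ProcessingInstructions())
print(out.move_count)  # may be None for databases that don't report load/unload counts
```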