Skip to content

Commit

Permalink
Fix deprecation warnings (breaking) (#44)
Browse files Browse the repository at this point in the history
* Migrate to non-deprecated interface in sqlalchemy-redshift

* Migrate to non-deprecated interface in google-cloud-bigquery

* Remove deprecated dataframe() source factory method args (breaking)

* Change deprecated call to assertDictContainsSubset
  • Loading branch information
vinceatbluelabs authored Apr 19, 2020
1 parent a64ea93 commit 9aecda7
Show file tree
Hide file tree
Showing 17 changed files with 386 additions and 47 deletions.
2 changes: 1 addition & 1 deletion metrics/flake8_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
195
189
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
89.9900
90.1100
5 changes: 2 additions & 3 deletions records_mover/db/bigquery/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ def load_from_fileobj(self, schema: str, table: str,
# https://google-cloud.readthedocs.io/en/latest/bigquery/generated/google.cloud.bigquery.client.Client.html
client: Client = connection._client
project_id, dataset_id = self._parse_bigquery_schema_name(schema)
dataset_ref = client.dataset(dataset_id, project_id)
table_ref = dataset_ref.table(table)
# https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.LoadJobConfig.html

target_records_format = load_plan.records_format
Expand All @@ -61,8 +59,9 @@ def load_from_fileobj(self, schema: str, table: str,
complain_on_unhandled_hints(processing_instructions.fail_if_dont_understand,
unhandled_hints, target_records_format.hints)
logger.info(f"Using BigQuery load options: {job_config.to_api_repr()}")
# https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html#google.cloud.bigquery.client.Client.load_table_from_file
job = client.load_table_from_file(fileobj,
table_ref,
f"{schema}.{table}",
# Must match the destination dataset location.
location="US",
job_config=job_config)
Expand Down
2 changes: 1 addition & 1 deletion records_mover/db/redshift/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def load(self,
session_token=aws_creds.token, manifest=True,
region=directory.loc.region, # type: ignore
empty_as_null=True,
**redshift_options)
**redshift_options) # type: ignore
logger.info(f"Starting Redshift COPY from {directory}...")
self.db.execute(copy)
logger.info("Redshift COPY complete.")
Expand Down
26 changes: 20 additions & 6 deletions records_mover/db/redshift/records_copy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from ...utils import quiet_remove
from ...records.hints import cant_handle_hint
from sqlalchemy_redshift.commands import Format
from sqlalchemy_redshift.commands import Format, Encoding, Compression
from typing import Dict, Optional, Set
from ...records import RecordsHints

Expand All @@ -13,18 +13,32 @@ def redshift_copy_options(unhandled_hints: Set[str],
fail_if_row_invalid: bool,
max_failure_rows: Optional[int]) -> RedshiftCopyOptions:
redshift_options: RedshiftCopyOptions = {}
redshift_options['compression'] = hints['compression']
if hints['compression'] == 'GZIP':
redshift_options['compression'] = Compression.gzip
elif hints['compression'] == 'LZO':
redshift_options['compression'] = Compression.lzop
elif hints['compression'] == 'BZIP':
redshift_options['compression'] = Compression.bzip2
else:
cant_handle_hint(fail_if_cant_handle_hint, 'compression', hints)
redshift_options['compression'] = Compression(hints['compression'])
quiet_remove(unhandled_hints, 'compression')
if hints['dateformat'] is None:
redshift_options['date_format'] = 'auto'
else:
redshift_options['date_format'] = hints['dateformat']
quiet_remove(unhandled_hints, 'dateformat')
if hints['encoding'] not in ['UTF8', 'UTF16', 'UTF16LE', 'UTF16BE']:
cant_handle_hint(fail_if_cant_handle_hint, 'encoding', hints)
redshift_options['encoding'] = 'UTF8'
if hints['encoding'] == 'UTF8':
redshift_options['encoding'] = Encoding.utf8
elif hints['encoding'] == 'UTF16':
redshift_options['encoding'] = Encoding.utf16
elif hints['encoding'] == 'UTF16LE':
redshift_options['encoding'] = Encoding.utf16le
elif hints['encoding'] == 'UTF16BE':
redshift_options['encoding'] = Encoding.utf16be
else:
redshift_options['encoding'] = hints['encoding']
cant_handle_hint(fail_if_cant_handle_hint, 'encoding', hints)
redshift_options['encoding'] = Encoding(hints['encoding'])
quiet_remove(unhandled_hints, 'encoding')
redshift_options['quote'] = hints['quotechar']
quiet_remove(unhandled_hints, 'quotechar')
Expand Down
8 changes: 0 additions & 8 deletions records_mover/records/sources/factory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from ..deprecated import warn_deprecated
import pathlib
from contextlib import contextmanager
from ..records_format import BaseRecordsFormat
Expand Down Expand Up @@ -52,25 +51,18 @@ def __init__(self,
@contextmanager
def dataframe(self,
df: 'DataFrame',
schema_name: Optional[str]=None,
table_name: Optional[str]=None,
db_engine: Optional['Engine']=None,
processing_instructions: ProcessingInstructions=
ProcessingInstructions(),
records_schema: Optional[RecordsSchema]=None,
include_index: bool=False) -> Iterator['DataframesRecordsSource']:
"""
:param df: Pandas dataframe to move data from.
:param schema_name: Obsolete - not used.
:param table_name: Obsolete - not used.
:param db_engine: Obsolete - not used.
:param processing_instructions: Instructions used during creation of the schema SQL.
:param records_schema: Description of the column names and types of the records.
:param include_index: If true, the Pandas dataframe index column will be included in
the move.
"""

warn_deprecated(schema_name=schema_name, table_name=table_name, db_engine=db_engine)
from .dataframes import DataframesRecordsSource # noqa
yield DataframesRecordsSource(dfs=[df],
records_schema=records_schema,
Expand Down
16 changes: 4 additions & 12 deletions tests/unit/db/bigquery/test_bigquery_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def test_load_with_default_project(self, mock_load_job_config):
mock_file_url = mock_url_resolver.file_url
big_query_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver)
mock_schema = 'my_dataset'
mock_table = Mock(name='mock_table')
mock_table = 'my_table'
mock_load_plan = Mock(name='mock_load_plan')
mock_load_plan.records_format = Mock(name='records_format', spec=DelimitedRecordsFormat)
mock_target_records_format = mock_load_plan.records_format
Expand All @@ -47,21 +47,17 @@ def test_load_with_default_project(self, mock_load_job_config):

mock_connection = mock_db.engine.raw_connection.return_value.connection
mock_client = mock_connection._client
mock_dataset_ref = mock_client.dataset.return_value
mock_table_ref = mock_dataset_ref.table.return_value
mock_job = mock_client.load_table_from_file.return_value
mock_job.output_rows = 42
mock_loc = mock_file_url.return_value
mock_f = mock_loc.open.return_value.__enter__.return_value
out = big_query_loader.load(schema=mock_schema, table=mock_table,
load_plan=mock_load_plan,
directory=mock_directory)
mock_client.dataset.assert_called_with('my_dataset', None)
mock_dataset_ref.table.assert_called_with(mock_table)
mock_file_url.assert_called_with(mock_url)
mock_client.load_table_from_file.\
assert_called_with(mock_f,
mock_table_ref,
'my_dataset.my_table',
location="US",
job_config=mock_load_job_config.return_value)
mock_job.result.assert_called_with()
Expand All @@ -75,7 +71,7 @@ def test_load(self, mock_load_job_config):
mock_file_url = mock_url_resolver.file_url
big_query_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver)
mock_schema = 'my_project.my_dataset'
mock_table = Mock(name='mock_table')
mock_table = 'mytable'
mock_load_plan = Mock(name='mock_load_plan')
mock_load_plan.records_format = Mock(name='records_format', spec=DelimitedRecordsFormat)
mock_target_records_format = mock_load_plan.records_format
Expand All @@ -87,21 +83,17 @@ def test_load(self, mock_load_job_config):

mock_connection = mock_db.engine.raw_connection.return_value.connection
mock_client = mock_connection._client
mock_dataset_ref = mock_client.dataset.return_value
mock_table_ref = mock_dataset_ref.table.return_value
mock_job = mock_client.load_table_from_file.return_value
mock_job.output_rows = 42
mock_loc = mock_file_url.return_value
mock_f = mock_loc.open.return_value.__enter__.return_value
out = big_query_loader.load(schema=mock_schema, table=mock_table,
load_plan=mock_load_plan,
directory=mock_directory)
mock_client.dataset.assert_called_with('my_dataset', 'my_project')
mock_dataset_ref.table.assert_called_with(mock_table)
mock_file_url.assert_called_with(mock_url)
mock_client.load_table_from_file.\
assert_called_with(mock_f,
mock_table_ref,
'my_project.my_dataset.mytable',
location="US",
job_config=mock_load_job_config.return_value)
mock_job.result.assert_called_with()
Expand Down
42 changes: 42 additions & 0 deletions tests/unit/db/redshift/test_records_copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import unittest
from records_mover.db.redshift.records_copy import redshift_copy_options
from records_mover.records import DelimitedRecordsFormat
from sqlalchemy_redshift.commands import Encoding


class TestRecordsCopy(unittest.TestCase):
    """Tests for redshift_copy_options()'s mapping of records hints to
    sqlalchemy-redshift COPY option enum values."""

    def test_redshift_copy_options_bad_compression_roll_with_it(self):
        """An unrecognized compression hint falls through to the
        Compression() enum constructor, which raises ValueError even
        when fail_if_cant_handle_hint is False."""
        records_format =\
            DelimitedRecordsFormat(variant='bluelabs',
                                   hints={
                                       'compression': 'somethingnew'
                                   })
        unhandled_hints = set(records_format.hints.keys())
        # This isn't in the Enum...for now.
        with self.assertRaises(ValueError):
            redshift_copy_options(unhandled_hints,
                                  records_format.hints,
                                  fail_if_cant_handle_hint=False,
                                  fail_if_row_invalid=True,
                                  max_failure_rows=0)

    def test_redshift_copy_options_encodings(self):
        """Each supported encoding hint spelling maps to the matching
        sqlalchemy_redshift Encoding enum member."""
        tests = {
            'UTF16': Encoding.utf16,
            'UTF16LE': Encoding.utf16le,
            'UTF16BE': Encoding.utf16be
        }
        for hint_spelling, redshift_sqlalchemy_spelling in tests.items():
            # subTest ensures a failure on one spelling still reports —
            # and still runs — the remaining cases.
            with self.subTest(encoding=hint_spelling):
                records_format =\
                    DelimitedRecordsFormat(variant='bluelabs',
                                           hints={
                                               'encoding': hint_spelling
                                           })
                unhandled_hints = set(records_format.hints.keys())
                out = redshift_copy_options(unhandled_hints,
                                            records_format.hints,
                                            fail_if_cant_handle_hint=True,
                                            fail_if_row_invalid=True,
                                            max_failure_rows=0)
                self.assertIs(out['encoding'], redshift_sqlalchemy_spelling)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .base_test_redshift_db_driver import BaseTestRedshiftDBDriver
from ...records.format_hints import bluelabs_format_hints
from records_mover.db.redshift.redshift_db_driver import Table
from sqlalchemy_redshift.commands import Encoding, Compression


class TestRedshiftDBDriverImportBlueLabs(BaseTestRedshiftDBDriver):
Expand All @@ -12,11 +13,11 @@ def test_load_bluelabs(self,

expected_args = {
'access_key_id': 'fake_aws_id',
'compression': 'GZIP',
'compression': Compression.gzip,
'data_location': 's3://mybucket/myparent/mychild/_manifest',
'date_format': 'YYYY-MM-DD',
'delimiter': ',',
'encoding': 'UTF8',
'encoding': Encoding.utf8,
'escape': True,
'ignore_header': 0,
'manifest': True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from records_mover.records.hints import logger as driver_logger
from ...records.format_hints import (christmas_tree_format_1_hints,
christmas_tree_format_2_hints)
from sqlalchemy_redshift.commands import Encoding, Compression


class TestRedshiftDBDriverImportBlueLabs(BaseTestRedshiftDBDriver):
Expand All @@ -20,10 +21,10 @@ def test_load_vertica_christmas_tree_unsupported_options_with_fast_warns_1(self,

expected_best_effort_args = {
'access_key_id': 'fake_aws_id',
'compression': 'LZO',
'compression': Compression.lzop,
'data_location': 's3://mybucket/myparent/mychild/_manifest',
'date_format': 'auto',
'encoding': 'UTF8',
'encoding': Encoding.utf8,
'delimiter': '\x01',
'escape': True,
'ignore_header': 1,
Expand Down Expand Up @@ -54,11 +55,11 @@ def test_load_christmas_tree_unsupported_options_with_fast_warns_2(self,

expected_best_effort_args = {
'access_key_id': 'fake_aws_id',
'compression': 'BZIP',
'compression': Compression.bzip2,
'data_location': 's3://mybucket/myparent/mychild/_manifest',
'date_format': 'MM-DD-YYYY',
'delimiter': '\x01',
'encoding': 'UTF8',
'encoding': Encoding.utf8,
'escape': False,
'ignore_header': 0,
'manifest': True,
Expand Down
5 changes: 3 additions & 2 deletions tests/unit/db/redshift/test_redshift_db_driver_import_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from ...records.format_hints import (csv_format_hints)
from records_mover.db.redshift.redshift_db_driver import Table
from records_mover.db.redshift.records_copy import Format
from sqlalchemy_redshift.commands import Compression, Encoding


class TestRedshiftDBDriverImport(BaseTestRedshiftDBDriver):
Expand All @@ -12,10 +13,10 @@ def test_load_csv(self, mock_CopyCommand):

expected_args = {
'access_key_id': 'fake_aws_id',
'compression': 'GZIP',
'compression': Compression.gzip,
'data_location': 's3://mybucket/myparent/mychild/_manifest',
'date_format': 'MM/DD/YY',
'encoding': 'UTF8',
'encoding': Encoding.utf8,
'format': Format.csv,
'ignore_header': 1,
'manifest': True,
Expand Down
6 changes: 1 addition & 5 deletions tests/unit/records/sources/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@ def setUp(self):
@patch('records_mover.records.sources.dataframes.DataframesRecordsSource')
def test_dataframe(self, mock_DataframesRecordsSource):
mock_df = Mock(name='df')
mock_schema_name = Mock(name='schema_name')
mock_table_name = Mock(name='table_name')
mock_db_engine = Mock(name='db_engine')
mock_processing_instructions = Mock(name='processing_instructions')
with self.records_sources.\
dataframe(df=mock_df, schema_name=mock_schema_name,
table_name=mock_table_name, db_engine=mock_db_engine,
dataframe(df=mock_df,
processing_instructions=mock_processing_instructions) as df:
mock_DataframesRecordsSource.\
assert_called_with(dfs=ANY,
Expand Down
5 changes: 3 additions & 2 deletions tests/unit/records/test_hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ def test_sniff_hints_from_fileobjs_encodings(self):
initial_hints['encoding'] = test_details['initial']
out = sniff_hints_from_fileobjs(fileobjs=fileobjs,
initial_hints=initial_hints)
self.assertDictContainsSubset({
needed_settings = {
'compression': None,
'encoding': test_details['hint'],
'field-delimiter': ',',
'header-row': True,
'record-terminator': '\n'
}, out)
}
self.assertTrue(set(needed_settings.items()).issubset(set(out.items())))
Empty file.
Loading

0 comments on commit 9aecda7

Please sign in to comment.