Load via INSERT on Redshift when scratch bucket not available #114

Merged

31 commits:
a66f4db Load via INSERT on Redshift when scratch bucket not available (vinceatbluelabs, Oct 27, 2020)
d20d77f Add TODO (vinceatbluelabs, Oct 27, 2020)
e4bd0ea Retire SupportsMoveFromTempLocAfterFillingIt from DataUrlTarget (vinceatbluelabs, Oct 27, 2020)
f53587a Merge remote-tracking branch 'origin/retire_protocol' into do_slow_re… (vinceatbluelabs, Oct 27, 2020)
ef0b8c1 Add comment (vinceatbluelabs, Oct 27, 2020)
643ee51 Implement for Vertica (vinceatbluelabs, Oct 27, 2020)
2ec1930 TODONE (vinceatbluelabs, Oct 27, 2020)
97dc95c Fix refactored logic (vinceatbluelabs, Oct 27, 2020)
575a3cd Revert change (vinceatbluelabs, Oct 27, 2020)
3314b64 Refactor (vinceatbluelabs, Oct 27, 2020)
ce9371e Clean up and drop TODOs (vinceatbluelabs, Oct 27, 2020)
66a5f2f Fix tests (vinceatbluelabs, Oct 28, 2020)
bdd6099 Fix unused import (vinceatbluelabs, Oct 28, 2020)
7b239cb Fix flake8 issues (vinceatbluelabs, Oct 28, 2020)
dc7eb21 Fix coverage slip (vinceatbluelabs, Oct 28, 2020)
ce539ec Fix missing special case revealed by Redshift testing (vinceatbluelabs, Oct 28, 2020)
f45011a Unratchet a bit (vinceatbluelabs, Oct 29, 2020)
ad67b78 Revert chars/bytes fix--8 should be valid in UTF-8 for simple times (vinceatbluelabs, Oct 29, 2020)
4caebd9 Reratchet tests (vinceatbluelabs, Oct 29, 2020)
9ca1a7a Clarify semantics of can_load_direct() and fix incorrect definition (vinceatbluelabs, Oct 29, 2020)
b03f064 Fix test (vinceatbluelabs, Oct 29, 2020)
504b39b Clarify comment (vinceatbluelabs, Oct 29, 2020)
b58f999 Rename method to be more explicit (vinceatbluelabs, Oct 29, 2020)
b6fff82 Drop commented code (vinceatbluelabs, Oct 29, 2020)
d9ea8a1 Fix quality issue (vinceatbluelabs, Oct 29, 2020)
4edebe3 Shorten method names and make consistent (vinceatbluelabs, Oct 29, 2020)
82b940e Fix function names in tests (vinceatbluelabs, Oct 29, 2020)
f2f1f0c Clarify documentation (vinceatbluelabs, Oct 29, 2020)
e5a49ea Merge remote-tracking branch 'origin/retire_protocol' into clarify_mo… (vinceatbluelabs, Oct 29, 2020)
44e7e68 Merge remote-tracking branch 'origin/clarify_move_algorithm_predicate… (vinceatbluelabs, Nov 5, 2020)
aa05ef7 Merge remote-tracking branch 'origin/master' into do_slow_redshift_lo… (vinceatbluelabs, Nov 5, 2020)
2 changes: 1 addition & 1 deletion metrics/coverage_high_water_mark
@@ -1 +1 @@
93.5700
93.6900
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
@@ -1 +1 @@
92.3400
92.3500
6 changes: 6 additions & 0 deletions records_mover/db/loader.py
@@ -47,6 +47,12 @@ def temporary_loadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
with TemporaryDirectory(prefix='temporary_loadable_directory_loc') as dirname:
vinceatbluelabs (Contributor, Author) commented:
Loader is an abstract class made concrete by each of the database types which can load out of a records directory.
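
For orientation, here is a minimal sketch of how a concrete subclass can override the two temporary-directory hooks when the local-filesystem default doesn't apply. This is illustrative only: `ObjectStoreOnlyLoader` and its `scratch_base_loc` attribute are hypothetical names, though this is essentially the shape RedshiftLoader takes later in this diff.

```python
from contextlib import contextmanager
from typing import Iterator, Optional

from records_mover.db.loader import LoaderFromRecordsDirectory
from records_mover.url import BaseDirectoryUrl


class ObjectStoreOnlyLoader(LoaderFromRecordsDirectory):  # hypothetical example
    """Sketch of a loader for a database that can only bulk-load from an
    object store, so the local-filesystem default doesn't apply."""

    def __init__(self, scratch_base_loc: Optional[BaseDirectoryUrl]) -> None:
        self.scratch_base_loc = scratch_base_loc  # hypothetical attribute

    @contextmanager
    def temporary_loadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
        # Create the temporary directory in the object store rather than
        # on the local filesystem.
        if self.scratch_base_loc is None:
            raise RuntimeError('No scratch bucket configured')
        with self.scratch_base_loc.temporary_directory() as temp_loc:
            yield temp_loc

    def has_temporary_loadable_directory_loc(self) -> bool:
        # Override the always-True default: we only have a loadable temp
        # location when a scratch bucket is configured.
        return self.scratch_base_loc is not None

    # Other abstract methods (e.g. known_supported_records_formats_for_load())
    # omitted for brevity.
```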

yield FilesystemDirectoryUrl(dirname)

def has_temporary_loadable_directory_loc(self) -> bool:
# The default implementation uses the local filesystem where
# Records Mover runs, and we assume we can make temporary
# files.
return True

@abstractmethod
def known_supported_records_formats_for_load(self) -> List[BaseRecordsFormat]:
"""Candidates to look through when negotiating a common records format
19 changes: 15 additions & 4 deletions records_mover/db/redshift/loader.py
@@ -10,8 +10,8 @@
import logging
from .records_copy import redshift_copy_options
from ...records.load_plan import RecordsLoadPlan
from ..errors import CredsDoNotSupportS3Import
from typing import Optional, Union, Callable, ContextManager, List, Iterator
from ..errors import CredsDoNotSupportS3Import, NoTemporaryBucketConfiguration
from typing import Optional, Union, List, Iterator
from ...url import BaseDirectoryUrl
from botocore.credentials import Credentials
from ...records.delimited import complain_on_unhandled_hints
@@ -23,11 +23,19 @@ class RedshiftLoader(LoaderFromRecordsDirectory):
def __init__(self,
db: Union[sqlalchemy.engine.Engine, sqlalchemy.engine.Connection],
meta: sqlalchemy.MetaData,
temporary_s3_directory_loc: Callable[[], ContextManager[BaseDirectoryUrl]])\
s3_temp_base_loc: Optional[BaseDirectoryUrl])\
vinceatbluelabs (Contributor, Author) commented:
I was doing a too-clever-by-half thing before and passing in a function that creates a temporary directory in the right place in S3.

Now I need to know things like "do I actually have a bucket in which to create the temporary directory", so I'm passing in something more low-level (at the price of duplicating a three-line function between two classes with no inheritance relationship).
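
Schematically, the constructor change looks like this (a condensed sketch, not the literal diff; `BeforeLoader` and `AfterLoader` are stand-in names for the old and new RedshiftLoader shapes):

```python
from typing import Callable, ContextManager, Optional

from records_mover.url import BaseDirectoryUrl


class BeforeLoader:  # stand-in for the old shape
    def __init__(self,
                 temporary_s3_directory_loc: Callable[[], ContextManager[BaseDirectoryUrl]]
                 ) -> None:
        # Opaque factory: no way to ask "is a scratch bucket configured?"
        self.temporary_s3_directory_loc = temporary_s3_directory_loc


class AfterLoader:  # stand-in for the new shape
    def __init__(self, s3_temp_base_loc: Optional[BaseDirectoryUrl]) -> None:
        # Holds the (possibly absent) base location itself...
        self.s3_temp_base_loc = s3_temp_base_loc

    def has_temporary_loadable_directory_loc(self) -> bool:
        # ...which makes the new capability check a one-liner.
        return self.s3_temp_base_loc is not None
```

The duplicated three-line `temporary_s3_directory_loc()` context manager mentioned above is what shows up in both RedshiftLoader and RedshiftUnloader later in this diff.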

-> None:
self.db = db
self.meta = meta
self.temporary_s3_directory_loc = temporary_s3_directory_loc
self.s3_temp_base_loc = s3_temp_base_loc

@contextmanager
def temporary_s3_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
if self.s3_temp_base_loc is None:
raise NoTemporaryBucketConfiguration('Please provide a scratch S3 URL in your config')
else:
with self.s3_temp_base_loc.temporary_directory() as temp_loc:
yield temp_loc

def load(self,
schema: str,
@@ -158,3 +166,6 @@ def best_scheme_to_load_from(self) -> str:
def temporary_loadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
with self.temporary_s3_directory_loc() as temp_loc:
yield temp_loc

def has_temporary_loadable_directory_loc(self) -> bool:
return self.s3_temp_base_loc is not None
17 changes: 4 additions & 13 deletions records_mover/db/redshift/redshift_db_driver.py
@@ -10,15 +10,14 @@
num_digits)
from .sql import schema_sql_from_admin_views
import timeout_decorator
from contextlib import contextmanager
from typing import Iterator, Optional, Union, Dict, List, Tuple
from typing import Optional, Union, Dict, List, Tuple
from ...url.base import BaseDirectoryUrl
from records_mover.db.quoting import quote_group_name, quote_schema_and_table
from .unloader import RedshiftUnloader
from ..unloader import Unloader
from .loader import RedshiftLoader
from ..loader import LoaderFromRecordsDirectory
from ..errors import NoTemporaryBucketConfiguration


logger = logging.getLogger(__name__)

@@ -33,11 +32,11 @@ def __init__(self,
self._redshift_loader =\
RedshiftLoader(db=db,
meta=self.meta,
temporary_s3_directory_loc=self.temporary_s3_directory_loc)
s3_temp_base_loc=s3_temp_base_loc)
self._redshift_unloader =\
RedshiftUnloader(db=db,
table=self.table,
temporary_s3_directory_loc=self.temporary_s3_directory_loc)
s3_temp_base_loc=s3_temp_base_loc)

def schema_sql(self, schema: str, table: str) -> str:
out = schema_sql_from_admin_views(schema, table, self.db)
@@ -46,14 +45,6 @@ def schema_sql(self, schema: str, table: str) -> str:
else:
return out

@contextmanager
def temporary_s3_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
if self.s3_temp_base_loc is None:
raise NoTemporaryBucketConfiguration('Please provide a scratch S3 URL in your config')
else:
with self.s3_temp_base_loc.temporary_directory() as temp_loc:
yield temp_loc

# if this timeout goes off (at least for Redshift), it's probably
# because memory is filling because sqlalchemy's cache of all
# tables and columns filled up memory in the job.
17 changes: 13 additions & 4 deletions records_mover/db/redshift/unloader.py
@@ -1,3 +1,4 @@
from contextlib import contextmanager
from sqlalchemy_redshift.commands import UnloadFromSelect
from ...records.records_directory import RecordsDirectory
import sqlalchemy
@@ -11,10 +12,10 @@
from ...records.records_format import (
BaseRecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat
)
from typing import Union, Callable, Optional, ContextManager, List
from typing import Union, Callable, Optional, List, Iterator
from ...url.base import BaseDirectoryUrl
from botocore.credentials import Credentials
from ..errors import CredsDoNotSupportS3Export
from ..errors import CredsDoNotSupportS3Export, NoTemporaryBucketConfiguration
from ...records.delimited import complain_on_unhandled_hints
from ..unloader import Unloader

@@ -26,11 +27,19 @@ class RedshiftUnloader(Unloader):
def __init__(self,
db: Union[sqlalchemy.engine.Engine, sqlalchemy.engine.Connection],
table: Callable[[str, str], Table],
temporary_s3_directory_loc: Callable[[], ContextManager[BaseDirectoryUrl]],
s3_temp_base_loc: Optional[BaseDirectoryUrl],
**kwargs) -> None:
super().__init__(db=db)
self.table = table
self.temporary_s3_directory_loc = temporary_s3_directory_loc
self.s3_temp_base_loc = s3_temp_base_loc

@contextmanager
def temporary_s3_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
if self.s3_temp_base_loc is None:
raise NoTemporaryBucketConfiguration('Please provide a scratch S3 URL in your config')
else:
with self.s3_temp_base_loc.temporary_directory() as temp_loc:
yield temp_loc

def unload_to_s3_directory(self,
schema: str,
7 changes: 4 additions & 3 deletions records_mover/records/mover.py
@@ -2,7 +2,7 @@
SupportsToFileobjsSource,
FileobjsSource, SupportsToDataframesSource)
from .targets.base import (RecordsTarget, SupportsMoveFromRecordsDirectory,
SupportsMoveFromTempLocAfterFillingIt,
MightSupportMoveFromTempLocAfterFillingIt,
vinceatbluelabs (Contributor, Author) commented on Oct 28, 2020:
This is another one of those interfaces implemented by sources and targets (in this case, targets). I renamed this interface to be more tentative, as it can now tell you at runtime whether it is able to do what it says on the tin.

Specifically, this interface has a function which tells the target to:

  1. create a temporary location that the target can load data from efficiently (e.g., an S3 bucket directory that Redshift can run a COPY statement from)
  2. tell the source to fill in that temporary location with a records directory (e.g., copy a CSV file from your local disk to that temporary S3 directory and add a manifest that Redshift likes)
  3. load from that temporary location (e.g., run the Redshift COPY command)

A condensed sketch of these three steps is shown below.
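
The sketch below is illustrative only: `move_via_temp_loc` is a made-up name, and in the real code the target delegates the temporary-location handling to its database driver's loader rather than exposing it directly. The call shapes follow the interfaces named above.

```python
from records_mover.records.records_directory import RecordsDirectory


def move_via_temp_loc(records_source, records_target, processing_instructions):
    # Negotiate a records format both sides understand.
    records_format = records_source.compatible_format(records_target)
    # 1. Create a temporary location the target can load from efficiently
    #    (e.g., a directory in a scratch S3 bucket).
    with records_target.temporary_loadable_directory_loc() as temp_loc:
        directory = RecordsDirectory(records_loc=temp_loc)
        # 2. Tell the source to fill that location in with a records
        #    directory (data files plus a manifest the database likes).
        records_source.move_to_records_directory(
            directory,
            records_format=records_format,
            processing_instructions=processing_instructions)
        # 3. Load from the temporary location (e.g., run Redshift's COPY).
        return records_target.move_from_records_directory(
            directory, processing_instructions)
```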

MightSupportMoveFromFileobjsSource,
SupportsMoveFromDataframes)
from .sources import base as sources_base
@@ -123,8 +123,9 @@ def move(records_source: RecordsSource,
as fileobjs_source:
return move(fileobjs_source, records_target, processing_instructions)
elif (isinstance(records_source, SupportsMoveToRecordsDirectory) and
isinstance(records_target, SupportsMoveFromTempLocAfterFillingIt) and
records_source.has_compatible_format(records_target)):
isinstance(records_target, MightSupportMoveFromTempLocAfterFillingIt) and
records_source.has_compatible_format(records_target) and
records_target.can_move_from_temp_loc_after_filling_it()):
vinceatbluelabs (Contributor, Author) commented:
This is the key bit: I'm changing the move() function (the central algorithm of Records Mover) to use this interface's function only if the interface says it's OK.

In the most common case of a Table target, that means that the database in question has a temporary bucket location configured that it can do a bulk export to.

logger.info(f"Mover: copying from {records_source} to {records_target} "
f"by filling in a temporary location...")
return records_target.move_from_temp_loc_after_filling_it(records_source,
9 changes: 8 additions & 1 deletion records_mover/records/targets/base.py
@@ -123,7 +123,14 @@ def can_move_from_fileobjs_source(self) -> bool:
pass


class SupportsMoveFromTempLocAfterFillingIt(NegotiatesRecordsFormat, metaclass=ABCMeta):
class MightSupportMoveFromTempLocAfterFillingIt(NegotiatesRecordsFormat, metaclass=ABCMeta):
@abstractmethod
def can_move_from_temp_loc_after_filling_it(self) -> bool:
"""Returns True if target as currently configured can be handed a
temporary location and fill it.
"""
pass

@abstractmethod
def move_from_temp_loc_after_filling_it(self,
records_source:
22 changes: 0 additions & 22 deletions records_mover/records/targets/data_url.py
@@ -1,8 +1,6 @@
from ..results import MoveResult
from ..records_directory import RecordsDirectory
from .base import (SupportsMoveFromDataframes,
SupportsMoveFromTempLocAfterFillingIt,
SupportsMoveToRecordsDirectory,
SupportsMoveFromRecordsDirectory)
from ..processing_instructions import ProcessingInstructions
from ...url.base import BaseFileUrl
@@ -16,7 +14,6 @@


class DataUrlTarget(SupportsMoveFromDataframes,
SupportsMoveFromTempLocAfterFillingIt,
SupportsMoveFromRecordsDirectory):
def __init__(self,
output_loc: BaseFileUrl,
@@ -63,25 +60,6 @@ def move_from_records_directory(self,
records_format.generate_filename('data'): self.output_loc.url
})

def move_from_temp_loc_after_filling_it(self,
records_source:
SupportsMoveToRecordsDirectory,
processing_instructions:
ProcessingInstructions) -> MoveResult:
pis = processing_instructions
records_format = records_source.compatible_format(self)
if records_format is None:
raise NotImplementedError("No compatible records format between "
f"{records_source} and {self}")
with self.output_loc.temporary_directory() as temp_loc:
directory = RecordsDirectory(records_loc=temp_loc)
records_source.\
move_to_records_directory(directory,
records_format=records_format,
processing_instructions=pis)
return self.move_from_records_directory(directory,
processing_instructions)

def can_move_from_this_format(self,
source_records_format: BaseRecordsFormat) -> bool:
if self.records_format is None:
22 changes: 12 additions & 10 deletions records_mover/records/targets/table/move_from_dataframes_source.py
@@ -30,25 +30,27 @@ def __init__(self,
super().__init__(prep, target_table_details, processing_instructions)

def move(self) -> MoveResult:
if len(self.table_target.known_supported_records_formats()) != 0:
if self.table_target.can_move_from_fileobjs_source():
return self.move_from_dataframes_source_via_fileobjs()
else:
# Some databases, like Redshift, can't load from a
# stream, but can load from files on an object store
# they're pointed to.
return self.move_from_dataframes_source_via_records_directory()
target_supports_formats = len(self.table_target.known_supported_records_formats()) != 0
if (target_supports_formats and self.table_target.can_move_from_fileobjs_source()):
return self.move_from_dataframes_source_via_fileobjs()
elif (target_supports_formats and
self.table_target.can_move_from_temp_loc_after_filling_it()):
# Some databases, like Redshift, can't load from a
# stream, but can load from files on an object store
# they're pointed to.
return self.move_from_dataframes_source_via_temporary_records_directory()
vinceatbluelabs (Contributor, Author) commented:
This is a slightly more esoteric usage of this interface, where the database table target has been told to take a dataframe and load the data inside. There's a bit of a re-creation of a couple of parts of the move() algorithm here. Not sure I'm happy with that re-creation; it might be an area for future refactoring if I can figure out how to make it work as part of the main move() algorithm.

But I digress.

This changes things so that when we are given a dataframe (or create one as an intermediate step), we can ingest it into a database even if that database doesn't have a temporary bucket configured.

else:
logger.info("Known formats for target database: "
f"{self.table_target.known_supported_records_formats()}")
logger.info("Table target can move from fileobjs source? "
f"{self.table_target.can_move_from_fileobjs_source()}")
logger.warning("Loading via INSERT statement as this DB "
"driver does not yet support more direct LOAD methods. "
"driver does not yet support or is not configured for "
"more direct load methods. "
"This may be very slow.")
return self.move_from_dataframes_source_via_insert()

def move_from_dataframes_source_via_records_directory(self) -> MoveResult:
def move_from_dataframes_source_via_temporary_records_directory(self) -> MoveResult:
vinceatbluelabs (Contributor, Author) commented:
Clarify that this function uses a temp directory; the original name made that a bit of a surprise, even to me.

records_format = next(iter(self.table_target.known_supported_records_formats()), None)
with self.dfs_source.to_fileobjs_source(self.processing_instructions,
records_format) as fileobjs_source:
11 changes: 9 additions & 2 deletions records_mover/records/targets/table/target.py
@@ -1,6 +1,6 @@
from records_mover.records.targets.base import (
SupportsMoveFromRecordsDirectory,
SupportsMoveFromTempLocAfterFillingIt,
MightSupportMoveFromTempLocAfterFillingIt,
MightSupportMoveFromFileobjsSource,
SupportsMoveFromDataframes,
)
@@ -31,7 +31,7 @@


class TableRecordsTarget(SupportsMoveFromRecordsDirectory,
SupportsMoveFromTempLocAfterFillingIt,
MightSupportMoveFromTempLocAfterFillingIt,
MightSupportMoveFromFileobjsSource,
SupportsMoveFromDataframes,
TargetTableDetails):
@@ -109,6 +109,13 @@ def can_move_from_this_format(self,
return False
return loader.can_load_this_format(source_records_format)

def can_move_from_temp_loc_after_filling_it(self) -> bool:
driver = self.db_driver(self.db_engine)
loader = driver.loader()
if loader is None:
return False
return loader.has_temporary_loadable_directory_loc()
vinceatbluelabs (Contributor, Author) commented:
The key implementation of the expanded interface.


def move_from_temp_loc_after_filling_it(self,
records_source:
SupportsMoveToRecordsDirectory,
16 changes: 16 additions & 0 deletions tests/unit/airflow/test_google_cloud_credentials_hook.py
@@ -0,0 +1,16 @@
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from records_mover.airflow.hooks.google_cloud_credentials_hook import GoogleCloudCredentialsHook
from mock import Mock
import unittest


class TestGoogleCloudCredentialsHook(unittest.TestCase):
def test_get_conn(self):
mock_init = Mock('__init__')
GoogleCloudBaseHook.__init__ = mock_init
mock_init.return_value = None
hook = GoogleCloudCredentialsHook()
mock_get_credentials = Mock('get_credentials')
hook._get_credentials = mock_get_credentials
conn = hook.get_conn()
self.assertEqual(conn, mock_get_credentials.return_value)
6 changes: 3 additions & 3 deletions tests/unit/db/redshift/test_loader.py
@@ -9,12 +9,12 @@ class TestRedshiftLoader(unittest.TestCase):
def setUp(self):
self.mock_db = Mock(name='db')
self.mock_meta = Mock(name='meta')
self.mock_temporary_s3_directory_loc = MagicMock(name='temporary_s3_directory_loc')
self.s3_temp_base_loc = MagicMock(name='s3_temp_base_loc')

self.redshift_loader =\
RedshiftLoader(db=self.mock_db,
meta=self.mock_meta,
temporary_s3_directory_loc=self.mock_temporary_s3_directory_loc)
s3_temp_base_loc=self.s3_temp_base_loc)

@patch('records_mover.db.redshift.loader.redshift_copy_options')
@patch('records_mover.db.redshift.loader.ProcessingInstructions')
@@ -80,7 +80,7 @@ def test_load_non_s3(self,
mock_directory = Mock(name='directory')
mock_directory.scheme = 'mumble'

mock_temp_s3_loc = self.mock_temporary_s3_directory_loc.return_value.__enter__.return_value
mock_temp_s3_loc = self.s3_temp_base_loc.temporary_directory().__enter__()
mock_s3_directory = mock_directory.copy_to.return_value
mock_s3_directory.scheme = 's3'

7 changes: 0 additions & 7 deletions tests/unit/db/redshift/test_redshift_db_driver.py
@@ -1,5 +1,4 @@
from .base_test_redshift_db_driver import BaseTestRedshiftDBDriver
from records_mover.db.redshift.redshift_db_driver import NoTemporaryBucketConfiguration
from unittest.mock import patch
import sqlalchemy

@@ -102,9 +101,3 @@ def test_type_for_floating_point(self):
input_fp_significand_bits)
self.assertEqual(type(actual_col_type), sqlalchemy.sql.sqltypes.Float)
self.assertEqual(actual_col_type.precision, expected_fp_significand_bits)

def test_temporary_s3_directory_loc_unset(self):
self.redshift_db_driver.s3_temp_base_loc = None
with self.assertRaises(NoTemporaryBucketConfiguration):
with self.redshift_db_driver.temporary_s3_directory_loc():
pass
5 changes: 2 additions & 3 deletions tests/unit/db/redshift/test_unloader.py
@@ -12,20 +12,19 @@ def test_can_unload_this_format_true(self,
mock_redshift_unload_options):
mock_db = Mock(name='db')
mock_table = Mock(name='table')
mock_temporary_s3_directory_loc = Mock(name='temporary_s3_directory_loc')

mock_target_records_format = Mock(name='target_records_format', spec=DelimitedRecordsFormat)
mock_unload_plan = mock_RecordsUnloadPlan.return_value
mock_unload_plan.records_format = mock_target_records_format

mock_processing_instructions = mock_unload_plan.processing_instructions
mock_temporary_s3_directory_loc = Mock(name='temporary_s3_directory_loc')
mock_s3_temp_base_loc = Mock(name='s3_temp_base_loc')
mock_target_records_format.hints = {}

redshift_unloader =\
RedshiftUnloader(db=mock_db,
table=mock_table,
temporary_s3_directory_loc=mock_temporary_s3_directory_loc)
s3_temp_base_loc=mock_s3_temp_base_loc)
out = redshift_unloader.can_unload_this_format(mock_target_records_format)
mock_RecordsUnloadPlan.\
assert_called_with(records_format=mock_target_records_format)