Commit

Do slow redshift unload via SELECT when bucket unload not available (#…
vinceatbluelabs authored Nov 6, 2020
1 parent 066f85e commit 1b33493
Showing 23 changed files with 230 additions and 77 deletions.
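In short: bulk unload paths are now gated on whether the source can actually reach the target's URL scheme, and when they cannot, move() falls through to a slower SELECT-based export. A minimal standalone sketch of that decision, with illustrative names (choose_unload_path is not library code):

def choose_unload_path(unloader, target_scheme: str) -> str:
    # Prefer a bulk UNLOAD when the unloader can reach the target
    # scheme; otherwise fall back to streaming rows out via SELECT.
    if unloader is not None and unloader.can_unload_to_scheme(target_scheme):
        return 'bulk unload'
    return 'slow unload via SELECT'
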
2 changes: 1 addition & 1 deletion metrics/coverage_high_water_mark
@@ -1 +1 @@
-93.6900
+93.7300
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
@@ -1 +1 @@
-92.3500
+92.3800
17 changes: 12 additions & 5 deletions records_mover/db/loader.py
@@ -27,13 +27,10 @@ def load(self,
              load_plan: RecordsLoadPlan,
              directory: RecordsDirectory) -> Optional[int]:
         """Loads the data from the data specified to the RecordsDirectory
-        instance named 'directory'. Guarantees a manifest file named
-        'manifest' is written to the target directory pointing to the
-        target records.
+        instance named 'directory'.
         Returns number of rows loaded (if database provides that
-        info).
-        """
+        info)."""
         ...

     @abstractmethod
@@ -42,12 +39,22 @@ def can_load_this_format(self, source_records_format: BaseRecordsFormat) -> bool
         method"""
         ...

+    def temporary_loadable_directory_scheme(self) -> str:
+        """If we need to provide a temporary location that this database can
+        load from with the temporary_loadable_directory_loc() method, what
+        URL scheme will be used?"""
+        return 'file'
+
     @contextmanager
     def temporary_loadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
         """Provide a temporary directory which can be used for bulk import to
         this database and clean it up when done"""
         with TemporaryDirectory(prefix='temporary_loadable_directory_loc') as dirname:
             yield FilesystemDirectoryUrl(dirname)

     def has_temporary_loadable_directory_loc(self) -> bool:
         """Returns True if a temporary directory can be provided by
         temporary_loadable_directory_loc()"""
         # The default implementation uses the local filesystem where
         # Records Mover runs, and we assume we can make temporary
         # files.
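For illustration, a self-contained sketch of how the default 'file'-scheme scratch location pairs with the new temporary_loadable_directory_scheme() hint. FilesystemLoader is a made-up stand-in, and the yielded value is a plain URL string rather than a BaseDirectoryUrl:

from contextlib import contextmanager
from tempfile import TemporaryDirectory
from typing import Iterator

class FilesystemLoader:
    def temporary_loadable_directory_scheme(self) -> str:
        # Default scratch space lives on the local filesystem.
        return 'file'

    @contextmanager
    def temporary_loadable_directory_loc(self) -> Iterator[str]:
        # Create scratch space, hand back its URL, clean up afterwards.
        with TemporaryDirectory(prefix='temporary_loadable_directory_loc') as dirname:
            yield f'file://{dirname}'

loader = FilesystemLoader()
assert loader.temporary_loadable_directory_scheme() == 'file'
with loader.temporary_loadable_directory_loc() as loc:
    print(loc)  # e.g. file:///tmp/temporary_loadable_directory_loc...
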
5 changes: 5 additions & 0 deletions records_mover/db/postgres/unloader.py
@@ -101,6 +101,11 @@ def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]:
             }),
         ]

+    def can_unload_to_scheme(self, scheme: str) -> bool:
+        # Unloading is done via streams, so it is scheme-independent
+        # and requires no scratch buckets.
+        return True
+
     def can_unload_this_format(self, target_records_format: BaseRecordsFormat) -> bool:
         try:
             unload_plan = RecordsUnloadPlan(records_format=target_records_format)
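Why a stream-based unload is scheme-independent: the database hands back a byte stream (e.g. COPY ... TO STDOUT), and those bytes can be written to whatever location the caller opens. A rough sketch of that shape, simulated here with in-memory buffers (copy_stream_to_target is a hypothetical helper, not library code):

import io
import shutil

def copy_stream_to_target(copy_stream: io.BufferedIOBase,
                          target: io.BufferedIOBase) -> None:
    # The unload never touches a scratch bucket: bytes flow straight
    # from the database's COPY stream to the opened target location.
    shutil.copyfileobj(copy_stream, target)

source = io.BytesIO(b'a,b\n1,2\n')   # stand-in for COPY ... TO STDOUT output
target = io.BytesIO()                # stand-in for any writable URL
copy_stream_to_target(source, target)
assert target.getvalue() == b'a,b\n1,2\n'
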
3 changes: 3 additions & 0 deletions records_mover/db/redshift/loader.py
@@ -162,6 +162,9 @@ def known_supported_records_formats_for_load(self) -> List[BaseRecordsFormat]:
     def best_scheme_to_load_from(self) -> str:
         return 's3'

+    def temporary_loadable_directory_scheme(self) -> str:
+        return 's3'
+
     @contextmanager
     def temporary_loadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
         with self.temporary_s3_directory_loc() as temp_loc:
7 changes: 7 additions & 0 deletions records_mover/db/redshift/unloader.py
@@ -103,6 +103,13 @@ def unload(self,
             directory.copy_from(temp_s3_loc)
             return out

+    def can_unload_to_scheme(self, scheme: str) -> bool:
+        if scheme == 's3':
+            return True
+        # Otherwise we'll need a temporary bucket configured for
+        # Redshift to unload into
+        return self.s3_temp_base_loc is not None
+
     def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]:
         return [DelimitedRecordsFormat(variant='bluelabs'), ParquetRecordsFormat()]
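A standalone restatement of the predicate above, runnable as a quick sanity check (the function is a simplified stand-in, not the class method itself):

from typing import Optional

def can_unload_to_scheme(scheme: str, s3_temp_base_loc: Optional[str]) -> bool:
    # Redshift's UNLOAD writes only to S3; any other destination needs
    # a scratch bucket to stage into before copying onward.
    if scheme == 's3':
        return True
    return s3_temp_base_loc is not None

assert can_unload_to_scheme('s3', None)
assert not can_unload_to_scheme('file', None)
assert can_unload_to_scheme('file', 's3://my-scratch-bucket/')
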
22 changes: 22 additions & 0 deletions records_mover/db/unloader.py
@@ -19,17 +19,39 @@ def unload(self,
                table: str,
                unload_plan: RecordsUnloadPlan,
                directory: RecordsDirectory) -> Optional[int]:
+        """Export data in the specified table to the RecordsDirectory instance
+        named 'directory'. Guarantees a manifest file named
+        'manifest' is written to the target directory pointing to the
+        target records.
+        Returns number of rows unloaded (if database provides that
+        info)."""
         ...

     @abstractmethod
     def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]:
+        """Supplies a list of the records formats which can be bulk exported
+        from this database. This may not be the full set - see
+        can_unload_this_format() to test other possibilities.
+        """
         ...

     @abstractmethod
     def can_unload_this_format(self, target_records_format: BaseRecordsFormat) -> bool:
+        """Determine whether a specific records format can be exported by this
+        database."""
         ...

+    @abstractmethod
+    def can_unload_to_scheme(self, scheme: str) -> bool:
+        """Determine whether a URL scheme can be unloaded to by this database.
+        Depending on capabilities of this driver and the underlying database,
+        this may depend on whether a temporary location has been configured."""
+        ...
+
     def best_records_format(self) -> Optional[BaseRecordsFormat]:
+        """Returns the ideal records format for export by this database.
+        Useful in the absence of other constraints."""
         supported_formats = self.known_supported_records_formats_for_unload()
         if len(supported_formats) == 0:
             return None
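Caller-side, the two capability checks compose naturally. A hypothetical negotiation helper (duck-typed sketch, not library code) that picks a format only when the destination scheme is reachable:

def pick_unload_format(unloader, target_scheme, candidate_formats):
    # Scheme reachability is a hard gate; format choice happens after.
    if not unloader.can_unload_to_scheme(target_scheme):
        return None
    for records_format in unloader.known_supported_records_formats_for_unload():
        if records_format in candidate_formats:
            return records_format
    # Known formats are not exhaustive; probe the rest individually.
    for records_format in candidate_formats:
        if unloader.can_unload_this_format(records_format):
            return records_format
    return None
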
21 changes: 10 additions & 11 deletions records_mover/db/vertica/unloader.py
@@ -47,7 +47,7 @@ def unload(self,
                table: str,
                unload_plan: RecordsUnloadPlan,
                directory: RecordsDirectory) -> Optional[int]:
-        if not self.s3_available():
+        if not self.s3_export_available():
             raise NotImplementedError('S3 currently required for Vertica bulk unload')
         try:
             if directory.scheme == 's3':
@@ -64,10 +64,10 @@ def unload(self,
                 logger.warning(msg)
             raise NotImplementedError('S3 currently required for Vertica bulk unload')

-    def s3_available(self) -> bool:
-        if self.s3_temp_base_loc is None:
-            logger.info("Not attempting S3 export - SCRATCH_S3_URL not set")
-            return False
+    def s3_temp_bucket_available(self) -> bool:
+        return self.s3_temp_base_loc is not None
+
+    def s3_export_available(self) -> bool:
         out = self.db.execute("SELECT lib_name from user_libraries where lib_name = 'awslib'")
         available = len(list(out.fetchall())) == 1
         if not available:
@@ -119,15 +119,14 @@ def unload_to_s3_directory(self,
         return export_count

     def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]:
-        if self.s3_available():
-            return [DelimitedRecordsFormat(variant='vertica')]
-        else:
-            return []
+        return [DelimitedRecordsFormat(variant='vertica')]

-    def can_unload_this_format(self, target_records_format: BaseRecordsFormat) -> bool:
-        if not self.s3_available():
+    def can_unload_to_scheme(self, scheme: str) -> bool:
+        if not self.s3_export_available():
             return False
+        return scheme == 's3' or self.s3_temp_bucket_available()

+    def can_unload_this_format(self, target_records_format: BaseRecordsFormat) -> bool:
         try:
             unload_plan = RecordsUnloadPlan(records_format=target_records_format)
             if not isinstance(unload_plan.records_format, DelimitedRecordsFormat):
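The Vertica rename splits one check into two orthogonal questions, which the new can_unload_to_scheme() combines. A simplified standalone version, with flags standing in for the real library/bucket probes:

from typing import Optional

def can_unload_to_scheme(scheme: str,
                         awslib_installed: bool,
                         s3_temp_base_loc: Optional[str]) -> bool:
    # s3_export_available(): Vertica needs the 'awslib' UDx to export at all.
    if not awslib_installed:
        return False
    # Direct S3 targets work as-is; anything else must stage through
    # a scratch bucket (s3_temp_bucket_available()).
    return scheme == 's3' or s3_temp_base_loc is not None

assert not can_unload_to_scheme('s3', False, None)
assert can_unload_to_scheme('s3', True, None)
assert not can_unload_to_scheme('file', True, None)
assert can_unload_to_scheme('file', True, 's3://scratch/')
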
3 changes: 3 additions & 0 deletions records_mover/records/mover.py
@@ -86,6 +86,7 @@ def move(records_source: RecordsSource,
                                            processing_instructions)
     elif (isinstance(records_source, SupportsMoveToRecordsDirectory) and
           isinstance(records_target, targets_base.SupportsRecordsDirectory) and
+          records_source.can_move_to_scheme(records_target.records_directory().loc.scheme) and
           records_source.has_compatible_format(records_target)):
         # if target can accept records and doesn't specify a
         # records_format, or uses the same as the source, we can just
@@ -126,6 +127,8 @@ def move(records_source: RecordsSource,
     elif (isinstance(records_source, SupportsMoveToRecordsDirectory) and
           isinstance(records_target, MightSupportMoveFromTempLocAfterFillingIt) and
           records_source.has_compatible_format(records_target) and
+          records_source.
+          can_move_to_scheme(records_target.temporary_loadable_directory_scheme()) and
           records_target.can_move_from_temp_loc_after_filling_it()):
         logger.info(f"Mover: copying from {records_source} to {records_target} "
                     f"by filling in a temporary location...")
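Both new move() guards follow the same pattern: format compatibility alone no longer selects the direct path; the source must also be able to reach the relevant scheme. A duck-typed sketch of the first guard (should_copy_directly is a hypothetical name):

def should_copy_directly(records_source, records_target) -> bool:
    # Mirrors the added condition: the source must reach the target
    # directory's URL scheme, not just share a records format.
    scheme = records_target.records_directory().loc.scheme
    return (records_source.can_move_to_scheme(scheme) and
            records_source.has_compatible_format(records_target))
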
10 changes: 10 additions & 0 deletions records_mover/records/sources/base.py
@@ -96,6 +96,16 @@ def records_directory(self) -> RecordsDirectory:


 class SupportsMoveToRecordsDirectory(NegotiatesRecordsFormat, metaclass=ABCMeta):
+    @abstractmethod
+    def can_move_to_scheme(self, scheme: str) -> bool:
+        """If true is returned, the given scheme is a compatible place where
+        the unload will be done. Note that this may include streaming
+        data down to Records Mover byte by byte--which can be
+        expensive when data is large and/or network bandwidth is
+        limited.
+        """
+        pass
+
     @abstractmethod
     def move_to_records_directory(self,
                                   records_directory: RecordsDirectory,
5 changes: 5 additions & 0 deletions records_mover/records/sources/fileobjs.py
@@ -38,6 +38,11 @@ def can_move_to_this_format(self,
                                 target_records_format: BaseRecordsFormat) -> bool:
         return self.records_format == target_records_format

+    def can_move_to_scheme(self, scheme: str) -> bool:
+        # Any URL can accept a stream of data using
+        # move_to_records_directory() below
+        return True
+
     @staticmethod
     @contextmanager
     def infer_if_needed(target_names_to_input_fileobjs: Mapping[str, IO[bytes]],
7 changes: 7 additions & 0 deletions records_mover/records/sources/table.py
@@ -53,6 +53,13 @@ def can_move_to_this_format(self,
             return False
         return unloader.can_unload_this_format(target_records_format)

+    def can_move_to_scheme(self, scheme: str) -> bool:
+        unloader = self.driver.unloader()
+        if unloader is None:
+            # bulk export is not provided by this database
+            return False
+        return unloader.can_unload_to_scheme(scheme)
+
     @contextmanager
     def to_dataframes_source(self,
                              processing_instructions: ProcessingInstructions) -> \
5 changes: 5 additions & 0 deletions records_mover/records/targets/base.py
@@ -131,6 +131,11 @@ def can_move_from_temp_loc_after_filling_it(self) -> bool:
         """
         pass

+    @abstractmethod
+    def temporary_loadable_directory_scheme(self) -> str:
+        """Which URL scheme will be used to create the temporary location to fill in."""
+        pass
+
     @abstractmethod
     def move_from_temp_loc_after_filling_it(self,
                                             records_source:
8 changes: 8 additions & 0 deletions records_mover/records/targets/table/target.py
@@ -116,6 +116,14 @@ def can_move_from_temp_loc_after_filling_it(self) -> bool:
             return False
         return loader.has_temporary_loadable_directory_loc()

+    def temporary_loadable_directory_scheme(self) -> str:
+        driver = self.db_driver(self.db_engine)
+        loader = driver.loader()
+        if loader is None:
+            raise TypeError("Please check can_move_from_temp_loc_after_filling_it() "
+                            "before calling this")
+        return loader.temporary_loadable_directory_scheme()
+
     def move_from_temp_loc_after_filling_it(self,
                                             records_source:
                                             SupportsMoveToRecordsDirectory,
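The TypeError guard above implies a call-order contract. A small duck-typed sketch of the intended sequence (plan_temp_fill is hypothetical, not library code):

def plan_temp_fill(records_target):
    # Callers must establish that a loader exists first...
    if not records_target.can_move_from_temp_loc_after_filling_it():
        return None
    # ...after which asking for the scratch scheme is safe.
    return records_target.temporary_loadable_directory_scheme()
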
16 changes: 16 additions & 0 deletions tests/unit/creds/test_base_creds.py
@@ -4,6 +4,7 @@
 from db_facts.db_facts_types import DBFacts
 from unittest.mock import Mock, patch
 from records_mover.creds.base_creds import BaseCreds
+import boto3


class ExampleCredsSubclass(BaseCreds):
@@ -20,6 +21,11 @@ def db_facts(self, db_creds_name: str) -> DBFacts:
         mock_db_facts.db_creds_name = db_creds_name
         return mock_db_facts

+    def boto3_session(self, aws_creds_name: str) -> 'boto3.session.Session':
+        assert aws_creds_name == 'my_default_creds'
+        self.boto3_session_called = True
+        return Mock(name='boto3_session')
+

 class TestBaseCreds(unittest.TestCase):
     maxDiff = None
@@ -156,3 +162,13 @@ def test_s3_scratch_bucket_no_config_file(self,
         out = creds.default_scratch_s3_url()
         self.assertIsNone(out)
         mock_get_config.assert_called_with('records_mover', 'bluelabs')
+
+    @patch('records_mover.creds.base_creds.get_config')
+    @patch('records_mover.creds.base_creds.os')
+    def test_default_boto3_session_with_default_creds_name(self,
+                                                           mock_os,
+                                                           mock_get_config):
+        mock_get_config.return_value.config = {}
+        creds = ExampleCredsSubclass(default_aws_creds_name='my_default_creds')
+        creds.default_boto3_session()
+        self.assertTrue(creds.boto3_session_called)
3 changes: 3 additions & 0 deletions tests/unit/db/postgres/test_unloader.py
@@ -141,3 +141,6 @@ def test_best_records_format(self):
                 'compression': None
             }),
             self.unloader.best_records_format())
+
+    def test_can_unload_to_scheme_any_true(self):
+        self.assertTrue(self.unloader.can_unload_to_scheme(Mock()))
47 changes: 46 additions & 1 deletion tests/unit/db/redshift/test_unloader.py
@@ -1,6 +1,6 @@
 import unittest
 from records_mover.db.redshift.unloader import RedshiftUnloader
-from records_mover.records.records_format import DelimitedRecordsFormat
+from records_mover.records.records_format import DelimitedRecordsFormat, ParquetRecordsFormat
 from mock import patch, Mock


@@ -33,3 +33,48 @@ def test_can_unload_this_format_true(self,
             mock_unload_plan.records_format,
             mock_processing_instructions.fail_if_cant_handle_hint)
         self.assertEqual(True, out)
+
+    def test_can_unload_to_scheme_s3_true(self):
+        mock_db = Mock(name='db')
+        mock_table = Mock(name='table')
+
+        redshift_unloader =\
+            RedshiftUnloader(db=mock_db,
+                             table=mock_table,
+                             s3_temp_base_loc=None)
+        self.assertTrue(redshift_unloader.can_unload_to_scheme('s3'))
+
+    def test_can_unload_to_scheme_file_without_temp_bucket_false(self):
+        mock_db = Mock(name='db')
+        mock_table = Mock(name='table')
+
+        redshift_unloader =\
+            RedshiftUnloader(db=mock_db,
+                             table=mock_table,
+                             s3_temp_base_loc=None)
+        self.assertFalse(redshift_unloader.can_unload_to_scheme('file'))
+
+    def test_can_unload_to_scheme_file_with_temp_bucket_true(self):
+        mock_db = Mock(name='db')
+        mock_table = Mock(name='table')
+        mock_s3_temp_base_loc = Mock(name='s3_temp_base_loc')
+
+        redshift_unloader =\
+            RedshiftUnloader(db=mock_db,
+                             table=mock_table,
+                             s3_temp_base_loc=mock_s3_temp_base_loc)
+        self.assertTrue(redshift_unloader.can_unload_to_scheme('file'))
+
+    def test_known_supported_records_formats_for_unload(self):
+        mock_db = Mock(name='db')
+        mock_table = Mock(name='table')
+        mock_s3_temp_base_loc = Mock(name='s3_temp_base_loc')
+
+        redshift_unloader =\
+            RedshiftUnloader(db=mock_db,
+                             table=mock_table,
+                             s3_temp_base_loc=mock_s3_temp_base_loc)
+        formats = redshift_unloader.known_supported_records_formats_for_unload()
+
+        self.assertEqual([f.__class__ for f in formats],
+                         [DelimitedRecordsFormat, ParquetRecordsFormat])