From 1b33493f1f76d289aae55106418821e9450d9c4f Mon Sep 17 00:00:00 2001 From: vinceatbluelabs Date: Fri, 6 Nov 2020 12:58:43 -0500 Subject: [PATCH] Do slow redshift unload via SELECT when bucket unload not available (#117) --- metrics/coverage_high_water_mark | 2 +- metrics/mypy_high_water_mark | 2 +- records_mover/db/loader.py | 17 +++-- records_mover/db/postgres/unloader.py | 5 ++ records_mover/db/redshift/loader.py | 3 + records_mover/db/redshift/unloader.py | 7 ++ records_mover/db/unloader.py | 22 ++++++ records_mover/db/vertica/unloader.py | 21 +++--- records_mover/records/mover.py | 3 + records_mover/records/sources/base.py | 10 +++ records_mover/records/sources/fileobjs.py | 5 ++ records_mover/records/sources/table.py | 7 ++ records_mover/records/targets/base.py | 5 ++ records_mover/records/targets/table/target.py | 8 +++ tests/unit/creds/test_base_creds.py | 16 +++++ tests/unit/db/postgres/test_unloader.py | 3 + tests/unit/db/redshift/test_unloader.py | 47 ++++++++++++- tests/unit/db/vertica/test_unloader.py | 19 +----- .../db/vertica/test_unloader_no_aws_creds.py | 4 +- tests/unit/db/vertica/test_unloader_no_s3.py | 68 ++++++++----------- tests/unit/records/sources/test_fileobjs.py | 11 +++ tests/unit/records/sources/test_table.py | 5 ++ .../unit/records/targets/table/test_target.py | 17 +++++ 23 files changed, 230 insertions(+), 77 deletions(-) diff --git a/metrics/coverage_high_water_mark b/metrics/coverage_high_water_mark index 8e663fb80..a847ba664 100644 --- a/metrics/coverage_high_water_mark +++ b/metrics/coverage_high_water_mark @@ -1 +1 @@ -93.6900 +93.7300 \ No newline at end of file diff --git a/metrics/mypy_high_water_mark b/metrics/mypy_high_water_mark index 27c0b2848..299efb6ea 100644 --- a/metrics/mypy_high_water_mark +++ b/metrics/mypy_high_water_mark @@ -1 +1 @@ -92.3500 +92.3800 \ No newline at end of file diff --git a/records_mover/db/loader.py b/records_mover/db/loader.py index b36813a44..c7c60bb71 100644 --- a/records_mover/db/loader.py 
+++ b/records_mover/db/loader.py @@ -27,13 +27,10 @@ def load(self, load_plan: RecordsLoadPlan, directory: RecordsDirectory) -> Optional[int]: """Loads the data from the data specified to the RecordsDirectory - instance named 'directory'. Guarantees a manifest file named - 'manifest' is written to the target directory pointing to the - target records. + instance named 'directory'. Returns number of rows loaded (if database provides that - info). - """ + info).""" ... @abstractmethod @@ -42,12 +39,22 @@ def can_load_this_format(self, source_records_format: BaseRecordsFormat) -> bool method""" ... + def temporary_loadable_directory_scheme(self) -> str: + """If we need to provide a temporary location that this database can + load from with the temporary_loadable_directory_loc() method, what + URL scheme will be used?""" + return 'file' + @contextmanager def temporary_loadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]: + """Provide a temporary directory which can be used for bulk import to + this database and clean it up when done""" with TemporaryDirectory(prefix='temporary_loadable_directory_loc') as dirname: yield FilesystemDirectoryUrl(dirname) def has_temporary_loadable_directory_loc(self) -> bool: + """Returns True if a temporary directory can be provided by + temporary_loadable_directory_loc()""" # The default implementation uses the local filesystem where # Records Mover runs, and we assume we can make temporary # files. diff --git a/records_mover/db/postgres/unloader.py b/records_mover/db/postgres/unloader.py index 29b2d1457..4010081b0 100644 --- a/records_mover/db/postgres/unloader.py +++ b/records_mover/db/postgres/unloader.py @@ -101,6 +101,11 @@ def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]: }), ] + def can_unload_to_scheme(self, scheme: str) -> bool: + # Unloading is done via streams, so it is scheme-independent + # and requires no scratch buckets. 
+ return True + def can_unload_this_format(self, target_records_format: BaseRecordsFormat) -> bool: try: unload_plan = RecordsUnloadPlan(records_format=target_records_format) diff --git a/records_mover/db/redshift/loader.py b/records_mover/db/redshift/loader.py index d876a3c3b..2170d2028 100644 --- a/records_mover/db/redshift/loader.py +++ b/records_mover/db/redshift/loader.py @@ -162,6 +162,9 @@ def known_supported_records_formats_for_load(self) -> List[BaseRecordsFormat]: def best_scheme_to_load_from(self) -> str: return 's3' + def temporary_loadable_directory_scheme(self) -> str: + return 's3' + @contextmanager def temporary_loadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]: with self.temporary_s3_directory_loc() as temp_loc: diff --git a/records_mover/db/redshift/unloader.py b/records_mover/db/redshift/unloader.py index 3e6c7a87c..7b55c5836 100644 --- a/records_mover/db/redshift/unloader.py +++ b/records_mover/db/redshift/unloader.py @@ -103,6 +103,13 @@ def unload(self, directory.copy_from(temp_s3_loc) return out + def can_unload_to_scheme(self, scheme: str) -> bool: + if scheme == 's3': + return True + # Otherwise we'll need a temporary bucket configured for + # Redshift to unload into + return self.s3_temp_base_loc is not None + def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]: return [DelimitedRecordsFormat(variant='bluelabs'), ParquetRecordsFormat()] diff --git a/records_mover/db/unloader.py b/records_mover/db/unloader.py index fcddbea81..7ab996478 100644 --- a/records_mover/db/unloader.py +++ b/records_mover/db/unloader.py @@ -19,17 +19,39 @@ def unload(self, table: str, unload_plan: RecordsUnloadPlan, directory: RecordsDirectory) -> Optional[int]: + """Export data in the specified table to the RecordsDirectory instance + named 'directory'. Guarantees a manifest file named + 'manifest' is written to the target directory pointing to the + target records. 
+ + Returns number of rows unloaded (if database provides that + info).""" ... @abstractmethod def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]: + """Supplies a list of the records formats which can be bulk exported + from this database. This may not be the full set - see + can_unload_this_format() to test other possibilities. + """ ... @abstractmethod def can_unload_this_format(self, target_records_format: BaseRecordsFormat) -> bool: + """Determine whether a specific records format can be exported by this + database.""" + ... + + @abstractmethod + def can_unload_to_scheme(self, scheme: str) -> bool: + """Determine whether a URL scheme can be unloaded to by this database. + Depending on capabilities of this driver and the underlying database, + this may depend on whether a temporary location has been configured.""" ... def best_records_format(self) -> Optional[BaseRecordsFormat]: + """Returns the ideal records format for export by this database. + Useful in the absence of other constraints.""" supported_formats = self.known_supported_records_formats_for_unload() if len(supported_formats) == 0: return None diff --git a/records_mover/db/vertica/unloader.py b/records_mover/db/vertica/unloader.py index 96a257df0..51fc82bb5 100644 --- a/records_mover/db/vertica/unloader.py +++ b/records_mover/db/vertica/unloader.py @@ -47,7 +47,7 @@ def unload, table: str, unload_plan: RecordsUnloadPlan, directory: RecordsDirectory) -> Optional[int]: - if not self.s3_available(): + if not self.s3_export_available(): raise NotImplementedError('S3 currently required for Vertica bulk unload') try: if directory.scheme == 's3': @@ -64,10 +64,10 @@ def unload, logger.warning(msg) raise NotImplementedError('S3 currently required for Vertica bulk unload') - def s3_available(self) -> bool: - if self.s3_temp_base_loc is None: - logger.info("Not attempting S3 export - SCRATCH_S3_URL not set") - return False + def s3_temp_bucket_available(self) -> bool: + 
return self.s3_temp_base_loc is not None + + def s3_export_available(self) -> bool: out = self.db.execute("SELECT lib_name from user_libraries where lib_name = 'awslib'") available = len(list(out.fetchall())) == 1 if not available: @@ -119,15 +119,14 @@ def unload_to_s3_directory(self, return export_count def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]: - if self.s3_available(): - return [DelimitedRecordsFormat(variant='vertica')] - else: - return [] + return [DelimitedRecordsFormat(variant='vertica')] - def can_unload_this_format(self, target_records_format: BaseRecordsFormat) -> bool: - if not self.s3_available(): + def can_unload_to_scheme(self, scheme: str) -> bool: + if not self.s3_export_available(): return False + return scheme == 's3' or self.s3_temp_bucket_available() + def can_unload_this_format(self, target_records_format: BaseRecordsFormat) -> bool: try: unload_plan = RecordsUnloadPlan(records_format=target_records_format) if not isinstance(unload_plan.records_format, DelimitedRecordsFormat): diff --git a/records_mover/records/mover.py b/records_mover/records/mover.py index c63adb28b..30052d4f4 100644 --- a/records_mover/records/mover.py +++ b/records_mover/records/mover.py @@ -86,6 +86,7 @@ def move(records_source: RecordsSource, processing_instructions) elif (isinstance(records_source, SupportsMoveToRecordsDirectory) and isinstance(records_target, targets_base.SupportsRecordsDirectory) and + records_source.can_move_to_scheme(records_target.records_directory().loc.scheme) and records_source.has_compatible_format(records_target)): # if target can accept records and doesn't specify a # records_format, or uses the same as the source, we can just @@ -126,6 +127,8 @@ def move(records_source: RecordsSource, elif (isinstance(records_source, SupportsMoveToRecordsDirectory) and isinstance(records_target, MightSupportMoveFromTempLocAfterFillingIt) and records_source.has_compatible_format(records_target) and + records_source. 
+ can_move_to_scheme(records_target.temporary_loadable_directory_scheme()) and records_target.can_move_from_temp_loc_after_filling_it()): logger.info(f"Mover: copying from {records_source} to {records_target} " f"by filling in a temporary location...") diff --git a/records_mover/records/sources/base.py b/records_mover/records/sources/base.py index 44e09c154..72e84bb18 100644 --- a/records_mover/records/sources/base.py +++ b/records_mover/records/sources/base.py @@ -96,6 +96,16 @@ def records_directory(self) -> RecordsDirectory: class SupportsMoveToRecordsDirectory(NegotiatesRecordsFormat, metaclass=ABCMeta): + @abstractmethod + def can_move_to_scheme(self, scheme: str) -> bool: + """If true is returned, the given scheme is a compatible place where + the unload will be done. Note that this may include streaming + data down to Records Mover byte by byte--which can be + expensive when data is large and/or network bandwidth is + limited. + """ + pass + @abstractmethod def move_to_records_directory(self, records_directory: RecordsDirectory, diff --git a/records_mover/records/sources/fileobjs.py b/records_mover/records/sources/fileobjs.py index 0c6255aca..c015d8216 100644 --- a/records_mover/records/sources/fileobjs.py +++ b/records_mover/records/sources/fileobjs.py @@ -38,6 +38,11 @@ def can_move_to_this_format(self, target_records_format: BaseRecordsFormat) -> bool: return self.records_format == target_records_format + def can_move_to_scheme(self, scheme: str) -> bool: + # Any URL can accept a stream of data using + # move_to_records_directory() below + return True + @staticmethod @contextmanager def infer_if_needed(target_names_to_input_fileobjs: Mapping[str, IO[bytes]], diff --git a/records_mover/records/sources/table.py b/records_mover/records/sources/table.py index 50b01f99d..7e0081d50 100644 --- a/records_mover/records/sources/table.py +++ b/records_mover/records/sources/table.py @@ -53,6 +53,13 @@ def can_move_to_this_format(self, return False return 
unloader.can_unload_this_format(target_records_format) + def can_move_to_scheme(self, scheme: str) -> bool: + unloader = self.driver.unloader() + if unloader is None: + # bulk export is not provided by this database + return False + return unloader.can_unload_to_scheme(scheme) + @contextmanager def to_dataframes_source(self, processing_instructions: ProcessingInstructions) -> \ diff --git a/records_mover/records/targets/base.py b/records_mover/records/targets/base.py index 7c3e56b3f..90e57816d 100644 --- a/records_mover/records/targets/base.py +++ b/records_mover/records/targets/base.py @@ -131,6 +131,11 @@ def can_move_from_temp_loc_after_filling_it(self) -> bool: """ pass + @abstractmethod + def temporary_loadable_directory_scheme(self) -> str: + """Which URL scheme will be used to create the temporary location to fill in.""" + pass + @abstractmethod def move_from_temp_loc_after_filling_it(self, records_source: diff --git a/records_mover/records/targets/table/target.py b/records_mover/records/targets/table/target.py index 2fffd3ea3..379b0a37f 100644 --- a/records_mover/records/targets/table/target.py +++ b/records_mover/records/targets/table/target.py @@ -116,6 +116,14 @@ def can_move_from_temp_loc_after_filling_it(self) -> bool: return False return loader.has_temporary_loadable_directory_loc() + def temporary_loadable_directory_scheme(self) -> str: + driver = self.db_driver(self.db_engine) + loader = driver.loader() + if loader is None: + raise TypeError("Please check can_move_from_temp_loc_after_filling_it() " + "before calling this") + return loader.temporary_loadable_directory_scheme() + def move_from_temp_loc_after_filling_it(self, records_source: SupportsMoveToRecordsDirectory, diff --git a/tests/unit/creds/test_base_creds.py b/tests/unit/creds/test_base_creds.py index 18f04859b..ef1d41211 100644 --- a/tests/unit/creds/test_base_creds.py +++ b/tests/unit/creds/test_base_creds.py @@ -4,6 +4,7 @@ from db_facts.db_facts_types import DBFacts from unittest.mock 
import Mock, patch from records_mover.creds.base_creds import BaseCreds +import boto3 class ExampleCredsSubclass(BaseCreds): @@ -20,6 +21,11 @@ def db_facts(self, db_creds_name: str) -> DBFacts: mock_db_facts.db_creds_name = db_creds_name return mock_db_facts + def boto3_session(self, aws_creds_name: str) -> 'boto3.session.Session': + assert aws_creds_name == 'my_default_creds' + self.boto3_session_called = True + return Mock(name='boto3_session') + class TestBaseCreds(unittest.TestCase): maxDiff = None @@ -156,3 +162,13 @@ def test_s3_scratch_bucket_no_config_file(self, out = creds.default_scratch_s3_url() self.assertIsNone(out) mock_get_config.assert_called_with('records_mover', 'bluelabs') + + @patch('records_mover.creds.base_creds.get_config') + @patch('records_mover.creds.base_creds.os') + def test_default_boto3_session_with_default_creds_name(self, + mock_os, + mock_get_config): + mock_get_config.return_value.config = {} + creds = ExampleCredsSubclass(default_aws_creds_name='my_default_creds') + creds.default_boto3_session() + self.assertTrue(creds.boto3_session_called) diff --git a/tests/unit/db/postgres/test_unloader.py b/tests/unit/db/postgres/test_unloader.py index 940c95bf7..2d718fed1 100644 --- a/tests/unit/db/postgres/test_unloader.py +++ b/tests/unit/db/postgres/test_unloader.py @@ -141,3 +141,6 @@ def test_best_records_format(self): 'compression': None }), self.unloader.best_records_format()) + + def test_can_unload_to_scheme_any_true(self): + self.assertTrue(self.unloader.can_unload_to_scheme(Mock())) diff --git a/tests/unit/db/redshift/test_unloader.py b/tests/unit/db/redshift/test_unloader.py index 2e17f7613..6a664aa9f 100644 --- a/tests/unit/db/redshift/test_unloader.py +++ b/tests/unit/db/redshift/test_unloader.py @@ -1,6 +1,6 @@ import unittest from records_mover.db.redshift.unloader import RedshiftUnloader -from records_mover.records.records_format import DelimitedRecordsFormat +from records_mover.records.records_format import 
DelimitedRecordsFormat, ParquetRecordsFormat from mock import patch, Mock @@ -33,3 +33,48 @@ def test_can_unload_this_format_true(self, mock_unload_plan.records_format, mock_processing_instructions.fail_if_cant_handle_hint) self.assertEqual(True, out) + + def test_can_unload_to_scheme_s3_true(self): + mock_db = Mock(name='db') + mock_table = Mock(name='table') + + redshift_unloader =\ + RedshiftUnloader(db=mock_db, + table=mock_table, + s3_temp_base_loc=None) + self.assertTrue(redshift_unloader.can_unload_to_scheme('s3')) + + def test_can_unload_to_scheme_file_without_temp_bucket_true(self): + mock_db = Mock(name='db') + mock_table = Mock(name='table') + + redshift_unloader =\ + RedshiftUnloader(db=mock_db, + table=mock_table, + s3_temp_base_loc=None) + self.assertFalse(redshift_unloader.can_unload_to_scheme('file')) + + def test_can_unload_to_scheme_file_with_temp_bucket_true(self): + mock_db = Mock(name='db') + mock_table = Mock(name='table') + mock_s3_temp_base_loc = Mock(name='s3_temp_base_loc') + + redshift_unloader =\ + RedshiftUnloader(db=mock_db, + table=mock_table, + s3_temp_base_loc=mock_s3_temp_base_loc) + self.assertTrue(redshift_unloader.can_unload_to_scheme('file')) + + def test_known_supported_records_formats_for_unload(self): + mock_db = Mock(name='db') + mock_table = Mock(name='table') + mock_s3_temp_base_loc = Mock(name='s3_temp_base_loc') + + redshift_unloader =\ + RedshiftUnloader(db=mock_db, + table=mock_table, + s3_temp_base_loc=mock_s3_temp_base_loc) + formats = redshift_unloader.known_supported_records_formats_for_unload() + + self.assertEqual([f.__class__ for f in formats], + [DelimitedRecordsFormat, ParquetRecordsFormat]) diff --git a/tests/unit/db/vertica/test_unloader.py b/tests/unit/db/vertica/test_unloader.py index 1c77e865a..f66c3a59b 100644 --- a/tests/unit/db/vertica/test_unloader.py +++ b/tests/unit/db/vertica/test_unloader.py @@ -8,35 +8,25 @@ class TestVerticaUnloader(unittest.TestCase): maxDiff = None 
@patch('records_mover.db.vertica.unloader.vertica_export_options') - def test_can_unload_this_format_with_s3_true(self, mock_vertica_export_options): + def test_can_unload_this_format_true(self, mock_vertica_export_options): mock_db = Mock(name='db') mock_source_records_format = Mock(name='source_records_format', spec=DelimitedRecordsFormat) mock_s3_temp_base_loc = Mock(name='s3_temp_base_loc') vertica_unloader = VerticaUnloader(db=mock_db, s3_temp_base_loc=mock_s3_temp_base_loc) - mock_resultset = Mock(name='resultset') - mock_db.execute.return_value = mock_resultset - mock_resultset.fetchall.return_value = ['awslib'] mock_source_records_format.hints = {} out = vertica_unloader.can_unload_this_format(mock_source_records_format) - mock_db.execute.\ - assert_called_with("SELECT lib_name from user_libraries where lib_name = 'awslib'") mock_vertica_export_options.assert_called_with(set(), ANY) self.assertEqual(True, out) @patch('records_mover.db.vertica.unloader.vertica_export_options') - def test_can_unload_this_format_with_s3_false(self, mock_vertica_export_options): + def test_can_unload_this_format_false(self, mock_vertica_export_options): mock_db = Mock(name='db') mock_source_records_format = Mock(name='source_records_format', spec=DelimitedRecordsFormat) mock_s3_temp_base_loc = Mock(name='s3_temp_base_loc') vertica_unloader = VerticaUnloader(db=mock_db, s3_temp_base_loc=mock_s3_temp_base_loc) - mock_resultset = Mock(name='resultset') - mock_db.execute.return_value = mock_resultset - mock_resultset.fetchall.return_value = ['awslib'] mock_source_records_format.hints = {} mock_vertica_export_options.side_effect = NotImplementedError out = vertica_unloader.can_unload_this_format(mock_source_records_format) - mock_db.execute.\ - assert_called_with("SELECT lib_name from user_libraries where lib_name = 'awslib'") mock_vertica_export_options.assert_called_with(set(), ANY) self.assertEqual(False, out) @@ -45,11 +35,6 @@ def 
test_known_supported_records_formats_for_unload(self): mock_source_records_format = Mock(name='source_records_format', spec=DelimitedRecordsFormat) mock_s3_temp_base_loc = Mock(name='s3_temp_base_loc') vertica_unloader = VerticaUnloader(db=mock_db, s3_temp_base_loc=mock_s3_temp_base_loc) - mock_resultset = Mock(name='resultset') - mock_db.execute.return_value = mock_resultset - mock_resultset.fetchall.return_value = ['awslib'] mock_source_records_format.hints = {} out = vertica_unloader.known_supported_records_formats_for_unload() - mock_db.execute.\ - assert_called_with("SELECT lib_name from user_libraries where lib_name = 'awslib'") self.assertEqual(out, [DelimitedRecordsFormat(variant='vertica')]) diff --git a/tests/unit/db/vertica/test_unloader_no_aws_creds.py b/tests/unit/db/vertica/test_unloader_no_aws_creds.py index 5b5aaaff5..fab9cc512 100644 --- a/tests/unit/db/vertica/test_unloader_no_aws_creds.py +++ b/tests/unit/db/vertica/test_unloader_no_aws_creds.py @@ -55,7 +55,7 @@ def test_unload_with_no_aws_creds(self, unload_plan=mock_unload_plan, directory=mock_directory) - def test_s3_available_false_no_awslib(self): + def test_s3_export_available_false_no_awslib(self): mock_db = Mock(name='db') mock_s3_temp_base_loc = Mock(name='s3_temp_base_loc') mock_target_records_format = Mock(name='target_records_format', spec=DelimitedRecordsFormat) @@ -63,4 +63,4 @@ def test_s3_available_false_no_awslib(self): mock_out = mock_db.execute.return_value mock_out.fetchall.return_value = [] unloader = VerticaUnloader(db=mock_db, s3_temp_base_loc=mock_s3_temp_base_loc) - self.assertEqual(False, unloader.s3_available()) + self.assertEqual(False, unloader.s3_export_available()) diff --git a/tests/unit/db/vertica/test_unloader_no_s3.py b/tests/unit/db/vertica/test_unloader_no_s3.py index ae2ac9036..e61eba43a 100644 --- a/tests/unit/db/vertica/test_unloader_no_s3.py +++ b/tests/unit/db/vertica/test_unloader_no_s3.py @@ -1,57 +1,47 @@ from records_mover.db.vertica.unloader import 
VerticaUnloader from records_mover.db.errors import NoTemporaryBucketConfiguration -from records_mover.records.records_format import DelimitedRecordsFormat import unittest -from mock import patch, Mock +from mock import Mock class TestVerticaUnloaderNoS3(unittest.TestCase): maxDiff = None - @patch('records_mover.db.unloader.RecordsUnloadPlan') - def test_no_temp_bucket_can_unload_this_format_true(self, - mock_RecordsUnloadPlan): + def test_temporary_loadable_directory_load_with_no_s3_temp_bucket_configured(self): mock_db = Mock(name='db') mock_s3_temp_base_loc = None - mock_target_records_format = Mock(name='target_records_format', spec=DelimitedRecordsFormat) - mock_target_records_format.hints = {} - unloader = VerticaUnloader(db=mock_db, s3_temp_base_loc=mock_s3_temp_base_loc) - - out = unloader.can_unload_this_format(mock_target_records_format) - self.assertFalse(out) + vertica_unloader = VerticaUnloader(db=mock_db, s3_temp_base_loc=mock_s3_temp_base_loc) + with self.assertRaises(NoTemporaryBucketConfiguration): + with vertica_unloader.temporary_loadable_directory_loc(): + pass - def test_no_temp_bucket_unload(self): - mock_db = Mock(name='db') - mock_s3_temp_base_loc = None - unloader = VerticaUnloader(db=mock_db, s3_temp_base_loc=mock_s3_temp_base_loc) - - mock_schema = Mock(name='schema') - mock_table = Mock(name='table') - mock_unload_plan = Mock(name='unload_plan') - mock_unload_plan.records_format = Mock(spec=DelimitedRecordsFormat) - mock_unload_plan.records_format.hints = { - 'compression': 'GZIP' - } - mock_directory = Mock(name='directory') - - with self.assertRaises(NotImplementedError): - unloader.unload(schema=mock_schema, - table=mock_table, - unload_plan=mock_unload_plan, - directory=mock_directory) - - def test_no_temp_bucket_known_supported_records_formats_for_unload(self): + def test_can_unload_to_scheme_s3_but_no_s3_export_false(self): mock_db = Mock(name='db') + mock_resultset = Mock(name='resultset') + mock_db.execute.return_value = 
mock_resultset + mock_resultset.fetchall.return_value = [] + mock_s3_temp_base_loc = None - unloader = VerticaUnloader(db=mock_db, s3_temp_base_loc=mock_s3_temp_base_loc) + vertica_unloader = VerticaUnloader(db=mock_db, s3_temp_base_loc=mock_s3_temp_base_loc) + self.assertFalse(vertica_unloader.can_unload_to_scheme('s3')) - out = unloader.known_supported_records_formats_for_unload() - self.assertEqual([], out) + mock_db.execute.\ + assert_called_with("SELECT lib_name from user_libraries where lib_name = 'awslib'") - def test_temporary_loadable_directory_load_with_no_s3_temp_bucket_configured(self): + def test_can_unload_to_scheme_s3_but_with_s3_export_true(self): mock_db = Mock(name='db') + mock_resultset = Mock(name='resultset') + mock_db.execute.return_value = mock_resultset + mock_resultset.fetchall.return_value = ['awslib'] + mock_s3_temp_base_loc = None vertica_unloader = VerticaUnloader(db=mock_db, s3_temp_base_loc=mock_s3_temp_base_loc) - with self.assertRaises(NoTemporaryBucketConfiguration): - with vertica_unloader.temporary_loadable_directory_loc(): - pass + self.assertTrue(vertica_unloader.can_unload_to_scheme('s3')) + + mock_db.execute.\ + assert_called_with("SELECT lib_name from user_libraries where lib_name = 'awslib'") + + def test_s3_temp_bucket_available_false(self): + mock_db = Mock(name='db') + vertica_unloader = VerticaUnloader(db=mock_db, s3_temp_base_loc=None) + self.assertFalse(vertica_unloader.s3_temp_bucket_available()) diff --git a/tests/unit/records/sources/test_fileobjs.py b/tests/unit/records/sources/test_fileobjs.py index 76fa5285f..d1ffc22bc 100644 --- a/tests/unit/records/sources/test_fileobjs.py +++ b/tests/unit/records/sources/test_fileobjs.py @@ -110,6 +110,17 @@ def test_can_move_to_this_format_yes(self): out = source.can_move_to_this_format(mock_records_format) self.assertTrue(out) + def test_can_move_to_schema_yes(self): + mock_records_format = Mock(name='records_format') + mock_records_schema = Mock(name='records_schema') + 
mock_records_schema = Mock(name='records_schema') + mock_target_names_to_input_fileobjs = Mock(name='target_names_to_input_fileobjs') + source = FileobjsSource(target_names_to_input_fileobjs=mock_target_names_to_input_fileobjs, + records_schema=mock_records_schema, + records_format=mock_records_format) + out = source.can_move_to_scheme(Mock()) + self.assertTrue(out) + def test_str(self): mock_records_format = 'mumble' mock_records_schema = Mock(name='records_schema') diff --git a/tests/unit/records/sources/test_table.py b/tests/unit/records/sources/test_table.py index 0aeca57c0..95922c87e 100644 --- a/tests/unit/records/sources/test_table.py +++ b/tests/unit/records/sources/test_table.py @@ -174,3 +174,8 @@ def test_with_cast_dataframe_types(self): mock_records_schema.cast_dataframe_types.return_value, mock_records_schema.cast_dataframe_types.return_value, ]) + + def test_can_move_to_scheme(self): + out = self.table_records_source.can_move_to_scheme(Mock()) + self.assertEqual(out, + self.mock_unloader.can_unload_to_scheme.return_value) diff --git a/tests/unit/records/targets/table/test_target.py b/tests/unit/records/targets/table/test_target.py index dbf692ba9..3f9f276b8 100644 --- a/tests/unit/records/targets/table/test_target.py +++ b/tests/unit/records/targets/table/test_target.py @@ -60,3 +60,20 @@ def test_can_move_from_format_no_loader(self): self.assertFalse(self.target.can_move_from_format(mock_source_records_format)) self.mock_db_driver.assert_called_with(self.mock_db_engine) + + def test_can_move_from_format_with_loader(self): + mock_driver = self.mock_db_driver.return_value + mock_loader = mock_driver.loader.return_value + mock_loader.has_temporary_loadable_directory_loc.return_value = True + self.assertTrue(self.target.can_move_from_temp_loc_after_filling_it()) + + self.mock_db_driver.assert_called_with(self.mock_db_engine) + mock_loader.has_temporary_loadable_directory_loc.assert_called_with() + + def test_temporary_loadable_directory_schemer(self): + 
mock_driver = self.mock_db_driver.return_value + mock_loader = mock_driver.loader.return_value + mock_loader.has_temporary_loadable_directory_loc.return_value = True + out = self.target.temporary_loadable_directory_scheme() + self.assertEqual(out, + mock_loader.temporary_loadable_directory_scheme.return_value)