From e09559081d47949994ca3c501dc35e3f4f7dad72 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 06:49:54 -0500 Subject: [PATCH 01/28] Add support for BigQuery bulk export (to Avro, for now) --- .../db/bigquery/bigquery_db_driver.py | 10 +- .../db/bigquery/load_job_config_options.py | 8 +- records_mover/db/bigquery/loader.py | 4 +- records_mover/db/bigquery/unloader.py | 112 ++++++++++++++++++ records_mover/db/unloader.py | 2 +- records_mover/records/records_format.py | 16 +++ records_mover/records/records_format_file.py | 6 +- records_mover/records/records_types.py | 2 +- .../google/cloud/bigquery/client/__init__.pyi | 15 +++ .../google/cloud/bigquery/job/__init__.pyi | 17 ++- 10 files changed, 184 insertions(+), 8 deletions(-) create mode 100644 records_mover/db/bigquery/unloader.py diff --git a/records_mover/db/bigquery/bigquery_db_driver.py b/records_mover/db/bigquery/bigquery_db_driver.py index c1f8178b7..06eccd5b4 100644 --- a/records_mover/db/bigquery/bigquery_db_driver.py +++ b/records_mover/db/bigquery/bigquery_db_driver.py @@ -8,7 +8,9 @@ from ...url.resolver import UrlResolver import sqlalchemy from .loader import BigQueryLoader +from .unloader import BigQueryUnloader from ..loader import LoaderFromFileobj, LoaderFromRecordsDirectory +from ..unloader import Unloader from ...url.base import BaseDirectoryUrl @@ -26,6 +28,10 @@ def __init__(self, BigQueryLoader(db=self.db, url_resolver=url_resolver, gcs_temp_base_loc=gcs_temp_base_loc) + self._bigquery_unloader =\ + BigQueryUnloader(db=self.db, + url_resolver=url_resolver, + gcs_temp_base_loc=gcs_temp_base_loc) def loader(self) -> Optional[LoaderFromRecordsDirectory]: return self._bigquery_loader @@ -33,8 +39,8 @@ def loader(self) -> Optional[LoaderFromRecordsDirectory]: def loader_from_fileobj(self) -> LoaderFromFileobj: return self._bigquery_loader - def unloader(self) -> None: - return None + def unloader(self) -> Unloader: + return self._bigquery_unloader def type_for_date_plus_time(self, has_tz: bool=False) -> sqlalchemy.sql.sqltypes.DateTime: # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types diff --git a/records_mover/db/bigquery/load_job_config_options.py b/records_mover/db/bigquery/load_job_config_options.py index 1d77b5915..2cd65c1bf 100644 --- a/records_mover/db/bigquery/load_job_config_options.py +++ b/records_mover/db/bigquery/load_job_config_options.py @@ -2,7 +2,9 @@ from ...records.delimited import cant_handle_hint from typing import Set from ...records.load_plan import RecordsLoadPlan -from ...records.records_format import DelimitedRecordsFormat, ParquetRecordsFormat +from ...records.records_format import ( + DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat +) from records_mover.records.delimited import ValidatedRecordsHints from records_mover.mover_types import _assert_never from google.cloud.bigquery.job import CreateDisposition, WriteDisposition @@ -118,6 +120,10 @@ def load_job_config(unhandled_hints: Set[str], config.source_format = 'PARQUET' return config + if isinstance(load_plan.records_format, AvroRecordsFormat): + config.source_format = 'AVRO' + return config + raise NotImplementedError("Not currently able to load " f"{load_plan.records_format.format_type}") diff --git a/records_mover/db/bigquery/loader.py b/records_mover/db/bigquery/loader.py index 8d9666e3c..b6cb5d587 100644 --- a/records_mover/db/bigquery/loader.py +++ b/records_mover/db/bigquery/loader.py @@ -6,7 +6,7 @@ import sqlalchemy from ...records.load_plan import RecordsLoadPlan from 
...records.records_format import ( - BaseRecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat + BaseRecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat ) from ...records.records_directory import RecordsDirectory from ...records.processing_instructions import ProcessingInstructions @@ -165,6 +165,8 @@ def can_load_this_format(self, source_records_format: BaseRecordsFormat) -> bool processing_instructions=processing_instructions) if isinstance(load_plan.records_format, ParquetRecordsFormat): return True + if isinstance(load_plan.records_format, AvroRecordsFormat): + return True if not isinstance(load_plan.records_format, DelimitedRecordsFormat): return False unhandled_hints = set(load_plan.records_format.hints.keys()) diff --git a/records_mover/db/bigquery/unloader.py b/records_mover/db/bigquery/unloader.py new file mode 100644 index 000000000..a81c3f0d2 --- /dev/null +++ b/records_mover/db/bigquery/unloader.py @@ -0,0 +1,112 @@ +import sqlalchemy +import pprint +from contextlib import contextmanager +from typing import List, Iterator, Optional, Union, Tuple +import logging +from google.cloud.bigquery.dbapi.connection import Connection +from google.cloud.bigquery.client import Client +from google.cloud.bigquery.job import ExtractJobConfig +from records_mover.db.unloader import Unloader +from records_mover.records.records_format import BaseRecordsFormat, AvroRecordsFormat +from records_mover.url.base import BaseDirectoryUrl +from records_mover.url.resolver import UrlResolver +from records_mover.records.unload_plan import RecordsUnloadPlan +from records_mover.records.records_directory import RecordsDirectory +from records_mover.db.errors import NoTemporaryBucketConfiguration + +logger = logging.getLogger(__name__) + + +class BigQueryUnloader(Unloader): + def __init__(self, + db: Union[sqlalchemy.engine.Connection, sqlalchemy.engine.Engine], + url_resolver: UrlResolver, + gcs_temp_base_loc: Optional[BaseDirectoryUrl])\ + -> None: + self.db = db + self.url_resolver = url_resolver + self.gcs_temp_base_loc = gcs_temp_base_loc + super().__init__(db=db) + + def can_unload_format(self, target_records_format: BaseRecordsFormat) -> bool: + if isinstance(target_records_format, AvroRecordsFormat): + return True + return False + + def can_unload_to_scheme(self, scheme: str) -> bool: + return scheme == 'gs' + + def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]: + return [AvroRecordsFormat()] + + @contextmanager + def temporary_unloadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]: + if self.gcs_temp_base_loc is None: + raise NoTemporaryBucketConfiguration('Please provide a scratch GCS URL in your config ' + '(e.g., set SCRATCH_GCS_URL to a gs:// URL)') + else: + with self.gcs_temp_base_loc.temporary_directory() as temp_loc: + yield temp_loc + + def _parse_bigquery_schema_name(self, schema: str) -> Tuple[Optional[str], str]: + # https://github.com/mxmzdlv/pybigquery/blob/master/pybigquery/sqlalchemy_bigquery.py#L320 + dataset = None + project = None + + schema_split = schema.split('.') + if len(schema_split) == 1: + dataset, = schema_split + elif len(schema_split) == 2: + project, dataset = schema_split + else: + raise ValueError(f"Could not understand schema name {schema}") + + return (project, dataset) + + def _extract_job_config(self, unload_plan: RecordsUnloadPlan) -> ExtractJobConfig: + config = ExtractJobConfig() + if isinstance(unload_plan.records_format, AvroRecordsFormat): + config.destination_format = 'AVRO' + else: + 
raise NotImplementedError(f'Please add support for {unload_plan.records_format}') + return config + + def unload(self, + schema: str, + table: str, + unload_plan: RecordsUnloadPlan, + directory: RecordsDirectory) -> Optional[int]: + if directory.scheme != 'gs': + with self.temporary_unloadable_directory_loc() as temp_gcs_loc: + temp_directory = RecordsDirectory(temp_gcs_loc) + out = self.unload(schema=schema, + table=table, + unload_plan=unload_plan, + directory=temp_directory) + temp_directory.copy_to(directory.loc) + return out + logger.info("Loading from records directory into BigQuery") + # https://googleapis.github.io/google-cloud-python/latest/bigquery/usage/tables.html#creating-a-table + connection: Connection =\ + self.db.engine.raw_connection().connection + # https://google-cloud.readthedocs.io/en/latest/bigquery/generated/google.cloud.bigquery.client.Client.html + client: Client = connection._client + project_id, dataset_id = self._parse_bigquery_schema_name(schema) + job_config = self._extract_job_config(unload_plan) + + records_format = unload_plan.records_format + filename = records_format.generate_filename('output') + destination_uri = directory.loc.file_in_this_directory(filename) + job = client.extract_table(f"{schema}.{table}", + destination_uri.url, + # Must match the destination dataset location. + location="US", + job_config=job_config) + try: + job.result() # Waits for table load to complete. + except Exception: + logger.error(f"BigQuery load errors:\n\n{pprint.pformat(job)}\n") + raise + logger.info(f"Unloaded from {dataset_id}:{table} into {filename}") + directory.save_preliminary_manifest() + return None diff --git a/records_mover/db/unloader.py b/records_mover/db/unloader.py index 980b6146d..7f12b960e 100644 --- a/records_mover/db/unloader.py +++ b/records_mover/db/unloader.py @@ -34,7 +34,7 @@ def unload(self, def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]: """Supplies a list of the records formats which can be bulk exported from this database. This may not be the full set - see - can_unlaod_this_format() to test other possibilities. + can_unload_this_format() to test other possibilities. """ ... 
diff --git a/records_mover/records/records_format.py b/records_mover/records/records_format.py index 53c9b1aa6..e030e6edf 100644 --- a/records_mover/records/records_format.py +++ b/records_mover/records/records_format.py @@ -10,6 +10,22 @@ logger = logging.getLogger(__name__) +class AvroRecordsFormat(BaseRecordsFormat): + "Describes records files in `Avro `_ format" + + def __init__(self) -> None: + "Create a new instance of AvroRecordsFormat" + self.format_type = 'avro' + + def __str__(self) -> str: + return "AvroRecordsFormat" + + def __repr__(self) -> str: + return str(self) + + def generate_filename(self, basename: str) -> str: + return f"{basename}.avro" + class ParquetRecordsFormat(BaseRecordsFormat): "Describes records files in `Parquet `_ format" diff --git a/records_mover/records/records_format_file.py b/records_mover/records/records_format_file.py index 4dbe963f6..d5ee270f5 100644 --- a/records_mover/records/records_format_file.py +++ b/records_mover/records/records_format_file.py @@ -1,4 +1,6 @@ -from .records_format import BaseRecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat +from .records_format import ( + BaseRecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat +) from ..url.base import BaseDirectoryUrl, BaseFileUrl from .processing_instructions import ProcessingInstructions from .delimited import PartialRecordsHints @@ -22,6 +24,8 @@ def load_format(self, fail_if_dont_understand: bool) -> BaseRecordsFormat: return self.load_delimited_format(format_loc, fail_if_dont_understand) elif format_type == 'parquet': return ParquetRecordsFormat() + elif format_type == 'avro': + return AvroRecordsFormat() else: raise TypeError(f"Format type {format_type} not yet supported in this library") diff --git a/records_mover/records/records_types.py b/records_mover/records/records_types.py index 3226289ad..798ddf2ae 100644 --- a/records_mover/records/records_types.py +++ b/records_mover/records/records_types.py @@ -36,6 +36,6 @@ class UrlDetailsEntry(TypedDict): UrlDetails = Dict[Url, UrlDetailsEntry] -RecordsFormatType = Literal['delimited', 'parquet'] +RecordsFormatType = Literal['avro', 'delimited', 'parquet'] DelimitedVariant = Literal['dumb', 'csv', 'bigquery', 'bluelabs', 'vertica'] diff --git a/types/stubs/google/cloud/bigquery/client/__init__.pyi b/types/stubs/google/cloud/bigquery/client/__init__.pyi index 233ba5b22..3eafda458 100644 --- a/types/stubs/google/cloud/bigquery/client/__init__.pyi +++ b/types/stubs/google/cloud/bigquery/client/__init__.pyi @@ -39,3 +39,18 @@ class Client: retry: Optional[google.api_core.retry.Retry] = None, timeout: Optional[float] = None) -> google.cloud.bigquery.job.LoadJob: ... + + def extract_table(self, + source: Union[google.cloud.bigquery.table.Table, + google.cloud.bigquery.table.TableReference, + str], + destination_uris: Union[str, Sequence[str]], + job_id: Optional[str] = None, + job_id_prefix: Optional[str] = None, + location: Optional[str] = None, + project: Optional[str] = None, + job_config: Optional[google.cloud.bigquery.job.ExtractJobConfig] = None, + retry: Optional[google.api_core.retry.Retry] = None, + timeout: Optional[float] = None, + source_type: Optional[str] = None) -> google.cloud.bigquery.job.ExtractJob: + ... 
diff --git a/types/stubs/google/cloud/bigquery/job/__init__.pyi b/types/stubs/google/cloud/bigquery/job/__init__.pyi index d87890435..119f78b2a 100644 --- a/types/stubs/google/cloud/bigquery/job/__init__.pyi +++ b/types/stubs/google/cloud/bigquery/job/__init__.pyi @@ -1,6 +1,6 @@ # https://github.com/googleapis/google-cloud-python/blob/de73e45a7183a638113153d0faec105cfc437f0e/bigquery/google/cloud/bigquery/job.py -from typing import Union, List, Optional, Sequence, Mapping, Any +from typing import Union, List, Optional, Sequence, Mapping, Any, Literal, Dict import google.cloud.bigquery.job import google.cloud.bigquery.schema import google.cloud.bigquery.table @@ -47,6 +47,13 @@ class LoadJob: output_rows: Optional[int] +class ExtractJob: + def result(self, + retry: google.api_core.retry.Retry = DEFAULT_RETRY, + timeout: Optional[float] = None) -> _AsyncJob: + ... + + class LoadJobConfig: allow_jagged_rows: bool allow_quoted_newlines: bool @@ -75,3 +82,11 @@ class LoadJobConfig: def to_api_repr(self) -> str: ... + + +# https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.ExtractJobConfig.html#google.cloud.bigquery.job.ExtractJobConfig.compression +class ExtractJobConfig: + compression: Literal['GZIP', 'DEFLATE', 'SNAPPY', 'NONE'] + destination_format: Literal['CSV', 'NEWLINE_DELIMITED_JSON', 'AVRO'] + field_delimeter: str + labels: Dict[str, str] From 0fa5321f6fe9d8776b151cda6e786672563c1ec6 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 08:28:13 -0500 Subject: [PATCH 02/28] Improve test coverage --- records_mover/db/bigquery/loader.py | 2 - records_mover/db/bigquery/unloader.py | 7 +- .../unit/db/bigquery/test_bigquery_loader.py | 32 +++++- .../db/bigquery/test_bigquery_unloader.py | 101 ++++++++++++++++++ 4 files changed, 130 insertions(+), 12 deletions(-) create mode 100644 tests/unit/db/bigquery/test_bigquery_unloader.py diff --git a/records_mover/db/bigquery/loader.py b/records_mover/db/bigquery/loader.py index b6cb5d587..11424003c 100644 --- a/records_mover/db/bigquery/loader.py +++ b/records_mover/db/bigquery/loader.py @@ -87,8 +87,6 @@ def load_from_fileobj(self, schema: str, table: str, # https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html#google.cloud.bigquery.client.Client.load_table_from_file job = client.load_table_from_file(fileobj, f"{schema}.{table}", - # Must match the destination dataset location. - location="US", job_config=job_config) try: diff --git a/records_mover/db/bigquery/unloader.py b/records_mover/db/bigquery/unloader.py index a81c3f0d2..8041d3a1a 100644 --- a/records_mover/db/bigquery/unloader.py +++ b/records_mover/db/bigquery/unloader.py @@ -100,13 +100,8 @@ def unload(self, job = client.extract_table(f"{schema}.{table}", destination_uri.url, # Must match the destination dataset location. - location="US", job_config=job_config) - try: - job.result() # Waits for table load to complete. - except Exception: - logger.error(f"BigQuery load errors:\n\n{pprint.pformat(job)}\n") - raise + job.result() # Waits for table load to complete. 
logger.info(f"Unloaded from {dataset_id}:{table} into {filename}") directory.save_preliminary_manifest() return None diff --git a/tests/unit/db/bigquery/test_bigquery_loader.py b/tests/unit/db/bigquery/test_bigquery_loader.py index 8617d9791..5e6ba1691 100644 --- a/tests/unit/db/bigquery/test_bigquery_loader.py +++ b/tests/unit/db/bigquery/test_bigquery_loader.py @@ -1,7 +1,9 @@ import unittest from records_mover.db.bigquery.loader import BigQueryLoader -from records_mover.records.records_format import DelimitedRecordsFormat, ParquetRecordsFormat +from records_mover.records.records_format import ( + DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat +) from records_mover.db.errors import NoTemporaryBucketConfiguration from mock import MagicMock, Mock from unittest.mock import patch @@ -156,7 +158,6 @@ def test_load_from_fileobj_true(self, mock_load_job_config): mock_client.load_table_from_file.\ assert_called_with(mock_fileobj, 'my_project.my_dataset.mytable', - location="US", job_config=mock_load_job_config.return_value) mock_job.result.assert_called_with() @@ -200,7 +201,6 @@ def test_load_with_fileobj_fallback(self, mock_load_job_config): mock_client.load_table_from_file.\ assert_called_with(mock_fileobj, 'my_project.my_dataset.mytable', - location="US", job_config=mock_load_job_config.return_value) mock_job.result.assert_called_with() @@ -215,7 +215,31 @@ def test_can_load_this_format_true_parquet(self, mock_load_job_config): mock_db = Mock(name='db') mock_source_records_format = Mock(name='source_records_format', spec=ParquetRecordsFormat) - mock_source_records_format.format_type = 'delimited' + mock_source_records_format.format_type = 'parquet' + mock_processing_instructions = mock_ProcessingInstructions.return_value + mock_load_plan = mock_RecordsLoadPlan.return_value + mock_load_plan.records_format = mock_source_records_format + mock_url_resolver = Mock(name='url_resolver') + mock_source_records_format.hints = {} + bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=None) + out = bigquery_loader.can_load_this_format(mock_source_records_format) + mock_ProcessingInstructions.assert_called_with() + mock_RecordsLoadPlan.\ + assert_called_with(records_format=mock_source_records_format, + processing_instructions=mock_processing_instructions) + self.assertTrue(out) + + @patch('records_mover.db.bigquery.loader.load_job_config') + @patch('records_mover.db.bigquery.loader.ProcessingInstructions') + @patch('records_mover.db.bigquery.loader.RecordsLoadPlan') + def test_can_load_this_format_true_avro(self, + mock_RecordsLoadPlan, + mock_ProcessingInstructions, + mock_load_job_config): + mock_db = Mock(name='db') + mock_source_records_format = Mock(name='source_records_format', spec=AvroRecordsFormat) + mock_source_records_format.format_type = 'avro' mock_processing_instructions = mock_ProcessingInstructions.return_value mock_load_plan = mock_RecordsLoadPlan.return_value mock_load_plan.records_format = mock_source_records_format diff --git a/tests/unit/db/bigquery/test_bigquery_unloader.py b/tests/unit/db/bigquery/test_bigquery_unloader.py new file mode 100644 index 000000000..fb65e38ce --- /dev/null +++ b/tests/unit/db/bigquery/test_bigquery_unloader.py @@ -0,0 +1,101 @@ +import unittest + +from records_mover.db.bigquery.unloader import BigQueryUnloader +from records_mover.records.records_format import DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat +from records_mover.db.errors import NoTemporaryBucketConfiguration +from 
mock import MagicMock, Mock +from unittest.mock import patch, ANY + + +class TestBigQueryUnloader(unittest.TestCase): + def test_can_unload_format_avro_true(self): + mock_db = Mock(name='mock_db') + mock_url_resolver = MagicMock(name='mock_url_resolver') + mock_gcs_temp_base_loc = MagicMock(name='gcs_temp_base_loc') + big_query_unloader = BigQueryUnloader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=mock_gcs_temp_base_loc) + avro_format = AvroRecordsFormat() + self.assertTrue(big_query_unloader.can_unload_format(avro_format)) + + def test_can_unload_format_delimited_false(self): + mock_db = Mock(name='mock_db') + mock_url_resolver = MagicMock(name='mock_url_resolver') + mock_gcs_temp_base_loc = MagicMock(name='gcs_temp_base_loc') + big_query_unloader = BigQueryUnloader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=mock_gcs_temp_base_loc) + delimited_format = DelimitedRecordsFormat() + self.assertFalse(big_query_unloader.can_unload_format(delimited_format)) + + def test_can_unload_to_scheme_gs_true(self): + mock_db = Mock(name='mock_db') + mock_url_resolver = MagicMock(name='mock_url_resolver') + mock_gcs_temp_base_loc = MagicMock(name='gcs_temp_base_loc') + big_query_unloader = BigQueryUnloader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=mock_gcs_temp_base_loc) + self.assertTrue(big_query_unloader.can_unload_to_scheme('gs')) + + def test_can_unload_to_scheme_other_false(self): + mock_db = Mock(name='mock_db') + mock_url_resolver = MagicMock(name='mock_url_resolver') + mock_gcs_temp_base_loc = MagicMock(name='gcs_temp_base_loc') + big_query_unloader = BigQueryUnloader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=mock_gcs_temp_base_loc) + self.assertFalse(big_query_unloader.can_unload_to_scheme('blah')) + + def test_known_supported_records_formats_for_unload(self): + mock_db = Mock(name='mock_db') + mock_url_resolver = MagicMock(name='mock_url_resolver') + mock_gcs_temp_base_loc = MagicMock(name='gcs_temp_base_loc') + big_query_unloader = BigQueryUnloader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=mock_gcs_temp_base_loc) + self.assertEqual([type(format) + for format in + big_query_unloader.known_supported_records_formats_for_unload()], + [AvroRecordsFormat]) + + def test_temporary_unloadable_directory_loc_raises(self): + mock_db = Mock(name='mock_db') + mock_url_resolver = MagicMock(name='mock_url_resolver') + mock_gcs_temp_base_loc = None + big_query_unloader = BigQueryUnloader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=mock_gcs_temp_base_loc) + with self.assertRaises(NoTemporaryBucketConfiguration): + with big_query_unloader.temporary_unloadable_directory_loc(): + pass + + def test_temporary_unloadable_directory_loc(self): + mock_db = Mock(name='mock_db') + mock_url_resolver = MagicMock(name='mock_url_resolver') + mock_gcs_temp_base_loc = MagicMock(name='gcs_temp_base_loc') + big_query_unloader = BigQueryUnloader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=mock_gcs_temp_base_loc) + with big_query_unloader.temporary_unloadable_directory_loc() as temp_loc: + self.assertEqual(temp_loc, + mock_gcs_temp_base_loc.temporary_directory.return_value.__enter__. 
+ return_value) + + def test_unload(self): + mock_db = Mock(name='mock_db') + mock_url_resolver = MagicMock(name='mock_url_resolver') + mock_gcs_temp_base_loc = MagicMock(name='gcs_temp_base_loc') + big_query_unloader = BigQueryUnloader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=mock_gcs_temp_base_loc) + mock_schema = 'myproject.mydataset' + mock_table = 'mytable' + mock_unload_plan = Mock(name='unload_plan') + mock_unload_plan.records_format = AvroRecordsFormat() + mock_directory = Mock(name='directory') + mock_directory.scheme = 'gs' + big_query_unloader.unload(schema=mock_schema, + table=mock_table, + unload_plan=mock_unload_plan, + directory=mock_directory) + mock_connection = mock_db.engine.raw_connection.return_value.connection + mock_client = mock_connection._client + mock_destination_uri = mock_directory.loc.file_in_this_directory.return_value + mock_url = mock_destination_uri.url + mock_directory.loc.file_in_this_directory.assert_called_with('output.avro') + mock_client.extract_table.assert_called_with('myproject.mydataset.mytable', + mock_url, + job_config=ANY) + mock_directory.save_preliminary_manifest.assert_called_with() From 28f48ab68ce754a986952726bdd62bbde6df6feb Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 08:33:38 -0500 Subject: [PATCH 03/28] Improve test coverage --- tests/unit/db/redshift/test_unloader.py | 30 ++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/tests/unit/db/redshift/test_unloader.py b/tests/unit/db/redshift/test_unloader.py index 73e56a7c2..f8940c74c 100644 --- a/tests/unit/db/redshift/test_unloader.py +++ b/tests/unit/db/redshift/test_unloader.py @@ -1,7 +1,8 @@ import unittest from records_mover.db.redshift.unloader import RedshiftUnloader +from records_mover.db.errors import NoTemporaryBucketConfiguration from records_mover.records.records_format import DelimitedRecordsFormat, ParquetRecordsFormat -from mock import patch, Mock +from mock import patch, Mock, MagicMock class TestRedshiftUnloader(unittest.TestCase): @@ -78,3 +79,30 @@ def test_known_supported_records_formats_for_unload(self): self.assertEqual([f.__class__ for f in formats], [DelimitedRecordsFormat, ParquetRecordsFormat]) + + def test_temporary_unloadable_directory_loc(self): + mock_db = Mock(name='db') + mock_table = Mock(name='table') + mock_s3_temp_base_loc = MagicMock(name='s3_temp_base_loc') + + redshift_unloader =\ + RedshiftUnloader(db=mock_db, + table=mock_table, + s3_temp_base_loc=mock_s3_temp_base_loc) + with redshift_unloader.temporary_unloadable_directory_loc() as loc: + self.assertEqual(loc, + mock_s3_temp_base_loc.temporary_directory.return_value.__enter__. 
+ return_value) + + def test_temporary_unloadable_directory_loc_unset(self): + mock_db = Mock(name='db') + mock_table = Mock(name='table') + mock_s3_temp_base_loc = None + + redshift_unloader =\ + RedshiftUnloader(db=mock_db, + table=mock_table, + s3_temp_base_loc=mock_s3_temp_base_loc) + with self.assertRaises(NoTemporaryBucketConfiguration): + with redshift_unloader.temporary_unloadable_directory_loc(): + pass From 5ebb19b8540aeb68b3737a5527a160c2c3c25a51 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 08:40:04 -0500 Subject: [PATCH 04/28] Improve test coverage --- .../unit/db/bigquery/test_bigquery_loader.py | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/tests/unit/db/bigquery/test_bigquery_loader.py b/tests/unit/db/bigquery/test_bigquery_loader.py index 5e6ba1691..bbb0feb78 100644 --- a/tests/unit/db/bigquery/test_bigquery_loader.py +++ b/tests/unit/db/bigquery/test_bigquery_loader.py @@ -2,13 +2,18 @@ from records_mover.db.bigquery.loader import BigQueryLoader from records_mover.records.records_format import ( - DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat + DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat, + BaseRecordsFormat ) from records_mover.db.errors import NoTemporaryBucketConfiguration from mock import MagicMock, Mock from unittest.mock import patch +class NewRecordsFormat(BaseRecordsFormat): + ... + + class TestBigQueryLoader(unittest.TestCase): @patch('records_mover.db.bigquery.loader.load_job_config') def test_load_with_bad_schema_name(self, mock_load_job_config): @@ -254,6 +259,31 @@ def test_can_load_this_format_true_avro(self, processing_instructions=mock_processing_instructions) self.assertTrue(out) + @patch('records_mover.db.bigquery.loader.load_job_config') + @patch('records_mover.db.bigquery.loader.ProcessingInstructions') + @patch('records_mover.db.bigquery.loader.RecordsLoadPlan') + def test_can_load_this_format_false_newformat(self, + mock_RecordsLoadPlan, + mock_ProcessingInstructions, + mock_load_job_config): + mock_db = Mock(name='db') + mock_source_records_format = Mock(name='source_records_format', spec=NewRecordsFormat) + mock_source_records_format.format_type = 'new' + mock_processing_instructions = mock_ProcessingInstructions.return_value + mock_load_plan = mock_RecordsLoadPlan.return_value + mock_load_plan.records_format = mock_source_records_format + mock_url_resolver = Mock(name='url_resolver') + mock_source_records_format.hints = {} + bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=None) + out = bigquery_loader.can_load_this_format(mock_source_records_format) + mock_ProcessingInstructions.assert_called_with() + mock_RecordsLoadPlan.\ + assert_called_with(records_format=mock_source_records_format, + processing_instructions=mock_processing_instructions) + self.assertFalse(out) + + def test_known_supported_records_formats_for_load(self): mock_db = Mock(name='db') mock_url_resolver = Mock(name='url_resolver') From de77caa83dbf9eccdb4a366188d81a22d3176f10 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 08:40:10 -0500 Subject: [PATCH 05/28] Improve test coverage --- .../records/sources/test_table_edge_cases.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 tests/unit/records/sources/test_table_edge_cases.py diff --git a/tests/unit/records/sources/test_table_edge_cases.py b/tests/unit/records/sources/test_table_edge_cases.py new file mode 100644 index 000000000..7e8109c57 --- 
/dev/null +++ b/tests/unit/records/sources/test_table_edge_cases.py @@ -0,0 +1,28 @@ +from records_mover.records.sources.table import TableRecordsSource +from mock import Mock, patch, ANY +import unittest +from records_mover.records.targets.base import MightSupportMoveFromTempLocAfterFillingIt + + +class TestTableRecordsSourceEdgeCases(unittest.TestCase): + @patch('records_mover.records.sources.dataframes.DataframesRecordsSource') + @patch('records_mover.records.sources.table.RecordsSchema') + @patch('records_mover.records.sources.table.quote_schema_and_table') + @patch('pandas.read_sql') + def test_init_no_unloader(self, + mock_read_sql, + mock_quote_schema_and_table, + mock_RecordsSchema, + mock_DataframesRecordsSource): + mock_schema_name = Mock(name='schema_name') + mock_table_name = Mock(name='table_name') + mock_driver = Mock(name='driver') + mock_unloader = None + mock_driver.unloader.return_value = mock_unloader + mock_url_resolver = Mock(name='url_resolver') + table_records_source =\ + TableRecordsSource(schema_name=mock_schema_name, + table_name=mock_table_name, + driver=mock_driver, + url_resolver=mock_url_resolver) + self.assertIsNone(table_records_source.records_format) From 09ceea5023c347335125ced748d1ff7fce9eb651 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 08:43:55 -0500 Subject: [PATCH 06/28] Improve test coverage --- .../unit/db/bigquery/test_bigquery_loader.py | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/tests/unit/db/bigquery/test_bigquery_loader.py b/tests/unit/db/bigquery/test_bigquery_loader.py index bbb0feb78..04f80c78c 100644 --- a/tests/unit/db/bigquery/test_bigquery_loader.py +++ b/tests/unit/db/bigquery/test_bigquery_loader.py @@ -132,6 +132,33 @@ def test_can_load_this_format_true(self, mock_load_job_config.assert_called_with(set(), mock_load_plan) self.assertEqual(True, out) + + @patch('records_mover.db.bigquery.loader.load_job_config') + @patch('records_mover.db.bigquery.loader.ProcessingInstructions') + @patch('records_mover.db.bigquery.loader.RecordsLoadPlan') + def test_can_load_this_format_delimited_false(self, + mock_RecordsLoadPlan, + mock_ProcessingInstructions, + mock_load_job_config): + mock_db = Mock(name='db') + mock_source_records_format = Mock(name='source_records_format', spec=DelimitedRecordsFormat) + mock_source_records_format.format_type = 'delimited' + mock_processing_instructions = mock_ProcessingInstructions.return_value + mock_load_plan = mock_RecordsLoadPlan.return_value + mock_load_plan.records_format = mock_source_records_format + mock_url_resolver = Mock(name='url_resolver') + mock_load_job_config.side_effect = NotImplementedError + mock_source_records_format.hints = {} + bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=None) + out = bigquery_loader.can_load_this_format(mock_source_records_format) + mock_ProcessingInstructions.assert_called_with() + mock_RecordsLoadPlan.\ + assert_called_with(records_format=mock_source_records_format, + processing_instructions=mock_processing_instructions) + mock_load_job_config.assert_called_with(set(), mock_load_plan) + self.assertEqual(False, out) + @patch('records_mover.db.bigquery.loader.load_job_config') def test_load_from_fileobj_true(self, mock_load_job_config): mock_db = Mock(name='mock_db') @@ -283,7 +310,6 @@ def test_can_load_this_format_false_newformat(self, processing_instructions=mock_processing_instructions) self.assertFalse(out) - def 
test_known_supported_records_formats_for_load(self): mock_db = Mock(name='db') mock_url_resolver = Mock(name='url_resolver') From 2a721af5a09379c7521eb1be74841899a71ec512 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 08:46:02 -0500 Subject: [PATCH 07/28] Improve test coverage --- tests/unit/records/sources/test_table_edge_cases.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/records/sources/test_table_edge_cases.py b/tests/unit/records/sources/test_table_edge_cases.py index 7e8109c57..71e5905f0 100644 --- a/tests/unit/records/sources/test_table_edge_cases.py +++ b/tests/unit/records/sources/test_table_edge_cases.py @@ -26,3 +26,5 @@ def test_init_no_unloader(self, driver=mock_driver, url_resolver=mock_url_resolver) self.assertIsNone(table_records_source.records_format) + mock_target_records_format = Mock(name='target_records_format') + self.assertFalse(table_records_source.can_move_to_format(mock_target_records_format)) From f9f88ad74c255892bb7987f11ae3d5a03206fb0d Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 08:48:16 -0500 Subject: [PATCH 08/28] Improve test coverage --- tests/unit/db/redshift/test_unloader.py | 30 +++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/unit/db/redshift/test_unloader.py b/tests/unit/db/redshift/test_unloader.py index f8940c74c..8b81f6b97 100644 --- a/tests/unit/db/redshift/test_unloader.py +++ b/tests/unit/db/redshift/test_unloader.py @@ -35,6 +35,36 @@ def test_can_unload_format_true(self, mock_processing_instructions.fail_if_cant_handle_hint) self.assertEqual(True, out) + @patch('records_mover.db.redshift.unloader.redshift_unload_options') + @patch('records_mover.db.redshift.unloader.RecordsUnloadPlan') + def test_can_unload_format_delimite_false(self, + mock_RecordsUnloadPlan, + mock_redshift_unload_options): + mock_db = Mock(name='db') + mock_table = Mock(name='table') + + mock_target_records_format = Mock(name='target_records_format', spec=DelimitedRecordsFormat) + mock_unload_plan = mock_RecordsUnloadPlan.return_value + mock_unload_plan.records_format = mock_target_records_format + + mock_processing_instructions = mock_unload_plan.processing_instructions + mock_s3_temp_base_loc = Mock(name='s3_temp_base_loc') + mock_target_records_format.hints = {} + mock_redshift_unload_options.side_effect = NotImplementedError + + redshift_unloader =\ + RedshiftUnloader(db=mock_db, + table=mock_table, + s3_temp_base_loc=mock_s3_temp_base_loc) + out = redshift_unloader.can_unload_format(mock_target_records_format) + mock_RecordsUnloadPlan.\ + assert_called_with(records_format=mock_target_records_format) + mock_redshift_unload_options.\ + assert_called_with(set(), + mock_unload_plan.records_format, + mock_processing_instructions.fail_if_cant_handle_hint) + self.assertEqual(False, out) + def test_can_unload_to_scheme_s3_true(self): mock_db = Mock(name='db') mock_table = Mock(name='table') From 8b10df9cd9e5884240b781e8e9e0d9ac26bbe25d Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 08:49:18 -0500 Subject: [PATCH 09/28] Ratchet coverage --- metrics/coverage_high_water_mark | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/coverage_high_water_mark b/metrics/coverage_high_water_mark index 9c2857e0b..8fee528a7 100644 --- a/metrics/coverage_high_water_mark +++ b/metrics/coverage_high_water_mark @@ -1 +1 @@ -93.6200 +93.6400 From 5c04074ae28196356f3cc299eb31935bdbf1a60e Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 08:51:09 -0500 Subject: 
[PATCH 10/28] Ratchet --- metrics/mypy_high_water_mark | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/mypy_high_water_mark b/metrics/mypy_high_water_mark index d7c0ddd40..f3cebc182 100644 --- a/metrics/mypy_high_water_mark +++ b/metrics/mypy_high_water_mark @@ -1 +1 @@ -92.4400 +92.5200 \ No newline at end of file From c00afd2c72e86749e725cc705eaa6cf690c69daf Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 09:05:36 -0500 Subject: [PATCH 11/28] Flake8 fixes --- records_mover/db/bigquery/unloader.py | 1 - records_mover/records/records_directory.py | 2 +- records_mover/records/records_format.py | 1 + tests/unit/db/bigquery/test_bigquery_loader.py | 1 - tests/unit/db/bigquery/test_bigquery_unloader.py | 6 ++++-- tests/unit/records/sources/test_table_edge_cases.py | 3 +-- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/records_mover/db/bigquery/unloader.py b/records_mover/db/bigquery/unloader.py index 8041d3a1a..4bab6837d 100644 --- a/records_mover/db/bigquery/unloader.py +++ b/records_mover/db/bigquery/unloader.py @@ -1,5 +1,4 @@ import sqlalchemy -import pprint from contextlib import contextmanager from typing import List, Iterator, Optional, Union, Tuple import logging diff --git a/records_mover/records/records_directory.py b/records_mover/records/records_directory.py index 5638e5645..762c125fe 100644 --- a/records_mover/records/records_directory.py +++ b/records_mover/records/records_directory.py @@ -105,7 +105,7 @@ def save_preliminary_manifest(self, url_details: Optional[UrlDetails]=None) -> N manifest_loc = self.loc.file_in_this_directory('manifest') if url_details is None: - logger.warning(f"Building manifest by listing directory contents") + logger.warning("Building manifest by listing directory contents") url_details = { loc.url: { 'content_length': loc.size() diff --git a/records_mover/records/records_format.py b/records_mover/records/records_format.py index e030e6edf..42198c2e1 100644 --- a/records_mover/records/records_format.py +++ b/records_mover/records/records_format.py @@ -26,6 +26,7 @@ def __repr__(self) -> str: def generate_filename(self, basename: str) -> str: return f"{basename}.avro" + class ParquetRecordsFormat(BaseRecordsFormat): "Describes records files in `Parquet `_ format" diff --git a/tests/unit/db/bigquery/test_bigquery_loader.py b/tests/unit/db/bigquery/test_bigquery_loader.py index 04f80c78c..6b2cb0d62 100644 --- a/tests/unit/db/bigquery/test_bigquery_loader.py +++ b/tests/unit/db/bigquery/test_bigquery_loader.py @@ -132,7 +132,6 @@ def test_can_load_this_format_true(self, mock_load_job_config.assert_called_with(set(), mock_load_plan) self.assertEqual(True, out) - @patch('records_mover.db.bigquery.loader.load_job_config') @patch('records_mover.db.bigquery.loader.ProcessingInstructions') @patch('records_mover.db.bigquery.loader.RecordsLoadPlan') diff --git a/tests/unit/db/bigquery/test_bigquery_unloader.py b/tests/unit/db/bigquery/test_bigquery_unloader.py index fb65e38ce..16ccd96bf 100644 --- a/tests/unit/db/bigquery/test_bigquery_unloader.py +++ b/tests/unit/db/bigquery/test_bigquery_unloader.py @@ -1,10 +1,12 @@ import unittest from records_mover.db.bigquery.unloader import BigQueryUnloader -from records_mover.records.records_format import DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat +from records_mover.records.records_format import ( + DelimitedRecordsFormat, AvroRecordsFormat +) from records_mover.db.errors import NoTemporaryBucketConfiguration from mock import MagicMock, Mock -from 
unittest.mock import patch, ANY +from unittest.mock import ANY class TestBigQueryUnloader(unittest.TestCase): diff --git a/tests/unit/records/sources/test_table_edge_cases.py b/tests/unit/records/sources/test_table_edge_cases.py index 71e5905f0..cbd628242 100644 --- a/tests/unit/records/sources/test_table_edge_cases.py +++ b/tests/unit/records/sources/test_table_edge_cases.py @@ -1,7 +1,6 @@ from records_mover.records.sources.table import TableRecordsSource -from mock import Mock, patch, ANY +from mock import Mock, patch import unittest -from records_mover.records.targets.base import MightSupportMoveFromTempLocAfterFillingIt class TestTableRecordsSourceEdgeCases(unittest.TestCase): From d5c8e84e0fa4c6c1042a17aacf5a64df05da39fe Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 09:05:41 -0500 Subject: [PATCH 12/28] Old Python fixes --- types/stubs/google/cloud/bigquery/job/__init__.pyi | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/types/stubs/google/cloud/bigquery/job/__init__.pyi b/types/stubs/google/cloud/bigquery/job/__init__.pyi index 119f78b2a..fb9f74740 100644 --- a/types/stubs/google/cloud/bigquery/job/__init__.pyi +++ b/types/stubs/google/cloud/bigquery/job/__init__.pyi @@ -1,6 +1,7 @@ # https://github.com/googleapis/google-cloud-python/blob/de73e45a7183a638113153d0faec105cfc437f0e/bigquery/google/cloud/bigquery/job.py -from typing import Union, List, Optional, Sequence, Mapping, Any, Literal, Dict +from typing import Union, List, Optional, Sequence, Mapping, Any, Dict +from typing_extensions import Literal import google.cloud.bigquery.job import google.cloud.bigquery.schema import google.cloud.bigquery.table From 35e7e29df0e19a4547b7b3ba0d70969c8a00c58e Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 09:09:59 -0500 Subject: [PATCH 13/28] Break up big file --- .../unit/db/bigquery/test_bigquery_loader.py | 126 ---------------- ...st_bigquery_loader_can_load_this_format.py | 138 ++++++++++++++++++ 2 files changed, 138 insertions(+), 126 deletions(-) create mode 100644 tests/unit/db/bigquery/test_bigquery_loader_can_load_this_format.py diff --git a/tests/unit/db/bigquery/test_bigquery_loader.py b/tests/unit/db/bigquery/test_bigquery_loader.py index 6b2cb0d62..28b91c596 100644 --- a/tests/unit/db/bigquery/test_bigquery_loader.py +++ b/tests/unit/db/bigquery/test_bigquery_loader.py @@ -10,10 +10,6 @@ from unittest.mock import patch -class NewRecordsFormat(BaseRecordsFormat): - ... 
- - class TestBigQueryLoader(unittest.TestCase): @patch('records_mover.db.bigquery.loader.load_job_config') def test_load_with_bad_schema_name(self, mock_load_job_config): @@ -107,56 +103,6 @@ def test_load(self, mock_load_job_config): self.assertEqual(out, mock_job.output_rows) - @patch('records_mover.db.bigquery.loader.load_job_config') - @patch('records_mover.db.bigquery.loader.ProcessingInstructions') - @patch('records_mover.db.bigquery.loader.RecordsLoadPlan') - def test_can_load_this_format_true(self, - mock_RecordsLoadPlan, - mock_ProcessingInstructions, - mock_load_job_config): - mock_db = Mock(name='db') - mock_source_records_format = Mock(name='source_records_format', spec=DelimitedRecordsFormat) - mock_source_records_format.format_type = 'delimited' - mock_processing_instructions = mock_ProcessingInstructions.return_value - mock_load_plan = mock_RecordsLoadPlan.return_value - mock_load_plan.records_format = mock_source_records_format - mock_url_resolver = Mock(name='url_resolver') - mock_source_records_format.hints = {} - bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver, - gcs_temp_base_loc=None) - out = bigquery_loader.can_load_this_format(mock_source_records_format) - mock_ProcessingInstructions.assert_called_with() - mock_RecordsLoadPlan.\ - assert_called_with(records_format=mock_source_records_format, - processing_instructions=mock_processing_instructions) - mock_load_job_config.assert_called_with(set(), mock_load_plan) - self.assertEqual(True, out) - - @patch('records_mover.db.bigquery.loader.load_job_config') - @patch('records_mover.db.bigquery.loader.ProcessingInstructions') - @patch('records_mover.db.bigquery.loader.RecordsLoadPlan') - def test_can_load_this_format_delimited_false(self, - mock_RecordsLoadPlan, - mock_ProcessingInstructions, - mock_load_job_config): - mock_db = Mock(name='db') - mock_source_records_format = Mock(name='source_records_format', spec=DelimitedRecordsFormat) - mock_source_records_format.format_type = 'delimited' - mock_processing_instructions = mock_ProcessingInstructions.return_value - mock_load_plan = mock_RecordsLoadPlan.return_value - mock_load_plan.records_format = mock_source_records_format - mock_url_resolver = Mock(name='url_resolver') - mock_load_job_config.side_effect = NotImplementedError - mock_source_records_format.hints = {} - bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver, - gcs_temp_base_loc=None) - out = bigquery_loader.can_load_this_format(mock_source_records_format) - mock_ProcessingInstructions.assert_called_with() - mock_RecordsLoadPlan.\ - assert_called_with(records_format=mock_source_records_format, - processing_instructions=mock_processing_instructions) - mock_load_job_config.assert_called_with(set(), mock_load_plan) - self.assertEqual(False, out) @patch('records_mover.db.bigquery.loader.load_job_config') def test_load_from_fileobj_true(self, mock_load_job_config): @@ -237,78 +183,6 @@ def test_load_with_fileobj_fallback(self, mock_load_job_config): self.assertEqual(out, mock_job.output_rows) - @patch('records_mover.db.bigquery.loader.load_job_config') - @patch('records_mover.db.bigquery.loader.ProcessingInstructions') - @patch('records_mover.db.bigquery.loader.RecordsLoadPlan') - def test_can_load_this_format_true_parquet(self, - mock_RecordsLoadPlan, - mock_ProcessingInstructions, - mock_load_job_config): - mock_db = Mock(name='db') - mock_source_records_format = Mock(name='source_records_format', spec=ParquetRecordsFormat) - 
mock_source_records_format.format_type = 'parquet' - mock_processing_instructions = mock_ProcessingInstructions.return_value - mock_load_plan = mock_RecordsLoadPlan.return_value - mock_load_plan.records_format = mock_source_records_format - mock_url_resolver = Mock(name='url_resolver') - mock_source_records_format.hints = {} - bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver, - gcs_temp_base_loc=None) - out = bigquery_loader.can_load_this_format(mock_source_records_format) - mock_ProcessingInstructions.assert_called_with() - mock_RecordsLoadPlan.\ - assert_called_with(records_format=mock_source_records_format, - processing_instructions=mock_processing_instructions) - self.assertTrue(out) - - @patch('records_mover.db.bigquery.loader.load_job_config') - @patch('records_mover.db.bigquery.loader.ProcessingInstructions') - @patch('records_mover.db.bigquery.loader.RecordsLoadPlan') - def test_can_load_this_format_true_avro(self, - mock_RecordsLoadPlan, - mock_ProcessingInstructions, - mock_load_job_config): - mock_db = Mock(name='db') - mock_source_records_format = Mock(name='source_records_format', spec=AvroRecordsFormat) - mock_source_records_format.format_type = 'avro' - mock_processing_instructions = mock_ProcessingInstructions.return_value - mock_load_plan = mock_RecordsLoadPlan.return_value - mock_load_plan.records_format = mock_source_records_format - mock_url_resolver = Mock(name='url_resolver') - mock_source_records_format.hints = {} - bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver, - gcs_temp_base_loc=None) - out = bigquery_loader.can_load_this_format(mock_source_records_format) - mock_ProcessingInstructions.assert_called_with() - mock_RecordsLoadPlan.\ - assert_called_with(records_format=mock_source_records_format, - processing_instructions=mock_processing_instructions) - self.assertTrue(out) - - @patch('records_mover.db.bigquery.loader.load_job_config') - @patch('records_mover.db.bigquery.loader.ProcessingInstructions') - @patch('records_mover.db.bigquery.loader.RecordsLoadPlan') - def test_can_load_this_format_false_newformat(self, - mock_RecordsLoadPlan, - mock_ProcessingInstructions, - mock_load_job_config): - mock_db = Mock(name='db') - mock_source_records_format = Mock(name='source_records_format', spec=NewRecordsFormat) - mock_source_records_format.format_type = 'new' - mock_processing_instructions = mock_ProcessingInstructions.return_value - mock_load_plan = mock_RecordsLoadPlan.return_value - mock_load_plan.records_format = mock_source_records_format - mock_url_resolver = Mock(name='url_resolver') - mock_source_records_format.hints = {} - bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver, - gcs_temp_base_loc=None) - out = bigquery_loader.can_load_this_format(mock_source_records_format) - mock_ProcessingInstructions.assert_called_with() - mock_RecordsLoadPlan.\ - assert_called_with(records_format=mock_source_records_format, - processing_instructions=mock_processing_instructions) - self.assertFalse(out) - def test_known_supported_records_formats_for_load(self): mock_db = Mock(name='db') mock_url_resolver = Mock(name='url_resolver') diff --git a/tests/unit/db/bigquery/test_bigquery_loader_can_load_this_format.py b/tests/unit/db/bigquery/test_bigquery_loader_can_load_this_format.py new file mode 100644 index 000000000..ced3eda19 --- /dev/null +++ b/tests/unit/db/bigquery/test_bigquery_loader_can_load_this_format.py @@ -0,0 +1,138 @@ +import unittest + +from records_mover.db.bigquery.loader import 
BigQueryLoader +from records_mover.records.records_format import ( + DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat, + BaseRecordsFormat +) +from mock import Mock +from unittest.mock import patch + + +class NewRecordsFormat(BaseRecordsFormat): + ... + + +class TestBigQueryLoaderCanLoadThisFormat(unittest.TestCase): + @patch('records_mover.db.bigquery.loader.load_job_config') + @patch('records_mover.db.bigquery.loader.ProcessingInstructions') + @patch('records_mover.db.bigquery.loader.RecordsLoadPlan') + def test_can_load_this_format_true(self, + mock_RecordsLoadPlan, + mock_ProcessingInstructions, + mock_load_job_config): + mock_db = Mock(name='db') + mock_source_records_format = Mock(name='source_records_format', spec=DelimitedRecordsFormat) + mock_source_records_format.format_type = 'delimited' + mock_processing_instructions = mock_ProcessingInstructions.return_value + mock_load_plan = mock_RecordsLoadPlan.return_value + mock_load_plan.records_format = mock_source_records_format + mock_url_resolver = Mock(name='url_resolver') + mock_source_records_format.hints = {} + bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=None) + out = bigquery_loader.can_load_this_format(mock_source_records_format) + mock_ProcessingInstructions.assert_called_with() + mock_RecordsLoadPlan.\ + assert_called_with(records_format=mock_source_records_format, + processing_instructions=mock_processing_instructions) + mock_load_job_config.assert_called_with(set(), mock_load_plan) + self.assertEqual(True, out) + + @patch('records_mover.db.bigquery.loader.load_job_config') + @patch('records_mover.db.bigquery.loader.ProcessingInstructions') + @patch('records_mover.db.bigquery.loader.RecordsLoadPlan') + def test_can_load_this_format_delimited_false(self, + mock_RecordsLoadPlan, + mock_ProcessingInstructions, + mock_load_job_config): + mock_db = Mock(name='db') + mock_source_records_format = Mock(name='source_records_format', spec=DelimitedRecordsFormat) + mock_source_records_format.format_type = 'delimited' + mock_processing_instructions = mock_ProcessingInstructions.return_value + mock_load_plan = mock_RecordsLoadPlan.return_value + mock_load_plan.records_format = mock_source_records_format + mock_url_resolver = Mock(name='url_resolver') + mock_load_job_config.side_effect = NotImplementedError + mock_source_records_format.hints = {} + bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=None) + out = bigquery_loader.can_load_this_format(mock_source_records_format) + mock_ProcessingInstructions.assert_called_with() + mock_RecordsLoadPlan.\ + assert_called_with(records_format=mock_source_records_format, + processing_instructions=mock_processing_instructions) + mock_load_job_config.assert_called_with(set(), mock_load_plan) + self.assertEqual(False, out) + + @patch('records_mover.db.bigquery.loader.load_job_config') + @patch('records_mover.db.bigquery.loader.ProcessingInstructions') + @patch('records_mover.db.bigquery.loader.RecordsLoadPlan') + def test_can_load_this_format_true_avro(self, + mock_RecordsLoadPlan, + mock_ProcessingInstructions, + mock_load_job_config): + mock_db = Mock(name='db') + mock_source_records_format = Mock(name='source_records_format', spec=AvroRecordsFormat) + mock_source_records_format.format_type = 'avro' + mock_processing_instructions = mock_ProcessingInstructions.return_value + mock_load_plan = mock_RecordsLoadPlan.return_value + mock_load_plan.records_format = mock_source_records_format + 
mock_url_resolver = Mock(name='url_resolver') + mock_source_records_format.hints = {} + bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=None) + out = bigquery_loader.can_load_this_format(mock_source_records_format) + mock_ProcessingInstructions.assert_called_with() + mock_RecordsLoadPlan.\ + assert_called_with(records_format=mock_source_records_format, + processing_instructions=mock_processing_instructions) + self.assertTrue(out) + + @patch('records_mover.db.bigquery.loader.load_job_config') + @patch('records_mover.db.bigquery.loader.ProcessingInstructions') + @patch('records_mover.db.bigquery.loader.RecordsLoadPlan') + def test_can_load_this_format_false_newformat(self, + mock_RecordsLoadPlan, + mock_ProcessingInstructions, + mock_load_job_config): + mock_db = Mock(name='db') + mock_source_records_format = Mock(name='source_records_format', spec=NewRecordsFormat) + mock_source_records_format.format_type = 'new' + mock_processing_instructions = mock_ProcessingInstructions.return_value + mock_load_plan = mock_RecordsLoadPlan.return_value + mock_load_plan.records_format = mock_source_records_format + mock_url_resolver = Mock(name='url_resolver') + mock_source_records_format.hints = {} + bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=None) + out = bigquery_loader.can_load_this_format(mock_source_records_format) + mock_ProcessingInstructions.assert_called_with() + mock_RecordsLoadPlan.\ + assert_called_with(records_format=mock_source_records_format, + processing_instructions=mock_processing_instructions) + self.assertFalse(out) + + @patch('records_mover.db.bigquery.loader.load_job_config') + @patch('records_mover.db.bigquery.loader.ProcessingInstructions') + @patch('records_mover.db.bigquery.loader.RecordsLoadPlan') + def test_can_load_this_format_true_parquet(self, + mock_RecordsLoadPlan, + mock_ProcessingInstructions, + mock_load_job_config): + mock_db = Mock(name='db') + mock_source_records_format = Mock(name='source_records_format', spec=ParquetRecordsFormat) + mock_source_records_format.format_type = 'parquet' + mock_processing_instructions = mock_ProcessingInstructions.return_value + mock_load_plan = mock_RecordsLoadPlan.return_value + mock_load_plan.records_format = mock_source_records_format + mock_url_resolver = Mock(name='url_resolver') + mock_source_records_format.hints = {} + bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver, + gcs_temp_base_loc=None) + out = bigquery_loader.can_load_this_format(mock_source_records_format) + mock_ProcessingInstructions.assert_called_with() + mock_RecordsLoadPlan.\ + assert_called_with(records_format=mock_source_records_format, + processing_instructions=mock_processing_instructions) + self.assertTrue(out) From 99631b6607c4b21b68c0f56ab5b9a63d3b356f68 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 09:13:44 -0500 Subject: [PATCH 14/28] Fix known_supported_records_formats_for_load() --- records_mover/db/bigquery/loader.py | 4 +++- tests/unit/db/bigquery/test_bigquery_loader.py | 8 ++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/records_mover/db/bigquery/loader.py b/records_mover/db/bigquery/loader.py index 11424003c..22d831409 100644 --- a/records_mover/db/bigquery/loader.py +++ b/records_mover/db/bigquery/loader.py @@ -176,4 +176,6 @@ def can_load_this_format(self, source_records_format: BaseRecordsFormat) -> bool return False def known_supported_records_formats_for_load(self) -> 
List[BaseRecordsFormat]: - return [DelimitedRecordsFormat(variant='bigquery'), ParquetRecordsFormat()] + return [DelimitedRecordsFormat(variant='bigquery'), + ParquetRecordsFormat(), + AvroRecordsFormat()] diff --git a/tests/unit/db/bigquery/test_bigquery_loader.py b/tests/unit/db/bigquery/test_bigquery_loader.py index 28b91c596..6f1bbca23 100644 --- a/tests/unit/db/bigquery/test_bigquery_loader.py +++ b/tests/unit/db/bigquery/test_bigquery_loader.py @@ -2,8 +2,7 @@ from records_mover.db.bigquery.loader import BigQueryLoader from records_mover.records.records_format import ( - DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat, - BaseRecordsFormat + DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat ) from records_mover.db.errors import NoTemporaryBucketConfiguration from mock import MagicMock, Mock @@ -103,7 +102,6 @@ def test_load(self, mock_load_job_config): self.assertEqual(out, mock_job.output_rows) - @patch('records_mover.db.bigquery.loader.load_job_config') def test_load_from_fileobj_true(self, mock_load_job_config): mock_db = Mock(name='mock_db') @@ -189,12 +187,14 @@ def test_known_supported_records_formats_for_load(self): bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver, gcs_temp_base_loc=None) out = bigquery_loader.known_supported_records_formats_for_load() - self.assertEqual(2, len(out)) + self.assertEqual(3, len(out)) delimited_records_format = out[0] self.assertEqual(type(delimited_records_format), DelimitedRecordsFormat) self.assertEqual('bigquery', delimited_records_format.variant) parquet_records_format = out[1] self.assertEqual(type(parquet_records_format), ParquetRecordsFormat) + avro_records_format = out[2] + self.assertEqual(type(avro_records_format), AvroRecordsFormat) def test_temporary_gcs_directory_loc_none(self): mock_db = Mock(name='db') From ba7d15bed8f3f81c8cd657027f65eeeaea8c34c5 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 09:43:18 -0500 Subject: [PATCH 15/28] Try with config.use_avro_logical_types --- records_mover/db/bigquery/unloader.py | 1 + types/stubs/google/cloud/bigquery/job/__init__.pyi | 1 + 2 files changed, 2 insertions(+) diff --git a/records_mover/db/bigquery/unloader.py b/records_mover/db/bigquery/unloader.py index 4bab6837d..ca72ec665 100644 --- a/records_mover/db/bigquery/unloader.py +++ b/records_mover/db/bigquery/unloader.py @@ -66,6 +66,7 @@ def _extract_job_config(self, unload_plan: RecordsUnloadPlan) -> ExtractJobConfi config = ExtractJobConfig() if isinstance(unload_plan.records_format, AvroRecordsFormat): config.destination_format = 'AVRO' + config.use_avro_logical_types = True else: raise NotImplementedError(f'Please add support for {unload_plan.records_format}') return config diff --git a/types/stubs/google/cloud/bigquery/job/__init__.pyi b/types/stubs/google/cloud/bigquery/job/__init__.pyi index fb9f74740..e036584e6 100644 --- a/types/stubs/google/cloud/bigquery/job/__init__.pyi +++ b/types/stubs/google/cloud/bigquery/job/__init__.pyi @@ -89,5 +89,6 @@ class LoadJobConfig: class ExtractJobConfig: compression: Literal['GZIP', 'DEFLATE', 'SNAPPY', 'NONE'] destination_format: Literal['CSV', 'NEWLINE_DELIMITED_JSON', 'AVRO'] + use_avro_logical_types: bool field_delimeter: str labels: Dict[str, str] From 2dcfda0339cadf153bd800c46db704c60f156293 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 09:53:58 -0500 Subject: [PATCH 16/28] Set config.use_avro_logical_types on import as well --- records_mover/db/bigquery/load_job_config_options.py | 1 + 1 file 
changed, 1 insertion(+) diff --git a/records_mover/db/bigquery/load_job_config_options.py b/records_mover/db/bigquery/load_job_config_options.py index 2cd65c1bf..8fad719ad 100644 --- a/records_mover/db/bigquery/load_job_config_options.py +++ b/records_mover/db/bigquery/load_job_config_options.py @@ -122,6 +122,7 @@ def load_job_config(unhandled_hints: Set[str], if isinstance(load_plan.records_format, AvroRecordsFormat): config.source_format = 'AVRO' + config.use_avro_logical_types = True return config raise NotImplementedError("Not currently able to load " From 8898433e58b4037f7797456a94e40a3d3ec13794 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 10:16:26 -0500 Subject: [PATCH 17/28] Deal with Avro limitations --- .../db/bigquery/bigquery_db_driver.py | 13 ++++++++- .../db/bigquery/load_job_config_options.py | 1 + records_mover/db/bigquery/unloader.py | 1 + .../records/schema/field/__init__.py | 27 +++++++++++++++++-- .../records/schema/schema/__init__.py | 5 ++++ .../records/expected_column_types.py | 18 +++++++++++++ 6 files changed, 62 insertions(+), 3 deletions(-) diff --git a/records_mover/db/bigquery/bigquery_db_driver.py b/records_mover/db/bigquery/bigquery_db_driver.py index 06eccd5b4..03893a87c 100644 --- a/records_mover/db/bigquery/bigquery_db_driver.py +++ b/records_mover/db/bigquery/bigquery_db_driver.py @@ -1,7 +1,7 @@ from ..driver import DBDriver import logging from ...records import RecordsSchema -from ...records.records_format import BaseRecordsFormat, ParquetRecordsFormat +from ...records.records_format import BaseRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat from ...utils.limits import INT64_MAX, INT64_MIN, FLOAT64_SIGNIFICAND_BITS, num_digits import re from typing import Union, Optional, Tuple @@ -116,6 +116,17 @@ def tweak_records_schema_for_load(self, # # So we need to make sure we don't create any DATETIME # columns if we're loading from a Parquet file. + # + # https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet return records_schema.convert_datetimes_to_datetimetz() + elif isinstance(records_format, AvroRecordsFormat): + # https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types + # + # "Note: There is no logical type that directly + # corresponds to DATETIME, and BigQuery currently doesn't + # support any direct conversion from an Avro type into a + # DATETIME field." 
+            #
+            return records_schema.convert_datetimes_to_string()
         else:
             return records_schema
diff --git a/records_mover/db/bigquery/load_job_config_options.py b/records_mover/db/bigquery/load_job_config_options.py
index 8fad719ad..89b5bc2e9 100644
--- a/records_mover/db/bigquery/load_job_config_options.py
+++ b/records_mover/db/bigquery/load_job_config_options.py
@@ -122,6 +122,7 @@ def load_job_config(unhandled_hints: Set[str],
 
     if isinstance(load_plan.records_format, AvroRecordsFormat):
         config.source_format = 'AVRO'
+        # https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types
         config.use_avro_logical_types = True
         return config
 
diff --git a/records_mover/db/bigquery/unloader.py b/records_mover/db/bigquery/unloader.py
index ca72ec665..155e3249a 100644
--- a/records_mover/db/bigquery/unloader.py
+++ b/records_mover/db/bigquery/unloader.py
@@ -66,6 +66,7 @@ def _extract_job_config(self, unload_plan: RecordsUnloadPlan) -> ExtractJobConfi
         config = ExtractJobConfig()
         if isinstance(unload_plan.records_format, AvroRecordsFormat):
             config.destination_format = 'AVRO'
+            # https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types
             config.use_avro_logical_types = True
         else:
             raise NotImplementedError(f'Please add support for {unload_plan.records_format}')
         return config
diff --git a/records_mover/records/schema/field/__init__.py b/records_mover/records/schema/field/__init__.py
index 297cd18d6..c555f548b 100644
--- a/records_mover/records/schema/field/__init__.py
+++ b/records_mover/records/schema/field/__init__.py
@@ -16,6 +16,7 @@ FLOAT80_SIGNIFICAND_BITS)
 from .representation import RecordsSchemaFieldRepresentation
 from .constraints import (RecordsSchemaFieldConstraints,
+                          RecordsSchemaFieldStringConstraints,
                           RecordsSchemaFieldIntegerConstraints,
                           RecordsSchemaFieldDecimalConstraints)
 from .statistics import RecordsSchemaFieldStatistics
@@ -315,13 +316,35 @@ def from_data(name: str, data: 'FieldDict') -> 'RecordsSchemaField':
 
     def convert_datetime_to_datetimetz(self) -> 'RecordsSchemaField':
         field_type = self.field_type
+        constraints = self.constraints
+        statistics = self.statistics
         if field_type == 'datetime':
             field_type = 'datetimetz'
+            if constraints is not None:
+                constraints = constraints.cast('datetimetz')
+            if statistics is not None:
+                statistics = statistics.cast('datetimetz')
         return RecordsSchemaField(name=self.name,
                                   field_type=field_type,
-                                  constraints=self.constraints,
-                                  statistics=self.statistics,
+                                  constraints=constraints,
+                                  statistics=statistics,
+                                  representations=self.representations)
+
+    def convert_datetime_to_string(self) -> 'RecordsSchemaField':
+        field_type = self.field_type
+        constraints = self.constraints
+        statistics = self.statistics
+        if field_type == 'datetime':
+            field_type = 'string'
+            if constraints is not None:
+                constraints = constraints.cast('string')
+            if statistics is not None:
+                statistics = statistics.cast('string')
+        return RecordsSchemaField(name=self.name,
+                                  field_type=field_type,
+                                  constraints=constraints,
+                                  statistics=statistics,
                                   representations=self.representations)
 
     def cast(self, field_type: 'FieldType') -> 'RecordsSchemaField':
diff --git a/records_mover/records/schema/schema/__init__.py b/records_mover/records/schema/schema/__init__.py
index 47989340d..521c89c23 100644
--- a/records_mover/records/schema/schema/__init__.py
+++ b/records_mover/records/schema/schema/__init__.py
@@ -251,3 +251,8 @@ def convert_datetimes_to_datetimetz(self) -> 'RecordsSchema':
         return RecordsSchema(fields=[field.convert_datetime_to_datetimetz()
                                      for field in
self.fields], known_representations=self.known_representations) + + def convert_datetimes_to_string(self) -> 'RecordsSchema': + return RecordsSchema(fields=[field.convert_datetime_to_string() + for field in self.fields], + known_representations=self.known_representations) diff --git a/tests/integration/records/expected_column_types.py b/tests/integration/records/expected_column_types.py index 520f26643..eae7dc268 100644 --- a/tests/integration/records/expected_column_types.py +++ b/tests/integration/records/expected_column_types.py @@ -102,6 +102,15 @@ # columns if we're loading from a Parquet file. # # + # bigquery2bigquery: + # + # Avro is used in this copy. According to Google: + # + # "Note: There is no logical type that directly corresponds to + # DATETIME, and BigQuery currently doesn't support any direct + # conversion from an Avro type into a DATETIME field." + # + # https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro # ('postgresql', 'postgresql'): [ 'INTEGER', 'VARCHAR(256)', 'VARCHAR(256)', 'VARCHAR(256)', 'VARCHAR(256)', @@ -128,6 +137,15 @@ 'VARCHAR(256)', 'VARCHAR(256)', 'DATE', 'VARCHAR(8)', 'TIMESTAMP WITHOUT TIME ZONE', 'TIMESTAMP WITH TIME ZONE' ], + ('bigquery', 'bigquery'): [ + "", "", + "", "", + "", "", + "", "", + "", + "", + "" + ], ('bigquery', 'postgresql'): [ 'BIGINT', 'VARCHAR(256)', 'VARCHAR(256)', 'VARCHAR(256)', 'VARCHAR(256)', 'VARCHAR(256)', 'VARCHAR(256)', 'DATE', 'TIME WITHOUT TIME ZONE', From 16b041c5680e22cb418829df29d80909329ca5ab Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 10:30:45 -0500 Subject: [PATCH 18/28] Improve coverage --- tests/unit/records/schema/field/test_field.py | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/tests/unit/records/schema/field/test_field.py b/tests/unit/records/schema/field/test_field.py index 690ee4d59..5fa494d7f 100644 --- a/tests/unit/records/schema/field/test_field.py +++ b/tests/unit/records/schema/field/test_field.py @@ -259,3 +259,56 @@ def test_convert_datetime_to_datetimetz_not_datetime(self): self.assertEqual(out.field_type, mock_field_type) self.assertEqual(out.constraints, mock_constraints) self.assertEqual(out.representations, mock_representations) + + def test_convert_datetime_to_datetimetz_datetime(self): + mock_name = Mock(name='name') + mock_field_type = 'datetime' + mock_constraints = Mock(name='constraints') + mock_statistics = Mock(name='statistics') + mock_representations = Mock(name='representations') + field = RecordsSchemaField(name=mock_name, + field_type=mock_field_type, + constraints=mock_constraints, + statistics=mock_statistics, + representations=mock_representations) + out = field.convert_datetime_to_datetimetz() + self.assertEqual(out.name, mock_name) + self.assertEqual(out.field_type, 'datetimetz') + self.assertEqual(out.constraints, mock_constraints.cast.return_value) + self.assertEqual(out.statistics, mock_statistics.cast.return_value) + self.assertEqual(out.representations, mock_representations) + + def test_convert_datetime_to_string_not_datetime(self): + mock_name = Mock(name='name') + mock_field_type = 'time' + mock_constraints = Mock(name='constraints') + mock_statistics = Mock(name='statistics') + mock_representations = Mock(name='representations') + field = RecordsSchemaField(name=mock_name, + field_type=mock_field_type, + constraints=mock_constraints, + statistics=mock_statistics, + representations=mock_representations) + out = field.convert_datetime_to_string() + self.assertEqual(out.name, mock_name) + 
self.assertEqual(out.field_type, mock_field_type) + self.assertEqual(out.constraints, mock_constraints) + self.assertEqual(out.representations, mock_representations) + + def test_convert_datetime_to_string_datetime(self): + mock_name = Mock(name='name') + mock_field_type = 'datetime' + mock_constraints = Mock(name='constraints') + mock_statistics = Mock(name='statistics') + mock_representations = Mock(name='representations') + field = RecordsSchemaField(name=mock_name, + field_type=mock_field_type, + constraints=mock_constraints, + statistics=mock_statistics, + representations=mock_representations) + out = field.convert_datetime_to_string() + self.assertEqual(out.name, mock_name) + self.assertEqual(out.field_type, 'string') + self.assertEqual(out.constraints, mock_constraints.cast.return_value) + self.assertEqual(out.statistics, mock_statistics.cast.return_value) + self.assertEqual(out.representations, mock_representations) From 059b2a7781f289066a60b961eb963fd5b0d28872 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 10:32:22 -0500 Subject: [PATCH 19/28] Unratchet --- metrics/coverage_high_water_mark | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/coverage_high_water_mark b/metrics/coverage_high_water_mark index 8fee528a7..9c2857e0b 100644 --- a/metrics/coverage_high_water_mark +++ b/metrics/coverage_high_water_mark @@ -1 +1 @@ -93.6400 +93.6200 From cba3e2e076633463335acaad0a883d5867de96fc Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 10:35:03 -0500 Subject: [PATCH 20/28] Ratchet --- metrics/mypy_high_water_mark | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/mypy_high_water_mark b/metrics/mypy_high_water_mark index f3cebc182..777cfc7ea 100644 --- a/metrics/mypy_high_water_mark +++ b/metrics/mypy_high_water_mark @@ -1 +1 @@ -92.5200 \ No newline at end of file +92.5400 \ No newline at end of file From f69b8237dcf41410cc4d286d252a467df67ed669 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 10:43:08 -0500 Subject: [PATCH 21/28] Bump bigfiles 389: tests/integration/***** 373: setup.py 364: records_mover/records/schema/field/__init__.py Reduce total number of bigfiles violations to 1103 or below! 
--- metrics/bigfiles_high_water_mark | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/bigfiles_high_water_mark b/metrics/bigfiles_high_water_mark index 21a801b48..64975abef 100644 --- a/metrics/bigfiles_high_water_mark +++ b/metrics/bigfiles_high_water_mark @@ -1 +1 @@ -1103 +1126 From 03f1e28abc6fc5d437e893e857607aa868360a7f Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 10:45:15 -0500 Subject: [PATCH 22/28] Drop unneeded import --- records_mover/records/schema/field/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/records_mover/records/schema/field/__init__.py b/records_mover/records/schema/field/__init__.py index c555f548b..b1e777da1 100644 --- a/records_mover/records/schema/field/__init__.py +++ b/records_mover/records/schema/field/__init__.py @@ -16,7 +16,6 @@ FLOAT80_SIGNIFICAND_BITS) from .representation import RecordsSchemaFieldRepresentation from .constraints import (RecordsSchemaFieldConstraints, - RecordsSchemaFieldStringConstraints, RecordsSchemaFieldIntegerConstraints, RecordsSchemaFieldDecimalConstraints) from .statistics import RecordsSchemaFieldStatistics From ebbf53f5280c98afd69fe78e210247feb9fc1268 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 10:46:36 -0500 Subject: [PATCH 23/28] Ratchet --- metrics/bigfiles_high_water_mark | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/bigfiles_high_water_mark b/metrics/bigfiles_high_water_mark index 64975abef..706146535 100644 --- a/metrics/bigfiles_high_water_mark +++ b/metrics/bigfiles_high_water_mark @@ -1 +1 @@ -1126 +1125 From 0179a71f942a4f86238581590887c20c302af000 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 11:03:31 -0500 Subject: [PATCH 24/28] Reformat array literal --- records_mover/db/bigquery/loader.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/records_mover/db/bigquery/loader.py b/records_mover/db/bigquery/loader.py index 22d831409..ab271aae3 100644 --- a/records_mover/db/bigquery/loader.py +++ b/records_mover/db/bigquery/loader.py @@ -176,6 +176,8 @@ def can_load_this_format(self, source_records_format: BaseRecordsFormat) -> bool return False def known_supported_records_formats_for_load(self) -> List[BaseRecordsFormat]: - return [DelimitedRecordsFormat(variant='bigquery'), - ParquetRecordsFormat(), - AvroRecordsFormat()] + return [ + DelimitedRecordsFormat(variant='bigquery'), + ParquetRecordsFormat(), + AvroRecordsFormat() + ] From 63114b578a33091c6a7968196f28f4b7d64c35d5 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 11:21:55 -0500 Subject: [PATCH 25/28] Move schema adjustment logic --- records_mover/db/bigquery/bigquery_db_driver.py | 10 +++++++++- records_mover/db/driver.py | 5 +++++ records_mover/records/sources/table.py | 2 ++ tests/unit/records/sources/test_table.py | 3 +-- 4 files changed, 17 insertions(+), 3 deletions(-) diff --git a/records_mover/db/bigquery/bigquery_db_driver.py b/records_mover/db/bigquery/bigquery_db_driver.py index 03893a87c..4b51d4f01 100644 --- a/records_mover/db/bigquery/bigquery_db_driver.py +++ b/records_mover/db/bigquery/bigquery_db_driver.py @@ -119,7 +119,14 @@ def tweak_records_schema_for_load(self, # # https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet return records_schema.convert_datetimes_to_datetimetz() - elif isinstance(records_format, AvroRecordsFormat): + + else: + return records_schema + + def tweak_records_schema_after_unload(self, + records_schema: RecordsSchema, + records_format: 
BaseRecordsFormat) -> RecordsSchema: + if isinstance(records_format, AvroRecordsFormat): # https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types # # "Note: There is no logical type that directly @@ -127,6 +134,7 @@ def tweak_records_schema_for_load(self, # support any direct conversion from an Avro type into a # DATETIME field." # + # BigQuery exports this as an Avro string type return records_schema.convert_datetimes_to_string() else: return records_schema diff --git a/records_mover/db/driver.py b/records_mover/db/driver.py index 0a2ee2710..7942a0fc7 100644 --- a/records_mover/db/driver.py +++ b/records_mover/db/driver.py @@ -188,6 +188,11 @@ def tweak_records_schema_for_load(self, records_format: BaseRecordsFormat) -> RecordsSchema: return records_schema + def tweak_records_schema_after_unload(self, + records_schema: RecordsSchema, + records_format: BaseRecordsFormat) -> RecordsSchema: + return records_schema + class GenericDBDriver(DBDriver): def loader_from_fileobj(self) -> None: diff --git a/records_mover/records/sources/table.py b/records_mover/records/sources/table.py index f65da93b5..a279fb78a 100644 --- a/records_mover/records/sources/table.py +++ b/records_mover/records/sources/table.py @@ -124,6 +124,8 @@ def move_to_records_directory(self, directory=records_directory) records_schema = self.pull_records_schema() records_directory.save_format(unload_plan.records_format) + records_schema = self.driver.tweak_records_schema_after_unload(records_schema, + unload_plan.records_format) records_directory.save_schema(records_schema) records_directory.finalize_manifest() diff --git a/tests/unit/records/sources/test_table.py b/tests/unit/records/sources/test_table.py index 6dbf4429a..5bd5c5c21 100644 --- a/tests/unit/records/sources/test_table.py +++ b/tests/unit/records/sources/test_table.py @@ -74,9 +74,8 @@ def test_move_to_records_directory(self, unload_plan=mock_unload_plan, directory=mock_records_directory) mock_export_count = self.mock_unloader.unload.return_value - mock_records_schema = mock_RecordsSchema.from_db_table.return_value mock_records_directory.save_schema.\ - assert_called_with(mock_records_schema) + assert_called_with(self.mock_driver.tweak_records_schema_after_unload.return_value) mock_records_directory.save_format.assert_called_with(mock_unload_plan.records_format) mock_records_directory.finalize_manifest.assert_called_with() mock_MoveResult.assert_called_with(move_count=mock_export_count, From 1568ada9c77b6d866c0447b5bdfbe3432df76334 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 11:30:43 -0500 Subject: [PATCH 26/28] Improve coverage --- tests/unit/records/sources/test_table.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/unit/records/sources/test_table.py b/tests/unit/records/sources/test_table.py index 5bd5c5c21..d9b9576d6 100644 --- a/tests/unit/records/sources/test_table.py +++ b/tests/unit/records/sources/test_table.py @@ -1,5 +1,5 @@ from records_mover.records.sources.table import TableRecordsSource -from mock import Mock, patch, ANY +from mock import MagicMock, Mock, patch, ANY import unittest from records_mover.records.targets.base import MightSupportMoveFromTempLocAfterFillingIt @@ -8,7 +8,7 @@ class TestTableRecordsSource(unittest.TestCase): def setUp(self): self.mock_schema_name = Mock(name='schema_name') self.mock_table_name = Mock(name='table_name') - self.mock_driver = Mock(name='driver') + self.mock_driver = MagicMock(name='driver') self.mock_loader = 
self.mock_driver.loader.return_value self.mock_unloader = self.mock_driver.unloader.return_value self.mock_db_driver = Mock(name='db_driver') @@ -178,3 +178,9 @@ def test_can_move_to_scheme(self): out = self.table_records_source.can_move_to_scheme(Mock()) self.assertEqual(out, self.mock_unloader.can_unload_to_scheme.return_value) + + def test_temporary_unloadable_directory_loc_None(self): + with self.table_records_source.temporary_unloadable_directory_loc() as loc: + self.assertEqual(loc, + self.mock_unloader.temporary_unloadable_directory_loc. + return_value.__enter__.return_value) From 48a7fdbd9202dd28724e0e0cd223893cef0953d5 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 11:33:36 -0500 Subject: [PATCH 27/28] Ratchet coverage --- metrics/coverage_high_water_mark | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/coverage_high_water_mark b/metrics/coverage_high_water_mark index 9c2857e0b..396e92062 100644 --- a/metrics/coverage_high_water_mark +++ b/metrics/coverage_high_water_mark @@ -1 +1 @@ -93.6200 +93.6500 From e0aab23278840b9034a781e08be3e9ea32d94992 Mon Sep 17 00:00:00 2001 From: Vince Broz Date: Tue, 1 Dec 2020 11:37:13 -0500 Subject: [PATCH 28/28] Ratchet --- metrics/mypy_high_water_mark | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/mypy_high_water_mark b/metrics/mypy_high_water_mark index 777cfc7ea..0292e1184 100644 --- a/metrics/mypy_high_water_mark +++ b/metrics/mypy_high_water_mark @@ -1 +1 @@ -92.5400 \ No newline at end of file +92.5500 \ No newline at end of file
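
The Avro wiring added in patches 15-17 comes down to two google-cloud-bigquery
job-config attributes: a format field (ExtractJobConfig.destination_format on
unload, LoadJobConfig.source_format on load, both set to 'AVRO') and
use_avro_logical_types, which keeps DATE, TIME, and TIMESTAMP values as Avro
logical types instead of strings. Below is a minimal sketch of the unload side
in isolation. Client.extract_table() and ExtractJobConfig are real BigQuery
client APIs (the same ones stubbed under types/stubs above), but the helper
name, table ID, and GCS URL are hypothetical and not taken from these patches:

    from google.cloud.bigquery import Client
    from google.cloud.bigquery.job import ExtractJobConfig

    def unload_table_to_avro(client: Client, table_id: str, gcs_url: str) -> None:
        # The same two settings BigQueryUnloader._extract_job_config() applies:
        # export as Avro, and emit date/time values as Avro logical types
        # rather than strings.
        config = ExtractJobConfig()
        config.destination_format = 'AVRO'
        config.use_avro_logical_types = True
        # table_id like 'project.dataset.table'; gcs_url like 'gs://bucket/out.avro'
        job = client.extract_table(table_id, gcs_url, job_config=config)
        job.result()  # block until the extract job finishes

The load side is symmetric, which is why patch 16 sets the same flag on
LoadJobConfig as well.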
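
The exception is DATETIME: as the Google note quoted in patch 17 says, Avro
has no logical type for it, so tweak_records_schema_after_unload() rewrites
those fields as strings in the records schema that travels with the export.
Here is a sketch of that conversion on its own, using the constructor keywords
visible in the field/__init__.py diff; the field name and the None/{}
placeholder arguments are assumptions for illustration only:

    from records_mover.records.schema.field import RecordsSchemaField

    # Hypothetical plain-datetime field; None/{} stand in for real
    # constraints, statistics, and representations objects.
    field = RecordsSchemaField(name='created_at',
                               field_type='datetime',
                               constraints=None,
                               statistics=None,
                               representations={})

    converted = field.convert_datetime_to_string()
    print(converted.field_type)  # 'string' -- safe to round-trip through Avro

Only plain 'datetime' fields are rewritten; convert_datetimes_to_string()
matches field_type == 'datetime' alone, so 'datetimetz' fields, which do have
a corresponding Avro timestamp logical type, pass through untouched.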