Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for BigQuery bulk export (to Avro, for now) #136

Merged
merged 28 commits into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e095590
Add support for BigQuery bulk export (to Avro, for now)
vinceatbluelabs Dec 1, 2020
0fa5321
Improve test coverage
vinceatbluelabs Dec 1, 2020
28f48ab
Improve test coverage
vinceatbluelabs Dec 1, 2020
5ebb19b
Improve test coverage
vinceatbluelabs Dec 1, 2020
de77caa
Improve test coverage
vinceatbluelabs Dec 1, 2020
09ceea5
Improve test coverage
vinceatbluelabs Dec 1, 2020
2a721af
Improve test coverage
vinceatbluelabs Dec 1, 2020
f9f88ad
Improve test coverage
vinceatbluelabs Dec 1, 2020
8b10df9
Ratchet coverage
vinceatbluelabs Dec 1, 2020
5c04074
Ratchet
vinceatbluelabs Dec 1, 2020
c00afd2
Flake8 fixes
vinceatbluelabs Dec 1, 2020
d5c8e84
Old Python fixes
vinceatbluelabs Dec 1, 2020
35e7e29
Break up big file
vinceatbluelabs Dec 1, 2020
99631b6
Fix known_supported_records_formats_for_load()
vinceatbluelabs Dec 1, 2020
ba7d15b
Try with config.use_avro_logical_types
vinceatbluelabs Dec 1, 2020
2dcfda0
Set config.use_avro_logical_types on import as well
vinceatbluelabs Dec 1, 2020
8898433
Deal with Avro limitations
vinceatbluelabs Dec 1, 2020
16b041c
Improve coverage
vinceatbluelabs Dec 1, 2020
059b2a7
Unratchet
vinceatbluelabs Dec 1, 2020
cba3e2e
Ratchet
vinceatbluelabs Dec 1, 2020
f69b823
Bump bigfiles
vinceatbluelabs Dec 1, 2020
03f1e28
Drop unneeded import
vinceatbluelabs Dec 1, 2020
ebbf53f
Ratchet
vinceatbluelabs Dec 1, 2020
0179a71
Reformat array literal
vinceatbluelabs Dec 1, 2020
63114b5
Move schema adjustment logic
vinceatbluelabs Dec 1, 2020
1568ada
Improve coverage
vinceatbluelabs Dec 1, 2020
48a7fdb
Ratchet coverage
vinceatbluelabs Dec 1, 2020
e0aab23
Ratchet
vinceatbluelabs Dec 1, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metrics/coverage_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
93.6200
93.6400
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
92.4400
92.5200
10 changes: 8 additions & 2 deletions records_mover/db/bigquery/bigquery_db_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
from ...url.resolver import UrlResolver
import sqlalchemy
from .loader import BigQueryLoader
from .unloader import BigQueryUnloader
from ..loader import LoaderFromFileobj, LoaderFromRecordsDirectory
from ..unloader import Unloader
from ...url.base import BaseDirectoryUrl


Expand All @@ -26,15 +28,19 @@ def __init__(self,
BigQueryLoader(db=self.db,
url_resolver=url_resolver,
gcs_temp_base_loc=gcs_temp_base_loc)
self._bigquery_unloader =\
BigQueryUnloader(db=self.db,
url_resolver=url_resolver,
gcs_temp_base_loc=gcs_temp_base_loc)

def loader(self) -> Optional[LoaderFromRecordsDirectory]:
return self._bigquery_loader

def loader_from_fileobj(self) -> LoaderFromFileobj:
return self._bigquery_loader

def unloader(self) -> None:
return None
def unloader(self) -> Unloader:
return self._bigquery_unloader

def type_for_date_plus_time(self, has_tz: bool=False) -> sqlalchemy.sql.sqltypes.DateTime:
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
Expand Down
8 changes: 7 additions & 1 deletion records_mover/db/bigquery/load_job_config_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
from ...records.delimited import cant_handle_hint
from typing import Set
from ...records.load_plan import RecordsLoadPlan
from ...records.records_format import DelimitedRecordsFormat, ParquetRecordsFormat
from ...records.records_format import (
DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat
)
from records_mover.records.delimited import ValidatedRecordsHints
from records_mover.mover_types import _assert_never
from google.cloud.bigquery.job import CreateDisposition, WriteDisposition
Expand Down Expand Up @@ -118,6 +120,10 @@ def load_job_config(unhandled_hints: Set[str],
config.source_format = 'PARQUET'
return config

if isinstance(load_plan.records_format, AvroRecordsFormat):
config.source_format = 'AVRO'
return config

raise NotImplementedError("Not currently able to load "
f"{load_plan.records_format.format_type}")

Expand Down
6 changes: 3 additions & 3 deletions records_mover/db/bigquery/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import sqlalchemy
from ...records.load_plan import RecordsLoadPlan
from ...records.records_format import (
BaseRecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat
BaseRecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat
)
from ...records.records_directory import RecordsDirectory
from ...records.processing_instructions import ProcessingInstructions
Expand Down Expand Up @@ -87,8 +87,6 @@ def load_from_fileobj(self, schema: str, table: str,
# https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html#google.cloud.bigquery.client.Client.load_table_from_file
job = client.load_table_from_file(fileobj,
f"{schema}.{table}",
# Must match the destination dataset location.
location="US",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced this was ever needed - it's in the example code, but everything seems to run fine without it, so there must be some inference logic to figure out the dataset location based on the client object.

job_config=job_config)

try:
Expand Down Expand Up @@ -165,6 +163,8 @@ def can_load_this_format(self, source_records_format: BaseRecordsFormat) -> bool
processing_instructions=processing_instructions)
if isinstance(load_plan.records_format, ParquetRecordsFormat):
return True
if isinstance(load_plan.records_format, AvroRecordsFormat):
return True
if not isinstance(load_plan.records_format, DelimitedRecordsFormat):
return False
unhandled_hints = set(load_plan.records_format.hints.keys())
Expand Down
106 changes: 106 additions & 0 deletions records_mover/db/bigquery/unloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import sqlalchemy
from contextlib import contextmanager
from typing import List, Iterator, Optional, Union, Tuple
import logging
from google.cloud.bigquery.dbapi.connection import Connection
from google.cloud.bigquery.client import Client
from google.cloud.bigquery.job import ExtractJobConfig
from records_mover.db.unloader import Unloader
from records_mover.records.records_format import BaseRecordsFormat, AvroRecordsFormat
from records_mover.url.base import BaseDirectoryUrl
from records_mover.url.resolver import UrlResolver
from records_mover.records.unload_plan import RecordsUnloadPlan
from records_mover.records.records_directory import RecordsDirectory
from records_mover.db.errors import NoTemporaryBucketConfiguration

logger = logging.getLogger(__name__)


class BigQueryUnloader(Unloader):
def __init__(self,
db: Union[sqlalchemy.engine.Connection, sqlalchemy.engine.Engine],
url_resolver: UrlResolver,
gcs_temp_base_loc: Optional[BaseDirectoryUrl])\
-> None:
self.db = db
self.url_resolver = url_resolver
self.gcs_temp_base_loc = gcs_temp_base_loc
super().__init__(db=db)

def can_unload_format(self, target_records_format: BaseRecordsFormat) -> bool:
if isinstance(target_records_format, AvroRecordsFormat):
return True
return False

def can_unload_to_scheme(self, scheme: str) -> bool:
return scheme == 'gs'

def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]:
return [AvroRecordsFormat()]

@contextmanager
def temporary_unloadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
if self.gcs_temp_base_loc is None:
raise NoTemporaryBucketConfiguration('Please provide a scratch GCS URL in your config '
'(e.g., set SCRATCH_GCS_URL to a gs:// URL)')
else:
with self.gcs_temp_base_loc.temporary_directory() as temp_loc:
yield temp_loc

def _parse_bigquery_schema_name(self, schema: str) -> Tuple[Optional[str], str]:
# https://github.com/mxmzdlv/pybigquery/blob/master/pybigquery/sqlalchemy_bigquery.py#L320
dataset = None
project = None

schema_split = schema.split('.')
if len(schema_split) == 1:
dataset, = schema_split
elif len(schema_split) == 2:
project, dataset = schema_split
else:
raise ValueError(f"Could not understand schema name {schema}")

return (project, dataset)

def _extract_job_config(self, unload_plan: RecordsUnloadPlan) -> ExtractJobConfig:
config = ExtractJobConfig()
if isinstance(unload_plan.records_format, AvroRecordsFormat):
config.destination_format = 'AVRO'
else:
raise NotImplementedError(f'Please add support for {unload_plan.records_format}')
return config

def unload(self,
schema: str,
table: str,
unload_plan: RecordsUnloadPlan,
directory: RecordsDirectory) -> Optional[int]:
if directory.scheme != 'gs':
with self.temporary_unloadable_directory_loc() as temp_gcs_loc:
temp_directory = RecordsDirectory(temp_gcs_loc)
out = self.unload(schema=schema,
table=table,
unload_plan=unload_plan,
directory=temp_directory)
temp_directory.copy_to(directory.loc)
return out
logger.info("Loading from records directory into BigQuery")
# https://googleapis.github.io/google-cloud-python/latest/bigquery/usage/tables.html#creating-a-table
connection: Connection =\
self.db.engine.raw_connection().connection
# https://google-cloud.readthedocs.io/en/latest/bigquery/generated/google.cloud.bigquery.client.Client.html
client: Client = connection._client
project_id, dataset_id = self._parse_bigquery_schema_name(schema)
job_config = self._extract_job_config(unload_plan)

records_format = unload_plan.records_format
filename = records_format.generate_filename('output')
destination_uri = directory.loc.file_in_this_directory(filename)
job = client.extract_table(f"{schema}.{table}",
destination_uri.url,
# Must match the destination dataset location.
job_config=job_config)
job.result() # Waits for table load to complete.
logger.info(f"Unloaded from {dataset_id}:{table} into {filename}")
directory.save_preliminary_manifest()
return None
2 changes: 1 addition & 1 deletion records_mover/db/unloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def unload(self,
def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]:
"""Supplies a list of the records formats which can be bulk exported
from this database. This may not be the full set - see
can_unlaod_this_format() to test other possibilities.
can_unload_this_format() to test other possibilities.
"""
...

Expand Down
2 changes: 1 addition & 1 deletion records_mover/records/records_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def save_preliminary_manifest(self, url_details: Optional[UrlDetails]=None) -> N

manifest_loc = self.loc.file_in_this_directory('manifest')
if url_details is None:
logger.warning(f"Building manifest by listing directory contents")
logger.warning("Building manifest by listing directory contents")
url_details = {
loc.url: {
'content_length': loc.size()
Expand Down
17 changes: 17 additions & 0 deletions records_mover/records/records_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,23 @@
logger = logging.getLogger(__name__)


class AvroRecordsFormat(BaseRecordsFormat):
"Describes records files in `Avro <https://avro.apache.org/>`_ format"

def __init__(self) -> None:
"Create a new instance of AvroRecordsFormat"
self.format_type = 'avro'

def __str__(self) -> str:
return "AvroRecordsFormat"

def __repr__(self) -> str:
return str(self)

def generate_filename(self, basename: str) -> str:
return f"{basename}.avro"


class ParquetRecordsFormat(BaseRecordsFormat):
"Describes records files in `Parquet <https://parquet.apache.org/>`_ format"

Expand Down
6 changes: 5 additions & 1 deletion records_mover/records/records_format_file.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from .records_format import BaseRecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat
from .records_format import (
BaseRecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat
)
from ..url.base import BaseDirectoryUrl, BaseFileUrl
from .processing_instructions import ProcessingInstructions
from .delimited import PartialRecordsHints
Expand All @@ -22,6 +24,8 @@ def load_format(self, fail_if_dont_understand: bool) -> BaseRecordsFormat:
return self.load_delimited_format(format_loc, fail_if_dont_understand)
elif format_type == 'parquet':
return ParquetRecordsFormat()
elif format_type == 'avro':
return AvroRecordsFormat()
else:
raise TypeError(f"Format type {format_type} not yet supported in this library")

Expand Down
2 changes: 1 addition & 1 deletion records_mover/records/records_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ class UrlDetailsEntry(TypedDict):
UrlDetails = Dict[Url, UrlDetailsEntry]


RecordsFormatType = Literal['delimited', 'parquet']
RecordsFormatType = Literal['avro', 'delimited', 'parquet']

DelimitedVariant = Literal['dumb', 'csv', 'bigquery', 'bluelabs', 'vertica']
87 changes: 83 additions & 4 deletions tests/unit/db/bigquery/test_bigquery_loader.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import unittest

from records_mover.db.bigquery.loader import BigQueryLoader
from records_mover.records.records_format import DelimitedRecordsFormat, ParquetRecordsFormat
from records_mover.records.records_format import (
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved
DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat,
BaseRecordsFormat
)
from records_mover.db.errors import NoTemporaryBucketConfiguration
from mock import MagicMock, Mock
from unittest.mock import patch


class NewRecordsFormat(BaseRecordsFormat):
...


class TestBigQueryLoader(unittest.TestCase):
@patch('records_mover.db.bigquery.loader.load_job_config')
def test_load_with_bad_schema_name(self, mock_load_job_config):
Expand Down Expand Up @@ -125,6 +132,32 @@ def test_can_load_this_format_true(self,
mock_load_job_config.assert_called_with(set(), mock_load_plan)
self.assertEqual(True, out)

@patch('records_mover.db.bigquery.loader.load_job_config')
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved
@patch('records_mover.db.bigquery.loader.ProcessingInstructions')
@patch('records_mover.db.bigquery.loader.RecordsLoadPlan')
def test_can_load_this_format_delimited_false(self,
mock_RecordsLoadPlan,
mock_ProcessingInstructions,
mock_load_job_config):
mock_db = Mock(name='db')
mock_source_records_format = Mock(name='source_records_format', spec=DelimitedRecordsFormat)
mock_source_records_format.format_type = 'delimited'
mock_processing_instructions = mock_ProcessingInstructions.return_value
mock_load_plan = mock_RecordsLoadPlan.return_value
mock_load_plan.records_format = mock_source_records_format
mock_url_resolver = Mock(name='url_resolver')
mock_load_job_config.side_effect = NotImplementedError
mock_source_records_format.hints = {}
bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver,
gcs_temp_base_loc=None)
out = bigquery_loader.can_load_this_format(mock_source_records_format)
mock_ProcessingInstructions.assert_called_with()
mock_RecordsLoadPlan.\
assert_called_with(records_format=mock_source_records_format,
processing_instructions=mock_processing_instructions)
mock_load_job_config.assert_called_with(set(), mock_load_plan)
self.assertEqual(False, out)

@patch('records_mover.db.bigquery.loader.load_job_config')
def test_load_from_fileobj_true(self, mock_load_job_config):
mock_db = Mock(name='mock_db')
Expand Down Expand Up @@ -156,7 +189,6 @@ def test_load_from_fileobj_true(self, mock_load_job_config):
mock_client.load_table_from_file.\
assert_called_with(mock_fileobj,
'my_project.my_dataset.mytable',
location="US",
job_config=mock_load_job_config.return_value)
mock_job.result.assert_called_with()

Expand Down Expand Up @@ -200,7 +232,6 @@ def test_load_with_fileobj_fallback(self, mock_load_job_config):
mock_client.load_table_from_file.\
assert_called_with(mock_fileobj,
'my_project.my_dataset.mytable',
location="US",
job_config=mock_load_job_config.return_value)
mock_job.result.assert_called_with()

Expand All @@ -215,7 +246,31 @@ def test_can_load_this_format_true_parquet(self,
mock_load_job_config):
mock_db = Mock(name='db')
mock_source_records_format = Mock(name='source_records_format', spec=ParquetRecordsFormat)
mock_source_records_format.format_type = 'delimited'
mock_source_records_format.format_type = 'parquet'
mock_processing_instructions = mock_ProcessingInstructions.return_value
mock_load_plan = mock_RecordsLoadPlan.return_value
mock_load_plan.records_format = mock_source_records_format
mock_url_resolver = Mock(name='url_resolver')
mock_source_records_format.hints = {}
bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver,
gcs_temp_base_loc=None)
out = bigquery_loader.can_load_this_format(mock_source_records_format)
mock_ProcessingInstructions.assert_called_with()
mock_RecordsLoadPlan.\
assert_called_with(records_format=mock_source_records_format,
processing_instructions=mock_processing_instructions)
self.assertTrue(out)

@patch('records_mover.db.bigquery.loader.load_job_config')
@patch('records_mover.db.bigquery.loader.ProcessingInstructions')
@patch('records_mover.db.bigquery.loader.RecordsLoadPlan')
def test_can_load_this_format_true_avro(self,
mock_RecordsLoadPlan,
mock_ProcessingInstructions,
mock_load_job_config):
mock_db = Mock(name='db')
mock_source_records_format = Mock(name='source_records_format', spec=AvroRecordsFormat)
mock_source_records_format.format_type = 'avro'
mock_processing_instructions = mock_ProcessingInstructions.return_value
mock_load_plan = mock_RecordsLoadPlan.return_value
mock_load_plan.records_format = mock_source_records_format
Expand All @@ -230,6 +285,30 @@ def test_can_load_this_format_true_parquet(self,
processing_instructions=mock_processing_instructions)
self.assertTrue(out)

@patch('records_mover.db.bigquery.loader.load_job_config')
@patch('records_mover.db.bigquery.loader.ProcessingInstructions')
@patch('records_mover.db.bigquery.loader.RecordsLoadPlan')
def test_can_load_this_format_false_newformat(self,
mock_RecordsLoadPlan,
mock_ProcessingInstructions,
mock_load_job_config):
mock_db = Mock(name='db')
mock_source_records_format = Mock(name='source_records_format', spec=NewRecordsFormat)
mock_source_records_format.format_type = 'new'
mock_processing_instructions = mock_ProcessingInstructions.return_value
mock_load_plan = mock_RecordsLoadPlan.return_value
mock_load_plan.records_format = mock_source_records_format
mock_url_resolver = Mock(name='url_resolver')
mock_source_records_format.hints = {}
bigquery_loader = BigQueryLoader(db=mock_db, url_resolver=mock_url_resolver,
gcs_temp_base_loc=None)
out = bigquery_loader.can_load_this_format(mock_source_records_format)
mock_ProcessingInstructions.assert_called_with()
mock_RecordsLoadPlan.\
assert_called_with(records_format=mock_source_records_format,
processing_instructions=mock_processing_instructions)
self.assertFalse(out)

def test_known_supported_records_formats_for_load(self):
mock_db = Mock(name='db')
mock_url_resolver = Mock(name='url_resolver')
Expand Down
Loading