Add support for BigQuery bulk export (to Avro, for now) #136

Merged (28 commits, Dec 1, 2020)

Commits (all by vinceatbluelabs, Dec 1, 2020):
e095590 Add support for BigQuery bulk export (to Avro, for now)
0fa5321 Improve test coverage
28f48ab Improve test coverage
5ebb19b Improve test coverage
de77caa Improve test coverage
09ceea5 Improve test coverage
2a721af Improve test coverage
f9f88ad Improve test coverage
8b10df9 Ratchet coverage
5c04074 Ratchet
c00afd2 Flake8 fixes
d5c8e84 Old Python fixes
35e7e29 Break up big file
99631b6 Fix known_supported_records_formats_for_load()
ba7d15b Try with config.use_avro_logical_types
2dcfda0 Set config.use_avro_logical_types on import as well
8898433 Deal with Avro limitations
16b041c Improve coverage
059b2a7 Unratchet
cba3e2e Ratchet
f69b823 Bump bigfiles
03f1e28 Drop unneeded import
ebbf53f Ratchet
0179a71 Reformat array literal
63114b5 Move schema adjustment logic
1568ada Improve coverage
48a7fdb Ratchet coverage
e0aab23 Ratchet
2 changes: 1 addition & 1 deletion metrics/bigfiles_high_water_mark
@@ -1 +1 @@
-1103
+1125
vinceatbluelabs (PR author) commented:
389: tests/integration/*****
373: setup.py
364: records_mover/records/schema/field/__init__.py

Reduce total number of bigfiles violations to 1103 or below!
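
For context, these metrics files act as a ratchet: CI recomputes each metric and fails the build if it regresses past the recorded high-water mark; "Ratchet" commits lower the mark after improvements, and deliberate additions (like the new unloader module in this PR) bump it up. A minimal sketch of that pattern, assuming a hypothetical line-count rule and file layout rather than the project's actual tooling:

```python
# Minimal sketch of a high-water-mark ("ratchet") metric check.
# The 300-line threshold and the paths are illustrative assumptions.
from pathlib import Path


def count_bigfile_violations(source_root: Path, max_lines: int = 300) -> int:
    """Sum the line counts of all Python files longer than max_lines."""
    total = 0
    for path in source_root.rglob("*.py"):
        line_count = sum(1 for _ in path.open())
        if line_count > max_lines:
            total += line_count
    return total


def check_ratchet(metric_file: Path, current: int) -> None:
    mark = int(metric_file.read_text().strip())
    if current > mark:
        raise SystemExit(
            f"Reduce total number of bigfiles violations to {mark} or below!"
        )


check_ratchet(Path("metrics/bigfiles_high_water_mark"),
              count_bigfile_violations(Path("records_mover")))
```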

2 changes: 1 addition & 1 deletion metrics/coverage_high_water_mark
@@ -1 +1 @@
-93.6200
+93.6500
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
@@ -1 +1 @@
-92.4400
+92.5500
31 changes: 28 additions & 3 deletions records_mover/db/bigquery/bigquery_db_driver.py
@@ -1,14 +1,16 @@
from ..driver import DBDriver
import logging
from ...records import RecordsSchema
from ...records.records_format import BaseRecordsFormat, ParquetRecordsFormat
from ...records.records_format import BaseRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat
from ...utils.limits import INT64_MAX, INT64_MIN, FLOAT64_SIGNIFICAND_BITS, num_digits
import re
from typing import Union, Optional, Tuple
from ...url.resolver import UrlResolver
import sqlalchemy
from .loader import BigQueryLoader
from .unloader import BigQueryUnloader
from ..loader import LoaderFromFileobj, LoaderFromRecordsDirectory
from ..unloader import Unloader
from ...url.base import BaseDirectoryUrl


@@ -26,15 +28,19 @@ def __init__(self,
BigQueryLoader(db=self.db,
url_resolver=url_resolver,
gcs_temp_base_loc=gcs_temp_base_loc)
self._bigquery_unloader =\
BigQueryUnloader(db=self.db,
url_resolver=url_resolver,
gcs_temp_base_loc=gcs_temp_base_loc)

def loader(self) -> Optional[LoaderFromRecordsDirectory]:
return self._bigquery_loader

def loader_from_fileobj(self) -> LoaderFromFileobj:
return self._bigquery_loader

def unloader(self) -> None:
return None
def unloader(self) -> Unloader:
return self._bigquery_unloader

def type_for_date_plus_time(self, has_tz: bool=False) -> sqlalchemy.sql.sqltypes.DateTime:
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
@@ -110,6 +116,25 @@ def tweak_records_schema_for_load(self,
#
# So we need to make sure we don't create any DATETIME
# columns if we're loading from a Parquet file.
#
# https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet
return records_schema.convert_datetimes_to_datetimetz()

else:
return records_schema

def tweak_records_schema_after_unload(self,
records_schema: RecordsSchema,
records_format: BaseRecordsFormat) -> RecordsSchema:
if isinstance(records_format, AvroRecordsFormat):
# https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types
#
# "Note: There is no logical type that directly
# corresponds to DATETIME, and BigQuery currently doesn't
# support any direct conversion from an Avro type into a
# DATETIME field."
#
# BigQuery exports this as an Avro string type
return records_schema.convert_datetimes_to_string()
vinceatbluelabs (PR author) commented:
This came up during the table2table integration test: BigQuery fails the load if you try to load the string it exports from a DATETIME column back into a DATETIME column.
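
In other words, after an Avro unload the records schema has to stop describing those columns as datetimes. A self-contained sketch of the rewrite the new convert_datetimes_to_string() hook performs (the simplified Field/Schema classes below are illustrative stand-ins, not the real RecordsSchemaField/RecordsSchema):

```python
# Sketch: after a BigQuery -> Avro unload, DATETIME columns arrive as Avro
# strings, so the schema is rewritten to say 'string' rather than 'datetime'.
from dataclasses import dataclass
from typing import List


@dataclass
class Field:
    name: str
    field_type: str


@dataclass
class Schema:
    fields: List[Field]

    def convert_datetimes_to_string(self) -> "Schema":
        return Schema(fields=[
            Field(f.name, "string" if f.field_type == "datetime" else f.field_type)
            for f in self.fields
        ])


schema = Schema([Field("id", "integer"), Field("created_at", "datetime")])
print(schema.convert_datetimes_to_string())
# -> created_at is now 'string', so a later load creates a STRING column
#    instead of failing to parse the exported text back into DATETIME.
```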

A reviewing contributor replied:
😬

else:
return records_schema
10 changes: 9 additions & 1 deletion records_mover/db/bigquery/load_job_config_options.py
@@ -2,7 +2,9 @@
from ...records.delimited import cant_handle_hint
from typing import Set
from ...records.load_plan import RecordsLoadPlan
from ...records.records_format import DelimitedRecordsFormat, ParquetRecordsFormat
from ...records.records_format import (
DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat
)
from records_mover.records.delimited import ValidatedRecordsHints
from records_mover.mover_types import _assert_never
from google.cloud.bigquery.job import CreateDisposition, WriteDisposition
@@ -118,6 +120,12 @@ def load_job_config(unhandled_hints: Set[str],
config.source_format = 'PARQUET'
return config

if isinstance(load_plan.records_format, AvroRecordsFormat):
config.source_format = 'AVRO'
# https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types
config.use_avro_logical_types = True
return config

raise NotImplementedError("Not currently able to load "
f"{load_plan.records_format.format_type}")

12 changes: 8 additions & 4 deletions records_mover/db/bigquery/loader.py
@@ -6,7 +6,7 @@
import sqlalchemy
from ...records.load_plan import RecordsLoadPlan
from ...records.records_format import (
BaseRecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat
BaseRecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat
)
from ...records.records_directory import RecordsDirectory
from ...records.processing_instructions import ProcessingInstructions
@@ -87,8 +87,6 @@ def load_from_fileobj(self, schema: str, table: str,
# https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html#google.cloud.bigquery.client.Client.load_table_from_file
job = client.load_table_from_file(fileobj,
f"{schema}.{table}",
# Must match the destination dataset location.
location="US",
vinceatbluelabs (PR author) commented:
I'm not convinced this was ever needed; it's in the example code, but everything seems to run fine without it, so there must be some inference logic that figures out the dataset location from the client object.
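
That matches how the client library behaves: a job location can be passed per call, defaulted on the client, or resolved by the service from the destination dataset itself. A sketch of the client-level default, under the assumption that this is the inference being relied on (dataset and table names are hypothetical):

```python
# Sketch: letting the BigQuery client supply the job location instead of
# passing location= on every call. Names below are hypothetical.
from google.cloud import bigquery

client = bigquery.Client(location="US")  # default location for this client's jobs

with open("rows.csv", "rb") as fileobj:
    job = client.load_table_from_file(
        fileobj,
        "my_dataset.my_table",  # no location= argument needed here
        job_config=bigquery.LoadJobConfig(source_format="CSV"),
    )
job.result()
```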

job_config=job_config)

try:
@@ -165,6 +163,8 @@ def can_load_this_format(self, source_records_format: BaseRecordsFormat) -> bool:
processing_instructions=processing_instructions)
if isinstance(load_plan.records_format, ParquetRecordsFormat):
return True
if isinstance(load_plan.records_format, AvroRecordsFormat):
return True
if not isinstance(load_plan.records_format, DelimitedRecordsFormat):
return False
unhandled_hints = set(load_plan.records_format.hints.keys())
@@ -176,4 +176,8 @@ def known_supported_records_formats_for_load(self) -> List[BaseRecordsFormat]:
return False

def known_supported_records_formats_for_load(self) -> List[BaseRecordsFormat]:
return [DelimitedRecordsFormat(variant='bigquery'), ParquetRecordsFormat()]
return [
DelimitedRecordsFormat(variant='bigquery'),
ParquetRecordsFormat(),
AvroRecordsFormat()
]
108 changes: 108 additions & 0 deletions records_mover/db/bigquery/unloader.py
@@ -0,0 +1,108 @@
import sqlalchemy
from contextlib import contextmanager
from typing import List, Iterator, Optional, Union, Tuple
import logging
from google.cloud.bigquery.dbapi.connection import Connection
from google.cloud.bigquery.client import Client
from google.cloud.bigquery.job import ExtractJobConfig
from records_mover.db.unloader import Unloader
from records_mover.records.records_format import BaseRecordsFormat, AvroRecordsFormat
from records_mover.url.base import BaseDirectoryUrl
from records_mover.url.resolver import UrlResolver
from records_mover.records.unload_plan import RecordsUnloadPlan
from records_mover.records.records_directory import RecordsDirectory
from records_mover.db.errors import NoTemporaryBucketConfiguration

logger = logging.getLogger(__name__)


class BigQueryUnloader(Unloader):
def __init__(self,
db: Union[sqlalchemy.engine.Connection, sqlalchemy.engine.Engine],
url_resolver: UrlResolver,
gcs_temp_base_loc: Optional[BaseDirectoryUrl])\
-> None:
self.db = db
self.url_resolver = url_resolver
self.gcs_temp_base_loc = gcs_temp_base_loc
super().__init__(db=db)

def can_unload_format(self, target_records_format: BaseRecordsFormat) -> bool:
if isinstance(target_records_format, AvroRecordsFormat):
return True
return False

def can_unload_to_scheme(self, scheme: str) -> bool:
return scheme == 'gs'

def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]:
return [AvroRecordsFormat()]

@contextmanager
def temporary_unloadable_directory_loc(self) -> Iterator[BaseDirectoryUrl]:
if self.gcs_temp_base_loc is None:
raise NoTemporaryBucketConfiguration('Please provide a scratch GCS URL in your config '
'(e.g., set SCRATCH_GCS_URL to a gs:// URL)')
else:
with self.gcs_temp_base_loc.temporary_directory() as temp_loc:
yield temp_loc

def _parse_bigquery_schema_name(self, schema: str) -> Tuple[Optional[str], str]:
# https://github.com/mxmzdlv/pybigquery/blob/master/pybigquery/sqlalchemy_bigquery.py#L320
dataset = None
project = None

schema_split = schema.split('.')
if len(schema_split) == 1:
dataset, = schema_split
elif len(schema_split) == 2:
project, dataset = schema_split
else:
raise ValueError(f"Could not understand schema name {schema}")

return (project, dataset)

def _extract_job_config(self, unload_plan: RecordsUnloadPlan) -> ExtractJobConfig:
config = ExtractJobConfig()
if isinstance(unload_plan.records_format, AvroRecordsFormat):
config.destination_format = 'AVRO'
# https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types
config.use_avro_logical_types = True
else:
raise NotImplementedError(f'Please add support for {unload_plan.records_format}')
return config

def unload(self,
schema: str,
table: str,
unload_plan: RecordsUnloadPlan,
directory: RecordsDirectory) -> Optional[int]:
if directory.scheme != 'gs':
with self.temporary_unloadable_directory_loc() as temp_gcs_loc:
temp_directory = RecordsDirectory(temp_gcs_loc)
out = self.unload(schema=schema,
table=table,
unload_plan=unload_plan,
directory=temp_directory)
temp_directory.copy_to(directory.loc)
return out
logger.info("Loading from records directory into BigQuery")
# https://googleapis.github.io/google-cloud-python/latest/bigquery/usage/tables.html#creating-a-table
connection: Connection =\
self.db.engine.raw_connection().connection
# https://google-cloud.readthedocs.io/en/latest/bigquery/generated/google.cloud.bigquery.client.Client.html
client: Client = connection._client
project_id, dataset_id = self._parse_bigquery_schema_name(schema)
job_config = self._extract_job_config(unload_plan)

records_format = unload_plan.records_format
filename = records_format.generate_filename('output')
destination_uri = directory.loc.file_in_this_directory(filename)
job = client.extract_table(f"{schema}.{table}",
destination_uri.url,
job_config=job_config)
job.result()  # Waits for the extract job to complete.
logger.info(f"Unloaded from {dataset_id}:{table} into {filename}")
directory.save_preliminary_manifest()
return None
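
End to end, this unloader is what a table-to-directory move dispatches to when the source is BigQuery and Avro is an acceptable target format. A rough sketch of how that might look through the library's session API (method names follow the project's documented usage; the database config name, bucket URL, and table identifiers are assumptions):

```python
# Sketch: bulk-exporting a BigQuery table to Avro files on GCS via
# records_mover's high-level API. Names marked below are hypothetical.
from records_mover import Session
from records_mover.records.records_format import AvroRecordsFormat

session = Session()
records = session.records
db_engine = session.get_db_engine('my_bigquery')  # hypothetical configured name

source = records.sources.table(schema_name='my_dataset',
                               table_name='my_table',
                               db_engine=db_engine)
target = records.targets.directory_from_url(output_url='gs://my-bucket/export/',
                                            records_format=AvroRecordsFormat())
records.move(source, target)
```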
5 changes: 5 additions & 0 deletions records_mover/db/driver.py
@@ -188,6 +188,11 @@ def tweak_records_schema_for_load(self,
records_format: BaseRecordsFormat) -> RecordsSchema:
return records_schema

def tweak_records_schema_after_unload(self,
records_schema: RecordsSchema,
records_format: BaseRecordsFormat) -> RecordsSchema:
return records_schema


class GenericDBDriver(DBDriver):
def loader_from_fileobj(self) -> None:
2 changes: 1 addition & 1 deletion records_mover/db/unloader.py
@@ -34,7 +34,7 @@ def unload(self,
def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]:
"""Supplies a list of the records formats which can be bulk exported
from this database. This may not be the full set - see
can_unlaod_this_format() to test other possibilities.
can_unload_this_format() to test other possibilities.
"""
...

2 changes: 1 addition & 1 deletion records_mover/records/records_directory.py
@@ -105,7 +105,7 @@ def save_preliminary_manifest(self, url_details: Optional[UrlDetails]=None) -> None:

manifest_loc = self.loc.file_in_this_directory('manifest')
if url_details is None:
logger.warning(f"Building manifest by listing directory contents")
logger.warning("Building manifest by listing directory contents")
url_details = {
loc.url: {
'content_length': loc.size()
17 changes: 17 additions & 0 deletions records_mover/records/records_format.py
@@ -10,6 +10,23 @@
logger = logging.getLogger(__name__)


class AvroRecordsFormat(BaseRecordsFormat):
"Describes records files in `Avro <https://avro.apache.org/>`_ format"

def __init__(self) -> None:
"Create a new instance of AvroRecordsFormat"
self.format_type = 'avro'

def __str__(self) -> str:
return "AvroRecordsFormat"

def __repr__(self) -> str:
return str(self)

def generate_filename(self, basename: str) -> str:
return f"{basename}.avro"


class ParquetRecordsFormat(BaseRecordsFormat):
"Describes records files in `Parquet <https://parquet.apache.org/>`_ format"

6 changes: 5 additions & 1 deletion records_mover/records/records_format_file.py
@@ -1,4 +1,6 @@
from .records_format import BaseRecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat
from .records_format import (
BaseRecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat
)
from ..url.base import BaseDirectoryUrl, BaseFileUrl
from .processing_instructions import ProcessingInstructions
from .delimited import PartialRecordsHints
@@ -22,6 +24,8 @@ def load_format(self, fail_if_dont_understand: bool) -> BaseRecordsFormat:
return self.load_delimited_format(format_loc, fail_if_dont_understand)
elif format_type == 'parquet':
return ParquetRecordsFormat()
elif format_type == 'avro':
return AvroRecordsFormat()
else:
raise TypeError(f"Format type {format_type} not yet supported in this library")

2 changes: 1 addition & 1 deletion records_mover/records/records_types.py
@@ -36,6 +36,6 @@ class UrlDetailsEntry(TypedDict):
UrlDetails = Dict[Url, UrlDetailsEntry]


RecordsFormatType = Literal['delimited', 'parquet']
RecordsFormatType = Literal['avro', 'delimited', 'parquet']

DelimitedVariant = Literal['dumb', 'csv', 'bigquery', 'bluelabs', 'vertica']
26 changes: 24 additions & 2 deletions records_mover/records/schema/field/__init__.py
@@ -315,13 +315,35 @@ def from_data(name: str, data: 'FieldDict') -> 'RecordsSchemaField':

def convert_datetime_to_datetimetz(self) -> 'RecordsSchemaField':
field_type = self.field_type
constraints = self.constraints
statistics = self.statistics
if field_type == 'datetime':
field_type = 'datetimetz'
if constraints is not None:
constraints = constraints.cast('string')
if statistics is not None:
statistics = statistics.cast('string')

return RecordsSchemaField(name=self.name,
field_type=field_type,
constraints=self.constraints,
statistics=self.statistics,
constraints=constraints,
statistics=statistics,
representations=self.representations)

def convert_datetime_to_string(self) -> 'RecordsSchemaField':
field_type = self.field_type
constraints = self.constraints
statistics = self.statistics
if field_type == 'datetime':
field_type = 'string'
if constraints is not None:
constraints = constraints.cast('string')
if statistics is not None:
statistics = statistics.cast('string')
return RecordsSchemaField(name=self.name,
field_type=field_type,
constraints=constraints,
statistics=statistics,
representations=self.representations)

def cast(self, field_type: 'FieldType') -> 'RecordsSchemaField':
5 changes: 5 additions & 0 deletions records_mover/records/schema/schema/__init__.py
@@ -251,3 +251,8 @@ def convert_datetimes_to_datetimetz(self) -> 'RecordsSchema':
return RecordsSchema(fields=[field.convert_datetime_to_datetimetz()
for field in self.fields],
known_representations=self.known_representations)

def convert_datetimes_to_string(self) -> 'RecordsSchema':
return RecordsSchema(fields=[field.convert_datetime_to_string()
for field in self.fields],
known_representations=self.known_representations)
2 changes: 2 additions & 0 deletions records_mover/records/sources/table.py
@@ -124,6 +124,8 @@ def move_to_records_directory(self,
directory=records_directory)
records_schema = self.pull_records_schema()
records_directory.save_format(unload_plan.records_format)
records_schema = self.driver.tweak_records_schema_after_unload(records_schema,
unload_plan.records_format)
records_directory.save_schema(records_schema)
records_directory.finalize_manifest()
