Add Avro support for Redshift import (#137)
vinceatbluelabs authored Dec 2, 2020
1 parent 28915f6 commit d333118
Showing 30 changed files with 295 additions and 165 deletions.
2 changes: 1 addition & 1 deletion metrics/bigfiles_high_water_mark
@@ -1 +1 @@
-1125
+1094
2 changes: 1 addition & 1 deletion metrics/coverage_high_water_mark
@@ -1 +1 @@
-93.6500
+93.6700
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
@@ -1 +1 @@
-92.5500
+92.5400
5 changes: 2 additions & 3 deletions records_mover/db/bigquery/bigquery_db_driver.py
@@ -118,8 +118,7 @@ def tweak_records_schema_for_load(self,
             # columns if we're loading from a Parquet file.
             #
             # https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet
-            return records_schema.convert_datetimes_to_datetimetz()
-
+            return records_schema.cast_field_types({'datetime': 'datetimetz'})
         else:
             return records_schema

@@ -135,6 +134,6 @@ def tweak_records_schema_after_unload(self,
             # DATETIME field."
             #
             # BigQuery exports this as an Avro string type
-            return records_schema.convert_datetimes_to_string()
+            return records_schema.cast_field_types({'datetime': 'string'})
         else:
             return records_schema
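
The two bespoke helpers this replaces (convert_datetimes_to_datetimetz and convert_datetimes_to_string, deleted from schema/field/__init__.py later in this diff) are subsumed by one cast_field_types() driven by a type map. A minimal standalone sketch of the idea, using simplified stand-in types rather than the real RecordsSchema API:

    from dataclasses import dataclass, replace
    from typing import Dict, List

    @dataclass(frozen=True)
    class Field:
        name: str
        field_type: str  # e.g. 'datetime', 'datetimetz', 'string'

    @dataclass(frozen=True)
    class Schema:
        fields: List[Field]

        def cast_field_types(self, type_map: Dict[str, str]) -> 'Schema':
            # Rewrite each field whose type appears in the map; leave the rest alone.
            return Schema([replace(f, field_type=type_map.get(f.field_type, f.field_type))
                           for f in self.fields])

    schema = Schema([Field('created_at', 'datetime'), Field('id', 'integer')])
    print(schema.cast_field_types({'datetime': 'datetimetz'}).fields[0].field_type)
    # -> 'datetimetz' (the 'id' field is untouched)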
6 changes: 5 additions & 1 deletion records_mover/db/bigquery/unloader.py
@@ -33,7 +33,11 @@ def can_unload_format(self, target_records_format: BaseRecordsFormat) -> bool:
         return False

     def can_unload_to_scheme(self, scheme: str) -> bool:
-        return scheme == 'gs'
+        if scheme == 'gs':
+            return True
+        # Otherwise we'll need a temporary bucket configured for
+        # BigQuery to unload into
+        return self.gcs_temp_base_loc is not None

     def known_supported_records_formats_for_unload(self) -> List[BaseRecordsFormat]:
         return [AvroRecordsFormat()]
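
The unloader previously said yes only to gs:// targets; now any scheme qualifies as long as a temporary GCS bucket is configured to stage the export. Condensed into one standalone function (a sketch following the attribute name in the diff, not the class's real signature):

    from typing import Optional

    def can_unload_to_scheme(scheme: str, gcs_temp_base_loc: Optional[object]) -> bool:
        # BigQuery exports natively only to GCS ('gs'); for any other
        # scheme, a temporary GCS bucket lets us unload there first and
        # then copy the results to the real target.
        return scheme == 'gs' or gcs_temp_base_loc is not None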
36 changes: 18 additions & 18 deletions records_mover/db/redshift/loader.py
@@ -3,7 +3,7 @@
 from records_mover.logging import register_secret
 from ..loader import LoaderFromRecordsDirectory
 from ...records.records_directory import RecordsDirectory
-from ...records.records_format import BaseRecordsFormat, DelimitedRecordsFormat
+from ...records.records_format import BaseRecordsFormat, DelimitedRecordsFormat, AvroRecordsFormat
 from ...records.processing_instructions import ProcessingInstructions
 import sqlalchemy
 from sqlalchemy.schema import Table
@@ -43,9 +43,6 @@ def load(self,
              table: str,
              load_plan: RecordsLoadPlan,
              directory: RecordsDirectory) -> Optional[int]:
-        if not isinstance(load_plan.records_format, DelimitedRecordsFormat):
-            raise NotImplementedError('Teach me how to load '
-                                      f'{load_plan.records_format.format_type} format')

         if directory.scheme != 's3':
             with self.temporary_s3_directory_loc() as temp_s3_loc:
@@ -56,18 +53,19 @@
                                  directory=s3_directory)

         to = Table(table, self.meta, schema=schema)  # no autoload
-        unhandled_hints = set(load_plan.records_format.hints.keys())
+        unhandled_hints = set()
+        if isinstance(load_plan.records_format, DelimitedRecordsFormat):
+            unhandled_hints = set(load_plan.records_format.hints.keys())
         processing_instructions = load_plan.processing_instructions
-        validated_hints = load_plan.records_format.\
-            validate(fail_if_cant_handle_hint=processing_instructions.fail_if_cant_handle_hint)
         redshift_options = redshift_copy_options(unhandled_hints,
-                                                 validated_hints,
+                                                 load_plan.records_format,
                                                  processing_instructions.fail_if_cant_handle_hint,
                                                  processing_instructions.fail_if_row_invalid,
                                                  processing_instructions.max_failure_rows)
         logger.info(f"Copying to Redshift with options: {redshift_options}")
-        complain_on_unhandled_hints(processing_instructions.fail_if_dont_understand,
-                                    unhandled_hints, load_plan.records_format.hints)
+        if isinstance(load_plan.records_format, DelimitedRecordsFormat):
+            complain_on_unhandled_hints(processing_instructions.fail_if_dont_understand,
+                                        unhandled_hints, load_plan.records_format.hints)
         # http://sqlalchemy-redshift.readthedocs.io/en/latest/commands.html
         loc = directory.loc
         if not callable(getattr(loc, 'aws_creds', None)):
@@ -114,19 +112,20 @@ def can_load_this_format(self, source_records_format: BaseRecordsFormat) -> bool:
             processing_instructions = ProcessingInstructions()
             load_plan = RecordsLoadPlan(records_format=source_records_format,
                                         processing_instructions=processing_instructions)
-            if not isinstance(load_plan.records_format, DelimitedRecordsFormat):
-                return False
-            unhandled_hints = set(load_plan.records_format.hints.keys())
+            unhandled_hints = set()
+            records_format = load_plan.records_format
+            if isinstance(records_format, DelimitedRecordsFormat):
+                unhandled_hints = set(records_format.hints.keys())
             processing_instructions = load_plan.processing_instructions
-            hints = load_plan.records_format.\
-                validate(fail_if_cant_handle_hint=processing_instructions.fail_if_cant_handle_hint)
             redshift_copy_options(unhandled_hints,
-                                  hints,
+                                  records_format,
                                   processing_instructions.fail_if_cant_handle_hint,
                                   processing_instructions.fail_if_row_invalid,
                                   processing_instructions.max_failure_rows)
-            complain_on_unhandled_hints(processing_instructions.fail_if_dont_understand,
-                                        unhandled_hints, load_plan.records_format.hints)
+            if isinstance(records_format, DelimitedRecordsFormat):
+                complain_on_unhandled_hints(processing_instructions.fail_if_dont_understand,
+                                            unhandled_hints,
+                                            records_format.hints)
             return True
         except NotImplementedError:
             return False
@@ -158,6 +157,7 @@ def known_supported_records_formats_for_load(self) -> List[BaseRecordsFormat]:
             }),
             # Supports newlines in strings, but not empty strings.
             DelimitedRecordsFormat(variant='bluelabs'),
+            AvroRecordsFormat(),
         ]

     def best_scheme_to_load_from(self) -> str:
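
The shape of the refactor: load() and can_load_this_format() no longer reject non-delimited formats up front (dispatch now happens inside redshift_copy_options()), and the hint bookkeeping simply degrades to an empty set for hint-less formats like Avro. A standalone sketch of that guard (the real code tests isinstance against DelimitedRecordsFormat rather than using getattr):

    from typing import Set

    def initial_unhandled_hints(records_format: object) -> Set[str]:
        # Delimited formats carry a hints dict to validate; Avro does not,
        # so the later "did we handle every hint?" check becomes a no-op.
        hints = getattr(records_format, 'hints', None)
        return set(hints.keys()) if isinstance(hints, dict) else set()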
18 changes: 16 additions & 2 deletions records_mover/db/redshift/records_copy.py
@@ -1,5 +1,8 @@
 from ...utils import quiet_remove
-from ...records.delimited import cant_handle_hint, ValidatedRecordsHints
+from ...records.delimited import cant_handle_hint
+from records_mover.records.records_format import (
+    BaseRecordsFormat, DelimitedRecordsFormat, AvroRecordsFormat
+)
+from records_mover.mover_types import _assert_never
 from sqlalchemy_redshift.commands import Format, Encoding, Compression
 from typing import Dict, Optional, Set
@@ -8,11 +11,22 @@


 def redshift_copy_options(unhandled_hints: Set[str],
-                          hints: ValidatedRecordsHints,
+                          records_format: BaseRecordsFormat,
                           fail_if_cant_handle_hint: bool,
                           fail_if_row_invalid: bool,
                           max_failure_rows: Optional[int]) -> RedshiftCopyOptions:
     redshift_options: RedshiftCopyOptions = {}

+    if isinstance(records_format, AvroRecordsFormat):
+        redshift_options['format'] = Format.avro
+        return redshift_options
+
+    if not isinstance(records_format, DelimitedRecordsFormat):
+        raise NotImplementedError(f"Teach me how to COPY to {records_format}")
+
+    hints = records_format.\
+        validate(fail_if_cant_handle_hint=fail_if_cant_handle_hint)
+
     if hints.compression == 'GZIP':
         redshift_options['compression'] = Compression.gzip
     elif hints.compression == 'LZO':
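
redshift_copy_options() now takes the records format itself rather than pre-validated hints: Avro short-circuits, since an Avro file embeds its own schema and has no hints to translate; any other non-delimited format fails fast; and hint validation happens internally for delimited input. A standalone sketch of the new control flow, with stand-in classes in place of the real ones:

    from typing import Any, Dict

    class AvroFormat:
        pass

    class DelimitedFormat:
        def validate(self, fail_if_cant_handle_hint: bool) -> Dict[str, Any]:
            return {'compression': 'GZIP'}  # stand-in for real hint validation

    def copy_options(records_format: object,
                     fail_if_cant_handle_hint: bool) -> Dict[str, Any]:
        options: Dict[str, Any] = {}
        if isinstance(records_format, AvroFormat):
            # Corresponds to sqlalchemy_redshift's Format.avro above
            options['format'] = 'avro'
            return options
        if not isinstance(records_format, DelimitedFormat):
            raise NotImplementedError(f"Teach me how to COPY to {records_format}")
        hints = records_format.validate(fail_if_cant_handle_hint)
        # ... translate each validated hint into a COPY option ...
        return options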
35 changes: 35 additions & 0 deletions records_mover/db/redshift/redshift_db_driver.py
@@ -1,6 +1,8 @@
 from ..driver import DBDriver
 import sqlalchemy
 from sqlalchemy.schema import Table
+from records_mover.records import RecordsSchema
+from records_mover.records.records_format import BaseRecordsFormat, AvroRecordsFormat
 import logging
 from ...utils.limits import (INT16_MIN, INT16_MAX,
                              INT32_MIN, INT32_MAX,
@@ -154,3 +156,36 @@ def loader_from_fileobj(self) -> None:

     def unloader(self) -> Optional[Unloader]:
         return self._redshift_unloader
+
+    def tweak_records_schema_for_load(self,
+                                      records_schema: RecordsSchema,
+                                      records_format: BaseRecordsFormat) -> RecordsSchema:
+        if isinstance(records_format, AvroRecordsFormat):
+            # Upon testing, Redshift does not seem to support any of
+            # Avro's logicalTypes - here's an example Avro schema; the
+            # data will only import as the base type, not the
+            # logical type:
+            #
+            #   {"name": "date", "type": ["null", {"type": "int",
+            #       "logicalType": "date"}]},
+            #   {"name": "time", "type": ["null", {"type": "long",
+            #       "logicalType": "time-micros"}]},
+            #   {"name": "timestamp", "type": ["null", {"type": "string",
+            #       "logicalType": "datetime"}]},
+            #   {"name": "timestamptz", "type": ["null", {"type": "long",
+            #       "logicalType": "timestamp-micros"}]}]}
+            #
+            # We could potentially tell Redshift to use
+            # TIMEFORMAT/DATEFORMAT on import. Unfortunately, while
+            # it supports 'epochsecs' and 'epochmillis', it doesn't
+            # support 'epochdays' or 'epochmicros', which would be
+            # needed for the above.
+            return records_schema.cast_field_types({
+                'date': 'integer',
+                'time': 'integer',
+                'datetime': 'string',
+                # timestamp-micros is too large for the default integer size
+                'datetimetz': 'string',
+            })
+        else:
+            return records_schema
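
Since Redshift ignores the logicalType annotations, a column declared as {"type": "int", "logicalType": "date"} arrives as the raw day count since the Unix epoch, which is why the cast map above downgrades 'date' to 'integer'. An illustrative decoding of such a value (the sample number is invented):

    import datetime

    raw_avro_date = 18598  # days since 1970-01-01, as stored after COPY
    as_date = datetime.date(1970, 1, 1) + datetime.timedelta(days=raw_avro_date)
    print(as_date)  # 2020-12-02; decoding the logical type is left to the consumer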
12 changes: 11 additions & 1 deletion records_mover/mover_types.py
@@ -8,7 +8,9 @@
 JsonValue = Optional[Union[bool, str, float, int, Mapping[str, Any], List[Any]]]


-# mypy way of validating we're covering all cases of an enum
+# mypy way of validating we're covering all cases of an enum. This
+# version includes an assert at runtime in case poorly-typed input is
+# given.
 #
 # https://github.com/python/mypy/issues/6366#issuecomment-560369716
 def _assert_never(x: NoReturn, errmsg: Optional[str] = None) -> NoReturn:
@@ -17,6 +19,14 @@ def _assert_never(x: NoReturn, errmsg: Optional[str] = None) -> NoReturn:
     assert False, errmsg


+# mypy way of validating we're covering all cases of an enum. This
+# version allows poorly typed things to pass through at runtime.
+#
+# https://github.com/python/mypy/issues/6366#issuecomment-560369716
+def _ensure_all_cases_covered(x: NoReturn) -> NoReturn:
+    pass
+
+
 # mypy-friendly way of doing a singleton object:
 #
 # https://github.com/python/typing/issues/236
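
Both helpers implement the mypy exhaustiveness trick from the linked issue: a call to a NoReturn-typed function only type-checks once every member of the Literal/union has been narrowed away. A minimal, self-contained example of the pattern (not code from this commit):

    from typing import NoReturn
    from typing_extensions import Literal

    FormatType = Literal['avro', 'delimited', 'parquet']

    def _assert_never(x: NoReturn) -> NoReturn:
        assert False, f"Unhandled value: {x}"

    def extension(format_type: FormatType) -> str:
        if format_type == 'avro':
            return '.avro'
        elif format_type == 'delimited':
            return '.csv'
        elif format_type == 'parquet':
            return '.parquet'
        else:
            # mypy narrows format_type to NoReturn only if every case is
            # handled above; deleting a branch turns this into a type error.
            _assert_never(format_type)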
5 changes: 4 additions & 1 deletion records_mover/records/__init__.py
@@ -6,6 +6,7 @@
     'RecordsFormat',
     'DelimitedVariant',
     'DelimitedRecordsFormat',
+    'AvroRecordsFormat',
     'ParquetRecordsFormat',
     'ProcessingInstructions',
     'ExistingTableHandling',
@@ -20,7 +21,9 @@
 from .records_types import RecordsFormatType, DelimitedVariant
 from .schema import RecordsSchema
 from .mover import move
-from .records_format import RecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat
+from .records_format import (
+    RecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat
+)
 from .processing_instructions import ProcessingInstructions
 from .existing_table_handling import ExistingTableHandling
 from .records import Records
3 changes: 2 additions & 1 deletion records_mover/records/job/schema.py
@@ -1,6 +1,7 @@
 from ...utils.json_schema import method_signature_to_json_schema, JsonParameter, JsonSchemaDocument
 from ..existing_table_handling import ExistingTableHandling
 from records_mover.records.delimited.hints import Hints
+from records_mover.records.records_types import RECORDS_FORMAT_TYPES
 from typing import Any, Dict, List, Callable
 from ...mover_types import JsonSchema
@@ -25,7 +26,7 @@ def method_to_json_schema(method: Callable[..., Any]) -> JsonSchema:
                           optional=True),
             JsonParameter('format',
                           JsonSchemaDocument('string',
-                                             enum=['parquet', 'delimited'],
+                                             enum=RECORDS_FORMAT_TYPES,
                                              description="Records format type. "
                                                          "Note that 'delimited' includes "
                                                          "CSV/TSV/etc."),
14 changes: 14 additions & 0 deletions records_mover/records/records_format.py
@@ -1,4 +1,5 @@
 import logging
+from records_mover.mover_types import _ensure_all_cases_covered
 from .processing_instructions import ProcessingInstructions
 from . import PartialRecordsHints, UntypedRecordsHints
 from .base_records_format import BaseRecordsFormat
@@ -26,6 +27,11 @@ def __repr__(self) -> str:
     def generate_filename(self, basename: str) -> str:
         return f"{basename}.avro"

+    def __eq__(self, other: object) -> bool:
+        if isinstance(other, AvroRecordsFormat):
+            return True
+        return False
+

 class ParquetRecordsFormat(BaseRecordsFormat):
     "Describes records files in `Parquet <https://parquet.apache.org/>`_ format"
@@ -43,6 +49,11 @@ def __repr__(self) -> str:
     def generate_filename(self, basename: str) -> str:
         return f"{basename}.parquet"

+    def __eq__(self, other: object) -> bool:
+        if isinstance(other, ParquetRecordsFormat):
+            return True
+        return False
+

 class DelimitedRecordsFormat(BaseRecordsFormat):
     "Describes records data files in delimited (CSV) format."
@@ -229,5 +240,8 @@ def RecordsFormat(format_type: 'RecordsFormatType' = 'delimited',
                              processing_instructions=processing_instructions)
     elif format_type == 'parquet':
         return ParquetRecordsFormat()
+    elif format_type == 'avro':
+        return AvroRecordsFormat()
     else:
+        _ensure_all_cases_covered(format_type)
         raise NotImplementedError(f'Teach me to handle format type {format_type}')
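
The new __eq__ methods give the two parameter-less formats value semantics, so independently constructed instances compare equal; that matters wherever a negotiated format is checked against a list of supported ones, as in the Redshift loader above. Assuming the package is installed, the observable effect is:

    from records_mover.records import AvroRecordsFormat, ParquetRecordsFormat

    assert AvroRecordsFormat() == AvroRecordsFormat()
    assert AvroRecordsFormat() != ParquetRecordsFormat()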
7 changes: 7 additions & 0 deletions records_mover/records/records_format_file.py
@@ -1,9 +1,12 @@
+from records_mover.mover_types import _ensure_all_cases_covered
+from records_mover.records.records_types import RecordsFormatType
 from .records_format import (
     BaseRecordsFormat, DelimitedRecordsFormat, ParquetRecordsFormat, AvroRecordsFormat
 )
 from ..url.base import BaseDirectoryUrl, BaseFileUrl
 from .processing_instructions import ProcessingInstructions
 from .delimited import PartialRecordsHints
+from typing import cast
 import logging

 logger = logging.getLogger(__name__)
@@ -20,13 +23,17 @@ def load_format(self, fail_if_dont_understand: bool) -> BaseRecordsFormat:
             raise TypeError(f"_format file not found in bucket under {prefix}*")
         format_loc = matching_locs[0]
         format_type = format_loc.filename()[len(prefix):]
+        # cast so we can ensure at build-time we have covered all
+        # cases below
+        format_type = cast(RecordsFormatType, format_type)
         if format_type == 'delimited':
             return self.load_delimited_format(format_loc, fail_if_dont_understand)
         elif format_type == 'parquet':
             return ParquetRecordsFormat()
+        elif format_type == 'avro':
+            return AvroRecordsFormat()
         else:
+            _ensure_all_cases_covered(format_type)
             raise TypeError(f"Format type {format_type} not yet supported in this library")

     def load_delimited_format(self, format_loc: BaseFileUrl,
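
The cast() is a static-only assertion: it tells mypy to treat the string parsed from the marker file's name as a RecordsFormatType so the if/elif chain can be exhaustiveness-checked, while the runtime TypeError still catches files naming an unknown format. In miniature (the marker filename is hypothetical):

    from typing import cast
    from typing_extensions import Literal

    RecordsFormatType = Literal['avro', 'delimited', 'parquet']

    filename = '_format_avro'            # hypothetical marker-file name
    raw = filename[len('_format_'):]     # 'avro', but typed as plain str
    format_type = cast(RecordsFormatType, raw)  # no runtime check happens here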
3 changes: 3 additions & 0 deletions records_mover/records/records_types.py
@@ -1,5 +1,6 @@
 from typing import Dict, List
 from typing_extensions import Literal, TypedDict
+from typing_inspect import get_args


 # https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_EXTERNAL_TABLE.html
@@ -38,4 +39,6 @@ class UrlDetailsEntry(TypedDict):

 RecordsFormatType = Literal['avro', 'delimited', 'parquet']

+RECORDS_FORMAT_TYPES: List[str] = list(get_args(RecordsFormatType))
+
 DelimitedVariant = Literal['dumb', 'csv', 'bigquery', 'bluelabs', 'vertica']
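
Deriving the runtime list from the Literal keeps a single source of truth: job/schema.py's JSON-schema enum (changed earlier in this diff) now picks up 'avro', and any future format, automatically, and field_types.py below already does the same for FieldType. The pattern in isolation:

    from typing_extensions import Literal
    from typing_inspect import get_args

    RecordsFormatType = Literal['avro', 'delimited', 'parquet']

    RECORDS_FORMAT_TYPES = list(get_args(RecordsFormatType))
    print(RECORDS_FORMAT_TYPES)  # ['avro', 'delimited', 'parquet']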
35 changes: 2 additions & 33 deletions records_mover/records/schema/field/__init__.py
@@ -313,40 +313,9 @@ def from_data(name: str, data: 'FieldDict') -> 'RecordsSchemaField':
                 from_data(data.get('statistics'), field_type=field_type),
             representations=representations)

-    def convert_datetime_to_datetimetz(self) -> 'RecordsSchemaField':
-        field_type = self.field_type
-        constraints = self.constraints
-        statistics = self.statistics
-        if field_type == 'datetime':
-            field_type = 'datetimetz'
-            if constraints is not None:
-                constraints = constraints.cast('string')
-            if statistics is not None:
-                statistics = statistics.cast('string')
-
-        return RecordsSchemaField(name=self.name,
-                                  field_type=field_type,
-                                  constraints=constraints,
-                                  statistics=statistics,
-                                  representations=self.representations)
-
-    def convert_datetime_to_string(self) -> 'RecordsSchemaField':
-        field_type = self.field_type
-        constraints = self.constraints
-        statistics = self.statistics
-        if field_type == 'datetime':
-            field_type = 'string'
-            if constraints is not None:
-                constraints = constraints.cast('string')
-            if statistics is not None:
-                statistics = statistics.cast('string')
-        return RecordsSchemaField(name=self.name,
-                                  field_type=field_type,
-                                  constraints=constraints,
-                                  statistics=statistics,
-                                  representations=self.representations)
-
     def cast(self, field_type: 'FieldType') -> 'RecordsSchemaField':
         if self.field_type == field_type:
             return self
         if self.constraints is None:
             constraints = None
         else:
1 change: 0 additions & 1 deletion records_mover/records/schema/field/field_types.py
@@ -12,5 +12,4 @@
     'datetime',
     'datetimetz']

-# Be sure to add new things below in FieldType, too
 RECORDS_FIELD_TYPES: List[str] = list(get_args(FieldType))