Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RM-45-records_mover-db-redshift-records_copy.py-13-1-C901-redshift_copy_options-is-too-complex-27 #237

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metrics/flake8_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3
2
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
91.6400
90.6000
330 changes: 195 additions & 135 deletions records_mover/db/redshift/records_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,157 +3,217 @@
from records_mover.records.records_format import (
BaseRecordsFormat, DelimitedRecordsFormat, AvroRecordsFormat
)
from records_mover.records.delimited import ValidatedRecordsHints
from records_mover.mover_types import _assert_never
from sqlalchemy_redshift.commands import Format, Encoding, Compression
from typing import Dict, Optional, Set

RedshiftCopyOptions = Dict[str, Optional[object]]


def redshift_copy_options(unhandled_hints: Set[str],
records_format: BaseRecordsFormat,
fail_if_cant_handle_hint: bool,
fail_if_row_invalid: bool,
max_failure_rows: Optional[int]) -> RedshiftCopyOptions:
redshift_options: RedshiftCopyOptions = {}
class RedshiftDelimitedRecordsHandler:
    """Translate the hints of a DelimitedRecordsFormat into Redshift COPY options.

    The constructor validates the records format's hints and then runs each
    process_*() step, accumulating sqlalchemy-redshift COPY keyword arguments
    in self.redshift_options and marking each hint handled via quiet_remove().
    """

    def __init__(self,
                 records_format: DelimitedRecordsFormat,
                 fail_if_cant_handle_hint: bool,
                 unhandled_hints: Set[str],
                 fail_if_row_invalid: bool,
                 max_failure_rows: Optional[int]):
        # Accumulated keyword arguments for sqlalchemy_redshift's CopyCommand.
        self.redshift_options: RedshiftCopyOptions = dict()
        self.hints: ValidatedRecordsHints = records_format.\
            validate(fail_if_cant_handle_hint=fail_if_cant_handle_hint)
        self.unhandled_hints = unhandled_hints
        self.fail_if_cant_handle_hint = fail_if_cant_handle_hint
        self.fail_if_row_invalid = fail_if_row_invalid
        self.max_failure_rows = max_failure_rows
        # Each step consumes one or more hints and records the matching
        # COPY option; together they cover every delimited-format hint.
        self.process_compression()
        self.process_dateformat()
        self.process_encoding()
        self.process_quotechar()
        self.process_quoting()
        self.process_temporal_info()
        self.process_max_failure_rows()
        self.process_records_terminator()
        self.process_header_row()

    def process_compression(self) -> None:
        """Map the 'compression' hint onto a Compression enum value."""
        compression_map = {
            'GZIP': Compression.gzip,
            'LZO': Compression.lzop,
            'BZIP': Compression.bzip2,
            None: None
        }
        compression = compression_map.get(self.hints.compression)
        if compression is None and self.hints.compression is not None:
            # An unrecognized (non-None) compression value should be
            # impossible after validation; fail loudly if it happens.
            _assert_never(self.hints.compression)
        else:
            self.redshift_options['compression'] = compression
        quiet_remove(self.unhandled_hints, 'compression')

    def process_dateformat(self) -> None:
        """Map the 'dateformat' hint; fall back to Redshift's 'auto' parser."""
        if self.hints.dateformat is None:
            self.redshift_options['date_format'] = 'auto'
        else:
            self.redshift_options['date_format'] = self.hints.dateformat
        quiet_remove(self.unhandled_hints, 'dateformat')

    def process_encoding(self) -> None:
        """Map the 'encoding' hint onto an Encoding enum value."""
        encoding_map = {
            'UTF8': Encoding.utf8,
            'UTF16': Encoding.utf16,
            'UTF16LE': Encoding.utf16le,
            'UTF16BE': Encoding.utf16be,
        }
        encoding = encoding_map.get(self.hints.encoding)
        if not encoding:
            # Not one of the encodings we know Redshift supports; complain
            # (or raise, per fail_if_cant_handle_hint) and then let the
            # Encoding constructor decide whether the raw value is valid.
            cant_handle_hint(self.fail_if_cant_handle_hint,
                             'encoding', self.hints)
            self.redshift_options['encoding'] = Encoding(
                self.hints.encoding)
        else:
            self.redshift_options['encoding'] = encoding
        quiet_remove(self.unhandled_hints, 'encoding')

    def process_quotechar(self) -> None:
        """Pass the 'quotechar' hint through as the COPY quote character."""
        self.redshift_options['quote'] = self.hints.quotechar
        quiet_remove(self.unhandled_hints, 'quotechar')

    def process_quoting(self) -> None:
        """Handle quoting/escape/delimiter/doublequote hints.

        'minimal' quoting maps onto Redshift's CSV format (which fixes the
        delimiter, escape and doublequote behavior); other quoting styles
        use the delimiter/escape/remove_quotes options directly.
        """
        if self.hints.quoting == 'minimal':
            # Redshift's CSV format is only compatible with these settings.
            if self.hints.escape is not None:
                cant_handle_hint(self.fail_if_cant_handle_hint,
                                 'escape', self.hints)
            if self.hints.field_delimiter != ',':
                cant_handle_hint(self.fail_if_cant_handle_hint,
                                 'field-delimiter', self.hints)
            if self.hints.doublequote is not True:
                cant_handle_hint(self.fail_if_cant_handle_hint,
                                 'doublequote', self.hints)

            self.redshift_options['format'] = Format.csv
        else:
            self.redshift_options['delimiter'] = self.hints.field_delimiter
            if self.hints.escape == '\\':
                self.redshift_options['escape'] = True
            elif self.hints.escape is None:
                self.redshift_options['escape'] = False
            else:
                _assert_never(self.hints.escape)
            if self.hints.quoting == 'all':
                self.redshift_options['remove_quotes'] = True
                if self.hints.doublequote is not False:
                    cant_handle_hint(self.fail_if_cant_handle_hint,
                                     'doublequote', self.hints)
            elif self.hints.quoting is None:
                self.redshift_options['remove_quotes'] = False
            else:
                cant_handle_hint(self.fail_if_cant_handle_hint,
                                 'quoting', self.hints)
        quiet_remove(self.unhandled_hints, 'quoting')
        quiet_remove(self.unhandled_hints, 'escape')
        quiet_remove(self.unhandled_hints, 'field-delimiter')
        quiet_remove(self.unhandled_hints, 'doublequote')

    def process_temporal_info(self) -> None:
        """Handle datetimeformat/datetimeformattz/timeonlyformat hints."""
        if self.hints.datetimeformat is None:
            self.redshift_options['time_format'] = 'auto'
        else:
            # After testing, Redshift's date/time parsing doesn't actually
            # support timezone parsing if you give it configuration - as
            # documented below, it doesn't accept a time zone as part of
            # the format string, and in experimentation, it silently drops
            # the offset when data into a timestamptz field if you specify
            # one directly.
            #
            # Its automatic parser seems to be smarter, though, and is
            # likely to handle a variety of formats:
            #
            # https://docs.aws.amazon.com/redshift/latest/dg/automatic-recognition.html
            if self.hints.datetimeformat != self.hints.datetimeformattz:
                # The Redshift auto parser seems to take a good handling
                # at our various supported formats, so let's give it a
                # shot if we're not able to specify a specific format due
                # to the Redshift timestamptz limitation:
                #
                # analytics=> create table formattest (test char(32));
                # CREATE TABLE
                # analytics=> insert into formattest values('2018-01-01 12:34:56');
                # INSERT 0 1
                # analytics=> insert into formattest values('01/02/18 15:34');
                # INSERT 0 1
                # analytics=> insert into formattest values('2018-01-02 15:34:12');
                # INSERT 0 1
                # analytics=> insert into formattest values('2018-01-02 10:34 PM');
                # INSERT 0 1
                # analytics=> select test, cast(test as timestamp) as timestamp,
                #             cast(test as date) as date from formattest;
                #
                #                test               |      timestamp      |    date
                # ----------------------------------+---------------------+------------
                #  2018-01-01 12:34:56              | 2018-01-01 12:34:56 | 2018-01-01
                #  01/02/18 15:34                   | 2018-01-02 15:34:00 | 2018-01-02
                #  2018-01-02 15:34:12              | 2018-01-02 15:34:12 | 2018-01-02
                #  2018-01-02 10:34 PM              | 2018-01-02 22:34:00 | 2018-01-02
                # (4 rows)
                #
                # analytics=>
                self.redshift_options['time_format'] = 'auto'
            else:
                self.redshift_options['time_format'] = \
                    self.hints.datetimeformat
        quiet_remove(self.unhandled_hints, 'datetimeformat')
        quiet_remove(self.unhandled_hints, 'datetimeformattz')
        # Redshift doesn't support time-only fields, so these will
        # come in as strings regardless.
        quiet_remove(self.unhandled_hints, 'timeonlyformat')

    def process_max_failure_rows(self) -> None:
        """Set COPY's max_error from max_failure_rows / fail_if_row_invalid."""
        if self.max_failure_rows is not None:
            self.redshift_options['max_error'] = self.max_failure_rows
        elif self.fail_if_row_invalid:
            self.redshift_options['max_error'] = 0
        else:
            # max allowed value
            self.redshift_options['max_error'] = 100000

    def process_records_terminator(self) -> None:
        """Reject record terminators other than newline (Redshift limit)."""
        if self.hints.record_terminator is not None and \
           self.hints.record_terminator != "\n":
            cant_handle_hint(self.fail_if_cant_handle_hint,
                             'record-terminator', self.hints)
        quiet_remove(self.unhandled_hints, 'record-terminator')

    def process_header_row(self) -> None:
        """Skip one header line on load if the data includes a header row."""
        if self.hints.header_row:
            self.redshift_options['ignore_header'] = 1
        else:
            self.redshift_options['ignore_header'] = 0
        quiet_remove(self.unhandled_hints, 'header-row')

if hints.header_row:
redshift_options['ignore_header'] = 1

def redshift_copy_options(unhandled_hints: Set[str],
                          records_format: BaseRecordsFormat,
                          fail_if_cant_handle_hint: bool,
                          fail_if_row_invalid: bool,
                          max_failure_rows: Optional[int]) -> RedshiftCopyOptions:
    """Build the COPY option dict for loading records_format into Redshift.

    :param unhandled_hints: set of hint names not yet accounted for; every
        hint this function handles is removed from the set.
    :param records_format: format of the data to be loaded (Avro and
        delimited formats are supported).
    :param fail_if_cant_handle_hint: if True, raise on hints that cannot be
        mapped to Redshift COPY options; otherwise log and continue.
    :param fail_if_row_invalid: if True (and max_failure_rows is None),
        configure COPY to fail on the first invalid row.
    :param max_failure_rows: explicit max_error threshold for COPY, if any.
    :raises NotImplementedError: for unsupported records formats.
    """
    if isinstance(records_format, AvroRecordsFormat):
        # Avro is self-describing; no hint processing is needed.
        redshift_options: RedshiftCopyOptions = dict()
        redshift_options['format'] = Format.avro
        return redshift_options
    elif isinstance(records_format, DelimitedRecordsFormat):
        redshift_delimited_records_handler = RedshiftDelimitedRecordsHandler(
            records_format, fail_if_cant_handle_hint, unhandled_hints, fail_if_row_invalid,
            max_failure_rows)
        return redshift_delimited_records_handler.redshift_options
    else:
        raise NotImplementedError(f"Teach me how to COPY to {records_format}")
2 changes: 1 addition & 1 deletion records_mover/records/targets/fileobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def write_dfs(path_or_buf: Union[str, IO[str]]) -> int:
move_count = write_dfs(text_fileobj)
text_fileobj.detach()
else:
with NamedTemporaryFile(prefix='mover_fileobj_target') as output_file:
with NamedTemporaryFile(prefix='mover_fileobj_target') as output_file: # noqa
move_count = write_dfs(output_file.name)
ryantimjohn marked this conversation as resolved.
Show resolved Hide resolved
with open(output_file.name, "rb") as output_fileobj:
copyfileobj(output_fileobj, self.fileobj) # type: ignore
Expand Down