Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RM-45-records_mover-db-redshift-records_copy.py-13-1-C901-redshift_copy_options-is-too-complex-27 #237

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metrics/flake8_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3
2
138 changes: 104 additions & 34 deletions records_mover/db/redshift/records_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,60 +3,84 @@
from records_mover.records.records_format import (
BaseRecordsFormat, DelimitedRecordsFormat, AvroRecordsFormat
)
from records_mover.records.delimited import ValidatedRecordsHints
from records_mover.mover_types import _assert_never
from sqlalchemy_redshift.commands import Format, Encoding, Compression
from typing import Dict, Optional, Set

RedshiftCopyOptions = Dict[str, Optional[object]]


def redshift_copy_options(unhandled_hints: Set[str],
records_format: BaseRecordsFormat,
fail_if_cant_handle_hint: bool,
fail_if_row_invalid: bool,
max_failure_rows: Optional[int]) -> RedshiftCopyOptions:
redshift_options: RedshiftCopyOptions = {}

if isinstance(records_format, AvroRecordsFormat):
redshift_options['format'] = Format.avro
return redshift_options

if not isinstance(records_format, DelimitedRecordsFormat):
raise NotImplementedError(f"Teach me how to COPY to {records_format}")
class RedshiftCopyParamaters:
ryantimjohn marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self,
records_format: DelimitedRecordsFormat,
fail_if_cant_handle_hint: bool,
unhandled_hints: Set[str]):
self.redshift_options: RedshiftCopyOptions = {}
self.hints = records_format.\
validate(fail_if_cant_handle_hint=fail_if_cant_handle_hint)
self.unhandled_hints = unhandled_hints
self.fail_if_cant_handle_hint = fail_if_cant_handle_hint

hints = records_format.\
validate(fail_if_cant_handle_hint=fail_if_cant_handle_hint)

if hints.compression == 'GZIP':
redshift_options['compression'] = Compression.gzip
elif hints.compression == 'LZO':
redshift_options['compression'] = Compression.lzop
elif hints.compression == 'BZIP':
redshift_options['compression'] = Compression.bzip2
elif hints.compression is None:
redshift_options['compression'] = None
def process_compression(redshift_options: RedshiftCopyOptions,
hints: ValidatedRecordsHints,
unhandled_hints: Set[str]):
compression_map = {
'GZIP': Compression.gzip,
'LZO': Compression.lzop,
'BZIP': Compression.bzip2,
None: None
ryantimjohn marked this conversation as resolved.
Show resolved Hide resolved
}
hints_compression: Optional[str] = hints.compression
compression = compression_map.get(hints_compression)
if compression is None and hints_compression is not None:
_assert_never(hints_compression)
else:
_assert_never(hints.compression)
redshift_options['compression'] = compression
quiet_remove(unhandled_hints, 'compression')


def process_dateformat(redshift_options: RedshiftCopyOptions,
hints: ValidatedRecordsHints,
unhandled_hints: Set[str]):
if hints.dateformat is None:
redshift_options['date_format'] = 'auto'
else:
redshift_options['date_format'] = hints.dateformat
quiet_remove(unhandled_hints, 'dateformat')
if hints.encoding == 'UTF8':
redshift_options['encoding'] = Encoding.utf8
elif hints.encoding == 'UTF16':
redshift_options['encoding'] = Encoding.utf16
elif hints.encoding == 'UTF16LE':
redshift_options['encoding'] = Encoding.utf16le
elif hints.encoding == 'UTF16BE':
redshift_options['encoding'] = Encoding.utf16be
else:


def process_encoding(redshift_options: RedshiftCopyOptions,
hints: ValidatedRecordsHints,
unhandled_hints: Set[str],
fail_if_cant_handle_hint: bool):
encoding_map = {
'UTF8': Encoding.utf8,
'UTF16': Encoding.utf16,
'UTF16LE': Encoding.utf16le,
'UTF16BE': Encoding.utf16be,
}
encoding = encoding_map.get(hints.encoding)
if not encoding:
cant_handle_hint(fail_if_cant_handle_hint, 'encoding', hints)
redshift_options['encoding'] = Encoding(hints.encoding)
else:
redshift_options['encoding'] = encoding
quiet_remove(unhandled_hints, 'encoding')


def process_quotechar(redshift_options: RedshiftCopyOptions,
hints: ValidatedRecordsHints,
unhandled_hints: Set[str]):
redshift_options['quote'] = hints.quotechar
quiet_remove(unhandled_hints, 'quotechar')


def process_quoting(redshift_options: RedshiftCopyOptions,
hints: ValidatedRecordsHints,
unhandled_hints: Set[str],
fail_if_cant_handle_hint: bool):
if hints.quoting == 'minimal':
if hints.escape is not None:
cant_handle_hint(fail_if_cant_handle_hint, 'escape', hints)
Expand Down Expand Up @@ -87,6 +111,11 @@ def redshift_copy_options(unhandled_hints: Set[str],
quiet_remove(unhandled_hints, 'escape')
quiet_remove(unhandled_hints, 'field-delimiter')
quiet_remove(unhandled_hints, 'doublequote')


def process_temporal_info(redshift_options: RedshiftCopyOptions,
hints: ValidatedRecordsHints,
unhandled_hints: Set[str]):
if hints.datetimeformat is None:
redshift_options['time_format'] = 'auto'
else:
Expand Down Expand Up @@ -137,23 +166,64 @@ def redshift_copy_options(unhandled_hints: Set[str],
# Redshift doesn't support time-only fields, so these will
# come in as strings regardless.
quiet_remove(unhandled_hints, 'timeonlyformat')


def process_max_failure_rows(redshift_options: RedshiftCopyOptions,
max_failure_rows: Optional[int],
fail_if_row_invalid: bool):
if max_failure_rows is not None:
redshift_options['max_error'] = max_failure_rows
elif fail_if_row_invalid:
redshift_options['max_error'] = 0
else:
# max allowed value
redshift_options['max_error'] = 100000


def process_records_terminator(hints: ValidatedRecordsHints,
unhandled_hints: Set[str],
fail_if_cant_handle_hint: bool):
if hints.record_terminator is not None and \
hints.record_terminator != "\n":

cant_handle_hint(fail_if_cant_handle_hint, 'record-terminator', hints)
quiet_remove(unhandled_hints, 'record-terminator')


def process_header_row(hints: ValidatedRecordsHints,
redshift_options: RedshiftCopyOptions,
unhandled_hints: Set[str]):
if hints.header_row:
redshift_options['ignore_header'] = 1
else:
redshift_options['ignore_header'] = 0
quiet_remove(unhandled_hints, 'header-row')


def redshift_copy_options(unhandled_hints: Set[str],
records_format: BaseRecordsFormat,
fail_if_cant_handle_hint: bool,
fail_if_row_invalid: bool,
max_failure_rows: Optional[int]) -> RedshiftCopyOptions:
redshift_options: RedshiftCopyOptions = dict()

if isinstance(records_format, AvroRecordsFormat):
redshift_options['format'] = Format.avro
return redshift_options

if not isinstance(records_format, DelimitedRecordsFormat):
raise NotImplementedError(f"Teach me how to COPY to {records_format}")

hints: ValidatedRecordsHints = records_format.\
validate(fail_if_cant_handle_hint=fail_if_cant_handle_hint)

process_compression(redshift_options, hints, unhandled_hints)
process_dateformat(redshift_options, hints, unhandled_hints)
process_encoding(redshift_options, hints, unhandled_hints, fail_if_cant_handle_hint)
process_quotechar(redshift_options, hints, unhandled_hints)
process_quoting(redshift_options, hints, unhandled_hints, fail_if_cant_handle_hint)
process_temporal_info(redshift_options, hints, unhandled_hints)
process_max_failure_rows(redshift_options, max_failure_rows, fail_if_row_invalid)
process_records_terminator(hints, unhandled_hints, fail_if_cant_handle_hint)
process_header_row(hints, redshift_options, unhandled_hints)

return redshift_options
3 changes: 1 addition & 2 deletions records_mover/records/targets/fileobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ def write_dfs(path_or_buf: Union[str, IO[str]]) -> int:
move_count = write_dfs(text_fileobj)
text_fileobj.detach()
else:
with NamedTemporaryFile(prefix='mover_fileobj_target') as output_file:
move_count = write_dfs(output_file.name)
ryantimjohn marked this conversation as resolved.
Show resolved Hide resolved
with NamedTemporaryFile(prefix='mover_fileobj_target') as output_file: # noqa
with open(output_file.name, "rb") as output_fileobj:
copyfileobj(output_fileobj, self.fileobj) # type: ignore

Expand Down