Skip to content

Commit

Permalink
Refactor CLI hint code and types, more CLI hint arguments (#66)
Browse files Browse the repository at this point in the history
* Move all load/unload code to use well-typed, pre-validated hints rather than raw input when we have a fully specified RecordsFormat.
* Introduce a TypedDict partially specified version of hints to use as input to and output from our hint sniffing code.
* Use the latter to flush out the remaining command line options in mvrec related to hints, and enhance our JSON schema to argparse translator to handle another case.
  • Loading branch information
vinceatbluelabs authored Jun 2, 2020
1 parent 1e73147 commit 278a06f
Show file tree
Hide file tree
Showing 56 changed files with 908 additions and 643 deletions.
2 changes: 1 addition & 1 deletion metrics/coverage_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
94.0400
93.6800
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
92.2700
92.1800
32 changes: 27 additions & 5 deletions records_mover/cli/job_config_schema_as_args_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,34 @@ def configure_from_single_property(self,
return # no immediate argument to add
elif 'type' in value and value['type'] == 'boolean':
# https://stackoverflow.com/questions/9183936/boolean-argument-for-script
if 'default' in value and value['default'] is True:
# if --no_foo is specified, set foo variable to False
arg_name = "--no_" + formatted_key_name
kwargs['action'] = 'store_false'
if 'default' in value:
if value['default'] is True:
# if --no_foo is specified, set foo variable to False
arg_name = "--no_" + formatted_key_name
kwargs['action'] = 'store_false'
else:
kwargs['action'] = 'store_true'
else:
kwargs['action'] = 'store_true'
if key in required_keys:
raise NotImplementedError("Teach me how to handle a required boolean "
"with no default")
else:
# the regular --foo will set this to True:
kwargs['action'] = 'store_const'
# store_true sets a default, so we use store_const
kwargs['const'] = True

# and we'll add an arg that maps to False for this key:
false_arg_kwargs: ArgParseArgument = {}
# if --no_foo is specified, set foo variable to False
split_key_name = formatted_key_name.split('.')
false_arg_arg_name = '--' + '.'.join([*split_key_name[:-1],
'no_' + split_key_name[-1]])
false_arg_kwargs['action'] = 'store_const'
false_arg_kwargs['const'] = False
false_arg_kwargs['dest'] = key
false_arg_kwargs['required'] = False
self.arg_parser.add_argument(false_arg_arg_name, **false_arg_kwargs)
else:
raise Exception("Did not know how to handle key " + key +
" and value " + str(value))
Expand Down
46 changes: 24 additions & 22 deletions records_mover/db/bigquery/load_job_config_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from typing import Set
from ...records.load_plan import RecordsLoadPlan
from ...records.records_format import DelimitedRecordsFormat, ParquetRecordsFormat
from records_mover.records import RecordsHints
from records_mover.records.delimited import ValidatedRecordsHints
from records_mover.mover_types import _assert_never
from google.cloud.bigquery.job import CreateDisposition, WriteDisposition
from google.cloud import bigquery
import logging
Expand Down Expand Up @@ -104,10 +105,11 @@ def load_job_config(unhandled_hints: Set[str],
# schema. ALLOW_FIELD_RELAXATION: allow relaxing a required
# field in the original schema to nullable.
config.schema_update_options = None

fail_if_cant_handle_hint = load_plan.processing_instructions.fail_if_cant_handle_hint
if isinstance(load_plan.records_format, DelimitedRecordsFormat):
hints = load_plan.records_format.validate(fail_if_cant_handle_hint=fail_if_cant_handle_hint)
add_load_job_csv_config(unhandled_hints,
load_plan.records_format.hints,
hints,
fail_if_cant_handle_hint,
config)
return config
Expand All @@ -121,7 +123,7 @@ def load_job_config(unhandled_hints: Set[str],


def add_load_job_csv_config(unhandled_hints: Set[str],
hints: RecordsHints,
hints: ValidatedRecordsHints,
fail_if_cant_handle_hint: bool,
config: bigquery.LoadJobConfig) -> None:
# source_format: File format of the data.
Expand All @@ -131,16 +133,16 @@ def add_load_job_csv_config(unhandled_hints: Set[str],
# The supported values are UTF-8 or ISO-8859-1.
# "UTF-8 or ISO-8859-1"
#
if hints['encoding'] == 'UTF8':
if hints.encoding == 'UTF8':
config.encoding = 'UTF-8'
else:
# Currently records hints don't support ISO-8859-1
cant_handle_hint(fail_if_cant_handle_hint, 'encoding', hints)
quiet_remove(unhandled_hints, 'encoding')

# field_delimiter: The separator for fields in a CSV file.
assert isinstance(hints['field-delimiter'], str)
config.field_delimiter = hints['field-delimiter']
assert isinstance(hints.field_delimiter, str)
config.field_delimiter = hints.field_delimiter
quiet_remove(unhandled_hints, 'field-delimiter')

# allow_jagged_rows: Allow missing trailing optional columns (CSV only).
Expand Down Expand Up @@ -174,37 +176,37 @@ def add_load_job_csv_config(unhandled_hints: Set[str],
# * nonnumeric quoting works fine
# * full quoting works fine

if hints['quoting'] is None:
if hints.quoting is None:
config.quote_character = ''
elif hints['quoting'] in ['all', 'minimal', 'nonnumeric']:
elif hints.quoting == 'all' or hints.quoting == 'minimal' or hints.quoting == 'nonnumeric':
# allow_quoted_newlines: Allow quoted data containing newline
# characters (CSV only).

config.allow_quoted_newlines = True

assert isinstance(hints['quotechar'], str)
config.quote_character = hints['quotechar']
if hints['doublequote']:
assert isinstance(hints.quotechar, str)
config.quote_character = hints.quotechar
if hints.doublequote:
pass
else:
cant_handle_hint(fail_if_cant_handle_hint, 'doublequote', hints)

else:
cant_handle_hint(fail_if_cant_handle_hint, 'quoting', hints)
_assert_never(hints.quoting)
quiet_remove(unhandled_hints, 'quoting')
quiet_remove(unhandled_hints, 'quotechar')
quiet_remove(unhandled_hints, 'doublequote')

# No mention of escaping in BigQuery documentation, and in
# practice backslashes come through without being interpreted.
if hints['escape'] is None:
if hints.escape is None:
pass
else:
cant_handle_hint(fail_if_cant_handle_hint, 'escape', hints)
quiet_remove(unhandled_hints, 'escape')

# skip_leading_rows: Number of rows to skip when reading data (CSV only).
if hints['header-row']:
if hints.header_row:
config.skip_leading_rows = 1
else:
config.skip_leading_rows = 0
Expand All @@ -213,7 +215,7 @@ def add_load_job_csv_config(unhandled_hints: Set[str],
# "When you load CSV or JSON data, values in DATE columns must
# use the dash (-) separator and the date must be in the
# following format: YYYY-MM-DD (year-month-day)."
if hints['dateformat'] == 'YYYY-MM-DD':
if hints.dateformat == 'YYYY-MM-DD':
pass
else:
cant_handle_hint(fail_if_cant_handle_hint, 'dateformat', hints)
Expand Down Expand Up @@ -307,28 +309,28 @@ def add_load_job_csv_config(unhandled_hints: Set[str],
# https://stackoverflow.com/questions/44836581/does-python-time-strftime-process-timezone-options-correctly-for-rfc-3339
# https://stackoverflow.com/questions/28729212/pandas-save-date-in-iso-format
#
if hints['datetimeformat'] in ['YYYY-MM-DD HH24:MI:SS', 'YYYY-MM-DD HH:MI:SS']:
if hints.datetimeformat in ['YYYY-MM-DD HH24:MI:SS', 'YYYY-MM-DD HH:MI:SS']:
pass
else:
cant_handle_hint(fail_if_cant_handle_hint, 'datetimeformat', hints)
quiet_remove(unhandled_hints, 'datetimeformat')

if hints['datetimeformattz'] in ['YYYY-MM-DD HH:MI:SSOF',
'YYYY-MM-DD HH:MI:SS']:
if hints.datetimeformattz in ['YYYY-MM-DD HH:MI:SSOF',
'YYYY-MM-DD HH:MI:SS']:
pass
else:
cant_handle_hint(fail_if_cant_handle_hint, 'datetimeformattz', hints)
quiet_remove(unhandled_hints, 'datetimeformattz')

if hints['timeonlyformat'] == 'HH24:MI:SS':
if hints.timeonlyformat == 'HH24:MI:SS':
pass
else:
cant_handle_hint(fail_if_cant_handle_hint, 'timeonlyformat', hints)
quiet_remove(unhandled_hints, 'timeonlyformat')

# No options to change this. Tested with unix newlines, dos
# newlines and mac newlines and all were understood.:
if hints['record-terminator'] in ['\n', '\r\n', '\r', None]:
if hints.record_terminator in ['\n', '\r\n', '\r', None]:
pass
else:
cant_handle_hint(fail_if_cant_handle_hint, 'record-terminator', hints)
Expand All @@ -338,7 +340,7 @@ def add_load_job_csv_config(unhandled_hints: Set[str],
# gzip and works great. .bz2 gives "400 Unsupported
# compression type". Not sure about .lzo, but pandas can't
# handle it regardless, so doubt it's handled.
if hints['compression'] is None or hints['compression'] == 'GZIP':
if hints.compression is None or hints.compression == 'GZIP':
pass
else:
cant_handle_hint(fail_if_cant_handle_hint, 'compression', hints)
Expand Down
10 changes: 5 additions & 5 deletions records_mover/db/postgres/copy_options/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from records_mover.records.load_plan import RecordsLoadPlan
from records_mover.records.delimited import RecordsHints
from records_mover.records.delimited import ValidatedRecordsHints
from records_mover.records.records_format import DelimitedRecordsFormat
import logging
from typing import Set, Tuple, Optional
Expand All @@ -16,7 +16,7 @@
# https://www.postgresql.org/docs/9.2/sql-copy.html


def needs_csv_format(hints: RecordsHints) -> bool:
def needs_csv_format(hints: ValidatedRecordsHints) -> bool:
# This format option is used for importing and exporting the Comma
# Separated Value (CSV) file format used by many other programs,
# such as spreadsheets. Instead of the escaping rules used by
Expand All @@ -31,7 +31,7 @@ def needs_csv_format(hints: RecordsHints) -> bool:
# QUOTE character or the ESCAPE character is preceded by the
# escape character. You can also use FORCE_QUOTE to force quotes
# when outputting non-NULL values in specific columns.
if hints['header-row'] or (hints['quoting'] is not None):
if hints.header_row or (hints.quoting is not None):
return True

return False
Expand All @@ -44,7 +44,7 @@ def postgres_copy_to_options(unhandled_hints: Set[str],
Tuple[DateOutputStyle,
Optional[DateOrderStyle],
PostgresCopyOptions]:
hints = delimited_records_format.hints
hints = delimited_records_format.validate(fail_if_cant_handle_hint=fail_if_cant_handle_hint)

if needs_csv_format(hints):
copy_options = postgres_copy_options_csv(unhandled_hints,
Expand Down Expand Up @@ -74,7 +74,7 @@ def postgres_copy_from_options(unhandled_hints: Set[str],
if not isinstance(load_plan.records_format, DelimitedRecordsFormat):
raise NotImplementedError("Not currently able to import "
f"{load_plan.records_format.format_type}")
hints = load_plan.records_format.hints
hints = load_plan.records_format.validate(fail_if_cant_handle_hint=fail_if_cant_handle_hint)

if needs_csv_format(hints):
postgres_copy_options = postgres_copy_options_csv(unhandled_hints,
Expand Down
10 changes: 5 additions & 5 deletions records_mover/db/postgres/copy_options/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from records_mover.utils import quiet_remove
from records_mover.records.delimited import cant_handle_hint, RecordsHints
from records_mover.records.delimited import cant_handle_hint, ValidatedRecordsHints
from typing import Set
from .types import PostgresCopyOptions

Expand All @@ -15,7 +15,7 @@


def postgres_copy_options_common(unhandled_hints: Set[str],
hints: RecordsHints,
hints: ValidatedRecordsHints,
fail_if_cant_handle_hint: bool,
original_postgres_options: PostgresCopyOptions) ->\
PostgresCopyOptions:
Expand All @@ -27,7 +27,7 @@ def postgres_copy_options_common(unhandled_hints: Set[str],
# this option is omitted, the current client encoding is
# used. See the Notes below for more details.

encoding_hint: str = hints['encoding'] # type: ignore
encoding_hint = hints.encoding
if encoding_hint in postgres_encoding_names:
postgres_options['encoding'] = postgres_encoding_names[encoding_hint]
quiet_remove(unhandled_hints, 'encoding')
Expand All @@ -51,7 +51,7 @@ def postgres_copy_options_common(unhandled_hints: Set[str],
# is ignored. This option is allowed only when using CSV format.
#
quiet_remove(unhandled_hints, 'header-row')
postgres_options['header'] = hints['header-row']
postgres_options['header'] = hints.header_row

# OIDS
#
Expand All @@ -67,7 +67,7 @@ def postgres_copy_options_common(unhandled_hints: Set[str],
# format, a comma in CSV format. This must be a single one-byte
# character. This option is not allowed when using binary format.
#
postgres_options['delimiter'] = hints['field-delimiter']
postgres_options['delimiter'] = hints.field_delimiter
quiet_remove(unhandled_hints, 'field-delimiter')

return postgres_options
22 changes: 11 additions & 11 deletions records_mover/db/postgres/copy_options/csv.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from records_mover.utils import quiet_remove
from records_mover.records.delimited import cant_handle_hint, RecordsHints
from records_mover.records.delimited import cant_handle_hint, ValidatedRecordsHints
from typing import Set
from .mode import CopyOptionsMode
from .common import postgres_copy_options_common
from .types import PostgresCopyOptions, CopyOptionsModeType, _assert_never


def postgres_copy_options_csv(unhandled_hints: Set[str],
hints: RecordsHints,
hints: ValidatedRecordsHints,
fail_if_cant_handle_hint: bool,
mode: CopyOptionsModeType) ->\
PostgresCopyOptions:
Expand Down Expand Up @@ -41,7 +41,7 @@ def postgres_copy_options_csv(unhandled_hints: Set[str],
# format.
#

postgres_options['quote'] = hints['quotechar']
postgres_options['quote'] = hints.quotechar
quiet_remove(unhandled_hints, 'quotechar')

# ESCAPE
Expand All @@ -53,12 +53,12 @@ def postgres_copy_options_csv(unhandled_hints: Set[str],
# character. This option is allowed only when using CSV format.
#

if not hints['doublequote']:
if not hints.doublequote:
cant_handle_hint(fail_if_cant_handle_hint, 'doublequote', hints)
else:
quiet_remove(unhandled_hints, 'doublequote')

if hints['escape'] is not None:
if hints.escape is not None:
cant_handle_hint(fail_if_cant_handle_hint, 'escape', hints)
else:
quiet_remove(unhandled_hints, 'escape')
Expand All @@ -71,7 +71,7 @@ def postgres_copy_options_csv(unhandled_hints: Set[str],
# option is allowed only in COPY TO, and only when using CSV
# format.
if mode is CopyOptionsMode.LOADING:
if hints['quoting'] != 'minimal':
if hints.quoting != 'minimal':
cant_handle_hint(fail_if_cant_handle_hint, 'quoting', hints)
else:
quiet_remove(unhandled_hints, 'quoting')
Expand All @@ -86,9 +86,9 @@ def postgres_copy_options_csv(unhandled_hints: Set[str],
# FORCE_QUOTE to force quotes when outputting non-NULL values
# in specific columns.

if hints['quoting'] == 'minimal':
if hints.quoting == 'minimal':
pass # default
elif hints['quoting'] == 'all':
elif hints.quoting == 'all':
postgres_options['force_quote'] = '*'
else:
cant_handle_hint(fail_if_cant_handle_hint, 'quoting', hints)
Expand Down Expand Up @@ -126,21 +126,21 @@ def postgres_copy_options_csv(unhandled_hints: Set[str],
# dockerized-postgres public bigqueryformat # loads fine

if mode is CopyOptionsMode.LOADING:
if hints['record-terminator'] in ("\n", "\r\n", "\r", None):
if hints.record_terminator in ("\n", "\r\n", "\r", None):
quiet_remove(unhandled_hints, 'record-terminator')
else:
cant_handle_hint(fail_if_cant_handle_hint, 'records-terminator', hints)
elif mode is CopyOptionsMode.UNLOADING:
# No control for this is given - exports appear with unix
# newlines.
if hints['record-terminator'] == "\n":
if hints.record_terminator == "\n":
quiet_remove(unhandled_hints, 'record-terminator')
else:
cant_handle_hint(fail_if_cant_handle_hint, 'records-terminator', hints)
else:
_assert_never(mode)

if hints['compression'] is not None:
if hints.compression is not None:
cant_handle_hint(fail_if_cant_handle_hint, 'compression', hints)
else:
quiet_remove(unhandled_hints, 'compression')
Expand Down
Loading

0 comments on commit 278a06f

Please sign in to comment.