Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CLI hint code and types, more CLI hint arguments #66

Merged
merged 30 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5c04553
Give Hint class a description field and move in existing values
vinceatbluelabs May 23, 2020
1e17cc9
Initial work towards using Hint objects for CLI generation
vinceatbluelabs May 23, 2020
de1eb43
Unratchet temporarily
vinceatbluelabs May 23, 2020
53acc55
Lazily import JsonSchemaDocument
vinceatbluelabs May 23, 2020
04ac539
Drop unneeded import
vinceatbluelabs May 24, 2020
72fbb2e
Get rid of records/job/hints.py entirely
vinceatbluelabs May 24, 2020
d2852a3
Limit bootstrapping parameters
vinceatbluelabs May 24, 2020
d6a46c4
Add TypedRecordsHints type
vinceatbluelabs May 24, 2020
4b5315b
Reflect more realistic input of raw hints
vinceatbluelabs May 24, 2020
1be53a9
Validate hints after they come in from a user as initial hints.
vinceatbluelabs May 25, 2020
1b1c617
Introduce non-total TypedDict of hints
vinceatbluelabs May 25, 2020
af2967e
Fix tests
vinceatbluelabs May 25, 2020
fbe802d
Fix mypy issues
vinceatbluelabs May 26, 2020
0974b1f
flake8 fixes
vinceatbluelabs May 26, 2020
f1c51ce
Convert more hint code
vinceatbluelabs May 26, 2020
c04d857
Convert more uses of record_mover.hints
vinceatbluelabs May 26, 2020
84d74fb
Drop BootstrappingRecordsHints
vinceatbluelabs May 27, 2020
d70fe31
Standardize on PartialRecordsHints name
vinceatbluelabs May 27, 2020
9dd54bb
Move more code to PartialRecordsHints
vinceatbluelabs May 27, 2020
2b32314
Convert over one more use of PartialRecordsHints
vinceatbluelabs May 27, 2020
bf075b5
flake8 fixes
vinceatbluelabs May 28, 2020
2a987e9
Fix flake8 issues
vinceatbluelabs May 28, 2020
1481473
Merge remote-tracking branch 'origin/master' into refactor_cli_hints
vinceatbluelabs May 28, 2020
09a8266
Increase coverage
vinceatbluelabs May 28, 2020
e47454b
Unratchet
vinceatbluelabs May 30, 2020
30ee0ae
Remove unused import
vinceatbluelabs May 30, 2020
7d16295
Handle --header-row and --no_header_row correctly
vinceatbluelabs May 30, 2020
021d008
Remove dead code
vinceatbluelabs May 30, 2020
ee1ea55
Fix flake8 issue
vinceatbluelabs May 30, 2020
2262644
Merge remote-tracking branch 'origin/master' into refactor_cli_hints
vinceatbluelabs May 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metrics/coverage_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
94.0200
94.0100
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
92.1100
92.0400
5 changes: 4 additions & 1 deletion records_mover/db/bigquery/load_job_config_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ...records.load_plan import RecordsLoadPlan
from ...records.records_format import DelimitedRecordsFormat, ParquetRecordsFormat
from records_mover.records import RecordsHints
from records_mover.records.delimited import validate_partial_hints
from google.cloud.bigquery.job import CreateDisposition, WriteDisposition
from google.cloud import bigquery
import logging
Expand Down Expand Up @@ -106,8 +107,10 @@ def load_job_config(unhandled_hints: Set[str],
config.schema_update_options = None

if isinstance(load_plan.records_format, DelimitedRecordsFormat):
hints = validate_partial_hints(load_plan.records_format.hints,
fail_if_cant_handle_hint=load_plan.processing_instructions.fail_if_cant_handle_hint)
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved
add_load_job_csv_config(unhandled_hints,
load_plan.records_format.hints,
hints,
fail_if_cant_handle_hint,
config)
return config
Expand Down
10 changes: 5 additions & 5 deletions records_mover/db/postgres/copy_options/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from records_mover.records.load_plan import RecordsLoadPlan
from records_mover.records.delimited import RecordsHints
from records_mover.records.delimited import ValidatedRecordsHints
from records_mover.records.records_format import DelimitedRecordsFormat
import logging
from typing import Set, Tuple, Optional
Expand All @@ -16,7 +16,7 @@
# https://www.postgresql.org/docs/9.2/sql-copy.html


def needs_csv_format(hints: RecordsHints) -> bool:
def needs_csv_format(hints: ValidatedRecordsHints) -> bool:
# This format option is used for importing and exporting the Comma
# Separated Value (CSV) file format used by many other programs,
# such as spreadsheets. Instead of the escaping rules used by
Expand All @@ -31,7 +31,7 @@ def needs_csv_format(hints: RecordsHints) -> bool:
# QUOTE character or the ESCAPE character is preceded by the
# escape character. You can also use FORCE_QUOTE to force quotes
# when outputting non-NULL values in specific columns.
if hints['header-row'] or (hints['quoting'] is not None):
if hints.header_row or (hints.quoting is not None):
return True

return False
Expand All @@ -44,7 +44,7 @@ def postgres_copy_to_options(unhandled_hints: Set[str],
Tuple[DateOutputStyle,
Optional[DateOrderStyle],
PostgresCopyOptions]:
hints = delimited_records_format.hints
hints = delimited_records_format.validate(fail_if_cant_handle_hint=fail_if_cant_handle_hint)

if needs_csv_format(hints):
copy_options = postgres_copy_options_csv(unhandled_hints,
Expand Down Expand Up @@ -74,7 +74,7 @@ def postgres_copy_from_options(unhandled_hints: Set[str],
if not isinstance(load_plan.records_format, DelimitedRecordsFormat):
raise NotImplementedError("Not currently able to import "
f"{load_plan.records_format.format_type}")
hints = load_plan.records_format.hints
hints = load_plan.records_format.validate(fail_if_cant_handle_hint=fail_if_cant_handle_hint)

if needs_csv_format(hints):
postgres_copy_options = postgres_copy_options_csv(unhandled_hints,
Expand Down
11 changes: 6 additions & 5 deletions records_mover/db/postgres/copy_options/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from records_mover.utils import quiet_remove
from records_mover.records.delimited import cant_handle_hint, RecordsHints
from records_mover.records.records_format import DelimitedRecordsFormat
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved
from records_mover.records.delimited import cant_handle_hint, ValidatedRecordsHints
from typing import Set
from .types import PostgresCopyOptions

Expand All @@ -15,7 +16,7 @@


def postgres_copy_options_common(unhandled_hints: Set[str],
hints: RecordsHints,
hints: ValidatedRecordsHints,
fail_if_cant_handle_hint: bool,
original_postgres_options: PostgresCopyOptions) ->\
PostgresCopyOptions:
Expand All @@ -27,7 +28,7 @@ def postgres_copy_options_common(unhandled_hints: Set[str],
# this option is omitted, the current client encoding is
# used. See the Notes below for more details.

encoding_hint: str = hints['encoding'] # type: ignore
encoding_hint = hints.encoding
if encoding_hint in postgres_encoding_names:
postgres_options['encoding'] = postgres_encoding_names[encoding_hint]
quiet_remove(unhandled_hints, 'encoding')
Expand All @@ -51,7 +52,7 @@ def postgres_copy_options_common(unhandled_hints: Set[str],
# is ignored. This option is allowed only when using CSV format.
#
quiet_remove(unhandled_hints, 'header-row')
postgres_options['header'] = hints['header-row']
postgres_options['header'] = hints.header_row

# OIDS
#
Expand All @@ -67,7 +68,7 @@ def postgres_copy_options_common(unhandled_hints: Set[str],
# format, a comma in CSV format. This must be a single one-byte
# character. This option is not allowed when using binary format.
#
postgres_options['delimiter'] = hints['field-delimiter']
postgres_options['delimiter'] = hints.field_delimiter
quiet_remove(unhandled_hints, 'field-delimiter')

return postgres_options
23 changes: 12 additions & 11 deletions records_mover/db/postgres/copy_options/csv.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from records_mover.utils import quiet_remove
from records_mover.records.delimited import cant_handle_hint, RecordsHints
from records_mover.records.records_format import DelimitedRecordsFormat
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved
from records_mover.records.delimited import cant_handle_hint, ValidatedRecordsHints
from typing import Set
from .mode import CopyOptionsMode
from .common import postgres_copy_options_common
from .types import PostgresCopyOptions, CopyOptionsModeType, _assert_never


def postgres_copy_options_csv(unhandled_hints: Set[str],
hints: RecordsHints,
hints: ValidatedRecordsHints,
fail_if_cant_handle_hint: bool,
mode: CopyOptionsModeType) ->\
PostgresCopyOptions:
Expand Down Expand Up @@ -41,7 +42,7 @@ def postgres_copy_options_csv(unhandled_hints: Set[str],
# format.
#

postgres_options['quote'] = hints['quotechar']
postgres_options['quote'] = hints.quotechar
quiet_remove(unhandled_hints, 'quotechar')

# ESCAPE
Expand All @@ -53,12 +54,12 @@ def postgres_copy_options_csv(unhandled_hints: Set[str],
# character. This option is allowed only when using CSV format.
#

if not hints['doublequote']:
if not hints.doublequote:
cant_handle_hint(fail_if_cant_handle_hint, 'doublequote', hints)
else:
quiet_remove(unhandled_hints, 'doublequote')

if hints['escape'] is not None:
if hints.escape is not None:
cant_handle_hint(fail_if_cant_handle_hint, 'escape', hints)
else:
quiet_remove(unhandled_hints, 'escape')
Expand All @@ -71,7 +72,7 @@ def postgres_copy_options_csv(unhandled_hints: Set[str],
# option is allowed only in COPY TO, and only when using CSV
# format.
if mode is CopyOptionsMode.LOADING:
if hints['quoting'] != 'minimal':
if hints.quoting != 'minimal':
cant_handle_hint(fail_if_cant_handle_hint, 'quoting', hints)
else:
quiet_remove(unhandled_hints, 'quoting')
Expand All @@ -86,9 +87,9 @@ def postgres_copy_options_csv(unhandled_hints: Set[str],
# FORCE_QUOTE to force quotes when outputting non-NULL values
# in specific columns.

if hints['quoting'] == 'minimal':
if hints.quoting == 'minimal':
pass # default
elif hints['quoting'] == 'all':
elif hints.quoting == 'all':
postgres_options['force_quote'] = '*'
else:
cant_handle_hint(fail_if_cant_handle_hint, 'quoting', hints)
Expand Down Expand Up @@ -126,21 +127,21 @@ def postgres_copy_options_csv(unhandled_hints: Set[str],
# dockerized-postgres public bigqueryformat # loads fine

if mode is CopyOptionsMode.LOADING:
if hints['record-terminator'] in ("\n", "\r\n", "\r", None):
if hints.record_terminator in ("\n", "\r\n", "\r", None):
quiet_remove(unhandled_hints, 'record-terminator')
else:
cant_handle_hint(fail_if_cant_handle_hint, 'records-terminator', hints)
elif mode is CopyOptionsMode.UNLOADING:
# No control for this is given - exports appear with unix
# newlines.
if hints['record-terminator'] == "\n":
if hints.record_terminator == "\n":
quiet_remove(unhandled_hints, 'record-terminator')
else:
cant_handle_hint(fail_if_cant_handle_hint, 'records-terminator', hints)
else:
_assert_never(mode)

if hints['compression'] is not None:
if hints.compression is not None:
cant_handle_hint(fail_if_cant_handle_hint, 'compression', hints)
else:
quiet_remove(unhandled_hints, 'compression')
Expand Down
12 changes: 6 additions & 6 deletions records_mover/db/postgres/copy_options/date_input_style.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from records_mover.utils import quiet_remove
from records_mover.records.delimited import cant_handle_hint, RecordsHints
from records_mover.records.delimited import cant_handle_hint, ValidatedRecordsHints
from typing import Optional, Set
from .types import DateOrderStyle


def determine_input_date_order_style(unhandled_hints: Set[str],
hints: RecordsHints,
hints: ValidatedRecordsHints,
fail_if_cant_handle_hint: bool) ->\
Optional[DateOrderStyle]:
date_order_style: Optional[DateOrderStyle] = None
Expand Down Expand Up @@ -39,7 +39,7 @@ def upgrade_date_order_style(style: DateOrderStyle, hint_name: str) -> None:
#
# postgres=#

datetimeformattz = hints['datetimeformattz']
datetimeformattz = hints.datetimeformattz

# datetimeformattz: Valid values: "YYYY-MM-DD HH:MI:SSOF",
# "YYYY-MM-DD HH:MI:SS", "YYYY-MM-DD HH24:MI:SSOF", "YYYY-MM-DD
Expand Down Expand Up @@ -94,7 +94,7 @@ def upgrade_date_order_style(style: DateOrderStyle, hint_name: str) -> None:
# "YYYY-MM-DD HH12:MI AM", "MM/DD/YY HH24:MI". See Redshift docs
# for more information.

datetimeformat = hints['datetimeformat']
datetimeformat = hints.datetimeformat

if datetimeformat in ("YYYY-MM-DD HH24:MI:SS",
"YYYY-MM-DD HH:MI:SS"):
Expand Down Expand Up @@ -135,7 +135,7 @@ def upgrade_date_order_style(style: DateOrderStyle, hint_name: str) -> None:
else:
cant_handle_hint(fail_if_cant_handle_hint, 'datetimeformat', hints)

timeonlyformat = hints['timeonlyformat']
timeonlyformat = hints.timeonlyformat

# timeonlyformat: Valid values: "HH12:MI AM" (e.g., "1:00 PM"),
# "HH24:MI:SS" (e.g., "13:00:00")
Expand Down Expand Up @@ -171,7 +171,7 @@ def upgrade_date_order_style(style: DateOrderStyle, hint_name: str) -> None:
cant_handle_hint(fail_if_cant_handle_hint, 'datetimeformat', hints)

# dateformat: Valid values: null, "YYYY-MM-DD", "MM-DD-YYYY", "DD-MM-YYYY", "MM/DD/YY".
dateformat = hints['dateformat']
dateformat = hints.dateformat

if dateformat == "YYYY-MM-DD":
# postgres=# select date '1999-01-02';
Expand Down
12 changes: 6 additions & 6 deletions records_mover/db/postgres/copy_options/date_output_style.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
from records_mover.utils import quiet_remove
from records_mover.records.delimited import cant_handle_hint, RecordsHints
from records_mover.records.delimited import cant_handle_hint, ValidatedRecordsHints
from typing import Set, Tuple, Optional
from .types import DateOrderStyle, DateOutputStyle


def determine_date_output_style(unhandled_hints: Set[str],
hints: RecordsHints,
hints: ValidatedRecordsHints,
fail_if_cant_handle_hint: bool) -> \
Tuple[DateOutputStyle, Optional[DateOrderStyle]]:

# see docs in the types module

dateformat = hints['dateformat']
timeonlyformat = hints['timeonlyformat']
datetimeformattz = hints['datetimeformattz']
datetimeformat = hints['datetimeformat']
dateformat = hints.dateformat
timeonlyformat = hints.timeonlyformat
datetimeformattz = hints.datetimeformattz
datetimeformat = hints.datetimeformat

date_order_style: Optional[DateOrderStyle] = None

Expand Down
17 changes: 9 additions & 8 deletions records_mover/db/postgres/copy_options/text.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from records_mover.utils import quiet_remove
from records_mover.records.delimited import cant_handle_hint, RecordsHints
from records_mover.records.records_format import DelimitedRecordsFormat
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved
from records_mover.records.delimited import cant_handle_hint, ValidatedRecordsHints
from typing import Set
from .mode import CopyOptionsMode
from .types import PostgresCopyOptions, CopyOptionsModeType, _assert_never
from .common import postgres_copy_options_common


def postgres_copy_options_text(unhandled_hints: Set[str],
hints: RecordsHints,
hints: ValidatedRecordsHints,
fail_if_cant_handle_hint: bool,
mode: CopyOptionsModeType) ->\
PostgresCopyOptions:
Expand All @@ -26,7 +27,7 @@ def postgres_copy_options_text(unhandled_hints: Set[str],
# preceded by a backslash if they appear as part of a column
# value: backslash itself, newline, carriage return, and the
# current delimiter character.
if hints['escape'] is None:
if hints.escape is None:
cant_handle_hint(fail_if_cant_handle_hint, 'escape', hints)
else:
quiet_remove(unhandled_hints, 'escape')
Expand Down Expand Up @@ -64,7 +65,7 @@ def postgres_copy_options_text(unhandled_hints: Set[str],
# character. This option is allowed only when using CSV format.
#

if hints['doublequote']:
if hints.doublequote:
cant_handle_hint(fail_if_cant_handle_hint, 'doublequote', hints)
else:
quiet_remove(unhandled_hints, 'doublequote')
Expand All @@ -78,7 +79,7 @@ def postgres_copy_options_text(unhandled_hints: Set[str],
# format.
#

if hints['quoting'] is not None:
if hints.quoting is not None:
cant_handle_hint(fail_if_cant_handle_hint, 'quoting', hints)
else:
quiet_remove(unhandled_hints, 'quoting')
Expand All @@ -105,21 +106,21 @@ def postgres_copy_options_text(unhandled_hints: Set[str],
# are not all alike.

if mode is CopyOptionsMode.LOADING:
if hints['record-terminator'] in ["\n", "\r", "\r\n"]:
if hints.record_terminator in ["\n", "\r", "\r\n"]:
quiet_remove(unhandled_hints, 'record-terminator')
else:
cant_handle_hint(fail_if_cant_handle_hint, 'record-terminator', hints)
elif mode is CopyOptionsMode.UNLOADING:
# No control for this is given - exports appear with unix
# newlines.
if hints['record-terminator'] == "\n":
if hints.record_terminator == "\n":
quiet_remove(unhandled_hints, 'record-terminator')
else:
cant_handle_hint(fail_if_cant_handle_hint, 'record-terminator', hints)
else:
_assert_never(mode)

if hints['compression'] is not None:
if hints.compression is not None:
cant_handle_hint(fail_if_cant_handle_hint, 'compression', hints)
else:
quiet_remove(unhandled_hints, 'compression')
Expand Down
10 changes: 8 additions & 2 deletions records_mover/db/redshift/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ def load(self,
to = Table(table, self.meta, schema=schema) # no autoload
unhandled_hints = set(load_plan.records_format.hints.keys())
processing_instructions = load_plan.processing_instructions
redshift_options = redshift_copy_options(unhandled_hints, load_plan.records_format.hints,
validated_hints = load_plan.records_format.\
validate(fail_if_cant_handle_hint=processing_instructions.fail_if_cant_handle_hint)
redshift_options = redshift_copy_options(unhandled_hints,
validated_hints,
processing_instructions.fail_if_cant_handle_hint,
processing_instructions.fail_if_row_invalid,
processing_instructions.max_failure_rows)
Expand Down Expand Up @@ -106,7 +109,10 @@ def can_load_this_format(self, source_records_format: BaseRecordsFormat) -> bool
return False
unhandled_hints = set(load_plan.records_format.hints.keys())
processing_instructions = load_plan.processing_instructions
redshift_copy_options(unhandled_hints, load_plan.records_format.hints,
hints = load_plan.records_format.\
validate(fail_if_cant_handle_hint=processing_instructions.fail_if_cant_handle_hint)
redshift_copy_options(unhandled_hints,
hints,
processing_instructions.fail_if_cant_handle_hint,
processing_instructions.fail_if_row_invalid,
processing_instructions.max_failure_rows)
Expand Down
Loading