Allow Parquet records format to be specified on mvrec command line #129

Merged
merged 184 commits into master from mvrec_parquet on Nov 15, 2020

Conversation

@vinceatbluelabs vinceatbluelabs commented Nov 10, 2020

This allows folks to specify the Parquet records format on the command line. Right now, mvrec implicitly assumes that only delimited format files (like CSVs) are being specified using its command line parameters.

The forcing function for this PR is that we use Parquet when transferring data between Redshift and BigQuery. I want to allow folks to break up that transfer by exporting first to an explicitly provided S3 bucket. That step is now expressible as `mvrec table2recordsdir --target.format parquet dbname schema table s3://bucket/directory/`.

Example:

(records-mover-3.8.5) broz@bigbookpro:~/src/records-mover$ mvrec file2table --help
usage: mvrec file2table [-h] [--source.variant SOURCE.VARIANT]
                        [--source.format {parquet,delimited}]
                        [--source.datetimeformattz {YYYY-MM-DD HH:MI:SSOF,YYYY-MM-DD HH:MI:SS,YYYY-MM-DD HH24:MI:SSOF,MM/DD/YY HH24:MI}]
                        [--source.datetimeformat {YYYY-MM-DD HH24:MI:SS,YYYY-MM-DD HH:MI:SS,YYYY-MM-DD HH12:MI AM,MM/DD/YY HH24:MI}]
                        [--source.no_compression]
                        [--source.compression {GZIP,BZIP,LZO}]
                        [--source.no_quoting]
                        [--source.quoting {all,minimal,nonnumeric}]
                        [--source.no_escape] [--source.escape {\}]
                        [--source.encoding {UTF8,UTF16,UTF16LE,UTF16BE,UTF16BOM,UTF8BOM,LATIN1,CP1252}]
                        [--source.dateformat {YYYY-MM-DD,MM-DD-YYYY,DD-MM-YYYY,MM/DD/YY}]
                        [--source.timeonlyformat {HH12:MI AM,HH24:MI:SS}]
                        [--source.no_doublequote] [--source.doublequote]
                        [--source.no_header_row] [--source.header_row]
                        [--source.quotechar SOURCE.QUOTECHAR]
                        [--source.record_terminator SOURCE.RECORD-TERMINATOR]
                        [--source.field_delimiter SOURCE.FIELD-DELIMITER]
                        [--target.existing_table {delete_and_overwrite,truncate_and_overwrite,drop_and_recreate,append}]
                        [--target.drop_and_recreate_on_load_error]
                        source.filename target.db_name target.schema_name
                        target.table_name

positional arguments:
  source.filename       File path (relative or absolute) of the data file
                        to load.
  target.db_name        SQLAlchemy database engine to write data to.
  target.schema_name    Schema name of a table to write data to.
  target.table_name     Table name of a table to write data to.

optional arguments:
  -h, --help            show this help message and exit
  --source.variant SOURCE.VARIANT
                        Records format variant - valid for 'delimited'
                        records format type
  --source.format {parquet,delimited}
                        Records format type. Note that 'delimited' includes
                        CSV/TSV/etc.
  --source.datetimeformattz {YYYY-MM-DD HH:MI:SSOF,YYYY-MM-DD HH:MI:SS,YYYY-MM-DD HH24:MI:SSOF,MM/DD/YY HH24:MI}
                        Format used to write 'datetimetz' values
  --source.datetimeformat {YYYY-MM-DD HH24:MI:SS,YYYY-MM-DD HH:MI:SS,YYYY-MM-DD HH12:MI AM,MM/DD/YY HH24:MI}
                        Format used to write 'datetime' values
  --source.no_compression
  --source.compression {GZIP,BZIP,LZO}
                        Compression type of the file.
  --source.no_quoting
  --source.quoting {all,minimal,nonnumeric}
                        How quotes are applied to individual fields. all:
                        quote all fields. minimal: quote only fields that
                        contain ambiguous characters (the delimiter, the
                        escape character, or a line terminator). default:
                        never quote fields.
  --source.no_escape
  --source.escape {\}   Character used to escape strings
  --source.encoding {UTF8,UTF16,UTF16LE,UTF16BE,UTF16BOM,UTF8BOM,LATIN1,CP1252}
                        Text encoding of file
  --source.dateformat {YYYY-MM-DD,MM-DD-YYYY,DD-MM-YYYY,MM/DD/YY}
                        Format used to write 'date' values
  --source.timeonlyformat {HH12:MI AM,HH24:MI:SS}
                        Format used to write 'time' values
  --source.no_doublequote
  --source.doublequote  Controls how instances of quotechar appearing
                        inside a field should themselves be quoted. When
                        True, the character is doubled. When False, the
                        escapechar is used as a prefix to the quotechar.
  --source.no_header_row
  --source.header_row   True if a header row is provided in the delimited
                        files.
  --source.quotechar SOURCE.QUOTECHAR
                        A one-character string used to quote fields
                        containing special characters, such as the
                        delimiter or quotechar, or which contain new-line
                        characters.
  --source.record_terminator SOURCE.RECORD-TERMINATOR
                        String used to close out individual rows of data.
  --source.field_delimiter SOURCE.FIELD-DELIMITER
                        Character used between fields.
  --target.existing_table {delete_and_overwrite,truncate_and_overwrite,drop_and_recreate,append}
                        When loading into a database table, controls how
                        any existing table found will be handled. This must
                        be a :class:`records_mover.records.ExistingTableHan
                        dling` object.
  --target.drop_and_recreate_on_load_error
                        If True, table load errors will attempt to be
                        addressed by dropping the target table and
                        reloading the incoming data.
(records-mover-3.8.5) broz@bigbookpro:~/src/records-mover$

Note: This is set to merge into #128

(records-mover-3.8.5) broz@bigbookpro:~/src/records-mover/tests/integration$ ./itest table2table
Integration test targets: table2table
Executing table2table test
Running docker_compose start verticadb postgresdb mysqldb...
Starting verticadb  ...
Starting postgresdb ...
Starting mysqldb    ...
Creating integration_records_mover_run ...
Checking connection to verticadb on port 5433
Could not connect to database
Connecting to database docker on verticadb:5433 as dbadmin
vsql: could not connect to server: Connection refused
	Is the server running on host "verticadb" and accepting
	TCP/IP connections on port 5433?
Waiting 5 seconds for Vertica to be available
Checking connection to verticadb on port 5433
Could not connect to database
Connecting to database docker on verticadb:5433 as dbadmin
Waiting 5 seconds for Vertica to be available
vsql: could not connect to server: Connection refused
	Is the server running on host "verticadb" and accepting
	TCP/IP connections on port 5433?
Checking connection to verticadb on port 5433
Could not connect to database
Connecting to database docker on verticadb:5433 as dbadmin
vsql: could not connect to server: Connection refused
	Is the server running on host "verticadb" and accepting
	TCP/IP connections on port 5433?
Waiting 5 seconds for Vertica to be available
Checking connection to verticadb on port 5433
Could not connect to database
Connecting to database docker on verticadb:5433 as dbadmin
vsql: could not connect to server: Connection refused
	Is the server running on host "verticadb" and accepting
	TCP/IP connections on port 5433?
Waiting 5 seconds for Vertica to be available
Checking connection to verticadb on port 5433
Connecting to database docker on verticadb:5433 as dbadmin
Verified Vertica is up and listening
Creating integration_records_mover_run ...
Connecting to database postgres on postgresdb:5432 as postgres
Verified Postgres is up and listening
Creating integration_records_mover_run ...
SHOW DATABASES: List all schemas (MySQL calls them 'databases')
SHOW TABLES: List all tables in schema
DESCRIBE foo: Show table structure
Verified MySQL is up and listening
All tables name in public: []
Tables to purge matching public.itest_target_: []
/Users/broz/.pyenv/versions/3.8.5/envs/records-mover-3.8.5/lib/python3.8/site-packages/yaml/scanner.py:286: ResourceWarning: unclosed <socket.socket fd=5, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=0, laddr=('::1', 65157, 0, 0), raddr=('::1', 32774, 0, 0)>
  for level in list(self.possible_simple_keys):
ResourceWarning: Enable tracemalloc to get the object allocation traceback
All tables name in itest: ['idt_t2t_source', 'test_t2t_source_table', 'test_table', 'itest_source_1834_1583161124', 'itest_source_2235_1584153798', 'itest_source_2678_1584668099', 'itest_source_local_1586010504', 'itest_source_local_1586010741', 'itest_source_4149_1586728786', 'itest_source_4761_1587244565', 'itest_source_5963_1588439942', 'itest_source_7852_1590183241', 'itest_source_local_1603902474']
Tables to purge matching itest.itest_target_: []
12:35:23 - Logging into BigQuery as [email protected]
/Users/broz/.pyenv/versions/3.8.5/envs/records-mover-3.8.5/lib/python3.8/site-packages/google/cloud/bigquery/client.py:439: UserWarning: Cannot create BigQuery Storage client, the dependency google-cloud-bigquery-storage is not installed.
  warnings.warn(
All tables name in bq_itest: ['csv_import', 'foo', 'foo12382', 'foo2020', 'freezable_131457', 'freezable_165405', 'freezable_203854', 'idt_t2t_source', 'itest_10201_1603844291', 'itest_1021_1582497244', 'itest_2881_1585856593', 'itest_2898_1585856730', 'itest_3278_1586035905', 'itest_3635_1586453088', 'itest_3968_1586723143', 'itest_4157_1586728820', 'itest_4594_1587234694', 'itest_4704_1587242932', 'itest_4719_1587243070', 'itest_4790_1587244779', 'itest_5367_1587864539', 'itest_5570_1587925302', 'itest_6167_1588532453', 'itest_6218_1588535271', 'itest_6505_1588772554', 'itest_6942_1589048716', 'itest_7095_1589220574', 'itest_7326_1589305148', 'itest_7349_1589305292', 'itest_7535_1589399333', 'itest_8256_1590673525', 'itest_8507_1590867974', 'itest_9272_1594302302', 'itest_9473_1597177110', 'itest_local_1585523397', 'itest_source_1834_1583161124', 'itest_source_2275_1584155282', 'itest_source_2410_1584213216', 'itest_source_3076_1585959268', 'itest_source_3647_1586453049', 'itest_source_3692_1586455901', 'itest_source_5390_1587864751', 'itest_source_6164_1588532431', 'itest_source_6934_1589048714', 'itest_source_7534_1589399334', 'itest_source_7544_1589399508', 'itest_source_8508_1590867978', 'itest_source_9476_1597177089', 'itest_source_local_1585858508', 'itest_source_local_1585858587', 'itest_source_local_1585858916', 'itest_source_local_1585859169', 'itest_source_local_1585859410', 'itest_source_local_1585859601', 'itest_source_local_1585860929', 'itest_source_local_1585861037', 'itest_source_local_1585866394', 'itest_source_local_1585866464', 'itest_source_local_1585866523', 'itest_source_local_1585930778', 'itest_source_local_1585931222', 'itest_source_local_1585931613', 'itest_source_local_1585955002', 'itest_source_local_1585955207', 'itest_source_local_1585955611', 'itest_source_local_1585955855', 'itest_source_local_1585955917', 'itest_source_local_1585956043', 'itest_source_local_1585956138', 'itest_source_local_1585956276', 'itest_source_local_1585956504', 'itest_source_local_1585959440', 'itest_source_local_1586010408', 'itest_source_local_1586033841', 'itest_source_local_1586895409', 'itest_target', 'itest_target_local_1603902474', 'test_t2t_source_table', 'test_t2t_target_1575337817', 'test_t2t_target_table_1575338460', 'test_t2t_target_table_1575338472', 'test_table', 'vmbtest123']
Tables to purge matching bq_itest.itest_target_: []
All tables name in public: []
Tables to purge matching public.itest_target_: []
All tables name in mysqlitest: ['itest_source_local_1603900488']
Tables to purge matching mysqlitest.itest_target_: []
12:35:27 - Logging into BigQuery as [email protected]
<frozen importlib._bootstrap_external>:580: ResourceWarning: unclosed <ssl.SSLSocket fd=7, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('192.168.1.45', 65170), raddr=('172.217.15.74', 443)>
ResourceWarning: Enable tracemalloc to get the object allocation traceback
<frozen importlib._bootstrap_external>:580: ResourceWarning: unclosed <ssl.SSLSocket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('192.168.1.45', 65169), raddr=('172.217.164.138', 443)>
ResourceWarning: Enable tracemalloc to get the object allocation traceback
/Users/broz/.pyenv/versions/3.8.5/envs/records-mover-3.8.5/lib/python3.8/site-packages/boto/plugin.py:40: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
  import imp
12:35:28 - Mover: copying from TableRecordsSource(redshift) to TableRecordsTarget(bigquery) by converting to dataframe...
12:35:28 - Pulling table metadata...
/Users/broz/.pyenv/versions/3.8.5/envs/records-mover-3.8.5/lib/python3.8/site-packages/sqlalchemy_redshift/dialect.py:747: SADeprecationWarning: The Engine.contextual_connect() and Connection.contextual_connect() methods are deprecated. This method is an artifact of the threadlocal engine strategy which is also to be deprecated.   For explicit connections from an Engine, use the Engine.connect() method.
  with connection.contextual_connect() as cc:
12:35:29 - Table metadata: itest.itest_source_local_1603902920
/Users/broz/.pyenv/versions/3.8.5/envs/records-mover-3.8.5/lib/python3.8/site-packages/sqlalchemy_redshift/dialect.py:747: SADeprecationWarning: The Engine.contextual_connect() method is deprecated. This method is an artifact of the threadlocal engine strategy which is also to be deprecated.   For explicit connections from an Engine, use the Engine.connect() method.
  with connection.contextual_connect() as cc:
12:35:29 - Exporting in chunks of up to 181818 rows by 11 columns
12:35:30 - Exporting to CSV with these Pandas options: {'encoding': 'UTF8', 'compression': 'gzip', 'quoting': 0, 'doublequote': True, 'quotechar': '"', 'header': True, 'date_format': '%Y-%m-%d %H:%M:%S.%f', 'sep': ',', 'line_terminator': '\n'}
12:35:30 - Writing CSV file to /var/folders/6m/lwg6ctb51gv7v0vxrkzjps7h0000gn/T/mover_seralized_dataframe6vyecim2
12:35:30 - Found date as unexpected type <class 'datetime.date'>
12:35:30 - CSV file written
/Users/broz/.pyenv/versions/3.8.5/envs/records-mover-3.8.5/lib/python3.8/site-packages/google/cloud/bigquery/client.py:439: UserWarning: Cannot create BigQuery Storage client, the dependency google-cloud-bigquery-storage is not installed.
  warnings.warn(
12:35:30 - Connecting to database...
12:35:30 - Looking for existing table..
/Users/broz/.pyenv/versions/3.8.5/envs/records-mover-3.8.5/lib/python3.8/site-packages/pybigquery/sqlalchemy_bigquery.py:380: PendingDeprecationWarning: Client.dataset is deprecated and will be removed in a future version. Use a string like 'my_project.my_dataset' or a cloud.google.bigquery.DatasetReference object, instead.
  table = connection.connection._client.dataset(dataset, project=project).table(table_name_prepared)
12:35:30 - Creating table...
12:35:31 - Just ran
CREATE TABLE `bq_itest`.`itest_target_local_1603902920` (
	`num` INT64,
	`numstr` STRING,
	`str` STRING,
	`comma` STRING,
	`doublequote` STRING,
	`quotecommaquote` STRING,
	`newlinestr` STRING,
	`date` DATE,
	`time` STRING,
	`timestamp` DATETIME,
	`timestamptz` TIMESTAMP
)

12:35:31 - Table prepped
12:35:31 - Loading from fileobj into BigQuery
12:35:31 - Using BigQuery load options: {'load': {'autodetect': False, 'createDisposition': 'CREATE_NEVER', 'destinationTableProperties': {}, 'ignoreUnknownValues': True, 'maxBadRecords': 0, 'allowJaggedRows': False, 'writeDisposition': 'WRITE_APPEND', 'schemaUpdateOptions': None, 'sourceFormat': 'CSV', 'encoding': 'UTF-8', 'fieldDelimiter': ',', 'allowQuotedNewlines': True, 'quote': '"', 'skipLeadingRows': '1'}}
12:35:34 - Loaded 1 rows into bq_itest:itest_target_local_1603902920
.
----------------------------------------------------------------------
Ran 1 test in 16.849s

OK
Stopping integration_verticadb_1  ...
Stopping integration_postgresdb_1 ...
Stopping integration_mysqldb_1    ...
(records-mover-3.8.5) broz@bigbookpro:~/src/records-mover/tests/integration$
@@ -220,7 +220,7 @@ file referenced above:

```ini
[session]
-session_type = "lpass"
+session_type = lpass

This isn't related to the theme of this PR, but I found it along the way: our config parser doesn't expect double quotes around values in its .ini format.

Huh. Good find.

@@ -47,7 +47,8 @@ def unload_to_s3_directory(self,
table: str,
unload_plan: RecordsUnloadPlan,
directory: RecordsDirectory) -> int:
-logger.info(f"Starting Redshift unload to {directory.loc}...")
+logger.info(f"Starting Redshift unload to {directory.loc} as "
+            f"{unload_plan.records_format}...")

While using this switch, I wanted to make sure it works, so logging what format I'm writing out was important!

@@ -2,7 +2,7 @@
import inspect
from typing import Any, Dict, List, Callable

@vinceatbluelabs vinceatbluelabs Nov 11, 2020

The below logic is all kinds of weird. I get that.

Here's what's going on:

Records Mover is both a Python library and a command line tool.

In terms of the library, the richest surface area of the API is the set of source and target factory methods, where you specify which source and target you want and configure them exactly as you want.

The naive way to write a command line parser would be to take each of these functions and write an argparse parser for it. You'd certainly want to do something at least mildly clever to combine those, so you don't have to rewrite the "file2" part of "file2url" and "file2table", etc.; otherwise you're dealing with combinatorial growth as sources and targets are added.

On top of that, it turns out that a lot of the parameters to those methods are common - things like the records_format, the db_engine, the initial_hints, etc. Copying and pasting those over and over is kind of silly.

It's also true that a lot of those are pretty simple - strings, booleans, etc, possibly with default values. And they already have documentation written.

So I do something more clever: I use Python's reflection abilities to pull parameter lists, types, and documentation, and generate argparse parsers from them. There's even an intermediate form as JSON Schema (we used to have a layer which would accept records-mover instructions as a JSON-style request). You can draw your own conclusion as to whether 'clever' is a positive description or not.
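To make that concrete, here's a rough sketch of the general shape of that reflection trick. This is an illustrative toy, not the actual records-mover code, and the helper name is made up:

```python
import argparse
import inspect
from typing import Any, Callable


def add_args_from_signature(parser: argparse.ArgumentParser,
                            fn: Callable[..., Any],
                            prefix: str) -> None:
    """Register one --<prefix>.<name> option per simple parameter of fn (toy sketch)."""
    for name, param in inspect.signature(fn).parameters.items():
        if param.annotation is bool:
            # Booleans become a flag pair, e.g. --source.header_row / --source.no_header_row
            parser.add_argument(f"--{prefix}.{name}", dest=name, action="store_true")
            parser.add_argument(f"--{prefix}.no_{name}", dest=name, action="store_false")
        elif param.annotation in (str, int):
            default = None if param.default is inspect.Parameter.empty else param.default
            parser.add_argument(f"--{prefix}.{name}", type=param.annotation, default=default)
        # Richer types (like a RecordsFormat) can't map 1:1 to a single flag and
        # have to be "exploded" into several options - more on that below.
```

The real thing also pulls each parameter's documentation to use as argparse help text, but the basic move is the same: reflect over the signature and spit out parser configuration.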

Aaaanyway....


@@ -29,8 +29,13 @@ def non_default_arg_names(self, fn: Callable[..., Any]) -> List[str]:

def add_hint_parameter(self, kwargs: Dict[str, Any], param_name: str) -> None:

@vinceatbluelabs vinceatbluelabs Nov 11, 2020

First complication - not every argument in a source/target factory method maps 1:1 to a command line parameter. The example we're hitting here is a RecordsFormat object - which has two subtypes (parquet and delimited). The DelimitedRecordsFormat() constructor takes both a variant argument and a hints dict, which can contain various keys.

We have a technique (which will show up further down in this PR) for exploding things like RecordsFormat out into multiple command line arguments.

The function we're looking at now is on the call path where we accept the parsed command line argument results back in (as the kwargs dictionary) and formulate the call into the factory method.

Its job is to be called for each hint parameter we receive and update the final set of arguments we'll pass to the factory method accordingly.
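As a rough, hypothetical sketch of that shape (simplified and hand-written here - the real method also has to cope with the Parquet complication discussed next):

```python
from typing import Any, Dict


def add_hint_parameter(kwargs: Dict[str, Any], param_name: str) -> None:
    # Toy sketch: fold one parsed hint flag (e.g. field_delimiter) back into
    # the hints dict that will eventually feed DelimitedRecordsFormat(hints=...).
    hints: Dict[str, Any] = kwargs.setdefault('initial_hints', {})
    hints[param_name] = kwargs.pop(param_name)
```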

if not isinstance(existing_records_format, DelimitedRecordsFormat):
raise NotImplementedError('Hints are not compatible '
'with records format '
f'{existing_records_format.format_type}')

Since we might now get a contradictory combination of arguments (like `--source.format parquet --source.field_delimiter '|'`), we need to check for that situation.

Note that one day ParquetRecordsFormat will support a very small number of hints (compression is a key one that it's missing today), but as of today it supports none.

del kwargs['initial_hints']
kwargs['records_format'] = DelimitedRecordsFormat(variant=kwargs['variant'],
hints=hints)
del kwargs['variant']

We now have both a format and a variant command line parameter, instead of just variant. These functions handle ingesting the two, respectively.

The 'initial_hints' handling is not new or changed with this PR. It deals with the situation where we want RecordsMover to generally sniff the records format, but specify certain hints to begin with as constraints.
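For illustration, the combined handling looks roughly like this hand-written sketch. DelimitedRecordsFormat and ParquetRecordsFormat are the real classes, but the control flow and defaults below are an approximation, not the code from this PR:

```python
from typing import Any, Dict

from records_mover.records import DelimitedRecordsFormat, ParquetRecordsFormat


def fill_in_records_format(kwargs: Dict[str, Any]) -> None:
    # Approximate sketch: collapse the parsed format/variant options (plus any
    # collected hints) into the single records_format argument the factory
    # methods expect.
    format_type = kwargs.pop('format', None)
    variant = kwargs.pop('variant', None)
    hints = kwargs.pop('initial_hints', None) or {}
    if format_type == 'parquet':
        if hints:
            raise NotImplementedError('Hints are not compatible with the parquet format')
        kwargs['records_format'] = ParquetRecordsFormat()
    elif format_type == 'delimited' or variant is not None or hints:
        kwargs['records_format'] = DelimitedRecordsFormat(variant=variant or 'bluelabs',
                                                          hints=hints)
    # Otherwise leave records_format unset and let records mover sniff it.
```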

@@ -72,24 +77,43 @@ def fill_in_existing_table_handling(self, kwargs: Dict[str, Any]) -> None:
def fill_in_db_engine(self, kwargs: Dict[str, Any]) -> None:
kwargs['db_engine'] = self.session.get_db_engine(kwargs['db_name'])

def fill_in_records_format(self, kwargs: Dict[str, Any]) -> None:
if kwargs['variant'] is not None:

The line above was not necessary as far as I can tell. When the variant command line argument isn't supplied, it doesn't appear in the map, and if it doesn't appear in the map, this function isn't called in the first place.

@@ -17,7 +17,19 @@ def method_to_json_schema(method: Callable[..., Any]) -> JsonSchema:
special_handling: Dict[str, List[JsonParameter]] = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where we do the original translation of parameters in the factory methods to the intermediate JSON Schema format.
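For a sense of what that intermediate form looks like, the entry for the new format parameter would be roughly the following (a hand-written approximation, not copied from the generated schema):

```python
# Hand-written approximation of the intermediate JSON Schema entry for
# --source.format / --target.format; the generated schema may differ in detail.
format_parameter_schema = {
    "type": "string",
    "enum": ["parquet", "delimited"],
    "description": "Records format type. Note that 'delimited' includes CSV/TSV/etc.",
}
```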

description="Records format type. "
"Note that 'delimited' includes "
"CSV/TSV/etc."),
optional=True,)] +

I also added descriptions to these two key parameters so we have specific descriptions (instead of reusing the docs from the factory method).

@vinceatbluelabs vinceatbluelabs marked this pull request as ready for review November 11, 2020 00:52
Base automatically changed from handle_operation_timed_out to master November 11, 2020 14:47
@vinceatbluelabs vinceatbluelabs merged commit b081d5a into master Nov 15, 2020
@vinceatbluelabs vinceatbluelabs deleted the mvrec_parquet branch November 15, 2020 03:02