
Support Configuring Arrow RecordBatch Writers via SQL Statement Options #7390

Merged · 10 commits · Aug 29, 2023

Conversation

@devinjdangelo (Contributor) commented Aug 23, 2023

Which issue does this PR close?

Closes #7322
Closes #7298

Rationale for this change

Currently, the only way to configure how files are written out as a result of INSERT or COPY statements is via session-level configuration. Additionally, string parsing of some special arbitrary SQL statement options is handled in various places in DataFusion on an ad hoc basis (such as the current per_thread_output setting in COPY).

This PR aims to consolidate the logic for parsing arbitrary SQL statement options and to support code reuse, including in downstream systems that may choose to support their own special options.

What changes are included in this PR?

  • Move existing parquet setting string parsing logic to datafusion-common
  • Create abstractions to make working with tuples of arbitrary string options easier
  • Create a new DataFusionError type for unsupported options
  • Implement string parsing logic for CSV and JSON settings
  • Support writing compressed CSV and JSON files (this one is a DataFusion specific write option)
  • Rename the per_thread_output option to single_file_output, as I believe this is less ambiguous in DataFusion (DuckDB uses per_thread_output, but the reference to threads doesn't really make sense in DataFusion). The new name also makes more sense in a table context, where a listing table may or may not be backed by a single file.

An example of a new query supported by this PR:

COPY source_table TO './table'
(format parquet,
 single_file_output false,
 compression 'zstd(10)');

Notable work that is important for follow-ons

  • Support these options in the DataFrame::write_* functions without requiring strings to be passed (the current thinking is to allow passing a fully formed writer builder object directly).
  • Improve test coverage of various combinations of write options.

Are these changes tested?

Yes, via existing tests and some new ones. However, there is now a very large number of possible option combinations. We may want to implement some dynamic test case generation framework to ensure no specific combination of options will fail.

Are there any user-facing changes?

Yes, writes now support most options that the various arrow writers support via SQL statements. The primary exception right now is column-specific parquet options (i.e. setting an encoding for only one column). Additional work will be needed to support that.

@github-actions bot added the sql (SQL Planner), logical-expr (Logical plan and expressions), core (Core DataFusion crate), and sqllogictest (SQL Logic Tests (.slt)) labels on Aug 23, 2023
@@ -73,6 +73,9 @@ pub enum DataFusionError {
/// This error happens whenever a plan is not valid. Examples include
/// impossible casts.
Plan(String),
/// This error happens when an invalid or unsupported option is passed
/// in a SQL statement
InvalidOption(String),
@devinjdangelo (Contributor Author):
If this causes any issues with the ongoing work around DataFusionError, I'm happy to roll it back. I created this because I didn't feel any existing error type seemed to make sense for an invalid option.

@alamb (Contributor):
I agree none of the existing errors really fits. What about calling this Configuration? I think it could be used more generally whenever some configuration fails validation (not just setting options via SQL).
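
For reference, a minimal sketch of how such a variant surfaces to users; this is a hedged illustration with a simplified enum, and the Display prefixes are taken from the error messages shown elsewhere in this PR, not from the exact merged impl:

use std::fmt;

// Simplified stand-in for DataFusionError, showing only the two
// variants relevant to this discussion.
#[derive(Debug)]
enum DataFusionError {
    Plan(String),
    InvalidOption(String),
}

impl fmt::Display for DataFusionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Prefixes mirror the assertions changed later in this diff.
            DataFusionError::Plan(msg) => write!(f, "Error during planning: {msg}"),
            DataFusionError::InvalidOption(msg) => write!(f, "Invalid Option: {msg}"),
        }
    }
}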

/// around a "writer builder" type (e.g. arrow::csv::WriterBuilder)
/// plus any DataFusion specific writing options (e.g. CSV compression)
#[derive(Clone, Debug)]
pub enum FileTypeWriterOptions {
@devinjdangelo (Contributor Author):
This enum was proposed by @alamb in #6569 and is the key new abstraction in this PR.
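
For orientation, a hedged sketch of the shape of this abstraction; the variant names and inner types below are assumptions based on the doc comment above, not the exact merged code:

// Sketch only: each variant pairs a file format with its writer
// options (a writer-builder wrapper plus DataFusion-specific options
// such as compression).
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions; // stand-in: wraps parquet WriterProperties
#[derive(Clone, Debug)]
pub struct CsvWriterOptions; // stand-in: wraps arrow csv WriterBuilder + compression
#[derive(Clone, Debug)]
pub struct JsonWriterOptions; // stand-in: JSON write settings + compression

#[derive(Clone, Debug)]
pub enum FileTypeWriterOptions {
    Parquet(ParquetWriterOptions),
    Csv(CsvWriterOptions),
    Json(JsonWriterOptions),
}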

@@ -1828,7 +1862,7 @@ mod tests {
)
.await
.expect_err("Example should fail!");
assert_eq!("Error during planning: zstd compression requires specifying a level such as zstd(4)", format!("{e}"));
assert_eq!("Invalid Option: zstd compression requires specifying a level such as zstd(4)", format!("{e}"));
@devinjdangelo (Contributor Author):
I think this example shows why I wanted to create a new error type. "Error during planning" would, I think, be more confusing to an end user than a straightforward "Invalid Option" error.

# Copy from table with options
query IT
query error DataFusion error: Invalid Option: Found unsupported option row_group_size with value 55 for JSON format!
@devinjdangelo (Contributor Author):
As discussed in #6569, this query now throws an error since row_group_size does not make sense in the context of writing JSON files.

@alamb (Contributor) commented Aug 24, 2023

Yes, via existing tests and some new ones. However, there is now a very large number of possible option combinations. We may want to implement some dynamic test case generation framework to ensure no specific combination of options will fail.

FWIW I don't think it is necessary to test all the combinations -- that is largely tested upstream in the arrow writer, so testing in DataFusion that all the plumbing is hooked up correctly is likely the most important thing.

@alamb (Contributor) left a comment

This is great @devinjdangelo -- thank you! I also tried it out locally and it worked great. It is not that often that I review 1500+ line PRs and have so few comments 🥇

The only thing I think this PR needs prior to merging is test coverage for config parsing (as you mentioned in your description).

I also think we should document the available options (but that can be done as a follow on PR)

Local test

~/Software/target-df2/release/datafusion-cli -c "copy 'traces_nd.parquet' to 'out' (format parquet, single_file_output false, compression 'zstd(10)')"
DataFusion CLI v30.0.0
+---------+
| count   |
+---------+
| 5185717 |
+---------+
1 row in set. Query took 23.679 seconds.

datafusion/common/src/file_options/arrow_writer.rs (outdated review thread, resolved)
datafusion/common/src/file_options/csv_writer.rs (outdated review thread, resolved)
impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions {
    type Error = DataFusionError;

    fn try_from(value: (&ConfigOptions, &StatementOptions)) -> Result<Self> {
@alamb (Contributor):
this is cool
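
A hedged sketch of the pattern, with simplified stand-in types in place of the real ConfigOptions/StatementOptions (the option names and error text here are illustrative, not the merged implementation):

// Stand-in for the PR's StatementOptions: the raw (key, value) string
// pairs collected from a SQL statement.
struct StatementOptions(Vec<(String, String)>);

#[derive(Debug)]
struct CsvWriterOptions {
    has_header: bool,
    delimiter: u8,
}

impl TryFrom<&StatementOptions> for CsvWriterOptions {
    type Error = String; // stand-in for DataFusionError::InvalidOption

    fn try_from(options: &StatementOptions) -> Result<Self, Self::Error> {
        // Start from defaults, then apply per-statement overrides.
        let mut out = CsvWriterOptions { has_header: true, delimiter: b',' };
        for (key, value) in &options.0 {
            match key.to_lowercase().as_str() {
                "header" => {
                    out.has_header = value
                        .parse()
                        .map_err(|_| format!("Invalid Option: bad header value {value}"))?
                }
                "delimiter" => {
                    out.delimiter = *value
                        .as_bytes()
                        .first()
                        .ok_or_else(|| "Invalid Option: empty delimiter".to_string())?
                }
                other => {
                    return Err(format!(
                        "Invalid Option: Found unsupported option {other} for CSV format!"
                    ))
                }
            }
        }
        Ok(out)
    }
}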

datafusion/common/src/file_options/mod.rs (outdated review thread, resolved)
/// Constructs a default Parquet WriterPropertiesBuilder using
/// Session level ConfigOptions to initialize settings
fn default_builder(options: &ConfigOptions) -> Result<WriterPropertiesBuilder> {
    let parquet_context = &options.execution.parquet;
@alamb (Contributor):
nit: calling this parquet_options might be clearer
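
For context, a hedged sketch of the pattern: seed a parquet WriterPropertiesBuilder from session-level settings. The config struct below is a simplified stand-in for the real ConfigOptions.execution.parquet, and the two settings are only examples:

use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};

// Simplified stand-in for the session-level parquet options.
struct SessionParquetOptions {
    max_row_group_size: usize,
    data_pagesize_limit: usize,
}

fn default_builder(opts: &SessionParquetOptions) -> WriterPropertiesBuilder {
    // Each session option maps onto the corresponding builder setter;
    // per-statement options can then override the resulting builder.
    WriterProperties::builder()
        .set_max_row_group_size(opts.max_row_group_size)
        .set_data_page_size_limit(opts.data_pagesize_limit)
}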

}

/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding
pub(crate) fn parse_encoding_string(
@alamb (Contributor):
It would be great to move this stuff upstream into a FromStr implementation in the parquet crate. I can perhaps file a ticket to do so.

@devinjdangelo (Contributor Author):
I believe you already did! apache/arrow-rs#4693
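
Until that lands upstream, the parsing lives in DataFusion. A hedged sketch of the shape (the accepted strings below are assumptions, though the Encoding variants do exist in parquet::basic):

use parquet::basic::Encoding;

// Sketch: map a lowercase option string onto a parquet Encoding,
// rejecting unknown names with an invalid-option style error.
pub(crate) fn parse_encoding_string(s: &str) -> Result<Encoding, String> {
    match s.to_lowercase().as_str() {
        "plain" => Ok(Encoding::PLAIN),
        "rle" => Ok(Encoding::RLE),
        "delta_binary_packed" => Ok(Encoding::DELTA_BINARY_PACKED),
        "delta_byte_array" => Ok(Encoding::DELTA_BYTE_ARRAY),
        "byte_stream_split" => Ok(Encoding::BYTE_STREAM_SPLIT),
        other => Err(format!("Invalid Option: unknown encoding {other}")),
    }
}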

@@ -1191,6 +1191,8 @@ mod tests {
Field::new("c2", DataType::UInt64, false),
]));

println!("{out_dir}");
@alamb (Contributor):
I think this might be left over? Since it is part of a test I think it is fine to leave in; just thought I would point it out.

/// regardless of input partitioning. Otherwise, each table path is assumed to be a directory
/// to which each output partition is written to its own output file.
pub per_thread_output: bool,
pub single_file_output: bool,
@alamb (Contributor):
I agree the new name is much nicer

@alamb (Contributor) commented Aug 24, 2023

I also took the liberty of merging this branch to main to resolve a conflict (I am so excited to see this feature almost ready!)

@devinjdangelo (Contributor Author):

Thank you for the review @alamb! All of your suggestions make sense. I plan to get to updating this PR and adding some additional test cases within the next few days.

I also noticed your local test took 23s to execute. I am hoping to (finally) get to improving that performance soon!

@alamb (Contributor) commented Aug 25, 2023

I also noticed your local test took 23s to execute. I am hoping to (finally) get to improving that performance soon!

thank you for all your help!

@alamb (Contributor) commented Aug 28, 2023

Let me know if you need help finishing up this PR @devinjdangelo -- I am on vacation this week so I can focus my energy and reviews on things I think are the most interesting -- and this is one of them!

@devinjdangelo (Contributor Author):

Thanks @alamb! I just pushed up some updates addressing review comments and adding a few more tests. The main remaining work is unit tests that set options via a StatementOptions struct and inspect the resulting FileTypeWriterOptions to ensure the settings were actually applied. The current sqllogictests only ensure that passing an option does not result in an error, not that the setting actually had an effect on the underlying RecordBatchWriter.

I expect to get back to work on unit tests later today (after 5pm EST). Feel free to take a crack at it before I get to it (or just enjoy your vacation)!

I also noticed many clippy warnings locally that are unrelated to the changes in this PR. I believe they are related to upstream clippy updates, so I may just need to merge main again.

@devinjdangelo (Contributor Author):

I pushed up some unit tests. The CSV option parsing unit test cannot validate that options were set properly on csv::WriterBuilder, since none of its properties are externally readable. We should update this unit test when apache/arrow-rs#4735 is closed out.
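
A hedged sketch of the intended test shape, using parquet's WriterProperties (which, unlike csv::WriterBuilder before that issue is resolved, exposes getters to assert against); the option wiring is simplified here:

use parquet::file::properties::WriterProperties;

#[test]
fn writer_option_takes_effect() {
    // Stand-in for parsing "max_row_group_size 123" out of a SQL
    // statement's options and applying it to the writer builder.
    let props = WriterProperties::builder()
        .set_max_row_group_size(123)
        .build();
    // WriterProperties exposes its settings, so we can assert the
    // option actually took effect on the writer configuration.
    assert_eq!(props.max_row_group_size(), 123);
}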

Also, solving my local clippy errors was as simple as running rustup update.

@alamb (Contributor) left a comment

Thank you @devinjdangelo -- this looks epic! Really nice work

@alamb alamb merged commit 18a03bd into apache:main Aug 29, 2023
@andygrove added the enhancement (New feature or request) label on Sep 8, 2023