From b2a04519da97c2ff81789ef41dd652870794a73a Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Mon, 19 Feb 2024 02:32:27 -0500
Subject: [PATCH] Support Copy To Partitioned Files (#9240)

* support copy to partitioned files

* remove print statements

* fmt

* fix tests and use err macro

* cargo doc fix

* add partition directory specific test

* try to support columns with single quotes in name
---
 datafusion/common/src/file_options/mod.rs     | 25 ++++++
 datafusion/core/src/dataframe/mod.rs          | 12 +++
 datafusion/core/src/dataframe/parquet.rs      |  1 +
 .../src/datasource/file_format/write/demux.rs | 28 ++++---
 datafusion/core/src/physical_planner.rs       | 10 ++-
 datafusion/expr/src/logical_plan/builder.rs   |  2 +
 datafusion/expr/src/logical_plan/dml.rs       |  2 +
 datafusion/expr/src/logical_plan/plan.rs      |  4 +-
 datafusion/proto/src/logical_plan/mod.rs      |  2 +
 .../tests/cases/roundtrip_logical_plan.rs     |  4 +
 datafusion/sql/src/statement.rs               |  2 +
 datafusion/sqllogictest/test_files/copy.slt   | 84 +++++++++++++++++++
 12 files changed, 163 insertions(+), 13 deletions(-)

diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs
index 1d661b17eb1c..3a48f188fb97 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -97,6 +97,31 @@ impl StatementOptions {
         maybe_option.map(|(_, v)| v)
     }
 
+    /// Finds the partition_by option if it exists and parses it into a `Vec<String>`.
+    /// If the option doesn't exist, returns an empty `vec![]`.
+    /// E.g. (partition_by 'colA, colB, colC') -> `vec!["colA", "colB", "colC"]`
+    pub fn take_partition_by(&mut self) -> Vec<String> {
+        let partition_by = self.take_str_option("partition_by");
+        match partition_by {
+            Some(part_cols) => {
+                let dequoted = part_cols
+                    .chars()
+                    .enumerate()
+                    .filter(|(idx, c)| {
+                        !((*idx == 0 || *idx == part_cols.len() - 1)
+                            && (*c == '\'' || *c == '"'))
+                    })
+                    .map(|(_idx, c)| c)
+                    .collect::<String>();
+                dequoted
+                    .split(',')
+                    .map(|s| s.trim().replace("''", "'"))
+                    .collect::<Vec<_>>()
+            }
+            None => vec![],
+        }
+    }
+
     /// Infers the file_type given a target and arbitrary options.
     /// If the options contain an explicit "format" option, that will be used.
     /// Otherwise, attempt to infer file_type from the extension of target.
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 237f14d2c046..81247908dfe1 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -73,6 +73,9 @@ pub struct DataFrameWriteOptions {
     /// Allows compression of CSV and JSON.
     /// Not supported for parquet.
     compression: CompressionTypeVariant,
+    /// Sets which columns should be used for hive-style partitioned writes by name.
+    /// Can be set to an empty `vec![]` for non-partitioned writes.
+    partition_by: Vec<String>,
 }
 
 impl DataFrameWriteOptions {
@@ -82,6 +85,7 @@ impl DataFrameWriteOptions {
             overwrite: false,
             single_file_output: false,
             compression: CompressionTypeVariant::UNCOMPRESSED,
+            partition_by: vec![],
         }
     }
     /// Set the overwrite option to true or false
@@ -101,6 +105,12 @@ impl DataFrameWriteOptions {
         self.compression = compression;
         self
     }
+
+    /// Sets the partition_by columns for output partitioning
+    pub fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
+        self.partition_by = partition_by;
+        self
+    }
 }
 
 impl Default for DataFrameWriteOptions {
@@ -1176,6 +1186,7 @@ impl DataFrame {
             self.plan,
             path.into(),
             FileType::CSV,
+            options.partition_by,
             copy_options,
         )?
         .build()?;
@@ -1219,6 +1230,7 @@ impl DataFrame {
             self.plan,
             path.into(),
             FileType::JSON,
+            options.partition_by,
             copy_options,
         )?
         .build()?;
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 00a0e780d51f..184d3c8cb25a 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -73,6 +73,7 @@ impl DataFrame {
             self.plan,
             path.into(),
             FileType::PARQUET,
+            options.partition_by,
             copy_options,
         )?
         .build()?;
diff --git a/datafusion/core/src/datasource/file_format/write/demux.rs b/datafusion/core/src/datasource/file_format/write/demux.rs
index 94d915827e4f..1f7c243e980d 100644
--- a/datafusion/core/src/datasource/file_format/write/demux.rs
+++ b/datafusion/core/src/datasource/file_format/write/demux.rs
@@ -32,7 +32,7 @@ use arrow_array::cast::AsArray;
 use arrow_array::{downcast_dictionary_array, RecordBatch, StringArray, StructArray};
 use arrow_schema::{DataType, Schema};
 use datafusion_common::cast::as_string_array;
-use datafusion_common::DataFusionError;
+use datafusion_common::{exec_datafusion_err, DataFusionError};
 
 use datafusion_execution::TaskContext;
 
@@ -319,14 +319,20 @@ fn compute_partition_keys_by_row<'a>(
 ) -> Result<Vec<Vec<&'a str>>> {
     let mut all_partition_values = vec![];
 
-    for (col, dtype) in partition_by.iter() {
+    // For the purposes of writing partitioned data, we can rely on schema inference
+    // to determine the type of the partition cols in order to provide a more ergonomic
+    // UI which does not require specifying DataTypes manually. So, we ignore the
+    // DataType within the partition_by array and infer the correct type from the
+    // batch schema instead.
+    let schema = rb.schema();
+    for (col, _) in partition_by.iter() {
         let mut partition_values = vec![];
-        let col_array =
-            rb.column_by_name(col)
-                .ok_or(DataFusionError::Execution(format!(
-                    "PartitionBy Column {} does not exist in source data!",
-                    col
-                )))?;
+
+        let dtype = schema.field_with_name(col)?.data_type();
+        let col_array = rb.column_by_name(col).ok_or(exec_datafusion_err!(
+            "PartitionBy Column {} does not exist in source data! Got schema {schema}.",
+            col
+        ))?;
 
         match dtype {
             DataType::Utf8 => {
@@ -339,12 +345,12 @@ fn compute_partition_keys_by_row<'a>(
             downcast_dictionary_array!(
                 col_array => {
                     let array = col_array.downcast_dict::<StringArray>()
-                        .ok_or(DataFusionError::Execution(format!("it is not yet supported to write to hive partitions with datatype {}",
-                        dtype)))?;
+                        .ok_or(exec_datafusion_err!("it is not yet supported to write to hive partitions with datatype {}",
+                        dtype))?;
 
                     for val in array.values() {
                         partition_values.push(
-                            val.ok_or(DataFusionError::Execution(format!("Cannot partition by null value for column {}", col)))?
+                            val.ok_or(exec_datafusion_err!("Cannot partition by null value for column {}", col))?
                         );
                     }
                 },
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 463d0cde8282..dabf0a91b2d3 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -568,6 +568,7 @@ impl DefaultPhysicalPlanner {
                 output_url,
                 file_format,
                 copy_options,
+                partition_by,
             }) => {
                 let input_exec = self.create_initial_plan(input, session_state).await?;
                 let parsed_url = ListingTableUrl::parse(output_url)?;
@@ -585,13 +586,20 @@ impl DefaultPhysicalPlanner {
                     CopyOptions::WriterOptions(writer_options) => *writer_options.clone()
                 };
 
+                // Note: the DataType passed here is ignored for the purposes of writing and inferred instead
+                // from the schema of the RecordBatch being written. This allows COPY statements to specify only
+                // the column name rather than column name + explicit data type.
+                let table_partition_cols = partition_by.iter()
+                    .map(|s| (s.to_string(), arrow_schema::DataType::Null))
+                    .collect::<Vec<_>>();
+
                 // Set file sink related options
                 let config = FileSinkConfig {
                     object_store_url,
                     table_paths: vec![parsed_url],
                     file_groups: vec![],
                     output_schema: Arc::new(schema),
-                    table_partition_cols: vec![],
+                    table_partition_cols,
                     overwrite: false,
                     file_type_writer_options
                 };
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 39df96d61f45..0662396f611b 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -263,12 +263,14 @@ impl LogicalPlanBuilder {
         input: LogicalPlan,
         output_url: String,
         file_format: FileType,
+        partition_by: Vec<String>,
         copy_options: CopyOptions,
     ) -> Result<Self> {
         Ok(Self::from(LogicalPlan::Copy(CopyTo {
             input: Arc::new(input),
             output_url,
             file_format,
+            partition_by,
             copy_options,
         })))
     }
diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs
index 794c64998935..a55781eda643 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -36,6 +36,8 @@ pub struct CopyTo {
     pub output_url: String,
     /// The file format to output (explicitly defined or inferred from file extension)
     pub file_format: FileType,
+    /// Determines which, if any, columns should be used for hive-style partitioned writes
+    pub partition_by: Vec<String>,
     /// Arbitrary options as tuples
     pub copy_options: CopyOptions,
 }
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index ba768cf3c6d6..aa5dff25efd8 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -615,12 +615,13 @@ impl LogicalPlan {
                 input: _,
                 output_url,
                 file_format,
+                partition_by,
                 copy_options,
             }) => Ok(LogicalPlan::Copy(CopyTo {
                 input: Arc::new(inputs.swap_remove(0)),
                 output_url: output_url.clone(),
                 file_format: file_format.clone(),
-
+                partition_by: partition_by.clone(),
                 copy_options: copy_options.clone(),
             })),
             LogicalPlan::Values(Values { schema, .. }) => {
@@ -1551,6 +1552,7 @@ impl LogicalPlan {
                 input: _,
                 output_url,
                 file_format,
+                partition_by: _,
                 copy_options,
             }) => {
                 let op_str = match copy_options {
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 7a6dab85de34..aaaf165e3276 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -918,6 +918,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         input: Arc::new(input),
                         output_url: copy.output_url.clone(),
                         file_format: FileType::from_str(&copy.file_type)?,
+                        partition_by: vec![],
                         copy_options,
                     },
                 ))
@@ -1641,6 +1642,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                 output_url,
                 file_format,
                 copy_options,
+                partition_by: _,
             }) => {
                 let input = protobuf::LogicalPlanNode::try_from_logical_plan(
                     input,
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 68a318b5a6d5..81f59975476f 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -324,6 +324,7 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
         input: Arc::new(input),
         output_url: "test.csv".to_string(),
         file_format: FileType::CSV,
+        partition_by: vec![],
         copy_options: CopyOptions::SQLOptions(StatementOptions::from(&options)),
     });
 
@@ -354,6 +355,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
         input: Arc::new(input),
         output_url: "test.parquet".to_string(),
         file_format: FileType::PARQUET,
+        partition_by: vec![],
         copy_options: CopyOptions::WriterOptions(Box::new(
             FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(writer_properties)),
         )),
@@ -402,6 +404,7 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {
         input: Arc::new(input),
         output_url: "test.arrow".to_string(),
         file_format: FileType::ARROW,
+        partition_by: vec![],
         copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::Arrow(
             ArrowWriterOptions::new(),
         ))),
@@ -447,6 +450,7 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
         input: Arc::new(input),
         output_url: "test.csv".to_string(),
         file_format: FileType::CSV,
+        partition_by: vec![],
         copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::CSV(
             CsvWriterOptions::new(
                 writer_properties,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 47eca70ef3e2..bf15146a92f7 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -718,6 +718,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
         let mut statement_options = StatementOptions::new(options);
         let file_format = statement_options.try_infer_file_type(&statement.target)?;
+        let partition_by = statement_options.take_partition_by();
 
         let copy_options = CopyOptions::SQLOptions(statement_options);
 
@@ -725,6 +726,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             input: Arc::new(input),
             output_url: statement.target,
             file_format,
+            partition_by,
             copy_options,
         }))
     }
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index dd2ce16a632e..51b46d710bd8 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -25,6 +25,90 @@ COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, compressi
 ----
 2
 
+# Copy to directory as partitioned files
+query IT
+COPY source_table TO 'test_files/scratch/copy/partitioned_table1/' (format parquet, compression 'zstd(10)', partition_by 'col2');
+----
+2
+
+# validate multiple partitioned parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/partitioned_table1/' PARTITIONED BY (col2);
+
+query I?
+select * from validate_partitioned_parquet order by col1, col2;
+----
+1 Foo
+2 Bar
+
+# validate partition paths were actually generated
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet_bar STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/partitioned_table1/col2=Bar';
+
+query I
+select * from validate_partitioned_parquet_bar order by col1;
+----
+2
+
+# Copy to directory as partitioned files
+query ITT
+COPY (values (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')) TO 'test_files/scratch/copy/partitioned_table2/'
+(format parquet, compression 'zstd(10)', partition_by 'column2, column3');
+----
+3
+
+# validate multiple partitioned parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet2 STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/partitioned_table2/' PARTITIONED BY (column2, column3);
+
+query I??
+select * from validate_partitioned_parquet2 order by column1, column2, column3;
+----
+1 a x
+2 b y
+3 c z
+
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet_a_x STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/partitioned_table2/column2=a/column3=x';
+
+query I
+select * from validate_partitioned_parquet_a_x order by column1;
+----
+1
+
+statement ok
+create table test ("'test'" varchar, "'test2'" varchar, "'test3'" varchar);
+
+query TTT
+insert into test VALUES ('a', 'x', 'aa'), ('b','y', 'bb'), ('c', 'z', 'cc')
+----
+3
+
+query T
+select "'test'" from test
+----
+a
+b
+c
+
+# Note: to place a single ' inside a literal string, escape it by doubling it ('')
+query TTT
+copy test to 'test_files/scratch/copy/escape_quote' (format csv, partition_by '''test2'',''test3''')
+----
+3
+
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_escape_quote STORED AS CSV
+LOCATION 'test_files/scratch/copy/escape_quote/' PARTITIONED BY ("'test2'", "'test3'");
+
+# This triggers a panic (index out of bounds)
+#query
+#select * from validate_partitioned_escape_quote;
+
 query TT
 EXPLAIN COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, compression 'zstd(10)');
 ----
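
Usage sketch: how the partition_by plumbing added by this patch can be driven
from the DataFrame API. This is a minimal example, not part of the diff; the
source file, output path, and column `col2` are hypothetical, and it assumes
the post-patch `write_parquet(path, options, writer_options)` signature with
default writer options (`None`).

    use datafusion::dataframe::DataFrameWriteOptions;
    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Hypothetical source; any DataFrame works here.
        let df = ctx.read_csv("source.csv", CsvReadOptions::new()).await?;

        // Hive-style partitioned write: one directory per distinct value of
        // `col2`, e.g. out/col2=Bar/<file>.parquet. The partition column's
        // DataType is inferred from the batch schema (see the demux changes),
        // so only the column name needs to be supplied.
        df.write_parquet(
            "out/",
            DataFrameWriteOptions::new()
                .with_partition_by(vec!["col2".to_string()]),
            None,
        )
        .await?;
        Ok(())
    }

The SQL equivalent, as exercised in copy.slt above:

    COPY source_table TO 'out/' (format parquet, partition_by 'col2');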