From fab7e238e8559bdb2e4e0449354f7efeccbc07a7 Mon Sep 17 00:00:00 2001 From: Chris Connelly Date: Thu, 25 Jul 2024 21:16:51 +0100 Subject: [PATCH] Add `CsvExecBuilder` for creating `CsvExec` (#11633) * feat: add `CsvExecBuilder`, deprecate `CsvExec::new` This adds the `CsvExecBuilder` struct for building a `CsvExec` instance, and deprecates the `CsvExec::new` method which has grown too large. There are some `TODO`s related to the duplication of formatting options and their defaults coming from multiple places. Uses of the deprecated `new` method have not been updated yet. * chore: replace usage of deprecated `CsvExec::new` with `CsvExec::builder` * Add test that CSVExec options are the same * fmt --------- Co-authored-by: Andrew Lamb --- .../core/src/datasource/file_format/csv.rs | 35 +- .../core/src/datasource/physical_plan/csv.rs | 324 ++++++++++++++---- .../enforce_distribution.rs | 89 ++--- .../physical_optimizer/projection_pushdown.rs | 83 +++-- .../replace_with_order_preserving_variants.rs | 33 +- datafusion/core/src/test/mod.rs | 73 ++-- datafusion/proto/src/physical_plan/mod.rs | 55 +-- 7 files changed, 454 insertions(+), 238 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index e1b6daac092d..c55f678aef0f 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -344,22 +344,25 @@ impl FileFormat for CsvFormat { conf: FileScanConfig, _filters: Option<&Arc>, ) -> Result> { - let exec = CsvExec::new( - conf, - // If format options does not specify whether there is a header, - // we consult configuration options. - self.options - .has_header - .unwrap_or(state.config_options().catalog.has_header), - self.options.delimiter, - self.options.quote, - self.options.escape, - self.options.comment, - self.options - .newlines_in_values - .unwrap_or(state.config_options().catalog.newlines_in_values), - self.options.compression.into(), - ); + // Consult configuration options for default values + let has_header = self + .options + .has_header + .unwrap_or(state.config_options().catalog.has_header); + let newlines_in_values = self + .options + .newlines_in_values + .unwrap_or(state.config_options().catalog.newlines_in_values); + + let exec = CsvExec::builder(conf) + .with_has_header(has_header) + .with_delimeter(self.options.delimiter) + .with_quote(self.options.quote) + .with_escape(self.options.escape) + .with_comment(self.options.comment) + .with_newlines_in_values(newlines_in_values) + .with_file_compression_type(self.options.compression.into()) + .build(); Ok(Arc::new(exec)) } diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index fb0e23c6c164..be437cfb9444 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -49,7 +49,27 @@ use object_store::{GetOptions, GetResultPayload, ObjectStore}; use tokio::io::AsyncWriteExt; use tokio::task::JoinSet; -/// Execution plan for scanning a CSV file +/// Execution plan for scanning a CSV file. +/// +/// # Example: create a `CsvExec` +/// ``` +/// # use std::sync::Arc; +/// # use arrow::datatypes::Schema; +/// # use datafusion::datasource::{ +/// # physical_plan::{CsvExec, FileScanConfig}, +/// # listing::PartitionedFile, +/// # }; +/// # use datafusion_execution::object_store::ObjectStoreUrl; +/// # let object_store_url = ObjectStoreUrl::local_filesystem(); +/// # let file_schema = Arc::new(Schema::empty()); +/// // Create a CsvExec for reading the first 100MB of `file1.csv` +/// let file_scan_config = FileScanConfig::new(object_store_url, file_schema) +/// .with_file(PartitionedFile::new("file1.csv", 100*1024*1024)); +/// let exec = CsvExec::builder(file_scan_config) +/// .with_has_header(true) // The file has a header row +/// .with_newlines_in_values(true) // The file contains newlines in values +/// .build(); +/// ``` #[derive(Debug, Clone)] pub struct CsvExec { base_config: FileScanConfig, @@ -67,27 +87,124 @@ pub struct CsvExec { cache: PlanProperties, } -impl CsvExec { - /// Create a new CSV reader execution plan provided base and specific configurations - #[allow(clippy::too_many_arguments)] - pub fn new( - base_config: FileScanConfig, - has_header: bool, - delimiter: u8, - quote: u8, - escape: Option, - comment: Option, - newlines_in_values: bool, +/// Builder for [`CsvExec`]. +/// +/// See example on [`CsvExec`]. +#[derive(Debug, Clone)] +pub struct CsvExecBuilder { + file_scan_config: FileScanConfig, + file_compression_type: FileCompressionType, + // TODO: it seems like these format options could be reused across all the various CSV config + has_header: bool, + delimiter: u8, + quote: u8, + escape: Option, + comment: Option, + newlines_in_values: bool, +} + +impl CsvExecBuilder { + /// Create a new builder to read the provided file scan configuration. + pub fn new(file_scan_config: FileScanConfig) -> Self { + Self { + file_scan_config, + // TODO: these defaults are duplicated from `CsvOptions` - should they be computed? + has_header: false, + delimiter: b',', + quote: b'"', + escape: None, + comment: None, + newlines_in_values: false, + file_compression_type: FileCompressionType::UNCOMPRESSED, + } + } + + /// Set whether the first row defines the column names. + /// + /// The default value is `false`. + pub fn with_has_header(mut self, has_header: bool) -> Self { + self.has_header = has_header; + self + } + + /// Set the column delimeter. + /// + /// The default is `,`. + pub fn with_delimeter(mut self, delimiter: u8) -> Self { + self.delimiter = delimiter; + self + } + + /// Set the quote character. + /// + /// The default is `"`. + pub fn with_quote(mut self, quote: u8) -> Self { + self.quote = quote; + self + } + + /// Set the escape character. + /// + /// The default is `None` (i.e. quotes cannot be escaped). + pub fn with_escape(mut self, escape: Option) -> Self { + self.escape = escape; + self + } + + /// Set the comment character. + /// + /// The default is `None` (i.e. comments are not supported). + pub fn with_comment(mut self, comment: Option) -> Self { + self.comment = comment; + self + } + + /// Set whether newlines in (quoted) values are supported. + /// + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. + /// + /// The default value is `false`. + pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self { + self.newlines_in_values = newlines_in_values; + self + } + + /// Set the file compression type. + /// + /// The default is [`FileCompressionType::UNCOMPRESSED`]. + pub fn with_file_compression_type( + mut self, file_compression_type: FileCompressionType, ) -> Self { + self.file_compression_type = file_compression_type; + self + } + + /// Build a [`CsvExec`]. + #[must_use] + pub fn build(self) -> CsvExec { + let Self { + file_scan_config: base_config, + file_compression_type, + has_header, + delimiter, + quote, + escape, + comment, + newlines_in_values, + } = self; + let (projected_schema, projected_statistics, projected_output_ordering) = base_config.project(); - let cache = Self::compute_properties( + let cache = CsvExec::compute_properties( projected_schema, &projected_output_ordering, &base_config, ); - Self { + + CsvExec { base_config, projected_statistics, has_header, @@ -101,6 +218,39 @@ impl CsvExec { comment, } } +} + +impl CsvExec { + /// Create a new CSV reader execution plan provided base and specific configurations + #[deprecated(since = "41.0.0", note = "use `CsvExec::builder` or `CsvExecBuilder`")] + #[allow(clippy::too_many_arguments)] + pub fn new( + base_config: FileScanConfig, + has_header: bool, + delimiter: u8, + quote: u8, + escape: Option, + comment: Option, + newlines_in_values: bool, + file_compression_type: FileCompressionType, + ) -> Self { + CsvExecBuilder::new(base_config) + .with_has_header(has_header) + .with_delimeter(delimiter) + .with_quote(quote) + .with_escape(escape) + .with_comment(comment) + .with_newlines_in_values(newlines_in_values) + .with_file_compression_type(file_compression_type) + .build() + } + + /// Return a [`CsvExecBuilder`]. + /// + /// See example on [`CsvExec`] and [`CsvExecBuilder`] for specifying CSV table options. + pub fn builder(file_scan_config: FileScanConfig) -> CsvExecBuilder { + CsvExecBuilder::new(file_scan_config) + } /// Ref to the base configs pub fn base_config(&self) -> &FileScanConfig { @@ -557,6 +707,8 @@ mod tests { use arrow::datatypes::*; use datafusion_common::test_util::arrow_test_data; + use datafusion_common::config::CsvOptions; + use datafusion_execution::object_store::ObjectStoreUrl; use object_store::chunked::ChunkedStore; use object_store::local::LocalFileSystem; use rstest::*; @@ -597,16 +749,15 @@ mod tests { let mut config = partitioned_csv_config(file_schema, file_groups); config.projection = Some(vec![0, 2, 4]); - let csv = CsvExec::new( - config, - true, - b',', - b'"', - None, - None, - false, - file_compression_type.to_owned(), - ); + let csv = CsvExec::builder(config) + .with_has_header(true) + .with_delimeter(b',') + .with_quote(b'"') + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(file_compression_type) + .build(); assert_eq!(13, csv.base_config.file_schema.fields().len()); assert_eq!(3, csv.schema().fields().len()); @@ -667,16 +818,15 @@ mod tests { let mut config = partitioned_csv_config(file_schema, file_groups); config.projection = Some(vec![4, 0, 2]); - let csv = CsvExec::new( - config, - true, - b',', - b'"', - None, - None, - false, - file_compression_type.to_owned(), - ); + let csv = CsvExec::builder(config) + .with_has_header(true) + .with_delimeter(b',') + .with_quote(b'"') + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(file_compression_type.to_owned()) + .build(); assert_eq!(13, csv.base_config.file_schema.fields().len()); assert_eq!(3, csv.schema().fields().len()); @@ -737,16 +887,15 @@ mod tests { let mut config = partitioned_csv_config(file_schema, file_groups); config.limit = Some(5); - let csv = CsvExec::new( - config, - true, - b',', - b'"', - None, - None, - false, - file_compression_type.to_owned(), - ); + let csv = CsvExec::builder(config) + .with_has_header(true) + .with_delimeter(b',') + .with_quote(b'"') + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(file_compression_type.to_owned()) + .build(); assert_eq!(13, csv.base_config.file_schema.fields().len()); assert_eq!(13, csv.schema().fields().len()); @@ -804,16 +953,15 @@ mod tests { let mut config = partitioned_csv_config(file_schema, file_groups); config.limit = Some(5); - let csv = CsvExec::new( - config, - true, - b',', - b'"', - None, - None, - false, - file_compression_type.to_owned(), - ); + let csv = CsvExec::builder(config) + .with_has_header(true) + .with_delimeter(b',') + .with_quote(b'"') + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(file_compression_type.to_owned()) + .build(); assert_eq!(14, csv.base_config.file_schema.fields().len()); assert_eq!(14, csv.schema().fields().len()); @@ -870,16 +1018,15 @@ mod tests { // we don't have `/date=xx/` in the path but that is ok because // partitions are resolved during scan anyway - let csv = CsvExec::new( - config, - true, - b',', - b'"', - None, - None, - false, - file_compression_type.to_owned(), - ); + let csv = CsvExec::builder(config) + .with_has_header(true) + .with_delimeter(b',') + .with_quote(b'"') + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(file_compression_type.to_owned()) + .build(); assert_eq!(13, csv.base_config.file_schema.fields().len()); assert_eq!(2, csv.schema().fields().len()); @@ -966,16 +1113,15 @@ mod tests { .unwrap(); let config = partitioned_csv_config(file_schema, file_groups); - let csv = CsvExec::new( - config, - true, - b',', - b'"', - None, - None, - false, - file_compression_type.to_owned(), - ); + let csv = CsvExec::builder(config) + .with_has_header(true) + .with_delimeter(b',') + .with_quote(b'"') + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(file_compression_type.to_owned()) + .build(); let it = csv.execute(0, task_ctx).unwrap(); let batches: Vec<_> = it.try_collect().await.unwrap(); @@ -1183,4 +1329,34 @@ mod tests { Arc::new(schema) } + + /// Ensure that the default options are set correctly + #[test] + fn test_default_options() { + let file_scan_config = + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), aggr_test_schema()) + .with_file(PartitionedFile::new("foo", 34)); + + let CsvExecBuilder { + file_scan_config: _, + file_compression_type: _, + has_header, + delimiter, + quote, + escape, + comment, + newlines_in_values, + } = CsvExecBuilder::new(file_scan_config); + + let default_options = CsvOptions::default(); + assert_eq!(has_header, default_options.has_header.unwrap_or(false)); + assert_eq!(delimiter, default_options.delimiter); + assert_eq!(quote, default_options.quote); + assert_eq!(escape, default_options.escape); + assert_eq!(comment, default_options.comment); + assert_eq!( + newlines_in_values, + default_options.newlines_in_values.unwrap_or(false) + ); + } } diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index acca2ed8d997..1f076e448e60 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1464,18 +1464,21 @@ pub(crate) mod tests { } fn csv_exec_with_sort(output_ordering: Vec>) -> Arc { - Arc::new(CsvExec::new( - FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema()) - .with_file(PartitionedFile::new("x".to_string(), 100)) - .with_output_ordering(output_ordering), - false, - b',', - b'"', - None, - None, - false, - FileCompressionType::UNCOMPRESSED, - )) + Arc::new( + CsvExec::builder( + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema()) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_output_ordering(output_ordering), + ) + .with_has_header(false) + .with_delimeter(b',') + .with_quote(b'"') + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(FileCompressionType::UNCOMPRESSED) + .build(), + ) } fn csv_exec_multiple() -> Arc { @@ -1486,21 +1489,24 @@ pub(crate) mod tests { fn csv_exec_multiple_sorted( output_ordering: Vec>, ) -> Arc { - Arc::new(CsvExec::new( - FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema()) - .with_file_groups(vec![ - vec![PartitionedFile::new("x".to_string(), 100)], - vec![PartitionedFile::new("y".to_string(), 100)], - ]) - .with_output_ordering(output_ordering), - false, - b',', - b'"', - None, - None, - false, - FileCompressionType::UNCOMPRESSED, - )) + Arc::new( + CsvExec::builder( + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema()) + .with_file_groups(vec![ + vec![PartitionedFile::new("x".to_string(), 100)], + vec![PartitionedFile::new("y".to_string(), 100)], + ]) + .with_output_ordering(output_ordering), + ) + .with_has_header(false) + .with_delimeter(b',') + .with_quote(b'"') + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(FileCompressionType::UNCOMPRESSED) + .build(), + ) } fn projection_exec_with_alias( @@ -3762,20 +3768,23 @@ pub(crate) mod tests { }; let plan = aggregate_exec_with_alias( - Arc::new(CsvExec::new( - FileScanConfig::new( - ObjectStoreUrl::parse("test:///").unwrap(), - schema(), + Arc::new( + CsvExec::builder( + FileScanConfig::new( + ObjectStoreUrl::parse("test:///").unwrap(), + schema(), + ) + .with_file(PartitionedFile::new("x".to_string(), 100)), ) - .with_file(PartitionedFile::new("x".to_string(), 100)), - false, - b',', - b'"', - None, - None, - false, - compression_type, - )), + .with_has_header(false) + .with_delimeter(b',') + .with_quote(b'"') + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(compression_type) + .build(), + ), vec![("a".to_string(), "a".to_string())], ); assert_optimized!(expected, plan, true, false, 2, true, 10, false); diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index d0d0c985b8b6..9c545c17da3c 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -179,16 +179,17 @@ fn try_swapping_with_csv( ); file_scan.projection = Some(new_projections); - Arc::new(CsvExec::new( - file_scan, - csv.has_header(), - csv.delimiter(), - csv.quote(), - csv.escape(), - csv.comment(), - csv.newlines_in_values(), - csv.file_compression_type, - )) as _ + Arc::new( + CsvExec::builder(file_scan) + .with_has_header(csv.has_header()) + .with_delimeter(csv.delimiter()) + .with_quote(csv.quote()) + .with_escape(csv.escape()) + .with_comment(csv.comment()) + .with_newlines_in_values(csv.newlines_in_values()) + .with_file_compression_type(csv.file_compression_type) + .build(), + ) as _ }) } @@ -1689,21 +1690,24 @@ mod tests { Field::new("d", DataType::Int32, true), Field::new("e", DataType::Int32, true), ])); - Arc::new(CsvExec::new( - FileScanConfig::new( - ObjectStoreUrl::parse("test:///").unwrap(), - schema.clone(), + Arc::new( + CsvExec::builder( + FileScanConfig::new( + ObjectStoreUrl::parse("test:///").unwrap(), + schema.clone(), + ) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_projection(Some(vec![0, 1, 2, 3, 4])), ) - .with_file(PartitionedFile::new("x".to_string(), 100)) - .with_projection(Some(vec![0, 1, 2, 3, 4])), - false, - 0, - 0, - None, - None, - false, - FileCompressionType::UNCOMPRESSED, - )) + .with_has_header(false) + .with_delimeter(0) + .with_quote(0) + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(FileCompressionType::UNCOMPRESSED) + .build(), + ) } fn create_projecting_csv_exec() -> Arc { @@ -1713,21 +1717,24 @@ mod tests { Field::new("c", DataType::Int32, true), Field::new("d", DataType::Int32, true), ])); - Arc::new(CsvExec::new( - FileScanConfig::new( - ObjectStoreUrl::parse("test:///").unwrap(), - schema.clone(), + Arc::new( + CsvExec::builder( + FileScanConfig::new( + ObjectStoreUrl::parse("test:///").unwrap(), + schema.clone(), + ) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_projection(Some(vec![3, 2, 1])), ) - .with_file(PartitionedFile::new("x".to_string(), 100)) - .with_projection(Some(vec![3, 2, 1])), - false, - 0, - 0, - None, - None, - false, - FileCompressionType::UNCOMPRESSED, - )) + .with_has_header(false) + .with_delimeter(0) + .with_quote(0) + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(FileCompressionType::UNCOMPRESSED) + .build(), + ) } fn create_projecting_memory_exec() -> Arc { diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 6565e3e7d0d2..a989be987d3d 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -1490,21 +1490,24 @@ mod tests { let sort_exprs = sort_exprs.into_iter().collect(); let projection: Vec = vec![0, 2, 3]; - Arc::new(CsvExec::new( - FileScanConfig::new( - ObjectStoreUrl::parse("test:///").unwrap(), - schema.clone(), + Arc::new( + CsvExec::builder( + FileScanConfig::new( + ObjectStoreUrl::parse("test:///").unwrap(), + schema.clone(), + ) + .with_file(PartitionedFile::new("file_path".to_string(), 100)) + .with_projection(Some(projection)) + .with_output_ordering(vec![sort_exprs]), ) - .with_file(PartitionedFile::new("file_path".to_string(), 100)) - .with_projection(Some(projection)) - .with_output_ordering(vec![sort_exprs]), - true, - 0, - b'"', - None, - None, - false, - FileCompressionType::UNCOMPRESSED, - )) + .with_has_header(true) + .with_delimeter(0) + .with_quote(b'"') + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(FileCompressionType::UNCOMPRESSED) + .build(), + ) } } diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 5cb1b6ea7017..39a126a06bb6 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -92,16 +92,17 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result>`] for scanning `partitions` of `filename` @@ -275,18 +276,24 @@ pub fn csv_exec_sorted( ) -> Arc { let sort_exprs = sort_exprs.into_iter().collect(); - Arc::new(CsvExec::new( - FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone()) + Arc::new( + CsvExec::builder( + FileScanConfig::new( + ObjectStoreUrl::parse("test:///").unwrap(), + schema.clone(), + ) .with_file(PartitionedFile::new("x".to_string(), 100)) .with_output_ordering(vec![sort_exprs]), - false, - 0, - 0, - None, - None, - false, - FileCompressionType::UNCOMPRESSED, - )) + ) + .with_has_header(false) + .with_delimeter(0) + .with_quote(0) + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(FileCompressionType::UNCOMPRESSED) + .build(), + ) } // construct a stream partition for test purposes @@ -332,18 +339,24 @@ pub fn csv_exec_ordered( ) -> Arc { let sort_exprs = sort_exprs.into_iter().collect(); - Arc::new(CsvExec::new( - FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone()) + Arc::new( + CsvExec::builder( + FileScanConfig::new( + ObjectStoreUrl::parse("test:///").unwrap(), + schema.clone(), + ) .with_file(PartitionedFile::new("file_path".to_string(), 100)) .with_output_ordering(vec![sort_exprs]), - true, - 0, - b'"', - None, - None, - false, - FileCompressionType::UNCOMPRESSED, - )) + ) + .with_has_header(true) + .with_delimeter(0) + .with_quote(b'"') + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(FileCompressionType::UNCOMPRESSED) + .build(), + ) } /// A mock execution plan that simply returns the provided statistics diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 5c4d41f0eca6..1f433ff01d12 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -187,34 +187,39 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { )), } } - PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new( - parse_protobuf_file_scan_config( + PhysicalPlanType::CsvScan(scan) => Ok(Arc::new( + CsvExec::builder(parse_protobuf_file_scan_config( scan.base_conf.as_ref().unwrap(), registry, extension_codec, - )?, - scan.has_header, - str_to_byte(&scan.delimiter, "delimiter")?, - str_to_byte(&scan.quote, "quote")?, - if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape( - escape, - )) = &scan.optional_escape - { - Some(str_to_byte(escape, "escape")?) - } else { - None - }, - if let Some(protobuf::csv_scan_exec_node::OptionalComment::Comment( - comment, - )) = &scan.optional_comment - { - Some(str_to_byte(comment, "comment")?) - } else { - None - }, - scan.newlines_in_values, - FileCompressionType::UNCOMPRESSED, - ))), + )?) + .with_has_header(scan.has_header) + .with_delimeter(str_to_byte(&scan.delimiter, "delimiter")?) + .with_quote(str_to_byte(&scan.quote, "quote")?) + .with_escape( + if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape( + escape, + )) = &scan.optional_escape + { + Some(str_to_byte(escape, "escape")?) + } else { + None + }, + ) + .with_comment( + if let Some(protobuf::csv_scan_exec_node::OptionalComment::Comment( + comment, + )) = &scan.optional_comment + { + Some(str_to_byte(comment, "comment")?) + } else { + None + }, + ) + .with_newlines_in_values(scan.newlines_in_values) + .with_file_compression_type(FileCompressionType::UNCOMPRESSED) + .build(), + )), #[cfg(feature = "parquet")] PhysicalPlanType::ParquetScan(scan) => { let base_config = parse_protobuf_file_scan_config(