From a403a9553d3dd8b4dce59965014c547e60ef066c Mon Sep 17 00:00:00 2001 From: Thomas K Cameron Date: Sun, 10 Dec 2023 12:12:42 +0900 Subject: [PATCH 01/11] update --- datafusion/core/src/datasource/file_format/parquet.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 875c58ae447e..e485683ec470 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -162,6 +162,13 @@ impl FileFormat for ParquetFormat { .try_collect() .await?; + // sorted + let schemas: Vec<_> = { + let mut stack: Vec<_> = schemas.into_iter().zip(objects).collect(); + stack.sort_by(|(_, a), (_, b)| a.location.cmp(&b.location)); + stack.into_iter().map(|(a, _)| a).collect() + }; + let schema = if self.skip_metadata(state.config_options()) { Schema::try_merge(clear_metadata(schemas)) } else { From ba6ab9ed0273eca6f2788a2901eb25f8a2935063 Mon Sep 17 00:00:00 2001 From: Thomas K Cameron Date: Sun, 10 Dec 2023 13:19:56 +0900 Subject: [PATCH 02/11] FIXed --- .../src/datasource/file_format/parquet.rs | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index e485683ec470..a85d85b95f7c 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -26,8 +26,9 @@ use async_trait::async_trait; use bytes::{BufMut, BytesMut}; use datafusion_common::DataFusionError; use datafusion_physical_expr::PhysicalExpr; -use futures::{StreamExt, TryStreamExt}; +use futures::{StreamExt, TryFutureExt, TryStreamExt}; use hashbrown::HashMap; +use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::parquet_to_arrow_schema; use parquet::file::footer::{decode_footer, decode_metadata}; @@ -143,6 +144,16 @@ fn clear_metadata( }) } +async fn fetch_schema_with_location( + store: &dyn ObjectStore, + file: &ObjectMeta, + metadata_size_hint: Option, +) -> Result<(Path, Schema)> { + let loc_path = file.location.clone(); + let schema = fetch_schema(store, file, metadata_size_hint).await?; + Ok((loc_path, schema)) +} + #[async_trait] impl FileFormat for ParquetFormat { fn as_any(&self) -> &dyn Any { @@ -155,19 +166,22 @@ impl FileFormat for ParquetFormat { store: &Arc, objects: &[ObjectMeta], ) -> Result { - let schemas: Vec<_> = futures::stream::iter(objects) - .map(|object| fetch_schema(store.as_ref(), object, self.metadata_size_hint)) + let mut schemas: Vec<_> = futures::stream::iter(objects) + .map(|object| { + fetch_schema_with_location( + store.as_ref(), + object, + self.metadata_size_hint, + ) + }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 .buffered(SCHEMA_INFERENCE_CONCURRENCY) .try_collect() .await?; - // sorted - let schemas: Vec<_> = { - let mut stack: Vec<_> = schemas.into_iter().zip(objects).collect(); - stack.sort_by(|(_, a), (_, b)| a.location.cmp(&b.location)); - stack.into_iter().map(|(a, _)| a).collect() - }; + schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2)); + + let schemas = schemas.into_iter().map(|(_, schema)| schema).collect::>(); let schema = if self.skip_metadata(state.config_options()) { Schema::try_merge(clear_metadata(schemas)) From ac00cfa1012da8af74b667a2ad133a3174bde577 Mon Sep 17 00:00:00 2001 From: Thomas K Cameron Date: Sun, 10 Dec 2023 13:22:37 +0900 Subject: [PATCH 03/11] add parquet --- datafusion/core/src/datasource/file_format/parquet.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index a85d85b95f7c..0e9792625ddd 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -180,8 +180,11 @@ impl FileFormat for ParquetFormat { .await?; schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2)); - - let schemas = schemas.into_iter().map(|(_, schema)| schema).collect::>(); + + let schemas = schemas + .into_iter() + .map(|(_, schema)| schema) + .collect::>(); let schema = if self.skip_metadata(state.config_options()) { Schema::try_merge(clear_metadata(schemas)) From 03690ab19ff7ac84fe9fb3302c34a4718bc81c29 Mon Sep 17 00:00:00 2001 From: Thomas K Cameron Date: Sun, 10 Dec 2023 13:53:00 +0900 Subject: [PATCH 04/11] update --- datafusion/core/src/datasource/file_format/parquet.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 0e9792625ddd..afdf9ab7a781 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -696,15 +696,15 @@ mod tests { assert_eq!(stats.num_rows, Some(3)); let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0]; let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1]; - assert_eq!(c1_stats.null_count, Some(1)); - assert_eq!(c2_stats.null_count, Some(3)); + assert_eq!(c1_stats.null_count, Some(3)); + assert_eq!(c2_stats.null_count, Some(1)); let stats = fetch_statistics(store.as_ref(), schema, &meta[1], None).await?; assert_eq!(stats.num_rows, Some(3)); let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0]; let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1]; - assert_eq!(c1_stats.null_count, Some(3)); - assert_eq!(c2_stats.null_count, Some(1)); + assert_eq!(c1_stats.null_count, Some(1)); + assert_eq!(c2_stats.null_count, Some(3)); assert_eq!(c2_stats.max_value, Some(ScalarValue::Int64(Some(2)))); assert_eq!(c2_stats.min_value, Some(ScalarValue::Int64(Some(1)))); From f24acfda845a2a5c85e3e859b4de6836a7b8505f Mon Sep 17 00:00:00 2001 From: Thomas K Cameron Date: Sun, 10 Dec 2023 14:12:04 +0900 Subject: [PATCH 05/11] update --- datafusion/core/src/datasource/file_format/parquet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index f8ff371c3574..7b25763487ce 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -43,7 +43,7 @@ use bytes::{BufMut, BytesMut}; use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; -use futures::{StreamExt, TryFutureExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use hashbrown::HashMap; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; From 970d5f9ba543502fdad73279d358c1dc2850f4a4 Mon Sep 17 00:00:00 2001 From: Thomas K Cameron Date: Sun, 10 Dec 2023 14:51:53 +0900 Subject: [PATCH 06/11] update --- datafusion/core/src/datasource/file_format/parquet.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 7b25763487ce..d06f44f7c496 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1148,7 +1148,7 @@ pub(crate) mod test_util { multi_page: bool, ) -> Result<(Vec, Vec)> { // Each batch writes to their own file - let files: Vec<_> = batches + let mut files: Vec<_> = batches .into_iter() .map(|batch| { let mut output = NamedTempFile::new().expect("creating temp file"); @@ -1177,7 +1177,9 @@ pub(crate) mod test_util { }) .collect(); + files.sort_by(|a, b| a.path().cmp(b.path())); let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect(); + Ok((meta, files)) } From 16113ee7f51ff7da41cea9da73f74b8f3d85d2eb Mon Sep 17 00:00:00 2001 From: Thomas K Cameron Date: Sun, 10 Dec 2023 15:16:23 +0900 Subject: [PATCH 07/11] try2 --- .../core/src/datasource/file_format/parquet.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index d06f44f7c496..541f03fbf7c4 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1133,7 +1133,7 @@ pub(crate) mod test_util { use arrow::record_batch::RecordBatch; use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; - use tempfile::NamedTempFile; + use tempfile::{NamedTempFile, TempDir}; /// How many rows per page should be written const ROWS_PER_PAGE: usize = 2; @@ -1147,11 +1147,19 @@ pub(crate) mod test_util { batches: Vec, multi_page: bool, ) -> Result<(Vec, Vec)> { + let tmp_files = { + let mut tmp_files: Vec<_> = (0..batches.len()) + .map(|_| NamedTempFile::new().expect("creating temp file")) + .collect(); + tmp_files.sort_by(|a, b| a.path().cmp(b.path())); + tmp_files + }; + // Each batch writes to their own file - let mut files: Vec<_> = batches + let files: Vec<_> = batches .into_iter() - .map(|batch| { - let mut output = NamedTempFile::new().expect("creating temp file"); + .zip(tmp_files.into_iter()) + .map(|(batch, mut output)| { let builder = WriterProperties::builder(); let props = if multi_page { @@ -1177,7 +1185,6 @@ pub(crate) mod test_util { }) .collect(); - files.sort_by(|a, b| a.path().cmp(b.path())); let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect(); Ok((meta, files)) From e557343c021625c88670b5951ed2f82957823c6d Mon Sep 17 00:00:00 2001 From: Thomas K Cameron Date: Sun, 10 Dec 2023 15:18:03 +0900 Subject: [PATCH 08/11] update --- datafusion/core/src/datasource/file_format/parquet.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 541f03fbf7c4..c3554e409b1e 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1133,7 +1133,7 @@ pub(crate) mod test_util { use arrow::record_batch::RecordBatch; use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; - use tempfile::{NamedTempFile, TempDir}; + use tempfile::NamedTempFile; /// How many rows per page should be written const ROWS_PER_PAGE: usize = 2; @@ -1147,6 +1147,8 @@ pub(crate) mod test_util { batches: Vec, multi_page: bool, ) -> Result<(Vec, Vec)> { + // we need the tmp files to be sorted as some tests rely on the how the returning files are ordered + // https://github.com/apache/arrow-datafusion/pull/6629 let tmp_files = { let mut tmp_files: Vec<_> = (0..batches.len()) .map(|_| NamedTempFile::new().expect("creating temp file")) @@ -1154,13 +1156,12 @@ pub(crate) mod test_util { tmp_files.sort_by(|a, b| a.path().cmp(b.path())); tmp_files }; - + // Each batch writes to their own file let files: Vec<_> = batches .into_iter() .zip(tmp_files.into_iter()) .map(|(batch, mut output)| { - let builder = WriterProperties::builder(); let props = if multi_page { builder.set_data_page_row_count_limit(ROWS_PER_PAGE) From 1edcbf28b7327839b167a284974ec6d32a016ddf Mon Sep 17 00:00:00 2001 From: Thomas K Cameron Date: Sun, 10 Dec 2023 16:02:45 +0900 Subject: [PATCH 09/11] update --- .../src/datasource/file_format/parquet.rs | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index c3554e409b1e..e54a2e53fe15 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1287,6 +1287,42 @@ mod tests { Ok(()) } + #[tokio::test] + async fn is_schema_stable() -> Result<()> { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + + let batch1 = + RecordBatch::try_from_iter(vec![("a", c1.clone()), ("b", c1.clone())]) + .unwrap(); + let batch2 = + RecordBatch::try_from_iter(vec![("c", c2.clone()), ("d", c2.clone())]) + .unwrap(); + + let store = Arc::new(LocalFileSystem::new()) as _; + let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?; + + let session = SessionContext::new(); + let ctx = session.state(); + let format = ParquetFormat::default(); + let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap(); + + let order: Vec<_> = ["a", "b", "c", "d"] + .into_iter() + .map(|i| i.to_string()) + .collect(); + let coll: Vec<_> = schema + .all_fields() + .into_iter() + .map(|i| i.name().to_string()) + .collect(); + assert_eq!(coll, order); + + Ok(()) + } + #[derive(Debug)] struct RequestCountingObjectStore { inner: Arc, From 1907772319d388c973486ea4d465ef8e2d59b6f5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 11 Dec 2023 07:47:30 -0500 Subject: [PATCH 10/11] Add comments --- datafusion/core/src/datasource/file_format/parquet.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index e54a2e53fe15..595324dc930b 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -199,6 +199,12 @@ impl FileFormat for ParquetFormat { .try_collect() .await?; + // Schema inference adds fields based the order they are seen + // which depends on the order the files are processed. For some + // object stores (like local file systems) the order returned from list + // is not deterministic. Thus, to ensure deterministic schema inference + // sort the files first. + // https://github.com/apache/arrow-datafusion/pull/6629 schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2)); let schemas = schemas From 495f160212b7d296e6660734290d1b1a368e92b4 Mon Sep 17 00:00:00 2001 From: Thomas K Cameron Date: Mon, 11 Dec 2023 22:40:35 +0900 Subject: [PATCH 11/11] cargo fmt --- datafusion/core/src/datasource/file_format/parquet.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 595324dc930b..9db320fb9da4 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -200,10 +200,10 @@ impl FileFormat for ParquetFormat { .await?; // Schema inference adds fields based the order they are seen - // which depends on the order the files are processed. For some + // which depends on the order the files are processed. For some // object stores (like local file systems) the order returned from list - // is not deterministic. Thus, to ensure deterministic schema inference - // sort the files first. + // is not deterministic. Thus, to ensure deterministic schema inference + // sort the files first. // https://github.com/apache/arrow-datafusion/pull/6629 schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));