From a2c9d1a8ba4445dec2a37df5a3fdd018158b91a6 Mon Sep 17 00:00:00 2001 From: Marvin Lanhenke <62298609+marvinlanhenke@users.noreply.github.com> Date: Tue, 18 Jun 2024 16:36:00 +0200 Subject: [PATCH] Minor: Return option from row_group_row_count (#10973) * refactor: return Option from row_group_row_count * fix: doctest --- datafusion-examples/examples/parquet_index.rs | 6 ++++- datafusion/core/benches/parquet_statistic.rs | 3 +-- .../physical_plan/parquet/row_groups.rs | 6 +++-- .../physical_plan/parquet/statistics.rs | 20 ++++++++++++--- .../core/tests/parquet/arrow_statistics.rs | 25 +++---------------- 5 files changed, 30 insertions(+), 30 deletions(-) diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs index e3387117c91f..668eda047444 100644 --- a/datafusion-examples/examples/parquet_index.rs +++ b/datafusion-examples/examples/parquet_index.rs @@ -526,7 +526,11 @@ impl ParquetMetadataIndexBuilder { reader.schema(), reader.parquet_schema(), )?; - let row_counts = StatisticsConverter::row_group_row_counts(row_groups.iter())?; + let row_counts = converter + .row_group_row_counts(row_groups.iter())? + .ok_or_else(|| { + internal_datafusion_err!("Row group row counts are missing") + })?; let value_column_mins = converter.row_group_mins(row_groups.iter())?; let value_column_maxes = converter.row_group_maxes(row_groups.iter())?; diff --git a/datafusion/core/benches/parquet_statistic.rs b/datafusion/core/benches/parquet_statistic.rs index 5fd6b0066eb2..b58ecc13aee0 100644 --- a/datafusion/core/benches/parquet_statistic.rs +++ b/datafusion/core/benches/parquet_statistic.rs @@ -175,8 +175,7 @@ fn criterion_benchmark(c: &mut Criterion) { let _ = converter.row_group_mins(row_groups.iter()).unwrap(); let _ = converter.row_group_maxes(row_groups.iter()).unwrap(); let _ = converter.row_group_null_counts(row_groups.iter()).unwrap(); - let _ = StatisticsConverter::row_group_row_counts(row_groups.iter()) - .unwrap(); + let _ = converter.row_group_row_counts(row_groups.iter()).unwrap(); }) }, ); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index f8e4889f0b7f..e590f372253c 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -384,10 +384,12 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { .map(|counts| Arc::new(counts) as ArrayRef) } - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self, column: &Column) -> Option { // row counts are the same for all columns in a row group - StatisticsConverter::row_group_row_counts(self.metadata_iter()) + self.statistics_converter(column) + .and_then(|c| c.row_group_row_counts(self.metadata_iter())) .ok() + .flatten() .map(|counts| Arc::new(counts) as ArrayRef) } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 14d7bc2af42d..6ad78a82b9bf 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -718,21 +718,33 @@ impl<'a> StatisticsConverter<'a> { /// /// # Example /// ```no_run + /// # use arrow::datatypes::Schema; + /// # use arrow_array::ArrayRef; /// # use parquet::file::metadata::ParquetMetaData; /// # use datafusion::datasource::physical_plan::parquet::StatisticsConverter; /// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() } - /// // Given the metadata for a parquet file + /// # fn get_arrow_schema() -> Schema { unimplemented!() } + /// // Given the metadata for a parquet file and the arrow schema /// let metadata: ParquetMetaData = get_parquet_metadata(); + /// let arrow_schema: Schema = get_arrow_schema(); + /// let parquet_schema = metadata.file_metadata().schema_descr(); + /// // create a converter + /// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema) + /// .unwrap(); /// // get the row counts for each row group - /// let row_counts = StatisticsConverter::row_group_row_counts(metadata + /// let row_counts = converter.row_group_row_counts(metadata /// .row_groups() /// .iter() /// ); /// ``` - pub fn row_group_row_counts(metadatas: I) -> Result + pub fn row_group_row_counts(&self, metadatas: I) -> Result> where I: IntoIterator, { + let Some(_) = self.parquet_index else { + return Ok(None); + }; + let mut builder = UInt64Array::builder(10); for metadata in metadatas.into_iter() { let row_count = metadata.num_rows(); @@ -743,7 +755,7 @@ impl<'a> StatisticsConverter<'a> { })?; builder.append_value(row_count); } - Ok(builder.finish()) + Ok(Some(builder.finish())) } /// Create a new `StatisticsConverter` to extract statistics for a column diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index cd0efc8d3525..4c68a57333e5 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -325,11 +325,9 @@ impl<'a> Test<'a> { Actual: {null_counts:?}. Expected: {expected_null_counts:?}" ); - let row_counts = StatisticsConverter::row_group_row_counts( - reader.metadata().row_groups().iter(), - ) - .unwrap(); - let row_counts = Some(row_counts); + let row_counts = converter + .row_group_row_counts(reader.metadata().row_groups().iter()) + .unwrap(); assert_eq!( row_counts, expected_row_counts, "{column_name}: Mismatch with expected row counts. \ @@ -2001,21 +1999,6 @@ async fn test_column_non_existent() { .build() .await; - Test { - reader: &reader, - // mins are [-5, -4, 0, 5] - expected_min: Arc::new(Int64Array::from(vec![None, None, None, None])), - // maxes are [-1, 0, 4, 9] - expected_max: Arc::new(Int64Array::from(vec![None, None, None, None])), - // nulls are [0, 0, 0, 0] - expected_null_counts: UInt64Array::from(vec![None, None, None, None]), - // row counts are [5, 5, 5, 5] - expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), - column_name: "i_do_not_exist", - check: Check::RowGroup, - } - .run_with_schema(&schema); - Test { reader: &reader, // mins are [-5, -4, 0, 5] @@ -2027,7 +2010,7 @@ async fn test_column_non_existent() { // row counts are [5, 5, 5, 5] expected_row_counts: None, column_name: "i_do_not_exist", - check: Check::DataPage, + check: Check::Both, } .run_with_schema(&schema); }