From 48a6133be1142d55a73a3f646b7c42f3303ff1fb Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 7 Dec 2023 14:31:23 +0000 Subject: [PATCH] Simplify parquet statistics generation --- parquet/src/arrow/arrow_writer/byte_array.rs | 40 +++++-------- parquet/src/column/writer/encoder.rs | 29 +++------- parquet/src/column/writer/mod.rs | 60 +++++++------------- 3 files changed, 44 insertions(+), 85 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 28c7c3b00540..61933b24178e 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -22,7 +22,7 @@ use crate::data_type::{AsBytes, ByteArray, Int32Type}; use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder}; use crate::encodings::rle::RleEncoder; use crate::errors::{ParquetError, Result}; -use crate::file::properties::{WriterProperties, WriterVersion}; +use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::num_required_bits; use crate::util::interner::{Interner, Storage}; @@ -379,6 +379,7 @@ impl DictEncoder { pub struct ByteArrayEncoder { fallback: FallbackEncoder, dict_encoder: Option, + statistics_enabled: EnabledStatistics, min_value: Option, max_value: Option, bloom_filter: Option, @@ -387,24 +388,6 @@ pub struct ByteArrayEncoder { impl ColumnValueEncoder for ByteArrayEncoder { type T = ByteArray; type Values = dyn Array; - - fn min_max( - &self, - values: &dyn Array, - value_indices: Option<&[usize]>, - ) -> Option<(Self::T, Self::T)> { - match value_indices { - Some(indices) => { - let iter = indices.iter().cloned(); - downcast_op!(values.data_type(), values, compute_min_max, iter) - } - None => { - let len = Array::len(values); - downcast_op!(values.data_type(), values, compute_min_max, 0..len) - } - } - } - fn flush_bloom_filter(&mut self) -> Option { self.bloom_filter.take() } @@ -424,12 +407,15 @@ impl ColumnValueEncoder for ByteArrayEncoder { .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp)) .transpose()?; + let statistics_enabled = props.statistics_enabled(descr.path()); + Ok(Self { fallback, + statistics_enabled, + bloom_filter, dict_encoder: dictionary, min_value: None, max_value: None, - bloom_filter, }) } @@ -498,13 +484,15 @@ where T: ArrayAccessor + Copy, T::Item: Copy + Ord + AsRef<[u8]>, { - if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) { - if encoder.min_value.as_ref().map_or(true, |m| m > &min) { - encoder.min_value = Some(min); - } + if encoder.statistics_enabled != EnabledStatistics::None { + if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) { + if encoder.min_value.as_ref().map_or(true, |m| m > &min) { + encoder.min_value = Some(min); + } - if encoder.max_value.as_ref().map_or(true, |m| m < &max) { - encoder.max_value = Some(max); + if encoder.max_value.as_ref().map_or(true, |m| m < &max) { + encoder.max_value = Some(max); + } } } diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 0d5144f61c26..8624f859f4b0 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -76,15 +76,6 @@ pub trait ColumnValueEncoder { /// The values encoded by this encoder type Values: ColumnValues + ?Sized; - /// Returns the min and max values in this collection, skipping any NaN values - /// - /// Returns `None` if no values found - fn min_max( - &self, - values: &Self::Values, - value_indices: Option<&[usize]>, - ) -> Option<(Self::T, Self::T)>; - /// Create a new [`ColumnValueEncoder`] fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result where @@ -136,8 +127,15 @@ pub struct ColumnValueEncoderImpl { } impl ColumnValueEncoderImpl { + fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> { + match value_indices { + Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])), + None => get_min_max(&self.descr, values.iter()), + } + } + fn write_slice(&mut self, slice: &[T::T]) -> Result<()> { - if self.statistics_enabled == EnabledStatistics::Page + if self.statistics_enabled != EnabledStatistics::None // INTERVAL has undefined sort order, so don't write min/max stats for it && self.descr.converted_type() != ConvertedType::INTERVAL { @@ -166,17 +164,6 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { type Values = [T::T]; - fn min_max( - &self, - values: &Self::Values, - value_indices: Option<&[usize]>, - ) -> Option<(Self::T, Self::T)> { - match value_indices { - Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])), - None => get_min_max(&self.descr, values.iter()), - } - } - fn flush_bloom_filter(&mut self) -> Option { self.bloom_filter.take() } diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 531af4bd461e..9f476595fb7e 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -329,28 +329,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { None => values.len(), }; - // If only computing chunk-level statistics compute them here, page-level statistics - // are computed in [`Self::write_mini_batch`] and used to update chunk statistics in - // [`Self::add_data_page`] - if self.statistics_enabled == EnabledStatistics::Chunk - // INTERVAL has undefined sort order, so don't write min/max stats for it - && self.descr.converted_type() != ConvertedType::INTERVAL - { - match (min, max) { - (Some(min), Some(max)) => { - update_min(&self.descr, min, &mut self.column_metrics.min_column_value); - update_max(&self.descr, max, &mut self.column_metrics.max_column_value); - } - (None, Some(_)) | (Some(_), None) => { - panic!("min/max should be both set or both None") - } - (None, None) => { - if let Some((min, max)) = self.encoder.min_max(values, value_indices) { - update_min(&self.descr, &min, &mut self.column_metrics.min_column_value); - update_max(&self.descr, &max, &mut self.column_metrics.max_column_value); - } - } - }; + if let Some(min) = min { + update_min(&self.descr, min, &mut self.column_metrics.min_column_value); + } + if let Some(max) = max { + update_max(&self.descr, max, &mut self.column_metrics.max_column_value); } // We can only set the distinct count if there are no other writes @@ -764,22 +747,23 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls; - let page_statistics = if let (Some(min), Some(max)) = - (values_data.min_value, values_data.max_value) - { - // Update chunk level statistics - update_min(&self.descr, &min, &mut self.column_metrics.min_column_value); - update_max(&self.descr, &max, &mut self.column_metrics.max_column_value); - - (self.statistics_enabled == EnabledStatistics::Page).then_some(ValueStatistics::new( - Some(min), - Some(max), - None, - self.page_metrics.num_page_nulls, - false, - )) - } else { - None + let page_statistics = match (values_data.min_value, values_data.max_value) { + (Some(min), Some(max)) => { + // Update chunk level statistics + update_min(&self.descr, &min, &mut self.column_metrics.min_column_value); + update_max(&self.descr, &max, &mut self.column_metrics.max_column_value); + + (self.statistics_enabled == EnabledStatistics::Page).then_some( + ValueStatistics::new( + Some(min), + Some(max), + None, + self.page_metrics.num_page_nulls, + false, + ), + ) + } + _ => None, }; // update column and offset index