Simplify parquet statistics generation #5183

Merged · 1 commit · Dec 8, 2023
40 changes: 14 additions & 26 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -22,7 +22,7 @@ use crate::data_type::{AsBytes, ByteArray, Int32Type};
 use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
 use crate::encodings::rle::RleEncoder;
 use crate::errors::{ParquetError, Result};
-use crate::file::properties::{WriterProperties, WriterVersion};
+use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::bit_util::num_required_bits;
 use crate::util::interner::{Interner, Storage};
@@ -379,6 +379,7 @@ impl DictEncoder
 pub struct ByteArrayEncoder {
     fallback: FallbackEncoder,
     dict_encoder: Option<DictEncoder>,
+    statistics_enabled: EnabledStatistics,
     min_value: Option<ByteArray>,
     max_value: Option<ByteArray>,
     bloom_filter: Option<Sbbf>,
@@ -387,24 +388,6 @@ pub struct ByteArrayEncoder {
 impl ColumnValueEncoder for ByteArrayEncoder {
     type T = ByteArray;
     type Values = dyn Array;
-
-    fn min_max(
-        &self,
-        values: &dyn Array,
-        value_indices: Option<&[usize]>,
-    ) -> Option<(Self::T, Self::T)> {
-        match value_indices {
-            Some(indices) => {
-                let iter = indices.iter().cloned();
-                downcast_op!(values.data_type(), values, compute_min_max, iter)
-            }
-            None => {
-                let len = Array::len(values);
-                downcast_op!(values.data_type(), values, compute_min_max, 0..len)
-            }
-        }
-    }
 
     fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
         self.bloom_filter.take()
     }
@@ -424,12 +407,15 @@ impl ColumnValueEncoder for ByteArrayEncoder {
             .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
             .transpose()?;
 
+        let statistics_enabled = props.statistics_enabled(descr.path());
+
         Ok(Self {
             fallback,
+            statistics_enabled,
+            bloom_filter,
             dict_encoder: dictionary,
             min_value: None,
             max_value: None,
-            bloom_filter,
         })
     }
 
@@ -498,13 +484,15 @@
     T: ArrayAccessor + Copy,
     T::Item: Copy + Ord + AsRef<[u8]>,
 {
-    if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
-        if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
-            encoder.min_value = Some(min);
-        }
+    if encoder.statistics_enabled != EnabledStatistics::None {
+        if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
+            if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
+                encoder.min_value = Some(min);
+            }
 
-        if encoder.max_value.as_ref().map_or(true, |m| m < &max) {
-            encoder.max_value = Some(max);
+            if encoder.max_value.as_ref().map_or(true, |m| m < &max) {
+                encoder.max_value = Some(max);
+            }
         }
     }
 
29 changes: 8 additions & 21 deletions parquet/src/column/writer/encoder.rs
@@ -76,15 +76,6 @@ pub trait ColumnValueEncoder {
     /// The values encoded by this encoder
     type Values: ColumnValues + ?Sized;
 
-    /// Returns the min and max values in this collection, skipping any NaN values
-    ///
-    /// Returns `None` if no values found
-    fn min_max(
-        &self,
-        values: &Self::Values,
-        value_indices: Option<&[usize]>,
-    ) -> Option<(Self::T, Self::T)>;
-
     /// Create a new [`ColumnValueEncoder`]
     fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
     where
@@ -136,8 +127,15 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
 }
 
 impl<T: DataType> ColumnValueEncoderImpl<T> {
+    fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> {
+        match value_indices {
+            Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])),
+            None => get_min_max(&self.descr, values.iter()),
+        }
+    }
+
     fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
-        if self.statistics_enabled == EnabledStatistics::Page
+        if self.statistics_enabled != EnabledStatistics::None
             // INTERVAL has undefined sort order, so don't write min/max stats for it
             && self.descr.converted_type() != ConvertedType::INTERVAL
         {
@@ -166,17 +164,6 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
 
     type Values = [T::T];
 
-    fn min_max(
-        &self,
-        values: &Self::Values,
-        value_indices: Option<&[usize]>,
-    ) -> Option<(Self::T, Self::T)> {
-        match value_indices {
-            Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])),
-            None => get_min_max(&self.descr, values.iter()),
-        }
-    }
-
     fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
         self.bloom_filter.take()
     }
60 changes: 22 additions & 38 deletions parquet/src/column/writer/mod.rs
@@ -329,28 +329,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             None => values.len(),
         };
 
-        // If only computing chunk-level statistics compute them here, page-level statistics
-        // are computed in [`Self::write_mini_batch`] and used to update chunk statistics in
-        // [`Self::add_data_page`]
-        if self.statistics_enabled == EnabledStatistics::Chunk

Contributor Author: This is the major change: rather than computing the chunk statistics here, we always compute statistics per page unless they are completely disabled, and simply skip writing them to the page, as per the change in #5181.
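
To make that concrete, here is a minimal, self-contained sketch of the described flow (hypothetical names, not the actual arrow-rs API): min/max are computed for every page unless statistics are disabled entirely, always folded into the chunk-level accumulator, and only attached to the page itself when page-level statistics are requested.

```rust
// Sketch only: models the gating, not the real writer.
#[derive(Clone, Copy, PartialEq)]
enum EnabledStatistics {
    None,  // compute nothing at all
    Chunk, // compute per page, write only chunk-level stats
    Page,  // compute per page, write page- and chunk-level stats
}

#[derive(Default)]
struct ChunkStats {
    min: Option<i64>,
    max: Option<i64>,
}

impl ChunkStats {
    fn update(&mut self, page_min: i64, page_max: i64) {
        self.min = Some(self.min.map_or(page_min, |m| m.min(page_min)));
        self.max = Some(self.max.map_or(page_max, |m| m.max(page_max)));
    }
}

/// Returns the statistics to embed in the page itself, if any.
fn flush_page(
    enabled: EnabledStatistics,
    page_values: &[i64],
    chunk: &mut ChunkStats,
) -> Option<(i64, i64)> {
    if enabled == EnabledStatistics::None {
        return None; // statistics fully disabled: nothing is computed
    }
    let min = *page_values.iter().min()?;
    let max = *page_values.iter().max()?;
    chunk.update(min, max); // chunk stats always track the page bounds
    // Only page-level mode also writes the bounds into the page.
    (enabled == EnabledStatistics::Page).then_some((min, max))
}

fn main() {
    let mut chunk = ChunkStats::default();
    assert_eq!(flush_page(EnabledStatistics::Chunk, &[3, 1, 2], &mut chunk), None);
    assert_eq!(flush_page(EnabledStatistics::Page, &[9, 4], &mut chunk), Some((4, 9)));
    assert_eq!((chunk.min, chunk.max), (Some(1), Some(9)));
}
```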

Contributor: Definitely simpler than #5181 IMO. Having fewer places that call update_min/update_max(...) on the column's stats makes it much easier to follow.

Contributor: Are there any performance implications of always computing the statistics, even when they aren't written to the page?

Though it seems that if we are writing column-chunk-level statistics anyway, we have to calculate the min/max across all values regardless, so tracking it per page shouldn't be any more expensive.

Contributor Author: Potentially: for byte arrays you will have additional allocations per page, where previously you would have them per chunk. In practice this is extremely unlikely to matter.
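
A hypothetical illustration of that allocation (sketch, not arrow-rs code): the running minimum of a byte-array column must own its bytes, so each page whose candidate tightens the bound costs one clone, where previously that clone could happen at most once per chunk.

```rust
// Sketch only: the owned running minimum forces a copy when it improves.
fn update_min(current: &mut Option<Vec<u8>>, candidate: &[u8]) {
    if current.as_deref().map_or(true, |m| candidate < m) {
        *current = Some(candidate.to_vec()); // the per-page allocation
    }
}

fn main() {
    let mut min: Option<Vec<u8>> = None;
    // Each slice stands in for the minimum candidate of one page.
    for page_min in [&b"beta"[..], b"alpha", b"gamma"] {
        update_min(&mut min, page_min);
    }
    assert_eq!(min.as_deref(), Some(&b"alpha"[..]));
}
```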

-            // INTERVAL has undefined sort order, so don't write min/max stats for it
-            && self.descr.converted_type() != ConvertedType::INTERVAL
-        {
-            match (min, max) {
-                (Some(min), Some(max)) => {
-                    update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
-                    update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
-                }
-                (None, Some(_)) | (Some(_), None) => {
-                    panic!("min/max should be both set or both None")
-                }
-                (None, None) => {
-                    if let Some((min, max)) = self.encoder.min_max(values, value_indices) {
-                        update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
-                        update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
-                    }
-                }
-            };
+        if let Some(min) = min {

Contributor: This min is only passed in when precomputed statistics are used (write_batch_with_statistics). Is that correct? I couldn't find another call site where min and max were anything other than None.
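
As a toy model of the call paths in question (simplified types, not the real parquet signatures; only `write_batch_with_statistics` is taken from the comment above): the plain write path always passes `None` for the precomputed bounds, so this branch only fires for callers supplying their own statistics.

```rust
// Sketch only: two entry points funnelling into one internal method.
struct Writer {
    chunk_min: Option<i64>,
}

impl Writer {
    // `min` is Some only when the caller supplied precomputed statistics.
    fn write_batch_internal(&mut self, _values: &[i64], min: Option<i64>) {
        if let Some(min) = min {
            // Trust the precomputed bound, as in the added lines below.
            self.chunk_min = Some(self.chunk_min.map_or(min, |m| m.min(min)));
        }
        // Otherwise the bounds are accumulated later from per-page statistics.
    }

    fn write_batch(&mut self, values: &[i64]) {
        self.write_batch_internal(values, None) // never passes a bound
    }

    fn write_batch_with_statistics(&mut self, values: &[i64], min: Option<i64>) {
        self.write_batch_internal(values, min)
    }
}

fn main() {
    let mut w = Writer { chunk_min: None };
    w.write_batch(&[5, 2, 9]);
    assert_eq!(w.chunk_min, None); // no precomputed stats on this path
    w.write_batch_with_statistics(&[7, 3], Some(3));
    assert_eq!(w.chunk_min, Some(3));
}
```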

+            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
+        }
+        if let Some(max) = max {
+            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
+        }
 
         // We can only set the distinct count if there are no other writes

@@ -764,22 +747,23 @@
 
         self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
 
-        let page_statistics = if let (Some(min), Some(max)) =
-            (values_data.min_value, values_data.max_value)
-        {
-            // Update chunk level statistics
-            update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
-            update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
-
-            (self.statistics_enabled == EnabledStatistics::Page).then_some(ValueStatistics::new(
-                Some(min),
-                Some(max),
-                None,
-                self.page_metrics.num_page_nulls,
-                false,
-            ))
-        } else {
-            None
+        let page_statistics = match (values_data.min_value, values_data.max_value) {
+            (Some(min), Some(max)) => {
+                // Update chunk level statistics
+                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
+                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
+
+                (self.statistics_enabled == EnabledStatistics::Page).then_some(
+                    ValueStatistics::new(
+                        Some(min),
+                        Some(max),
+                        None,
+                        self.page_metrics.num_page_nulls,
+                        false,
+                    ),
+                )
+            }
+            _ => None,
         };
 
         // update column and offset index