Skip to content

Commit

Permalink
Proposal: typed statistics iterators
Browse files Browse the repository at this point in the history
alamb committed May 29, 2024
1 parent 2796e01 commit a7cf60c
Showing 1 changed file with 75 additions and 5 deletions.
80 changes: 75 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
@@ -20,13 +20,13 @@
// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328

use arrow::{array::ArrayRef, datatypes::DataType};
use arrow_array::{new_empty_array, new_null_array, UInt64Array};
use arrow_array::{new_empty_array, new_null_array, BooleanArray, UInt64Array};
use arrow_schema::{Field, FieldRef, Schema};
use datafusion_common::{
internal_datafusion_err, internal_err, plan_err, Result, ScalarValue,
};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::file::statistics::{Statistics as ParquetStatistics, ValueStatistics};
use parquet::schema::types::SchemaDescriptor;
use std::sync::Arc;

@@ -52,6 +52,65 @@ fn sign_extend_be(b: &[u8]) -> [u8; 16] {
result
}

/// Generates a typed adapter over an iterator of `ParquetStatistics`.
///
/// For every input element the generated iterator checks that statistics are
/// present, that `has_min_max_set()` holds, and that they are of the expected
/// variant, before exposing the typed inner statistics.
///
/// Parameters:
/// * `$iterator_type`: name of the generated iterator (e.g. `BoolStatsIterator`)
/// * `$parquet_statistics_type`: accepted variant (e.g. `ParquetStatistics::Boolean`)
/// * `$stat_value_type`: value type carried by that variant (e.g. `bool`)
macro_rules! make_stats_iterator {
    ($iterator_type:ident, $parquet_statistics_type:path, $stat_value_type:ty) => {
        /// Adapts an iterator of `Option<&ParquetStatistics>` into an iterator of
        /// `Option<&ValueStatistics<$stat_value_type>>`
        ///
        /// One element is yielded per input element:
        /// * `Some(stats)` when the input is the expected variant with min/max set
        /// * `None` when the input is absent, has no min/max, or is another variant
        struct $iterator_type<'a, Src>
        where
            Src: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            inner: Src,
        }

        impl<'a, Src: Iterator<Item = Option<&'a ParquetStatistics>>>
            $iterator_type<'a, Src>
        {
            /// wrap `source` so that it yields $stat_value_type statistics
            fn new(source: Src) -> Self {
                Self { inner: source }
            }
        }

        impl<'a, Src: Iterator<Item = Option<&'a ParquetStatistics>>> Iterator
            for $iterator_type<'a, Src>
        {
            type Item = Option<&'a ValueStatistics<$stat_value_type>>;

            /// advance the wrapped iterator and convert its next element
            fn next(&mut self) -> Option<Self::Item> {
                self.inner.next().map(|maybe_stats| {
                    // absent statistics stay absent
                    let stats = maybe_stats?;
                    match stats {
                        $parquet_statistics_type(typed) if stats.has_min_max_set() => {
                            Some(typed)
                        }
                        _ => None,
                    }
                })
            }

            /// exactly one output per input, so the wrapped hint is reused as-is
            fn size_hint(&self) -> (usize, Option<usize>) {
                self.inner.size_hint()
            }
        }
    };
}

// Generates `BoolStatsIterator`: yields `Some(&ValueStatistics<bool>)` for
// `ParquetStatistics::Boolean` entries with min/max set, and `None` otherwise.
make_stats_iterator!(BoolStatsIterator, ParquetStatistics::Boolean, bool);

/// Extract a single min/max statistics from a [`ParquetStatistics`] object
///
/// * `$column_statistics` is the `ParquetStatistics` object
@@ -211,9 +270,20 @@ pub(crate) fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics
data_type: &DataType,
iterator: I,
) -> Result<ArrayRef> {
let scalars = iterator
.map(|x| x.and_then(|s| get_statistic!(s, min, min_bytes, Some(data_type))));
collect_scalars(data_type, scalars)
match data_type {
DataType::Boolean => {
// build a boolean array from the min statistics directly
let mins = BoolStatsIterator::new(iterator).map(|v| v.map(|v| *v.min()));
Ok(Arc::new(BooleanArray::from_iter(mins)))
}
_ => {
// fallback to scalars
let scalars = iterator.map(|x| {
x.and_then(|s| get_statistic!(s, min, min_bytes, Some(data_type)))
});
collect_scalars(data_type, scalars)
}
}
}

/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]

0 comments on commit a7cf60c

Please sign in to comment.