
Commit 67f6be2

Merge remote-tracking branch 'apache/main' into alamb/no-cse-copy

alamb committed Jun 17, 2024
2 parents 3f4cc9e + d4228fe
Showing 56 changed files with 1,504 additions and 866 deletions.
6 changes: 3 additions & 3 deletions benchmarks/bench.sh
@@ -412,23 +412,23 @@ run_clickbench_1() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) benchmark..."
-$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
+$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench benchmark with the partitioned parquet files
run_clickbench_partitioned() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark..."
-$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
+$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench "extended" benchmark with a single large parquet file
run_clickbench_extended() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) extended benchmark..."
-$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
+$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
}

compare_benchmarks() {
2 changes: 1 addition & 1 deletion datafusion-cli/src/main.rs
@@ -133,7 +133,7 @@ struct Args {

#[clap(
long,
-help = "The max number of rows to display for 'Table' format\n[default: 40] [possible values: numbers(0/10/...), inf(no limit)]",
+help = "The max number of rows to display for 'Table' format\n[possible values: numbers(0/10/...), inf(no limit)]",
default_value = "40"
)]
maxrows: MaxRows,
315 changes: 312 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -33,7 +33,8 @@ use arrow_array::{
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
use half::f16;
-use parquet::file::metadata::RowGroupMetaData;
+use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
+use parquet::file::page_index::index::Index;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::schema::types::SchemaDescriptor;
use paste::paste;
@@ -517,6 +518,74 @@ macro_rules! get_statistics {
}}}
}

macro_rules! make_data_page_stats_iterator {
($iterator_type: ident, $func: ident, $index_type: path, $stat_value_type: ty) => {
struct $iterator_type<'a, I>
where
I: Iterator<Item = (usize, &'a Index)>,
{
iter: I,
}

impl<'a, I> $iterator_type<'a, I>
where
I: Iterator<Item = (usize, &'a Index)>,
{
fn new(iter: I) -> Self {
Self { iter }
}
}

impl<'a, I> Iterator for $iterator_type<'a, I>
where
I: Iterator<Item = (usize, &'a Index)>,
{
type Item = Vec<Option<$stat_value_type>>;

fn next(&mut self) -> Option<Self::Item> {
let next = self.iter.next();
match next {
Some((len, index)) => match index {
$index_type(native_index) => Some(
native_index
.indexes
.iter()
.map(|x| x.$func)
.collect::<Vec<_>>(),
),
// No matching `Index` found;
// thus no statistics can be extracted.
// We return vec![None; len] to effectively
// create an arrow null-array with the length
// corresponding to the number of entries in
// `ParquetOffsetIndex` per row group per column.
_ => Some(vec![None; len]),
},
_ => None,
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.iter.size_hint()
}
}
};
}

make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, Index::INT64, i64);
make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max, Index::INT64, i64);
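
// A hedged sketch (not part of this commit): supporting another primitive
// type would mean instantiating the macro with the matching `Index` variant
// and native type, e.g. a hypothetical Int32 pair:
//
//   make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min, Index::INT32, i32);
//   make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max, Index::INT32, i32);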

macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
paste! {
match $data_type {
Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
_ => unimplemented!()
}
}
}
}
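
// Only `DataType::Int64` is dispatched above; every other type currently
// hits `unimplemented!()`. A hypothetical `Int32` arm (assuming the Int32
// iterators sketched earlier and an `Int32Array` import) would mirror the
// `Int64` one:
//
//   Some(DataType::Int32) => Ok(Arc::new(Int32Array::from_iter(
//       [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),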

/// Looks up the parquet column by name
///
/// Returns the parquet column index and the corresponding arrow field
@@ -563,6 +632,51 @@ fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
get_statistics!(Max, data_type, iterator)
}

/// Extracts the min statistics from an iterator
/// of parquet page [`Index`]es to an [`ArrayRef`]
pub(crate) fn min_page_statistics<'a, I>(
data_type: Option<&DataType>,
iterator: I,
) -> Result<ArrayRef>
where
I: Iterator<Item = (usize, &'a Index)>,
{
get_data_page_statistics!(Min, data_type, iterator)
}

/// Extracts the max statistics from an iterator
/// of parquet page [`Index`]es to an [`ArrayRef`]
pub(crate) fn max_page_statistics<'a, I>(
data_type: Option<&DataType>,
iterator: I,
) -> Result<ArrayRef>
where
I: Iterator<Item = (usize, &'a Index)>,
{
get_data_page_statistics!(Max, data_type, iterator)
}

/// Extracts the null count statistics from an iterator
/// of parquet page [`Index`]es to an [`ArrayRef`]
///
/// The returned Array is an [`UInt64Array`]
pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<ArrayRef>
where
I: Iterator<Item = (usize, &'a Index)>,
{
let iter = iterator.flat_map(|(len, index)| match index {
Index::NONE => vec![None; len],
Index::INT64(native_index) => native_index
.indexes
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
_ => unimplemented!(),
});

Ok(Arc::new(UInt64Array::from_iter(iter)))
}
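
// Illustrative behavior (not from this commit): a row group whose column
// has `Index::NONE` and 3 data pages contributes `[None, None, None]`,
// while an `Index::INT64` with per-page null counts `[Some(0), Some(2)]`
// contributes `[Some(0), Some(2)]`; the combined `UInt64Array` is then
// `[null, null, null, 0, 2]`.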

/// Extracts Parquet statistics as Arrow arrays
///
/// This is used to convert Parquet statistics to Arrow arrays, with proper type
@@ -771,10 +885,205 @@ impl<'a> StatisticsConverter<'a> {
Ok(Arc::new(UInt64Array::from_iter(null_counts)))
}

/// Extract the minimum values from Data Page statistics.
///
/// In Parquet files, in addition to the Column Chunk level statistics
/// (stored for each column for each row group) there are also
/// optional statistics stored for each data page, as part of
/// the [`ParquetColumnIndex`].
///
/// Since a single Column Chunk is stored as one or more pages,
/// page level statistics can prune at a finer granularity.
///
/// However, since they are stored in a separate metadata
/// structure ([`Index`]), the code to extract them differs
/// from the code for row group level statistics.
///
/// # Parameters:
///
/// * `column_page_index`: The parquet column page indices, read from
/// `ParquetMetaData` column_index
///
/// * `column_offset_index`: The parquet column offset indices, read from
/// `ParquetMetaData` offset_index
///
/// * `row_group_indices`: The indices of the row groups that are used to
///   extract the column page index and offset index on a per row group,
///   per column basis.
///
/// # Return Value
///
/// The returned array contains one value for each `PageIndex` entry
/// in the underlying `Index`es, in the same order as the row groups
/// selected by `row_group_indices`.
///
/// For example, if two `Index`es are selected:
/// 1. the first having `3` `PageIndex` entries
/// 2. the second having `2` `PageIndex` entries
///
/// The returned array would have 5 rows.
///
/// Each value is either:
/// * the minimum value for the page
/// * a null value, if the statistics can not be extracted
///
/// Note that a null value does NOT mean the min value was actually
/// `null`; it means the requested statistic is unknown
///
/// # Errors
///
/// Reasons for not being able to extract the statistics include:
/// * the column is not present in the parquet file
/// * statistics for the pages are not present in the row group
/// * the stored statistic value can not be converted to the requested type
pub fn data_page_mins<I>(
&self,
column_page_index: &ParquetColumnIndex,
column_offset_index: &ParquetOffsetIndex,
row_group_indices: I,
) -> Result<ArrayRef>
where
I: IntoIterator<Item = &'a usize>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
return Ok(self.make_null_array(data_type, row_group_indices));
};

let iter = row_group_indices.into_iter().map(|rg_index| {
let column_page_index_per_row_group_per_column =
&column_page_index[*rg_index][parquet_index];
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();

(*num_data_pages, column_page_index_per_row_group_per_column)
});

min_page_statistics(Some(data_type), iter)
}
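
// A hedged usage sketch (not part of this commit): assuming a
// `ParquetMetaData` whose page and offset indexes were loaded when the
// file was opened, and a `StatisticsConverter` already constructed for
// the target column, the per-page minimums for row groups 0 and 1 might
// be extracted along these lines:
//
//   let column_index = metadata.column_index().expect("page index not loaded");
//   let offset_index = metadata.offset_index().expect("offset index not loaded");
//   let row_groups: Vec<usize> = vec![0, 1];
//   let mins = converter.data_page_mins(column_index, offset_index, &row_groups)?;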

/// Extract the maximum values from Data Page statistics.
///
/// See docs on [`Self::data_page_mins`] for details.
pub fn data_page_maxes<I>(
&self,
column_page_index: &ParquetColumnIndex,
column_offset_index: &ParquetOffsetIndex,
row_group_indices: I,
) -> Result<ArrayRef>
where
I: IntoIterator<Item = &'a usize>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
return Ok(self.make_null_array(data_type, row_group_indices));
};

let iter = row_group_indices.into_iter().map(|rg_index| {
let column_page_index_per_row_group_per_column =
&column_page_index[*rg_index][parquet_index];
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();

(*num_data_pages, column_page_index_per_row_group_per_column)
});

max_page_statistics(Some(data_type), iter)
}

/// Extract the null counts from Data Page statistics.
///
/// The returned Array is an [`UInt64Array`]
///
/// See docs on [`Self::data_page_mins`] for details.
pub fn data_page_null_counts<I>(
&self,
column_page_index: &ParquetColumnIndex,
column_offset_index: &ParquetOffsetIndex,
row_group_indices: I,
) -> Result<ArrayRef>
where
I: IntoIterator<Item = &'a usize>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
return Ok(self.make_null_array(data_type, row_group_indices));
};

let iter = row_group_indices.into_iter().map(|rg_index| {
let column_page_index_per_row_group_per_column =
&column_page_index[*rg_index][parquet_index];
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();

(*num_data_pages, column_page_index_per_row_group_per_column)
});
null_counts_page_statistics(iter)
}

/// Returns an [`ArrayRef`] with row counts for each row group.
///
/// This function iterates over the given row group indexes and computes
/// the row count for each page in the specified column.
///
/// # Parameters:
///
/// * `column_offset_index`: The parquet column offset indices, read from
/// `ParquetMetaData` offset_index
///
/// * `row_group_metadatas`: The metadata slice of the row groups, read
/// from `ParquetMetaData` row_groups
///
/// * `row_group_indices`: The indices of the row groups that are used to
///   extract the column offset index on a per row group, per column basis.
///
/// See docs on [`Self::data_page_mins`] for details.
pub fn data_page_row_counts<I>(
&self,
column_offset_index: &ParquetOffsetIndex,
row_group_metadatas: &[RowGroupMetaData],
row_group_indices: I,
) -> Result<ArrayRef>
where
I: IntoIterator<Item = &'a usize>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
return Ok(self.make_null_array(data_type, row_group_indices));
};

// `offset_index[row_group_number][column_number][page_number]` holds
// the [`PageLocation`] corresponding to page `page_number` of column
// `column_number` of row group `row_group_number`.
let mut row_count_total = Vec::new();
for rg_idx in row_group_indices {
let page_locations = &column_offset_index[*rg_idx][parquet_index];

let row_count_per_page = page_locations.windows(2).map(|loc| {
Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64)
});

let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();

// append the last page row count
let row_count_per_page = row_count_per_page
.chain(std::iter::once(Some(
*num_rows_in_row_group as u64
- page_locations.last().unwrap().first_row_index as u64,
)))
.collect::<Vec<_>>();

row_count_total.extend(row_count_per_page);
}

Ok(Arc::new(UInt64Array::from_iter(row_count_total)))
}
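
// Worked example (illustrative): for a column chunk whose pages start at
// rows 0, 100, and 250 within a 300-row row group, `windows(2)` yields
// 100 - 0 = 100 and 250 - 100 = 150, and the final `chain` appends
// 300 - 250 = 50, giving page row counts [100, 150, 50].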

/// Returns a null array of data_type with one element per row group
-fn make_null_array<I>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
+fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
where
-I: IntoIterator<Item = &'a RowGroupMetaData>,
+I: IntoIterator<Item = A>,
{
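// Note: `metadatas` is now generic over its item type because the data page
// APIs above pass `&usize` row group indices here, while the row group level
// APIs pass `&RowGroupMetaData`; only the number of items is used.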
// column was in the arrow schema but not in the parquet schema, so return a null array
let num_row_groups = metadatas.into_iter().count();
@@ -390,6 +390,7 @@ pub(crate) mod tests {
&[self.column()],
&[],
&[],
+&[],
schema,
self.column_name(),
false,
Expand Down