
Extract parquet statistics to its own module, add tests #8294

Merged: 21 commits, Nov 29, 2023

Commits:
d187e36  Extract parquet statistics to its own module, add tests (alamb, Nov 21, 2023)
7512c8b  Merge remote-tracking branch 'apache/main' into alamb/extract_parquet… (alamb, Nov 27, 2023)
d4e660a  Update datafusion/core/src/datasource/physical_plan/parquet/statistic… (alamb, Nov 27, 2023)
fd2aebc  rename enum (alamb, Nov 27, 2023)
96a42f9  Merge branch 'alamb/extract_parquet_statistics' of github.com:alamb/a… (alamb, Nov 27, 2023)
a128a20  Improve API (alamb, Nov 27, 2023)
b4009c2  Add test for reading struct array statistics (alamb, Nov 27, 2023)
ef79c42  Add test for column after statistics (alamb, Nov 27, 2023)
9b914db  improve tests (alamb, Nov 27, 2023)
b95dea9  simplify (alamb, Nov 27, 2023)
cd3c042  clippy (alamb, Nov 27, 2023)
ab95453  Update datafusion/core/src/datasource/physical_plan/parquet/statistic… (alamb, Nov 27, 2023)
0235a9e  Update datafusion/core/src/datasource/physical_plan/parquet/statistic… (alamb, Nov 27, 2023)
a601fbf  Add test showing incorrect statistics (alamb, Nov 27, 2023)
5c55302  Merge remote-tracking branch 'upstream/main' into alamb/extract_parqu… (tustvold, Nov 28, 2023)
06b5201  Rework statistics (tustvold, Nov 28, 2023)
641142b  Merge pull request #16 from tustvold/tustvold/extract_parquet_statistics (alamb, Nov 28, 2023)
e5cd8cf  Fix clippy (alamb, Nov 28, 2023)
b1666c2  Merge remote-tracking branch 'apache/main' into alamb/extract_parquet… (alamb, Nov 28, 2023)
7022691  Update documentation and make it clear the statistics are not publica… (alamb, Nov 28, 2023)
a5e235a  Add link to upstream arrow ticket (alamb, Nov 28, 2023)
23 changes: 1 addition & 22 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -66,6 +66,7 @@ mod metrics;
pub mod page_filter;
mod row_filter;
mod row_groups;
mod statistics;

pub use metrics::ParquetFileMetrics;

@@ -718,28 +719,6 @@ pub async fn plan_to_parquet(
Ok(())
}

// Copy from the arrow-rs
Comment from alamb (PR author): Moved to statistics.rs

// https://github.com/apache/arrow-rs/blob/733b7e7fd1e8c43a404c3ce40ecf741d493c21b4/parquet/src/arrow/buffer/bit_util.rs#L55
// Convert the byte slice to fixed length byte array with the length of 16
fn sign_extend_be(b: &[u8]) -> [u8; 16] {
assert!(b.len() <= 16, "Array too large, expected less than 16");
let is_negative = (b[0] & 128u8) == 128u8;
let mut result = if is_negative { [255u8; 16] } else { [0u8; 16] };
for (d, s) in result.iter_mut().skip(16 - b.len()).zip(b) {
*d = *s;
}
result
}

// Convert the bytes array to i128.
// The endian of the input bytes array must be big-endian.
pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
// The bytes array are from parquet file and must be the big-endian.
// The endian is defined by parquet format, and the reference document
// https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
i128::from_be_bytes(sign_extend_be(b))
}

// Convert parquet column schema to arrow data type, and just consider the
// decimal data type.
pub(crate) fn parquet_to_arrow_decimal_type(
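For reference, a minimal sketch (not part of the diff) of what the relocated helpers compute. The two function bodies are copied verbatim from the removed code above; the values in `main` are illustrative.

fn sign_extend_be(b: &[u8]) -> [u8; 16] {
    assert!(b.len() <= 16, "Array too large, expected less than 16");
    let is_negative = (b[0] & 128u8) == 128u8;
    let mut result = if is_negative { [255u8; 16] } else { [0u8; 16] };
    for (d, s) in result.iter_mut().skip(16 - b.len()).zip(b) {
        *d = *s;
    }
    result
}

fn from_bytes_to_i128(b: &[u8]) -> i128 {
    i128::from_be_bytes(sign_extend_be(b))
}

fn main() {
    // 0x01 0x00 in big-endian order is 256
    assert_eq!(from_bytes_to_i128(&[0x01, 0x00]), 256);
    // A single 0xFF byte sign-extends to sixteen 0xFF bytes, i.e. -1
    assert_eq!(from_bytes_to_i128(&[0xFF]), -1);
    // A DECIMAL(5, 2) value 123.45 is stored as the unscaled integer 12345
    assert_eq!(from_bytes_to_i128(&12345i128.to_be_bytes()[13..]), 12345);
}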
@@ -39,9 +39,8 @@ use parquet::{
};
use std::sync::Arc;

use crate::datasource::physical_plan::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};
use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

use super::metrics::ParquetFileMetrics;
141 changes: 24 additions & 117 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -15,31 +15,26 @@
// specific language governing permissions and limitations
// under the License.

use arrow::{
array::ArrayRef,
datatypes::{DataType, Schema},
};
use arrow::{array::ArrayRef, datatypes::Schema};
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
use parquet::{
arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
bloom_filter::Sbbf,
file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
file::metadata::RowGroupMetaData,
};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use crate::datasource::{
listing::FileRange,
physical_plan::parquet::{from_bytes_to_i128, parquet_to_arrow_decimal_type},
};
use crate::datasource::listing::FileRange;
use crate::logical_expr::Operator;
use crate::physical_expr::expressions as phys_expr;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use crate::physical_plan::PhysicalExpr;

use super::statistics::RowGroupStatisticsConverter;
use super::ParquetFileMetrics;

/// Prune row groups based on statistics
@@ -303,112 +298,6 @@ struct RowGroupPruningStatistics<'a> {
parquet_schema: &'a Schema,
}

/// Extract the min/max statistics from a `ParquetStatistics` object
macro_rules! get_statistic {
Comment from alamb (PR author): This macro is moved, without modification, into statistics.rs

($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{
if !$column_statistics.has_min_max_set() {
return None;
}
match $column_statistics {
ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))),
ParquetStatistics::Int32(s) => {
match $target_arrow_type {
// int32 to decimal with the precision and scale
Some(DataType::Decimal128(precision, scale)) => {
Some(ScalarValue::Decimal128(
Some(*s.$func() as i128),
precision,
scale,
))
}
_ => Some(ScalarValue::Int32(Some(*s.$func()))),
}
}
ParquetStatistics::Int64(s) => {
match $target_arrow_type {
// int64 to decimal with the precision and scale
Some(DataType::Decimal128(precision, scale)) => {
Some(ScalarValue::Decimal128(
Some(*s.$func() as i128),
precision,
scale,
))
}
_ => Some(ScalarValue::Int64(Some(*s.$func()))),
}
}
// 96 bit ints not supported
ParquetStatistics::Int96(_) => None,
ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))),
ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))),
ParquetStatistics::ByteArray(s) => {
match $target_arrow_type {
// decimal data type
Some(DataType::Decimal128(precision, scale)) => {
Some(ScalarValue::Decimal128(
Some(from_bytes_to_i128(s.$bytes_func())),
precision,
scale,
))
}
_ => {
let s = std::str::from_utf8(s.$bytes_func())
.map(|s| s.to_string())
.ok();
Some(ScalarValue::Utf8(s))
}
}
}
// type not supported yet
ParquetStatistics::FixedLenByteArray(s) => {
match $target_arrow_type {
// just support the decimal data type
Some(DataType::Decimal128(precision, scale)) => {
Some(ScalarValue::Decimal128(
Some(from_bytes_to_i128(s.$bytes_func())),
precision,
scale,
))
}
_ => None,
}
}
}
}};
}
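To make the macro above concrete: for a column with no decimal target type, `get_statistic!(stats, min, min_bytes, None)` expands to roughly the following. This is a sketch only; the `Decimal128` arms are omitted.

use datafusion_common::ScalarValue;
use parquet::file::statistics::Statistics as ParquetStatistics;

fn min_statistic(stats: &ParquetStatistics) -> Option<ScalarValue> {
    if !stats.has_min_max_set() {
        return None;
    }
    match stats {
        ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.min()))),
        ParquetStatistics::Int32(s) => Some(ScalarValue::Int32(Some(*s.min()))),
        ParquetStatistics::Int64(s) => Some(ScalarValue::Int64(Some(*s.min()))),
        // 96 bit ints not supported
        ParquetStatistics::Int96(_) => None,
        ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.min()))),
        ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.min()))),
        // without a decimal target type, byte arrays are read as UTF-8
        ParquetStatistics::ByteArray(s) => {
            let utf8 = std::str::from_utf8(s.min_bytes()).map(str::to_string).ok();
            Some(ScalarValue::Utf8(utf8))
        }
        // fixed-length byte arrays are only supported for decimals
        ParquetStatistics::FixedLenByteArray(_) => None,
    }
}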

// Extract the min or max value calling `func` or `bytes_func` on the ParquetStatistics as appropriate
macro_rules! get_min_max_values {
($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
let (_column_index, field) =
if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
(v, f)
} else {
// Named column was not present
return None;
};

let data_type = field.data_type();
// The result may be None, because DataFusion doesn't have support for ScalarValues of the column type
let null_scalar: ScalarValue = data_type.try_into().ok()?;

$self.row_group_metadata
.columns()
.iter()
.find(|c| c.column_descr().name() == &$column.name)
.and_then(|c| if c.statistics().is_some() {Some((c.statistics().unwrap(), c.column_descr()))} else {None})
.map(|(stats, column_descr)|
{
let target_data_type = parquet_to_arrow_decimal_type(column_descr);
get_statistic!(stats, $func, $bytes_func, target_data_type)
})
.flatten()
// column either didn't have statistics at all or didn't have min/max values
.or_else(|| Some(null_scalar.clone()))
.and_then(|s| s.to_array().ok())
}}
}
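A design note on the macro above: when a column exists in the schema but has no usable statistics, it falls back to a typed null (`data_type.try_into()`) rather than returning nothing, so pruning treats the value as unknown instead of failing. A minimal sketch of that fallback, using the same `TryFrom<&DataType>` conversion the macro relies on:

use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;

// Returns a typed null for the column; `try_into` fails (and we return None)
// only for types with no ScalarValue representation.
fn null_fallback(data_type: &DataType) -> Option<ScalarValue> {
    let null_scalar: ScalarValue = data_type.try_into().ok()?;
    Some(null_scalar)
}

fn main() {
    assert!(null_fallback(&DataType::Int64).unwrap().is_null());
}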

// Extract the null count value on the ParquetStatistics
macro_rules! get_null_count_values {
($self:expr, $column:expr) => {{
@@ -431,11 +320,29 @@

impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
get_min_max_values!(self, column, min, min_bytes)
let field = self
.parquet_schema
.fields()
.find(&column.name)
.map(|(_idx, field)| field)?;

RowGroupStatisticsConverter::new(field)

Comment from a reviewer (Contributor): There is a slight mismatch here as parquet handles schema nesting differently from arrow. I'm not sure how Column addresses nested fields, but I would expect to see something walking SchemaDescriptor to compute this mapping, or something similar. [See the sketch after this diff.]

.min([self.row_group_metadata])
// ignore errors during conversion, and just use no statistics
.ok()
}

fn max_values(&self, column: &Column) -> Option<ArrayRef> {
get_min_max_values!(self, column, max, max_bytes)
let field = self
.parquet_schema
.fields()
.find(&column.name)
.map(|(_idx, field)| field)?;

RowGroupStatisticsConverter::new(field)
.max([self.row_group_metadata])
// ignore errors during conversion, and just use no statistics
.ok()
}

fn num_containers(&self) -> usize {
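On the reviewer's nesting concern above: a hypothetical sketch of the kind of mapping being suggested, walking the parquet SchemaDescriptor's leaf columns and matching on the dotted column path rather than assuming top-level arrow fields line up one-to-one with parquet leaf columns. The helper name is invented for illustration.

use parquet::schema::types::SchemaDescriptor;

// Hypothetical helper: find the parquet leaf column whose dotted path
// (e.g. "my_struct.inner_field") matches the requested column name.
fn leaf_index_for(schema: &SchemaDescriptor, dotted_path: &str) -> Option<usize> {
    (0..schema.num_columns()).find(|&i| schema.column(i).path().string() == dotted_path)
}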
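The new `RowGroupStatisticsConverter` lives in the crate-private `statistics` module (the PR's later commits make clear it is not public API), so only the call shape `RowGroupStatisticsConverter::new(field).min(row_groups)` is visible in this diff. As a rough, self-contained stand-in for that pattern, here is a toy converter handling only Int64 columns; everything except the call shape is an assumption for illustration.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field};
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::statistics::Statistics;

// Toy stand-in for the crate-private RowGroupStatisticsConverter; only the
// `new(field).min(row_groups)` call shape is taken from the diff above.
struct MinMaxConverter<'a> {
    field: &'a Field,
}

impl<'a> MinMaxConverter<'a> {
    fn new(field: &'a Field) -> Self {
        Self { field }
    }

    /// Collect the minimum of this field's column from each row group into a
    /// single arrow array, with a null entry wherever statistics are absent.
    fn min<'b>(
        &self,
        row_groups: impl IntoIterator<Item = &'b RowGroupMetaData>,
    ) -> ArrayRef {
        assert_eq!(self.field.data_type(), &DataType::Int64, "toy example");
        let mins = row_groups.into_iter().map(|rg| {
            rg.columns()
                .iter()
                .find(|c| c.column_descr().name() == self.field.name().as_str())
                .and_then(|c| c.statistics())
                .and_then(|s| match s {
                    Statistics::Int64(v) if v.has_min_max_set() => Some(*v.min()),
                    _ => None,
                })
        });
        Arc::new(Int64Array::from_iter(mins))
    }
}

Returning one array with an entry per row group is what lets `PruningStatistics` evaluate a predicate across all row groups in a single vectorized pass.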