Simplify predicate pruning
tustvold committed Jun 8, 2022
1 parent 1f3a62d commit 6f6433c
Showing 2 changed files with 96 additions and 127 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -38,6 +38,6 @@ lto = true

# TODO: Temporary
[patch.crates-io]
arrow = { git = "https://github.com/tustvold/arrow-rs.git", rev = "1e123d428345561382e46f7ce1fe6f3cf88ba250" }
arrow-flight = { git = "https://github.com/tustvold/arrow-rs.git", rev = "1e123d428345561382e46f7ce1fe6f3cf88ba250" }
parquet = { git = "https://github.com/tustvold/arrow-rs.git", rev = "1e123d428345561382e46f7ce1fe6f3cf88ba250" }
arrow = { git = "https://github.com/tustvold/arrow-rs.git", rev = "dfedade6f17e0bfd0a4958a6a0bbbb72cc8cfaa4" }
arrow-flight = { git = "https://github.com/tustvold/arrow-rs.git", rev = "dfedade6f17e0bfd0a4958a6a0bbbb72cc8cfaa4" }
parquet = { git = "https://github.com/tustvold/arrow-rs.git", rev = "dfedade6f17e0bfd0a4958a6a0bbbb72cc8cfaa4" }
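The `[patch.crates-io]` table redirects every workspace dependency on `arrow`, `arrow-flight`, and `parquet` to the pinned git revision; this hunk only advances that pin, presumably to pick up the `AsyncFileReader` API that the Rust changes below rely on.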
217 changes: 93 additions & 124 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -34,7 +34,7 @@ use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use log::debug;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::async_reader::AsyncChunkReader;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::errors::ParquetError;
use parquet::file::{
@@ -90,7 +90,6 @@ struct ParquetFileMetrics {

impl ParquetExec {
/// Create a new Parquet reader execution plan provided file list and schema.
/// Even if `limit` is set, ParquetExec rounds up the number of records to the next `batch_size`.
pub fn new(base_config: FileScanConfig, predicate: Option<Expr>) -> Self {
debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
base_config.file_groups, base_config.projection, predicate, base_config.limit);
@@ -260,6 +259,7 @@ impl ExecutionPlan for ParquetExec {
}
}

/// Implements [`FormatReader`] for a parquet file
struct ParquetOpener {
partition_index: usize,
projection: Arc<[usize]>,
@@ -276,26 +276,24 @@ impl FormatReader for ParquetOpener {
meta: ObjectMeta,
range: Option<FileRange>,
) -> ReaderFuture {
// TODO: Use ParquetRecordBatchStream (arrow-rs#1803) (arrow-rs#1804)
let file_metrics = ParquetFileMetrics::new(
let metrics = ParquetFileMetrics::new(
self.partition_index,
meta.location.as_ref(),
&self.metrics,
);

file_metrics.bytes_scanned.add(meta.size);
let reader = ParquetFileReader {
store,
meta,
metrics: metrics.clone(),
};

let schema_adapter = SchemaAdapter::new(self.table_schema.clone());
let batch_size = self.batch_size;
let projection = self.projection.clone();
let mut pruning_predicate = self
.pruning_predicate
.clone()
.map(|predicate| build_row_group_predicate(predicate, file_metrics));
let pruning_predicate = self.pruning_predicate.clone();

Box::pin(async move {
let reader = ParquetFileReader { store, meta };

let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
let adapted_projections =
schema_adapter.map_projections(builder.schema(), &projection)?;
@@ -305,28 +303,8 @@
adapted_projections.iter().cloned(),
);

let row_groups = builder
.metadata()
.row_groups()
.iter()
.enumerate()
.filter_map(move |(idx, metadata)| {
let keep_prune = pruning_predicate
.as_mut()
.map(|p| p(metadata, idx))
.unwrap_or(true);

let keep_range = range
.as_ref()
.map(|x| {
let offset = metadata.column(0).file_offset();
offset >= x.start && offset < x.end
})
.unwrap_or(true);

(keep_prune && keep_range).then(|| idx)
})
.collect();
let groups = builder.metadata().row_groups();
let row_groups = prune_row_groups(groups, range, pruning_predicate, &metrics);

let stream = builder
.with_projection(mask)
@@ -349,16 +327,20 @@
}
}

/// Implements [`AsyncFileReader`] for a parquet file in object storage
struct ParquetFileReader {
store: Arc<dyn ObjectStore>,
meta: ObjectMeta,
metrics: ParquetFileMetrics,
}

impl AsyncChunkReader for ParquetFileReader {
impl AsyncFileReader for ParquetFileReader {
fn get_bytes(
&mut self,
range: Range<usize>,
) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
self.metrics.bytes_scanned.add(range.end - range.start);

self.store
.get_range(&self.meta.location, range)
.map_err(|e| {
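With the metrics handle stored on the reader, `bytes_scanned` now accrues per fetched byte range rather than being charged the whole object size when the file is opened (the removed `file_metrics.bytes_scanned.add(meta.size)` above). A minimal, self-contained sketch of that accounting; `ByteCounter` and the byte ranges are illustrative, not DataFusion types:

```rust
use std::ops::Range;

// Illustrative stand-in for the `bytes_scanned` counter: it sums only the
// byte ranges actually requested, not the total file size.
struct ByteCounter {
    bytes_scanned: usize,
}

impl ByteCounter {
    fn record(&mut self, range: &Range<usize>) {
        self.bytes_scanned += range.end - range.start;
    }
}

fn main() {
    let mut counter = ByteCounter { bytes_scanned: 0 };
    counter.record(&(1_000_000..1_000_064)); // e.g. a 64-byte footer read
    counter.record(&(0..65_536)); // e.g. one 64 KiB column chunk
    // far less than the ~1 MB the old whole-file accounting would report
    assert_eq!(counter.bytes_scanned, 64 + 65_536);
}
```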
@@ -483,35 +465,47 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
}
}

fn build_row_group_predicate(
pruning_predicate: PruningPredicate,
metrics: ParquetFileMetrics,
) -> Box<dyn FnMut(&RowGroupMetaData, usize) -> bool + Send> {
Box::new(
move |row_group_metadata: &RowGroupMetaData, _i: usize| -> bool {
let parquet_schema = pruning_predicate.schema().as_ref();
fn prune_row_groups(
groups: &[RowGroupMetaData],
range: Option<FileRange>,
predicate: Option<PruningPredicate>,
metrics: &ParquetFileMetrics,
) -> Vec<usize> {
// TODO: Columnar pruning
let mut filtered = Vec::with_capacity(groups.len());
for (idx, metadata) in groups.iter().enumerate() {
if let Some(range) = &range {
let offset = metadata.column(0).file_offset();
if offset < range.start || offset >= range.end {
continue;
}
}

if let Some(predicate) = &predicate {
let pruning_stats = RowGroupPruningStatistics {
row_group_metadata,
parquet_schema,
row_group_metadata: metadata,
parquet_schema: predicate.schema().as_ref(),
};
let predicate_values = pruning_predicate.prune(&pruning_stats);
match predicate_values {
match predicate.prune(&pruning_stats) {
Ok(values) => {
// NB: false means don't scan row group
let num_pruned = values.iter().filter(|&v| !*v).count();
metrics.row_groups_pruned.add(num_pruned);
values[0]
if !values[0] {
metrics.row_groups_pruned.add(1);
continue;
}
}
// stats filter array could not be built
// return a closure which will not filter out any row groups
Err(e) => {
debug!("Error evaluating row group predicate values {}", e);
metrics.predicate_evaluation_errors.add(1);
true
}
}
},
)
}

filtered.push(idx)
}
filtered
}
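The range check above assigns each row group to exactly one scan partition: a group survives only when the file offset of its first column falls inside the partition's half-open `[start, end)` byte range, so a group straddling a partition boundary is still scanned exactly once. A self-contained sketch of just that rule, with made-up offsets and split points:

```rust
// Standalone sketch of the range-assignment rule: keep the row groups whose
// first column's file offset lies in the half-open range [start, end).
fn assign(offsets: &[i64], start: i64, end: i64) -> Vec<usize> {
    offsets
        .iter()
        .enumerate()
        .filter(|&(_, &offset)| offset >= start && offset < end)
        .map(|(idx, _)| idx)
        .collect()
}

fn main() {
    // first-column file offsets of three hypothetical row groups
    let offsets = [4, 1024, 2048];
    // two partitions splitting the same file at byte 1500
    assert_eq!(assign(&offsets, 0, 1500), vec![0, 1]);
    assert_eq!(assign(&offsets, 1500, 3000), vec![2]);
}
```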

/// Executes a query and writes the results to a partitioned Parquet file.
@@ -1182,12 +1176,13 @@ mod tests {
}

#[test]
fn row_group_pruning_predicate_simple_expr() -> Result<()> {
fn row_group_pruning_predicate_simple_expr() {
use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
let expr = col("c1").gt(lit(15));
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let pruning_predicate = PruningPredicate::try_new(expr, Arc::new(schema))?;
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]);
let rgm1 = get_row_group_meta_data(
@@ -1198,26 +1193,22 @@
&schema_descr,
vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
);
let row_group_metadata = vec![rgm1, rgm2];
let mut row_group_predicate =
build_row_group_predicate(pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
.map(|(i, g)| row_group_predicate(g, i))
.collect::<Vec<_>>();
assert_eq!(row_group_filter, vec![false, true]);

Ok(())
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
vec![1]
);
}
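The assertion style follows the new signature: the old closure yielded one keep/skip boolean per row group (`vec![false, true]`), whereas `prune_row_groups` returns the indices of the groups that survive. A small sketch of the correspondence for this two-group case, using plain `Vec`s rather than parquet types:

```rust
fn main() {
    // old style: one keep flag per row group
    let mask = vec![false, true];
    // new style: indices of the kept row groups
    let indices: Vec<usize> = mask
        .iter()
        .enumerate()
        .filter_map(|(idx, &keep)| keep.then(|| idx))
        .collect();
    assert_eq!(indices, vec![1]); // row group 0 pruned, row group 1 scanned
}
```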

#[test]
fn row_group_pruning_predicate_missing_stats() -> Result<()> {
fn row_group_pruning_predicate_missing_stats() {
use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
let expr = col("c1").gt(lit(15));
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let pruning_predicate = PruningPredicate::try_new(expr, Arc::new(schema))?;
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]);
let rgm1 = get_row_group_meta_data(
@@ -1228,23 +1219,17 @@
&schema_descr,
vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
);
let row_group_metadata = vec![rgm1, rgm2];
let mut row_group_predicate =
build_row_group_predicate(pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
.map(|(i, g)| row_group_predicate(g, i))
.collect::<Vec<_>>();
let metrics = parquet_file_metrics();
// missing statistics for first row group mean that the result from the predicate expression
// is null / undefined so the first row group can't be filtered out
assert_eq!(row_group_filter, vec![true, true]);

Ok(())
assert_eq!(
prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
vec![0, 1]
);
}

#[test]
fn row_group_pruning_predicate_partial_expr() -> Result<()> {
fn row_group_pruning_predicate_partial_expr() {
use datafusion_expr::{col, lit};
// test row group predicate with partially supported expression
// int > 1 and int % 2 => c1_max > 1 and true
@@ -1253,7 +1238,7 @@ mod tests {
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Int32, false),
]));
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?;
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

let schema_descr = get_test_schema_descr(vec![
("c1", PhysicalType::INT32),
@@ -1273,32 +1258,27 @@ mod tests {
ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
],
);
let row_group_metadata = vec![rgm1, rgm2];
let mut row_group_predicate =
build_row_group_predicate(pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
.map(|(i, g)| row_group_predicate(g, i))
.collect::<Vec<_>>();

let metrics = parquet_file_metrics();
let groups = &[rgm1, rgm2];
// the first row group is still filtered out because the predicate expression can be partially evaluated
// when conditions are joined using AND
assert_eq!(row_group_filter, vec![false, true]);
assert_eq!(
prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
vec![1]
);

// if conditions in predicate are joined with OR and an unsupported expression is used
// this bypasses the entire predicate expression and no row groups are filtered out
let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
let mut row_group_predicate =
build_row_group_predicate(pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
.map(|(i, g)| row_group_predicate(g, i))
.collect::<Vec<_>>();
assert_eq!(row_group_filter, vec![true, true]);
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();

Ok(())
// if conditions in predicate are joined with OR and an unsupported expression is used
// this bypasses the entire predicate expression and no row groups are filtered out
assert_eq!(
prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
vec![0, 1]
);
}

fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
@@ -1324,32 +1304,27 @@ mod tests {
}

#[test]
fn row_group_pruning_predicate_null_expr() -> Result<()> {
fn row_group_pruning_predicate_null_expr() {
use datafusion_expr::{col, lit};
// int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();

let mut row_group_predicate =
build_row_group_predicate(pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
.map(|(i, g)| row_group_predicate(g, i))
.collect::<Vec<_>>();
let metrics = parquet_file_metrics();
// First row group was filtered out because it contains no null value on "c2".
assert_eq!(row_group_filter, vec![false, true]);

Ok(())
assert_eq!(
prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
vec![1]
);
}

#[test]
fn row_group_pruning_predicate_eq_null_expr() -> Result<()> {
fn row_group_pruning_predicate_eq_null_expr() {
use datafusion_expr::{col, lit};
// test row group predicate with an unknown (Null) expr
//
@@ -1361,22 +1336,16 @@ mod tests {
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();

let mut row_group_predicate =
build_row_group_predicate(pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
.map(|(i, g)| row_group_predicate(g, i))
.collect::<Vec<_>>();
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();

let metrics = parquet_file_metrics();
// bool = NULL always evaluates to NULL (and thus will not
// pass predicates). Ideally these should both be false
assert_eq!(row_group_filter, vec![false, true]);

Ok(())
assert_eq!(
prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
vec![1]
);
}
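The comment above leans on SQL three-valued logic: comparing any value with NULL yields NULL, and a NULL predicate result does not pass the filter. A tiny sketch of that rule using `Option<bool>` as the nullable boolean; `sql_eq` is a hypothetical helper, not DataFusion code:

```rust
// NULL-aware equality: a NULL on either side makes the comparison NULL.
fn sql_eq(lhs: Option<bool>, rhs: Option<bool>) -> Option<bool> {
    match (lhs, rhs) {
        (Some(l), Some(r)) => Some(l == r),
        _ => None, // NULL in, NULL out
    }
}

fn main() {
    // a `= NULL` comparison is never true, so the predicate admits no rows,
    // yet the row-group statistics cannot always prove that, which is why
    // only the first group is pruned in the test above
    assert_eq!(sql_eq(Some(true), None), None);
    assert_eq!(sql_eq(None, None), None);
}
```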

fn get_row_group_meta_data(