Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 18, 2022
1 parent e188b8c commit 2b10d68
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 13 deletions.
10 changes: 10 additions & 0 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use datafusion_expr::expr::{BinaryExpr, Cast};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::expressions::Literal;
use log::trace;

/// Interface to pass statistics information to [`PruningPredicate`]
Expand Down Expand Up @@ -224,6 +225,15 @@ impl PruningPredicate {
&self.predicate_expr
}

/// Reports whether this pruning predicate is "always true", i.e. it will not
/// prune anything.
///
/// The answer is `true` only when the predicate expression is a [`Literal`]
/// whose value is `ScalarValue::Boolean(Some(true))`. Any other expression,
/// or any other literal value, yields `false`.
pub fn allways_true(&self) -> bool {
    match self.predicate_expr.as_any().downcast_ref::<Literal>() {
        // A literal expression: check that it is exactly the boolean `true`.
        Some(literal) => matches!(literal.value(), ScalarValue::Boolean(Some(true))),
        // Not a literal at all, so it cannot be trivially true.
        None => false,
    }
}

/// Returns all needed column indexes to evaluate this pruning predicate
pub(crate) fn need_input_columns_ids(&self) -> HashSet<usize> {
let mut set = HashSet::new();
Expand Down
42 changes: 29 additions & 13 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,28 @@ impl ParquetExec {
let predicate_creation_errors =
MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");

let pruning_predicate = predicate.and_then(|predicate_expr| {
match PruningPredicate::try_new(
predicate_expr,
base_config.file_schema.clone(),
) {
Ok(pruning_predicate) => Some(pruning_predicate),
Err(e) => {
debug!("Could not create pruning predicate for: {}", e);
predicate_creation_errors.add(1);
let pruning_predicate = predicate
.and_then(|predicate_expr| {
match PruningPredicate::try_new(
predicate_expr,
base_config.file_schema.clone(),
) {
Ok(pruning_predicate) => Some(pruning_predicate),
Err(e) => {
debug!("Could not create pruning predicate for: {}", e);
predicate_creation_errors.add(1);
None
}
}
})
.and_then(|pruning_predicate| {
// If the pruning predicate can't prune anything, don't try
if pruning_predicate.allways_true() {
None
} else {
Some(pruning_predicate)
}
}
});
});

let (projected_schema, projected_statistics) = base_config.project();

Expand Down Expand Up @@ -1544,6 +1553,10 @@ mod tests {

let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;

// should have a pruning predicate
let pruning_predicate = &rt.parquet_exec.pruning_predicate;
assert!(pruning_predicate.is_some());

// convert to explain plan form
let display = displayable(rt.parquet_exec.as_ref()).indent().to_string();

Expand Down Expand Up @@ -1578,8 +1591,11 @@ mod tests {

// Should not contain a pruning predicate
let pruning_predicate = &rt.parquet_exec.pruning_predicate;
assert!(pruning_predicate.is_none(),
"Still had pruning predicate: {:?}", pruning_predicate)
assert!(
pruning_predicate.is_none(),
"Still had pruning predicate: {:?}",
pruning_predicate
)
}

/// returns the sum of all the metrics with the specified name
Expand Down

0 comments on commit 2b10d68

Please sign in to comment.