Skip to content

Commit

Permalink
Move Pruning into physical-optimizer crate (apache#13485)
Browse files (browse the repository at this point in the history)
* Move `Pruning` into `physical-optimizer` crate

* fix check

* fix issues

* cargo update
  • Loading branch information
irenjj authored Nov 20, 2024
1 parent c3681dc commit 30ff48e
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 54 deletions.
64 changes: 33 additions & 31 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub mod enforce_sorting;
pub mod join_selection;
pub mod optimizer;
pub mod projection_pushdown;
pub mod pruning;
pub mod replace_with_order_preserving_variants;
pub mod sanity_checker;
#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@ arrow-schema = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-execution = { workspace = true }
datafusion-expr-common = { workspace = true, default-features = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
recursive = { workspace = true }

[dev-dependencies]
datafusion-expr = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-nested = { workspace = true }
tokio = { workspace = true }
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod limit_pushdown;
pub mod limited_distinct_aggregation;
mod optimizer;
pub mod output_requirements;
pub mod pruning;
pub mod topk_aggregation;
pub mod update_aggr_exprs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,30 @@
//! [`PruningPredicate`] to apply filter [`Expr`] to prune "containers"
//! based on statistics (e.g. Parquet Row Groups)
//!
//! [`Expr`]: crate::prelude::Expr
//! [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
use std::collections::HashSet;
use std::sync::Arc;

use crate::{
common::{Column, DFSchema},
error::{DataFusionError, Result},
logical_expr::Operator,
physical_plan::{ColumnarValue, PhysicalExpr},
};

use arrow::array::AsArray;
use arrow::{
array::{new_null_array, ArrayRef, BooleanArray},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::{RecordBatch, RecordBatchOptions},
};
use arrow_array::cast::AsArray;
use log::trace;

use datafusion_common::error::{DataFusionError, Result};
use datafusion_common::tree_node::TransformedResult;
use datafusion_common::{
internal_err, plan_datafusion_err, plan_err,
tree_node::{Transformed, TreeNode},
ScalarValue,
};
use datafusion_common::{Column, DFSchema};
use datafusion_expr_common::operator::Operator;
use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};

use log::trace;
use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};

/// A source of runtime statistical information to [`PruningPredicate`]s.
///
Expand Down Expand Up @@ -567,7 +564,7 @@ impl PruningPredicate {
/// expressions like `b = false`, but it does handle the
/// simplified version `b`. See [`ExprSimplifier`] to simplify expressions.
///
/// [`ExprSimplifier`]: crate::optimizer::simplify_expressions::ExprSimplifier
/// [`ExprSimplifier`]: https://docs.rs/datafusion/latest/datafusion/optimizer/simplify_expressions/struct.ExprSimplifier.html
pub fn prune<S: PruningStatistics>(&self, statistics: &S) -> Result<Vec<bool>> {
let mut builder = BoolVecBuilder::new(statistics.num_containers());

Expand Down Expand Up @@ -653,7 +650,7 @@ impl PruningPredicate {

// This is only used by the `parquet` feature right now
#[allow(dead_code)]
pub(crate) fn required_columns(&self) -> &RequiredColumns {
pub fn required_columns(&self) -> &RequiredColumns {
&self.required_columns
}

Expand Down Expand Up @@ -762,7 +759,7 @@ fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
/// Handles creating references to the min/max statistics
/// for columns as well as recording which statistics are needed
#[derive(Debug, Default, Clone)]
pub(crate) struct RequiredColumns {
pub struct RequiredColumns {
/// The statistics required to evaluate this predicate:
/// * The unqualified column in the input schema
/// * Statistics type (e.g. Min or Max or Null_Count)
Expand All @@ -786,7 +783,7 @@ impl RequiredColumns {
/// * `true` returns None
#[allow(dead_code)]
// This fn is only used by the `parquet` feature right now, thus the `allow(dead_code)`
pub(crate) fn single_column(&self) -> Option<&phys_expr::Column> {
pub fn single_column(&self) -> Option<&phys_expr::Column> {
if self.columns.windows(2).all(|w| {
// check if all columns are the same (ignoring statistics and field)
let c1 = &w[0].0;
Expand Down Expand Up @@ -1664,15 +1661,14 @@ mod tests {
use std::ops::{Not, Rem};

use super::*;
use crate::assert_batches_eq;
use crate::logical_expr::{col, lit};
use datafusion_common::assert_batches_eq;
use datafusion_expr::{col, lit};

use arrow::array::Decimal128Array;
use arrow::{
array::{BinaryArray, Int32Array, Int64Array, StringArray},
array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
datatypes::TimeUnit,
};
use arrow_array::UInt64Array;
use datafusion_expr::expr::InList;
use datafusion_expr::{cast, is_null, try_cast, Expr};
use datafusion_functions_nested::expr_fn::{array_has, make_array};
Expand Down Expand Up @@ -3536,7 +3532,7 @@ mod tests {
// more complex case with unknown column
let input = known_expression.clone().and(input.clone());
let expected = phys_expr::BinaryExpr::new(
known_expression_transformed.clone(),
Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
Operator::And,
logical2physical(&lit(42), &schema),
);
Expand All @@ -3552,7 +3548,7 @@ mod tests {
// more complex case with unknown expression
let input = known_expression.and(input);
let expected = phys_expr::BinaryExpr::new(
known_expression_transformed.clone(),
Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
Operator::And,
logical2physical(&lit(42), &schema),
);
Expand Down Expand Up @@ -4038,7 +4034,7 @@ mod tests {
) {
println!("Pruning with expr: {}", expr);
let expr = logical2physical(&expr, schema);
let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
let result = p.prune(statistics).unwrap();
assert_eq!(result, expected);
}
Expand Down

0 comments on commit 30ff48e

Please sign in to comment.