diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 864172d96a01..0b6c67f88430 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -23,11 +23,13 @@ pub mod from_slice;
 #[cfg(feature = "pyarrow")]
 mod pyarrow;
 pub mod scalar;
+pub mod stats;
 
 pub use column::Column;
 pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
 pub use error::{field_not_found, DataFusionError, Result, SchemaError};
 pub use scalar::{ScalarType, ScalarValue};
+pub use stats::{ColumnStatistics, Statistics};
 
 /// Downcast an Arrow Array to a concrete type, return an `DataFusionError::Internal` if the cast is
 /// not possible. In normal usage of DataFusion the downcast should always succeed.
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 680bd3f66fe0..2b4b9896441b 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -1004,6 +1004,43 @@ impl ScalarValue {
         }
     }
 
+    /// Absolute distance between two numeric values (of the same type). This method
+    /// returns None if either of the arguments is null. It may also return None if the
+    /// resulting distance is greater than [`usize::MAX`]. If the type is a float, the
+    /// distance is rounded to the nearest integer.
+    ///
+    /// Note: the datatype itself must support subtraction.
+    pub fn distance(&self, other: &ScalarValue) -> Option<usize> {
+        // Having an explicit null check here is important because the
+        // subtraction for scalar values will return a real value even
+        // if one side is null.
+        if self.is_null() || other.is_null() {
+            return None;
+        }
+
+        let distance = if self > other {
+            self.sub(other).ok()?
+        } else {
+            other.sub(self).ok()?
+        };
+
+        match distance {
+            ScalarValue::Int8(Some(v)) => usize::try_from(v).ok(),
+            ScalarValue::Int16(Some(v)) => usize::try_from(v).ok(),
+            ScalarValue::Int32(Some(v)) => usize::try_from(v).ok(),
+            ScalarValue::Int64(Some(v)) => usize::try_from(v).ok(),
+            ScalarValue::UInt8(Some(v)) => Some(v as usize),
+            ScalarValue::UInt16(Some(v)) => Some(v as usize),
+            ScalarValue::UInt32(Some(v)) => usize::try_from(v).ok(),
+            ScalarValue::UInt64(Some(v)) => usize::try_from(v).ok(),
+            // TODO: we might want to look into supporting ceil/floor here for floats.
+            ScalarValue::Float32(Some(v)) => Some(v.round() as usize),
+            ScalarValue::Float64(Some(v)) => Some(v.round() as usize),
+            _ => None,
+        }
+    }
+
     /// Converts a scalar value into an 1-row array.
     pub fn to_array(&self) -> ArrayRef {
         self.to_array_of_size(1)
@@ -3805,4 +3842,154 @@ mod tests {
             ]
         );
     }
+
+    #[test]
+    fn test_scalar_distance() {
+        let cases = [
+            // scalar (lhs), scalar (rhs), expected distance
+            // ---------------------------------------------
+            (ScalarValue::Int8(Some(1)), ScalarValue::Int8(Some(2)), 1),
+            (ScalarValue::Int8(Some(2)), ScalarValue::Int8(Some(1)), 1),
+            (
+                ScalarValue::Int16(Some(-5)),
+                ScalarValue::Int16(Some(5)),
+                10,
+            ),
+            (
+                ScalarValue::Int16(Some(5)),
+                ScalarValue::Int16(Some(-5)),
+                10,
+            ),
+            (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(0)), 0),
+            (
+                ScalarValue::Int32(Some(-5)),
+                ScalarValue::Int32(Some(-10)),
+                5,
+            ),
+            (
+                ScalarValue::Int64(Some(-10)),
+                ScalarValue::Int64(Some(-5)),
+                5,
+            ),
+            (ScalarValue::UInt8(Some(1)), ScalarValue::UInt8(Some(2)), 1),
+            (ScalarValue::UInt8(Some(0)), ScalarValue::UInt8(Some(0)), 0),
+            (
+                ScalarValue::UInt16(Some(5)),
+                ScalarValue::UInt16(Some(10)),
+                5,
+            ),
+            (
+                ScalarValue::UInt32(Some(10)),
+                ScalarValue::UInt32(Some(5)),
+                5,
+            ),
+            (
+                ScalarValue::UInt64(Some(5)),
+                ScalarValue::UInt64(Some(10)),
+                5,
+            ),
+            (
+                ScalarValue::Float32(Some(1.0)),
+                ScalarValue::Float32(Some(2.0)),
+                1,
+            ),
+            (
+                ScalarValue::Float32(Some(2.0)),
+                ScalarValue::Float32(Some(1.0)),
+                1,
+            ),
+            (
+                ScalarValue::Float64(Some(0.0)),
+                ScalarValue::Float64(Some(0.0)),
+                0,
+            ),
+            (
+                ScalarValue::Float64(Some(-5.0)),
+                ScalarValue::Float64(Some(-10.0)),
+                5,
+            ),
+            (
+                ScalarValue::Float64(Some(-10.0)),
+                ScalarValue::Float64(Some(-5.0)),
+                5,
+            ),
+            // Floats are currently special cased to f64/f32 and the result is rounded
+            // rather than ceiled/floored. In the future we might want to take a mode
+            // which specifies the rounding behavior.
+            (
+                ScalarValue::Float32(Some(1.2)),
+                ScalarValue::Float32(Some(1.3)),
+                0,
+            ),
+            (
+                ScalarValue::Float32(Some(1.1)),
+                ScalarValue::Float32(Some(1.9)),
+                1,
+            ),
+            (
+                ScalarValue::Float64(Some(-5.3)),
+                ScalarValue::Float64(Some(-9.2)),
+                4,
+            ),
+            (
+                ScalarValue::Float64(Some(-5.3)),
+                ScalarValue::Float64(Some(-9.7)),
+                4,
+            ),
+            (
+                ScalarValue::Float64(Some(-5.3)),
+                ScalarValue::Float64(Some(-9.9)),
+                5,
+            ),
+        ];
+        for (lhs, rhs, expected) in cases.iter() {
+            let distance = lhs.distance(rhs).unwrap();
+            assert_eq!(distance, *expected);
+        }
+    }
+
+    #[test]
+    fn test_scalar_distance_invalid() {
+        let cases = [
+            // scalar (lhs), scalar (rhs)
+            // --------------------------
+            // Same type but with nulls
+            (ScalarValue::Int8(None), ScalarValue::Int8(None)),
+            (ScalarValue::Int8(None), ScalarValue::Int8(Some(1))),
+            (ScalarValue::Int8(Some(1)), ScalarValue::Int8(None)),
+            // Different types
+            (ScalarValue::Int8(Some(1)), ScalarValue::Int16(Some(1))),
+            (ScalarValue::Int8(Some(1)), ScalarValue::Float32(Some(1.0))),
+            (
+                ScalarValue::Float64(Some(1.1)),
+                ScalarValue::Float32(Some(2.2)),
+            ),
+            (
+                ScalarValue::UInt64(Some(777)),
+                ScalarValue::Int32(Some(111)),
+            ),
+            // Different types with nulls
+            (ScalarValue::Int8(None), ScalarValue::Int16(Some(1))),
+            (ScalarValue::Int8(Some(1)), ScalarValue::Int16(None)),
+            // Unsupported types
+            (
+                ScalarValue::Utf8(Some("foo".to_string())),
+                ScalarValue::Utf8(Some("bar".to_string())),
+            ),
+            (
+                ScalarValue::Boolean(Some(true)),
+                ScalarValue::Boolean(Some(false)),
+            ),
+            (ScalarValue::Date32(Some(0)), ScalarValue::Date32(Some(1))),
+            (ScalarValue::Date64(Some(0)), ScalarValue::Date64(Some(1))),
+            (
+                ScalarValue::Decimal128(Some(123), 5, 5),
+                ScalarValue::Decimal128(Some(120), 5, 5),
+            ),
+        ];
+        for (lhs, rhs) in cases {
+            let distance = lhs.distance(&rhs);
+            assert!(distance.is_none());
+        }
+    }
 }
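
To make the intended semantics concrete, here is a small usage sketch of the new `ScalarValue::distance` API (illustrative only, not part of the patch; the values are made up):

use datafusion_common::ScalarValue;

let a = ScalarValue::Int32(Some(9));
let b = ScalarValue::Int32(Some(-3));
assert_eq!(a.distance(&b), Some(12));
assert_eq!(b.distance(&a), Some(12)); // the distance is symmetric
assert_eq!(a.distance(&ScalarValue::Int32(None)), None); // nulls yield None
assert_eq!(
    ScalarValue::Float64(Some(1.1)).distance(&ScalarValue::Float64(Some(2.9))),
    Some(2) // floats are rounded to the nearest integer (1.8 -> 2)
);
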
diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
new file mode 100644
index 000000000000..c27f77532066
--- /dev/null
+++ b/datafusion/common/src/stats.rs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides an interface for plan level statistics.
+
+use crate::ScalarValue;
+
+/// Statistics for a physical plan node.
+/// Fields are optional and can be inexact because the sources
+/// sometimes provide approximate estimates for performance reasons
+/// and the transformations' output is not always predictable.
+#[derive(Debug, Clone, Default, PartialEq, Eq)]
+pub struct Statistics {
+    /// The number of table rows
+    pub num_rows: Option<usize>,
+    /// Total bytes of the table rows
+    pub total_byte_size: Option<usize>,
+    /// Statistics on a column level
+    pub column_statistics: Option<Vec<ColumnStatistics>>,
+    /// If true, any field that is `Some(..)` is the actual value in the data provided
+    /// by the operator (it is not an estimate). Any or all other fields might still be
+    /// None, in which case no information is known.
+    /// If false, any field that is `Some(..)` may contain an inexact estimate and may
+    /// not be the actual value.
+    pub is_exact: bool,
+}
+
+/// These statistics are estimates about a given table column.
+#[derive(Clone, Debug, Default, PartialEq, Eq)]
+pub struct ColumnStatistics {
+    /// Number of null values in the column
+    pub null_count: Option<usize>,
+    /// Maximum value of the column
+    pub max_value: Option<ScalarValue>,
+    /// Minimum value of the column
+    pub min_value: Option<ScalarValue>,
+    /// Number of distinct values
+    pub distinct_count: Option<usize>,
+}
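
For reference, this is roughly how the relocated types are expected to be constructed by downstream sources (a hypothetical sketch; the field values are invented):

use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};

// An estimated (non-exact) statistics object for a single-column table.
let stats = Statistics {
    num_rows: Some(1000),
    total_byte_size: None,
    column_statistics: Some(vec![ColumnStatistics {
        null_count: Some(0),
        max_value: Some(ScalarValue::Int32(Some(100))),
        min_value: Some(ScalarValue::Int32(Some(1))),
        distinct_count: None,
    }]),
    is_exact: false, // the values above are estimates, not exact figures
};
assert_eq!(stats.num_rows, Some(1000));
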
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 259a11992583..b2f6c40f556d 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -22,8 +22,9 @@ use self::metrics::MetricsSet;
 use self::{
     coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
 };
+pub use crate::common::{ColumnStatistics, Statistics};
+use crate::error::Result;
 use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::{error::Result, scalar::ScalarValue};
 
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
@@ -88,36 +89,6 @@ impl Stream for EmptyRecordBatchStream {
 /// Physical planner interface
 pub use self::planner::PhysicalPlanner;
 
-/// Statistics for a physical plan node
-/// Fields are optional and can be inexact because the sources
-/// sometimes provide approximate estimates for performance reasons
-/// and the transformations output are not always predictable.
-#[derive(Debug, Clone, Default, PartialEq, Eq)]
-pub struct Statistics {
-    /// The number of table rows
-    pub num_rows: Option<usize>,
-    /// total bytes of the table rows
-    pub total_byte_size: Option<usize>,
-    /// Statistics on a column level
-    pub column_statistics: Option<Vec<ColumnStatistics>>,
-    /// If true, any field that is `Some(..)` is the actual value in the data provided by the operator (it is not
-    /// an estimate). Any or all other fields might still be None, in which case no information is known.
-    /// if false, any field that is `Some(..)` may contain an inexact estimate and may not be the actual value.
-    pub is_exact: bool,
-}
-
-/// This table statistics are estimates about column
-#[derive(Clone, Debug, Default, PartialEq, Eq)]
-pub struct ColumnStatistics {
-    /// Number of null values on column
-    pub null_count: Option<usize>,
-    /// Maximum value of column
-    pub max_value: Option<ScalarValue>,
-    /// Minimum value of column
-    pub min_value: Option<ScalarValue>,
-    /// Number of distinct values
-    pub distinct_count: Option<usize>,
-}
-
 /// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
 ///
 /// Each `ExecutionPlan` is partition-aware and is responsible for
diff --git a/datafusion/expr/src/operator.rs b/datafusion/expr/src/operator.rs
index 7c0c4d76bbe0..602930c9a210 100644
--- a/datafusion/expr/src/operator.rs
+++ b/datafusion/expr/src/operator.rs
@@ -115,6 +115,41 @@ impl Operator {
             | Operator::StringConcat => None,
         }
     }
+
+    /// Return the operator that produces the same result if the lhs and rhs are
+    /// swapped, or `None` if no such operator exists.
+    ///
+    /// For example `Binary(50, >=, a)` could also be represented as `Binary(a, <=, 50)`.
+    pub fn swap(&self) -> Option<Operator> {
+        match self {
+            Operator::Eq => Some(Operator::Eq),
+            Operator::NotEq => Some(Operator::NotEq),
+            Operator::Lt => Some(Operator::Gt),
+            Operator::LtEq => Some(Operator::GtEq),
+            Operator::Gt => Some(Operator::Lt),
+            Operator::GtEq => Some(Operator::LtEq),
+            Operator::Like
+            | Operator::NotLike
+            | Operator::IsDistinctFrom
+            | Operator::IsNotDistinctFrom
+            | Operator::Plus
+            | Operator::Minus
+            | Operator::Multiply
+            | Operator::Divide
+            | Operator::Modulo
+            | Operator::And
+            | Operator::Or
+            | Operator::RegexMatch
+            | Operator::RegexIMatch
+            | Operator::RegexNotMatch
+            | Operator::RegexNotIMatch
+            | Operator::BitwiseAnd
+            | Operator::BitwiseOr
+            | Operator::BitwiseXor
+            | Operator::BitwiseShiftRight
+            | Operator::BitwiseShiftLeft
+            | Operator::StringConcat => None,
+        }
+    }
 }
 
 impl fmt::Display for Operator {
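
The intended contract of `Operator::swap` can be summarized with a few assertions (an illustrative sketch, not part of the patch):

use datafusion_expr::Operator;

// Comparisons have a mirrored counterpart that is valid with swapped operands.
assert_eq!(Operator::Lt.swap(), Some(Operator::Gt));
assert_eq!(Operator::GtEq.swap(), Some(Operator::LtEq));
assert_eq!(Operator::Eq.swap(), Some(Operator::Eq)); // symmetric operators map to themselves
assert_eq!(Operator::Plus.swap(), None); // non-comparison operators are not swappable
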
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 6db5b5a781d1..d6c1240a0d87 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -19,6 +19,7 @@
 mod adapter;
 mod kernels;
 mod kernels_arrow;
 
+use std::cmp::Ordering;
 use std::convert::TryInto;
 use std::{any::Any, sync::Arc};
@@ -73,8 +74,8 @@ use kernels_arrow::{
 use arrow::datatypes::{DataType, Schema, TimeUnit};
 use arrow::record_batch::RecordBatch;
 
-use crate::PhysicalExpr;
-use datafusion_common::ScalarValue;
+use crate::{ExprBoundaries, PhysicalExpr, PhysicalExprStats};
+use datafusion_common::{ColumnStatistics, ScalarValue};
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::type_coercion::binary::binary_operator_data_type;
 use datafusion_expr::{ColumnarValue, Operator};
@@ -640,6 +641,162 @@ impl PhysicalExpr for BinaryExpr {
         self.evaluate_with_resolved_args(left, &left_data_type, right, &right_data_type)
             .map(|a| ColumnarValue::Array(a))
     }
+
+    fn expr_stats(&self) -> Arc<dyn PhysicalExprStats> {
+        Arc::new(BinaryExprStats {
+            op: self.op,
+            left: Arc::clone(self.left()),
+            right: Arc::clone(self.right()),
+        })
+    }
+}
+
+struct BinaryExprStats {
+    op: Operator,
+    left: Arc<dyn PhysicalExpr>,
+    right: Arc<dyn PhysicalExpr>,
+}
+
+impl PhysicalExprStats for BinaryExprStats {
+    fn boundaries(&self, columns: &[ColumnStatistics]) -> Option<ExprBoundaries> {
+        match &self.op {
+            Operator::Eq
+            | Operator::Gt
+            | Operator::Lt
+            | Operator::LtEq
+            | Operator::GtEq => {
+                let l_bounds = self.left.expr_stats().boundaries(columns)?;
+                let r_bounds = self.right.expr_stats().boundaries(columns)?;
+                match (l_bounds.reduce(), r_bounds.reduce()) {
+                    (_, Some(r)) => compare_left_boundaries(&self.op, &l_bounds, r),
+                    (Some(scalar_value), _) => {
+                        compare_left_boundaries(&self.op.swap()?, &r_bounds, scalar_value)
+                    }
+                    _ => None,
+                }
+            }
+            _ => None,
+        }
+    }
+}
+
+// Compute the bounds of a comparison predicate (>, >=, <, <=, =) between
+// two expressions (one of which must reduce to a single value). Returns a new
+// expression boundary that represents the result of this comparison.
+fn compare_left_boundaries(
+    op: &Operator,
+    lhs_expr_bounds: &ExprBoundaries,
+    rhs_scalar_value: ScalarValue,
+) -> Option<ExprBoundaries> {
+    let variadic_min = lhs_expr_bounds.min_value.clone();
+    let variadic_max = lhs_expr_bounds.max_value.clone();
+
+    // Direct selectivity is applicable when we can determine that this comparison will
+    // always be true or false (e.g. `x > 10` where `x`'s min value is 11, or `a < 5`
+    // where `a`'s max value is 4), with the assumption that min/max are correct.
+    assert!(!matches!(
+        variadic_min.partial_cmp(&variadic_max),
+        Some(Ordering::Greater)
+    ));
+    let (always_selects, never_selects) = match op {
+        Operator::Lt => (
+            rhs_scalar_value > variadic_max,
+            rhs_scalar_value <= variadic_min,
+        ),
+        Operator::LtEq => (
+            rhs_scalar_value >= variadic_max,
+            rhs_scalar_value < variadic_min,
+        ),
+        Operator::Gt => (
+            rhs_scalar_value < variadic_min,
+            rhs_scalar_value >= variadic_max,
+        ),
+        Operator::GtEq => (
+            rhs_scalar_value <= variadic_min,
+            rhs_scalar_value > variadic_max,
+        ),
+        Operator::Eq => (
+            // Since min/max can be artificial (e.g. the min or max value of a column
+            // might be just a guess), we can't assume variadic_min == literal_value
+            // would always select.
+            false,
+            rhs_scalar_value < variadic_min || rhs_scalar_value > variadic_max,
+        ),
+        _ => unreachable!(),
+    };
+
+    // Both cannot be true at the same time.
+    assert!(!(always_selects && never_selects));
+
+    let selectivity = match (always_selects, never_selects) {
+        (true, _) => Some(1.0),
+        (_, true) => Some(0.0),
+        (false, false) => {
+            // If there is a partial overlap, then we can estimate the selectivity by
+            // computing the ratio of the overlap to the total range. Since we currently
+            // don't have access to a value distribution histogram, the calculation below
+            // assumes a uniform distribution by default.
+
+            // Our [min, max] is inclusive, so we need to add 1 to the difference.
+            let total_range = variadic_max.distance(&variadic_min)? + 1;
+            let overlap_between_boundaries = match op {
+                Operator::Lt => rhs_scalar_value.distance(&variadic_min)?,
+                Operator::Gt => variadic_max.distance(&rhs_scalar_value)?,
+                Operator::LtEq => rhs_scalar_value.distance(&variadic_min)? + 1,
+                Operator::GtEq => variadic_max.distance(&rhs_scalar_value)? + 1,
+                Operator::Eq => 1,
+                _ => unreachable!(),
+            };
+
+            Some(overlap_between_boundaries as f64 / total_range as f64)
+        }
+    }?;
+
+    // The selectivity can't be greater than 1.0.
+    assert!(selectivity <= 1.0);
+
+    let distinct_count = lhs_expr_bounds
+        .distinct_count
+        .map(|distinct_count| (distinct_count as f64 * selectivity).round() as usize);
+
+    // Now we know what the upper/lower bounds for this column are after the
+    // predicate is applied.
+    let (new_min, new_max) = match op {
+        // TODO: for lt/gt, we technically should shrink the possibility space
+        // by one, since a < 5 means that 5 is not a possible value for `a`. However,
+        // it is currently tricky to do so (e.g. for floats, we can get away with 4.999,
+        // so we need smarter logic to find the closest value that is different
+        // from the scalar_value).
+        Operator::Lt | Operator::LtEq => {
+            // We only want to update the upper bound when we know it will help us (i.e.
+            // it is actually smaller than what we have right now) and it is a valid
+            // value (e.g. for [0, 100] < -100, without the selectivity check the
+            // boundaries would be updated to [0, -100]).
+            if rhs_scalar_value < variadic_max && selectivity > 0.0 {
+                (variadic_min, rhs_scalar_value)
+            } else {
+                (variadic_min, variadic_max)
+            }
+        }
+        Operator::Gt | Operator::GtEq => {
+            // Same as above, but this time we want to limit the lower bound.
+            if rhs_scalar_value > variadic_min && selectivity > 0.0 {
+                (rhs_scalar_value, variadic_max)
+            } else {
+                (variadic_min, variadic_max)
+            }
+        }
+        // For equality, we don't have the range problem, so even if the selectivity
+        // is 0.0 we can still update the boundaries.
+        Operator::Eq => (rhs_scalar_value.clone(), rhs_scalar_value),
+        _ => unreachable!(),
+    };
+
+    Some(ExprBoundaries {
+        min_value: new_min,
+        max_value: new_max,
+        distinct_count,
+        selectivity: Some(selectivity),
+    })
 }
 
 /// unwrap underlying (non dictionary) value, if any, to pass to a scalar kernel
@@ -2602,4 +2759,232 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_comparison_result_estimate_generic() -> Result<()> {
+        let col_min = 1;
+        let col_max = 100;
+        let col_distinct = None;
+        let cases = [
+            // (operator, rhs), (expected selectivity, expected min, expected max)
+            // -------------------------------------------------------------------
+            //
+            // Table:
+            //    - a (min = 1, max = 100, distinct_count = null)
+            //
+            // Equality (a = $):
+            //
+            ((Operator::Eq, 1), (1.0 / 100.0, 1, 1)),
+            ((Operator::Eq, 5), (1.0 / 100.0, 5, 5)),
+            ((Operator::Eq, 99), (1.0 / 100.0, 99, 99)),
+            ((Operator::Eq, 100), (1.0 / 100.0, 100, 100)),
+            // For conditions that can never match, like the ones below, we still
+            // produce correct min/max values: if such a condition somehow held, the
+            // result of the expression would effectively be pinned to the literal.
+            ((Operator::Eq, 0), (0.0, 0, 0)),
+            ((Operator::Eq, -101), (0.0, -101, -101)),
+            ((Operator::Eq, 101), (0.0, 101, 101)),
+            //
+            // Less than (a < $):
+            //
+            // Note: the upper bound for less-than is currently overstated (by the
+            // closest value); see the comment in `compare_left_boundaries` for the reason.
+            ((Operator::Lt, 5), (4.0 / 100.0, 1, 5)),
+            ((Operator::Lt, 99), (98.0 / 100.0, 1, 99)),
+            ((Operator::Lt, 101), (100.0 / 100.0, 1, 100)),
+            // Unlike equality, we are obliged to provide a valid range of values here,
+            // so if "col < -100" is executed we don't want to claim that col can take
+            // values in [0, -100].
+            ((Operator::Lt, 0), (0.0, 1, 100)),
+            ((Operator::Lt, 1), (0.0, 1, 100)),
+            ((Operator::Lt, -100), (0.0, 1, 100)),
+            ((Operator::Lt, -200), (0.0, 1, 100)),
+            // We also don't want to expand the range unnecessarily, even if the
+            // predicate always selects.
+            ((Operator::Lt, 200), (1.0, 1, 100)),
+            //
+            // Less than or equal (a <= $):
+            //
+            ((Operator::LtEq, -100), (0.0, 1, 100)),
+            ((Operator::LtEq, 0), (0.0, 1, 100)),
+            ((Operator::LtEq, 1), (1.0 / 100.0, 1, 1)),
+            ((Operator::LtEq, 5), (5.0 / 100.0, 1, 5)),
+            ((Operator::LtEq, 99), (99.0 / 100.0, 1, 99)),
+            ((Operator::LtEq, 100), (100.0 / 100.0, 1, 100)),
+            ((Operator::LtEq, 101), (1.0, 1, 100)),
+            ((Operator::LtEq, 200), (1.0, 1, 100)),
+            //
+            // Greater than (a > $):
+            //
+            ((Operator::Gt, -100), (1.0, 1, 100)),
+            ((Operator::Gt, 0), (1.0, 1, 100)),
+            ((Operator::Gt, 1), (99.0 / 100.0, 1, 100)),
+            ((Operator::Gt, 5), (95.0 / 100.0, 5, 100)),
+            ((Operator::Gt, 99), (1.0 / 100.0, 99, 100)),
+            ((Operator::Gt, 100), (0.0, 1, 100)),
+            ((Operator::Gt, 101), (0.0, 1, 100)),
+            ((Operator::Gt, 200), (0.0, 1, 100)),
+            //
+            // Greater than or equal (a >= $):
+            //
+            ((Operator::GtEq, -100), (1.0, 1, 100)),
+            ((Operator::GtEq, 0), (1.0, 1, 100)),
+            ((Operator::GtEq, 1), (1.0, 1, 100)),
+            ((Operator::GtEq, 5), (96.0 / 100.0, 5, 100)),
+            ((Operator::GtEq, 99), (2.0 / 100.0, 99, 100)),
+            ((Operator::GtEq, 100), (1.0 / 100.0, 100, 100)),
+            ((Operator::GtEq, 101), (0.0, 1, 100)),
+            ((Operator::GtEq, 200), (0.0, 1, 100)),
+        ];
+
+        for ((operator, rhs), (exp_selectivity, exp_min, exp_max)) in cases {
+            let lhs = ExprBoundaries::new(
+                ScalarValue::Int64(Some(col_max)),
+                ScalarValue::Int64(Some(col_min)),
+                col_distinct,
+            );
+            let rhs_as_scalar = ScalarValue::Int64(Some(rhs));
+            let boundaries =
+                compare_left_boundaries(&operator, &lhs, rhs_as_scalar.clone())
+                    .expect("this case should not return None");
+            assert_eq!(
+                boundaries
+                    .selectivity
+                    .expect("compare_left_boundaries must produce a selectivity value"),
+                exp_selectivity
+            );
+            assert_eq!(boundaries.min_value, ScalarValue::Int64(Some(exp_min)));
+            assert_eq!(boundaries.max_value, ScalarValue::Int64(Some(exp_max)));
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_comparison_result_estimate_different_type() -> Result<()> {
+        let col_min = 1.3;
+        let col_max = 50.7;
+        let distance = 50.0; // rounded distance: (max - min) + 1
+        let col_distinct = Some(25);
+
+        // Since the generic version already covers all the paths, we can just
+        // test a small subset of the cases.
+        let cases = [
+            // (operator, rhs), (expected selectivity, expected min, expected max, expected distinct)
+            // --------------------------------------------------------------------------------------
+            //
+            // Table:
+            //    - a (min = 1.3, max = 50.7, distinct_count = 25)
+            //
+            // Never selects (out of range)
+            ((Operator::Eq, 1.1), (0.0, 1.1, 1.1, 0)),
+            ((Operator::Eq, 50.75), (0.0, 50.75, 50.75, 0)),
+            ((Operator::Lt, 1.3), (0.0, 1.3, 50.7, 0)),
+            ((Operator::LtEq, 1.29), (0.0, 1.3, 50.7, 0)),
+            ((Operator::Gt, 50.7), (0.0, 1.3, 50.7, 0)),
+            ((Operator::GtEq, 50.75), (0.0, 1.3, 50.7, 0)),
+            // Always selects
+            ((Operator::Lt, 50.75), (1.0, 1.3, 50.7, 25)),
+            ((Operator::LtEq, 50.75), (1.0, 1.3, 50.7, 25)),
+            ((Operator::Gt, 1.29), (1.0, 1.3, 50.7, 25)),
+            ((Operator::GtEq, 1.3), (1.0, 1.3, 50.7, 25)),
+            // Partial selection (the x in 'x/distance' is the rounded version of the
+            // bound's distance, as per the implementation).
+            ((Operator::Eq, 27.8), (1.0 / distance, 27.8, 27.8, 1)),
+            ((Operator::Lt, 5.2), (4.0 / distance, 1.3, 5.2, 2)), // On a uniform distribution, this is {2.6, 3.9}
+            ((Operator::LtEq, 1.3), (1.0 / distance, 1.3, 1.3, 1)),
+            ((Operator::Gt, 45.5), (5.0 / distance, 45.5, 50.7, 3)), // On a uniform distribution, this is {46.8, 48.1, 49.4}
+            ((Operator::GtEq, 50.7), (1.0 / distance, 50.7, 50.7, 1)),
+        ];
+
+        for ((operator, rhs), (exp_selectivity, exp_min, exp_max, exp_dist)) in cases {
+            let lhs = ExprBoundaries::new(
+                ScalarValue::Float64(Some(col_max)),
+                ScalarValue::Float64(Some(col_min)),
+                col_distinct,
+            );
+            let rhs_as_scalar = ScalarValue::Float64(Some(rhs));
+            let boundaries = compare_left_boundaries(&operator, &lhs, rhs_as_scalar)
+                .expect("this case should not return None");
+            assert_eq!(
+                boundaries
+                    .selectivity
+                    .expect("compare_left_boundaries must produce a selectivity value"),
+                exp_selectivity
+            );
+            assert_eq!(boundaries.min_value, ScalarValue::Float64(Some(exp_min)));
+            assert_eq!(boundaries.max_value, ScalarValue::Float64(Some(exp_max)));
+            assert_eq!(
+                boundaries
+                    .distinct_count
+                    .expect("this test expects distinct_count != NULL"),
+                exp_dist
+            );
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_binary_expression_boundaries() -> Result<()> {
+        // A table where the column 'a' has a min of 1 and a max of 100.
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let columns = [ColumnStatistics {
+            min_value: Some(ScalarValue::Int32(Some(1))),
+            max_value: Some(ScalarValue::Int32(Some(100))),
+            null_count: Some(0),
+            distinct_count: Some(100),
+        }];
+
+        // expression: "a >= 25"
+        let lt = binary_simple(
+            col("a", &schema)?,
+            Operator::GtEq,
+            lit(ScalarValue::Int32(Some(25))),
+            &schema,
+        );
+
+        let stats = lt.expr_stats();
+        let boundaries = stats
+            .boundaries(&columns)
+            .expect("boundaries should not be None");
+        assert_eq!(boundaries.min_value, ScalarValue::Int32(Some(25)));
+        assert_eq!(boundaries.max_value, ScalarValue::Int32(Some(100)));
+        assert_eq!(boundaries.distinct_count, Some(76));
+        assert_eq!(boundaries.selectivity, Some(0.76));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_binary_expression_boundaries_rhs() -> Result<()> {
+        // This test checks the column rewriting feature in the boundary provider
+        // (i.e. if the lhs is a literal and the rhs is the column, we swap them
+        // when doing the computation).
+
+        // A table where the column 'a' has a min of 1 and a max of 100.
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let columns = [ColumnStatistics {
+            min_value: Some(ScalarValue::Int32(Some(1))),
+            max_value: Some(ScalarValue::Int32(Some(100))),
+            null_count: Some(0),
+            distinct_count: Some(100),
+        }];
+
+        // expression: "50 >= a"
+        let lt = binary_simple(
+            lit(ScalarValue::Int32(Some(50))),
+            Operator::GtEq,
+            col("a", &schema)?,
+            &schema,
+        );
+
+        let stats = lt.expr_stats();
+        let boundaries = stats
+            .boundaries(&columns)
+            .expect("boundaries should not be None");
+        assert_eq!(boundaries.min_value, ScalarValue::Int32(Some(1)));
+        assert_eq!(boundaries.max_value, ScalarValue::Int32(Some(50)));
+        assert_eq!(boundaries.distinct_count, Some(50));
+        assert_eq!(boundaries.selectivity, Some(0.50));
+
+        Ok(())
+    }
 }
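
To make the uniformity assumption behind the selectivity estimate concrete, here is the arithmetic for `a <= 25` over a column bounded by [1, 100], reproduced with the `distance` API added above (a sketch, not part of the patch):

use datafusion_common::ScalarValue;

let min = ScalarValue::Int64(Some(1));
let max = ScalarValue::Int64(Some(100));
let rhs = ScalarValue::Int64(Some(25));

// [min, max] is inclusive, hence the +1 on both distances.
let total_range = max.distance(&min).unwrap() + 1; // 100
let overlap = rhs.distance(&min).unwrap() + 1;     // 25 (<= keeps the endpoint)
assert_eq!(overlap as f64 / total_range as f64, 0.25);

Under a uniform distribution, a quarter of the rows are expected to pass the predicate; with a histogram this ratio could be computed per bucket instead.
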
diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs
index 63f8c405308e..dfb6eca40d3f 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -24,8 +24,8 @@ use arrow::{
     record_batch::RecordBatch,
 };
 
-use crate::PhysicalExpr;
-use datafusion_common::{DataFusionError, Result};
+use crate::{ExprBoundaries, PhysicalExpr, PhysicalExprStats};
+use datafusion_common::{ColumnStatistics, DataFusionError, Result};
 use datafusion_expr::ColumnarValue;
 
 /// Represents the column at a given index in a RecordBatch
@@ -89,6 +89,28 @@ impl PhysicalExpr for Column {
         self.bounds_check(batch.schema().as_ref())?;
         Ok(ColumnarValue::Array(batch.column(self.index).clone()))
     }
+
+    /// Return the statistics for this expression
+    fn expr_stats(&self) -> Arc<dyn PhysicalExprStats> {
+        Arc::new(ColumnExprStats { index: self.index })
+    }
+}
+
+#[derive(Debug, Clone)]
+struct ColumnExprStats {
+    index: usize,
+}
+
+impl PhysicalExprStats for ColumnExprStats {
+    /// Retrieve the boundaries of this column from the given column-level statistics.
+    fn boundaries(&self, columns: &[ColumnStatistics]) -> Option<ExprBoundaries> {
+        let column = &columns[self.index];
+        Some(ExprBoundaries::new(
+            column.max_value.as_ref()?.clone(),
+            column.min_value.as_ref()?.clone(),
+            column.distinct_count,
+        ))
+    }
 }
 
 impl Column {
@@ -116,7 +138,7 @@ mod test {
     use arrow::array::StringArray;
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
-    use datafusion_common::Result;
+    use datafusion_common::{ColumnStatistics, Result, ScalarValue};
     use std::sync::Arc;
 
     #[test]
@@ -154,4 +176,44 @@ mod test {
             &format!("{}", error));
         Ok(())
     }
+
+    #[test]
+    fn stats() -> Result<()> {
+        let columns = [
+            ColumnStatistics {
+                min_value: Some(ScalarValue::Int32(Some(1))),
+                max_value: Some(ScalarValue::Int32(Some(100))),
+                distinct_count: Some(15),
+                ..Default::default()
+            },
+            ColumnStatistics {
+                min_value: Some(ScalarValue::Int32(Some(1))),
+                max_value: Some(ScalarValue::Int32(Some(100))),
+                distinct_count: Some(75),
+                ..Default::default()
+            },
+            ColumnStatistics {
+                min_value: Some(ScalarValue::Int32(Some(1))),
+                max_value: Some(ScalarValue::Int32(Some(100))),
+                distinct_count: None,
+                ..Default::default()
+            },
+        ];
+
+        let cases = [
+            // (name, index, expected distinct count)
+            ("col0", 0, Some(15)),
+            ("col1", 1, Some(75)),
+            ("col2", 2, None),
+        ];
+
+        for (name, index, expected) in cases {
+            let col = Column::new(name, index);
+            let stats = col.expr_stats();
+            let boundaries = stats.boundaries(&columns).unwrap();
+            assert_eq!(boundaries.distinct_count, expected);
+        }
+
+        Ok(())
+    }
 }
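
Putting the column provider to work looks roughly like this (a hypothetical snippet; it assumes the `Column` and `PhysicalExpr` re-export paths used elsewhere in this diff):

use datafusion_common::{ColumnStatistics, ScalarValue};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExpr;

// Index 0 refers to the first entry in the statistics slice below.
let columns = [ColumnStatistics {
    min_value: Some(ScalarValue::Int32(Some(1))),
    max_value: Some(ScalarValue::Int32(Some(100))),
    ..Default::default()
}];
let a = Column::new("a", 0);
let bounds = a.expr_stats().boundaries(&columns).unwrap();
assert_eq!(bounds.min_value, ScalarValue::Int32(Some(1)));
assert_eq!(bounds.max_value, ScalarValue::Int32(Some(100)));
assert_eq!(bounds.distinct_count, None); // not present in the stats above

Note that if either min or max is missing from the statistics, `boundaries` returns None via the `?` operator above.
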
diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs
index e711f57c0e3b..b3e4a7a320ac 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -25,9 +25,9 @@ use arrow::{
     record_batch::RecordBatch,
 };
 
-use crate::PhysicalExpr;
-use datafusion_common::Result;
+use crate::{ExprBoundaries, PhysicalExpr, PhysicalExprStats};
 use datafusion_common::ScalarValue;
+use datafusion_common::{ColumnStatistics, Result};
 use datafusion_expr::{ColumnarValue, Expr};
 
 /// Represents a literal value
@@ -71,6 +71,29 @@ impl PhysicalExpr for Literal {
     fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
         Ok(ColumnarValue::Scalar(self.value.clone()))
     }
+
+    fn expr_stats(&self) -> Arc<dyn PhysicalExprStats> {
+        Arc::new(LiteralExprStats {
+            value: self.value.clone(),
+        })
+    }
+}
+
+struct LiteralExprStats {
+    value: ScalarValue,
+}
+
+impl PhysicalExprStats for LiteralExprStats {
+    /// A literal's boundaries are the same as its value's boundaries (since it is a
+    /// scalar, both min and max are the value itself).
+    #[allow(unused_variables)]
+    fn boundaries(&self, columns: &[ColumnStatistics]) -> Option<ExprBoundaries> {
+        Some(ExprBoundaries::new(
+            self.value.clone(),
+            self.value.clone(),
+            Some(1),
+        ))
+    }
 }
 
 /// Create a literal expression
@@ -110,4 +133,17 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn literal_stats() -> Result<()> {
+        let literal_expr = lit(42i32);
+        let stats = literal_expr.expr_stats();
+        let boundaries = stats.boundaries(&[]).unwrap();
+        assert_eq!(boundaries.min_value, ScalarValue::Int32(Some(42)));
+        assert_eq!(boundaries.max_value, ScalarValue::Int32(Some(42)));
+        assert_eq!(boundaries.distinct_count, Some(1));
+        assert_eq!(boundaries.selectivity, None);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 7511fb2624cb..965ca330056d 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -43,7 +43,7 @@ pub mod window;
 // reexport this to maintain compatibility with anything that used from_slice previously
 pub use aggregate::AggregateExpr;
 pub use datafusion_common::from_slice;
-pub use physical_expr::PhysicalExpr;
+pub use physical_expr::{ExprBoundaries, PhysicalExpr, PhysicalExprStats};
 pub use planner::create_physical_expr;
 pub use scalar_function::ScalarFunctionExpr;
 pub use sort_expr::PhysicalSortExpr;
diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs
index 62f5541273ef..df82cc07f1ee 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -19,10 +19,11 @@
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
 
-use datafusion_common::Result;
+use datafusion_common::{ColumnStatistics, Result, ScalarValue};
 use datafusion_expr::ColumnarValue;
 
 use std::fmt::{Debug, Display};
+use std::sync::Arc;
 
 use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
 use arrow::compute::{and_kleene, filter_record_batch, is_not_null, SlicesIterator};
@@ -61,6 +62,70 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
             Ok(tmp_result)
         }
     }
+
+    /// Return the expression statistics for this expression. This API is currently experimental.
+    fn expr_stats(&self) -> Arc<dyn PhysicalExprStats> {
+        Arc::new(BasicExpressionStats {})
+    }
+}
+
+/// Statistics about the result of a single expression.
+#[derive(Clone, Debug, PartialEq)]
+pub struct ExprBoundaries {
+    /// Maximum value this expression's result can have.
+    pub max_value: ScalarValue,
+    /// Minimum value this expression's result can have.
+    pub min_value: ScalarValue,
+    /// Maximum number of distinct values this expression can produce, if known.
+    pub distinct_count: Option<usize>,
+    /// Selectivity of this expression if it were used as a predicate, as a
+    /// value between 0 and 1.
+    pub selectivity: Option<f64>,
+}
+
+impl ExprBoundaries {
+    /// Create a new `ExprBoundaries`.
+    pub fn new(
+        max_value: ScalarValue,
+        min_value: ScalarValue,
+        distinct_count: Option<usize>,
+    ) -> Self {
+        Self {
+            max_value,
+            min_value,
+            distinct_count,
+            selectivity: None,
+        }
+    }
+
+    /// Try to reduce the expression boundaries to a single value, if possible.
+    pub fn reduce(&self) -> Option<ScalarValue> {
+        if self.min_value == self.max_value {
+            Some(self.min_value.clone())
+        } else {
+            None
+        }
+    }
+}
+
+/// A toolkit to work with physical expression statistics. This API is currently
+/// experimental and might be subject to change.
+pub trait PhysicalExprStats: Send + Sync {
+    /// Return an estimate of the boundaries that this expression's result would have
+    /// (in terms of the minimum and maximum values it can take, as well as the number
+    /// of unique values it can produce). The input is the column-level statistics from
+    /// the current physical plan.
+    fn boundaries(&self, columns: &[ColumnStatistics]) -> Option<ExprBoundaries>;
+}
+
+/// A dummy implementation of [`PhysicalExprStats`] that does not provide any statistics.
+#[derive(Debug, Clone)]
+pub struct BasicExpressionStats {}
+
+impl PhysicalExprStats for BasicExpressionStats {
+    #[allow(unused_variables)]
+    fn boundaries(&self, columns: &[ColumnStatistics]) -> Option<ExprBoundaries> {
+        None
+    }
 }
 
 /// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, next values of `truthy`
@@ -179,4 +244,31 @@ mod tests {
         assert_eq!(&expected, result);
         Ok(())
     }
+
+    #[test]
+    fn reduce_boundaries() -> Result<()> {
+        let different_boundaries = ExprBoundaries::new(
+            ScalarValue::Int32(Some(1)),
+            ScalarValue::Int32(Some(10)),
+            None,
+        );
+        assert_eq!(different_boundaries.reduce(), None);
+
+        let scalar_boundaries = ExprBoundaries::new(
+            ScalarValue::Int32(Some(1)),
+            ScalarValue::Int32(Some(1)),
+            None,
+        );
+        assert_eq!(
+            scalar_boundaries.reduce(),
+            Some(ScalarValue::Int32(Some(1)))
+        );
+
+        // Null boundaries (min == max == NULL) can still be reduced.
+        let no_boundaries =
+            ExprBoundaries::new(ScalarValue::Int32(None), ScalarValue::Int32(None), None);
+        assert_eq!(no_boundaries.reduce(), Some(ScalarValue::Int32(None)));
+
+        Ok(())
+    }
 }
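
End-to-end, the pieces in this PR compose as follows. This is a sketch under the assumption that the existing `binary`, `col`, and `lit` constructors in the expressions module are used (the test above uses the test-only `binary_simple` helper instead); the statistics values are invented:

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{ColumnStatistics, Result, ScalarValue};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{binary, col, lit};
use datafusion_physical_expr::PhysicalExpr;

fn main() -> Result<()> {
    // Column 'a' is known (from table statistics) to lie in [1, 100].
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let columns = [ColumnStatistics {
        min_value: Some(ScalarValue::Int32(Some(1))),
        max_value: Some(ScalarValue::Int32(Some(100))),
        distinct_count: Some(100),
        ..Default::default()
    }];

    // Predicate "a <= 50": the estimated boundaries shrink to [1, 50] and
    // roughly half of the rows are expected to match.
    let expr = binary(
        col("a", &schema)?,
        Operator::LtEq,
        lit(ScalarValue::Int32(Some(50))),
        &schema,
    )?;
    let bounds = expr.expr_stats().boundaries(&columns).unwrap();
    assert_eq!(bounds.reduce(), None); // min != max, so no single value
    assert_eq!(bounds.max_value, ScalarValue::Int32(Some(50)));
    assert_eq!(bounds.selectivity, Some(0.5));
    Ok(())
}
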