Skip to content

Commit

Permalink
Implement foundational filter selectivity analysis (#3868)
Browse files Browse the repository at this point in the history
* Implement foundational filter selectivity analysis

* Apply suggestions & minor fixes
  • Loading branch information
isidentical authored Oct 20, 2022
1 parent 57e445a commit 6d44791
Show file tree
Hide file tree
Showing 10 changed files with 861 additions and 40 deletions.
2 changes: 2 additions & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ pub mod from_slice;
#[cfg(feature = "pyarrow")]
mod pyarrow;
pub mod scalar;
pub mod stats;

pub use column::Column;
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
pub use error::{field_not_found, DataFusionError, Result, SchemaError};
pub use scalar::{ScalarType, ScalarValue};
pub use stats::{ColumnStatistics, Statistics};

/// Downcast an Arrow Array to a concrete type, return an `DataFusionError::Internal` if the cast is
/// not possible. In normal usage of DataFusion the downcast should always succeed.
Expand Down
187 changes: 187 additions & 0 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,43 @@ impl ScalarValue {
}
}

/// Absolute distance between two numeric values (of the same type). This method will return
/// None if either one of the arguments are null. It might also return None if the resulting
/// distance is greater than [`usize::MAX`]. If the type is a float, then the distance will be
/// rounded to the nearest integer.
///
///
/// Note: the datatype itself must support subtraction.
pub fn distance(&self, other: &ScalarValue) -> Option<usize> {
// Having an explicit null check here is important because the
// subtraction for scalar values will return a real value even
// if one side is null.
if self.is_null() || other.is_null() {
return None;
}

let distance = if self > other {
self.sub(other).ok()?
} else {
other.sub(self).ok()?
};

match distance {
ScalarValue::Int8(Some(v)) => usize::try_from(v).ok(),
ScalarValue::Int16(Some(v)) => usize::try_from(v).ok(),
ScalarValue::Int32(Some(v)) => usize::try_from(v).ok(),
ScalarValue::Int64(Some(v)) => usize::try_from(v).ok(),
ScalarValue::UInt8(Some(v)) => Some(v as usize),
ScalarValue::UInt16(Some(v)) => Some(v as usize),
ScalarValue::UInt32(Some(v)) => usize::try_from(v).ok(),
ScalarValue::UInt64(Some(v)) => usize::try_from(v).ok(),
// TODO: we might want to look into supporting ceil/floor here for floats.
ScalarValue::Float32(Some(v)) => Some(v.round() as usize),
ScalarValue::Float64(Some(v)) => Some(v.round() as usize),
_ => None,
}
}

/// Converts a scalar value into an 1-row array.
pub fn to_array(&self) -> ArrayRef {
self.to_array_of_size(1)
Expand Down Expand Up @@ -3805,4 +3842,154 @@ mod tests {
]
);
}

#[test]
fn test_scalar_distance() {
let cases = [
// scalar (lhs), scalar (rhs), expected distance
// ---------------------------------------------
(ScalarValue::Int8(Some(1)), ScalarValue::Int8(Some(2)), 1),
(ScalarValue::Int8(Some(2)), ScalarValue::Int8(Some(1)), 1),
(
ScalarValue::Int16(Some(-5)),
ScalarValue::Int16(Some(5)),
10,
),
(
ScalarValue::Int16(Some(5)),
ScalarValue::Int16(Some(-5)),
10,
),
(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(0)), 0),
(
ScalarValue::Int32(Some(-5)),
ScalarValue::Int32(Some(-10)),
5,
),
(
ScalarValue::Int64(Some(-10)),
ScalarValue::Int64(Some(-5)),
5,
),
(ScalarValue::UInt8(Some(1)), ScalarValue::UInt8(Some(2)), 1),
(ScalarValue::UInt8(Some(0)), ScalarValue::UInt8(Some(0)), 0),
(
ScalarValue::UInt16(Some(5)),
ScalarValue::UInt16(Some(10)),
5,
),
(
ScalarValue::UInt32(Some(10)),
ScalarValue::UInt32(Some(5)),
5,
),
(
ScalarValue::UInt64(Some(5)),
ScalarValue::UInt64(Some(10)),
5,
),
(
ScalarValue::Float32(Some(1.0)),
ScalarValue::Float32(Some(2.0)),
1,
),
(
ScalarValue::Float32(Some(2.0)),
ScalarValue::Float32(Some(1.0)),
1,
),
(
ScalarValue::Float64(Some(0.0)),
ScalarValue::Float64(Some(0.0)),
0,
),
(
ScalarValue::Float64(Some(-5.0)),
ScalarValue::Float64(Some(-10.0)),
5,
),
(
ScalarValue::Float64(Some(-10.0)),
ScalarValue::Float64(Some(-5.0)),
5,
),
// Floats are currently special cased to f64/f32 and the result is rounded
// rather than ceiled/floored. In the future we might want to take a mode
// which specified the rounding behavior.
(
ScalarValue::Float32(Some(1.2)),
ScalarValue::Float32(Some(1.3)),
0,
),
(
ScalarValue::Float32(Some(1.1)),
ScalarValue::Float32(Some(1.9)),
1,
),
(
ScalarValue::Float64(Some(-5.3)),
ScalarValue::Float64(Some(-9.2)),
4,
),
(
ScalarValue::Float64(Some(-5.3)),
ScalarValue::Float64(Some(-9.7)),
4,
),
(
ScalarValue::Float64(Some(-5.3)),
ScalarValue::Float64(Some(-9.9)),
5,
),
];
for (lhs, rhs, expected) in cases.iter() {
let distance = lhs.distance(rhs).unwrap();
assert_eq!(distance, *expected);
}
}

#[test]
fn test_scalar_distance_invalid() {
let cases = [
// scalar (lhs), scalar (rhs)
// --------------------------
// Same type but with nulls
(ScalarValue::Int8(None), ScalarValue::Int8(None)),
(ScalarValue::Int8(None), ScalarValue::Int8(Some(1))),
(ScalarValue::Int8(Some(1)), ScalarValue::Int8(None)),
// Different type
(ScalarValue::Int8(Some(1)), ScalarValue::Int16(Some(1))),
(ScalarValue::Int8(Some(1)), ScalarValue::Float32(Some(1.0))),
(
ScalarValue::Float64(Some(1.1)),
ScalarValue::Float32(Some(2.2)),
),
(
ScalarValue::UInt64(Some(777)),
ScalarValue::Int32(Some(111)),
),
// Different types with nulls
(ScalarValue::Int8(None), ScalarValue::Int16(Some(1))),
(ScalarValue::Int8(Some(1)), ScalarValue::Int16(None)),
// Unsupported types
(
ScalarValue::Utf8(Some("foo".to_string())),
ScalarValue::Utf8(Some("bar".to_string())),
),
(
ScalarValue::Boolean(Some(true)),
ScalarValue::Boolean(Some(false)),
),
(ScalarValue::Date32(Some(0)), ScalarValue::Date32(Some(1))),
(ScalarValue::Date64(Some(0)), ScalarValue::Date64(Some(1))),
(
ScalarValue::Decimal128(Some(123), 5, 5),
ScalarValue::Decimal128(Some(120), 5, 5),
),
];
for (lhs, rhs) in cases {
let distance = lhs.distance(&rhs);
assert!(distance.is_none());
}
}
}
51 changes: 51 additions & 0 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides an interface for plan level statistics.
use crate::ScalarValue;

/// Statistics for a physical plan node
/// Fields are optional and can be inexact because the sources
/// sometimes provide approximate estimates for performance reasons
/// and the transformations output are not always predictable.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Statistics {
/// The number of table rows
pub num_rows: Option<usize>,
/// total bytes of the table rows
pub total_byte_size: Option<usize>,
/// Statistics on a column level
pub column_statistics: Option<Vec<ColumnStatistics>>,
/// If true, any field that is `Some(..)` is the actual value in the data provided by the operator (it is not
/// an estimate). Any or all other fields might still be None, in which case no information is known.
/// if false, any field that is `Some(..)` may contain an inexact estimate and may not be the actual value.
pub is_exact: bool,
}

/// This table statistics are estimates about column
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ColumnStatistics {
/// Number of null values on column
pub null_count: Option<usize>,
/// Maximum value of column
pub max_value: Option<ScalarValue>,
/// Minimum value of column
pub min_value: Option<ScalarValue>,
/// Number of distinct values
pub distinct_count: Option<usize>,
}
33 changes: 2 additions & 31 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use self::metrics::MetricsSet;
use self::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
};
pub use crate::common::{ColumnStatistics, Statistics};
use crate::error::Result;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::{error::Result, scalar::ScalarValue};

use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
Expand Down Expand Up @@ -88,36 +89,6 @@ impl Stream for EmptyRecordBatchStream {
/// Physical planner interface
pub use self::planner::PhysicalPlanner;

/// Statistics for a physical plan node
/// Fields are optional and can be inexact because the sources
/// sometimes provide approximate estimates for performance reasons
/// and the transformations output are not always predictable.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Statistics {
/// The number of table rows
pub num_rows: Option<usize>,
/// total bytes of the table rows
pub total_byte_size: Option<usize>,
/// Statistics on a column level
pub column_statistics: Option<Vec<ColumnStatistics>>,
/// If true, any field that is `Some(..)` is the actual value in the data provided by the operator (it is not
/// an estimate). Any or all other fields might still be None, in which case no information is known.
/// if false, any field that is `Some(..)` may contain an inexact estimate and may not be the actual value.
pub is_exact: bool,
}
/// This table statistics are estimates about column
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ColumnStatistics {
/// Number of null values on column
pub null_count: Option<usize>,
/// Maximum value of column
pub max_value: Option<ScalarValue>,
/// Minimum value of column
pub min_value: Option<ScalarValue>,
/// Number of distinct values
pub distinct_count: Option<usize>,
}

/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
///
/// Each `ExecutionPlan` is partition-aware and is responsible for
Expand Down
35 changes: 35 additions & 0 deletions datafusion/expr/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,41 @@ impl Operator {
| Operator::StringConcat => None,
}
}

/// Return the operator where swapping lhs and rhs wouldn't change the result.
///
/// For example `Binary(50, >=, a)` could also be represented as `Binary(a, <=, 50)`.
pub fn swap(&self) -> Option<Operator> {
match self {
Operator::Eq => Some(Operator::Eq),
Operator::NotEq => Some(Operator::NotEq),
Operator::Lt => Some(Operator::Gt),
Operator::LtEq => Some(Operator::GtEq),
Operator::Gt => Some(Operator::Lt),
Operator::GtEq => Some(Operator::LtEq),
Operator::Like
| Operator::NotLike
| Operator::IsDistinctFrom
| Operator::IsNotDistinctFrom
| Operator::Plus
| Operator::Minus
| Operator::Multiply
| Operator::Divide
| Operator::Modulo
| Operator::And
| Operator::Or
| Operator::RegexMatch
| Operator::RegexIMatch
| Operator::RegexNotMatch
| Operator::RegexNotIMatch
| Operator::BitwiseAnd
| Operator::BitwiseOr
| Operator::BitwiseXor
| Operator::BitwiseShiftRight
| Operator::BitwiseShiftLeft
| Operator::StringConcat => None,
}
}
}

impl fmt::Display for Operator {
Expand Down
Loading

0 comments on commit 6d44791

Please sign in to comment.