Commit 15f4d8b
Merge remote-tracking branch 'apache/main' into refactor_code
alamb committed Dec 5, 2023 · 2 parents 6565f5b + 2e5ad7a

Showing 16 changed files with 496 additions and 111 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -28,6 +28,7 @@ in-memory format. [Python Bindings](https://github.com/apache/arrow-datafusion-p
Here are links to some important information

- [Project Site](https://arrow.apache.org/datafusion)
- [Installation](https://arrow.apache.org/datafusion/user-guide/cli.html#installation)
- [Rust Getting Started](https://arrow.apache.org/datafusion/user-guide/example-usage.html)
- [Rust DataFrame API](https://arrow.apache.org/datafusion/user-guide/dataframe.html)
- [Rust API docs](https://docs.rs/datafusion/latest/datafusion)
2 changes: 2 additions & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ mod dfschema;
mod error;
mod functional_dependencies;
mod join_type;
mod param_value;
#[cfg(feature = "pyarrow")]
mod pyarrow;
mod schema_reference;
@@ -59,6 +60,7 @@ pub use functional_dependencies::{
Constraints, Dependency, FunctionalDependence, FunctionalDependencies,
};
pub use join_type::{JoinConstraint, JoinSide, JoinType};
pub use param_value::ParamValues;
pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::{OwnedSchemaReference, SchemaReference};
pub use stats::{ColumnStatistics, Statistics};
149 changes: 149 additions & 0 deletions datafusion/common/src/param_value.rs
@@ -0,0 +1,149 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::error::{_internal_err, _plan_err};
use crate::{DataFusionError, Result, ScalarValue};
use arrow_schema::DataType;
use std::collections::HashMap;

/// The parameter values corresponding to placeholders in a query
#[derive(Debug, Clone)]
pub enum ParamValues {
/// for positional query parameters, like select * from test where a > $1 and b = $2
LIST(Vec<ScalarValue>),
/// for named query parameters, like select * from test where a > $foo and b = $goo
MAP(HashMap<String, ScalarValue>),
}

impl ParamValues {
/// Verify parameter list length and type
pub fn verify(&self, expect: &Vec<DataType>) -> Result<()> {
match self {
ParamValues::LIST(list) => {
// Verify if the number of params matches the number of values
if expect.len() != list.len() {
return _plan_err!(
"Expected {} parameters, got {}",
expect.len(),
list.len()
);
}

// Verify that the types of the params match the types of the values
let iter = expect.iter().zip(list.iter());
for (i, (param_type, value)) in iter.enumerate() {
if *param_type != value.data_type() {
return _plan_err!(
"Expected parameter of type {:?}, got {:?} at index {}",
param_type,
value.data_type(),
i
);
}
}
Ok(())
}
ParamValues::MAP(_) => {
// Named parameters can be reused within a query, so the
// number of values need not equal the number of placeholders
Ok(())
}
}
}

pub fn get_placeholders_with_values(
&self,
id: &String,
data_type: &Option<DataType>,
) -> Result<ScalarValue> {
match self {
ParamValues::LIST(list) => {
if id.is_empty() || id == "$0" {
return _plan_err!("Empty placeholder id");
}
// convert id (in format $1, $2, ..) to idx (0, 1, ..)
let idx = id[1..].parse::<usize>().map_err(|e| {
DataFusionError::Internal(format!(
"Failed to parse placeholder id: {e}"
))
})? - 1;
// value at the idx-th position in param_values should be the value for the placeholder
let value = list.get(idx).ok_or_else(|| {
DataFusionError::Internal(format!(
"No value found for placeholder with id {id}"
))
})?;
// check if the data type of the value matches the data type of the placeholder
if Some(value.data_type()) != *data_type {
return _internal_err!(
"Placeholder value type mismatch: expected {:?}, got {:?}",
data_type,
value.data_type()
);
}
Ok(value.clone())
}
ParamValues::MAP(map) => {
// strip the leading `$` to get the parameter name (e.g. `$a` -> `a`)
let name = &id[1..];
// look up the value for the placeholder by name
let value = map.get(name).ok_or_else(|| {
DataFusionError::Internal(format!(
"No value found for placeholder with name {id}"
))
})?;
// check if the data type of the value matches the data type of the placeholder
if Some(value.data_type()) != *data_type {
return _internal_err!(
"Placeholder value type mismatch: expected {:?}, got {:?}",
data_type,
value.data_type()
);
}
Ok(value.clone())
}
}
}
}

impl From<Vec<ScalarValue>> for ParamValues {
fn from(value: Vec<ScalarValue>) -> Self {
Self::LIST(value)
}
}

impl<K> From<Vec<(K, ScalarValue)>> for ParamValues
where
K: Into<String>,
{
fn from(value: Vec<(K, ScalarValue)>) -> Self {
let value: HashMap<String, ScalarValue> =
value.into_iter().map(|(k, v)| (k.into(), v)).collect();
Self::MAP(value)
}
}

impl<K> From<HashMap<K, ScalarValue>> for ParamValues
where
K: Into<String>,
{
fn from(value: HashMap<K, ScalarValue>) -> Self {
let value: HashMap<String, ScalarValue> =
value.into_iter().map(|(k, v)| (k.into(), v)).collect();
Self::MAP(value)
}
}
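
Taken together, the new `ParamValues` type gives both parameter styles a single entry point. A minimal sketch (not part of the diff) exercising the conversions, `verify`, and `get_placeholders_with_values` defined above — the specific values and types are invented for illustration:

```rust
use std::collections::HashMap;

use arrow_schema::DataType;
use datafusion_common::{ParamValues, Result, ScalarValue};

fn main() -> Result<()> {
    // Positional parameters: $1, $2, ... map to list indices 0, 1, ...
    let positional = ParamValues::from(vec![
        ScalarValue::Int64(Some(10)),
        ScalarValue::from("hello".to_string()),
    ]);
    // verify() checks that both the count and the types match the placeholders
    positional.verify(&vec![DataType::Int64, DataType::Utf8])?;
    // placeholder ids are 1-based; "$0" and empty ids are rejected
    let value = positional
        .get_placeholders_with_values(&"$1".to_string(), &Some(DataType::Int64))?;
    assert_eq!(value, ScalarValue::Int64(Some(10)));

    // Named parameters: built from (name, value) pairs or a HashMap;
    // names may be reused, so no count check applies
    let named = ParamValues::from(vec![("foo", ScalarValue::from("bar".to_string()))]);
    let value = named
        .get_placeholders_with_values(&"$foo".to_string(), &Some(DataType::Utf8))?;
    assert_eq!(value, ScalarValue::Utf8(Some("bar".to_string())));
    Ok(())
}
```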
16 changes: 16 additions & 0 deletions datafusion/common/src/scalar.rs
@@ -3065,6 +3065,12 @@ impl FromStr for ScalarValue {
}
}

impl From<String> for ScalarValue {
fn from(value: String) -> Self {
ScalarValue::Utf8(Some(value))
}
}

impl From<Vec<(&str, ScalarValue)>> for ScalarValue {
fn from(value: Vec<(&str, ScalarValue)>) -> Self {
let (fields, scalars): (SchemaBuilder, Vec<_>) = value
@@ -4688,6 +4694,16 @@ mod tests {
);
}

#[test]
fn test_scalar_value_from_string() {
let scalar = ScalarValue::from("foo");
assert_eq!(scalar, ScalarValue::Utf8(Some("foo".to_string())));
let scalar = ScalarValue::from("foo".to_string());
assert_eq!(scalar, ScalarValue::Utf8(Some("foo".to_string())));
let scalar = ScalarValue::from_str("foo").unwrap();
assert_eq!(scalar, ScalarValue::Utf8(Some("foo".to_string())));
}

#[test]
fn test_scalar_struct() {
let field_a = Arc::new(Field::new("A", DataType::Int32, false));
30 changes: 26 additions & 4 deletions datafusion/core/src/dataframe/mod.rs
@@ -32,11 +32,12 @@ use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
- DataFusionError, FileType, FileTypeWriterOptions, SchemaError, UnnestOptions,
+ DataFusionError, FileType, FileTypeWriterOptions, ParamValues, SchemaError,
+ UnnestOptions,
};
use datafusion_expr::dml::CopyOptions;

- use datafusion_common::{Column, DFSchema, ScalarValue};
+ use datafusion_common::{Column, DFSchema};
use datafusion_expr::{
avg, count, is_null, max, median, min, stddev, utils::COUNT_STAR_EXPANSION,
TableProviderFilterPushDown, UNNAMED_TABLE,
Expand Down Expand Up @@ -1227,11 +1228,32 @@ impl DataFrame {
/// ],
/// &results
/// );
/// // Note you can also provide named parameters
/// let results = ctx
/// .sql("SELECT a FROM example WHERE b = $my_param")
/// .await?
/// // replace $my_param with the value 2
/// // Note: a HashMap of name -> value can also be used here
/// .with_param_values(vec![
/// ("my_param", ScalarValue::from(2i64))
/// ])?
/// .collect()
/// .await?;
/// assert_batches_eq!(
/// &[
/// "+---+",
/// "| a |",
/// "+---+",
/// "| 1 |",
/// "+---+",
/// ],
/// &results
/// );
/// # Ok(())
/// # }
/// ```
- pub fn with_param_values(self, param_values: Vec<ScalarValue>) -> Result<Self> {
-     let plan = self.plan.with_param_values(param_values)?;
+ pub fn with_param_values(self, query_values: impl Into<ParamValues>) -> Result<Self> {
+     let plan = self.plan.with_param_values(query_values)?;
Ok(Self::new(self.session_state, plan))
}

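
Because `with_param_values` now takes `impl Into<ParamValues>`, any of the `From` impls shown earlier work at the call site. A hedged sketch of both styles — the table name, CSV path, and column names are placeholders, not from the diff:

```rust
use std::collections::HashMap;

use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::ScalarValue;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // assumes a CSV with Int64 columns "a" and "b" exists at this path
    ctx.register_csv("example", "example.csv", CsvReadOptions::new())
        .await?;

    // Positional: Vec<ScalarValue> converts to ParamValues::LIST
    ctx.sql("SELECT a FROM example WHERE b = $1")
        .await?
        .with_param_values(vec![ScalarValue::from(2i64)])?
        .show()
        .await?;

    // Named: HashMap<String, ScalarValue> converts to ParamValues::MAP
    let params: HashMap<String, ScalarValue> =
        HashMap::from([("my_param".to_string(), ScalarValue::from(2i64))]);
    ctx.sql("SELECT a FROM example WHERE b = $my_param")
        .await?
        .with_param_values(params)?
        .show()
        .await?;
    Ok(())
}
```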
57 changes: 38 additions & 19 deletions datafusion/core/src/physical_optimizer/pruning.rs
@@ -66,43 +66,57 @@ use log::trace;
/// min_values("X") -> None
/// ```
pub trait PruningStatistics {
- /// return the minimum values for the named column, if known.
- /// Note: the returned array must contain `num_containers()` rows
+ /// Return the minimum values for the named column, if known.
+ ///
+ /// If the minimum value for a particular container is not known, the
+ /// returned array should have `null` in that row. If the minimum value is
+ /// not known for any row, return `None`.
+ ///
+ /// Note: the returned array must contain [`Self::num_containers`] rows
fn min_values(&self, column: &Column) -> Option<ArrayRef>;

- /// return the maximum values for the named column, if known.
- /// Note: the returned array must contain `num_containers()` rows.
+ /// Return the maximum values for the named column, if known.
+ ///
+ /// See [`Self::min_values`] for when to return `None` and null values.
+ ///
+ /// Note: the returned array must contain [`Self::num_containers`] rows
fn max_values(&self, column: &Column) -> Option<ArrayRef>;

- /// return the number of containers (e.g. row groups) being
- /// pruned with these statistics
+ /// Return the number of containers (e.g. row groups) being
+ /// pruned with these statistics (the number of rows in each returned array)
fn num_containers(&self) -> usize;

- /// return the number of null values for the named column as an
+ /// Return the number of null values for the named column as an
/// `Option<UInt64Array>`.
///
- /// Note: the returned array must contain `num_containers()` rows.
+ /// See [`Self::min_values`] for when to return `None` and null values.
+ ///
+ /// Note: the returned array must contain [`Self::num_containers`] rows
fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
}

- /// Evaluates filter expressions on statistics, rather than the actual data. If
- /// no rows could possibly pass the filter entire containers can be "pruned"
- /// (skipped), without reading any actual data, leading to significant
+ /// Evaluates filter expressions on statistics such as min/max values and null
+ /// counts, attempting to prove a "container" (e.g. Parquet Row Group) can be
+ /// skipped without reading the actual data, potentially leading to significant
/// performance improvements.
///
- /// [`PruningPredicate`]s are used to prune (avoid scanning) Parquet Row Groups
+ /// For example, [`PruningPredicate`]s are used to prune Parquet Row Groups
/// based on the min/max values found in the Parquet metadata. If the
/// `PruningPredicate` can guarantee that no rows in the Row Group match the
/// filter, the entire Row Group is skipped during query execution.
///
- /// Note that this API is designed to be general, as it works:
+ /// The `PruningPredicate` API is general, allowing it to be used for pruning
+ /// other types of containers (e.g. files) based on statistics that may be
+ /// known from external catalogs (e.g. Delta Lake) or other sources. Thus it
+ /// supports:
///
/// 1. Arbitrary expressions (including user defined functions)
///
- /// 2. Anything that implements the [`PruningStatistics`] trait, not just
- /// Parquet metadata, allowing it to be used by other systems to prune entities
- /// (e.g. entire files) if the statistics are known via some other source, such
- /// as a catalog.
+ /// 2. Vectorized evaluation (provide more than one set of statistics at a time)
+ /// so it is suitable for pruning 1000s of containers.
+ ///
+ /// 3. Anything that implements the [`PruningStatistics`] trait, not just
+ /// Parquet metadata.
///
/// # Example
///
@@ -122,6 +136,7 @@ pub trait PruningStatistics {
/// B: true (rows might match x = 5)
/// C: true (rows might match x = 5)
/// ```
///
/// See [`PruningPredicate::try_new`] and [`PruningPredicate::prune`] for more information.
#[derive(Debug, Clone)]
pub struct PruningPredicate {
@@ -251,8 +266,12 @@ fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
.unwrap_or_default()
}

- /// Records for which columns statistics are necessary to evaluate a
- /// pruning predicate.
+ /// Describes which columns' statistics are necessary to evaluate a
+ /// [`PruningPredicate`].
+ ///
+ /// This structure permits reading and creating the minimum number of
+ /// statistics, which is important since statistics may be non-trivial to
+ /// read (e.g. large strings or when there are 1000s of columns).
///
/// Handles creating references to the min/max statistics
/// for columns as well as recording which statistics are needed
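
The revised docs stress that `PruningStatistics` can be backed by any source, not just Parquet metadata. A sketch of a custom implementation backed by, say, an external catalog — the `MyCatalogStats` type and column name `x` are invented for illustration; only the trait itself comes from the diff:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array};
use datafusion::physical_optimizer::pruning::PruningStatistics;
use datafusion_common::Column;

/// Hypothetical per-container statistics for a single Int32 column "x",
/// e.g. as recorded in an external catalog.
struct MyCatalogStats {
    /// one row per container; a null entry means "min unknown for that container"
    mins: Int32Array,
    /// one row per container; a null entry means "max unknown for that container"
    maxes: Int32Array,
}

impl PruningStatistics for MyCatalogStats {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        // statistics are only known for column "x"; return None otherwise
        (column.name == "x").then(|| Arc::new(self.mins.clone()) as ArrayRef)
    }

    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        (column.name == "x").then(|| Arc::new(self.maxes.clone()) as ArrayRef)
    }

    fn num_containers(&self) -> usize {
        // must equal the length of every array returned above
        self.mins.len()
    }

    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None // null counts are not tracked in this sketch
    }
}
```

A `PruningPredicate` built against the same schema can then call `prune` with these statistics to obtain one keep/skip flag per container.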