Merge commit 'd091b55be6a4ce552023ef162b5d081136d3ff6d' into chunchun/12-08-2023
appletreeisyellow committed Jan 8, 2024
2 parents 5e58967 + d091b55 commit 3061242
Showing 63 changed files with 3,086 additions and 1,263 deletions.
2 changes: 1 addition & 1 deletion datafusion-examples/README.md
@@ -50,7 +50,7 @@ cargo run --example csv_sql
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3
- [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
- [`expr_api.rs`](examples/expr_api.rs): Use the `Expr` construction and simplification API
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and analyze `Expr`s
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
- [`memtable.rs`](examples/memtable.rs): Create and query data in memory using SQL and `RecordBatch`es
- [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
144 changes: 134 additions & 10 deletions datafusion-examples/examples/expr_api.rs
@@ -15,44 +15,95 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::{BooleanArray, Int32Array};
use arrow::record_batch::RecordBatch;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::error::Result;
use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext};
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_expr::{
analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr,
};
use datafusion::prelude::*;
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::expr::BinaryExpr;
use datafusion_expr::Operator;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::{ColumnarValue, ExprSchemable, Operator};
use std::sync::Arc;

/// This example demonstrates the DataFusion [`Expr`] API.
///
/// DataFusion comes with a powerful and extensive system for
/// representing and manipulating expressions such as `A + 5` and `X
/// IN ('foo', 'bar', 'baz')` and many other constructs.
/// IN ('foo', 'bar', 'baz')`.
///
/// In addition to building and manipulating [`Expr`]s, DataFusion
/// also comes with APIs for evaluation, simplification, and analysis.
///
/// The code in this example shows how to:
/// 1. Create [`Exprs`] using different APIs: [`main`]
/// 2. Evaluate [`Exprs`] against data: [`evaluate_demo`]
/// 3. Simplify expressions: [`simplify_demo`]
/// 4. Analyze predicates for boundary ranges: [`range_analysis_demo`]
#[tokio::main]
async fn main() -> Result<()> {
// The easiest way to create expressions is to use the
// "fluent"-style API, like this:
// "fluent"-style API:
let expr = col("a") + lit(5);

// this creates the same expression as the following though with
// much less code,
// The same expression can be created directly, with much more code:
let expr2 = Expr::BinaryExpr(BinaryExpr::new(
Box::new(col("a")),
Operator::Plus,
Box::new(Expr::Literal(ScalarValue::Int32(Some(5)))),
));
assert_eq!(expr, expr2);

// See how to evaluate expressions
evaluate_demo()?;

// See how to simplify expressions
simplify_demo()?;

// See how to analyze ranges in expressions
range_analysis_demo()?;

Ok(())
}

/// DataFusion can also evaluate arbitrary expressions on Arrow arrays.
fn evaluate_demo() -> Result<()> {
// For example, let's say you have some integers in an array
let batch = RecordBatch::try_from_iter([(
"a",
Arc::new(Int32Array::from(vec![4, 5, 6, 7, 8, 7, 4])) as _,
)])?;

// If you want to find all rows where the expression `a < 5 OR a = 8` is true
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));

// First, you make a "physical expression" from the logical `Expr`
let physical_expr = physical_expr(&batch.schema(), expr)?;

// Now, you can evaluate the expression against the RecordBatch
let result = physical_expr.evaluate(&batch)?;

// The result contains an array that is true only where `a < 5 OR a = 8`
let expected_result = Arc::new(BooleanArray::from(vec![
true, false, false, false, true, false, true,
])) as _;
assert!(
matches!(&result, ColumnarValue::Array(r) if r == &expected_result),
"result: {:?}",
result
);

Ok(())
}

/// In addition to easy construction, DataFusion exposes APIs for
/// working with and simplifying such expressions that call into the
/// same powerful and extensive implementation used for the query
/// engine.
/// In addition to easy construction, DataFusion exposes APIs for simplifying
/// such expressions so they are more efficient to evaluate. This code is also
/// used by the query engine to optimize queries.
fn simplify_demo() -> Result<()> {
// For example, let's say you have created an expression such as
// ts = to_timestamp("2020-09-08T12:00:00+00:00")
Expand Down Expand Up @@ -94,7 +145,7 @@ fn simplify_demo() -> Result<()> {
make_field("b", DataType::Boolean),
])
.to_dfschema_ref()?;
let context = SimplifyContext::new(&props).with_schema(schema);
let context = SimplifyContext::new(&props).with_schema(schema.clone());
let simplifier = ExprSimplifier::new(context);

// basic arithmetic simplification
@@ -120,6 +171,64 @@
col("i").lt(lit(10))
);

// String --> Date simplification
// `cast('2020-09-01' as date)` --> 18506
assert_eq!(
simplifier.simplify(lit("2020-09-01").cast_to(&DataType::Date32, &schema)?)?,
lit(ScalarValue::Date32(Some(18506)))
);

Ok(())
}

/// DataFusion also has APIs for analyzing predicates (boolean expressions) to
/// determine any range restrictions on the inputs required for the predicate
/// to evaluate to true.
fn range_analysis_demo() -> Result<()> {
// For example, let's say you are interested in finding data for all days
// in the month of September, 2020
let september_1 = ScalarValue::Date32(Some(18506)); // 2020-09-01
let october_1 = ScalarValue::Date32(Some(18536)); // 2020-10-01

// The predicate to find all such days could be
// `date > '2020-09-01' AND date < '2020-10-01'`
let expr = col("date")
.gt(lit(september_1.clone()))
.and(col("date").lt(lit(october_1.clone())));

// Using the analysis API, DataFusion can determine that the value of `date`
// must be in the range `['2020-09-01', '2020-10-01']`. If your data is
// organized in files according to day, this information permits skipping
// entire files without reading them.
//
// While this simple example could be handled with a special case, the
// DataFusion API handles arbitrary expressions (so, for example, you don't
// have to handle the case where the predicate clauses are reversed, such as
// `date < '2020-10-01' AND date > '2020-09-01'`).

// As always, we need to tell DataFusion the type of column "date"
let schema = Schema::new(vec![make_field("date", DataType::Date32)]);

// You can provide DataFusion any known boundaries on the values of `date`
// (for example, maybe you know you only have data up to `2020-09-15`), but
// in this case, let's say we don't know any boundaries beforehand, so we use
// `try_new_unbounded`
let boundaries = ExprBoundaries::try_new_unbounded(&schema)?;

// Now, we invoke the analysis code to perform the range analysis
let physical_expr = physical_expr(&schema, expr)?;
let analysis_result =
analyze(&physical_expr, AnalysisContext::new(boundaries), &schema)?;

// The result of the analysis is a range, encoded as an `Interval`, for
// each column in the schema, that must hold in order for the predicate
// to evaluate to true.
//
// As expected, `analyze` has figured out that `date` must be in the
// range `['2020-09-01', '2020-10-01']`
let expected_range = Interval::try_new(september_1, october_1)?;
assert_eq!(analysis_result.boundaries[0].interval, expected_range);

Ok(())
}
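
// Illustrative sketch (not part of this commit): one way the interval
// produced by `analyze` could drive the file skipping described above,
// assuming per-file min/max statistics, and assuming `Interval::intersect`
// returns `Ok(None)` for disjoint ranges (an assumption, not confirmed by
// this diff).
fn file_may_match(
    predicate_range: &Interval,
    file_min: ScalarValue,
    file_max: ScalarValue,
) -> Result<bool> {
    // Build the file's value range and intersect it with the range required
    // by the predicate; an empty intersection means the file can be skipped.
    let file_range = Interval::try_new(file_min, file_max)?;
    Ok(predicate_range.intersect(&file_range)?.is_some())
}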

@@ -132,3 +241,18 @@ fn make_ts_field(name: &str) -> Field {
let tz = None;
make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz))
}

/// Build a physical expression from a logical one, after applying simplification and type coercion
pub fn physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn PhysicalExpr>> {
let df_schema = schema.clone().to_dfschema_ref()?;

// Simplify
let props = ExecutionProps::new();
let simplifier =
ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone()));

// apply type coercion here to ensure types match
let expr = simplifier.coerce(expr, df_schema.clone())?;

create_physical_expr(&expr, df_schema.as_ref(), schema, &props)
}
13 changes: 10 additions & 3 deletions datafusion/common/src/dfschema.rs
@@ -199,9 +199,16 @@ impl DFSchema {
pub fn with_functional_dependencies(
mut self,
functional_dependencies: FunctionalDependencies,
) -> Self {
self.functional_dependencies = functional_dependencies;
self
) -> Result<Self> {
if functional_dependencies.is_valid(self.fields.len()) {
self.functional_dependencies = functional_dependencies;
Ok(self)
} else {
_plan_err!(
"Invalid functional dependency: {:?}",
functional_dependencies
)
}
}

/// Create a new schema that contains the fields from this schema followed by the fields
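// Illustrative sketch (hypothetical call site, not part of this commit): the
// signature change from `Self` to `Result<Self>` means callers must now
// handle a potential plan error, typically by propagating it with `?`:
fn attach_deps(
    schema: DFSchema,
    deps: FunctionalDependencies,
) -> Result<DFSchema> {
    // Before this commit: Ok(schema.with_functional_dependencies(deps))
    schema.with_functional_dependencies(deps)
}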
117 changes: 103 additions & 14 deletions datafusion/common/src/functional_dependencies.rs
@@ -24,6 +24,7 @@ use std::ops::Deref;
use std::vec::IntoIter;

use crate::error::_plan_err;
use crate::utils::{merge_and_order_indices, set_difference};
use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};

use sqlparser::ast::TableConstraint;
@@ -271,6 +272,29 @@ impl FunctionalDependencies {
self.deps.extend(other.deps);
}

/// Sanity-checks whether the functional dependencies are valid. For example,
/// if there are 10 fields, no index may be greater than 9.
pub fn is_valid(&self, n_field: usize) -> bool {
self.deps.iter().all(
|FunctionalDependence {
source_indices,
target_indices,
..
}| {
source_indices
.iter()
.max()
.map(|&max_index| max_index < n_field)
.unwrap_or(true)
&& target_indices
.iter()
.max()
.map(|&max_index| max_index < n_field)
.unwrap_or(true)
},
)
}
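
// Illustrative example (not part of this commit): how `is_valid` behaves
// when a dependency's indices run past the schema width. The
// `FunctionalDependencies::new` constructor is assumed to take a
// `Vec<FunctionalDependence>`.
//
//     let dep = FunctionalDependence::new(vec![0], vec![1, 2], false);
//     let deps = FunctionalDependencies::new(vec![dep]);
//     assert!(deps.is_valid(3));  // indices 0..=2 fit into 3 fields
//     assert!(!deps.is_valid(2)); // target index 2 is out of bounds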

/// Adds the `offset` value to `source_indices` and `target_indices` for
/// each functional dependency.
pub fn add_offset(&mut self, offset: usize) {
@@ -442,44 +466,56 @@ pub fn aggregate_functional_dependencies(
} in &func_dependencies.deps
{
// Keep source indices in a `HashSet` to prevent duplicate entries:
let mut new_source_indices = HashSet::new();
let mut new_source_indices = vec![];
let mut new_source_field_names = vec![];
let source_field_names = source_indices
.iter()
.map(|&idx| aggr_input_fields[idx].qualified_name())
.collect::<Vec<_>>();

for (idx, group_by_expr_name) in group_by_expr_names.iter().enumerate() {
// When one of the input determinant expressions matches with
// the GROUP BY expression, add the index of the GROUP BY
// expression as a new determinant key:
if source_field_names.contains(group_by_expr_name) {
new_source_indices.insert(idx);
new_source_indices.push(idx);
new_source_field_names.push(group_by_expr_name.clone());
}
}
let existing_target_indices =
get_target_functional_dependencies(aggr_input_schema, group_by_expr_names);
let new_target_indices = get_target_functional_dependencies(
aggr_input_schema,
&new_source_field_names,
);
let mode = if existing_target_indices == new_target_indices
&& new_target_indices.is_some()
{
// If dependency covers all GROUP BY expressions, mode will be `Single`:
Dependency::Single
} else {
// Otherwise, existing mode is preserved:
*mode
};
// All of the composite indices occur in the GROUP BY expression:
if new_source_indices.len() == source_indices.len() {
aggregate_func_dependencies.push(
FunctionalDependence::new(
new_source_indices.into_iter().collect(),
new_source_indices,
target_indices.clone(),
*nullable,
)
// input uniqueness stays the same when the GROUP BY matches input functional dependence determinants
.with_mode(*mode),
.with_mode(mode),
);
}
}

// If we have a single GROUP BY key, we can guarantee uniqueness after
// aggregation:
if group_by_expr_names.len() == 1 {
// If `source_indices` contain 0, delete this functional dependency
// as it will be added anyway with mode `Dependency::Single`:
if let Some(idx) = aggregate_func_dependencies
.iter()
.position(|item| item.source_indices.contains(&0))
{
// Delete the functional dependency that contains zeroth idx:
aggregate_func_dependencies.remove(idx);
}
aggregate_func_dependencies.retain(|item| !item.source_indices.contains(&0));
// Add a new functional dependency associated with the whole table:
aggregate_func_dependencies.push(
// Use nullable property of the group by expression
@@ -527,8 +563,61 @@ pub fn get_target_functional_dependencies(
combined_target_indices.extend(target_indices.iter());
}
}
(!combined_target_indices.is_empty())
.then_some(combined_target_indices.iter().cloned().collect::<Vec<_>>())
(!combined_target_indices.is_empty()).then_some({
let mut result = combined_target_indices.into_iter().collect::<Vec<_>>();
result.sort();
result
})
}

/// Returns indices for the minimal subset of GROUP BY expressions that are
/// functionally equivalent to the original set of GROUP BY expressions. For
/// example, if `a` determines both `b` and `c`, then `GROUP BY a, b, c`
/// reduces to `GROUP BY a` (see the sketch after this function).
pub fn get_required_group_by_exprs_indices(
schema: &DFSchema,
group_by_expr_names: &[String],
) -> Option<Vec<usize>> {
let dependencies = schema.functional_dependencies();
let field_names = schema
.fields()
.iter()
.map(|item| item.qualified_name())
.collect::<Vec<_>>();
let mut groupby_expr_indices = group_by_expr_names
.iter()
.map(|group_by_expr_name| {
field_names
.iter()
.position(|field_name| field_name == group_by_expr_name)
})
.collect::<Option<Vec<_>>>()?;

groupby_expr_indices.sort();
for FunctionalDependence {
source_indices,
target_indices,
..
} in &dependencies.deps
{
if source_indices
.iter()
.all(|source_idx| groupby_expr_indices.contains(source_idx))
{
// If all source indices are among GROUP BY expression indices, we
// can remove target indices from GROUP BY expression indices and
// use source indices instead.
groupby_expr_indices = set_difference(&groupby_expr_indices, target_indices);
groupby_expr_indices =
merge_and_order_indices(groupby_expr_indices, source_indices);
}
}
groupby_expr_indices
.iter()
.map(|idx| {
group_by_expr_names
.iter()
.position(|name| &field_names[*idx] == name)
})
.collect()
}
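
// Illustrative sketch (hypothetical test, not part of this commit): an
// end-to-end check of the reduction above. `FunctionalDependence::new` is
// taken from this diff; `FunctionalDependencies::new` and
// `DFSchema::try_from_qualified_schema` are assumed to exist with these
// signatures.
#[cfg(test)]
mod group_by_reduction_sketch {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};

    #[test]
    fn id_determines_whole_row() -> Result<()> {
        // Table `t(id, name, age)` where `id` determines every column
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int32, true),
        ]);
        let deps = FunctionalDependencies::new(vec![FunctionalDependence::new(
            vec![0],       // source: `id`
            vec![0, 1, 2], // targets: the whole row
            false,
        )]);
        let df_schema = DFSchema::try_from_qualified_schema("t", &schema)?
            .with_functional_dependencies(deps)?;

        // GROUP BY id, name, age collapses to GROUP BY id (index 0)
        let group_by =
            ["t.id".to_string(), "t.name".to_string(), "t.age".to_string()];
        assert_eq!(
            get_required_group_by_exprs_indices(&df_schema, &group_by),
            Some(vec![0])
        );
        Ok(())
    }
}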

/// Updates entries inside the `entries` vector with their corresponding