Upgrade to DataFusion 14.0.0 (#903)
* upgrade to latest datafusion rev

* panic on unexpected value

* remove panic

* fix regression with window functions

* fix regression

* use official release of DataFusion

* update optimizer rules list

* add filter_push_down rule from DataFusion 13

* fix

* add expr simplifier rule but without optimization for rewriting small 'in' expressions

* remove unused imports

* Disable EliminateFilter optimization to unblock regressions

* Use upstream SimplifyExpressions, catch associated KeyError

* Forbid auto-index setting in attempt_predicate_pushdown

* Ignore index in test_predicate_pushdown

* Add dask version check to predicate pushdown tests

* Add TODO for index specification

Co-authored-by: Charles Blackmon-Luca <[email protected]>
andygrove and charlesbluca authored Nov 15, 2022
1 parent c7017a7 commit ab246b0
Showing 13 changed files with 980 additions and 193 deletions.
313 changes: 225 additions & 88 deletions dask_planner/Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions dask_planner/Cargo.toml
@@ -9,12 +9,12 @@ edition = "2021"
rust-version = "1.62"

[dependencies]
arrow = { version = "25.0.0", features = ["prettyprint"] }
arrow = { version = "26.0.0", features = ["prettyprint"] }
async-trait = "0.1.58"
-datafusion-common = { git = "https://github.com/apache/arrow-datafusion/", rev = "54d2870a56d8d8f914a617a7e2d52e387ef5dba2" }
-datafusion-expr = { git = "https://github.com/apache/arrow-datafusion/", rev = "54d2870a56d8d8f914a617a7e2d52e387ef5dba2" }
-datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion/", rev = "54d2870a56d8d8f914a617a7e2d52e387ef5dba2" }
-datafusion-sql = { git = "https://github.com/apache/arrow-datafusion/", rev = "54d2870a56d8d8f914a617a7e2d52e387ef5dba2" }
+datafusion-common = "14.0.0"
+datafusion-expr = "14.0.0"
+datafusion-optimizer = "14.0.0"
+datafusion-sql = "14.0.0"
env_logger = "0.9"
log = "^0.4"
mimalloc = { version = "*", default-features = false }
2 changes: 1 addition & 1 deletion dask_planner/src/sql.rs
@@ -400,7 +400,7 @@ impl DaskSQLContext {
Ok(valid) => {
if valid {
optimizer::DaskSqlOptimizer::new(true)
-.run_optimizations(existing_plan.original_plan)
+.optimize(existing_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
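The functional change in this hunk is just the call-site rename from run_optimizations to optimize. A minimal sketch of the full expression, assuming the crate-internal types shown above (DaskSqlOptimizer, and PyLogicalPlan with original_plan and current_node fields); the error handling that follows the .map(...) is cut off in this view and is not reproduced here:

// Sketch only: module paths/imports for the crate-internal types are elided,
// and the function name is illustrative, not part of the commit.
fn reoptimize(existing_plan: PyLogicalPlan) -> Result<PyLogicalPlan, DataFusionError> {
    optimizer::DaskSqlOptimizer::new(true)
        .optimize(existing_plan.original_plan)
        .map(|optimized| PyLogicalPlan {
            original_plan: optimized,
            current_node: None,
        })
}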
1 change: 1 addition & 0 deletions dask_planner/src/sql/logical.rs
@@ -299,6 +299,7 @@ impl PyLogicalPlan {
LogicalPlan::CreateCatalogSchema(_create) => "CreateCatalogSchema",
LogicalPlan::CreateCatalog(_create_catalog) => "CreateCatalog",
LogicalPlan::CreateView(_create_view) => "CreateView",
+LogicalPlan::SetVariable(_) => "SetVariable",
// Further examine and return the name that is a possible Dask-SQL Extension type
LogicalPlan::Extension(extension) => {
let node = extension.node.as_any();
6 changes: 4 additions & 2 deletions dask_planner/src/sql/logical/join.rs
@@ -82,8 +82,10 @@ impl PyJoin {
JoinType::Left => Ok("LEFT".to_string()),
JoinType::Right => Ok("RIGHT".to_string()),
JoinType::Full => Ok("FULL".to_string()),
-JoinType::Semi => Ok("SEMI".to_string()),
-JoinType::Anti => Ok("ANTI".to_string()),
+JoinType::LeftSemi => Ok("LEFTSEMI".to_string()),
+JoinType::LeftAnti => Ok("LEFTANTI".to_string()),
+JoinType::RightSemi => Ok("RIGHTSEMI".to_string()),
+JoinType::RightAnti => Ok("RIGHTANTI".to_string()),
}
}
}
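For context on why two arms become four: DataFusion 14 split the single Semi and Anti join types into left/right variants. A sketch of the upstream enum as assumed here, reconstructed from the arms matched in this file plus the unchanged Inner/Left/Right/Full variants, not copied from the 14.0.0 source:

// Assumed shape of datafusion_expr::JoinType in 14.x; inferred, not quoted.
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    RightSemi,
    LeftAnti,
    RightAnti,
}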
72 changes: 40 additions & 32 deletions dask_planner/src/sql/logical/window.rs
@@ -1,7 +1,9 @@
+use datafusion_common::ScalarValue;
use datafusion_expr::{logical_plan::Window, Expr, LogicalPlan, WindowFrame, WindowFrameBound};
use pyo3::prelude::*;

use crate::{
+error::DaskPlannerError,
expression::{py_expr_list, PyExpr},
sql::exceptions::py_type_err,
};
@@ -58,57 +60,45 @@ impl PyWindow
/// Returns order by columns in a window function expression
#[pyo3(name = "getSortExprs")]
pub fn get_sort_exprs(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
-match expr.expr {
+match expr.expr.unalias() {
Expr::WindowFunction { order_by, .. } => py_expr_list(&self.window.input, &order_by),
-_ => Err(py_type_err(format!(
-"Provided Expr {:?} is not a WindowFunction type",
-expr
-))),
+other => Err(not_window_function_err(other)),
}
}

/// Return partition by columns in a window function expression
#[pyo3(name = "getPartitionExprs")]
pub fn get_partition_exprs(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
-match expr.expr {
+match expr.expr.unalias() {
Expr::WindowFunction { partition_by, .. } => {
py_expr_list(&self.window.input, &partition_by)
}
-_ => Err(py_type_err(format!(
-"Provided Expr {:?} is not a WindowFunction type",
-expr
-))),
+other => Err(not_window_function_err(other)),
}
}

/// Return input args for window function
#[pyo3(name = "getArgs")]
pub fn get_args(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
-match expr.expr {
+match expr.expr.unalias() {
Expr::WindowFunction { args, .. } => py_expr_list(&self.window.input, &args),
-_ => Err(py_type_err(format!(
-"Provided Expr {:?} is not a WindowFunction type",
-expr
-))),
+other => Err(not_window_function_err(other)),
}
}

/// Return window function name
#[pyo3(name = "getWindowFuncName")]
pub fn window_func_name(&self, expr: PyExpr) -> PyResult<String> {
-match expr.expr {
+match expr.expr.unalias() {
Expr::WindowFunction { fun, .. } => Ok(fun.to_string()),
-_ => Err(py_type_err(format!(
-"Provided Expr {:?} is not a WindowFunction type",
-expr
-))),
+other => Err(not_window_function_err(other)),
}
}

/// Returns a Pywindow frame for a given window function expression
#[pyo3(name = "getWindowFrame")]
pub fn get_window_frame(&self, expr: PyExpr) -> Option<PyWindowFrame> {
-match expr.expr {
+match expr.expr.unalias() {
Expr::WindowFunction { window_frame, .. } => {
window_frame.map(|window_frame| window_frame.into())
}
@@ -117,6 +107,14 @@ impl PyWindow {
}
}

+fn not_window_function_err(expr: Expr) -> PyErr {
+py_type_err(format!(
+"Provided {} Expr {:?} is not a WindowFunction type",
+expr.variant_name(),
+expr
+))
+}

#[pymethods]
impl PyWindowFrame {
/// Returns the window frame units for the bounds
@@ -127,12 +125,12 @@ impl PyWindowFrame {
/// Returns starting bound
#[pyo3(name = "getLowerBound")]
pub fn get_lower_bound(&self) -> PyResult<PyWindowFrameBound> {
-Ok(self.window_frame.start_bound.into())
+Ok(self.window_frame.start_bound.clone().into())
}
/// Returns end bound
#[pyo3(name = "getUpperBound")]
pub fn get_upper_bound(&self) -> PyResult<PyWindowFrameBound> {
-Ok(self.window_frame.end_bound.into())
+Ok(self.window_frame.end_bound.clone().into())
}
}

@@ -147,28 +145,38 @@ impl PyWindowFrameBound {
/// Returns if the frame bound is preceding
#[pyo3(name = "isPreceding")]
pub fn is_preceding(&self) -> bool {
-matches!(self.frame_bound, WindowFrameBound::Preceding(..))
+matches!(self.frame_bound, WindowFrameBound::Preceding(_))
}

/// Returns if the frame bound is following
#[pyo3(name = "isFollowing")]
pub fn is_following(&self) -> bool {
-matches!(self.frame_bound, WindowFrameBound::Following(..))
+matches!(self.frame_bound, WindowFrameBound::Following(_))
}
/// Returns the offset of the window frame
#[pyo3(name = "getOffset")]
-pub fn get_offset(&self) -> Option<u64> {
+pub fn get_offset(&self) -> PyResult<Option<u64>> {
match self.frame_bound {
-WindowFrameBound::Preceding(val) | WindowFrameBound::Following(val) => val,
-WindowFrameBound::CurrentRow => None,
+WindowFrameBound::Preceding(ScalarValue::UInt64(val))
+| WindowFrameBound::Following(ScalarValue::UInt64(val)) => Ok(val),
+WindowFrameBound::Preceding(ref x) | WindowFrameBound::Following(ref x) => Err(
+DaskPlannerError::Internal(format!("Unexpected window frame bound: {:?}", x))
+.into(),
+),
+WindowFrameBound::CurrentRow => Ok(None),
}
}
/// Returns if the frame bound is unbounded
#[pyo3(name = "isUnbounded")]
-pub fn is_unbounded(&self) -> bool {
-match self.frame_bound {
-WindowFrameBound::Preceding(val) | WindowFrameBound::Following(val) => val.is_none(),
-WindowFrameBound::CurrentRow => false,
+pub fn is_unbounded(&self) -> PyResult<bool> {
+match &self.frame_bound {
+WindowFrameBound::Preceding(ScalarValue::UInt64(v))
+| WindowFrameBound::Following(ScalarValue::UInt64(v)) => Ok(v.is_none()),
+WindowFrameBound::Preceding(ref x) | WindowFrameBound::Following(ref x) => Err(
+DaskPlannerError::Internal(format!("Unexpected window frame bound: {:?}", x))
+.into(),
+),
+WindowFrameBound::CurrentRow => Ok(false),
}
}
}
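Two DataFusion 14 API changes drive most of this file: window expressions can now arrive wrapped in an alias, so every accessor calls Expr::unalias() before matching, and WindowFrameBound carries a ScalarValue (where a NULL UInt64 means UNBOUNDED) rather than an Option<u64>, which is why the bounds are cloned and why getOffset/isUnbounded now return PyResult and match on ScalarValue::UInt64. A small standalone sketch of both patterns, using only items this file already imports; the helper names are illustrative and not part of the commit:

use datafusion_common::ScalarValue;
use datafusion_expr::{Expr, WindowFrameBound};

// Strip an outer alias (e.g. `SUM(a) OVER (...) AS total`) so the
// WindowFunction variant can be matched directly.
fn window_fn_name(expr: Expr) -> Option<String> {
    match expr.unalias() {
        Expr::WindowFunction { fun, .. } => Some(fun.to_string()),
        _ => None,
    }
}

// UInt64(Some(n)) encodes an `n PRECEDING/FOLLOWING` offset, UInt64(None) an
// unbounded frame edge; anything else is left to the caller.
fn frame_offset(bound: &WindowFrameBound) -> Option<u64> {
    match bound {
        WindowFrameBound::Preceding(ScalarValue::UInt64(v))
        | WindowFrameBound::Following(ScalarValue::UInt64(v)) => *v,
        _ => None,
    }
}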
109 changes: 50 additions & 59 deletions dask_planner/src/sql/optimizer.rs
@@ -1,15 +1,18 @@
+use std::sync::Arc;

use datafusion_common::DataFusionError;
use datafusion_expr::LogicalPlan;
use datafusion_optimizer::{
common_subexpr_eliminate::CommonSubexprEliminate,
decorrelate_where_exists::DecorrelateWhereExists,
decorrelate_where_in::DecorrelateWhereIn,
-eliminate_filter::EliminateFilter,
+// TODO: need to handle EmptyRelation for GPU cases
+// eliminate_filter::EliminateFilter,
eliminate_limit::EliminateLimit,
filter_null_join_keys::FilterNullJoinKeys,
-filter_push_down::FilterPushDown,
+inline_table_scan::InlineTableScan,
limit_push_down::LimitPushDown,
-optimizer::OptimizerRule,
+optimizer::{Optimizer, OptimizerRule},
projection_push_down::ProjectionPushDown,
reduce_cross_join::ReduceCrossJoin,
reduce_outer_join::ReduceOuterJoin,
@@ -26,85 +29,73 @@ use log::trace;
mod eliminate_agg_distinct;
use eliminate_agg_distinct::EliminateAggDistinct;

+mod filter_push_down;
+use filter_push_down::FilterPushDown;

/// Houses the optimization logic for Dask-SQL. This optimization controls the optimizations
/// and their ordering in regards to their impact on the underlying `LogicalPlan` instance
pub struct DaskSqlOptimizer {
skip_failing_rules: bool,
-optimizations: Vec<Box<dyn OptimizerRule + Send + Sync>>,
+optimizer: Optimizer,
}

impl DaskSqlOptimizer {
/// Creates a new instance of the DaskSqlOptimizer with all the DataFusion desired
/// optimizers as well as any custom `OptimizerRule` trait impls that might be desired.
pub fn new(skip_failing_rules: bool) -> Self {
-let rules: Vec<Box<dyn OptimizerRule + Send + Sync>> = vec![
-Box::new(TypeCoercion::new()),
-Box::new(SimplifyExpressions::new()),
-Box::new(UnwrapCastInComparison::new()),
-Box::new(DecorrelateWhereExists::new()),
-Box::new(DecorrelateWhereIn::new()),
-Box::new(ScalarSubqueryToJoin::new()),
-Box::new(SubqueryFilterToJoin::new()),
+let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
+Arc::new(InlineTableScan::new()),
+Arc::new(TypeCoercion::new()),
+Arc::new(SimplifyExpressions::new()),
+Arc::new(UnwrapCastInComparison::new()),
+Arc::new(DecorrelateWhereExists::new()),
+Arc::new(DecorrelateWhereIn::new()),
+Arc::new(ScalarSubqueryToJoin::new()),
+Arc::new(SubqueryFilterToJoin::new()),
// simplify expressions does not simplify expressions in subqueries, so we
// run it again after running the optimizations that potentially converted
// subqueries to joins
-Box::new(SimplifyExpressions::new()),
-Box::new(EliminateFilter::new()),
-Box::new(ReduceCrossJoin::new()),
-Box::new(CommonSubexprEliminate::new()),
-Box::new(EliminateLimit::new()),
-Box::new(RewriteDisjunctivePredicate::new()),
-Box::new(FilterNullJoinKeys::default()),
-Box::new(ReduceOuterJoin::new()),
-Box::new(FilterPushDown::new()),
-Box::new(LimitPushDown::new()),
-// Box::new(SingleDistinctToGroupBy::new()),
+Arc::new(SimplifyExpressions::new()),
+// TODO: need to handle EmptyRelation for GPU cases
+// Arc::new(EliminateFilter::new()),
+Arc::new(ReduceCrossJoin::new()),
+Arc::new(CommonSubexprEliminate::new()),
+Arc::new(EliminateLimit::new()),
+Arc::new(RewriteDisjunctivePredicate::new()),
+Arc::new(FilterNullJoinKeys::default()),
+Arc::new(ReduceOuterJoin::new()),
+Arc::new(FilterPushDown::new()),
+Arc::new(LimitPushDown::new()),
// Dask-SQL specific optimizations
-Box::new(EliminateAggDistinct::new()),
+Arc::new(EliminateAggDistinct::new()),
// The previous optimizations added expressions and projections,
// that might benefit from the following rules
-Box::new(SimplifyExpressions::new()),
-Box::new(UnwrapCastInComparison::new()),
-Box::new(CommonSubexprEliminate::new()),
-Box::new(ProjectionPushDown::new()),
+Arc::new(SimplifyExpressions::new()),
+Arc::new(UnwrapCastInComparison::new()),
+Arc::new(CommonSubexprEliminate::new()),
+Arc::new(ProjectionPushDown::new()),
];

Self {
skip_failing_rules,
-optimizations: rules,
+optimizer: Optimizer::with_rules(rules),
}
}

/// Iteratoes through the configured `OptimizerRule`(s) to transform the input `LogicalPlan`
/// to its final optimized form
-pub(crate) fn run_optimizations(
-&self,
-plan: LogicalPlan,
-) -> Result<LogicalPlan, DataFusionError> {
-let mut resulting_plan: LogicalPlan = plan;
-for optimization in &self.optimizations {
-match optimization.optimize(&resulting_plan, &mut OptimizerConfig::new()) {
-Ok(optimized_plan) => {
-trace!(
-"== AFTER APPLYING RULE {} ==\n{}",
-optimization.name(),
-optimized_plan.display_indent()
-);
-resulting_plan = optimized_plan
-}
-Err(e) => {
-if self.skip_failing_rules {
-println!(
-"Skipping optimizer rule {} due to unexpected error: {}",
-optimization.name(),
-e
-);
-} else {
-return Err(e);
-}
-}
-}
-}
-Ok(resulting_plan)
+pub(crate) fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan, DataFusionError> {
+let mut config =
+OptimizerConfig::default().with_skip_failing_rules(self.skip_failing_rules);
+self.optimizer.optimize(&plan, &mut config, Self::observe)
}

+fn observe(optimized_plan: &LogicalPlan, optimization: &dyn OptimizerRule) {
+trace!(
+"== AFTER APPLYING RULE {} ==\n{}\n",
+optimization.name(),
+optimized_plan.display_indent()
+);
+}
}

@@ -158,7 +149,7 @@ mod tests {

// optimize the logical plan
let optimizer = DaskSqlOptimizer::new(false);
-optimizer.run_optimizations(plan)
+optimizer.optimize(plan)
}

struct MySchemaProvider {}
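The structural change in this file: rather than hand-rolling the rule loop, DaskSqlOptimizer now builds a DataFusion Optimizer from Arc'd rules and delegates to its optimize method, which takes an observer callback plus an OptimizerConfig whose with_skip_failing_rules flag replaces the old manual skip-and-println logic. A standalone sketch of that pattern against datafusion-optimizer 14; the import paths and the single rule chosen here are assumptions, not lines from this commit:

use std::sync::Arc;

use datafusion_common::DataFusionError;
use datafusion_expr::LogicalPlan;
use datafusion_optimizer::{
    optimizer::{Optimizer, OptimizerConfig, OptimizerRule},
    simplify_expressions::SimplifyExpressions,
};

// Build an optimizer with a single rule and run it, logging each rule as it fires.
fn run(plan: &LogicalPlan) -> Result<LogicalPlan, DataFusionError> {
    let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> =
        vec![Arc::new(SimplifyExpressions::new())];
    let optimizer = Optimizer::with_rules(rules);
    let mut config = OptimizerConfig::default().with_skip_failing_rules(true);
    optimizer.optimize(plan, &mut config, |optimized_plan, rule| {
        println!("applied {}\n{}", rule.name(), optimized_plan.display_indent());
    })
}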
2 changes: 1 addition & 1 deletion dask_planner/src/sql/optimizer/eliminate_agg_distinct.rs
@@ -457,7 +457,7 @@ mod tests {
fn assert_fully_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let optimizer = DaskSqlOptimizer::new(false);
let optimized_plan = optimizer
-.run_optimizations(plan.clone())
+.optimize(plan.clone())
.expect("failed to optimize plan");
let formatted_plan = format!("{}", optimized_plan.display_indent());
assert_eq!(expected, formatted_plan);
(Diffs for the remaining changed files are not rendered here.)