Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to DataFusion 14.0.0 #903

Merged
merged 21 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
9a61598
upgrade to latest datafusion rev
andygrove Nov 3, 2022
c2f2d1c
panic on unexpected value
andygrove Nov 3, 2022
4e9339d
remove panic
andygrove Nov 3, 2022
1acfa69
fix regression with window functions
andygrove Nov 3, 2022
193b25d
fix regression
andygrove Nov 3, 2022
b3667ed
use official release of DataFusion
andygrove Nov 8, 2022
cf69b86
Merge branch 'main' into datafusion-14
andygrove Nov 8, 2022
60551a9
update optimizer rules list
andygrove Nov 8, 2022
ce2db78
Merge branch 'datafusion-14' of github.com:andygrove/dask-sql into da…
andygrove Nov 8, 2022
0a4733e
add filter_push_down rule from DataFusion 13
andygrove Nov 8, 2022
355a385
Merge remote-tracking branch 'upstream/main' into datafusion-14
andygrove Nov 8, 2022
0b06335
fix
andygrove Nov 8, 2022
2d1f8a4
add expr simplifier rule but without optimization for rewriting small…
andygrove Nov 10, 2022
07e171e
remove unused imports
andygrove Nov 10, 2022
97a1ea6
Disable EliminateFilter optimization to unblock regressions
charlesbluca Nov 14, 2022
3ad4d7a
Use upstream SimplifyExpressions, catch associated KeyError
charlesbluca Nov 14, 2022
459edf1
Forbid auto-index setting in attempt_predicate_pushdown
charlesbluca Nov 14, 2022
3a2e68c
Ignore index in test_predicate_pushdown
charlesbluca Nov 14, 2022
63b4e5d
Add dask version check to predicate pushdown tests
charlesbluca Nov 15, 2022
65c5669
Merge remote-tracking branch 'origin/main' into datafusion-14
charlesbluca Nov 15, 2022
b9dfc08
Add TODO for index specification
charlesbluca Nov 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 225 additions & 88 deletions dask_planner/Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions dask_planner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ edition = "2021"
rust-version = "1.62"

[dependencies]
arrow = { version = "25.0.0", features = ["prettyprint"] }
arrow = { version = "26.0.0", features = ["prettyprint"] }
async-trait = "0.1.58"
datafusion-common = { git = "https://github.com/apache/arrow-datafusion/", rev = "54d2870a56d8d8f914a617a7e2d52e387ef5dba2" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion/", rev = "54d2870a56d8d8f914a617a7e2d52e387ef5dba2" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion/", rev = "54d2870a56d8d8f914a617a7e2d52e387ef5dba2" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion/", rev = "54d2870a56d8d8f914a617a7e2d52e387ef5dba2" }
datafusion-common = "14.0.0"
datafusion-expr = "14.0.0"
datafusion-optimizer = "14.0.0"
datafusion-sql = "14.0.0"
env_logger = "0.9"
log = "^0.4"
mimalloc = { version = "*", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion dask_planner/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ impl DaskSQLContext {
Ok(valid) => {
if valid {
optimizer::DaskSqlOptimizer::new(true)
.run_optimizations(existing_plan.original_plan)
.optimize(existing_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
Expand Down
1 change: 1 addition & 0 deletions dask_planner/src/sql/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ impl PyLogicalPlan {
LogicalPlan::CreateCatalogSchema(_create) => "CreateCatalogSchema",
LogicalPlan::CreateCatalog(_create_catalog) => "CreateCatalog",
LogicalPlan::CreateView(_create_view) => "CreateView",
LogicalPlan::SetVariable(_) => "SetVariable",
// Further examine and return the name that is a possible Dask-SQL Extension type
LogicalPlan::Extension(extension) => {
let node = extension.node.as_any();
Expand Down
6 changes: 4 additions & 2 deletions dask_planner/src/sql/logical/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ impl PyJoin {
JoinType::Left => Ok("LEFT".to_string()),
JoinType::Right => Ok("RIGHT".to_string()),
JoinType::Full => Ok("FULL".to_string()),
JoinType::Semi => Ok("SEMI".to_string()),
JoinType::Anti => Ok("ANTI".to_string()),
JoinType::LeftSemi => Ok("LEFTSEMI".to_string()),
JoinType::LeftAnti => Ok("LEFTANTI".to_string()),
JoinType::RightSemi => Ok("RIGHTSEMI".to_string()),
JoinType::RightAnti => Ok("RIGHTANTI".to_string()),
}
}
}
Expand Down
72 changes: 40 additions & 32 deletions dask_planner/src/sql/logical/window.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use datafusion_common::ScalarValue;
use datafusion_expr::{logical_plan::Window, Expr, LogicalPlan, WindowFrame, WindowFrameBound};
use pyo3::prelude::*;

use crate::{
error::DaskPlannerError,
expression::{py_expr_list, PyExpr},
sql::exceptions::py_type_err,
};
Expand Down Expand Up @@ -58,57 +60,45 @@ impl PyWindow {
/// Returns order by columns in a window function expression
#[pyo3(name = "getSortExprs")]
pub fn get_sort_exprs(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
match expr.expr {
match expr.expr.unalias() {
Expr::WindowFunction { order_by, .. } => py_expr_list(&self.window.input, &order_by),
_ => Err(py_type_err(format!(
"Provided Expr {:?} is not a WindowFunction type",
expr
))),
other => Err(not_window_function_err(other)),
}
}

/// Return partition by columns in a window function expression
#[pyo3(name = "getPartitionExprs")]
pub fn get_partition_exprs(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
match expr.expr {
match expr.expr.unalias() {
Expr::WindowFunction { partition_by, .. } => {
py_expr_list(&self.window.input, &partition_by)
}
_ => Err(py_type_err(format!(
"Provided Expr {:?} is not a WindowFunction type",
expr
))),
other => Err(not_window_function_err(other)),
}
}

/// Return input args for window function
#[pyo3(name = "getArgs")]
pub fn get_args(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
match expr.expr {
match expr.expr.unalias() {
Expr::WindowFunction { args, .. } => py_expr_list(&self.window.input, &args),
_ => Err(py_type_err(format!(
"Provided Expr {:?} is not a WindowFunction type",
expr
))),
other => Err(not_window_function_err(other)),
}
}

/// Return window function name
#[pyo3(name = "getWindowFuncName")]
pub fn window_func_name(&self, expr: PyExpr) -> PyResult<String> {
match expr.expr {
match expr.expr.unalias() {
Expr::WindowFunction { fun, .. } => Ok(fun.to_string()),
_ => Err(py_type_err(format!(
"Provided Expr {:?} is not a WindowFunction type",
expr
))),
other => Err(not_window_function_err(other)),
}
}

/// Returns the `PyWindowFrame` for a given window function expression, if one is defined
#[pyo3(name = "getWindowFrame")]
pub fn get_window_frame(&self, expr: PyExpr) -> Option<PyWindowFrame> {
match expr.expr {
match expr.expr.unalias() {
Expr::WindowFunction { window_frame, .. } => {
window_frame.map(|window_frame| window_frame.into())
}
Expand All @@ -117,6 +107,14 @@ impl PyWindow {
}
}

fn not_window_function_err(expr: Expr) -> PyErr {
py_type_err(format!(
"Provided {} Expr {:?} is not a WindowFunction type",
expr.variant_name(),
expr
))
}

#[pymethods]
impl PyWindowFrame {
/// Returns the window frame units for the bounds
Expand All @@ -127,12 +125,12 @@ impl PyWindowFrame {
/// Returns starting bound
#[pyo3(name = "getLowerBound")]
pub fn get_lower_bound(&self) -> PyResult<PyWindowFrameBound> {
Ok(self.window_frame.start_bound.into())
Ok(self.window_frame.start_bound.clone().into())
}
/// Returns end bound
#[pyo3(name = "getUpperBound")]
pub fn get_upper_bound(&self) -> PyResult<PyWindowFrameBound> {
Ok(self.window_frame.end_bound.into())
Ok(self.window_frame.end_bound.clone().into())
}
}

Expand All @@ -147,28 +145,38 @@ impl PyWindowFrameBound {
/// Returns if the frame bound is preceding
#[pyo3(name = "isPreceding")]
pub fn is_preceding(&self) -> bool {
matches!(self.frame_bound, WindowFrameBound::Preceding(..))
matches!(self.frame_bound, WindowFrameBound::Preceding(_))
}

/// Returns if the frame bound is following
#[pyo3(name = "isFollowing")]
pub fn is_following(&self) -> bool {
matches!(self.frame_bound, WindowFrameBound::Following(..))
matches!(self.frame_bound, WindowFrameBound::Following(_))
}
/// Returns the offset of the window frame
#[pyo3(name = "getOffset")]
pub fn get_offset(&self) -> Option<u64> {
pub fn get_offset(&self) -> PyResult<Option<u64>> {
match self.frame_bound {
WindowFrameBound::Preceding(val) | WindowFrameBound::Following(val) => val,
WindowFrameBound::CurrentRow => None,
WindowFrameBound::Preceding(ScalarValue::UInt64(val))
| WindowFrameBound::Following(ScalarValue::UInt64(val)) => Ok(val),
WindowFrameBound::Preceding(ref x) | WindowFrameBound::Following(ref x) => Err(
DaskPlannerError::Internal(format!("Unexpected window frame bound: {:?}", x))
.into(),
),
WindowFrameBound::CurrentRow => Ok(None),
}
}
/// Returns if the frame bound is unbounded
#[pyo3(name = "isUnbounded")]
pub fn is_unbounded(&self) -> bool {
match self.frame_bound {
WindowFrameBound::Preceding(val) | WindowFrameBound::Following(val) => val.is_none(),
WindowFrameBound::CurrentRow => false,
pub fn is_unbounded(&self) -> PyResult<bool> {
match &self.frame_bound {
WindowFrameBound::Preceding(ScalarValue::UInt64(v))
| WindowFrameBound::Following(ScalarValue::UInt64(v)) => Ok(v.is_none()),
WindowFrameBound::Preceding(ref x) | WindowFrameBound::Following(ref x) => Err(
DaskPlannerError::Internal(format!("Unexpected window frame bound: {:?}", x))
.into(),
),
WindowFrameBound::CurrentRow => Ok(false),
}
}
}
109 changes: 50 additions & 59 deletions dask_planner/src/sql/optimizer.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::sync::Arc;

use datafusion_common::DataFusionError;
use datafusion_expr::LogicalPlan;
use datafusion_optimizer::{
common_subexpr_eliminate::CommonSubexprEliminate,
decorrelate_where_exists::DecorrelateWhereExists,
decorrelate_where_in::DecorrelateWhereIn,
eliminate_filter::EliminateFilter,
// TODO: need to handle EmptyRelation for GPU cases
// eliminate_filter::EliminateFilter,
eliminate_limit::EliminateLimit,
filter_null_join_keys::FilterNullJoinKeys,
filter_push_down::FilterPushDown,
inline_table_scan::InlineTableScan,
limit_push_down::LimitPushDown,
optimizer::OptimizerRule,
optimizer::{Optimizer, OptimizerRule},
projection_push_down::ProjectionPushDown,
reduce_cross_join::ReduceCrossJoin,
reduce_outer_join::ReduceOuterJoin,
Expand All @@ -26,85 +29,73 @@ use log::trace;
mod eliminate_agg_distinct;
use eliminate_agg_distinct::EliminateAggDistinct;

mod filter_push_down;
use filter_push_down::FilterPushDown;

/// Houses the optimization logic for Dask-SQL. This optimizer controls which optimization rules
/// are applied, and in what order, to the underlying `LogicalPlan` instance
pub struct DaskSqlOptimizer {
skip_failing_rules: bool,
optimizations: Vec<Box<dyn OptimizerRule + Send + Sync>>,
optimizer: Optimizer,
}

impl DaskSqlOptimizer {
/// Creates a new instance of the DaskSqlOptimizer with all the desired DataFusion
/// optimizer rules as well as any custom Dask-SQL `OptimizerRule` trait impls.
pub fn new(skip_failing_rules: bool) -> Self {
let rules: Vec<Box<dyn OptimizerRule + Send + Sync>> = vec![
Box::new(TypeCoercion::new()),
Box::new(SimplifyExpressions::new()),
Box::new(UnwrapCastInComparison::new()),
Box::new(DecorrelateWhereExists::new()),
Box::new(DecorrelateWhereIn::new()),
Box::new(ScalarSubqueryToJoin::new()),
Box::new(SubqueryFilterToJoin::new()),
let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
Arc::new(InlineTableScan::new()),
Arc::new(TypeCoercion::new()),
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Arc::new(DecorrelateWhereExists::new()),
Arc::new(DecorrelateWhereIn::new()),
Arc::new(ScalarSubqueryToJoin::new()),
Arc::new(SubqueryFilterToJoin::new()),
// simplify expressions does not simplify expressions in subqueries, so we
// run it again after running the optimizations that potentially converted
// subqueries to joins
Box::new(SimplifyExpressions::new()),
Box::new(EliminateFilter::new()),
Box::new(ReduceCrossJoin::new()),
Box::new(CommonSubexprEliminate::new()),
Box::new(EliminateLimit::new()),
Box::new(RewriteDisjunctivePredicate::new()),
Box::new(FilterNullJoinKeys::default()),
Box::new(ReduceOuterJoin::new()),
Box::new(FilterPushDown::new()),
Box::new(LimitPushDown::new()),
// Box::new(SingleDistinctToGroupBy::new()),
Arc::new(SimplifyExpressions::new()),
// TODO: need to handle EmptyRelation for GPU cases
// Arc::new(EliminateFilter::new()),
Arc::new(ReduceCrossJoin::new()),
Arc::new(CommonSubexprEliminate::new()),
Arc::new(EliminateLimit::new()),
Arc::new(RewriteDisjunctivePredicate::new()),
Arc::new(FilterNullJoinKeys::default()),
Arc::new(ReduceOuterJoin::new()),
Arc::new(FilterPushDown::new()),
Arc::new(LimitPushDown::new()),
// Dask-SQL specific optimizations
Box::new(EliminateAggDistinct::new()),
Arc::new(EliminateAggDistinct::new()),
// The previous optimizations added expressions and projections,
// that might benefit from the following rules
Box::new(SimplifyExpressions::new()),
Box::new(UnwrapCastInComparison::new()),
Box::new(CommonSubexprEliminate::new()),
Box::new(ProjectionPushDown::new()),
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Arc::new(CommonSubexprEliminate::new()),
Arc::new(ProjectionPushDown::new()),
];

Self {
skip_failing_rules,
optimizations: rules,
optimizer: Optimizer::with_rules(rules),
}
}

/// Iterates through the configured `OptimizerRule`(s) to transform the input `LogicalPlan`
/// to its final optimized form
pub(crate) fn run_optimizations(
&self,
plan: LogicalPlan,
) -> Result<LogicalPlan, DataFusionError> {
let mut resulting_plan: LogicalPlan = plan;
for optimization in &self.optimizations {
match optimization.optimize(&resulting_plan, &mut OptimizerConfig::new()) {
Ok(optimized_plan) => {
trace!(
"== AFTER APPLYING RULE {} ==\n{}",
optimization.name(),
optimized_plan.display_indent()
);
resulting_plan = optimized_plan
}
Err(e) => {
if self.skip_failing_rules {
println!(
"Skipping optimizer rule {} due to unexpected error: {}",
optimization.name(),
e
);
} else {
return Err(e);
}
}
}
}
Ok(resulting_plan)
pub(crate) fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan, DataFusionError> {
let mut config =
OptimizerConfig::default().with_skip_failing_rules(self.skip_failing_rules);
self.optimizer.optimize(&plan, &mut config, Self::observe)
}

fn observe(optimized_plan: &LogicalPlan, optimization: &dyn OptimizerRule) {
trace!(
"== AFTER APPLYING RULE {} ==\n{}\n",
optimization.name(),
optimized_plan.display_indent()
);
}
}

Expand Down Expand Up @@ -158,7 +149,7 @@ mod tests {

// optimize the logical plan
let optimizer = DaskSqlOptimizer::new(false);
optimizer.run_optimizations(plan)
optimizer.optimize(plan)
}

struct MySchemaProvider {}
Expand Down
2 changes: 1 addition & 1 deletion dask_planner/src/sql/optimizer/eliminate_agg_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ mod tests {
fn assert_fully_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let optimizer = DaskSqlOptimizer::new(false);
let optimized_plan = optimizer
.run_optimizations(plan.clone())
.optimize(plan.clone())
.expect("failed to optimize plan");
let formatted_plan = format!("{}", optimized_plan.display_indent());
assert_eq!(expected, formatted_plan);
Expand Down
Loading