-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add optimizer rule for type coercion (binary operations only) #3222
Changes from 4 commits
f5fbe25
0178435
01f12d9
4b09cb8
d14d0e5
be3ec2e
30c2057
54bf82f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -110,6 +110,7 @@ use datafusion_optimizer::filter_null_join_keys::FilterNullJoinKeys; | |
use datafusion_optimizer::pre_cast_lit_in_comparison::PreCastLitInComparisonExpressions; | ||
use datafusion_optimizer::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate; | ||
use datafusion_optimizer::scalar_subquery_to_join::ScalarSubqueryToJoin; | ||
use datafusion_optimizer::type_coercion::TypeCoercion; | ||
use datafusion_sql::{ | ||
parser::DFParser, | ||
planner::{ContextProvider, SqlToRel}, | ||
|
@@ -1401,6 +1402,8 @@ impl SessionState { | |
} | ||
rules.push(Arc::new(ReduceOuterJoin::new())); | ||
rules.push(Arc::new(FilterPushDown::new())); | ||
// we do type coercion after filter push down so that we don't push CAST filters to Parquet | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a partially written ticket (I will post later this week) related to supporting |
||
rules.push(Arc::new(TypeCoercion::new())); | ||
rules.push(Arc::new(LimitPushDown::new())); | ||
rules.push(Arc::new(SingleDistinctToGroupBy::new())); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -667,14 +667,14 @@ async fn test_fn_substr() -> Result<()> { | |
async fn test_cast() -> Result<()> { | ||
let expr = cast(col("b"), DataType::Float64); | ||
let expected = vec![ | ||
"+-------------------------+", | ||
"| CAST(test.b AS Float64) |", | ||
"+-------------------------+", | ||
"| 1 |", | ||
"| 10 |", | ||
"| 10 |", | ||
"| 100 |", | ||
"+-------------------------+", | ||
"+--------+", | ||
"| test.b |", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't the original header better? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I personally don't think seeing the cast in the column name adds much value. Also no cast in the subject is consistent with postgres: alamb=# select cast(1 as int);
int4
------
1
(1 row)
alamb=# select cast(i as int) from foo;
i
---
1
2
0
(3 rows) |
||
"+--------+", | ||
"| 1 |", | ||
"| 10 |", | ||
"| 10 |", | ||
"| 100 |", | ||
"+--------+", | ||
]; | ||
|
||
assert_fn_batches!(expr, expected); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -653,7 +653,7 @@ order by | |
let expected = "\ | ||
Sort: #revenue DESC NULLS FIRST\ | ||
\n Projection: #customer.c_custkey, #customer.c_name, #SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue, #customer.c_acctbal, #nation.n_name, #customer.c_address, #customer.c_phone, #customer.c_comment\ | ||
\n Aggregate: groupBy=[[#customer.c_custkey, #customer.c_name, #customer.c_acctbal, #customer.c_phone, #nation.n_name, #customer.c_address, #customer.c_comment]], aggr=[[SUM(#lineitem.l_extendedprice * Int64(1) - #lineitem.l_discount)]]\ | ||
\n Aggregate: groupBy=[[#customer.c_custkey, #customer.c_name, #customer.c_acctbal, #customer.c_phone, #nation.n_name, #customer.c_address, #customer.c_comment]], aggr=[[SUM(#lineitem.l_extendedprice * CAST(Int64(1) AS Float64) - #lineitem.l_discount)]]\ | ||
\n Inner Join: #customer.c_nationkey = #nation.n_nationkey\ | ||
\n Inner Join: #orders.o_orderkey = #lineitem.l_orderkey\ | ||
\n Inner Join: #customer.c_custkey = #orders.o_custkey\ | ||
|
@@ -663,7 +663,7 @@ order by | |
\n Filter: #lineitem.l_returnflag = Utf8(\"R\")\ | ||
\n TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_returnflag], partial_filters=[#lineitem.l_returnflag = Utf8(\"R\")]\ | ||
\n TableScan: nation projection=[n_nationkey, n_name]"; | ||
assert_eq!(format!("{:?}", plan.unwrap()), expected); | ||
assert_eq!(expected, format!("{:?}", plan.unwrap()),); | ||
|
||
Ok(()) | ||
} | ||
|
@@ -694,7 +694,7 @@ async fn test_physical_plan_display_indent() { | |
" RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000)", | ||
" AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]", | ||
" CoalesceBatchesExec: target_batch_size=4096", | ||
" FilterExec: c12@1 < CAST(10 AS Float64)", | ||
" FilterExec: c12@1 < 10", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the physical plan, which no longer contains a cast here because the logical plan optimized out the cast of a literal value. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🎉 -- which I think is a good example of the value of this pass |
||
" RepartitionExec: partitioning=RoundRobinBatch(9000)", | ||
" CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c12]", | ||
]; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
smart move, that would have been a hard bug to find!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am confused about this comment and explain why do the type coercion after the filter push down optimizer rule.
I think the type coercion rule should be done in preview stage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For example,
Filter expr: FLOAT32(C1) < FLOAT64(16)
. We should do type coercion first and convert the filter expr toCAST(INT32(C1) AS FLOAT64 < FLOAT64(16)
and try to push the new filter expr to the table scan operation.If you don't do the type coercion first, you will push the
expr: FLOAT32(C1) < FLOAT64(16)
to table scan, Does this can be applied to the parquet filter or pruning filter?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is ready for review now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I filed #3289 applying
TypeCoercion
beforeFilterPushDown
. I think the PR would get too large to review if I make those changes here.