Skip to content

Commit

Permalink
support cross_join in limit_push_down (#4257)
Browse files Browse the repository at this point in the history
* support cross_join in `limit_push_down`

* remove comment
  • Loading branch information
jackwener authored Nov 18, 2022
1 parent 822022d commit ff2f113
Showing 1 changed file with 74 additions and 0 deletions.
74 changes: 74 additions & 0 deletions datafusion/optimizer/src/limit_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use datafusion_expr::{
Join, JoinType, Limit, LogicalPlan, Projection, Sort, TableScan, Union,
},
utils::from_plan,
CrossJoin,
};
use std::sync::Arc;

Expand Down Expand Up @@ -204,6 +205,38 @@ fn limit_push_down(
schema: schema.clone(),
}))
}
(
LogicalPlan::CrossJoin(cross_join),
Ancestor::FromLimit {
skip: ancestor_skip,
fetch: Some(ancestor_fetch),
..
},
) => {
let left = &*cross_join.left;
let right = &*cross_join.right;
Ok(LogicalPlan::CrossJoin(CrossJoin {
left: Arc::new(limit_push_down(
_optimizer,
Ancestor::FromLimit {
skip: 0,
fetch: Some(ancestor_fetch + ancestor_skip),
},
left,
_optimizer_config,
)?),
right: Arc::new(limit_push_down(
_optimizer,
Ancestor::FromLimit {
skip: 0,
fetch: Some(ancestor_fetch + ancestor_skip),
},
right,
_optimizer_config,
)?),
schema: plan.schema().clone(),
}))
}
(
LogicalPlan::Join(Join { join_type, .. }),
Ancestor::FromLimit {
Expand Down Expand Up @@ -394,6 +427,7 @@ mod test {

Ok(())
}

#[test]
fn limit_push_down_take_smaller_limit() -> Result<()> {
let table_scan = test_table_scan()?;
Expand Down Expand Up @@ -872,4 +906,44 @@ mod test {

Ok(())
}

#[test]
fn limit_push_down_cross_join() -> Result<()> {
let table_scan_1 = test_table_scan()?;
let table_scan_2 = test_table_scan_with_name("test2")?;

let plan = LogicalPlanBuilder::from(table_scan_1)
.cross_join(&LogicalPlanBuilder::from(table_scan_2).build()?)?
.limit(0, Some(1000))?
.build()?;

let expected = "Limit: skip=0, fetch=1000\
\n CrossJoin:\
\n TableScan: test, fetch=1000\
\n TableScan: test2, fetch=1000";

assert_optimized_plan_eq(&plan, expected);

Ok(())
}

#[test]
fn skip_limit_push_down_cross_join() -> Result<()> {
let table_scan_1 = test_table_scan()?;
let table_scan_2 = test_table_scan_with_name("test2")?;

let plan = LogicalPlanBuilder::from(table_scan_1)
.cross_join(&LogicalPlanBuilder::from(table_scan_2).build()?)?
.limit(1000, Some(1000))?
.build()?;

let expected = "Limit: skip=1000, fetch=1000\
\n CrossJoin:\
\n TableScan: test, fetch=2000\
\n TableScan: test2, fetch=2000";

assert_optimized_plan_eq(&plan, expected);

Ok(())
}
}

0 comments on commit ff2f113

Please sign in to comment.