Skip to content

Commit

Permalink
Optimize PushDownFilter to avoid recreating schema columns (apache#11211
Browse files Browse the repository at this point in the history
)
  • Loading branch information
alamb authored and comphead committed Jul 8, 2024
1 parent 66aec76 commit 6ff8aa9
Showing 1 changed file with 53 additions and 27 deletions.
80 changes: 53 additions & 27 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,50 @@ fn on_lr_is_preserved(join_type: JoinType) -> Result<(bool, bool)> {
}
}

/// Return true if a predicate only references columns in the specified schema
fn can_pushdown_join_predicate(predicate: &Expr, schema: &DFSchema) -> Result<bool> {
let schema_columns = schema
/// Evaluates the columns referenced in the given expression to see if they refer
/// only to the left or right columns
///
/// The per-side column sets start out as `None` and are only filled in when a
/// check against that side is first requested, so a side that is never
/// checked never pays for building its set.
#[derive(Debug)]
struct ColumnChecker<'a> {
/// schema of left join input
left_schema: &'a DFSchema,
/// columns in left_schema, computed on demand (`None` until first needed)
left_columns: Option<HashSet<Column>>,
/// schema of right join input
right_schema: &'a DFSchema,
/// columns in right_schema, computed on demand (`None` until first needed)
right_columns: Option<HashSet<Column>>,
}

impl<'a> ColumnChecker<'a> {
    /// Create a checker for the given left and right join input schemas.
    ///
    /// No column sets are built here; each side's set is computed the first
    /// time the corresponding `is_*_only` method is called and then reused.
    fn new(left_schema: &'a DFSchema, right_schema: &'a DFSchema) -> Self {
        Self {
            left_schema,
            left_columns: None,
            right_schema,
            right_columns: None,
        }
    }

    /// Return true if the expression references only columns from the left side of the join
    ///
    /// Takes `&mut self` because the left column set is built and cached on
    /// the first call.
    fn is_left_only(&mut self, predicate: &Expr) -> bool {
        // Copy the schema reference out first so the closure below does not
        // need to borrow `self` while `self.left_columns` is mutably borrowed.
        let schema = self.left_schema;
        let columns = self
            .left_columns
            .get_or_insert_with(|| schema_columns(schema));
        has_all_column_refs(predicate, &*columns)
    }

    /// Return true if the expression references only columns from the right side of the join
    ///
    /// Takes `&mut self` because the right column set is built and cached on
    /// the first call.
    fn is_right_only(&mut self, predicate: &Expr) -> bool {
        // Same borrow-splitting as in `is_left_only`.
        let schema = self.right_schema;
        let columns = self
            .right_columns
            .get_or_insert_with(|| schema_columns(schema));
        has_all_column_refs(predicate, &*columns)
    }
}

/// Returns all columns in the schema
fn schema_columns(schema: &DFSchema) -> HashSet<Column> {
schema
.iter()
.flat_map(|(qualifier, field)| {
[
Expand All @@ -205,8 +246,7 @@ fn can_pushdown_join_predicate(predicate: &Expr, schema: &DFSchema) -> Result<bo
Column::new_unqualified(field.name()),
]
})
.collect::<HashSet<_>>();
Ok(has_all_column_refs(predicate, &schema_columns))
.collect::<HashSet<_>>()
}

/// Determine whether the predicate can evaluate as the join conditions
Expand Down Expand Up @@ -291,16 +331,7 @@ fn extract_or_clauses_for_join<'a>(
filters: &'a [Expr],
schema: &'a DFSchema,
) -> impl Iterator<Item = Expr> + 'a {
let schema_columns = schema
.iter()
.flat_map(|(qualifier, field)| {
[
Column::new(qualifier.cloned(), field.name()),
// we need to push down filter using unqualified column as well
Column::new_unqualified(field.name()),
]
})
.collect::<HashSet<_>>();
let schema_columns = schema_columns(schema);

// new formed OR clauses and their column references
filters.iter().filter_map(move |expr| {
Expand Down Expand Up @@ -403,12 +434,11 @@ fn push_down_all_join(
let mut right_push = vec![];
let mut keep_predicates = vec![];
let mut join_conditions = vec![];
let mut checker = ColumnChecker::new(left_schema, right_schema);
for predicate in predicates {
if left_preserved && can_pushdown_join_predicate(&predicate, left_schema)? {
if left_preserved && checker.is_left_only(&predicate) {
left_push.push(predicate);
} else if right_preserved
&& can_pushdown_join_predicate(&predicate, right_schema)?
{
} else if right_preserved && checker.is_right_only(&predicate) {
right_push.push(predicate);
} else if is_inner_join && can_evaluate_as_join_condition(&predicate)? {
// Here we do not distinguish between eq and non-eq predicates; ExtractEquijoinPredicate will extract the eq predicate
Expand All @@ -421,11 +451,9 @@ fn push_down_all_join(

// For inferred predicates, if they cannot be pushed through the join, just drop them
for predicate in inferred_join_predicates {
if left_preserved && can_pushdown_join_predicate(&predicate, left_schema)? {
if left_preserved && checker.is_left_only(&predicate) {
left_push.push(predicate);
} else if right_preserved
&& can_pushdown_join_predicate(&predicate, right_schema)?
{
} else if right_preserved && checker.is_right_only(&predicate) {
right_push.push(predicate);
}
}
Expand All @@ -435,11 +463,9 @@ fn push_down_all_join(

if !on_filter.is_empty() {
for on in on_filter {
if on_left_preserved && can_pushdown_join_predicate(&on, left_schema)? {
if on_left_preserved && checker.is_left_only(&on) {
left_push.push(on)
} else if on_right_preserved
&& can_pushdown_join_predicate(&on, right_schema)?
{
} else if on_right_preserved && checker.is_right_only(&on) {
right_push.push(on)
} else {
on_filter_join_conditions.push(on)
Expand Down

0 comments on commit 6ff8aa9

Please sign in to comment.