Skip to content

Commit

Permalink
semi/anti join output statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Mar 25, 2024
1 parent bd9b33c commit f54746f
Showing 1 changed file with 278 additions and 50 deletions.
328 changes: 278 additions & 50 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,27 +825,27 @@ fn estimate_join_cardinality(
right_stats: Statistics,
on: &JoinOn,
) -> Option<PartialJoinStatistics> {
let (left_col_stats, right_col_stats) = on
.iter()
.map(|(left, right)| {
match (
left.as_any().downcast_ref::<Column>(),
right.as_any().downcast_ref::<Column>(),
) {
(Some(left), Some(right)) => (
left_stats.column_statistics[left.index()].clone(),
right_stats.column_statistics[right.index()].clone(),
),
_ => (
ColumnStatistics::new_unknown(),
ColumnStatistics::new_unknown(),
),
}
})
.unzip::<_, _, Vec<_>, Vec<_>>();

match join_type {
JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
let (left_col_stats, right_col_stats) = on
.iter()
.map(|(left, right)| {
match (
left.as_any().downcast_ref::<Column>(),
right.as_any().downcast_ref::<Column>(),
) {
(Some(left), Some(right)) => (
left_stats.column_statistics[left.index()].clone(),
right_stats.column_statistics[right.index()].clone(),
),
_ => (
ColumnStatistics::new_unknown(),
ColumnStatistics::new_unknown(),
),
}
})
.unzip::<_, _, Vec<_>, Vec<_>>();

let ij_cardinality = estimate_inner_join_cardinality(
Statistics {
num_rows: left_stats.num_rows.clone(),
Expand Down Expand Up @@ -888,10 +888,45 @@ fn estimate_join_cardinality(
})
}

JoinType::LeftSemi
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti => None,
JoinType::LeftSemi | JoinType::LeftAnti => {
let cardinality = estimate_semi_join_cardinality(
Statistics {
num_rows: left_stats.num_rows.clone(),
total_byte_size: Precision::Absent,
column_statistics: left_col_stats,
},
Statistics {
num_rows: right_stats.num_rows.clone(),
total_byte_size: Precision::Absent,
column_statistics: right_col_stats,
},
)?;

Some(PartialJoinStatistics {
num_rows: *cardinality.get_value()?,
column_statistics: left_stats.column_statistics,
})
}

JoinType::RightSemi | JoinType::RightAnti => {
let cardinality = estimate_semi_join_cardinality(
Statistics {
num_rows: right_stats.num_rows.clone(),
total_byte_size: Precision::Absent,
column_statistics: right_col_stats,
},
Statistics {
num_rows: left_stats.num_rows.clone(),
total_byte_size: Precision::Absent,
column_statistics: left_col_stats,
},
)?;

Some(PartialJoinStatistics {
num_rows: *cardinality.get_value()?,
column_statistics: right_stats.column_statistics,
})
}
}
}

Expand All @@ -903,6 +938,11 @@ fn estimate_inner_join_cardinality(
left_stats: Statistics,
right_stats: Statistics,
) -> Option<Precision<usize>> {
// Immediately return if the inputs are considered non-overlapping
if let Some(estimation) = estimate_disjoint_inputs(&left_stats, &right_stats) {
return Some(estimation);
};

// The algorithm here is partly based on the non-histogram selectivity estimation
// from Spark's Catalyst optimizer.
let mut join_selectivity = Precision::Absent;
Expand All @@ -911,30 +951,13 @@ fn estimate_inner_join_cardinality(
.iter()
.zip(right_stats.column_statistics.iter())
{
// If there is no overlap in any of the join columns, this means the join
// itself is disjoint and the cardinality is 0. Though we can only assume
// this when the statistics are exact (since it is a very strong assumption).
if left_stat.min_value.get_value()? > right_stat.max_value.get_value()? {
return Some(
if left_stat.min_value.is_exact().unwrap_or(false)
&& right_stat.max_value.is_exact().unwrap_or(false)
{
Precision::Exact(0)
} else {
Precision::Inexact(0)
},
);
}
if left_stat.max_value.get_value()? < right_stat.min_value.get_value()? {
return Some(
if left_stat.max_value.is_exact().unwrap_or(false)
&& right_stat.min_value.is_exact().unwrap_or(false)
{
Precision::Exact(0)
} else {
Precision::Inexact(0)
},
);
// Break if any of statistics bounds are undefined
if left_stat.min_value.get_value().is_none()
|| left_stat.max_value.get_value().is_none()
|| right_stat.min_value.get_value().is_none()
|| right_stat.max_value.get_value().is_none()
{
return None;
}

let left_max_distinct = max_distinct_count(&left_stats.num_rows, left_stat);
Expand Down Expand Up @@ -968,6 +991,74 @@ fn estimate_inner_join_cardinality(
}
}

/// Estimates the number of output rows of a semi join from input statistics.
///
/// `outer_stats` describes the input whose row count bounds the output and
/// `inner_stats` the input it is matched against. If `estimate_disjoint_inputs`
/// considers the inputs non-overlapping, its zero estimate is returned as-is.
/// Otherwise the whole outer side is assumed to pass, reported as an inexact
/// value. Returns `None` when the outer row count is not available.
fn estimate_semi_join_cardinality(
    outer_stats: Statistics,
    inner_stats: Statistics,
) -> Option<Precision<usize>> {
    estimate_disjoint_inputs(&outer_stats, &inner_stats).or_else(|| {
        // Nothing proves the inputs disjoint: fall back to the outer row count.
        let outer_rows = outer_stats.num_rows.get_value()?;
        Some(Precision::Inexact(*outer_rows))
    })
}

/// Checks, using input statistics, whether the two inputs are non-overlapping.
///
/// Column statistics of both inputs are compared pairwise, by position. As soon
/// as one pair has value ranges that cannot intersect (the left minimum is
/// greater than the right maximum, or the left maximum is less than the right
/// minimum) a zero estimate is returned: `Exact(0)` when both compared bounds
/// are exact, `Inexact(0)` otherwise. A comparison is skipped when either of
/// its bounds has no value. Returns `None` if no pair shows the inputs to be
/// disjoint.
fn estimate_disjoint_inputs(
    left_stats: &Statistics,
    right_stats: &Statistics,
) -> Option<Precision<usize>> {
    // Disjointness is a very strong assumption, so the zero estimate is only
    // reported as exact when both bounds that established it are exact.
    let zero_rows = |both_exact: bool| -> Precision<usize> {
        if both_exact {
            Precision::Exact(0)
        } else {
            Precision::Inexact(0)
        }
    };

    let column_pairs = left_stats
        .column_statistics
        .iter()
        .zip(right_stats.column_statistics.iter());

    for (left_col, right_col) in column_pairs {
        // Left range starts after the right range ends.
        if let (Some(left_min), Some(right_max)) = (
            left_col.min_value.get_value(),
            right_col.max_value.get_value(),
        ) {
            if left_min > right_max {
                let both_exact = left_col.min_value.is_exact().unwrap_or(false)
                    && right_col.max_value.is_exact().unwrap_or(false);
                return Some(zero_rows(both_exact));
            }
        }

        // Left range ends before the right range starts.
        if let (Some(left_max), Some(right_min)) = (
            left_col.max_value.get_value(),
            right_col.min_value.get_value(),
        ) {
            if left_max < right_min {
                let both_exact = left_col.max_value.is_exact().unwrap_or(false)
                    && right_col.min_value.is_exact().unwrap_or(false);
                return Some(zero_rows(both_exact));
            }
        }
    }

    None
}

/// Estimate the number of maximum distinct values that can be present in the
/// given column from its statistics. If distinct_count is available, uses it
/// directly. Otherwise, if the column is numeric and has min/max values, it
Expand Down Expand Up @@ -1716,9 +1807,11 @@ mod tests {
#[test]
fn test_inner_join_cardinality_single_column() -> Result<()> {
let cases: Vec<(PartialStats, PartialStats, Option<Precision<usize>>)> = vec![
// -----------------------------------------------------------------------------
// | left(rows, min, max, distinct), right(rows, min, max, distinct), expected |
// -----------------------------------------------------------------------------
// ------------------------------------------------
// | left(rows, min, max, distinct, null_count), |
// | right(rows, min, max, distinct, null_count), |
// | expected, |
// ------------------------------------------------

// Cardinality computation
// =======================
Expand Down Expand Up @@ -1824,6 +1917,11 @@ mod tests {
None,
),
// Non overlapping min/max (when exact=False).
(
(10, Absent, Inexact(4), Absent, Absent),
(10, Inexact(5), Absent, Absent, Absent),
Some(Inexact(0)),
),
(
(10, Inexact(0), Inexact(10), Absent, Absent),
(10, Inexact(11), Inexact(20), Absent, Absent),
Expand Down Expand Up @@ -2106,6 +2204,136 @@ mod tests {
Ok(())
}

#[test]
fn estimate_semi_join_cardinality_absent_rows() -> Result<()> {
    // Every case is laid out as:
    //   outer(rows, min, max, distinct, null_count),
    //   inner(rows, min, max, distinct, null_count),
    //   expected estimate
    //
    // Distinct and null counts are absent on both sides in all cases.
    let cases: Vec<(PartialStats, PartialStats, Option<Precision<usize>>)> = vec![
        // Overlapping ranges: the estimate is the outer row count.
        (
            (50, Inexact(10), Inexact(20), Absent, Absent),
            (10, Inexact(15), Inexact(25), Absent, Absent),
            Some(Inexact(50)),
        ),
        // No bounds at all: the estimate is the outer row count.
        (
            (10, Absent, Absent, Absent, Absent),
            (50, Absent, Absent, Absent, Absent),
            Some(Inexact(10)),
        ),
        // Fully specified, non-overlapping ranges.
        (
            (50, Inexact(10), Inexact(20), Absent, Absent),
            (10, Inexact(30), Inexact(40), Absent, Absent),
            Some(Inexact(0)),
        ),
        // Only outer min and inner max are known, and they do not overlap.
        (
            (50, Inexact(10), Absent, Absent, Absent),
            (10, Absent, Inexact(5), Absent, Absent),
            Some(Inexact(0)),
        ),
        // Only outer max and inner min are known, and they do not overlap.
        (
            (50, Absent, Inexact(20), Absent, Absent),
            (10, Inexact(30), Absent, Absent, Absent),
            Some(Inexact(0)),
        ),
    ];

    for (outer, inner, expected) in cases {
        let (outer_rows, outer_min, outer_max, outer_distinct, outer_nulls) = outer;
        let (inner_rows, inner_min, inner_max, inner_distinct, inner_nulls) = inner;

        let outer_stats = Statistics {
            num_rows: Inexact(outer_rows),
            total_byte_size: Absent,
            column_statistics: vec![create_column_stats(
                outer_min,
                outer_max,
                outer_distinct,
                outer_nulls,
            )],
        };
        let inner_stats = Statistics {
            num_rows: Inexact(inner_rows),
            total_byte_size: Absent,
            column_statistics: vec![create_column_stats(
                inner_min,
                inner_max,
                inner_distinct,
                inner_nulls,
            )],
        };

        let estimate = estimate_semi_join_cardinality(outer_stats, inner_stats);
        assert_eq!(estimate, expected);
    }

    Ok(())
}

#[test]
fn test_semi_join_cardinality() -> Result<()> {
    // Checks how the semi join estimate reacts to missing row counts when the
    // column statistics carry no min/max bounds.
    let dummy_column_stats =
        vec![create_column_stats(Absent, Absent, Absent, Absent)];

    // Builds statistics that differ only in their row count.
    let stats_with_rows = |num_rows| Statistics {
        num_rows,
        total_byte_size: Absent,
        column_statistics: dummy_column_stats.clone(),
    };

    // Outer row count missing: there is nothing to base the estimate on.
    let absent_outer_estimation = estimate_semi_join_cardinality(
        stats_with_rows(Absent),
        stats_with_rows(Exact(10)),
    );
    assert_eq!(
        absent_outer_estimation, None,
        "Expected \"None\" estimated SemiJoin cardinality for absent outer num_rows"
    );

    // Inner row count missing: the outer row count is still usable.
    let absent_inner_estimation = estimate_semi_join_cardinality(
        stats_with_rows(Inexact(500)),
        stats_with_rows(Absent),
    );
    assert_eq!(
        absent_inner_estimation,
        Some(Inexact(500)),
        "Expected outer.num_rows estimated SemiJoin cardinality for absent inner num_rows"
    );

    // Both row counts missing: same outcome as a missing outer row count.
    let absent_both_estimation = estimate_semi_join_cardinality(
        stats_with_rows(Absent),
        stats_with_rows(Absent),
    );
    assert_eq!(
        absent_both_estimation, None,
        "Expected \"None\" estimated SemiJoin cardinality for absent outer and inner num_rows"
    );

    Ok(())
}

#[test]
fn test_calculate_join_output_ordering() -> Result<()> {
let options = SortOptions::default();
Expand Down

0 comments on commit f54746f

Please sign in to comment.