Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JOIN conditions are order dependent #778

Merged
merged 3 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,28 @@ impl LogicalPlanBuilder {
));
}

let left_keys: Vec<Column> = left_keys
.into_iter()
.map(|c| c.into().normalize(&self.plan))
.collect::<Result<_>>()?;
let right_keys: Vec<Column> = right_keys
.into_iter()
.map(|c| c.into().normalize(right))
.collect::<Result<_>>()?;
let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand this code -- if LogicalPlanBuilder::join is called it has two inputs, a left (self) and a right (right) so the idea that left_keys might contain references to right is confusing to me.

I wonder if there might be a better place to fix up the errors -- perhaps somewhere further up in the call stack thatn LogicalPlanBuilder::join -- for example extract_join_keys in the SQL planner (especially since this bug seems related to SQL planning)

https://github.com/apache/arrow-datafusion/blob/master/datafusion/src/sql/planner.rs#L372-L380

Copy link
Contributor Author

@seddonm1 seddonm1 Jul 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Welcome back @seddonm1 ! It is great to see a contribution from you. Thank you very much

Thanks @alamb. Unfortunately my employment situation makes it hard to contribute too much but I am still very interested in seeing Datafusion succeed.

The LogicalPlanBuilder::join has left and right keys but they are left and right relative to a join condition not to a left or right relation.

The left_keys and right_keys are where the join conditions are resolved to underlying relations. The Postgres behavior does not require a specific order of conditions for joining so we need to be able to handle both conditions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think the argument name makes sense with the old assumption that left_keys should only refer to columns from the left plan. With this fix, it can become confusing.

Perhaps a better signature for this function would be something along the lines of: join_keys: Vec<(impl Into<Column>, impl Into<Column>)>.

If we want the dataframe API to also support order independent JOIN conditions, then it is indeed better to address the fix within this builder method. Otherwise I agree it would be better to fix it in the SQL planner instead. I am leaning towards supporting this in dataframe API as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changing the signature to be join_keys: Vec<(impl Into<Column>, impl Into<Column>)>. instead of left_keys:... and right_keys: ... makes sense to me

left_keys
.into_iter()
.zip(right_keys.into_iter())
.map(|(l, r)| {
let mut swap = false;
let l = l.into();
let left_key = l.clone().normalize(&self.plan).or_else(|_| {
swap = true;
l.normalize(right)
});
if swap {
(r.into().normalize(&self.plan), left_key)
} else {
(left_key, r.into().normalize(right))
}
})
.unzip();

let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;

let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys.into_iter()).collect();
let join_schema =
build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
Expand Down
90 changes: 69 additions & 21 deletions datafusion/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1717,15 +1717,40 @@ fn create_case_context() -> Result<ExecutionContext> {
#[tokio::test]
async fn equijoin() -> Result<()> {
let mut ctx = create_join_context("t1_id", "t2_id")?;
let sql =
"SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
let actual = execute(&mut ctx, sql).await;
let equivalent_sql = [
"SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
"SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
];
let expected = vec![
vec!["11", "a", "z"],
vec!["22", "b", "y"],
vec!["44", "d", "x"],
];
assert_eq!(expected, actual);
for sql in equivalent_sql.iter() {
let actual = execute(&mut ctx, sql).await;
assert_eq!(expected, actual);
}
Ok(())
}

#[tokio::test]
async fn equijoin_multiple_condition_ordering() -> Result<()> {
let mut ctx = create_join_context("t1_id", "t2_id")?;
let equivalent_sql = [
"SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t1_name <> t2_name ORDER BY t1_id",
"SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t2_name <> t1_name ORDER BY t1_id",
"SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t2_id = t1_id AND t1_name <> t2_name ORDER BY t1_id",
"SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t2_id = t1_id AND t2_name <> t1_name ORDER BY t1_id",
];
let expected = vec![
vec!["11", "a", "z"],
vec!["22", "b", "y"],
vec!["44", "d", "x"],
];
for sql in equivalent_sql.iter() {
let actual = execute(&mut ctx, sql).await;
assert_eq!(expected, actual);
}
Ok(())
}

Expand Down Expand Up @@ -1754,51 +1779,70 @@ async fn equijoin_and_unsupported_condition() -> Result<()> {
#[tokio::test]
async fn left_join() -> Result<()> {
let mut ctx = create_join_context("t1_id", "t2_id")?;
let sql = "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
let actual = execute(&mut ctx, sql).await;
let equivalent_sql = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this test case and those below it add any additional coverage compared to

async fn equijoin_multiple_condition_ordering() -> Result<()> {

Like are there bugs that would cause equijoin_multiple_condition_ordering to pass but the others to fail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main objective was to ensure nothing specific to equijoin (INNER) would break this implementation for the other join types (which occurred when developing it). This may be more relevant if any further optimisations occur.

"SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
"SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
];
let expected = vec![
vec!["11", "a", "z"],
vec!["22", "b", "y"],
vec!["33", "c", "NULL"],
vec!["44", "d", "x"],
];
assert_eq!(expected, actual);
for sql in equivalent_sql.iter() {
let actual = execute(&mut ctx, sql).await;
assert_eq!(expected, actual);
}
Ok(())
}

#[tokio::test]
async fn right_join() -> Result<()> {
let mut ctx = create_join_context("t1_id", "t2_id")?;
let sql =
"SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
let actual = execute(&mut ctx, sql).await;
let equivalent_sql = [
"SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
"SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t2_id = t1_id ORDER BY t1_id"
];
let expected = vec![
vec!["NULL", "NULL", "w"],
vec!["11", "a", "z"],
vec!["22", "b", "y"],
vec!["44", "d", "x"],
];
assert_eq!(expected, actual);
for sql in equivalent_sql.iter() {
let actual = execute(&mut ctx, sql).await;
assert_eq!(expected, actual);
}
Ok(())
}

#[tokio::test]
async fn full_join() -> Result<()> {
let mut ctx = create_join_context("t1_id", "t2_id")?;
let sql = "SELECT t1_id, t1_name, t2_name FROM t1 FULL JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
let actual = execute(&mut ctx, sql).await;
let equivalent_sql = [
"SELECT t1_id, t1_name, t2_name FROM t1 FULL JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
"SELECT t1_id, t1_name, t2_name FROM t1 FULL JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
];
let expected = vec![
vec!["NULL", "NULL", "w"],
vec!["11", "a", "z"],
vec!["22", "b", "y"],
vec!["33", "c", "NULL"],
vec!["44", "d", "x"],
];
assert_eq!(expected, actual);
for sql in equivalent_sql.iter() {
let actual = execute(&mut ctx, sql).await;
assert_eq!(expected, actual);
}

let sql = "SELECT t1_id, t1_name, t2_name FROM t1 FULL OUTER JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
let actual = execute(&mut ctx, sql).await;
assert_eq!(expected, actual);
let equivalent_sql = [
"SELECT t1_id, t1_name, t2_name FROM t1 FULL OUTER JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
"SELECT t1_id, t1_name, t2_name FROM t1 FULL OUTER JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
];
for sql in equivalent_sql.iter() {
let actual = execute(&mut ctx, sql).await;
assert_eq!(expected, actual);
}

Ok(())
}
Expand All @@ -1821,15 +1865,19 @@ async fn left_join_using() -> Result<()> {
#[tokio::test]
async fn equijoin_implicit_syntax() -> Result<()> {
let mut ctx = create_join_context("t1_id", "t2_id")?;
let sql =
"SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t1_id = t2_id ORDER BY t1_id";
let actual = execute(&mut ctx, sql).await;
let equivalent_sql = [
"SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t1_id = t2_id ORDER BY t1_id",
"SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t2_id = t1_id ORDER BY t1_id",
];
let expected = vec![
vec!["11", "a", "z"],
vec!["22", "b", "y"],
vec!["44", "d", "x"],
];
assert_eq!(expected, actual);
for sql in equivalent_sql.iter() {
let actual = execute(&mut ctx, sql).await;
assert_eq!(expected, actual);
}
Ok(())
}

Expand Down