Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement INTERSECT & INTERSECT DISTINCT #1135

Merged
merged 1 commit into from
Nov 6, 2021

Conversation

xudong963
Copy link
Member

@xudong963 xudong963 commented Oct 16, 2021

Which issue does this PR close?

Closes #1082

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@github-actions github-actions bot added datafusion Changes in the datafusion crate sql SQL Planner labels Oct 16, 2021
@xudong963
Copy link
Member Author

As @Dandandan has mentioned in #1082 (comment), "Complication with using things like is distinct from will not supported by a equi-join implementation (which only supports A=B type of expressions)", so I am considering how to solve it.

@xudong963
Copy link
Member Author

Inspired by spark which has a condition expr in Join to show if null equal is safe. So I add a bool type named null_equal_safe to indicate if null equal is safe.

PTAL @alamb @Dandandan @houqp, thanks!

@xudong963 xudong963 changed the title [WIP] Implement INTERSECT & INTERSECT DISTINCT Implement INTERSECT & INTERSECT DISTINCT Oct 31, 2021
@houqp houqp requested review from Dandandan and alamb November 1, 2021 07:30
@houqp
Copy link
Member

houqp commented Nov 3, 2021

Am I correct that in the current design, the null_equal_safe argument will apply to all join key pairs? i.e. will we be able to have t1 JOIN t2 ON t1.col1 = t2.col1, t1.col2 <=> t2.col2?

@xudong963
Copy link
Member Author

Am I correct that in the current design, the null_equal_safe argument will apply to all join key pairs? i.e. will we be able to have t1 JOIN t2 ON t1.col1 = t2.col1, t1.col2 <=> t2.col2?

Yes, the null_equal_safe argument will apply to all join key pairs.

@alamb
Copy link
Contributor

alamb commented Nov 3, 2021 via email

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great work @xudong963 . I am really sorry for the delay in review

I have a few suggestions related to the external API (aka adding a null_equals_safe parameter) which I think should be taken into account, but otherwise this is looking very nice 👌

@@ -211,6 +211,7 @@ pub trait DataFrame: Send + Sync {
join_type: JoinType,
left_cols: &[&str],
right_cols: &[&str],
null_equal_safe: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be documenting what null_equal_safe means in this public API

@@ -488,6 +488,7 @@ impl LogicalPlanBuilder {
right: &LogicalPlan,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise I think we should add a doc comment about what null_equal_safe means in this context.

Given how specialized the use case of null_equal_safe is, what do you think about adding a specialized API for it. This would hide some of this complexity from most users of DataFusion.

So something like

    /// Apply a join with on constraint and specified null equality. 
    /// If null_equal_safe is true then ...
    pub fn join_detailed(
        &self,
        right: &LogicalPlan,
        join_type: JoinType,
        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>,
        null_equal_safe: bool) {
  ...
    } 

    /// Apply a join with on constraint
    pub fn join(
        &self,
        right: &LogicalPlan,
        join_type: JoinType,
        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>) {
  self.join_detailed(right, join_type, join_keys, false)
    }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice idea! Then we can call join_detailed in INTESECT and other places keep original.

@@ -135,6 +135,8 @@ pub enum LogicalPlan {
join_constraint: JoinConstraint,
/// The output schema, containing fields from the left and right inputs
schema: DFSchemaRef,
/// If null_equal_safe is true, null == null.
null_equal_safe: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest naming this field (and all other instances of null_equal_safe) to null_equal_null to make its meaning clearer

let expected: &[&[&str]] = &[];

let mut ctx = create_join_context_qualified().unwrap();
let actual = execute(&mut ctx, sql).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason you didn't use execute_to_batches! and assert_batches_eq! in the first two tests and then did not use them in the subsequent tests? In other words, why are the tests inconsistent?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first two tests contain NULL in results. I noticed the return value format of execute is clearer.

https://github.com/apache/arrow-datafusion/blob/3d4db909e83ea8354e9b9134c20b1cbef06a5beb/datafusion/tests/sql.rs#L3453

let mut ctx = create_join_context_qualified().unwrap();
let actual = execute(&mut ctx, sql).await;

// left and right shouldn't match anything
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment appears to be incorrect

async fn test_intersect_distinct() -> Result<()> {
let mut ctx = ExecutionContext::new();
register_alltypes_parquet(&mut ctx).await;
// execute the query
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let sql = "SELECT order_id from orders EXCEPT SELECT order_id FROM orders";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
"NotImplemented(\"Only UNION ALL and UNION [DISTINCT] are supported, found EXCEPT\")",
"NotImplemented(\"Only UNION ALL and UNION [DISTINCT] and INTERSECT and INTERSECT [DISTINCT] are supported, found EXCEPT\")",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

Comment on lines 210 to 213
let distinct_left_plan = LogicalPlanBuilder::from(left_plan).distinct()?.build()?;
let join_keys = distinct_left_plan.schema().fields().iter().zip(right_plan.schema().fields().iter()).map(|(left_field, right_field)| ((Column::from_name(left_field.name())), (Column::from_name(right_field.name())))).unzip();
LogicalPlanBuilder::from(distinct_left_plan).join(&right_plan, JoinType::Semi, join_keys, true)?.build()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a cleanliness suggestion, definitely not needed:

Suggested change
let distinct_left_plan = LogicalPlanBuilder::from(left_plan).distinct()?.build()?;
let join_keys = distinct_left_plan.schema().fields().iter().zip(right_plan.schema().fields().iter()).map(|(left_field, right_field)| ((Column::from_name(left_field.name())), (Column::from_name(right_field.name())))).unzip();
LogicalPlanBuilder::from(distinct_left_plan).join(&right_plan, JoinType::Semi, join_keys, true)?.build()
let join_keys = distinct_left_plan
.schema()
.fields()
.iter()
.zip(right_plan.schema().fields().iter())
.map(|(left_field, right_field)| {
((Column::from_name(left_field.name())),
(Column::from_name(right_field.name())))
})
.unzip();
LogicalPlanBuilder::from(left_plan)
.distinct()?
.build()?
.join(&right_plan, JoinType::Semi, join_keys, true)?
.build()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good caught!

@@ -499,6 +510,8 @@ struct HashJoinStream {
join_metrics: HashJoinMetrics,
/// Information of index and left / right placement of columns
column_indices: Vec<ColumnIndex>,
///
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like a missing doc comment

let left_array = $l.as_any().downcast_ref::<$array_type>().unwrap();
let right_array = $r.as_any().downcast_ref::<$array_type>().unwrap();

match (left_array.is_null($left), right_array.is_null($right)) {
(false, false) => left_array.value($left) == right_array.value($right),
(true, true) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Dandandan what do you think about the potential performance impact of doing this check for each input element?

I think it is fine but maybe there are some benchmarks we could run to find out

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am concerned bout the performance impact.
It would expect it to be optimized away, as it's using a macro for this and I would expect the branch to be removed based on the boolean constant, but not sure about that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the is_null check and downcasting per item, I would expect this has a potential higher impact, even if the added code has some non-zero cost.

@xudong963
Copy link
Member Author

Thanks for your review @alamb. I'll fix the PR tomorrow.

@xudong963
Copy link
Member Author

Fixed! PTAL @alamb

let left_array = $l.as_any().downcast_ref::<$array_type>().unwrap();
let right_array = $r.as_any().downcast_ref::<$array_type>().unwrap();

match (left_array.is_null($left), right_array.is_null($right)) {
(false, false) => left_array.value($left) == right_array.value($right),
(true, true) => {
if $null_equal_null {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be !$null_equal_null instead (without if/else)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I think you mean $null_equal_null not !$null_equal_null 😁

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I thought true/false were swapped in if/else, but probably I had that incorrect 👍

@xudong963 xudong963 force-pushed the impl_intersect branch 3 times, most recently from f896cba to 06418f2 Compare November 6, 2021 03:31
@xudong963
Copy link
Member Author

@houqp Would you like to take another look? 😄

Copy link
Member

@houqp houqp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@houqp houqp added the enhancement New feature or request label Nov 6, 2021
@@ -120,6 +120,8 @@ pub struct HashJoinExec {
metrics: ExecutionPlanMetricsSet,
/// Information of index and left / right placement of columns
column_indices: Vec<ColumnIndex>,
/// If null_equal_null is true, null == null else null != null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, what about null_equals_null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, we should keep preciseness! 👍🏻

Copy link
Contributor

@Dandandan Dandandan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One minor comment, but this seems good to go to me! Awesome @xudong963 !

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really nice -- thank you @xudong963

@alamb
Copy link
Contributor

alamb commented Nov 6, 2021

Will merge when tests pass

@alamb alamb merged commit 2b24b89 into apache:master Nov 6, 2021
@xudong963 xudong963 deleted the impl_intersect branch November 6, 2021 13:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate enhancement New feature or request sql SQL Planner
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement the rest of Set Operators: INTERSECT, EXCEPT, etc
4 participants