Skip to content

Commit

Permalink
[WIP] Implement INTERSECT & INTERSECT DISTINCT
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 committed Nov 5, 2021
1 parent a4bd240 commit f896cba
Show file tree
Hide file tree
Showing 14 changed files with 284 additions and 54 deletions.
2 changes: 2 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ message JoinNode {
JoinConstraint join_constraint = 4;
repeated Column left_join_column = 5;
repeated Column right_join_column = 6;
bool null_equal_null = 7;
}

message CrossJoinNode {
Expand Down Expand Up @@ -649,6 +650,7 @@ message HashJoinExecNode {
repeated JoinOn on = 3;
JoinType join_type = 4;
PartitionMode partition_mode = 6;
bool null_equal_null = 7;
}

message CrossJoinExecNode {
Expand Down
2 changes: 2 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
on,
join_type,
join_constraint,
null_equal_null,
..
} => {
let left: protobuf::LogicalPlanNode = left.as_ref().try_into()?;
Expand All @@ -868,6 +869,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
join_constraint: join_constraint.into(),
left_join_column,
right_join_column,
null_equal_null: *null_equal_null,
},
))),
})
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
on,
&join_type.into(),
partition_mode,
&hashjoin.null_equal_null,
)?))
}
PhysicalPlanType::CrossJoin(crossjoin) => {
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/src/serde/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ mod roundtrip_tests {
on.clone(),
join_type,
*partition_mode,
&false,
)?))?;
}
}
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/src/serde/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
on,
join_type: join_type.into(),
partition_mode: partition_mode.into(),
null_equal_null: *exec.null_equal_null(),
},
))),
})
Expand Down
14 changes: 14 additions & 0 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,18 @@ impl LogicalPlanBuilder {
right: &LogicalPlan,
join_type: JoinType,
join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
) -> Result<Self> {
self.join_detailed(right, join_type, join_keys, false)
}

/// Apply a join with on constraint and specified null equality
/// If null_equal_null is true then null == null, else null != null
pub fn join_detailed(
&self,
right: &LogicalPlan,
join_type: JoinType,
join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
null_equal_null: bool,
) -> Result<Self> {
if join_keys.0.len() != join_keys.1.len() {
return Err(DataFusionError::Plan(
Expand Down Expand Up @@ -580,6 +592,7 @@ impl LogicalPlanBuilder {
join_type,
join_constraint: JoinConstraint::On,
schema: DFSchemaRef::new(join_schema),
null_equal_null,
}))
}

Expand Down Expand Up @@ -611,6 +624,7 @@ impl LogicalPlanBuilder {
join_type,
join_constraint: JoinConstraint::Using,
schema: DFSchemaRef::new(join_schema),
null_equal_null: false,
}))
}

Expand Down
2 changes: 2 additions & 0 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ pub enum LogicalPlan {
join_constraint: JoinConstraint,
/// The output schema, containing fields from the left and right inputs
schema: DFSchemaRef,
/// If null_equal_null is true, null == null else null != null
null_equal_null: bool,
},
/// Apply Cross Join to two logical plans
CrossJoin {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ fn optimize_plan(
on,
join_type,
join_constraint,
null_equal_null,
..
} => {
for (l, r) in on {
Expand Down Expand Up @@ -231,6 +232,7 @@ fn optimize_plan(
join_constraint: *join_constraint,
on: on.clone(),
schema: DFSchemaRef::new(schema),
null_equal_null: *null_equal_null,
})
}
LogicalPlan::Window {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ pub fn from_plan(
join_type,
join_constraint,
on,
null_equal_null,
..
} => {
let schema =
Expand All @@ -209,6 +210,7 @@ pub fn from_plan(
join_constraint: *join_constraint,
on: on.clone(),
schema: DFSchemaRef::new(schema),
null_equal_null: *null_equal_null,
})
}
LogicalPlan::CrossJoin { .. } => {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/src/physical_optimizer/hash_build_probe_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl PhysicalOptimizerRule for HashBuildProbeOrder {
.collect(),
&swap_join_type(*hash_join.join_type()),
*hash_join.partition_mode(),
hash_join.null_equal_null(),
)?;
let proj = ProjectionExec::try_new(
swap_reverting_projection(&*left.schema(), &*right.schema()),
Expand Down Expand Up @@ -195,6 +196,7 @@ mod tests {
)],
&JoinType::Left,
PartitionMode::CollectLeft,
&false,
)
.unwrap();

Expand Down Expand Up @@ -238,6 +240,7 @@ mod tests {
)],
&JoinType::Left,
PartitionMode::CollectLeft,
&false,
)
.unwrap();

Expand Down
Loading

0 comments on commit f896cba

Please sign in to comment.