Skip to content

Commit

Permalink
Implement EXCEPT & EXCEPT DISTINCT
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 committed Nov 7, 2021
1 parent ac07269 commit 7cedf95
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 29 deletions.
31 changes: 31 additions & 0 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,37 @@ impl LogicalPlanBuilder {
}
}

/// Plans an `INTERSECT` or `EXCEPT` as a join of `left_plan` against `right_plan`.
///
/// The join keys pair up the fields of the two schemas positionally, each
/// referenced by its unqualified field name. `join_type` selects the set
/// operation (callers pass e.g. `JoinType::Semi` or `JoinType::Anti`).
/// When `is_all` is `false`, the left input is passed through `distinct()`
/// before the join; when it is `true`, the left input is joined as-is.
///
/// NOTE(review): `zip` stops at the shorter schema, so a column-count
/// mismatch between the two inputs is not reported here — confirm it is
/// validated elsewhere.
/// NOTE(review): the trailing `true` handed to `join_detailed` is presumably
/// the "nulls compare equal" flag — confirm against its signature.
pub(crate) fn intersect_or_except(
    left_plan: LogicalPlan,
    right_plan: LogicalPlan,
    join_type: JoinType,
    is_all: bool,
) -> Result<LogicalPlan> {
    // Compute the key columns first: this only borrows the plans, and
    // `left_plan` is moved into the builder right after.
    let join_keys = left_plan
        .schema()
        .fields()
        .iter()
        .zip(right_plan.schema().fields().iter())
        .map(|(lhs, rhs)| (Column::from_name(lhs.name()), Column::from_name(rhs.name())))
        .unzip();

    let left_input = LogicalPlanBuilder::from(left_plan);
    // The non-ALL variants de-duplicate the left side before joining.
    let left_input = if is_all {
        left_input
    } else {
        left_input.distinct()?
    };

    left_input
        .join_detailed(&right_plan, join_type, join_keys, true)?
        .build()
}

/// Build the plan
pub fn build(&self) -> Result<LogicalPlan> {
Ok(self.plan.clone())
Expand Down
68 changes: 39 additions & 29 deletions datafusion/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,25 +226,45 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let left_plan = self.set_expr_to_plan(left.as_ref(), None, ctes)?;
let right_plan = self.set_expr_to_plan(right.as_ref(), None, ctes)?;
match (op, all) {
(SetOperator::Union, true) => {
union_with_alias(left_plan, right_plan, alias)
}
(SetOperator::Union, false) => {
let union_plan = union_with_alias(left_plan, right_plan, alias)?;
LogicalPlanBuilder::from(union_plan).distinct()?.build()
}
(SetOperator::Intersect, true) => {
let join_keys = left_plan.schema().fields().iter().zip(right_plan.schema().fields().iter()).map(|(left_field, right_field)| ((Column::from_name(left_field.name())), (Column::from_name(right_field.name())))).unzip();
LogicalPlanBuilder::from(left_plan).join_detailed(&right_plan, JoinType::Semi, join_keys, true)?.build()
}
(SetOperator::Intersect, false) => {
let join_keys = left_plan.schema().fields().iter().zip(right_plan.schema().fields().iter()).map(|(left_field, right_field)| ((Column::from_name(left_field.name())), (Column::from_name(right_field.name())))).unzip();
LogicalPlanBuilder::from(left_plan).distinct()?.join_detailed(&right_plan, JoinType::Semi, join_keys, true)?.build()
}
_ => Err(DataFusionError::NotImplemented(format!(
"Only UNION ALL and UNION [DISTINCT] and INTERSECT and INTERSECT [DISTINCT] are supported, found {}",
op
))),
(SetOperator::Union, true) => {
union_with_alias(left_plan, right_plan, alias)
}
(SetOperator::Union, false) => {
let union_plan = union_with_alias(left_plan, right_plan, alias)?;
LogicalPlanBuilder::from(union_plan).distinct()?.build()
}
(SetOperator::Intersect, true) => {
LogicalPlanBuilder::intersect_or_except(
left_plan,
right_plan,
JoinType::Semi,
true,
)
}
(SetOperator::Intersect, false) => {
LogicalPlanBuilder::intersect_or_except(
left_plan,
right_plan,
JoinType::Semi,
false,
)
}
(SetOperator::Except, true) => {
LogicalPlanBuilder::intersect_or_except(
left_plan,
right_plan,
JoinType::Anti,
true,
)
}
(SetOperator::Except, false) => {
LogicalPlanBuilder::intersect_or_except(
left_plan,
right_plan,
JoinType::Anti,
false,
)
}
}
}
_ => Err(DataFusionError::NotImplemented(format!(
Expand Down Expand Up @@ -3580,16 +3600,6 @@ mod tests {
quick_test(sql, expected);
}

/// Planning an `EXCEPT` query is expected to fail with a `NotImplemented`
/// error whose debug rendering names the supported set operators.
#[test]
fn except_not_supported() {
    let query = "SELECT order_id from orders EXCEPT SELECT order_id FROM orders";
    let failure = logical_plan(query).expect_err("query should have failed");
    let rendered = format!("{:?}", failure);
    assert_eq!(
        "NotImplemented(\"Only UNION ALL and UNION [DISTINCT] and INTERSECT and INTERSECT [DISTINCT] are supported, found EXCEPT\")",
        rendered
    );
}

#[test]
fn select_typedstring() {
let sql = "SELECT date '2020-12-10' AS date FROM person";
Expand Down
64 changes: 64 additions & 0 deletions datafusion/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5669,3 +5669,67 @@ async fn test_intersect_distinct() -> Result<()> {
assert_batches_eq!(expected, &actual);
Ok(())
}

/// `EXCEPT` between `(NULL, 1)` and `(NULL, 2)`: the rows differ in their
/// second column, so the left row is expected to be returned unchanged.
#[tokio::test]
async fn except_with_null_not_equal() {
    let mut ctx = create_join_context_qualified().unwrap();

    // The `\n\` continuation keeps the query text identical while letting
    // the second line be indented in source.
    let query = "SELECT * FROM (SELECT null AS id1, 1 AS id2) t1\n\
                 EXCEPT SELECT * FROM (SELECT null AS id1, 2 AS id2) t2";
    let rows = execute(&mut ctx, query).await;

    let want = vec![vec![String::from("NULL"), String::from("1")]];
    assert_eq!(want, rows);
}

/// `EXCEPT` between two identical `(NULL, 1)` rows: the expected result is
/// empty, i.e. the NULL columns are treated as matching each other.
#[tokio::test]
async fn except_with_null_equal() {
    let mut ctx = create_join_context_qualified().unwrap();

    // The `\n\` continuation keeps the query text identical while letting
    // the second line be indented in source.
    let query = "SELECT * FROM (SELECT null AS id1, 1 AS id2) t1\n\
                 EXCEPT SELECT * FROM (SELECT null AS id1, 1 AS id2) t2";
    let rows = execute(&mut ctx, query).await;

    let no_rows: &[&[&str]] = &[];
    assert_eq!(no_rows, rows);
}

/// `EXCEPT ALL` over `alltypes_plain`: rows with `int_col > 0` minus rows
/// with `int_col < 1`. Duplicates on the left are retained, so four
/// identical `(1, 10.1)` rows are expected.
///
/// NOTE(review): the function name says "expect" but the query under test
/// is `EXCEPT`; the name is left as-is so existing test filters keep working.
#[tokio::test]
async fn test_expect_all() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    register_alltypes_parquet(&mut ctx).await;
    // execute the query
    let sql = "SELECT int_col, double_col FROM alltypes_plain where int_col > 0 EXCEPT ALL SELECT int_col, double_col FROM alltypes_plain where int_col < 1";
    let actual = execute_to_batches(&mut ctx, sql).await;
    // Data rows are padded to the column widths fixed by the border lines
    // (9 and 12 characters) so they line up with the rendered table.
    let expected = vec![
        "+---------+------------+",
        "| int_col | double_col |",
        "+---------+------------+",
        "| 1       | 10.1       |",
        "| 1       | 10.1       |",
        "| 1       | 10.1       |",
        "| 1       | 10.1       |",
        "+---------+------------+",
    ];
    assert_batches_eq!(expected, &actual);
    Ok(())
}

/// Plain `EXCEPT` (distinct) over `alltypes_plain`: rows with `int_col > 0`
/// minus rows with `int_col < 1`. The left side is de-duplicated, so a
/// single `(1, 10.1)` row is expected.
///
/// NOTE(review): the function name says "expect" but the query under test
/// is `EXCEPT`; the name is left as-is so existing test filters keep working.
#[tokio::test]
async fn test_expect_distinct() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    register_alltypes_parquet(&mut ctx).await;
    // execute the query
    let sql = "SELECT int_col, double_col FROM alltypes_plain where int_col > 0 EXCEPT SELECT int_col, double_col FROM alltypes_plain where int_col < 1";
    let actual = execute_to_batches(&mut ctx, sql).await;
    // The data row is padded to the column widths fixed by the border lines
    // (9 and 12 characters) so it lines up with the rendered table.
    let expected = vec![
        "+---------+------------+",
        "| int_col | double_col |",
        "+---------+------------+",
        "| 1       | 10.1       |",
        "+---------+------------+",
    ];
    assert_batches_eq!(expected, &actual);
    Ok(())
}

0 comments on commit 7cedf95

Please sign in to comment.