Skip to content

Commit

Permalink
fix: incorrect NATURAL/USING JOIN schema (#14102)
Browse files Browse the repository at this point in the history
* fix: incorrect NATURAL/USING JOIN schema

* Add test

* Simplify exclude_using_columns

* Add more tests
  • Loading branch information
jonahgao authored Jan 14, 2025
1 parent 0a2f09f commit 02c8247
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 36 deletions.
71 changes: 35 additions & 36 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use datafusion_common::tree_node::{
};
use datafusion_common::utils::get_at_indices;
use datafusion_common::{
internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef,
DataFusionError, HashMap, Result, TableReference,
internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, HashMap,
Result, TableReference,
};

use indexmap::IndexSet;
Expand Down Expand Up @@ -379,14 +379,12 @@ fn get_exprs_except_skipped(
}
}

/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
pub fn expand_wildcard(
schema: &DFSchema,
plan: &LogicalPlan,
wildcard_options: Option<&WildcardOptions>,
) -> Result<Vec<Expr>> {
/// For each column specified in the USING JOIN condition, the JOIN plan outputs it twice
/// (once for each join side), but an unqualified wildcard should include it only once.
/// This function returns the columns that should be excluded.
fn exclude_using_columns(plan: &LogicalPlan) -> Result<HashSet<Column>> {
let using_columns = plan.using_columns()?;
let mut columns_to_skip = using_columns
let excluded = using_columns
.into_iter()
// For each USING JOIN condition, only expand to one of each join column in projection
.flat_map(|cols| {
Expand All @@ -395,18 +393,26 @@ pub fn expand_wildcard(
// qualified column
cols.sort();
let mut out_column_names: HashSet<String> = HashSet::new();
cols.into_iter()
.filter_map(|c| {
if out_column_names.contains(&c.name) {
Some(c)
} else {
out_column_names.insert(c.name);
None
}
})
.collect::<Vec<_>>()
cols.into_iter().filter_map(move |c| {
if out_column_names.contains(&c.name) {
Some(c)
} else {
out_column_names.insert(c.name);
None
}
})
})
.collect::<HashSet<_>>();
Ok(excluded)
}

/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
pub fn expand_wildcard(
schema: &DFSchema,
plan: &LogicalPlan,
wildcard_options: Option<&WildcardOptions>,
) -> Result<Vec<Expr>> {
let mut columns_to_skip = exclude_using_columns(plan)?;
let excluded_columns = if let Some(WildcardOptions {
exclude: opt_exclude,
except: opt_except,
Expand Down Expand Up @@ -705,27 +711,20 @@ pub fn exprlist_to_fields<'a>(
.map(|e| match e {
Expr::Wildcard { qualifier, options } => match qualifier {
None => {
let excluded: Vec<String> = get_excluded_columns(
let mut excluded = exclude_using_columns(plan)?;
excluded.extend(get_excluded_columns(
options.exclude.as_ref(),
options.except.as_ref(),
wildcard_schema,
None,
)?
.into_iter()
.map(|c| c.flat_name())
.collect();
Ok::<_, DataFusionError>(
wildcard_schema
.field_names()
.iter()
.enumerate()
.filter(|(_, s)| !excluded.contains(s))
.map(|(i, _)| wildcard_schema.qualified_field(i))
.map(|(qualifier, f)| {
(qualifier.cloned(), Arc::new(f.to_owned()))
})
.collect::<Vec<_>>(),
)
)?);
Ok(wildcard_schema
.iter()
.filter(|(q, f)| {
!excluded.contains(&Column::new(q.cloned(), f.name()))
})
.map(|(q, f)| (q.cloned(), Arc::clone(f)))
.collect::<Vec<_>>())
}
Some(qualifier) => {
let excluded: Vec<String> = get_excluded_columns(
Expand Down
74 changes: 74 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4549,3 +4549,77 @@ fn test_error_message_invalid_window_aggregate_function_signature() {
"Error during planning: sum does not support zero arguments",
);
}

// Regression test for https://github.com/apache/datafusion/issues/14058
// `SELECT *` over a USING/NATURAL JOIN must project each join-condition
// column exactly once, even though the join plan's schema carries it once
// per join side.
#[test]
fn test_using_join_wildcard_schema() {
    // USING join: the join key must appear a single time in the output schema.
    let plan =
        logical_plan("SELECT * FROM orders o1 JOIN orders o2 USING (order_id)").unwrap();
    let order_id_count = plan
        .schema()
        .iter()
        .filter(|(_, field)| field.name() == "order_id")
        .count();
    assert_eq!(order_id_count, 1);

    // NATURAL self-join: every column name is shared between the two sides,
    // so wildcard expansion keeps only the left side's columns.
    let plan = logical_plan("SELECT * FROM orders o1 NATURAL JOIN orders o2").unwrap();
    let expected: Vec<String> = [
        "o1.order_id",
        "o1.customer_id",
        "o1.o_item_id",
        "o1.qty",
        "o1.price",
        "o1.delivered",
    ]
    .into_iter()
    .map(String::from)
    .collect();
    assert_eq!(plan.schema().field_names(), expected);

    // Reproducible example of issue #14058: the shared `id` column is taken
    // from the left input (t1); the remaining columns keep their qualifiers.
    let sql = "WITH t1 AS (SELECT 1 AS id, 'a' AS value1),
        t2 AS (SELECT 1 AS id, 'x' AS value2)
        SELECT * FROM t1 NATURAL JOIN t2";
    let plan = logical_plan(sql).unwrap();
    assert_eq!(
        plan.schema().field_names(),
        ["t1.id", "t1.value1", "t2.value2"].map(str::to_string)
    );

    // Chained joins: a NATURAL JOIN followed by a USING join — each shared
    // column is deduplicated independently.
    let sql = "WITH t1 AS (SELECT 1 AS a, 1 AS b),
        t2 AS (SELECT 1 AS a, 2 AS c),
        t3 AS (SELECT 1 AS c, 2 AS d)
        SELECT * FROM t1 NATURAL JOIN t2 RIGHT JOIN t3 USING (c)";
    let plan = logical_plan(sql).unwrap();
    assert_eq!(
        plan.schema().field_names(),
        ["t1.a", "t1.b", "t2.c", "t3.d"].map(str::to_string)
    );

    // Wildcard over a subquery that itself contains a USING join: the
    // deduplication must also apply through the nested projection.
    let sql = "WITH t1 AS (SELECT 1 AS a, 1 AS b),
        t2 AS (SELECT 1 AS a, 2 AS c),
        t3 AS (SELECT 1 AS c, 2 AS d)
        SELECT * FROM (SELECT * FROM t1 LEFT JOIN t2 USING(a)) NATURAL JOIN t3";
    let plan = logical_plan(sql).unwrap();
    assert_eq!(
        plan.schema().field_names(),
        ["t1.a", "t1.b", "t2.c", "t3.d"].map(str::to_string)
    );
}

0 comments on commit 02c8247

Please sign in to comment.