Skip to content

Commit

Permalink
Support duplicate column aliases in queries
Browse files Browse the repository at this point in the history
In SQL, selecting a single column multiple times is legal, and most modern
databases support this. This commit adds such support to DataFusion too.
  • Loading branch information
findepi committed Nov 19, 2024
1 parent ce87bc5 commit 8ff7e9a
Show file tree
Hide file tree
Showing 14 changed files with 260 additions and 90 deletions.
50 changes: 6 additions & 44 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! DFSchema is an extended schema struct that DataFusion uses to provide support for
//! fields with optional relation names.
use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;
Expand Down Expand Up @@ -154,7 +154,6 @@ impl DFSchema {
field_qualifiers: qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
};
dfschema.check_names()?;
Ok(dfschema)
}

Expand Down Expand Up @@ -183,7 +182,6 @@ impl DFSchema {
field_qualifiers: vec![None; field_count],
functional_dependencies: FunctionalDependencies::empty(),
};
dfschema.check_names()?;
Ok(dfschema)
}

Expand All @@ -201,7 +199,6 @@ impl DFSchema {
field_qualifiers: vec![Some(qualifier); schema.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
};
schema.check_names()?;
Ok(schema)
}

Expand All @@ -215,40 +212,9 @@ impl DFSchema {
field_qualifiers: qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
};
dfschema.check_names()?;
Ok(dfschema)
}

/// Checks whether the schema has fields whose names clash.
///
/// # Errors
///
/// - `SchemaError::DuplicateQualifiedField` if the same `(qualifier, name)` pair occurs twice.
/// - `SchemaError::DuplicateUnqualifiedField` if two unqualified fields share a name.
/// - `SchemaError::AmbiguousReference` if a name is used by both a qualified and an
///   unqualified field.
pub fn check_names(&self) -> Result<()> {
let mut qualified_names = BTreeSet::new();
let mut unqualified_names = BTreeSet::new();

// First pass: record every name and fail fast on an exact repeat within
// the qualified set or within the unqualified set.
for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
if let Some(qualifier) = qualifier {
if !qualified_names.insert((qualifier, field.name())) {
return _schema_err!(SchemaError::DuplicateQualifiedField {
qualifier: Box::new(qualifier.clone()),
name: field.name().to_string(),
});
}
} else if !unqualified_names.insert(field.name()) {
return _schema_err!(SchemaError::DuplicateUnqualifiedField {
name: field.name().to_string()
});
}
}

// Second pass: a qualified field whose bare name also exists unqualified
// is reported as ambiguous.
for (qualifier, name) in qualified_names {
if unqualified_names.contains(name) {
return _schema_err!(SchemaError::AmbiguousReference {
field: Column::new(Some(qualifier.clone()), name)
});
}
}
Ok(())
}

/// Assigns functional dependencies.
pub fn with_functional_dependencies(
mut self,
Expand Down Expand Up @@ -285,7 +251,6 @@ impl DFSchema {
field_qualifiers: new_qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
};
new_self.check_names()?;
Ok(new_self)
}

Expand Down Expand Up @@ -1141,10 +1106,10 @@ mod tests {
fn join_qualified_duplicate() -> Result<()> {
let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let join = left.join(&right);
let join = left.join(&right)?;
assert_eq!(
join.unwrap_err().strip_backtrace(),
"Schema error: Schema contains duplicate qualified field name t1.c0",
"fields:[t1.c0, t1.c1, t1.c0, t1.c1], metadata:{}",
join.to_string()
);
Ok(())
}
Expand All @@ -1153,11 +1118,8 @@ mod tests {
fn join_unqualified_duplicate() -> Result<()> {
let left = DFSchema::try_from(test_schema_1())?;
let right = DFSchema::try_from(test_schema_1())?;
let join = left.join(&right);
assert_eq!(
join.unwrap_err().strip_backtrace(),
"Schema error: Schema contains duplicate unqualified field name c0"
);
let join = left.join(&right)?;
assert_eq!("fields:[c0, c1, c0, c1], metadata:{}", join.to_string());
Ok(())
}

Expand Down
13 changes: 13 additions & 0 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ pub enum SchemaError {
qualifier: Box<TableReference>,
name: String,
},
/// Schema contains qualified fields with duplicate unqualified names
QualifiedFieldWithDuplicateName {
qualifier: Box<TableReference>,
name: String,
},
/// Schema contains duplicate unqualified field name
DuplicateUnqualifiedField { name: String },
/// No field with this name
Expand Down Expand Up @@ -188,6 +193,14 @@ impl Display for SchemaError {
quote_identifier(name)
)
}
Self::QualifiedFieldWithDuplicateName { qualifier, name } => {
write!(
f,
"Schema contains qualified fields with duplicate unqualified names {}.{}",
qualifier.to_quoted_string(),
quote_identifier(name)
)
}
Self::DuplicateUnqualifiedField { name } => {
write!(
f,
Expand Down
2 changes: 0 additions & 2 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1557,8 +1557,6 @@ pub fn project(
_ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?),
}
}
validate_unique_names("Projections", projected_expr.iter())?;

Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
}

Expand Down
144 changes: 136 additions & 8 deletions datafusion/expr/src/logical_plan/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@
// specific language governing permissions and limitations
// under the License.

use crate::expr::Sort;
use crate::{Expr, LogicalPlan, SortExpr, Volatility};
use arrow::datatypes::DataType;
use datafusion_common::{
schema_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Result,
SchemaError, SchemaReference, TableReference,
};
use sqlparser::ast::Ident;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::{
fmt::{self, Display},
hash::{Hash, Hasher},
};

use crate::expr::Sort;
use arrow::datatypes::DataType;
use datafusion_common::{
Constraints, DFSchemaRef, Result, SchemaReference, TableReference,
};
use sqlparser::ast::Ident;

/// Various types of DDL (CREATE / DROP) catalog manipulation
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum DdlStatement {
Expand Down Expand Up @@ -305,6 +305,7 @@ impl CreateExternalTable {
constraints,
column_defaults,
} = fields;
check_fields_unique(&schema)?;
Ok(Self {
name,
schema,
Expand Down Expand Up @@ -543,6 +544,7 @@ impl CreateMemoryTable {
column_defaults,
temporary,
} = fields;
check_fields_unique(input.schema())?;
Ok(Self {
name,
constraints,
Expand Down Expand Up @@ -697,6 +699,7 @@ impl CreateView {
definition,
temporary,
} = fields;
check_fields_unique(input.schema())?;
Ok(Self {
name,
input,
Expand Down Expand Up @@ -799,6 +802,48 @@ impl CreateViewBuilder {
})
}
}
/// Verifies that no two fields of `schema` would clash by name.
///
/// # Errors
///
/// - `SchemaError::DuplicateQualifiedField` if the same `(qualifier, name)` pair occurs twice.
/// - `SchemaError::DuplicateUnqualifiedField` if two unqualified fields share a name.
/// - `SchemaError::AmbiguousReference` if a name is used by both a qualified and an
///   unqualified field.
/// - `SchemaError::QualifiedFieldWithDuplicateName` if fields with different qualifiers
///   share a name.
fn check_fields_unique(schema: &DFSchema) -> Result<()> {
    // An ordered set makes the second pass (and thus the reported error) deterministic.
    let mut seen_qualified = BTreeSet::new();
    let mut seen_unqualified = HashSet::new();
    // Number of times each bare field name occurs, regardless of qualifier.
    let mut occurrences: HashMap<&String, usize> = HashMap::new();

    for (qualifier, field) in schema.iter() {
        let name = field.name();
        match qualifier {
            Some(qualifier) => {
                // Same qualifier and same name seen before.
                if !seen_qualified.insert((qualifier, name)) {
                    return schema_err!(SchemaError::DuplicateQualifiedField {
                        qualifier: Box::new(qualifier.clone()),
                        name: name.to_string(),
                    });
                }
            }
            None => {
                // Same unqualified name seen before.
                if !seen_unqualified.insert(name) {
                    return schema_err!(SchemaError::DuplicateUnqualifiedField {
                        name: name.to_string()
                    });
                }
            }
        }
        *occurrences.entry(name).or_default() += 1;
    }

    for (qualifier, name) in seen_qualified {
        // A name that exists both with and without a qualifier is ambiguous.
        if seen_unqualified.contains(name) {
            return schema_err!(SchemaError::AmbiguousReference {
                field: Column::new(Some(qualifier.clone()), name)
            });
        }
        // Qualifiers will be stripped off, so names must be unique across qualifiers too.
        if occurrences[name] > 1 {
            return schema_err!(SchemaError::QualifiedFieldWithDuplicateName {
                qualifier: Box::new(qualifier.clone()),
                name: name.to_owned(),
            });
        }
    }

    Ok(())
}

/// Creates a catalog (aka "Database").
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -1039,7 +1084,9 @@ impl PartialOrd for CreateIndex {

#[cfg(test)]
mod test {
use super::*;
use crate::{CreateCatalog, DdlStatement, DropView};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{DFSchema, DFSchemaRef, TableReference};
use std::cmp::Ordering;

Expand All @@ -1066,4 +1113,85 @@ mod test {

assert_eq!(drop_view.partial_cmp(&catalog), Some(Ordering::Greater));
}

/// Exercises `check_fields_unique` on two valid schemas and on one schema
/// for each of the four error variants it can return.
#[test]
fn test_check_fields_unique() -> Result<()> {
// valid: distinct names, unqualified schema
check_fields_unique(&DFSchema::try_from(Schema::new(vec![
Field::new("c100", DataType::Boolean, true),
Field::new("c101", DataType::Boolean, true),
]))?)?;

// valid: distinct names, all fields qualified with "t1"
check_fields_unique(&DFSchema::try_from_qualified_schema(
"t1",
&Schema::new(vec![
Field::new("c100", DataType::Boolean, true),
Field::new("c101", DataType::Boolean, true),
]),
)?)?;

// error: the same name appears twice among unqualified fields
assert_eq!(
check_fields_unique(&DFSchema::try_from(Schema::new(vec![
Field::new("c0", DataType::Boolean, true),
Field::new("c1", DataType::Boolean, true),
Field::new("c1", DataType::Boolean, true),
Field::new("c2", DataType::Boolean, true),
]))?)
.unwrap_err()
.to_string(),
"Schema error: Schema contains duplicate unqualified field name c1"
);

// error: the same name appears twice under the same qualifier
assert_eq!(
check_fields_unique(&DFSchema::try_from_qualified_schema(
"t1",
&Schema::new(vec![
Field::new("c1", DataType::Boolean, true),
Field::new("c1", DataType::Boolean, true),
]),
)?)
.unwrap_err()
.to_string(),
"Schema error: Schema contains duplicate qualified field name t1.c1"
);

// error: the same name appears once unqualified and once qualified
assert_eq!(
check_fields_unique(&DFSchema::from_field_specific_qualified_schema(
vec![
None,
Some(TableReference::from("t1")),
],
&Arc::new(Schema::new(vec![
Field::new("c1", DataType::Boolean, true),
Field::new("c1", DataType::Boolean, true),
]))
)?)
.unwrap_err()
.to_string(),
"Schema error: Schema contains qualified field name t1.c1 and unqualified field name c1 which would be ambiguous"
);

// error: the same name appears under two different qualifiers
assert_eq!(
check_fields_unique(&DFSchema::from_field_specific_qualified_schema(
vec![
Some(TableReference::from("t1")),
Some(TableReference::from("t2")),
],
&Arc::new(Schema::new(vec![
Field::new("c1", DataType::Boolean, true),
Field::new("c1", DataType::Boolean, true),
]))
)?)
.unwrap_err()
.to_string(),
"Schema error: Schema contains qualified fields with duplicate unqualified names t1.c1"
);

Ok(())
}
}
1 change: 0 additions & 1 deletion datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ fn expand_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Projection(Projection { expr, input, .. }) => {
let projected_expr = expand_exprlist(&input, expr)?;
validate_unique_names("Projections", projected_expr.iter())?;
Ok(Transformed::yes(
Projection::try_new(projected_expr, Arc::clone(&input))
.map(LogicalPlan::Projection)?,
Expand Down
10 changes: 5 additions & 5 deletions datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,11 @@ fn test_propagate_empty_relation_inner_join_and_unions() {
#[test]
fn select_wildcard_with_repeated_column() {
let sql = "SELECT *, col_int32 FROM test";
let err = test_sql(sql).expect_err("query should have failed");
assert_eq!(
"Schema error: Schema contains duplicate qualified field name test.col_int32",
err.strip_backtrace()
);
let plan = test_sql(sql).unwrap();
let expected = "\
Projection: test.col_int32, test.col_uint32, test.col_utf8, test.col_date32, test.col_date64, test.col_ts_nano_none, test.col_ts_nano_utc, test.col_int32\
\n TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]";
assert_eq!(expected, format!("{plan}"));
}

#[test]
Expand Down
1 change: 0 additions & 1 deletion datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

let schema = self.build_schema(columns)?;
let df_schema = schema.to_dfschema_ref()?;
df_schema.check_names()?;

let ordered_exprs =
self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;
Expand Down
Loading

0 comments on commit 8ff7e9a

Please sign in to comment.