Fix bug in projection: "column types must match schema types, expected XXX but found YYY" #1448

Merged: 3 commits, Dec 15, 2021
35 changes: 28 additions & 7 deletions datafusion/src/physical_plan/projection.rs
@@ -21,6 +21,7 @@
 //! projection expressions. `SELECT` without `FROM` will only evaluate expressions.

 use std::any::Any;
+use std::collections::BTreeMap;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -63,13 +64,15 @@ impl ProjectionExec {

         let fields: Result<Vec<Field>> = expr
             .iter()
-            .map(|(e, name)| match input_schema.field_with_name(name) {
-                Ok(f) => Ok(f.clone()),
Contributor Author

I totally missed this in my review. Basically, the projection's output field is not necessarily the same as the input field even when the field name matches (due to aliases). The actual output type needs to be calculated from the expr in all cases.
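
To make the failure mode concrete, here is a minimal, self-contained sketch. It does not use the real Arrow or DataFusion types: `DataType`, `Field`, `Schema`, `Column`, and both `output_field_*` functions are simplified, hypothetical stand-ins written only for illustration. It models a projection of `c1 AS c3` over an input that also has an unrelated column named `c3`.

```rust
// Hypothetical, simplified stand-ins for the Arrow/DataFusion types.
// None of these definitions are taken from the PR itself.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Float32,
    Boolean,
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
}

struct Schema {
    fields: Vec<Field>,
}

impl Schema {
    fn field_with_name(&self, name: &str) -> Option<&Field> {
        self.fields.iter().find(|f| f.name == name)
    }
}

/// A projection expression that simply references an input column.
struct Column {
    name: String,
}

impl Column {
    /// The type of the expression is the type of the referenced input column.
    fn data_type(&self, input: &Schema) -> Option<DataType> {
        input.field_with_name(&self.name).map(|f| f.data_type.clone())
    }
}

/// Models the old behaviour: look the *output* name up in the input schema
/// and reuse that field if it exists, regardless of what the expression is.
fn output_field_by_name(expr: &Column, out_name: &str, input: &Schema) -> Option<Field> {
    match input.field_with_name(out_name) {
        Some(f) => Some(f.clone()),
        None => Some(Field {
            name: out_name.to_string(),
            data_type: expr.data_type(input)?,
        }),
    }
}

/// Models the fixed behaviour: always derive the type from the expression.
fn output_field_from_expr(expr: &Column, out_name: &str, input: &Schema) -> Option<Field> {
    Some(Field {
        name: out_name.to_string(),
        data_type: expr.data_type(input)?,
    })
}

/// Input schema with c1: Float32 and c3: Boolean, as in the test query.
fn example_schema() -> Schema {
    Schema {
        fields: vec![
            Field { name: "c1".to_string(), data_type: DataType::Float32 },
            Field { name: "c3".to_string(), data_type: DataType::Boolean },
        ],
    }
}
```

With `example_schema()` and a `Column` named `c1` projected under the output name `c3`, the name-based lookup reports `Boolean` (the type of the unrelated input column `c3`), while the expression-based version reports `Float32`. A mismatch of that kind is what the "column types must match schema types" error in the PR title describes.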

-                Err(_) => {
-                    let dt = e.data_type(&input_schema)?;
-                    let nullable = e.nullable(&input_schema)?;
-                    Ok(Field::new(name, dt, nullable))
-                }
+            .map(|(e, name)| {
Contributor Author

I am not convinced it is a great idea to copy field metadata from the input to the output based on the field name alone, as that would mean metadata on an input field named `a` could end up on the output field `a` even if the output was not related to the input `a` at all.

Thus I have made this code copy field-level metadata only for a direct input column reference.

Contributor

This was the question I asked previously about projections that create new columns. It doesn't seem to make sense to blindly carry over metadata, because it describes a column that may no longer exist.
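
As a rough illustration of the "direct column reference only" rule discussed in this thread, the sketch below uses only the standard library. The `PhysicalExpr` trait, `Column`, `Negate`, `Field`, and `Schema` here are hypothetical, cut-down stand-ins rather than the DataFusion definitions, and `Negate` is an invented computed expression used only to show the non-column case.

```rust
use std::any::Any;
use std::collections::BTreeMap;
use std::sync::Arc;

type Metadata = BTreeMap<String, String>;

// Hypothetical, simplified stand-ins; not the real Arrow/DataFusion types.
struct Field {
    name: String,
    metadata: Option<Metadata>,
}

struct Schema {
    fields: Vec<Field>,
}

trait PhysicalExpr {
    /// Exposes the concrete type so callers can downcast.
    fn as_any(&self) -> &dyn Any;
}

/// Direct reference to an input column.
struct Column {
    name: String,
}

impl PhysicalExpr for Column {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// An invented computed expression: its output is derived from `input`,
/// so it is not a direct column reference.
struct Negate {
    input: Arc<dyn PhysicalExpr>,
}

impl PhysicalExpr for Negate {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Returns the input field's metadata only when `e` is a plain `Column`;
/// any other expression yields `None`.
fn get_field_metadata(e: &Arc<dyn PhysicalExpr>, input: &Schema) -> Option<Metadata> {
    let column = e.as_any().downcast_ref::<Column>()?;
    input
        .fields
        .iter()
        .find(|f| f.name == column.name)
        .and_then(|f| f.metadata.clone())
}
```

In this sketch, a `Column` named `a` picks up whatever metadata the input field `a` carries, whereas a `Negate` wrapping that same column gets `None`, even if the projection names its output `a`. The output name never enters the lookup, which is the point both reviewers raise.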

+                let mut field = Field::new(
+                    name,
+                    e.data_type(&input_schema)?,
+                    e.nullable(&input_schema)?,
+                );
+                field.set_metadata(get_field_metadata(e, &input_schema));
+
+                Ok(field)
             })
             .collect();

@@ -179,6 +182,24 @@ impl ExecutionPlan for ProjectionExec {
     }
 }

+/// If e is a direct column reference, returns the field level
+/// metadata for that field, if any. Otherwise returns None
+fn get_field_metadata(
+    e: &Arc<dyn PhysicalExpr>,
+    input_schema: &Schema,
+) -> Option<BTreeMap<String, String>> {
+    let name = if let Some(column) = e.as_any().downcast_ref::<Column>() {
+        column.name()
+    } else {
+        return None;
+    };
+
+    input_schema
+        .field_with_name(name)
+        .ok()
+        .and_then(|f| f.metadata().as_ref().cloned())
+}
+
 fn stats_projection(
     stats: Statistics,
     exprs: impl Iterator<Item = Arc<dyn PhysicalExpr>>,
23 changes: 23 additions & 0 deletions datafusion/tests/sql.rs
@@ -891,6 +891,29 @@ async fn projection_same_fields() -> Result<()> {
     Ok(())
 }

+#[tokio::test]
+async fn projection_type_alias() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_simple_csv(&mut ctx).await?;
+
+    // Query that aliases one column to the name of a different column
+    // that also has a different type (c1 == float32, c3 == boolean)
+    let sql = "SELECT c1 as c3 FROM aggregate_simple ORDER BY c3 LIMIT 2";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+
+    let expected = vec![
+        "+---------+",
+        "| c3      |",
+        "+---------+",
+        "| 0.00001 |",
+        "| 0.00002 |",
+        "+---------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn csv_query_group_by_float64() -> Result<()> {
     let mut ctx = ExecutionContext::new();