Skip to content

Commit

Permalink
fix: use ScalarValue(None) value instead of ScalarValue::Null
Browse files Browse the repository at this point in the history
Switch the `get_prune_stats` functions to use `None` to represent `null`
instead of `ScalarValue::Null` as `ArrayRef` must be of all the same
type.

https://github.com/apache/arrow-datafusion/blob/dd5e1dbbfd20539b40ae65acb8883f7e392cba92/datafusion/core/src/physical_optimizer/pruning.rs#L54-L72
  • Loading branch information
cmackenzie1 committed May 31, 2023
1 parent 24ddc5b commit 1e4c57f
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 17 deletions.
19 changes: 14 additions & 5 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ impl DeltaTableState {
}
}

// TODO: Collapse with operations/transaction/state.rs method of same name
fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option<ArrayRef> {
let field = table
.get_schema()
Expand All @@ -262,7 +263,9 @@ fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option
Some(v) => serde_json::Value::String(v.to_string()),
None => serde_json::Value::Null,
};
to_correct_scalar_value(&value, &data_type).unwrap_or(ScalarValue::Null)
to_correct_scalar_value(&value, &data_type).unwrap_or(
get_null_of_arrow_type(&data_type).expect("Could not determine null type"),
)
} else if let Ok(Some(statistics)) = add.get_stats() {
let values = if get_max {
statistics.max_values
Expand All @@ -273,9 +276,12 @@ fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option
values
.get(&column.name)
.and_then(|f| to_correct_scalar_value(f.as_value()?, &data_type))
.unwrap_or(ScalarValue::Null)
.unwrap_or(
get_null_of_arrow_type(&data_type).expect("Could not determine null type"),
)
} else {
ScalarValue::Null
// No statistics available
get_null_of_arrow_type(&data_type).expect("Could not determine null type")
}
});
ScalarValue::iter_to_array(values).ok()
Expand Down Expand Up @@ -547,7 +553,7 @@ impl ExecutionPlan for DeltaScan {
}
}

fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
match t {
ArrowDataType::Null => Ok(ScalarValue::Null),
ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
Expand Down Expand Up @@ -584,11 +590,14 @@ fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
})
}
ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary(
k.clone(),
Box::new(get_null_of_arrow_type(v).unwrap()),
)),
//Unsupported types...
ArrowDataType::Float16
| ArrowDataType::Decimal256(_, _)
| ArrowDataType::Union(_, _)
| ArrowDataType::Dictionary(_, _)
| ArrowDataType::LargeList(_)
| ArrowDataType::Struct(_)
| ArrowDataType::List(_)
Expand Down
18 changes: 13 additions & 5 deletions rust/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use sqlparser::parser::Parser;
use sqlparser::tokenizer::Tokenizer;

use crate::action::Add;
use crate::delta_datafusion::{logical_expr_to_physical_expr, to_correct_scalar_value};
use crate::delta_datafusion::{
get_null_of_arrow_type, logical_expr_to_physical_expr, to_correct_scalar_value,
};
use crate::table_state::DeltaTableState;
use crate::DeltaResult;
use crate::DeltaTableError;
Expand Down Expand Up @@ -190,14 +192,18 @@ impl<'a> AddContainer<'a> {
return None;
}

let data_type = field.data_type();

let values = self.inner.iter().map(|add| {
if self.partition_columns.contains(&column.name) {
let value = add.partition_values.get(&column.name).unwrap();
let value = match value {
Some(v) => serde_json::Value::String(v.to_string()),
None => serde_json::Value::Null,
};
to_correct_scalar_value(&value, field.data_type()).unwrap_or(ScalarValue::Null)
to_correct_scalar_value(&value, data_type).unwrap_or(
get_null_of_arrow_type(data_type).expect("Could not determine null type"),
)
} else if let Ok(Some(statistics)) = add.get_stats() {
let values = if get_max {
statistics.max_values
Expand All @@ -207,10 +213,12 @@ impl<'a> AddContainer<'a> {

values
.get(&column.name)
.and_then(|f| to_correct_scalar_value(f.as_value()?, field.data_type()))
.unwrap_or(ScalarValue::Null)
.and_then(|f| to_correct_scalar_value(f.as_value()?, data_type))
.unwrap_or(
get_null_of_arrow_type(data_type).expect("Could not determine null type"),
)
} else {
ScalarValue::Null
get_null_of_arrow_type(data_type).expect("Could not determine null type")
}
});
ScalarValue::iter_to_array(values).ok()
Expand Down
16 changes: 9 additions & 7 deletions rust/tests/datafusion_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,13 +895,15 @@ async fn test_issue_1374() -> Result<()> {
.await?;

let expected = vec![
"+---------------------+-------------+------------+",
"| timestamp | temperature | date |",
"+---------------------+-------------+------------+",
"| 2023-05-17T17:00:00 | 20 | 2023-05-17 |",
"| 2023-05-18T18:00:00 | 20 | 2023-05-18 |",
"| 2023-05-19T19:00:00 | 20 | 2023-05-19 |",
"+---------------------+-------------+------------+",
"+----------------------------+-------------+------------+",
"| timestamp | temperature | date |",
"+----------------------------+-------------+------------+",
"| 2023-05-24T00:01:25.010301 | 8 | 2023-05-24 |",
"| 2023-05-24T00:01:25.013902 | 21 | 2023-05-24 |",
"| 2023-05-24T00:01:25.013972 | 58 | 2023-05-24 |",
"| 2023-05-24T00:01:25.014025 | 24 | 2023-05-24 |",
"| 2023-05-24T00:01:25.014072 | 90 | 2023-05-24 |",
"+----------------------------+-------------+------------+",
];

assert_batches_sorted_eq!(&expected, &batches);
Expand Down

0 comments on commit 1e4c57f

Please sign in to comment.