blaze: supports pruning with IsNotNull
blaze: supports pruning with starts_with

blaze: use case-insensitive comparison when processing parquet file schema.

blaze: fix incorrect result with case-when expr

blaze: make some parquet-scan related structs/functions public

blaze: cast column to target data type in schema adapter

blaze: supports parquet dictionary filtering

blaze: accept nullable partition value

blaze: disable isnull/isnotnull pruning for nested data type

blaze: finds column in row-group using column path, instead of column_descr.name

blaze: use deprecated min/max values only when min == max

blaze: re-add mem_used() to baseline metrics

blaze: fix compile errors

blaze: supports ScalarValue::Map

blaze: make scatter() public

blaze: allow nullable parquet scan partition key.

blaze: optimize case expr

blaze: skip parquet utf-8 validation

blaze: handle special cases when pruning float column with NaN values
zhangli20 committed Mar 24, 2024
1 parent bf6f83b commit 1565cd3
Showing 17 changed files with 1,000 additions and 117 deletions.
18 changes: 9 additions & 9 deletions Cargo.toml
@@ -32,14 +32,14 @@ rust-version = "1.72"
version = "36.0.0"

[workspace.dependencies]
arrow = { version = "50.0.0", features = ["prettyprint"] }
arrow-array = { version = "50.0.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = { version = "50.0.0", default-features = false }
arrow-flight = { version = "50.0.0", features = ["flight-sql-experimental"] }
arrow-ipc = { version = "50.0.0", default-features = false, features = ["lz4"] }
arrow-ord = { version = "50.0.0", default-features = false }
arrow-schema = { version = "50.0.0", default-features = false }
arrow-string = { version = "50.0.0", default-features = false }
arrow = { git = "https://github.com/blaze-init/arrow-rs", rev = "4d46e9e1f0", features = ["prettyprint"] }
arrow-array = { git = "https://github.com/blaze-init/arrow-rs", rev = "4d46e9e1f0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs", rev = "4d46e9e1f0", default-features = false }
arrow-flight = { git = "https://github.com/blaze-init/arrow-rs", rev = "4d46e9e1f0", features = ["flight-sql-experimental"] }
arrow-ipc = { git = "https://github.com/blaze-init/arrow-rs", rev = "4d46e9e1f0", default-features = false, features = ["lz4"] }
arrow-ord = { git = "https://github.com/blaze-init/arrow-rs", rev = "4d46e9e1f0", default-features = false }
arrow-schema = { git = "https://github.com/blaze-init/arrow-rs", rev = "4d46e9e1f0", default-features = false }
arrow-string = { git = "https://github.com/blaze-init/arrow-rs", rev = "4d46e9e1f0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "0.4.1"
bytes = "1.4"
@@ -69,7 +69,7 @@ log = "^0.4"
num_cpus = "1.13.0"
object_store = { version = "0.9.0", default-features = false }
parking_lot = "0.12"
parquet = { version = "50.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
parquet = { git = "https://github.com/blaze-init/arrow-rs", rev = "4d46e9e1f0", default-features = false, features = ["arrow", "async", "object_store"] }
rand = "0.8"
rstest = "0.18.0"
serde_json = "1"
54 changes: 54 additions & 0 deletions datafusion/common/src/scalar/mod.rs
@@ -32,6 +32,7 @@ use std::sync::Arc;
use crate::cast::{
as_decimal128_array, as_decimal256_array, as_dictionary_array,
as_fixed_size_binary_array, as_fixed_size_list_array,
as_map_array, as_struct_array,
};
use crate::error::{DataFusionError, Result, _internal_err, _not_impl_err};
use crate::hash_utils::create_hashes;
@@ -42,6 +43,7 @@ use arrow::compute::kernels::numeric::*;
use arrow::util::display::{array_value_to_string, ArrayFormatter, FormatOptions};
use arrow::{
array::*,
buffer::{OffsetBuffer, ScalarBuffer},
compute::kernels::cast::{cast_with_options, CastOptions},
datatypes::{
i256, ArrowDictionaryKeyType, ArrowNativeType, ArrowTimestampType, DataType,
@@ -276,6 +278,8 @@ pub enum ScalarValue {
DurationMicrosecond(Option<i64>),
/// Duration in nanoseconds
DurationNanosecond(Option<i64>),
/// Map of nested ScalarValue, represented as struct<key, value>
Map(Box<ScalarValue>, bool),
/// Dictionary type: index type and value
Dictionary(Box<DataType>, Box<ScalarValue>),
}
@@ -342,6 +346,8 @@ impl PartialEq for ScalarValue {
(LargeList(_), _) => false,
(Struct(v1), Struct(v2)) => v1.eq(v2),
(Struct(_), _) => false,
(Map(v1, s1), Map(v2, s2)) => v1.eq(v2) && s1.eq(s2),
(Map(_, _), _) => false,
(Date32(v1), Date32(v2)) => v1.eq(v2),
(Date32(_), _) => false,
(Date64(v1), Date64(v2)) => v1.eq(v2),
@@ -461,6 +467,14 @@ impl PartialOrd for ScalarValue {
partial_cmp_struct(struct_arr1, struct_arr2)
}
(Struct(_), _) => None,
(Map(v1, s1), Map(v2, s2)) => {
if s1.eq(s2) {
v1.partial_cmp(v2)
} else {
None
}
}
(Map(_, _), _) => None,
(Date32(v1), Date32(v2)) => v1.partial_cmp(v2),
(Date32(_), _) => None,
(Date64(v1), Date64(v2)) => v1.partial_cmp(v2),
@@ -647,6 +661,7 @@ impl std::hash::Hash for ScalarValue {
Struct(arr) => {
hash_nested_array(arr.to_owned() as ArrayRef, state);
}
Map(v, s) => (v, s).hash(state),
Date32(v) => v.hash(state),
Date64(v) => v.hash(state),
Time32Second(v) => v.hash(state),
@@ -1071,6 +1086,16 @@ impl ScalarValue {
ScalarValue::LargeList(arr) => arr.data_type().to_owned(),
ScalarValue::FixedSizeList(arr) => arr.data_type().to_owned(),
ScalarValue::Struct(arr) => arr.data_type().to_owned(),
ScalarValue::Map(v, s) => {
if let ScalarValue::Struct(fields) = v.as_ref() {
DataType::Map(
Arc::new(Field::new("entries", DataType::Struct(fields.fields().clone()), true)),
*s,
)
} else {
panic!("ScalarValue::Map inner value must be struct<key, value>")
}
},
ScalarValue::Date32(_) => DataType::Date32,
ScalarValue::Date64(_) => DataType::Date64,
ScalarValue::Time32Second(_) => DataType::Time32(TimeUnit::Second),
@@ -1276,6 +1301,7 @@ impl ScalarValue {
ScalarValue::LargeList(arr) => arr.len() == arr.null_count(),
ScalarValue::FixedSizeList(arr) => arr.len() == arr.null_count(),
ScalarValue::Struct(arr) => arr.len() == arr.null_count(),
ScalarValue::Map(v, _) => v.is_null(),
ScalarValue::Date32(v) => v.is_none(),
ScalarValue::Date64(v) => v.is_none(),
ScalarValue::Time32Second(v) => v.is_none(),
@@ -1993,6 +2019,20 @@ impl ScalarValue {
ScalarValue::Struct(arr) => {
Self::list_to_array_of_size(arr.as_ref() as &dyn Array, size)?
}
ScalarValue::Map(v, sorted) => {
let inner_array = v.to_array_of_size(size)?;
let inner_struct = as_struct_array(&inner_array)
.expect("ScalarValue::Map inner array data type must be struct<key, value>");
let inner_data = inner_struct.to_data();

Arc::new(MapArray::new(
Arc::new(Field::new("entries", inner_struct.data_type().clone(), true)),
OffsetBuffer::new(ScalarBuffer::<i32>::from(inner_data.buffers()[0].clone())),
inner_struct.clone(),
inner_data.nulls().cloned(),
*sorted,
))
}
ScalarValue::Date32(e) => {
build_array_from_option!(Date32, Date32Array, e, size)
}
@@ -2266,6 +2306,12 @@ impl ScalarValue {

ScalarValue::FixedSizeList(arr)
}
DataType::Map(_field, sorted) => {
let array = as_map_array(array)?;
let inner_struct = array.value(index);
let inner_scalar = ScalarValue::try_from_array(&inner_struct, 0)?;
ScalarValue::Map(Box::new(inner_scalar), *sorted)
}
DataType::Date32 => typed_cast!(array, index, Date32Array, Date32)?,
DataType::Date64 => typed_cast!(array, index, Date64Array, Date64)?,
DataType::Time32(TimeUnit::Second) => {
@@ -2547,6 +2593,7 @@ impl ScalarValue {
ScalarValue::Struct(arr) => {
Self::eq_array_list(&(arr.to_owned() as ArrayRef), array, index)
}
ScalarValue::Map(_, _) => unimplemented!(),
ScalarValue::Date32(val) => {
eq_array_primitive!(array, index, Date32Array, val)?
}
@@ -2674,6 +2721,7 @@ impl ScalarValue {
ScalarValue::List(arr) => arr.get_array_memory_size(),
ScalarValue::LargeList(arr) => arr.get_array_memory_size(),
ScalarValue::FixedSizeList(arr) => arr.get_array_memory_size(),
ScalarValue::Map(v, _) => v.size() + 1,
ScalarValue::Struct(arr) => arr.get_array_memory_size(),
ScalarValue::Dictionary(dt, sv) => {
// `dt` and `sv` are boxed, so they are NOT already included in `self`
@@ -3086,6 +3134,11 @@ impl fmt::Display for ScalarValue {
ScalarValue::List(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
ScalarValue::LargeList(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
ScalarValue::FixedSizeList(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
ScalarValue::Map(v, s) => if !v.is_null() {
write!(f, "Map({v}, {s})")?
} else {
write!(f, "NULL")?
},
ScalarValue::Date32(e) => format_option!(f, e)?,
ScalarValue::Date64(e) => format_option!(f, e)?,
ScalarValue::Time32Second(e) => format_option!(f, e)?,
@@ -3220,6 +3273,7 @@ impl fmt::Debug for ScalarValue {
.join(",")
)
}
ScalarValue::Map(_, _) => write!(f, "Map([{self}])"),
ScalarValue::Date32(_) => write!(f, "Date32(\"{self}\")"),
ScalarValue::Date64(_) => write!(f, "Date64(\"{self}\")"),
ScalarValue::Time32Second(_) => write!(f, "Time32Second(\"{self}\")"),
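For illustration, a minimal sketch (not part of this diff) of how the new ScalarValue::Map variant could be exercised, assuming the fork's API as added above; the map contents and the helper name are arbitrary:

    use arrow::array::{Array, Int32Builder, MapBuilder, StringBuilder};
    use datafusion_common::{Result, ScalarValue};

    fn map_scalar_roundtrip() -> Result<()> {
        // Build a one-row MapArray: {"a": 1, "b": 2}
        let mut builder = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
        builder.keys().append_value("a");
        builder.values().append_value(1);
        builder.keys().append_value("b");
        builder.values().append_value(2);
        builder.append(true)?;
        let map_array = builder.finish();

        // With this patch, extracting row 0 yields ScalarValue::Map(struct<key, value>, sorted)
        let scalar = ScalarValue::try_from_array(&map_array, 0)?;
        assert!(matches!(scalar, ScalarValue::Map(_, _)));

        // ...which can be materialized back into a MapArray of the requested size
        let rebuilt = scalar.to_array_of_size(1)?;
        assert_eq!(rebuilt.len(), 1);
        Ok(())
    }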
@@ -128,7 +128,9 @@ impl FileScanConfig {
table_cols_stats.push(self.statistics.column_statistics[idx].clone())
} else {
let partition_idx = idx - self.file_schema.fields().len();
table_fields.push(self.table_partition_cols[partition_idx].to_owned());
table_fields.push(self.table_partition_cols[partition_idx]
.clone() // blaze: spark allows nullable partition key
.with_nullable(true));
// TODO provide accurate stat for partition column (#1186)
table_cols_stats.push(ColumnStatistics::new_unknown())
}
@@ -160,7 +162,7 @@ impl FileScanConfig {
})
}

pub(crate) fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
self.projection.as_ref().map(|p| {
p.iter()
.filter(|col_idx| **col_idx < self.file_schema.fields().len())
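A minimal sketch of the partition-column adjustment above (the helper name is illustrative): because Spark permits null partition values, the projected field is forced to nullable even when the partition column was declared non-nullable:

    use arrow::datatypes::{DataType, Field};

    // Illustrative helper mirroring what project() now does for partition columns.
    fn nullable_partition_field(partition_col: &Field) -> Field {
        // blaze: spark allows nullable partition key
        partition_col.clone().with_nullable(true)
    }

    fn main() {
        let declared = Field::new("dt", DataType::Utf8, false); // declared non-nullable
        assert!(nullable_partition_field(&declared).is_nullable());
    }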
50 changes: 26 additions & 24 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -52,7 +52,7 @@ use std::{
};

use super::listing::ListingTableUrl;
use crate::error::{DataFusionError, Result};
use crate::error::Result;
use crate::physical_plan::{DisplayAs, DisplayFormatType};
use crate::{
datasource::{
@@ -63,12 +63,11 @@ use crate::{
};

use arrow::{
array::new_null_array,
compute::{can_cast_types, cast},
array::{ArrayRef, new_null_array},
datatypes::{DataType, Schema, SchemaRef},
record_batch::{RecordBatch, RecordBatchOptions},
};
use datafusion_common::{file_options::FileTypeWriterOptions, plan_err};
use datafusion_common::file_options::FileTypeWriterOptions;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::ExecutionPlan;
@@ -259,21 +258,21 @@
/// indexes and insert null-valued columns wherever the file schema was missing a column present
/// in the table schema.
#[derive(Clone, Debug)]
pub(crate) struct SchemaAdapter {
pub struct SchemaAdapter {
/// Schema for the table
table_schema: SchemaRef,
}

impl SchemaAdapter {
pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
pub fn new(table_schema: SchemaRef) -> SchemaAdapter {
Self { table_schema }
}

/// Map a column index in the table schema to a column index in a particular
/// file schema
///
/// Panics if index is not in range for the table schema
pub(crate) fn map_column_index(
pub fn map_column_index(
&self,
index: usize,
file_schema: &Schema,
@@ -298,23 +297,15 @@ impl SchemaAdapter {
let mut field_mappings = vec![None; self.table_schema.fields().len()];

for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
if let Some((table_idx, table_field)) =
self.table_schema.fields().find(file_field.name())
if let Some((table_idx, _table_field)) = self.table_schema
.fields()
.iter()
.enumerate()
.find(|(_, b)| b.name().eq_ignore_ascii_case(file_field.name()))
{
match can_cast_types(file_field.data_type(), table_field.data_type()) {
true => {
field_mappings[table_idx] = Some(projection.len());
projection.push(file_idx);
}
false => {
return plan_err!(
"Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
file_field.name(),
file_field.data_type(),
table_field.data_type()
)
}
}
// blaze: no need to check with can_cast_types()
field_mappings[table_idx] = Some(projection.len());
projection.push(file_idx);
}
}

@@ -344,13 +335,24 @@ impl SchemaMapping {
let batch_rows = batch.num_rows();
let batch_cols = batch.columns().to_vec();

// blaze:
// cast to target data type before adding columns
extern "Rust" {
fn schema_adapter_cast_column(
col: &ArrayRef,
data_type: &DataType,
) -> Result<ArrayRef>;
}

let cols = self
.table_schema
.fields()
.iter()
.zip(&self.field_mappings)
.map(|(field, file_idx)| match file_idx {
Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
Some(batch_idx) => unsafe {
schema_adapter_cast_column(&batch_cols[*batch_idx], field.data_type())
}
None => Ok(new_null_array(field.data_type(), batch_rows)),
})
.collect::<Result<Vec<_>, _>>()?;
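A minimal sketch of how an embedding application might provide the schema_adapter_cast_column symbol declared via extern "Rust" above; this stand-in simply delegates to arrow's cast kernel (the pre-patch behavior), whereas blaze presumably supplies Spark-compatible casting rules instead:

    use arrow::array::ArrayRef;
    use arrow::compute::cast;
    use arrow::datatypes::DataType;
    use datafusion_common::Result;

    // Resolved at link time against the `extern "Rust"` declaration inside SchemaMapping.
    #[no_mangle]
    pub fn schema_adapter_cast_column(col: &ArrayRef, data_type: &DataType) -> Result<ArrayRef> {
        // Fallback behavior: a plain arrow cast to the table schema's data type.
        Ok(cast(col.as_ref(), data_type)?)
    }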