blaze: supports pruning IsNotNull
blaze: supports pruning with starts_with

blaze: use case-insensitive comparison when processing parquet file schema.

blaze: fix incorrect result with case-when expr

blaze: make some parquet-scan related structs/functions public

blaze: cast column to target data type in schema adapter

blaze: supports parquet dictionary filtering

blaze: accept nullable partition value

blaze: disable isnull/isnotnull pruning for nested data type

blaze: finds column in row-group using column path, instead of column_descr.name

blaze: use deprecated min/max values only when min == max

blaze: re-add mem_used() to baseline metrics

blaze: fix compile errors

blaze: supports ScalarValue::Map

blaze: make scatter() public

blaze: allow nullable parquet scan partition key.

blaze: optimize case expr

blaze: skip parquet utf-8 validating

blaze: handle special cases when pruning float column with NaN values
zhangli20 committed Nov 28, 2023
1 parent c703526 commit 72f9a7f
Showing 18 changed files with 1,049 additions and 115 deletions.
14 changes: 7 additions & 7 deletions Cargo.toml
@@ -32,13 +32,13 @@ rust-version = "1.70"
version = "30.0.0"

[workspace.dependencies]
arrow = { version = "45.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
arrow-array = { version = "45.0.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = { version = "45.0.0", default-features = false }
arrow-flight = { version = "45.0.0", features = ["flight-sql-experimental"] }
arrow-schema = { version = "45.0.0", default-features = false }
parquet = { version = "45.0.0", features = ["arrow", "async", "object_store"] }
sqlparser = { version = "0.36.1", features = ["visitor"] }
arrow = { git = "https://github.com/blaze-init/arrow-rs", rev = "5a6d98d183", features = ["prettyprint"] }
arrow-flight = { git = "https://github.com/blaze-init/arrow-rs", rev = "5a6d98d183", features = ["flight-sql-experimental"] }
arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs", rev = "5a6d98d183", default-features = false }
arrow-schema = { git = "https://github.com/blaze-init/arrow-rs", rev = "5a6d98d183", default-features = false }
arrow-array = { git = "https://github.com/blaze-init/arrow-rs", rev = "5a6d98d183", default-features = false, features = ["chrono-tz"] }
parquet = { git = "https://github.com/blaze-init/arrow-rs", rev = "5a6d98d183", features = ["arrow", "async", "object_store"] }
sqlparser = { version = "0.35", features = ["visitor"] }

[profile.release]
codegen-units = 1
57 changes: 55 additions & 2 deletions datafusion/common/src/scalar.rs
@@ -27,11 +27,12 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc};

use crate::cast::{
as_decimal128_array, as_decimal256_array, as_dictionary_array,
as_fixed_size_binary_array, as_fixed_size_list_array, as_list_array, as_struct_array,
as_fixed_size_binary_array, as_fixed_size_list_array, as_list_array,
as_map_array, as_struct_array,
};
use crate::delta::shift_months;
use crate::error::{DataFusionError, Result, _internal_err, _not_impl_err};
use arrow::buffer::NullBuffer;
use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::compute::nullif;
use arrow::datatypes::{i256, FieldRef, Fields, SchemaBuilder};
use arrow::{
@@ -147,6 +148,8 @@ pub enum ScalarValue {
DurationNanosecond(Option<i64>),
/// struct of nested ScalarValue
Struct(Option<Vec<ScalarValue>>, Fields),
/// map of nested ScalarValue, represented as struct<key, value>
Map(Box<ScalarValue>, bool),
/// Dictionary type: index type and value
Dictionary(Box<DataType>, Box<ScalarValue>),
}
@@ -211,6 +214,8 @@ impl PartialEq for ScalarValue {
(Fixedsizelist(_, _, _), _) => false,
(List(v1, t1), List(v2, t2)) => v1.eq(v2) && t1.eq(t2),
(List(_, _), _) => false,
(Map(v1, s1), Map(v2, s2)) => v1.eq(v2) && s1.eq(s2),
(Map(_, _), _) => false,
(Date32(v1), Date32(v2)) => v1.eq(v2),
(Date32(_), _) => false,
(Date64(v1), Date64(v2)) => v1.eq(v2),
@@ -353,6 +358,14 @@ impl PartialOrd for ScalarValue {
}
}
(List(_, _), _) => None,
(Map(v1, s1), Map(v2, s2)) => {
if s1.eq(s2) {
v1.partial_cmp(v2)
} else {
None
}
}
(Map(_, _), _) => None,
(Date32(v1), Date32(v2)) => v1.partial_cmp(v2),
(Date32(_), _) => None,
(Date64(v1), Date64(v2)) => v1.partial_cmp(v2),
@@ -1489,6 +1502,7 @@ impl std::hash::Hash for ScalarValue {
v.hash(state);
t.hash(state);
}
Map(v, s) => (v, s).hash(state),
Date32(v) => v.hash(state),
Date64(v) => v.hash(state),
Time32Second(v) => v.hash(state),
@@ -2002,6 +2016,16 @@ impl ScalarValue {
field.data_type().clone(),
true,
))),
ScalarValue::Map(v, s) => {
if let ScalarValue::Struct(_, fields) = v.as_ref() {
DataType::Map(
Arc::new(Field::new("entries", DataType::Struct(fields.clone()), true)),
*s,
)
} else {
panic!("ScalarValue::Map inner value must be struct<key, value>")
}
},
ScalarValue::Date32(_) => DataType::Date32,
ScalarValue::Date64(_) => DataType::Date64,
ScalarValue::Time32Second(_) => DataType::Time32(TimeUnit::Second),
@@ -2155,6 +2179,7 @@ impl ScalarValue {
ScalarValue::LargeBinary(v) => v.is_none(),
ScalarValue::Fixedsizelist(v, ..) => v.is_none(),
ScalarValue::List(v, _) => v.is_none(),
ScalarValue::Map(v, _) => v.is_null(),
ScalarValue::Date32(v) => v.is_none(),
ScalarValue::Date64(v) => v.is_none(),
ScalarValue::Time32Second(v) => v.is_none(),
@@ -2928,6 +2953,20 @@ impl ScalarValue {
)
.unwrap(),
}),
ScalarValue::Map(v, sorted) => {
let inner_array = v.to_array_of_size(size);
let inner_struct = as_struct_array(&inner_array)
.expect("ScalarValue::Map inner array data type must be struct<key, value>");
let inner_data = inner_struct.to_data();

Arc::new(MapArray::new(
Arc::new(Field::new("entries", inner_struct.data_type().clone(), true)),
OffsetBuffer::new(ScalarBuffer::<i32>::from(inner_data.buffers()[0].clone())),
inner_struct.clone(),
inner_data.nulls().cloned(),
*sorted,
))
}
ScalarValue::Date32(e) => {
build_array_from_option!(Date32, Date32Array, e, size)
}
@@ -3133,6 +3172,12 @@ impl ScalarValue {
};
ScalarValue::new_list(value, nested_type.data_type().clone())
}
DataType::Map(_field, sorted) => {
let array = as_map_array(array)?;
let inner_struct = array.value(index);
let inner_scalar = ScalarValue::try_from_array(&inner_struct, 0)?;
ScalarValue::Map(Box::new(inner_scalar), *sorted)
}
DataType::Date32 => {
typed_cast!(array, index, Date32Array, Date32)
}
@@ -3410,6 +3455,7 @@ impl ScalarValue {
}
ScalarValue::Fixedsizelist(..) => unimplemented!(),
ScalarValue::List(_, _) => unimplemented!(),
ScalarValue::Map(_, _) => unimplemented!(),
ScalarValue::Date32(val) => {
eq_array_primitive!(array, index, Date32Array, val)
}
@@ -3538,6 +3584,7 @@ impl ScalarValue {
// `field` is boxed, so it is NOT already included in `self`
+ field.size()
}
ScalarValue::Map(v, _) => v.size() + 1,
ScalarValue::Struct(vals, fields) => {
vals.as_ref()
.map(|vals| {
@@ -3907,6 +3954,11 @@ impl fmt::Display for ScalarValue {
)?,
None => write!(f, "NULL")?,
},
ScalarValue::Map(v, s) => if !v.is_null() {
write!(f, "Map({v}, {s})")?
} else {
write!(f, "NULL")?
},
ScalarValue::Date32(e) => format_option!(f, e)?,
ScalarValue::Date64(e) => format_option!(f, e)?,
ScalarValue::Time32Second(e) => format_option!(f, e)?,
@@ -3983,6 +4035,7 @@ impl fmt::Debug for ScalarValue {
ScalarValue::LargeBinary(Some(_)) => write!(f, "LargeBinary(\"{self}\")"),
ScalarValue::Fixedsizelist(..) => write!(f, "FixedSizeList([{self}])"),
ScalarValue::List(_, _) => write!(f, "List([{self}])"),
ScalarValue::Map(_, _) => write!(f, "Map([{self}])"),
ScalarValue::Date32(_) => write!(f, "Date32(\"{self}\")"),
ScalarValue::Date64(_) => write!(f, "Date64(\"{self}\")"),
ScalarValue::Time32Second(_) => write!(f, "Time32Second(\"{self}\")"),
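Taken together, the scalar.rs hunks add a ScalarValue::Map variant whose inner value is a Struct scalar describing a struct<key, value> entry, plus a sorted flag that is carried through to DataType::Map. A minimal usage sketch against the patched crate, assuming the variant signatures shown above; the "key"/"value" field names are illustrative, not taken from the diff:

use arrow::datatypes::{DataType, Field, Fields};
use datafusion_common::ScalarValue;

fn main() {
    // One map entry, modeled as struct<key, value> per the new variant.
    let entry_fields = Fields::from(vec![
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Int64, true),
    ]);
    let entry = ScalarValue::Struct(
        Some(vec![
            ScalarValue::Utf8(Some("a".to_string())),
            ScalarValue::Int64(Some(1)),
        ]),
        entry_fields,
    );

    // The second field is the `sorted` flag forwarded to DataType::Map.
    let map_scalar = ScalarValue::Map(Box::new(entry), false);

    // data_type() wraps the entry struct in Map(Field("entries", ...), sorted),
    // and is_null() delegates to the inner struct scalar.
    assert!(matches!(map_scalar.data_type(), DataType::Map(_, false)));
    assert!(!map_scalar.is_null());
}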
@@ -158,7 +158,7 @@ impl FileScanConfig {
table_fields.push(Field::new(
&self.table_partition_cols[partition_idx].0,
self.table_partition_cols[partition_idx].1.to_owned(),
false,
true, // blaze: spark allows nullable partition key
));
// TODO provide accurate stat for partition column (#1186)
table_cols_stats.push(ColumnStatistics::default())
Expand Down Expand Up @@ -192,7 +192,7 @@ impl FileScanConfig {
})
}

pub(crate) fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
self.projection.as_ref().map(|p| {
p.iter()
.filter(|col_idx| **col_idx < self.file_schema.fields().len())
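The now-public file_column_projection_indices keeps only projected indices that refer to file columns; table partition columns are appended after the file schema, so any index at or beyond the file field count is dropped. A hypothetical standalone restatement of that filter (a free function in place of the method, not the FileScanConfig API itself):

// Keep only projection indices that point into the file schema;
// indices >= file_field_count refer to partition columns.
fn file_column_projection_indices(
    projection: Option<&[usize]>,
    file_field_count: usize,
) -> Option<Vec<usize>> {
    projection.map(|p| {
        p.iter()
            .copied()
            .filter(|col_idx| *col_idx < file_field_count)
            .collect()
    })
}

// e.g. with 3 file columns and a partition column at index 3:
// file_column_projection_indices(Some(&[0, 2, 3]), 3) == Some(vec![0, 2])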
50 changes: 25 additions & 25 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -31,8 +31,7 @@ pub use self::csv::{CsvConfig, CsvExec, CsvOpener};
pub(crate) use self::parquet::plan_to_parquet;
pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
use arrow::{
array::new_null_array,
compute::can_cast_types,
array::{ArrayRef, new_null_array},
datatypes::{DataType, Schema, SchemaRef},
record_batch::{RecordBatch, RecordBatchOptions},
};
@@ -49,7 +48,7 @@ pub use file_scan_config::{
FileScanConfig,
};

use crate::error::{DataFusionError, Result};
use crate::error::Result;
use crate::{
datasource::file_format::write::FileWriterMode,
physical_plan::{DisplayAs, DisplayFormatType},
@@ -62,10 +61,8 @@ use crate::{
physical_plan::display::{OutputOrderingDisplay, ProjectSchemaDisplay},
};

use datafusion_common::plan_err;
use datafusion_physical_expr::expressions::Column;

use arrow::compute::cast;
use log::debug;
use object_store::path::Path;
use object_store::ObjectMeta;
@@ -272,21 +269,21 @@
/// indexes and insert null-valued columns wherever the file schema was missing a column present
/// in the table schema.
#[derive(Clone, Debug)]
pub(crate) struct SchemaAdapter {
pub struct SchemaAdapter {
/// Schema for the table
table_schema: SchemaRef,
}

impl SchemaAdapter {
pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
pub fn new(table_schema: SchemaRef) -> SchemaAdapter {
Self { table_schema }
}

/// Map a column index in the table schema to a column index in a particular
/// file schema
///
/// Panics if index is not in range for the table schema
pub(crate) fn map_column_index(
pub fn map_column_index(
&self,
index: usize,
file_schema: &Schema,
@@ -311,23 +308,15 @@ impl SchemaAdapter {
let mut field_mappings = vec![None; self.table_schema.fields().len()];

for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
if let Some((table_idx, table_field)) =
self.table_schema.fields().find(file_field.name())
if let Some((table_idx, _table_field)) = self.table_schema
.fields()
.iter()
.enumerate()
.find(|(_, b)| b.name().eq_ignore_ascii_case(file_field.name()))
{
match can_cast_types(file_field.data_type(), table_field.data_type()) {
true => {
field_mappings[table_idx] = Some(projection.len());
projection.push(file_idx);
}
false => {
return plan_err!(
"Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
file_field.name(),
file_field.data_type(),
table_field.data_type()
)
}
}
// blaze: no need to check with can_cast_types()
field_mappings[table_idx] = Some(projection.len());
projection.push(file_idx);
}
}

@@ -357,13 +346,24 @@ impl SchemaMapping {
let batch_rows = batch.num_rows();
let batch_cols = batch.columns().to_vec();

// blaze:
// cast to target data type before adding columns
extern "Rust" {
fn schema_adapter_cast_column(
col: &ArrayRef,
data_type: &DataType,
) -> Result<ArrayRef>;
}

let cols = self
.table_schema
.fields()
.iter()
.zip(&self.field_mappings)
.map(|(field, file_idx)| match file_idx {
Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
Some(batch_idx) => unsafe {
schema_adapter_cast_column(&batch_cols[*batch_idx], field.data_type())
}
None => Ok(new_null_array(field.data_type(), batch_rows)),
})
.collect::<Result<Vec<_>, _>>()?;
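The SchemaMapping change replaces arrow's cast() call with a schema_adapter_cast_column symbol resolved at link time from the Blaze engine (hence the extern "Rust" block), and map_schema now matches file columns to table columns case-insensitively without the can_cast_types check. A hedged sketch of what a provider of that symbol might look like, assuming it simply delegates to arrow's cast kernel; the real Blaze implementation applies its own (e.g. Spark-compatible) casting rules:

use std::sync::Arc;
use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::DataType;
use datafusion_common::Result;

// Hypothetical definition satisfying the `extern "Rust"` declaration above.
// `#[no_mangle]` keeps the symbol name `schema_adapter_cast_column` so the
// declaration in SchemaMapping::map_batch can link against it.
#[no_mangle]
pub fn schema_adapter_cast_column(col: &ArrayRef, data_type: &DataType) -> Result<ArrayRef> {
    if col.data_type() == data_type {
        // Already the target type: return the same array without copying.
        return Ok(Arc::clone(col));
    }
    // Fallback comparable to the `cast()` call this commit removed.
    Ok(cast(col, data_type)?)
}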
59 changes: 37 additions & 22 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -42,6 +42,7 @@ use datafusion_physical_expr::{
use fmt::Debug;
use object_store::path::Path;
use std::any::Any;
use std::collections::HashSet;
use std::fmt;
use std::ops::Range;
use std::sync::Arc;
@@ -65,10 +66,10 @@ use parquet::basic::{ConvertedType, LogicalType};
use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
use parquet::schema::types::ColumnDescriptor;

mod metrics;
pub mod metrics;
pub mod page_filter;
mod row_filter;
mod row_groups;
pub mod row_filter;
pub mod row_groups;

pub use metrics::ParquetFileMetrics;

@@ -397,21 +398,21 @@ impl ExecutionPlan for ParquetExec {
}

/// Implements [`FileOpener`] for a parquet file
struct ParquetOpener {
partition_index: usize,
projection: Arc<[usize]>,
batch_size: usize,
limit: Option<usize>,
predicate: Option<Arc<dyn PhysicalExpr>>,
pruning_predicate: Option<Arc<PruningPredicate>>,
page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
table_schema: SchemaRef,
metadata_size_hint: Option<usize>,
metrics: ExecutionPlanMetricsSet,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
pushdown_filters: bool,
reorder_filters: bool,
enable_page_index: bool,
pub struct ParquetOpener {
pub partition_index: usize,
pub projection: Arc<[usize]>,
pub batch_size: usize,
pub limit: Option<usize>,
pub predicate: Option<Arc<dyn PhysicalExpr>>,
pub pruning_predicate: Option<Arc<PruningPredicate>>,
pub page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
pub table_schema: SchemaRef,
pub metadata_size_hint: Option<usize>,
pub metrics: ExecutionPlanMetricsSet,
pub parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
pub pushdown_filters: bool,
pub reorder_filters: bool,
pub enable_page_index: bool,
}

impl FileOpener for ParquetOpener {
@@ -449,7 +450,7 @@ impl FileOpener for ParquetOpener {
let limit = self.limit;

Ok(Box::pin(async move {
let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
let options = ArrowReaderOptions::new();
let mut builder =
ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
.await?;
@@ -490,11 +491,25 @@

// Row group pruning: attempt to skip entire row_groups
// using metadata on the row groups
let file_metadata = builder.metadata();
// first run without dictionary filtering, to reduce io to dictionary pages
let file_metadata = builder.metadata().clone();
let row_groups = row_groups::prune_row_groups(
&mut builder,
&file_metadata,
&HashSet::from_iter(0..file_metadata.row_groups().len()),
file_range.clone(),
pruning_predicate.as_ref().map(|p| p.as_ref()),
false,
&file_metrics,
);
// second run with dictionary filtering
let row_groups = row_groups::prune_row_groups(
file_metadata.row_groups(),
file_range,
&mut builder,
&file_metadata,
&HashSet::from_iter(row_groups),
file_range.clone(),
pruning_predicate.as_ref().map(|p| p.as_ref()),
true,
&file_metrics,
);

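The ParquetOpener change prunes row groups in two passes: a first prune_row_groups call with dictionary filtering disabled (statistics only, so no dictionary-page I/O), then a second call with it enabled but restricted to the survivors of the first pass. A simplified sketch of that narrowing pattern with the two pruning steps abstracted as closures; the names below are illustrative, not the row_groups::prune_row_groups API from the diff:

use std::collections::HashSet;

// Two-pass narrowing as in ParquetOpener::open above: a cheap pass over all
// row groups, then a more expensive (dictionary-reading) pass over survivors.
fn prune_in_two_passes(
    num_row_groups: usize,
    stats_prune: impl Fn(&HashSet<usize>) -> Vec<usize>,
    dict_prune: impl Fn(&HashSet<usize>) -> Vec<usize>,
) -> Vec<usize> {
    // Pass 1: statistics-only pruning over every row group (no dictionary I/O).
    let all: HashSet<usize> = (0..num_row_groups).collect();
    let after_stats: HashSet<usize> = stats_prune(&all).into_iter().collect();

    // Pass 2: dictionary filtering, run only on the row groups that survived
    // pass 1, so dictionary pages are fetched for as few groups as possible.
    dict_prune(&after_stats)
}

fn main() {
    let mut survivors = prune_in_two_passes(
        4,
        |c| c.iter().copied().filter(|rg| *rg != 1).collect(), // stats drop rg 1
        |c| c.iter().copied().filter(|rg| *rg != 3).collect(), // dict drops rg 3
    );
    survivors.sort_unstable();
    assert_eq!(survivors, vec![0, 2]);
}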