feat: more economic data skipping with datafusion #2772

Merged · 6 commits · Aug 14, 2024

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -27,7 +27,7 @@ debug = "line-tables-only"

[workspace.dependencies]
delta_kernel = { version = "0.3.0" }
# delta_kernel = { path = "../delta-kernel-rs/kernel" }
# delta_kernel = { path = "../delta-kernel-rs/kernel", version = "0.3.0" }

# arrow
arrow = { version = "52" }
5 changes: 5 additions & 0 deletions crates/core/src/kernel/mod.rs
@@ -1,6 +1,7 @@
//! Delta Kernel module
//!
//! The Kernel module contains all the logic for reading and processing the Delta Lake transaction log.
use delta_kernel::engine::arrow_expression::ArrowExpressionHandler;

pub mod arrow;
pub mod error;
@@ -19,3 +20,7 @@ pub trait DataCheck {
/// The SQL expression to use for the check
fn get_expression(&self) -> &str;
}

lazy_static::lazy_static! {
static ref ARROW_HANDLER: ArrowExpressionHandler = ArrowExpressionHandler {};
}
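
For context (not part of this diff): this handler is what the rest of the PR uses to build kernel expression evaluators over the parsed transaction-log batches. A minimal sketch of that pattern, based on the code added later in this PR (log_schema_ref() and the "add.stats_parsed.numRecords" expression); the helper name evaluate_num_records is hypothetical.

use std::sync::Arc;

use arrow_array::RecordBatch;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::expressions::Expression;
use delta_kernel::schema::{DataType, PrimitiveType};
use delta_kernel::{ExpressionEvaluator, ExpressionHandler};

// Hypothetical helper mirroring how this PR evaluates a log-schema expression
// against one batch of the parsed transaction log.
fn evaluate_num_records(batch: &RecordBatch) -> Option<RecordBatch> {
    let evaluator: Arc<dyn ExpressionEvaluator> = crate::kernel::ARROW_HANDLER.get_evaluator(
        crate::kernel::models::fields::log_schema_ref().clone(),
        Expression::column("add.stats_parsed.numRecords"),
        DataType::Primitive(PrimitiveType::Long),
    );
    let engine = ArrowEngineData::new(batch.clone());
    let result = evaluator.evaluate(&engine).ok()?;
    let data = result.as_any().downcast_ref::<ArrowEngineData>()?;
    Some(data.record_batch().clone())
}
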
8 changes: 8 additions & 0 deletions crates/core/src/kernel/models/fields.rs
@@ -1,4 +1,5 @@
//! Schema definitions for action types
use std::sync::Arc;

use delta_kernel::schema::{ArrayType, DataType, MapType, StructField, StructType};
use lazy_static::lazy_static;
@@ -271,3 +272,10 @@ fn deletion_vector_field() -> StructField {
pub(crate) fn log_schema() -> &'static StructType {
&LOG_SCHEMA
}

pub(crate) fn log_schema_ref() -> &'static Arc<StructType> {
lazy_static! {
static ref LOG_SCHEMA_REF: Arc<StructType> = Arc::new(LOG_SCHEMA.clone());
}
&LOG_SCHEMA_REF
}
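
Aside (not from the diff): the Arc-returning accessor exists because the evaluator API used later in this PR takes an owned Arc<StructType>; cloning the Arc is a cheap pointer copy, while cloning the result of log_schema() would deep-copy the whole schema. A minimal sketch with hypothetical call sites:

use std::sync::Arc;

use delta_kernel::schema::StructType;

// Cheap: bumps the reference count on the shared, lazily initialized schema.
fn shared_log_schema() -> Arc<StructType> {
    crate::kernel::models::fields::log_schema_ref().clone()
}

// Expensive by comparison: deep-clones every field of the log schema.
fn owned_log_schema() -> StructType {
    crate::kernel::models::fields::log_schema().clone()
}
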
145 changes: 138 additions & 7 deletions crates/core/src/kernel/snapshot/log_data.rs
@@ -2,7 +2,9 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray};
use arrow_array::{
Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray, UInt64Array,
};
use chrono::{DateTime, Utc};
use delta_kernel::expressions::Scalar;
use indexmap::IndexMap;
@@ -198,12 +200,16 @@ impl LogicalFile<'_> {
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.ok_or(DeltaTableError::Generic("()".into()))?;
.ok_or(DeltaTableError::generic(
"expected partition values key field to be of type string",
))?;
let values = map_value
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.ok_or(DeltaTableError::Generic("()".into()))?;
.ok_or(DeltaTableError::generic(
"expected partition values value field to be of type string",
))?;

let values = keys
.iter()
@@ -212,8 +218,8 @@
let (key, field) = self.partition_fields.get_key_value(k.unwrap()).unwrap();
let field_type = match field.data_type() {
DataType::Primitive(p) => Ok(p),
_ => Err(DeltaTableError::Generic(
"nested partitioning values are not supported".to_string(),
_ => Err(DeltaTableError::generic(
"nested partitioning values are not supported",
)),
}?;
Ok((
@@ -225,7 +231,7 @@
})
.collect::<DeltaResult<HashMap<_, _>>>()?;

// NOTE: we recreate the map as a BTreeMap to ensure the order of the keys is consistently
// NOTE: we recreate the map as an IndexMap to ensure the order of the keys is consistently
// the same as the order of partition fields.
self.partition_fields
.iter()
@@ -470,18 +476,27 @@ impl<'a> IntoIterator for LogDataHandler<'a> {

#[cfg(feature = "datafusion")]
mod datafusion {
use std::collections::HashSet;
use std::sync::Arc;

use ::datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use ::datafusion::physical_optimizer::pruning::PruningStatistics;
use ::datafusion::physical_plan::Accumulator;
use arrow::compute::concat_batches;
use arrow_arith::aggregate::sum;
use arrow_array::Int64Array;
use arrow_array::{ArrayRef, BooleanArray, Int64Array};
use arrow_schema::DataType as ArrowDataType;
use datafusion_common::scalar::ScalarValue;
use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
use datafusion_common::Column;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::expressions::Expression;
use delta_kernel::schema::{DataType, PrimitiveType};
use delta_kernel::{ExpressionEvaluator, ExpressionHandler};

use super::*;
use crate::kernel::arrow::extract::{extract_and_cast_opt, extract_column};
use crate::kernel::ARROW_HANDLER;

#[derive(Debug, Default, Clone)]
enum AccumulatorType {
@@ -693,6 +708,122 @@ mod datafusion {
column_statistics,
})
}

fn pick_stats(&self, column: &Column, stats_field: &'static str) -> Option<ArrayRef> {
let field = self.schema.field(&column.name)?;
// See issue #1214. Binary type does not support natural order, which is required for DataFusion to prune
if field.data_type() == &DataType::Primitive(PrimitiveType::Binary) {
return None;
}
let expression = if self.metadata.partition_columns.contains(&column.name) {
Expression::Column(format!("add.partitionValues_parsed.{}", column.name))
} else {
Expression::Column(format!("add.stats_parsed.{}.{}", stats_field, column.name))
};
let evaluator = ARROW_HANDLER.get_evaluator(
crate::kernel::models::fields::log_schema_ref().clone(),
expression,
field.data_type().clone(),
);
let mut results = Vec::with_capacity(self.data.len());
for batch in self.data.iter() {
let engine = ArrowEngineData::new(batch.clone());
let result = evaluator.evaluate(&engine).ok()?;
let result = result
.as_any()
.downcast_ref::<ArrowEngineData>()
.ok_or(DeltaTableError::generic(
"failed to downcast evaluator result to ArrowEngineData.",
))
.ok()?;
results.push(result.record_batch().clone());
}
let batch = concat_batches(results[0].schema_ref(), &results).ok()?;

Review comment (Collaborator):
Isn't it cheaper to concat first and then run it through the Evaluator?

Reply from @roeap (Collaborator, author), Aug 14, 2024:
Probably; at the very least, that would give more opportunity for parallelism. In fact, we should already concatenate the batches on EagerSnapshot. However, the schemas of these batches are not yet normalized, so we cannot concatenate them yet.
For this we need to either do some internal casting/filtering in the log replay or, even better, do column selection when reading the checkpoints ...

batch.column_by_name("output").map(|c| c.clone())
}
}
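
Following up on the review thread above: a minimal sketch of the "concatenate first, evaluate once" alternative. It assumes the log batches already share a normalized schema, which, per the discussion, is not yet the case; the function name is hypothetical.

use std::sync::Arc;

use arrow::compute::concat_batches;
use arrow_array::{ArrayRef, RecordBatch};
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::ExpressionEvaluator;

// Hypothetical variant of `pick_stats`: concatenate all log batches up front,
// then run the expression evaluator a single time over the combined batch.
fn pick_stats_concat_first(
    batches: &[RecordBatch],
    evaluator: &Arc<dyn ExpressionEvaluator>,
) -> Option<ArrayRef> {
    let combined = concat_batches(batches.first()?.schema_ref(), batches).ok()?;
    let engine = ArrowEngineData::new(combined);
    let result = evaluator.evaluate(&engine).ok()?;
    let data = result.as_any().downcast_ref::<ArrowEngineData>()?;
    data.record_batch().column_by_name("output").cloned()
}
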

impl<'a> PruningStatistics for LogDataHandler<'a> {
/// return the minimum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
self.pick_stats(column, "minValues")
}

/// return the maximum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows.
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
self.pick_stats(column, "maxValues")
}

/// return the number of containers (e.g. row groups) being
/// pruned with these statistics
fn num_containers(&self) -> usize {
self.data.iter().map(|f| f.num_rows()).sum()
}

/// return the number of null values for the named column as an
/// `Option<UInt64Array>`.
///
/// Note: the returned array must contain `num_containers()` rows.
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
if !self.metadata.partition_columns.contains(&column.name) {
let counts = self.pick_stats(column, "nullCount")?;
return arrow_cast::cast(counts.as_ref(), &ArrowDataType::UInt64).ok();
}
let partition_values = self.pick_stats(column, "__dummy__")?;
let row_counts = self.row_counts(column)?;
let row_counts = row_counts.as_any().downcast_ref::<UInt64Array>()?;
let mut null_counts = Vec::with_capacity(partition_values.len());
for i in 0..partition_values.len() {
let null_count = if partition_values.is_null(i) {
row_counts.value(i)
} else {
0
};
null_counts.push(null_count);
}
Some(Arc::new(UInt64Array::from(null_counts)))
}

/// return the number of rows for the named column in each container
/// as an `Option<UInt64Array>`.
///
/// Note: the returned array must contain `num_containers()` rows
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
lazy_static::lazy_static! {
static ref ROW_COUNTS_EVAL: Arc<dyn ExpressionEvaluator> = ARROW_HANDLER.get_evaluator(
crate::kernel::models::fields::log_schema_ref().clone(),
Expression::column("add.stats_parsed.numRecords"),
DataType::Primitive(PrimitiveType::Long),
);
}
let mut results = Vec::with_capacity(self.data.len());
for batch in self.data.iter() {
let engine = ArrowEngineData::new(batch.clone());
let result = ROW_COUNTS_EVAL.evaluate(&engine).ok()?;
let result = result
.as_any()
.downcast_ref::<ArrowEngineData>()
.ok_or(DeltaTableError::generic(
"failed to downcast evaluator result to ArrowEngineData.",
))
.ok()?;
results.push(result.record_batch().clone());
}
let batch = concat_batches(results[0].schema_ref(), &results).ok()?;
arrow_cast::cast(batch.column_by_name("output")?, &ArrowDataType::UInt64).ok()
}

// This function is required since DataFusion 35.0, but is implemented as a no-op
// https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550
fn contained(
&self,
_column: &Column,
_value: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
None
}
}
}
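
For orientation (not part of the diff): DataFusion's PruningPredicate is what ultimately consumes this trait, calling min_values/max_values/null_counts/row_counts to build a per-file keep-or-skip mask. A minimal sketch that simply probes the statistics surface of any implementation, such as the LogDataHandler above; the function name is hypothetical.

use ::datafusion::physical_optimizer::pruning::PruningStatistics;
use datafusion_common::Column;

// Hypothetical probe: report which per-file statistics are available for a column.
fn describe_column_stats<S: PruningStatistics>(stats: &S, name: &str) {
    let col = Column::from_name(name);
    println!(
        "{} files; min/max/null-count arrays available: {}/{}/{}",
        stats.num_containers(),
        stats.min_values(&col).is_some(),
        stats.max_values(&col).is_some(),
        stats.null_counts(&col).is_some(),
    );
}
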

128 changes: 70 additions & 58 deletions crates/core/src/kernel/snapshot/mod.rs
@@ -311,50 +311,7 @@ impl Snapshot {
/// Get the statistics schema of the snapshot
pub fn stats_schema(&self, table_schema: Option<&StructType>) -> DeltaResult<StructType> {
let schema = table_schema.unwrap_or_else(|| self.schema());

let stats_fields = if let Some(stats_cols) = self.table_config().stats_columns() {
stats_cols
.iter()
.map(|col| match get_stats_field(schema, col) {
Some(field) => match field.data_type() {
DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => {
Err(DeltaTableError::Generic(format!(
"Stats column {} has unsupported type {}",
col,
field.data_type()
)))
}
_ => Ok(StructField::new(
field.name(),
field.data_type().clone(),
true,
)),
},
_ => Err(DeltaTableError::Generic(format!(
"Stats column {} not found in schema",
col
))),
})
.collect::<Result<Vec<_>, _>>()?
} else {
let num_indexed_cols = self.table_config().num_indexed_cols();
schema
.fields
.values()
.enumerate()
.filter_map(|(idx, f)| stats_field(idx, num_indexed_cols, f))
.collect()
};
Ok(StructType::new(vec![
StructField::new("numRecords", DataType::LONG, true),
StructField::new("minValues", StructType::new(stats_fields.clone()), true),
StructField::new("maxValues", StructType::new(stats_fields.clone()), true),
StructField::new(
"nullCount",
StructType::new(stats_fields.iter().filter_map(to_count_field).collect()),
true,
),
]))
stats_schema(schema, self.table_config())
}

/// Get the partition values schema of the snapshot
@@ -366,20 +323,7 @@
return Ok(None);
}
let schema = table_schema.unwrap_or_else(|| self.schema());
Ok(Some(StructType::new(
self.metadata
.partition_columns
.iter()
.map(|col| {
schema.field(col).cloned().ok_or_else(|| {
DeltaTableError::Generic(format!(
"Partition column {} not found in schema",
col
))
})
})
.collect::<Result<Vec<_>, _>>()?,
)))
partitions_schema(schema, &self.metadata().partition_columns)
}
}

@@ -713,6 +657,74 @@ impl EagerSnapshot {
}
}

fn stats_schema<'a>(schema: &StructType, config: TableConfig<'a>) -> DeltaResult<StructType> {
let stats_fields = if let Some(stats_cols) = config.stats_columns() {
stats_cols
.iter()
.map(|col| match get_stats_field(schema, col) {
Some(field) => match field.data_type() {
DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => {
Err(DeltaTableError::Generic(format!(
"Stats column {} has unsupported type {}",
col,
field.data_type()
)))
}
_ => Ok(StructField::new(
field.name(),
field.data_type().clone(),
true,
)),
},
_ => Err(DeltaTableError::Generic(format!(
"Stats column {} not found in schema",
col
))),
})
.collect::<Result<Vec<_>, _>>()?
} else {
let num_indexed_cols = config.num_indexed_cols();
schema
.fields
.values()
.enumerate()
.filter_map(|(idx, f)| stats_field(idx, num_indexed_cols, f))
.collect()
};
Ok(StructType::new(vec![
StructField::new("numRecords", DataType::LONG, true),
StructField::new("minValues", StructType::new(stats_fields.clone()), true),
StructField::new("maxValues", StructType::new(stats_fields.clone()), true),
StructField::new(
"nullCount",
StructType::new(stats_fields.iter().filter_map(to_count_field).collect()),
true,
),
]))
}
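
Illustration (an assumed example, not taken from the diff): for a table with columns id: long and name: string, default table configuration, and no stats-column override, the resulting statistics schema is roughly:

numRecords: long
minValues:  { id: long, name: string }
maxValues:  { id: long, name: string }
nullCount:  { id: long, name: long }

All four top-level fields are nullable, matching the StructField::new(..., true) calls above.
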

pub(crate) fn partitions_schema(
schema: &StructType,
partition_columns: &Vec<String>,
) -> DeltaResult<Option<StructType>> {
if partition_columns.is_empty() {
return Ok(None);
}
Ok(Some(StructType::new(
partition_columns
.iter()
.map(|col| {
schema.field(col).map(|field| field.clone()).ok_or_else(|| {
DeltaTableError::Generic(format!(
"Partition column {} not found in schema",
col
))
})
})
.collect::<Result<Vec<_>, _>>()?,
)))
}

fn stats_field(idx: usize, num_indexed_cols: i32, field: &StructField) -> Option<StructField> {
if !(num_indexed_cols < 0 || (idx as i32) < num_indexed_cols) {
return None;