diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index e1fd93a9b6f3..33d52dfa316a 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -33,6 +33,7 @@ use super::schema::{ decimal_length_from_precision, }; +use crate::arrow::levels::calculate_array_levels; use crate::column::writer::ColumnWriter; use crate::errors::{ParquetError, Result}; use crate::file::properties::WriterProperties; @@ -173,16 +174,15 @@ impl ArrowWriter { } } - let mut levels: Vec<_> = arrays + let mut levels = arrays .iter() .map(|array| { - let batch_level = LevelInfo::new(0, array.len()); - let mut levels = batch_level.calculate_array_levels(array, field); + let mut levels = calculate_array_levels(array, field)?; // Reverse levels as we pop() them when writing arrays levels.reverse(); - levels + Ok(levels) }) - .collect(); + .collect::>>()?; write_leaves(&mut row_group_writer, &arrays, &mut levels)?; } @@ -341,26 +341,24 @@ fn write_leaf( column: &ArrayRef, levels: LevelInfo, ) -> Result { - let indices = levels.filter_array_indices(); - // Slice array according to computed offset and length - let column = column.slice(levels.offset, levels.length); + // TODO: Avoid filtering if no need + let indices = levels.non_null_indices(); let written = match writer { ColumnWriter::Int32ColumnWriter(ref mut typed) => { let values = match column.data_type() { ArrowDataType::Date64 => { // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32 let array = if let ArrowDataType::Date64 = column.data_type() { - let array = - arrow::compute::cast(&column, &ArrowDataType::Date32)?; + let array = arrow::compute::cast(column, &ArrowDataType::Date32)?; arrow::compute::cast(&array, &ArrowDataType::Int32)? } else { - arrow::compute::cast(&column, &ArrowDataType::Int32)? + arrow::compute::cast(column, &ArrowDataType::Int32)? }; let array = array .as_any() .downcast_ref::() .expect("Unable to get int32 array"); - get_numeric_array_slice::(array, &indices) + get_numeric_array_slice::(array, indices) } ArrowDataType::UInt32 => { // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map @@ -373,21 +371,21 @@ fn write_leaf( array, |x| x as i32, ); - get_numeric_array_slice::(&array, &indices) + get_numeric_array_slice::(&array, indices) } _ => { - let array = arrow::compute::cast(&column, &ArrowDataType::Int32)?; + let array = arrow::compute::cast(column, &ArrowDataType::Int32)?; let array = array .as_any() .downcast_ref::() .expect("Unable to get i32 array"); - get_numeric_array_slice::(array, &indices) + get_numeric_array_slice::(array, indices) } }; typed.write_batch( values.as_slice(), - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), + levels.def_levels(), + levels.rep_levels(), )? } ColumnWriter::BoolColumnWriter(ref mut typed) => { @@ -396,9 +394,9 @@ fn write_leaf( .downcast_ref::() .expect("Unable to get boolean array"); typed.write_batch( - get_bool_array_slice(array, &indices).as_slice(), - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), + get_bool_array_slice(array, indices).as_slice(), + levels.def_levels(), + levels.rep_levels(), )? } ColumnWriter::Int64ColumnWriter(ref mut typed) => { @@ -408,7 +406,7 @@ fn write_leaf( .as_any() .downcast_ref::() .expect("Unable to get i64 array"); - get_numeric_array_slice::(array, &indices) + get_numeric_array_slice::(array, indices) } ArrowDataType::UInt64 => { // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map @@ -421,21 +419,21 @@ fn write_leaf( array, |x| x as i64, ); - get_numeric_array_slice::(&array, &indices) + get_numeric_array_slice::(&array, indices) } _ => { - let array = arrow::compute::cast(&column, &ArrowDataType::Int64)?; + let array = arrow::compute::cast(column, &ArrowDataType::Int64)?; let array = array .as_any() .downcast_ref::() .expect("Unable to get i64 array"); - get_numeric_array_slice::(array, &indices) + get_numeric_array_slice::(array, indices) } }; typed.write_batch( values.as_slice(), - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), + levels.def_levels(), + levels.rep_levels(), )? } ColumnWriter::Int96ColumnWriter(ref mut _typed) => { @@ -447,9 +445,9 @@ fn write_leaf( .downcast_ref::() .expect("Unable to get Float32 array"); typed.write_batch( - get_numeric_array_slice::(array, &indices).as_slice(), - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), + get_numeric_array_slice::(array, indices).as_slice(), + levels.def_levels(), + levels.rep_levels(), )? } ColumnWriter::DoubleColumnWriter(ref mut typed) => { @@ -458,9 +456,9 @@ fn write_leaf( .downcast_ref::() .expect("Unable to get Float64 array"); typed.write_batch( - get_numeric_array_slice::(array, &indices).as_slice(), - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), + get_numeric_array_slice::(array, indices).as_slice(), + levels.def_levels(), + levels.rep_levels(), )? } ColumnWriter::ByteArrayColumnWriter(ref mut typed) => match column.data_type() { @@ -471,8 +469,8 @@ fn write_leaf( .expect("Unable to get BinaryArray array"); typed.write_batch( get_binary_array(array).as_slice(), - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), + levels.def_levels(), + levels.rep_levels(), )? } ArrowDataType::Utf8 => { @@ -482,8 +480,8 @@ fn write_leaf( .expect("Unable to get LargeBinaryArray array"); typed.write_batch( get_string_array(array).as_slice(), - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), + levels.def_levels(), + levels.rep_levels(), )? } ArrowDataType::LargeBinary => { @@ -493,8 +491,8 @@ fn write_leaf( .expect("Unable to get LargeBinaryArray array"); typed.write_batch( get_large_binary_array(array).as_slice(), - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), + levels.def_levels(), + levels.rep_levels(), )? } ArrowDataType::LargeUtf8 => { @@ -504,8 +502,8 @@ fn write_leaf( .expect("Unable to get LargeUtf8 array"); typed.write_batch( get_large_string_array(array).as_slice(), - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), + levels.def_levels(), + levels.rep_levels(), )? } _ => unreachable!("Currently unreachable because data type not supported"), @@ -518,14 +516,14 @@ fn write_leaf( .as_any() .downcast_ref::() .unwrap(); - get_interval_ym_array_slice(array, &indices) + get_interval_ym_array_slice(array, indices) } IntervalUnit::DayTime => { let array = column .as_any() .downcast_ref::() .unwrap(); - get_interval_dt_array_slice(array, &indices) + get_interval_dt_array_slice(array, indices) } _ => { return Err(ParquetError::NYI( @@ -541,14 +539,14 @@ fn write_leaf( .as_any() .downcast_ref::() .unwrap(); - get_fsb_array_slice(array, &indices) + get_fsb_array_slice(array, indices) } ArrowDataType::Decimal(_, _) => { let array = column .as_any() .downcast_ref::() .unwrap(); - get_decimal_array_slice(array, &indices) + get_decimal_array_slice(array, indices) } _ => { return Err(ParquetError::NYI( @@ -559,8 +557,8 @@ fn write_leaf( }; typed.write_batch( bytes.as_slice(), - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), + levels.def_levels(), + levels.rep_levels(), )? } }; @@ -593,6 +591,7 @@ macro_rules! def_get_binary_array_fn { }; } +// TODO: These methods don't handle non null indices correctly def_get_binary_array_fn!(get_binary_array, arrow_array::BinaryArray); def_get_binary_array_fn!(get_string_array, arrow_array::StringArray); def_get_binary_array_fn!(get_large_binary_array, arrow_array::LargeBinaryArray); diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs index be9a5e99323b..5fab460a1782 100644 --- a/parquet/src/arrow/levels.rs +++ b/parquet/src/arrow/levels.rs @@ -40,114 +40,32 @@ //! //! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding) -use arrow::array::{make_array, Array, ArrayRef, MapArray, StructArray}; +use crate::errors::{ParquetError, Result}; +use arrow::array::{ + make_array, Array, ArrayData, ArrayRef, GenericListArray, MapArray, OffsetSizeTrait, + StructArray, +}; use arrow::datatypes::{DataType, Field}; - -/// Keeps track of the level information per array that is needed to write an Arrow array to Parquet. -/// -/// When a nested schema is traversed, intermediate [LevelInfo] structs are created to track -/// the state of parent arrays. When a primitive Arrow array is encountered, a final [LevelInfo] -/// is created, and this is what is used to index into the array when writing data to Parquet. -#[derive(Debug, Eq, PartialEq, Clone)] -pub(crate) struct LevelInfo { - /// Array's definition levels - pub definition: Vec, - /// Array's optional repetition levels - pub repetition: Option>, - /// Array's offsets, 64-bit is used to accommodate large offset arrays - pub array_offsets: Vec, - // TODO: Convert to an Arrow Buffer after ARROW-10766 is merged. - /// Array's logical validity mask, whcih gets unpacked for list children. - /// If the parent of an array is null, all children are logically treated as - /// null. This mask keeps track of that. - /// - pub array_mask: Vec, - /// The maximum definition at this level, 0 at the record batch - pub max_definition: i16, - /// The type of array represented by this level info - pub level_type: LevelType, - /// The offset of the current level's array - pub offset: usize, - /// The length of the current level's array - pub length: usize, -} - -/// LevelType defines the type of level, and whether it is nullable or not -#[derive(Debug, Eq, PartialEq, Clone, Copy)] -pub(crate) enum LevelType { - Root, - List(bool), - Struct(bool), - Primitive(bool), +use std::ops::Range; + +/// Performs a depth-first scan of the children of `array`, constructing [`LevelInfo`] +/// for each leaf column encountered +pub(crate) fn calculate_array_levels( + array: &ArrayRef, + field: &Field, +) -> Result> { + let mut builder = LevelInfoBuilder::try_new(field, Default::default())?; + builder.write(array, 0..array.len()); + Ok(builder.finish()) } -impl LevelType { - #[inline] - const fn level_increment(&self) -> i16 { - match self { - LevelType::Root => 0, - // List repetition adds a constant 1 - LevelType::List(is_nullable) => 1 + *is_nullable as i16, - LevelType::Struct(is_nullable) | LevelType::Primitive(is_nullable) => { - *is_nullable as i16 - } - } - } -} - -impl LevelInfo { - /// Create a new [LevelInfo] by filling `length` slots, and setting an initial offset. - /// - /// This is a convenience function to populate the starting point of the traversal. - pub(crate) fn new(offset: usize, length: usize) -> Self { - Self { - // a batch has no definition level yet - definition: vec![0; length], - // a batch has no repetition as it is not a list - repetition: None, - // a batch has sequential offsets, should be num_rows + 1 - array_offsets: (0..=(length as i64)).collect(), - // all values at a batch-level are non-null - array_mask: vec![true; length], - max_definition: 0, - level_type: LevelType::Root, - offset, - length, - } - } - - /// Compute nested levels of the Arrow array, recursing into lists and structs. - /// - /// Returns a list of `LevelInfo`, where each level is for nested primitive arrays. - /// - /// The parent struct's nullness is tracked, as it determines whether the child - /// max_definition should be incremented. - /// The 'is_parent_struct' variable asks "is this field's parent a struct?". - /// * If we are starting at a [RecordBatch](arrow::record_batch::RecordBatch), this is `false`. - /// * If we are calculating a list's child, this is `false`. - /// * If we are calculating a struct (i.e. `field.data_type90 == Struct`), - /// this depends on whether the struct is a child of a struct. - /// * If we are calculating a field inside a [StructArray], this is 'true'. - pub(crate) fn calculate_array_levels( - &self, - array: &ArrayRef, - field: &Field, - ) -> Vec { - let (array_offsets, array_mask) = - Self::get_array_offsets_and_masks(array, self.offset, self.length); - match array.data_type() { - DataType::Null => vec![Self { - definition: self.definition.clone(), - repetition: self.repetition.clone(), - array_offsets, - array_mask, - max_definition: self.max_definition.max(1), - // Null type is always nullable - level_type: LevelType::Primitive(true), - offset: self.offset, - length: self.length, - }], - DataType::Boolean +/// Returns true if the DataType can be represented as a primitive parquet column, +/// i.e. a leaf array with no children +fn is_leaf(data_type: &DataType) -> bool { + matches!( + data_type, + DataType::Null + | DataType::Boolean | DataType::Int8 | DataType::Int16 | DataType::Int32 @@ -171,1149 +89,411 @@ impl LevelInfo { | DataType::Binary | DataType::LargeBinary | DataType::Decimal(_, _) - | DataType::FixedSizeBinary(_) => { - // we return a vector of 1 value to represent the primitive - vec![self.calculate_child_levels( - array_offsets, - array_mask, - LevelType::Primitive(field.is_nullable()), - )] + | DataType::FixedSizeBinary(_) + ) +} + +/// The definition and repetition level of an array within a potentially nested hierarchy +#[derive(Debug, Default, Clone, Copy)] +struct LevelContext { + /// The current repetition level + rep_level: i16, + /// The current definition level + def_level: i16, +} + +/// A helper to construct [`LevelInfo`] from a potentially nested [`Field`] +enum LevelInfoBuilder { + Primitive(LevelInfo), + List(Box, LevelContext), + Struct(Vec, LevelContext), +} + +impl LevelInfoBuilder { + /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`] + fn try_new(field: &Field, parent_ctx: LevelContext) -> Result { + match field.data_type() { + d if is_leaf(d) => Ok(Self::Primitive(LevelInfo::new( + parent_ctx, + field.is_nullable(), + ))), + DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => Ok(Self::Primitive( + LevelInfo::new(parent_ctx, field.is_nullable()), + )), + DataType::Struct(children) => { + let def_level = match field.is_nullable() { + true => parent_ctx.def_level + 1, + false => parent_ctx.def_level, + }; + + let ctx = LevelContext { + rep_level: parent_ctx.rep_level, + def_level, + }; + + let children = children + .iter() + .map(|f| Self::try_new(f, ctx)) + .collect::>()?; + + Ok(Self::Struct(children, ctx)) } - DataType::List(list_field) | DataType::LargeList(list_field) => { - let child_offset = array_offsets[0] as usize; - let child_len = *array_offsets.last().unwrap() as usize; - // Calculate the list level - let list_level = self.calculate_child_levels( - array_offsets, - array_mask, - LevelType::List(field.is_nullable()), - ); - - // Construct the child array of the list, and get its offset + mask - let array_data = array.data(); - let child_data = array_data.child_data().get(0).unwrap(); - let child_array = make_array(child_data.clone()); - let (child_offsets, child_mask) = Self::get_array_offsets_and_masks( - &child_array, - child_offset, - child_len - child_offset, - ); - - match child_array.data_type() { - DataType::Null - | DataType::Boolean - | DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Float16 - | DataType::Float32 - | DataType::Float64 - | DataType::Timestamp(_, _) - | DataType::Date32 - | DataType::Date64 - | DataType::Time32(_) - | DataType::Time64(_) - | DataType::Duration(_) - | DataType::Interval(_) - | DataType::Binary - | DataType::LargeBinary - | DataType::Utf8 - | DataType::LargeUtf8 - | DataType::Dictionary(_, _) - | DataType::Decimal(_, _) - | DataType::FixedSizeBinary(_) => { - vec![list_level.calculate_child_levels( - child_offsets, - child_mask, - LevelType::Primitive(list_field.is_nullable()), - )] - } - DataType::List(_) - | DataType::LargeList(_) - | DataType::Struct(_) - | DataType::Map(_, _) => { - list_level.calculate_array_levels(&child_array, list_field) - } - DataType::FixedSizeList(_, _) => unimplemented!(), - DataType::Union(_, _, _) => unimplemented!(), - } + DataType::List(child) + | DataType::LargeList(child) + | DataType::Map(child, _) => { + let def_level = match field.is_nullable() { + true => parent_ctx.def_level + 2, + false => parent_ctx.def_level + 1, + }; + + let ctx = LevelContext { + rep_level: parent_ctx.rep_level + 1, + def_level, + }; + + let child = Self::try_new(child.as_ref(), ctx)?; + Ok(Self::List(Box::new(child), ctx)) } - DataType::Map(map_field, _) => { - // Calculate the map level - let map_level = self.calculate_child_levels( - array_offsets, - array_mask, - // A map is treated like a list as it has repetition - LevelType::List(field.is_nullable()), - ); - - let map_array = array.as_any().downcast_ref::().unwrap(); - - let key_array = map_array.keys(); - let value_array = map_array.values(); - - if let DataType::Struct(fields) = map_field.data_type() { - let key_field = &fields[0]; - let value_field = &fields[1]; - - let mut map_levels = vec![]; - - // Get key levels - let mut key_levels = - map_level.calculate_array_levels(&key_array, key_field); - map_levels.append(&mut key_levels); - - let mut value_levels = - map_level.calculate_array_levels(&value_array, value_field); - map_levels.append(&mut value_levels); - - map_levels - } else { - panic!( - "Map field should be a struct, found {:?}", - map_field.data_type() - ); - } + d => Err(nyi_err!("Datatype {} is not yet supported", d)), + } + } + + /// Finish this [`LevelInfoBuilder`] returning the [`LevelInfo`] for the leaf columns + /// as enumerated by a depth-first search + fn finish(self) -> Vec { + match self { + LevelInfoBuilder::Primitive(v) => vec![v], + LevelInfoBuilder::List(v, _) => v.finish(), + LevelInfoBuilder::Struct(v, _) => { + v.into_iter().flat_map(|l| l.finish()).collect() + } + } + } + + /// Given an `array`, write the level data for the elements in `range` + fn write(&mut self, array: &ArrayRef, range: Range) { + match array.data_type() { + d if is_leaf(d) => self.write_leaf(array, range), + DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => { + self.write_leaf(array, range) } - DataType::FixedSizeList(_, _) => unimplemented!(), - DataType::Struct(struct_fields) => { - let struct_array: &StructArray = array + DataType::Struct(_) => { + let array = array.as_any().downcast_ref::().unwrap(); + self.write_struct(array, range) + } + DataType::List(_) => { + let array = array .as_any() - .downcast_ref::() - .expect("Unable to get struct array"); - let mut struct_level = self.calculate_child_levels( - array_offsets, - array_mask, - LevelType::Struct(field.is_nullable()), - ); - - // If the parent field is a list, calculate the children of the struct as if it - // were a list as well. - if matches!(self.level_type, LevelType::List(_)) { - struct_level.level_type = LevelType::List(false); - } + .downcast_ref::>() + .unwrap(); + self.write_list(array.value_offsets(), array.data(), range) + } + DataType::LargeList(_) => { + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); - let mut struct_levels = vec![]; - struct_array - .columns() - .into_iter() - .zip(struct_fields) - .for_each(|(child_array, child_field)| { - let mut levels = - struct_level.calculate_array_levels(child_array, child_field); - struct_levels.append(&mut levels); - }); - struct_levels + self.write_list(array.value_offsets(), array.data(), range) } - DataType::Union(_, _, _) => unimplemented!(), - DataType::Dictionary(_, _) => { - // Need to check for these cases not implemented in C++: - // - "Writing DictionaryArray with nested dictionary type not yet supported" - // - "Writing DictionaryArray with null encoded in dictionary type not yet supported" - // vec![self.get_primitive_def_levels(array, field, array_mask)] - vec![self.calculate_child_levels( - array_offsets, - array_mask, - LevelType::Primitive(field.is_nullable()), - )] + DataType::Map(_, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + // A Map is just as ListArray with a StructArray child, we therefore + // treat it as such to avoid code duplication + self.write_list(array.value_offsets(), array.data(), range) } + _ => unreachable!(), } } - /// Calculate child/leaf array levels. - /// - /// The algorithm works by incrementing definitions of array values based on whether: - /// - a value is optional or required (is_nullable) - /// - a list value is repeated + optional or required (is_list) - /// - /// A record batch always starts at a populated definition = level 0. - /// When a batch only has a primitive, i.e. `>, column `a` - /// can only have a maximum level of 1 if it is not null. - /// If it is not null, we increment by 1, such that the null slots will = level 1. - /// The above applies to types that have no repetition (anything not a list or map). - /// - /// If a batch has lists, then we increment by up to 2 levels: - /// - 1 level for the list (repeated) - /// - 1 level if the list itself is nullable (optional) - /// - /// A list's child then gets incremented using the above rules. - /// - /// *Exceptions* - /// - /// There are 2 exceptions from the above rules: - /// - /// 1. When at the root of the schema: We always increment the - /// level regardless of whether the child is nullable or not. If we do not do - /// this, we could have a non-nullable array having a definition of 0. + /// Write `range` elements from ListArray `array` /// - /// 2. List parent, non-list child: We always increment the level in this case, - /// regardless of whether the child is nullable or not. - /// - /// *Examples* - /// - /// A batch with only a primitive that's non-nullable. ``: - /// * We don't increment the definition level as the array is not optional. - /// * This would leave us with a definition of 0, so the first exception applies. - /// * The definition level becomes 1. - /// - /// A batch with only a primitive that's nullable. ``: - /// * The definition level becomes 1, as we increment it once. - /// - /// A batch with a single non-nullable list (both list and child not null): - /// * We calculate the level twice, for the list, and for the child. - /// * At the list, the level becomes 1, where 0 indicates that the list is - /// empty, and 1 says it's not (determined through offsets). - /// * At the primitive level, the second exception applies. The level becomes 2. - fn calculate_child_levels( - &self, - // we use 64-bit offsets to also accommodate large arrays - array_offsets: Vec, - array_mask: Vec, - level_type: LevelType, - ) -> Self { - let min_len = *(array_offsets.last().unwrap()) as usize; - let mut definition = Vec::with_capacity(min_len); - let mut repetition = Vec::with_capacity(min_len); - let mut merged_array_mask = Vec::with_capacity(min_len); - - let max_definition = match (self.level_type, level_type) { - // Handle the illegal cases - (_, LevelType::Root) => { - unreachable!("Cannot have a root as a child") - } - (LevelType::Primitive(_), _) => { - unreachable!("Cannot have a primitive parent for any type") - } - // The general case - (_, _) => self.max_definition + level_type.level_increment(), - }; + /// Note: MapArrays are ListArray under the hood and so are dispatched to this method + fn write_list( + &mut self, + offsets: &[O], + list_data: &ArrayData, + range: Range, + ) { + let (child, ctx) = match self { + Self::List(child, ctx) => (child, ctx), + _ => unreachable!(), + }; + + let offsets = &offsets[range.start..range.end + 1]; + let child_array = make_array(list_data.child_data()[0].clone()); + + let write_non_null_slice = + |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| { + child.write(&child_array, start_idx..end_idx); + child.visit_leaves(|leaf| { + let rep_levels = leaf.rep_levels.as_mut().unwrap(); + let mut rev = rep_levels.iter_mut().rev(); + let mut remaining = end_idx - start_idx; + + loop { + let next = rev.next().unwrap(); + if *next > ctx.rep_level { + // Nested element - ignore + continue; + } - match (self.level_type, level_type) { - (LevelType::List(_), LevelType::List(is_nullable)) => { - // Parent is a list or descendant of a list, and child is a list - let reps = self.repetition.clone().unwrap(); - - // List is null, and not empty - let l1 = max_definition - is_nullable as i16; - // List is not null, but is empty - let l2 = max_definition - 1; - // List is not null, and not empty - let l3 = max_definition; - - let mut nulls_seen = 0; - - self.array_offsets.windows(2).for_each(|w| { - let start = w[0] as usize; - let end = w[1] as usize; - let parent_len = end - start; - - if parent_len == 0 { - // If the parent length is 0, there won't be a slot for the child - let index = start + nulls_seen - self.offset; - definition.push(self.definition[index]); - repetition.push(0); - merged_array_mask.push(self.array_mask[index]); - nulls_seen += 1; + remaining -= 1; + if remaining == 0 { + *next = ctx.rep_level - 1; + break; + } + } + }) + }; + + let write_empty_slice = |child: &mut LevelInfoBuilder| { + child.visit_leaves(|leaf| { + let rep_levels = leaf.rep_levels.as_mut().unwrap(); + rep_levels.push(ctx.rep_level - 1); + let def_levels = leaf.def_levels.as_mut().unwrap(); + def_levels.push(ctx.def_level - 1); + }) + }; + + let write_null_slice = |child: &mut LevelInfoBuilder| { + child.visit_leaves(|leaf| { + let rep_levels = leaf.rep_levels.as_mut().unwrap(); + rep_levels.push(ctx.rep_level - 1); + let def_levels = leaf.def_levels.as_mut().unwrap(); + def_levels.push(ctx.def_level - 2); + }) + }; + + match list_data.null_bitmap() { + Some(nulls) => { + let null_offset = list_data.offset() + range.start; + for (idx, w) in offsets.windows(2).enumerate() { + let is_valid = nulls.is_set(idx + null_offset); + let start_idx = w[0].to_usize().unwrap(); + let end_idx = w[1].to_usize().unwrap(); + if !is_valid { + write_null_slice(child) + } else if start_idx == end_idx { + write_empty_slice(child) } else { - (start..end).for_each(|parent_index| { - let index = parent_index + nulls_seen - self.offset; - let parent_index = parent_index - self.offset; - - // parent is either defined at this level, or earlier - let parent_def = self.definition[index]; - let parent_rep = reps[index]; - let parent_mask = self.array_mask[index]; - - // valid parent, index into children - let child_start = array_offsets[parent_index] as usize; - let child_end = array_offsets[parent_index + 1] as usize; - let child_len = child_end - child_start; - let child_mask = array_mask[parent_index]; - let merged_mask = parent_mask && child_mask; - - if child_len == 0 { - // Empty slot, i.e. {"parent": {"child": [] } } - // Nullness takes priority over emptiness - definition.push(if child_mask { l2 } else { l1 }); - repetition.push(parent_rep); - merged_array_mask.push(merged_mask); - } else { - (child_start..child_end).for_each(|child_index| { - let rep = match ( - parent_index == start, - child_index == child_start, - ) { - (true, true) => parent_rep, - (true, false) => parent_rep + 2, - (false, true) => parent_rep, - (false, false) => parent_rep + 1, - }; - - definition.push(if !parent_mask { - parent_def - } else if child_mask { - l3 - } else { - l1 - }); - repetition.push(rep); - merged_array_mask.push(merged_mask); - }); - } - }); + write_non_null_slice(child, start_idx, end_idx) } - }); - - debug_assert_eq!(definition.len(), merged_array_mask.len()); - - let offset = *array_offsets.first().unwrap() as usize; - let length = *array_offsets.last().unwrap() as usize - offset; - - Self { - definition, - repetition: Some(repetition), - array_offsets, - array_mask: merged_array_mask, - max_definition, - level_type, - offset: offset + self.offset, - length, } } - (LevelType::List(_), _) => { - // List and primitive (or struct). - // The list can have more values than the primitive, indicating that there - // are slots where the list is empty. We use a counter to track this behaviour. - let mut nulls_seen = 0; - - // let child_max_definition = list_max_definition + is_nullable as i16; - // child values are a function of parent list offsets - let reps = self.repetition.as_deref().unwrap(); - self.array_offsets.windows(2).for_each(|w| { - let start = w[0] as usize; - let end = w[1] as usize; - let parent_len = end - start; - - if parent_len == 0 { - let index = start + nulls_seen - self.offset; - definition.push(self.definition[index]); - repetition.push(reps[index]); - merged_array_mask.push(self.array_mask[index]); - nulls_seen += 1; + None => { + for w in offsets.windows(2) { + let start_idx = w[0].to_usize().unwrap(); + let end_idx = w[1].to_usize().unwrap(); + if start_idx == end_idx { + write_empty_slice(child) } else { - // iterate through the array, adjusting child definitions for nulls - (start..end).for_each(|child_index| { - let index = child_index + nulls_seen - self.offset; - let child_mask = array_mask[child_index - self.offset]; - let parent_mask = self.array_mask[index]; - let parent_def = self.definition[index]; - - if !parent_mask || parent_def < self.max_definition { - definition.push(parent_def); - repetition.push(reps[index]); - merged_array_mask.push(parent_mask); - } else { - definition.push(max_definition - !child_mask as i16); - repetition.push(reps[index]); - merged_array_mask.push(child_mask); - } - }); + write_non_null_slice(child, start_idx, end_idx) } - }); - - debug_assert_eq!(definition.len(), merged_array_mask.len()); - - let offset = *array_offsets.first().unwrap() as usize; - let length = *array_offsets.last().unwrap() as usize - offset; - - Self { - definition, - repetition: Some(repetition), - array_offsets: self.array_offsets.clone(), - array_mask: merged_array_mask, - max_definition, - level_type, - offset: offset + self.offset, - length, } } - (_, LevelType::List(is_nullable)) => { - // Encountering a list for the first time. - // Calculate the 2 list hierarchy definitions in advance - - // List is null, and not empty - let l1 = max_definition - 1 - is_nullable as i16; - // List is not null, but is empty - let l2 = max_definition - 1; - // List is not null, and not empty - let l3 = max_definition; - - self.definition - .iter() - .enumerate() - .for_each(|(parent_index, def)| { - let child_from = array_offsets[parent_index]; - let child_to = array_offsets[parent_index + 1]; - let child_len = child_to - child_from; - let child_mask = array_mask[parent_index]; - let parent_mask = self.array_mask[parent_index]; - - match (parent_mask, child_len) { - (true, 0) => { - // Empty slot, i.e. {"parent": {"child": [] } } - // Nullness takes priority over emptiness - definition.push(if child_mask { l2 } else { l1 }); - repetition.push(0); - merged_array_mask.push(child_mask); - } - (false, 0) => { - // Inherit the parent definition as parent was null - definition.push(*def); - repetition.push(0); - merged_array_mask.push(child_mask); - } - (true, _) => { - (child_from..child_to).for_each(|child_index| { - // l1 and l3 make sense as list is not empty, - // but we reflect that it's either null or not - definition.push(if child_mask { l3 } else { l1 }); - // Mark the first child slot as 0, and the next as 1 - repetition.push(if child_index == child_from { - 0 - } else { - 1 - }); - merged_array_mask.push(child_mask); - }); - } - (false, _) => { - (child_from..child_to).for_each(|child_index| { - // Inherit the parent definition as parent was null - definition.push(*def); - // mark the first child slot as 0, and the next as 1 - repetition.push(if child_index == child_from { - 0 - } else { - 1 - }); - merged_array_mask.push(false); - }); - } + } + } + + /// Write `range` elements from StructArray `array` + fn write_struct(&mut self, array: &StructArray, range: Range) { + let (children, ctx) = match self { + Self::Struct(children, ctx) => (children, ctx), + _ => unreachable!(), + }; + + let write_null = |children: &mut [LevelInfoBuilder], range: Range| { + for child in children { + child.visit_leaves(|info| { + let len = range.end - range.start; + + let def_levels = info.def_levels.as_mut().unwrap(); + def_levels.reserve(len); + for _ in 0..len { + def_levels.push(ctx.def_level - 1); + } + + if let Some(rep_levels) = info.rep_levels.as_mut() { + rep_levels.reserve(len); + for _ in 0..len { + rep_levels.push(ctx.rep_level) } - }); - - debug_assert_eq!(definition.len(), merged_array_mask.len()); - - let offset = *array_offsets.first().unwrap() as usize; - let length = *array_offsets.last().unwrap() as usize - offset; - - Self { - definition, - repetition: Some(repetition), - array_offsets, - array_mask: merged_array_mask, - max_definition, - level_type, - offset, - length, - } + } + }) } - (_, _) => { - self.definition - .iter() - .zip(array_mask.into_iter().zip(&self.array_mask)) - .for_each(|(current_def, (child_mask, parent_mask))| { - merged_array_mask.push(*parent_mask && child_mask); - match (parent_mask, child_mask) { - (true, true) => { - definition.push(max_definition); - } - (true, false) => { - // The child is only legally null if its array is nullable. - // Thus parent's max_definition is lower - definition.push(if *current_def <= self.max_definition { - *current_def - } else { - self.max_definition - }); + }; + + let write_non_null = |children: &mut [LevelInfoBuilder], range: Range| { + for (child_array, child) in array.columns().into_iter().zip(children) { + child.write(child_array, range.clone()) + } + }; + + match array.data().null_bitmap() { + Some(validity) => { + let null_offset = array.data().offset(); + let mut last_non_null_idx = None; + let mut last_null_idx = None; + + // TODO: BitChunkIterator + for i in range.clone() { + match validity.is_set(i + null_offset) { + true => { + if let Some(last_idx) = last_null_idx.take() { + write_null(children, last_idx..i) } - // if the parent was false, retain its definitions - (false, _) => { - definition.push(*current_def); + last_non_null_idx.get_or_insert(i); + } + false => { + if let Some(last_idx) = last_non_null_idx.take() { + write_non_null(children, last_idx..i) } + last_null_idx.get_or_insert(i); } - }); - - debug_assert_eq!(definition.len(), merged_array_mask.len()); - - Self { - definition, - repetition: self.repetition.clone(), // it's None - array_offsets, - array_mask: merged_array_mask, - max_definition, - level_type, - // Inherit parent offset and length - offset: self.offset, - length: self.length, + } } - } - } - } - /// Get the offsets of an array as 64-bit values, and validity masks as booleans - /// - Primitive, binary and struct arrays' offsets will be a sequence, masks obtained - /// from validity bitmap - /// - List array offsets will be the value offsets, masks are computed from offsets - fn get_array_offsets_and_masks( - array: &ArrayRef, - offset: usize, - len: usize, - ) -> (Vec, Vec) { - match array.data_type() { - // A NullArray is entirely nulls, despite not containing a null buffer - DataType::Null => ((0..=(len as i64)).collect(), vec![false; len]), - DataType::Boolean - | DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Float16 - | DataType::Float32 - | DataType::Float64 - | DataType::Timestamp(_, _) - | DataType::Date32 - | DataType::Date64 - | DataType::Time32(_) - | DataType::Time64(_) - | DataType::Duration(_) - | DataType::Interval(_) - | DataType::Binary - | DataType::LargeBinary - | DataType::Utf8 - | DataType::LargeUtf8 - | DataType::Struct(_) - | DataType::Dictionary(_, _) - | DataType::Decimal(_, _) => { - let array_mask = match array.data().null_buffer() { - Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len), - None => vec![true; len], - }; - ((0..=(len as i64)).collect(), array_mask) - } - DataType::List(_) | DataType::Map(_, _) => { - let offsets = unsafe { array.data().buffers()[0].typed_data::() }; - let offsets = offsets - .iter() - .copied() - .skip(array.offset() + offset) - .take(len + 1) - .map(|v| v as i64) - .collect::>(); - let array_mask = match array.data().null_buffer() { - Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len), - None => vec![true; len], - }; - (offsets, array_mask) - } - DataType::LargeList(_) => { - let offsets = unsafe { array.data().buffers()[0].typed_data::() } - .iter() - .skip(array.offset() + offset) - .take(len + 1) - .copied() - .collect(); - let array_mask = match array.data().null_buffer() { - Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len), - None => vec![true; len], - }; - (offsets, array_mask) - } - DataType::FixedSizeBinary(value_len) => { - let array_mask = match array.data().null_buffer() { - Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len), - None => vec![true; len], - }; - let value_len = *value_len as i64; - ( - (0..=(len as i64)).map(|v| v * value_len).collect(), - array_mask, - ) - } - DataType::FixedSizeList(_, _) | DataType::Union(_, _, _) => { - unimplemented!("Getting offsets not yet implemented") + if let Some(last_idx) = last_null_idx.take() { + write_null(children, last_idx..range.end) + } + + if let Some(last_idx) = last_non_null_idx.take() { + write_non_null(children, last_idx..range.end) + } } + None => write_non_null(children, range), } } - /// Given a level's information, calculate the offsets required to index an array correctly. - pub(crate) fn filter_array_indices(&self) -> Vec { - if !matches!(self.level_type, LevelType::Primitive(_)) { - panic!( - "Cannot filter indices on a non-primitive array, found {:?}", - self.level_type - ); - } - - // happy path if not dealing with lists - if self.repetition.is_none() { - return self - .definition - .iter() - .enumerate() - .filter_map(|(i, def)| { - if *def == self.max_definition { - Some(i) - } else { - None + /// Write a primitive array, as defined by [`is_leaf`] + fn write_leaf(&mut self, array: &ArrayRef, range: Range) { + let info = match self { + Self::Primitive(info) => info, + _ => unreachable!(), + }; + + let len = range.end - range.start; + + match &mut info.def_levels { + Some(def_levels) => { + def_levels.reserve(len); + info.non_null_indices.reserve(len); + + match array.data().null_bitmap() { + Some(nulls) => { + let nulls_offset = array.data().offset(); + for i in range { + match nulls.is_set(i + nulls_offset) { + true => { + def_levels.push(info.max_def_level); + info.non_null_indices.push(i) + } + false => def_levels.push(info.max_def_level - 1), + } + } } - }) - .collect(); - } - - let mut filtered = vec![]; - let mut definition_levels = self.definition.iter(); - let mut index = 0; - - for len in self.array_offsets.windows(2).map(|s| s[1] - s[0]) { - if len == 0 { - // Skip this definition level--the iterator should not be empty, and the definition - // level be less than max_definition, i.e., a null value) - assert!(*definition_levels.next().unwrap() < self.max_definition); - } else { - for (_, def) in (0..len).zip(&mut definition_levels) { - if *def == self.max_definition { - filtered.push(index); + None => { + let iter = std::iter::repeat(info.max_def_level).take(len); + def_levels.extend(iter); + info.non_null_indices.extend(range); } - index += 1; } } + None => info.non_null_indices.extend(range), } - filtered + if let Some(rep_levels) = &mut info.rep_levels { + rep_levels.extend(std::iter::repeat(info.max_rep_level).take(len)) + } } -} -/// Convert an Arrow buffer to a boolean array slice -/// TODO: this was created for buffers, so might not work for bool array, might be slow too -#[inline] -fn get_bool_array_slice( - buffer: &arrow::buffer::Buffer, - offset: usize, - len: usize, -) -> Vec { - let data = buffer.as_slice(); - (offset..(len + offset)) - .map(|i| arrow::util::bit_util::get_bit(data, i)) - .collect() + /// Visits all children of this node in depth first order + fn visit_leaves(&mut self, visit: impl Fn(&mut LevelInfo) + Copy) { + match self { + LevelInfoBuilder::Primitive(info) => visit(info), + LevelInfoBuilder::List(c, _) => c.visit_leaves(visit), + LevelInfoBuilder::Struct(children, _) => { + for c in children { + c.visit_leaves(visit) + } + } + } + } } +/// The data necessary to write a primitive Arrow array to parquet, taking into account +/// any non-primitive parents it may have in the arrow representation +#[derive(Debug, Eq, PartialEq, Clone)] +pub(crate) struct LevelInfo { + /// Array's definition levels + /// + /// Present if `max_def_level != 0` + def_levels: Option>, -#[cfg(test)] -mod tests { - use super::*; + /// Array's optional repetition levels + /// + /// Present if `max_rep_level != 0` + rep_levels: Option>, - use std::sync::Arc; + /// The corresponding array identifying non-null slices of data + /// from the primitive array + non_null_indices: Vec, - use arrow::array::*; - use arrow::buffer::Buffer; - use arrow::datatypes::{Schema, ToByteSlice}; - use arrow::record_batch::RecordBatch; + /// The maximum definition level for this leaf column + max_def_level: i16, - #[test] - fn test_calculate_array_levels_twitter_example() { - // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html - // [[a, b, c], [d, e, f, g]], [[h], [i,j]] - let parent_levels = LevelInfo { - definition: vec![0, 0], - repetition: None, - array_offsets: vec![0, 1, 2], // 2 records, root offsets always sequential - array_mask: vec![true, true], // both lists defined - max_definition: 0, - level_type: LevelType::Root, - offset: 0, - length: 2, - }; - // offset into array, each level1 has 2 values - let array_offsets = vec![0, 2, 4]; - let array_mask = vec![true, true]; - - // calculate level1 levels - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask, - LevelType::List(false), - ); - // - let expected_levels = LevelInfo { - definition: vec![1, 1, 1, 1], - repetition: Some(vec![0, 1, 0, 1]), - array_offsets, - array_mask: vec![true, true, true, true], - max_definition: 1, - level_type: LevelType::List(false), - offset: 0, - length: 4, - }; - // the separate asserts make it easier to see what's failing - assert_eq!(&levels.definition, &expected_levels.definition); - assert_eq!(&levels.repetition, &expected_levels.repetition); - assert_eq!(&levels.array_mask, &expected_levels.array_mask); - assert_eq!(&levels.array_offsets, &expected_levels.array_offsets); - assert_eq!(&levels.max_definition, &expected_levels.max_definition); - assert_eq!(&levels.level_type, &expected_levels.level_type); - // this assert is to help if there are more variables added to the struct - assert_eq!(&levels, &expected_levels); - - // level2 - let parent_levels = levels; - let array_offsets = vec![0, 3, 7, 8, 10]; - let array_mask = vec![true, true, true, true]; - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask, - LevelType::List(false), - ); - let expected_levels = LevelInfo { - definition: vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2], - repetition: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]), - array_offsets, - array_mask: vec![true; 10], - max_definition: 2, - level_type: LevelType::List(false), - offset: 0, - length: 10, - }; - assert_eq!(&levels.definition, &expected_levels.definition); - assert_eq!(&levels.repetition, &expected_levels.repetition); - assert_eq!(&levels.array_mask, &expected_levels.array_mask); - assert_eq!(&levels.max_definition, &expected_levels.max_definition); - assert_eq!(&levels.array_offsets, &expected_levels.array_offsets); - assert_eq!(&levels.level_type, &expected_levels.level_type); - assert_eq!(&levels, &expected_levels); - } + /// The maximum repetition for this leaf column + max_rep_level: i16, +} - #[test] - fn test_calculate_one_level_1() { - // This test calculates the levels for a non-null primitive array - let parent_levels = LevelInfo { - definition: vec![0; 10], - repetition: None, - array_offsets: (0..=10).collect(), - array_mask: vec![true; 10], - max_definition: 0, - level_type: LevelType::Root, - offset: 0, - length: 10, +impl LevelInfo { + fn new(ctx: LevelContext, is_nullable: bool) -> Self { + let max_rep_level = ctx.rep_level; + let max_def_level = match is_nullable { + true => ctx.def_level + 1, + false => ctx.def_level, }; - let array_offsets: Vec = (0..=10).collect(); - let array_mask = vec![true; 10]; - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask.clone(), - LevelType::Primitive(false), - ); - let expected_levels = LevelInfo { - // As it is non-null, definitions can be omitted - definition: vec![0; 10], - repetition: None, - array_offsets, - array_mask, - max_definition: 0, - level_type: LevelType::Primitive(false), - offset: 0, - length: 10, - }; - assert_eq!(&levels, &expected_levels); + Self { + def_levels: (max_def_level != 0).then(Vec::new), + rep_levels: (max_rep_level != 0).then(Vec::new), + non_null_indices: vec![], + max_def_level, + max_rep_level, + } } - #[test] - fn test_calculate_one_level_2() { - // This test calculates the levels for a non-null primitive array - let parent_levels = LevelInfo { - definition: vec![0; 5], - repetition: None, - array_offsets: (0..=5).collect(), - array_mask: vec![true, true, true, true, true], - max_definition: 0, - level_type: LevelType::Root, - offset: 0, - length: 5, - }; - let array_offsets: Vec = (0..=5).collect(); - let array_mask = vec![true, false, true, true, false]; - - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask.clone(), - LevelType::Primitive(true), - ); - let expected_levels = LevelInfo { - definition: vec![1, 0, 1, 1, 0], - repetition: None, - array_offsets, - array_mask, - max_definition: 1, - level_type: LevelType::Primitive(true), - offset: 0, - length: 5, - }; - assert_eq!(&levels, &expected_levels); + pub fn def_levels(&self) -> Option<&[i16]> { + self.def_levels.as_deref() } - #[test] - fn test_calculate_array_levels_1() { - // if all array values are defined (e.g. batch>) - // [[0], [1], [2], [3], [4]] - let parent_levels = LevelInfo { - definition: vec![0; 5], - repetition: None, - array_offsets: vec![0, 1, 2, 3, 4, 5], - array_mask: vec![true, true, true, true, true], - max_definition: 0, - level_type: LevelType::Root, - offset: 0, - length: 5, - }; - let array_offsets = vec![0, 2, 2, 4, 8, 11]; - let array_mask = vec![true, false, true, true, true]; - - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask, - LevelType::List(true), - ); - // array: [[0, 0], _1_, [2, 2], [3, 3, 3, 3], [4, 4, 4]] - // all values are defined as we do not have nulls on the root (batch) - // repetition: - // 0: 0, 1 - // 1: - // 2: 0, 1 - // 3: 0, 1, 1, 1 - // 4: 0, 1, 1 - let expected_levels = LevelInfo { - // The levels are normally 2 because we: - // - Calculate the level at the list - // - Calculate the level at the list's child - // We do not do this in these tests, thus the levels are 1 less. - definition: vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2], - repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]), - array_offsets, - array_mask: vec![ - true, true, false, true, true, true, true, true, true, true, true, true, - ], - max_definition: 2, - level_type: LevelType::List(true), - offset: 0, - length: 11, // the child has 11 slots - }; - assert_eq!(&levels.definition, &expected_levels.definition); - assert_eq!(&levels.repetition, &expected_levels.repetition); - assert_eq!(&levels.array_offsets, &expected_levels.array_offsets); - assert_eq!(&levels.max_definition, &expected_levels.max_definition); - assert_eq!(&levels.level_type, &expected_levels.level_type); - assert_eq!(&levels, &expected_levels); + pub fn rep_levels(&self) -> Option<&[i16]> { + self.rep_levels.as_deref() } - #[test] - fn test_calculate_array_levels_2() { - // If some values are null - // - // This emulates an array in the form: > - // with values: - // - 0: [0, 1], but is null because of the struct - // - 1: [] - // - 2: [2, 3], but is null because of the struct - // - 3: [4, 5, 6, 7] - // - 4: [8, 9, 10] - // - // If the first values of a list are null due to a parent, we have to still account for them - // while indexing, because they would affect the way the child is indexed - // i.e. in the above example, we have to know that [0, 1] has to be skipped - let parent_levels = LevelInfo { - definition: vec![0, 1, 0, 1, 1], - repetition: None, - array_offsets: vec![0, 1, 2, 3, 4, 5], - array_mask: vec![false, true, false, true, true], - max_definition: 1, - level_type: LevelType::Struct(true), - offset: 0, - length: 5, - }; - let array_offsets = vec![0, 2, 2, 4, 8, 11]; - let array_mask = vec![true, false, true, true, true]; - - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask, - LevelType::List(true), - ); - let expected_levels = LevelInfo { - // 0 1 [2] are 0 (not defined at level 1) - // [2] is 1, but has 0 slots so is not populated (defined at level 1 only) - // 2 3 [4] are 0 - // 4 5 6 7 [8] are 1 (defined at level 1 only) - // 8 9 10 [11] are 2 (defined at both levels) - definition: vec![0, 0, 1, 0, 0, 3, 3, 3, 3, 3, 3, 3], - repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]), - array_offsets, - array_mask: vec![ - false, false, false, false, false, true, true, true, true, true, true, - true, - ], - max_definition: 3, - level_type: LevelType::List(true), - offset: 0, - length: 11, - }; - assert_eq!(&levels.definition, &expected_levels.definition); - assert_eq!(&levels.repetition, &expected_levels.repetition); - assert_eq!(&levels.array_offsets, &expected_levels.array_offsets); - assert_eq!(&levels.max_definition, &expected_levels.max_definition); - assert_eq!(&levels.level_type, &expected_levels.level_type); - assert_eq!(&levels, &expected_levels); - - // nested lists (using previous test) - let nested_parent_levels = levels; - let array_offsets = vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]; - let array_mask = vec![ - true, true, true, true, true, true, true, true, true, true, true, - ]; - let levels = nested_parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask, - LevelType::List(true), - ); - let expected_levels = LevelInfo { - // (def: 0) 0 1 [2] are 0 (take parent) - // (def: 0) 2 3 [4] are 0 (take parent) - // (def: 0) 4 5 [6] are 0 (take parent) - // (def: 0) 6 7 [8] are 0 (take parent) - // (def: 1) 8 9 [10] are 1 (take parent) - // (def: 1) 10 11 [12] are 1 (take parent) - // (def: 1) 12 23 [14] are 1 (take parent) - // (def: 1) 14 15 [16] are 1 (take parent) - // (def: 2) 16 17 [18] are 2 (defined at all levels) - // (def: 2) 18 19 [20] are 2 (defined at all levels) - // (def: 2) 20 21 [22] are 2 (defined at all levels) - // - // 0 1 [2] are 0 (not defined at level 1) - // [2] is 1, but has 0 slots so is not populated (defined at level 1 only) - // 2 3 [4] are 0 - // 4 5 6 7 [8] are 1 (defined at level 1 only) - // 8 9 10 [11] are 2 (defined at both levels) - // - // 0: [[100, 101], [102, 103]] - // 1: [] - // 2: [[104, 105], [106, 107]] - // 3: [[108, 109], [110, 111], [112, 113], [114, 115]] - // 4: [[116, 117], [118, 119], [120, 121]] - definition: vec![ - 0, 0, 0, 0, 1, 0, 0, 0, 0, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, - ], - repetition: Some(vec![ - 0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2, - ]), - array_offsets, - array_mask: vec![ - false, false, false, false, false, false, false, false, false, true, - true, true, true, true, true, true, true, true, true, true, true, true, - true, - ], - max_definition: 5, - level_type: LevelType::List(true), - offset: 0, - length: 22, - }; - assert_eq!(&levels.definition, &expected_levels.definition); - assert_eq!(&levels.repetition, &expected_levels.repetition); - assert_eq!(&levels.array_offsets, &expected_levels.array_offsets); - assert_eq!(&levels.array_mask, &expected_levels.array_mask); - assert_eq!(&levels.max_definition, &expected_levels.max_definition); - assert_eq!(&levels.level_type, &expected_levels.level_type); - assert_eq!(&levels, &expected_levels); + pub fn non_null_indices(&self) -> &[usize] { + &self.non_null_indices } +} - #[test] - fn test_calculate_array_levels_nested_list() { - // if all array values are defined (e.g. batch>) - // The array at this level looks like: - // 0: [a] - // 1: [a] - // 2: [a] - // 3: [a] - let parent_levels = LevelInfo { - definition: vec![1, 1, 1, 1], - repetition: None, - array_offsets: vec![0, 1, 2, 3, 4], - array_mask: vec![true, true, true, true], - max_definition: 1, - level_type: LevelType::Struct(true), - offset: 0, - length: 4, - }; - // 0: null ([], but mask is false, so it's not just an empty list) - // 1: [1, 2, 3] - // 2: [4, 5] - // 3: [6, 7] - let array_offsets = vec![0, 1, 4, 6, 8]; - let array_mask = vec![false, true, true, true]; - - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask, - LevelType::List(true), - ); - // 0: [null], level 1 is defined, but not 2 - // 1: [1, 2, 3] - // 2: [4, 5] - // 3: [6, 7] - let expected_levels = LevelInfo { - definition: vec![1, 3, 3, 3, 3, 3, 3, 3], - repetition: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]), - array_offsets, - array_mask: vec![false, true, true, true, true, true, true, true], - max_definition: 3, - level_type: LevelType::List(true), - offset: 0, - length: 8, - }; - assert_eq!(&levels.definition, &expected_levels.definition); - assert_eq!(&levels.repetition, &expected_levels.repetition); - assert_eq!(&levels.array_offsets, &expected_levels.array_offsets); - assert_eq!(&levels.max_definition, &expected_levels.max_definition); - assert_eq!(&levels.level_type, &expected_levels.level_type); - assert_eq!(&levels, &expected_levels); - - // nested lists (using previous test) - let nested_parent_levels = levels; - // 0: [null] (was a populated null slot at the parent) - // 1: [201] - // 2: [202, 203] - // 3: null ([]) - // 4: [204, 205, 206] - // 5: [207, 208, 209, 210] - // 6: [] (tests a non-null empty list slot) - // 7: [211, 212, 213, 214, 215] - let array_offsets = vec![0, 1, 2, 4, 4, 7, 11, 11, 16]; - // logically, the fist slot of the mask is false - let array_mask = vec![true, true, true, false, true, true, true, true]; - let levels = nested_parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask, - LevelType::List(true), - ); - // We have 7 array values, and at least 15 primitives (from array_offsets) - // 0: (-)[null], parent was null, no value populated here - // 1: (0)[201], (1)[202, 203], (2)[[null]] - // 2: (3)[204, 205, 206], (4)[207, 208, 209, 210] - // 3: (5)[[]], (6)[211, 212, 213, 214, 215] - // - // In a JSON syntax with the schema: >>>, this translates into: - // 0: {"struct": [ null ]} - // 1: {"struct": [ [201], [202, 203], [] ]} - // 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]} - // 3: {"struct": [ [], [211, 212, 213, 214, 215] ]} - let expected_levels = LevelInfo { - definition: vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5], - repetition: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]), - array_mask: vec![ - false, true, true, true, false, true, true, true, true, true, true, true, - true, true, true, true, true, true, - ], - array_offsets, - max_definition: 5, - level_type: LevelType::List(true), - offset: 0, - length: 16, - }; - assert_eq!(&levels.definition, &expected_levels.definition); - assert_eq!(&levels.repetition, &expected_levels.repetition); - assert_eq!(&levels.array_offsets, &expected_levels.array_offsets); - assert_eq!(&levels.array_mask, &expected_levels.array_mask); - assert_eq!(&levels.max_definition, &expected_levels.max_definition); - assert_eq!(&levels.level_type, &expected_levels.level_type); - assert_eq!(&levels, &expected_levels); - } +#[cfg(test)] +mod tests { + use super::*; - #[test] - fn test_calculate_nested_struct_levels() { - // tests a > - // array: - // - {a: {b: {c: 1}}} - // - {a: {b: {c: null}}} - // - {a: {b: {c: 3}}} - // - {a: {b: null}} - // - {a: null}} - // - {a: {b: {c: 6}}} - let a_levels = LevelInfo { - definition: vec![1, 1, 1, 1, 0, 1], - repetition: None, - array_offsets: (0..=6).collect(), - array_mask: vec![true, true, true, true, false, true], - max_definition: 1, - level_type: LevelType::Struct(true), - offset: 0, - length: 6, - }; - // b's offset and mask - let b_offsets: Vec = (0..=6).collect(); - let b_mask = vec![true, true, true, false, false, true]; - // b's expected levels - let b_expected_levels = LevelInfo { - definition: vec![2, 2, 2, 1, 0, 2], - repetition: None, - array_offsets: (0..=6).collect(), - array_mask: vec![true, true, true, false, false, true], - max_definition: 2, - level_type: LevelType::Struct(true), - offset: 0, - length: 6, - }; - let b_levels = a_levels.calculate_child_levels( - b_offsets.clone(), - b_mask, - LevelType::Struct(true), - ); - assert_eq!(&b_expected_levels, &b_levels); - - // c's offset and mask - let c_offsets = b_offsets; - let c_mask = vec![true, false, true, false, false, true]; - // c's expected levels - let c_expected_levels = LevelInfo { - definition: vec![3, 2, 3, 1, 0, 3], - repetition: None, - array_offsets: c_offsets.clone(), - array_mask: vec![true, false, true, false, false, true], - max_definition: 3, - level_type: LevelType::Struct(true), - offset: 0, - length: 6, - }; - let c_levels = - b_levels.calculate_child_levels(c_offsets, c_mask, LevelType::Struct(true)); - assert_eq!(&c_expected_levels, &c_levels); - } + use std::sync::Arc; + + use arrow::array::*; + use arrow::buffer::Buffer; + use arrow::datatypes::{Schema, ToByteSlice}; + use arrow::record_batch::RecordBatch; #[test] fn list_single_column() { // this tests the level generation from the arrow_writer equivalent test let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); - let a_value_offsets = - arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice()); + let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]); let a_list_type = - DataType::List(Box::new(Field::new("item", DataType::Int32, true))); + DataType::List(Box::new(Field::new("item", DataType::Int32, false))); let a_list_data = ArrayData::builder(a_list_type.clone()) .len(5) .add_buffer(a_value_offsets) @@ -1331,20 +511,6 @@ mod tests { let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap(); - let expected_batch_level = LevelInfo { - definition: vec![0; 2], - repetition: None, - array_offsets: (0..=2).collect(), - array_mask: vec![true, true], - max_definition: 0, - level_type: LevelType::Root, - offset: 2, - length: 2, - }; - - let batch_level = LevelInfo::new(2, 2); - assert_eq!(&batch_level, &expected_batch_level); - // calculate the list's level let mut levels = vec![]; batch @@ -1352,7 +518,7 @@ mod tests { .iter() .zip(batch.schema().fields()) .for_each(|(array, field)| { - let mut array_levels = batch_level.calculate_array_levels(array, field); + let mut array_levels = calculate_array_levels(array, field).unwrap(); levels.append(&mut array_levels); }); assert_eq!(levels.len(), 1); @@ -1360,21 +526,12 @@ mod tests { let list_level = levels.get(0).unwrap(); let expected_level = LevelInfo { - definition: vec![0, 3, 3, 3], - repetition: Some(vec![0, 0, 1, 1]), - array_offsets: vec![3, 3, 6], - array_mask: vec![false, true, true, true], - max_definition: 3, - level_type: LevelType::Primitive(true), - offset: 3, - length: 3, + def_levels: Some(vec![2, 2, 2, 0, 2, 2, 2, 2, 2, 2, 2]), + rep_levels: Some(vec![0, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1]), + non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9], + max_def_level: 2, + max_rep_level: 1, }; - assert_eq!(&list_level.definition, &expected_level.definition); - assert_eq!(&list_level.repetition, &expected_level.repetition); - assert_eq!(&list_level.array_offsets, &expected_level.array_offsets); - assert_eq!(&list_level.array_mask, &expected_level.array_mask); - assert_eq!(&list_level.max_definition, &expected_level.max_definition); - assert_eq!(&list_level.level_type, &expected_level.level_type); assert_eq!(list_level, &expected_level); } @@ -1445,20 +602,6 @@ mod tests { .unwrap(); ////////////////////////////////////////////// - let expected_batch_level = LevelInfo { - definition: vec![0; 5], - repetition: None, - array_offsets: (0..=5).collect(), - array_mask: vec![true, true, true, true, true], - max_definition: 0, - level_type: LevelType::Root, - offset: 0, - length: 5, - }; - - let batch_level = LevelInfo::new(0, 5); - assert_eq!(&batch_level, &expected_batch_level); - // calculate the list's level let mut levels = vec![]; batch @@ -1466,7 +609,7 @@ mod tests { .iter() .zip(batch.schema().fields()) .for_each(|(array, field)| { - let mut array_levels = batch_level.calculate_array_levels(array, field); + let mut array_levels = calculate_array_levels(array, field).unwrap(); levels.append(&mut array_levels); }); assert_eq!(levels.len(), 5); @@ -1475,14 +618,11 @@ mod tests { let list_level = levels.get(0).unwrap(); let expected_level = LevelInfo { - definition: vec![0, 0, 0, 0, 0], - repetition: None, - array_offsets: vec![0, 1, 2, 3, 4, 5], - array_mask: vec![true, true, true, true, true], - max_definition: 0, - level_type: LevelType::Primitive(false), - offset: 0, - length: 5, + def_levels: None, + rep_levels: None, + non_null_indices: vec![0, 1, 2, 3, 4], + max_def_level: 0, + max_rep_level: 0, }; assert_eq!(list_level, &expected_level); @@ -1490,14 +630,11 @@ mod tests { let list_level = levels.get(1).unwrap(); let expected_level = LevelInfo { - definition: vec![1, 0, 0, 1, 1], - repetition: None, - array_offsets: vec![0, 1, 2, 3, 4, 5], - array_mask: vec![true, false, false, true, true], - max_definition: 1, - level_type: LevelType::Primitive(true), - offset: 0, - length: 5, + def_levels: Some(vec![1, 0, 0, 1, 1]), + rep_levels: None, + non_null_indices: vec![0, 3, 4], + max_def_level: 1, + max_rep_level: 0, }; assert_eq!(list_level, &expected_level); @@ -1505,14 +642,11 @@ mod tests { let list_level = levels.get(2).unwrap(); let expected_level = LevelInfo { - definition: vec![1, 1, 1, 2, 1], - repetition: None, - array_offsets: vec![0, 1, 2, 3, 4, 5], - array_mask: vec![false, false, false, true, false], - max_definition: 2, - level_type: LevelType::Primitive(true), - offset: 0, - length: 5, + def_levels: Some(vec![1, 1, 1, 2, 1]), + rep_levels: None, + non_null_indices: vec![3], + max_def_level: 2, + max_rep_level: 0, }; assert_eq!(list_level, &expected_level); @@ -1520,36 +654,15 @@ mod tests { let list_level = levels.get(3).unwrap(); let expected_level = LevelInfo { - definition: vec![3, 2, 3, 2, 3], - repetition: None, - array_offsets: vec![0, 1, 2, 3, 4, 5], - array_mask: vec![true, false, true, false, true], - max_definition: 3, - level_type: LevelType::Primitive(true), - offset: 0, - length: 5, + def_levels: Some(vec![3, 2, 3, 2, 3]), + rep_levels: None, + non_null_indices: vec![0, 2, 4], + max_def_level: 3, + max_rep_level: 0, }; assert_eq!(list_level, &expected_level); } - #[test] - fn test_filter_array_indices() { - let level = LevelInfo { - definition: vec![3, 3, 3, 1, 3, 3, 3], - repetition: Some(vec![0, 1, 1, 0, 0, 1, 1]), - array_offsets: vec![0, 3, 3, 6], - array_mask: vec![true, true, true, false, true, true, true], - max_definition: 3, - level_type: LevelType::Primitive(true), - offset: 0, - length: 6, - }; - - let expected = vec![0, 1, 2, 3, 4, 5]; - let filter = level.filter_array_indices(); - assert_eq!(expected, filter); - } - #[test] fn test_null_vs_nonnull_struct() { // define schema @@ -1571,9 +684,8 @@ mod tests { RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]) .unwrap(); - let batch_level = LevelInfo::new(0, batch.num_rows()); let struct_null_level = - batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0)); + calculate_array_levels(batch.column(0), batch.schema().field(0)); // create second batch // define schema @@ -1595,9 +707,8 @@ mod tests { RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]) .unwrap(); - let batch_level = LevelInfo::new(0, batch.num_rows()); let struct_non_null_level = - batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0)); + calculate_array_levels(batch.column(0), batch.schema().field(0)); // The 2 levels should not be the same if struct_non_null_level == struct_null_level { @@ -1634,20 +745,6 @@ mod tests { let batch = reader.next().unwrap().unwrap(); - let expected_batch_level = LevelInfo { - definition: vec![0; 3], - repetition: None, - array_offsets: (0..=3).collect(), - array_mask: vec![true, true, true], - max_definition: 0, - level_type: LevelType::Root, - offset: 0, - length: 3, - }; - - let batch_level = LevelInfo::new(0, 3); - assert_eq!(&batch_level, &expected_batch_level); - // calculate the map's level let mut levels = vec![]; batch @@ -1655,7 +752,7 @@ mod tests { .iter() .zip(batch.schema().fields()) .for_each(|(array, field)| { - let mut array_levels = batch_level.calculate_array_levels(array, field); + let mut array_levels = calculate_array_levels(array, field).unwrap(); levels.append(&mut array_levels); }); assert_eq!(levels.len(), 2); @@ -1664,14 +761,11 @@ mod tests { let list_level = levels.get(0).unwrap(); let expected_level = LevelInfo { - definition: vec![1; 7], - repetition: Some(vec![0, 1, 0, 1, 0, 1, 1]), - array_offsets: vec![0, 2, 4, 7], - array_mask: vec![true; 7], - max_definition: 1, - level_type: LevelType::Primitive(false), - offset: 0, - length: 7, + def_levels: Some(vec![1; 7]), + rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]), + non_null_indices: vec![0, 1, 2, 3, 4, 5, 6], + max_def_level: 1, + max_rep_level: 1, }; assert_eq!(list_level, &expected_level); @@ -1679,14 +773,11 @@ mod tests { let list_level = levels.get(1).unwrap(); let expected_level = LevelInfo { - definition: vec![2, 2, 2, 1, 2, 1, 2], - repetition: Some(vec![0, 1, 0, 1, 0, 1, 1]), - array_offsets: vec![0, 2, 4, 7], - array_mask: vec![true, true, true, false, true, false, true], - max_definition: 2, - level_type: LevelType::Primitive(true), - offset: 0, - length: 7, + def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]), + rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]), + non_null_indices: vec![0, 1, 2, 4, 6], + max_def_level: 2, + max_rep_level: 1, }; assert_eq!(list_level, &expected_level); } @@ -1760,63 +851,24 @@ mod tests { let array = Arc::new(list_builder.finish()); + let values_len = array.data().child_data()[0].len(); + assert_eq!(values_len, 5); + let schema = Arc::new(Schema::new(vec![list_field])); let rb = RecordBatch::try_new(schema, vec![array]).unwrap(); - let batch_level = LevelInfo::new(0, rb.num_rows()); - let list_level = - &batch_level.calculate_array_levels(rb.column(0), rb.schema().field(0))[0]; + let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap(); + let list_level = &levels[0]; let expected_level = LevelInfo { - definition: vec![4, 1, 0, 2, 2, 3, 4], - repetition: Some(vec![0, 0, 0, 0, 1, 0, 0]), - array_offsets: vec![0, 1, 1, 1, 3, 4, 5], - array_mask: vec![true, true, false, false, false, false, true], - max_definition: 4, - level_type: LevelType::Primitive(true), - offset: 0, - length: 5, + def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]), + rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]), + non_null_indices: vec![0, 4], + max_def_level: 4, + max_rep_level: 1, }; assert_eq!(list_level, &expected_level); } - - #[test] - fn test_nested_indices() { - // Given a buffer like - // [0, null, null, 1, 2] - // - // The two level infos below might represent the two structures - // 1: [{a: 0}], [], null, [null, null], [{a: 1}], [{a: 2}] - // 2: [0], [], null, [null, null], [1], [2] - // - // (That is, their only difference is that the leaf values are nested one level deeper in a - // struct). - - let level1 = LevelInfo { - definition: vec![4, 1, 0, 2, 2, 4, 4], - repetition: Some(vec![0, 0, 0, 0, 1, 0, 0]), - array_offsets: vec![0, 1, 1, 1, 3, 4, 5], - array_mask: vec![true, true, false, false, false, false, true], - max_definition: 4, - level_type: LevelType::Primitive(true), - offset: 0, - length: 5, - }; - - let level2 = LevelInfo { - definition: vec![3, 1, 0, 2, 2, 3, 3], - repetition: Some(vec![0, 0, 0, 0, 1, 0, 0]), - array_offsets: vec![0, 1, 1, 1, 3, 4, 5], - array_mask: vec![true, true, false, false, false, false, true], - max_definition: 3, - level_type: LevelType::Primitive(true), - offset: 0, - length: 5, - }; - - // filter_array_indices should return the same indices in this case. - assert_eq!(level1.filter_array_indices(), level2.filter_array_indices()); - } }