diff --git a/arrow/src/array/builder.rs b/arrow/src/array/builder.rs index 1f7e91ff74e1..85c013c6b362 100644 --- a/arrow/src/array/builder.rs +++ b/arrow/src/array/builder.rs @@ -419,6 +419,11 @@ impl BooleanBufferBuilder { ); } + /// Returns the packed bits + pub fn as_slice(&self) -> &[u8] { + self.buffer.as_slice() + } + #[inline] pub fn finish(&mut self) -> Buffer { let buf = std::mem::replace(&mut self.buffer, MutableBuffer::new(0)); diff --git a/parquet/benches/arrow_array_reader.rs b/parquet/benches/arrow_array_reader.rs index acc5141bcbab..5587b5211f96 100644 --- a/parquet/benches/arrow_array_reader.rs +++ b/parquet/benches/arrow_array_reader.rs @@ -301,8 +301,13 @@ fn create_int32_primitive_array_reader( column_desc: ColumnDescPtr, ) -> impl ArrayReader { use parquet::arrow::array_reader::PrimitiveArrayReader; - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) - .unwrap() + PrimitiveArrayReader::::new_with_options( + Box::new(page_iterator), + column_desc, + None, + true, + ) + .unwrap() } fn create_string_arrow_array_reader( diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index d166be78c7ec..6ba08f9820e4 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -247,6 +247,17 @@ where pages: Box, column_desc: ColumnDescPtr, arrow_type: Option, + ) -> Result { + Self::new_with_options(pages, column_desc, arrow_type, false) + } + + /// Construct primitive array reader with ability to only compute null mask and not + /// buffer level data + pub fn new_with_options( + pages: Box, + column_desc: ColumnDescPtr, + arrow_type: Option, + null_mask_only: bool, ) -> Result { // Check if Arrow type is specified, else create it from Parquet type let data_type = match arrow_type { @@ -256,7 +267,7 @@ where .clone(), }; - let record_reader = RecordReader::::new(column_desc.clone()); + let record_reader = RecordReader::::new_with_options(column_desc.clone(), null_mask_only); Ok(Self { data_type, @@ -1350,19 +1361,26 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext let mut new_context = context.clone(); new_context.path.append(vec![cur_type.name().to_string()]); - match cur_type.get_basic_info().repetition() { + let null_mask_only = match cur_type.get_basic_info().repetition() { Repetition::REPEATED => { new_context.def_level += 1; new_context.rep_level += 1; + false } Repetition::OPTIONAL => { new_context.def_level += 1; + + // Can just compute null mask if no parent + context.def_level == 0 && context.rep_level == 0 } - _ => (), - } + _ => false, + }; - let reader = - self.build_for_primitive_type_inner(cur_type.clone(), &new_context)?; + let reader = self.build_for_primitive_type_inner( + cur_type.clone(), + &new_context, + null_mask_only, + )?; if cur_type.get_basic_info().repetition() == Repetition::REPEATED { Err(ArrowError( @@ -1641,6 +1659,7 @@ impl<'a> ArrayReaderBuilder { &self, cur_type: TypePtr, context: &'a ArrayReaderBuilderContext, + null_mask_only: bool, ) -> Result> { let column_desc = Arc::new(ColumnDescriptor::new( cur_type.clone(), @@ -1658,11 +1677,14 @@ impl<'a> ArrayReaderBuilder { .map(|f| f.data_type().clone()); match cur_type.get_physical_type() { - PhysicalType::BOOLEAN => Ok(Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - arrow_type, - )?)), + PhysicalType::BOOLEAN => Ok(Box::new( + PrimitiveArrayReader::::new_with_options( + page_iterator, + column_desc, + arrow_type, + null_mask_only, + )?, + )), PhysicalType::INT32 => { if let Some(ArrowType::Null) = arrow_type { Ok(Box::new(NullArrayReader::::new( @@ -1670,18 +1692,24 @@ impl<'a> ArrayReaderBuilder { column_desc, )?)) } else { - Ok(Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - arrow_type, - )?)) + Ok(Box::new( + PrimitiveArrayReader::::new_with_options( + page_iterator, + column_desc, + arrow_type, + null_mask_only, + )?, + )) } } - PhysicalType::INT64 => Ok(Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - arrow_type, - )?)), + PhysicalType::INT64 => Ok(Box::new( + PrimitiveArrayReader::::new_with_options( + page_iterator, + column_desc, + arrow_type, + null_mask_only, + )?, + )), PhysicalType::INT96 => { // get the optional timezone information from arrow type let timezone = arrow_type @@ -1705,18 +1733,22 @@ impl<'a> ArrayReaderBuilder { arrow_type, )?)) } - PhysicalType::FLOAT => Ok(Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - arrow_type, - )?)), - PhysicalType::DOUBLE => { - Ok(Box::new(PrimitiveArrayReader::::new( + PhysicalType::FLOAT => Ok(Box::new( + PrimitiveArrayReader::::new_with_options( page_iterator, column_desc, arrow_type, - )?)) - } + null_mask_only, + )?, + )), + PhysicalType::DOUBLE => Ok(Box::new( + PrimitiveArrayReader::::new_with_options( + page_iterator, + column_desc, + arrow_type, + null_mask_only, + )?, + )), PhysicalType::BYTE_ARRAY => { if cur_type.get_basic_info().converted_type() == ConvertedType::UTF8 { if let Some(ArrowType::LargeUtf8) = arrow_type { diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 4913e1434ea6..a81dc2504e9d 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -73,9 +73,19 @@ where V: ValuesBuffer + Default, CV: ColumnValueDecoder, { + /// Create a new [`GenericRecordReader`] pub fn new(desc: ColumnDescPtr) -> Self { - let def_levels = - (desc.max_def_level() > 0).then(|| DefinitionLevelBuffer::new(&desc)); + Self::new_with_options(desc, false) + } + + /// Create a new [`GenericRecordReader`] with the ability to only generate the bitmask + /// + /// If `null_mask_only` is true only the null bitmask will be generated and + /// [`Self::consume_def_levels`] and [`Self::consume_rep_levels`] will always return `None` + /// + pub(crate) fn new_with_options(desc: ColumnDescPtr, null_mask_only: bool) -> Self { + let def_levels = (desc.max_def_level() > 0) + .then(|| DefinitionLevelBuffer::new(&desc, null_mask_only)); let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new); @@ -171,7 +181,7 @@ where /// as record values, e.g. those from `self.num_values` to `self.values_written`. pub fn consume_def_levels(&mut self) -> Result> { Ok(match self.def_levels.as_mut() { - Some(x) => Some(x.split_off(self.num_values)), + Some(x) => x.split_levels(self.num_values), None => None, }) } @@ -221,10 +231,7 @@ where .as_mut() .map(|levels| levels.spare_capacity_mut(batch_size)); - let def_levels = self - .def_levels - .as_mut() - .map(|levels| levels.spare_capacity_mut(batch_size)); + let def_levels = self.def_levels.as_mut(); let values = self.records.spare_capacity_mut(batch_size); diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 7dbf2d137b3a..29e6110d519f 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -114,6 +114,11 @@ impl ScalarBuffer { self.len == 0 } + pub fn resize(&mut self, len: usize) { + self.buffer.resize(len * std::mem::size_of::(), 0); + self.len = len; + } + #[inline] pub fn as_slice(&self) -> &[T] { let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::() }; diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 86c089fc4516..762d94bf534e 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -15,74 +15,110 @@ // specific language governing permissions and limitations // under the License. +use std::ops::Range; + use arrow::array::BooleanBufferBuilder; use arrow::bitmap::Bitmap; use arrow::buffer::Buffer; -use std::ops::Range; +use arrow::util::bit_chunk_iterator::BitChunks; -use crate::column::reader::decoder::ColumnLevelDecoderImpl; +use crate::arrow::record_reader::buffer::BufferQueue; +use crate::basic::Encoding; +use crate::column::reader::decoder::{ + ColumnLevelDecoder, ColumnLevelDecoderImpl, LevelsBufferSlice, +}; +use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; +use crate::util::memory::ByteBufferPtr; -use super::{ - buffer::{BufferQueue, ScalarBuffer}, - MIN_BATCH_SIZE, -}; +use super::{buffer::ScalarBuffer, MIN_BATCH_SIZE}; + +enum BufferInner { + /// Compute levels and null mask + Full { + levels: ScalarBuffer, + nulls: BooleanBufferBuilder, + max_level: i16, + }, + /// Only compute null bitmask - requires max level to be 1 + Mask { nulls: BooleanBufferBuilder }, +} pub struct DefinitionLevelBuffer { - buffer: ScalarBuffer, - builder: BooleanBufferBuilder, - max_level: i16, + inner: BufferInner, + + /// The length of this buffer + /// + /// Note: `buffer` and `builder` may contain more elements + len: usize, } -impl BufferQueue for DefinitionLevelBuffer { - type Output = Buffer; - type Slice = [i16]; +impl DefinitionLevelBuffer { + pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self { + let inner = match null_mask_only { + true => { + assert_eq!( + desc.max_def_level(), + 1, + "max definition level must be 1 to only compute null bitmask" + ); - fn split_off(&mut self, len: usize) -> Self::Output { - self.buffer.split_off(len) - } + assert_eq!( + desc.max_rep_level(), + 0, + "max repetition level must be 0 to only compute null bitmask" + ); - fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice { - assert_eq!(self.buffer.len(), self.builder.len()); - self.buffer.spare_capacity_mut(batch_size) - } + BufferInner::Mask { + nulls: BooleanBufferBuilder::new(0), + } + } + false => BufferInner::Full { + levels: ScalarBuffer::new(), + nulls: BooleanBufferBuilder::new(0), + max_level: desc.max_def_level(), + }, + }; - fn set_len(&mut self, len: usize) { - self.buffer.set_len(len); - let buf = self.buffer.as_slice(); + Self { inner, len: 0 } + } - let range = self.builder.len()..len; - self.builder.reserve(range.end - range.start); - for i in &buf[range] { - self.builder.append(*i == self.max_level) + pub fn split_levels(&mut self, len: usize) -> Option { + match &mut self.inner { + BufferInner::Full { levels, .. } => { + let out = levels.split_off(len); + self.len = levels.len(); + Some(out) + } + BufferInner::Mask { .. } => None, } } -} -impl DefinitionLevelBuffer { - pub fn new(desc: &ColumnDescPtr) -> Self { - Self { - buffer: ScalarBuffer::new(), - builder: BooleanBufferBuilder::new(0), - max_level: desc.max_def_level(), - } + pub fn set_len(&mut self, len: usize) { + assert_eq!(self.nulls().len(), len); + self.len = len; } /// Split `len` levels out of `self` pub fn split_bitmask(&mut self, len: usize) -> Bitmap { - let old_len = self.builder.len(); + let builder = match &mut self.inner { + BufferInner::Full { nulls, .. } => nulls, + BufferInner::Mask { nulls } => nulls, + }; + + let old_len = builder.len(); let num_left_values = old_len - len; let new_bitmap_builder = BooleanBufferBuilder::new(MIN_BATCH_SIZE.max(num_left_values)); - let old_bitmap = - std::mem::replace(&mut self.builder, new_bitmap_builder).finish(); + let old_bitmap = std::mem::replace(builder, new_bitmap_builder).finish(); let old_bitmap = Bitmap::from(old_bitmap); for i in len..old_len { - self.builder.append(old_bitmap.is_set(i)); + builder.append(old_bitmap.is_set(i)); } + self.len = builder.len(); old_bitmap } @@ -91,10 +127,283 @@ impl DefinitionLevelBuffer { &self, range: Range, ) -> impl Iterator + '_ { - let max_def_level = self.max_level; - let slice = self.buffer.as_slice(); - range.rev().filter(move |x| slice[*x] == max_def_level) + assert_eq!(range.start, self.len); + iter_set_bits_rev(self.nulls().as_slice()) + } + + fn nulls(&self) -> &BooleanBufferBuilder { + match &self.inner { + BufferInner::Full { nulls, .. } => nulls, + BufferInner::Mask { nulls } => nulls, + } } } -pub type DefinitionLevelDecoder = ColumnLevelDecoderImpl; +impl LevelsBufferSlice for DefinitionLevelBuffer { + fn capacity(&self) -> usize { + usize::MAX + } + + fn count_nulls(&self, range: Range, _max_level: i16) -> usize { + let total_count = range.end - range.start; + let range = range.start + self.len..range.end + self.len; + total_count - count_set_bits(self.nulls().as_slice(), range) + } +} + +pub struct DefinitionLevelDecoder { + max_level: i16, + encoding: Encoding, + data: Option, + column_decoder: Option, + packed_decoder: Option, +} + +impl ColumnLevelDecoder for DefinitionLevelDecoder { + type Slice = DefinitionLevelBuffer; + + fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self { + Self { + max_level, + encoding, + data: Some(data), + column_decoder: None, + packed_decoder: None, + } + } + + fn read( + &mut self, + writer: &mut Self::Slice, + range: Range, + ) -> crate::errors::Result { + match &mut writer.inner { + BufferInner::Full { + levels, + nulls, + max_level, + } => { + assert_eq!(self.max_level, *max_level); + assert_eq!(range.start + writer.len, nulls.len()); + + let decoder = match self.data.take() { + Some(data) => self.column_decoder.insert( + ColumnLevelDecoderImpl::new(self.max_level, self.encoding, data), + ), + None => self + .column_decoder + .as_mut() + .expect("consistent null_mask_only"), + }; + + levels.resize(range.end + writer.len); + + let slice = &mut levels.as_slice_mut()[writer.len..]; + let levels_read = decoder.read(slice, range.clone())?; + + nulls.reserve(levels_read); + for i in &slice[range.start..range.start + levels_read] { + nulls.append(i == max_level) + } + + Ok(levels_read) + } + BufferInner::Mask { nulls } => { + assert_eq!(self.max_level, 1); + assert_eq!(range.start + writer.len, nulls.len()); + + let decoder = match self.data.take() { + Some(data) => self + .packed_decoder + .insert(PackedDecoder::new(self.encoding, data)), + None => self + .packed_decoder + .as_mut() + .expect("consistent null_mask_only"), + }; + + decoder.read(nulls, range.end - range.start) + } + } + } +} + +struct PackedDecoder { + data: ByteBufferPtr, + data_offset: usize, + rle_left: usize, + rle_value: bool, + packed_count: usize, + packed_offset: usize, +} + +impl PackedDecoder { + fn next_rle_block(&mut self) -> Result<()> { + let indicator_value = self.decode_header()?; + if indicator_value & 1 == 1 { + let len = (indicator_value >> 1) as usize; + self.packed_count = len * 8; + self.packed_offset = 0; + } else { + self.rle_left = (indicator_value >> 1) as usize; + let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(|| { + ParquetError::EOF( + "unexpected end of file whilst decoding definition levels rle value" + .into(), + ) + })?; + + self.data_offset += 1; + self.rle_value = byte != 0; + } + Ok(()) + } + + /// Decodes a VLQ encoded little endian integer and returns it + fn decode_header(&mut self) -> Result { + let mut offset = 0; + let mut v: i64 = 0; + while offset < 10 { + let byte = *self + .data + .as_ref() + .get(self.data_offset + offset) + .ok_or_else(|| { + ParquetError::EOF( + "unexpected end of file whilst decoding definition levels rle header" + .into(), + ) + })?; + + v |= ((byte & 0x7F) as i64) << (offset * 7); + offset += 1; + if byte & 0x80 == 0 { + self.data_offset += offset; + return Ok(v); + } + } + Err(general_err!("too many bytes for VLQ")) + } +} + +impl PackedDecoder { + fn new(encoding: Encoding, data: ByteBufferPtr) -> Self { + match encoding { + Encoding::RLE => Self { + data, + data_offset: 0, + rle_left: 0, + rle_value: false, + packed_count: 0, + packed_offset: 0, + }, + Encoding::BIT_PACKED => Self { + data_offset: 0, + rle_left: 0, + rle_value: false, + packed_count: data.len() * 8, + packed_offset: 0, + data, + }, + _ => unreachable!("invalid level encoding: {}", encoding), + } + } + + fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result { + let mut read = 0; + while read != len { + if self.rle_left != 0 { + let to_read = self.rle_left.min(len - read); + buffer.append_n(to_read, self.rle_value); + self.rle_left -= to_read; + read += to_read; + } else if self.packed_count != self.packed_offset { + let to_read = (self.packed_count - self.packed_offset).min(len - read); + let offset = self.data_offset * 8 + self.packed_offset; + buffer.append_packed_range(offset..offset + to_read, self.data.as_ref()); + self.packed_offset += to_read; + read += to_read; + + if self.packed_offset == self.packed_count { + self.data_offset += self.packed_count / 8; + } + } else if self.data_offset == self.data.len() { + break; + } else { + self.next_rle_block()? + } + } + Ok(read) + } +} + +/// Counts the number of set bits in the provided range +pub fn count_set_bits(bytes: &[u8], range: Range) -> usize { + let mut count = 0_usize; + let chunks = BitChunks::new(bytes, range.start, range.end - range.start); + chunks.iter().for_each(|chunk| { + count += chunk.count_ones() as usize; + }); + count += chunks.remainder_bits().count_ones() as usize; + count +} + +fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator + '_ { + let mut byte_idx = bytes.len() - 1; + let mut in_progress = bytes.get(byte_idx).cloned().unwrap_or(0); + + std::iter::from_fn(move || loop { + if in_progress != 0 { + let bit_pos = 7 - in_progress.leading_zeros(); + in_progress ^= 1 << bit_pos; + return Some((byte_idx << 3) + (bit_pos as usize)); + } + + if byte_idx == 0 { + return None; + } + + byte_idx -= 1; + in_progress = bytes[byte_idx]; + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + use rand::{thread_rng, Rng, RngCore}; + + #[test] + fn test_bit_fns() { + let mut rng = thread_rng(); + let mask_length = (rng.next_u32() % 50) as usize; + let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 0)) + .take(mask_length) + .collect(); + + let mut nulls = BooleanBufferBuilder::new(mask_length); + bools.iter().for_each(|b| nulls.append(*b)); + + let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice()).collect(); + let expected: Vec<_> = bools + .iter() + .enumerate() + .rev() + .filter_map(|(x, y)| y.then(|| x)) + .collect(); + assert_eq!(actual, expected); + + assert_eq!(count_set_bits(&[0xFF], 1..1), 0); + + for _ in 0..20 { + let start = rng.gen_range(0..bools.len()); + let end = rng.gen_range(start..bools.len()); + + let actual = count_set_bits(nulls.as_slice(), start..end); + let expected = bools[start..end].iter().filter(|x| **x).count(); + + assert_eq!(actual, expected); + } + } +}