From 95ed8a020c2f44f5b30cfffd0682b98022cc4aea Mon Sep 17 00:00:00 2001
From: Yordan Pavlov
Date: Sat, 1 May 2021 22:14:37 +0100
Subject: [PATCH] implement StringArrayConverter and replace
 ComplexObjectArrayReader with ArrowArrayReader for reading StringArrays

---
 rust/parquet/src/arrow/array_reader.rs       |  23 +-
 rust/parquet/src/arrow/arrow_array_reader.rs | 297 ++++++++++++++++---
 rust/parquet/src/encodings/mod.rs            |   2 +-
 3 files changed, 270 insertions(+), 52 deletions(-)

diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs
index a906147a8f93d..2b7c853c575f5 100644
--- a/rust/parquet/src/arrow/array_reader.rs
+++ b/rust/parquet/src/arrow/array_reader.rs
@@ -1477,16 +1477,21 @@ impl<'a> ArrayReaderBuilder {
                 arrow_type,
             )?))
         } else {
-            let converter = Utf8Converter::new(Utf8ArrayConverter {});
-            Ok(Box::new(ComplexObjectArrayReader::<
-                ByteArrayType,
-                Utf8Converter,
-            >::new(
-                page_iterator,
-                column_desc,
-                converter,
-                arrow_type,
+            use crate::arrow::arrow_array_reader::{StringArrayConverter, ArrowArrayReader};
+            let converter = StringArrayConverter::new();
+            Ok(Box::new(ArrowArrayReader::try_new(
+                *page_iterator, column_desc, converter, None
             )?))
+            // let converter = Utf8Converter::new(Utf8ArrayConverter {});
+            // Ok(Box::new(ComplexObjectArrayReader::<
+            //     ByteArrayType,
+            //     Utf8Converter,
+            // >::new(
+            //     page_iterator,
+            //     column_desc,
+            //     converter,
+            //     arrow_type,
+            // )?))
         }
     } else if let Some(ArrowType::LargeBinary) = arrow_type {
         let converter =
diff --git a/rust/parquet/src/arrow/arrow_array_reader.rs b/rust/parquet/src/arrow/arrow_array_reader.rs
index 369d40f4686e1..cd67a7c5698a4 100644
--- a/rust/parquet/src/arrow/arrow_array_reader.rs
+++ b/rust/parquet/src/arrow/arrow_array_reader.rs
@@ -4,6 +4,7 @@ use arrow::{array::{ArrayRef, Int16Array}, buffer::MutableBuffer, datatypes::{Da
 use crate::{column::page::{Page, PageIterator}, memory::{ByteBufferPtr, BufferPtr}, schema::types::{ColumnDescPtr, ColumnDescriptor}};
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::errors::{ParquetError, Result};
+use crate::basic::Encoding;
 use super::array_reader::ArrayReader;
 
 struct UnzipIter
@@ -217,6 +218,10 @@ where
         // source iterator exhausted
         return None;
     }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (0, Some(self.batch_size))
+    }
 }
 
 impl Splittable for Result<BufferPtr<i16>> {
@@ -287,11 +292,11 @@ impl Splittable for Result<ValueByteChunk> {
 
 type LevelBufferPtr = BufferPtr<i16>;
 
-trait ArrayConverter {
-    fn convert_value_chunks(&self, value_byte_chunks: Vec<ValueByteChunk>) -> Result<ArrayData>;
+pub(crate) trait ArrayConverter {
+    fn convert_value_chunks(&self, value_byte_chunks: impl IntoIterator<Item = Result<ValueByteChunk>>) -> Result<ArrayData>;
 }
 
-struct ArrowArrayReader<'a, C: ArrayConverter + 'a> {
+pub(crate) struct ArrowArrayReader<'a, C: ArrayConverter + 'a> {
     column_desc: ColumnDescPtr,
     data_type: ArrowType,
     def_level_iter_factory: SplittableBatchingIteratorFactory<'a, Result<LevelBufferPtr>>,
@@ -302,8 +307,20 @@ struct ArrowArrayReader<'a, C: ArrayConverter + 'a> {
     array_converter: C,
 }
 
+pub(crate) struct ColumnChunkContext {
+    dictionary_values: Option<Vec<ValueByteChunk>>,
+}
+
+impl ColumnChunkContext {
+    fn new() -> Self {
+        Self {
+            dictionary_values: None,
+        }
+    }
+}
+
 impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
-    fn try_new<P: PageIterator + 'a>(column_chunk_iterator: P, column_desc: ColumnDescPtr, array_converter: C, arrow_type: Option<ArrowType>) -> Result<Self> {
+    pub(crate) fn try_new<P: PageIterator + 'a>(column_chunk_iterator: P, column_desc: ColumnDescPtr, array_converter: C, arrow_type: Option<ArrowType>) -> Result<Self> {
         let data_type = match arrow_type {
             Some(t) => t,
            None => parquet_to_arrow_field(column_desc.as_ref())?
@@ -311,12 +328,13 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
                 .clone(),
         };
 
-        // TODO: attach column chunk context
         let page_iter = column_chunk_iterator
             // build iterator of pages across column chunks
-            .flat_map(|x| -> Box<dyn Iterator<Item = Result<Page>>> {
+            .flat_map(|x| -> Box<dyn Iterator<Item = Result<(Page, Rc<RefCell<ColumnChunkContext>>)>>> {
+                // attach column chunk context
+                let context = Rc::new(RefCell::new(ColumnChunkContext::new()));
                 match x {
-                    Ok(page_reader) => Box::new(page_reader.into_iter()),
+                    Ok(page_reader) => Box::new(page_reader.map(move |pr| pr.and_then(|p| Ok((p, context.clone()))))),
                     // errors from reading column chunks / row groups are propagated to page level
                     Err(e) => Box::new(std::iter::once(Err(e)))
                 }
@@ -327,7 +345,9 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
             //     Ok(p) => Self::map_page(p, column_desc.as_ref()),
             //     Err(e) => Err(e),
             // }
-            move |x: Result<Page>| x.and_then(|p| Self::map_page(p, column_desc.as_ref()))
+            move |x: Result<(Page, Rc<RefCell<ColumnChunkContext>>)>| x.and_then(
+                |(page, context)| Self::map_page(page, context, column_desc.as_ref())
+            )
         })(column_desc.clone());
         // map page iterator into tuple of buffer iterators for (values, def levels, rep levels)
         // errors from lower levels are surfaced through the value decoder iterator
@@ -373,11 +393,84 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
 
     // Split Result<Page> into Result<(Iterator<Values>, Iterator<DefLevels>, Iterator<RepLevels>)>
     // this method could fail, e.g. if the page encoding is not supported
-    fn map_page(page: Page, column_desc: &ColumnDescriptor) -> Result<(Box<dyn Iterator<Item = Result<ValueByteChunk>>>, Box<dyn Iterator<Item = Result<LevelBufferPtr>>>, Box<dyn Iterator<Item = Result<LevelBufferPtr>>>)>
+    fn map_page(page: Page, column_chunk_context: Rc<RefCell<ColumnChunkContext>>, column_desc: &ColumnDescriptor) -> Result<(Box<dyn Iterator<Item = Result<ValueByteChunk>>>, Box<dyn Iterator<Item = Result<LevelBufferPtr>>>, Box<dyn Iterator<Item = Result<LevelBufferPtr>>>)>
     {
         use crate::encodings::levels::LevelDecoder;
-        // process page (V1, V2, Dictionary)
         match page {
+            Page::DictionaryPage {
+                buf,
+                num_values,
+                encoding,
+                ..
+            } => {
+                let mut column_chunk_context = column_chunk_context.borrow_mut();
+                if column_chunk_context.dictionary_values.is_some() {
+                    return Err(general_err!("Column chunk cannot have more than one dictionary"));
+                }
+                // create plain decoder for dictionary values
+                let value_iter = Self::decode_dictionary_page(buf, num_values as usize, encoding, column_desc)?;
+                // decode and cache dictionary values
+                let dictionary_values: Vec<ValueByteChunk> = value_iter.collect::<Result<_>>()?;
+                column_chunk_context.dictionary_values = Some(dictionary_values);
+
+                // a dictionary page doesn't return any values
+                Ok((
+                    Box::new(std::iter::empty()),
+                    Box::new(std::iter::empty()),
+                    Box::new(std::iter::empty()),
+                ))
+            }
+            Page::DataPage {
+                buf,
+                num_values,
+                encoding,
+                def_level_encoding,
+                rep_level_encoding,
+                statistics: _,
+            } => {
+                let mut buffer_ptr = buf;
+                // create rep level decoder iterator
+                let rep_level_iter: Box<dyn Iterator<Item = Result<LevelBufferPtr>>> = if Self::rep_levels_available(&column_desc) {
+                    let mut rep_decoder =
+                        LevelDecoder::v1(rep_level_encoding, column_desc.max_rep_level());
+                    let rep_level_byte_len = rep_decoder.set_data(
+                        num_values as usize,
+                        buffer_ptr.all(),
+                    );
+                    // advance buffer pointer
+                    buffer_ptr = buffer_ptr.start_from(rep_level_byte_len);
+                    Box::new(rep_decoder)
+                }
+                else {
+                    Box::new(std::iter::once(Err(ParquetError::General("rep levels are not available".to_string()))))
+                };
+                // create def level decoder iterator
+                let def_level_iter: Box<dyn Iterator<Item = Result<LevelBufferPtr>>> = if Self::def_levels_available(&column_desc) {
+                    let mut def_decoder = LevelDecoder::v1(
+                        def_level_encoding,
+                        column_desc.max_def_level(),
+                    );
+                    let def_levels_byte_len = def_decoder.set_data(
+                        num_values as usize,
+                        buffer_ptr.all(),
+                    );
+                    // advance buffer pointer
+                    buffer_ptr = buffer_ptr.start_from(def_levels_byte_len);
+                    Box::new(def_decoder)
+                }
+                else {
+                    Box::new(std::iter::once(Err(ParquetError::General("def levels are not available".to_string()))))
+                };
+                // create value decoder iterator
+                let value_iter = Self::get_values_decoder_iter(
+                    buffer_ptr, num_values as usize, encoding, column_desc, column_chunk_context
+                )?;
+                Ok((
+                    value_iter,
+                    def_level_iter,
+                    rep_level_iter
+                ))
+            }
             Page::DataPageV2 {
                 buf,
                 num_values,
@@ -428,33 +521,52 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
                 // create value decoder iterator
                 let values_buffer = buf.start_from(offset);
                 let value_iter = Self::get_values_decoder_iter(
-                    values_buffer, num_values as usize, encoding, column_desc
+                    values_buffer, num_values as usize, encoding, column_desc, column_chunk_context
                 )?;
                 Ok((
                     value_iter,
                     def_level_iter,
                     rep_level_iter
                 ))
-            },
-            p @ _ => Err(ParquetError::General(
-                format!("type of page not supported: {}", p.page_type()))
-            )
+            }
         }
     }
 
-    fn get_values_decoder_iter(values_buffer: ByteBufferPtr, num_values: usize, mut encoding: crate::basic::Encoding, column_desc: &ColumnDescriptor) -> Result<Box<dyn Iterator<Item = Result<ValueByteChunk>>>> {
-        use crate::basic::Encoding;
+    fn decode_dictionary_page(values_buffer: ByteBufferPtr, num_values: usize, mut encoding: Encoding, column_desc: &ColumnDescriptor) -> Result<Box<dyn Iterator<Item = Result<ValueByteChunk>>>> {
+        // dictionary pages are always plain-encoded; PLAIN_DICTIONARY is the
+        // legacy name for the same on-page layout
+        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY
+        }
+
+        if encoding == Encoding::RLE_DICTIONARY {
+            Ok(Self::get_plain_decoder_iterator(values_buffer, num_values, column_desc))
+        } else {
+            Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ))
+        }
+    }
+
+    fn get_values_decoder_iter(values_buffer: ByteBufferPtr, num_values: usize, mut encoding: Encoding, column_desc: &ColumnDescriptor, column_chunk_context: Rc<RefCell<ColumnChunkContext>>) -> Result<Box<dyn Iterator<Item = Result<ValueByteChunk>>>> {
         if encoding == Encoding::PLAIN_DICTIONARY {
             encoding = Encoding::RLE_DICTIONARY;
         }
 
         match encoding {
-            Encoding::PLAIN => Self::get_plain_decoder_iterator(values_buffer, num_values, column_desc),
-            // Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
-            //     return Err(general_err!(
-            //         "Cannot initialize this encoding through this function"
-            //     ));
-            // }
+            Encoding::PLAIN => Ok(Self::get_plain_decoder_iterator(values_buffer, num_values, column_desc)),
+            Encoding::RLE_DICTIONARY => {
+                // TODO: add support for fixed-length types
+                if column_chunk_context.borrow().dictionary_values.is_some() {
+                    Ok(Box::new(DictionaryDecoder::new(
+                        column_chunk_context, values_buffer, num_values
+                    )))
+                }
+                else {
+                    Err(general_err!(
+                        "Dictionary values have not been initialized."
+                    ))
+                }
+            }
             // Encoding::RLE => Box::new(RleValueDecoder::new()),
             // Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
             // Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
@@ -463,7 +575,7 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
         }
     }
 
-    fn get_plain_decoder_iterator(values_buffer: ByteBufferPtr, num_values: usize, column_desc: &ColumnDescriptor) -> Result<Box<dyn Iterator<Item = Result<ValueByteChunk>>>> {
+    fn get_plain_decoder_iterator(values_buffer: ByteBufferPtr, num_values: usize, column_desc: &ColumnDescriptor) -> Box<dyn Iterator<Item = Result<ValueByteChunk>>> {
         use crate::encodings::decoding::{FixedLenPlainDecoder, VariableLenPlainDecoder};
         use crate::basic::Type as PhysicalType;
 
@@ -475,12 +587,12 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
             PhysicalType::INT64 | PhysicalType::DOUBLE => 64,
             PhysicalType::INT96 => 96,
             PhysicalType::BYTE_ARRAY => {
-                return Ok(Box::new(VariableLenPlainDecoder::new(values_buffer, num_values)));
+                return Box::new(VariableLenPlainDecoder::new(values_buffer, num_values));
             },
             PhysicalType::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize * 8,
         };
 
-        Ok(Box::new(FixedLenPlainDecoder::new(values_buffer, num_values, value_bit_len)))
+        Box::new(FixedLenPlainDecoder::new(values_buffer, num_values, value_bit_len))
     }
 
     fn build_level_array(level_buffers: Vec<LevelBufferPtr>) -> Int16Array {
@@ -538,12 +650,16 @@ impl<C: ArrayConverter + 'static> ArrayReader for ArrowArrayReader<'static, C> {
 
         // read a batch of values
         let value_iter = self.value_iter_factory.get_batch_iter(values_to_read);
+
+        // NOTE: collecting value chunks here is actually slower
+        // TODO: re-evaluate when iterators are migrated to async streams
         // collect and unwrap values into Vec, return first error if any
         // this will separate reading and decoding values from creating an arrow array
         // extra memory is allocated for the Vec, but the values still point to the page buffer
-        let values: Vec<ValueByteChunk> = value_iter.collect::<Result<_>>()?;
+        // let values: Vec<ValueByteChunk> = value_iter.collect::<Result<_>>()?;
+
         // converter only creates a no-null / all value array data
-        let mut value_array_data = self.array_converter.convert_value_chunks(values)?;
+        let mut value_array_data = self.array_converter.convert_value_chunks(value_iter)?;
 
         if let Some(null_bitmap_array) = null_bitmap_array {
             // Only if def levels are available - insert null values efficiently using MutableArrayData.
@@ -575,7 +691,7 @@ impl<C: ArrayConverter + 'static> ArrayReader for ArrowArrayReader<'static, C> {
                 .null_bit_buffer(null_bitmap_array.values().clone())
                 .build();
         }
-
+        // TODO: cast array to self.data_type if necessary
         Ok(arrow::array::make_array(value_array_data))
     }
 
@@ -588,40 +704,137 @@ impl<C: ArrayConverter + 'static> ArrayReader for ArrowArrayReader<'static, C> {
     }
 }
 
-struct StringArrayConverter {}
+use crate::encodings::rle::RleDecoder;
+
+pub(crate) struct DictionaryDecoder {
+    context_ref: Rc<RefCell<ColumnChunkContext>>,
+    key_data_buffer: ByteBufferPtr,
+    num_values: usize,
+    rle_decoder: RleDecoder,
+    // value_buffer: VecDeque<ValueByteChunk>,
+    // keys_buffer: Vec<i32>,
+}
+
+impl DictionaryDecoder {
+    pub(crate) fn new(column_chunk_context: Rc<RefCell<ColumnChunkContext>>, key_data_buffer: ByteBufferPtr, num_values: usize) -> Self {
+        // First byte in `data` is bit width
+        let bit_width = key_data_buffer.data()[0];
+        let mut rle_decoder = RleDecoder::new(bit_width);
+        rle_decoder.set_data(key_data_buffer.start_from(1));
+
+        Self {
+            context_ref: column_chunk_context,
+            key_data_buffer,
+            num_values,
+            rle_decoder,
+            // value_buffer: VecDeque::with_capacity(128),
+            // keys_buffer: vec![0; 128],
+        }
+    }
+}
+
+impl Iterator for DictionaryDecoder {
+    type Item = Result<ValueByteChunk>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // this simpler, non-buffering implementation is actually a bit faster
+        // TODO: re-evaluate when iterators are replaced with async streams
+        if self.num_values > 0 {
+            let value_index = match self.rle_decoder.get::<i32>() {
+                Ok(maybe_key) => match maybe_key {
+                    Some(key) => key,
+                    None => return None,
+                },
+                Err(e) => return Some(Err(e)),
+            };
+            let context = self.context_ref.borrow();
+            let value_chunk = &context.dictionary_values.as_ref().unwrap()[value_index as usize];
+            self.num_values -= 1;
+            return Some(Ok(value_chunk.clone()));
+        }
+        return None;
+
+        // match self.value_buffer.pop_front() {
+        //     Some(value) => Some(Ok(value)),
+        //     None => {
+        //         if self.num_values <= 0 {
+        //             return None;
+        //         }
+        //         let values_to_read = std::cmp::min(self.num_values, self.keys_buffer.len());
+        //         let keys_read = match self.rle_decoder.get_batch(&mut self.keys_buffer[..values_to_read]) {
+        //             Ok(keys_read) => keys_read,
+        //             Err(e) => return Some(Err(e)),
+        //         };
+        //         if keys_read == 0 {
+        //             self.num_values = 0;
+        //             return None;
+        //         }
+        //         let context = self.context_ref.borrow();
+        //         let values = context.dictionary_values.as_ref().unwrap();
+        //         let first_value = values[self.keys_buffer[0] as usize].clone();
+        //         let values_iter =
+        //             self.keys_buffer[1..keys_read].iter()
+        //             .map(|key| values[*key as usize].clone());
+        //         self.value_buffer.extend(values_iter);
+
+        //         self.num_values -= keys_read;
+        //         Some(Ok(first_value))
+        //     }
+        // }
+    }
+}
+
+pub(crate) struct StringArrayConverter {}
 
 impl StringArrayConverter {
-    fn new() -> Self {
+    pub(crate) fn new() -> Self {
         Self {}
     }
 }
 
 impl ArrayConverter for StringArrayConverter {
-    fn convert_value_chunks(&self, value_byte_chunks: Vec<ValueByteChunk>) -> Result<ArrayData> {
+    fn convert_value_chunks(&self, value_byte_chunks: impl IntoIterator<Item = Result<ValueByteChunk>>) -> Result<ArrayData> {
         use arrow::datatypes::ArrowNativeType;
-        let data_len = value_byte_chunks.len();
+        let value_chunks_iter = value_byte_chunks.into_iter();
+        let value_capacity = value_chunks_iter.size_hint().1
+            .ok_or_else(|| ParquetError::ArrowError("StringArrayConverter expects input iterator to declare an upper size hint.".to_string()))?;
         let offset_size = std::mem::size_of::<i32>();
-        let mut offsets = MutableBuffer::new((data_len + 1) * offset_size);
-        let values_byte_len = value_byte_chunks.iter().map(|x| x.data.len()).sum();
-        let mut values = MutableBuffer::new(values_byte_len);
+        let mut offsets_buffer = MutableBuffer::new((value_capacity + 1) * offset_size);
+        // NOTE: calculating exact value byte capacity is actually slower with the current implementation
+        // TODO: re-evaluate when iterators are migrated to async streams
+        // calculate values_byte_capacity:
+        // let mut values_byte_capacity = 0;
+        // let mut value_chunks = Vec::<ValueByteChunk>::with_capacity(value_capacity);
+        // for value_chunk in value_iter {
+        //     let value_chunk = value_chunk?;
+        //     values_byte_capacity += value_chunk.data.len();
+        //     value_chunks.push(value_chunk);
+        // }
+
+        // allocate initial capacity of 1 byte for each item
+        let values_byte_capacity = value_capacity;
+        let mut values_buffer = MutableBuffer::new(values_byte_capacity);
 
         let mut length_so_far = i32::default();
-        offsets.push(length_so_far);
+        offsets_buffer.push(length_so_far);
 
-        for value_chunk in value_byte_chunks {
+        for value_chunk in value_chunks_iter {
+            let value_chunk = value_chunk?;
             debug_assert_eq!(
                 value_chunk.value_count, 1,
                 "offset length value buffers can only contain bytes for a single value"
             );
             let value_bytes = value_chunk.data;
             length_so_far += <i32 as ArrowNativeType>::from_usize(value_bytes.len()).unwrap();
-            offsets.push(length_so_far);
-            values.extend_from_slice(value_bytes.data());
+            offsets_buffer.push(length_so_far);
+            values_buffer.extend_from_slice(value_bytes.data());
         }
+        // calculate actual data_len, which may be different from the iterator's upper bound
+        let data_len = (offsets_buffer.len() / offset_size) - 1;
         let array_data = arrow::array::ArrayData::builder(ArrowType::Utf8)
             .len(data_len)
-            .add_buffer(offsets.into())
-            .add_buffer(values.into())
+            .add_buffer(offsets_buffer.into())
+            .add_buffer(values_buffer.into())
             .build();
         Ok(array_data)
     }
 }
diff --git a/rust/parquet/src/encodings/mod.rs b/rust/parquet/src/encodings/mod.rs
index 33b1e233d8931..6046ddaec8055 100644
--- a/rust/parquet/src/encodings/mod.rs
+++ b/rust/parquet/src/encodings/mod.rs
@@ -18,4 +18,4 @@
 pub mod decoding;
 pub mod encoding;
 pub mod levels;
-mod rle;
+pub(crate) mod rle;
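
Notes

For context, this is how the new reader path fits together from calling
code inside the crate. A minimal sketch rather than part of the patch:
the function name `read_string_column` and the batch size of 1024 are
illustrative, while `ArrowArrayReader::try_new` and
`ArrayReader::next_batch` are the APIs touched by the diff above.

    use arrow::array::ArrayRef;
    use crate::arrow::array_reader::ArrayReader;
    use crate::arrow::arrow_array_reader::{ArrowArrayReader, StringArrayConverter};
    use crate::column::page::PageIterator;
    use crate::errors::Result;
    use crate::schema::types::ColumnDescPtr;

    fn read_string_column(
        pages: impl PageIterator + 'static,
        column_desc: ColumnDescPtr,
    ) -> Result<ArrayRef> {
        // passing None derives the arrow type from the parquet column descriptor
        let converter = StringArrayConverter::new();
        let mut reader = ArrowArrayReader::try_new(pages, column_desc, converter, None)?;
        // each call decodes up to 1024 values into a StringArray
        reader.next_batch(1024)
    }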
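The DictionaryDecoder added above relies on the layout of a
dictionary-encoded data page: the first byte of the value section holds
the RLE bit width, and the remaining bytes are an RLE/bit-packed run of
dictionary keys that index into the values cached in ColumnChunkContext.
A condensed sketch of just the key-decoding step, using the same
RleDecoder calls as the patch (the helper name `decode_dictionary_keys`
is hypothetical):

    use crate::encodings::rle::RleDecoder;
    use crate::errors::Result;
    use crate::memory::ByteBufferPtr;

    fn decode_dictionary_keys(page_values: ByteBufferPtr, num_values: usize) -> Result<Vec<i32>> {
        // first byte of the value section is the RLE bit width
        let bit_width = page_values.data()[0];
        let mut rle_decoder = RleDecoder::new(bit_width);
        rle_decoder.set_data(page_values.start_from(1));
        // read the keys in one batch; a short read means the page
        // contained fewer keys than expected
        let mut keys = vec![0i32; num_values];
        let keys_read = rle_decoder.get_batch(&mut keys)?;
        keys.truncate(keys_read);
        Ok(keys)
    }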
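StringArrayConverter builds the standard Arrow UTF-8 layout: an i32
offsets buffer with len + 1 entries and a contiguous value buffer, where
offsets[i]..offsets[i + 1] delimits the bytes of string i. The same
construction applied to plain strings, as a self-contained sketch using
only the arrow calls that already appear in the diff (the function name
`utf8_array` is illustrative):

    use arrow::array::StringArray;
    use arrow::buffer::MutableBuffer;
    use arrow::datatypes::DataType;

    fn utf8_array(strings: &[&str]) -> StringArray {
        let offset_size = std::mem::size_of::<i32>();
        let mut offsets = MutableBuffer::new((strings.len() + 1) * offset_size);
        let mut values = MutableBuffer::new(strings.iter().map(|s| s.len()).sum());
        let mut length_so_far = 0i32;
        offsets.push(length_so_far);
        for s in strings {
            length_so_far += s.len() as i32;
            // offsets[i + 1] marks the end of string i in the value buffer
            offsets.push(length_so_far);
            values.extend_from_slice(s.as_bytes());
        }
        let array_data = arrow::array::ArrayData::builder(DataType::Utf8)
            .len(strings.len())
            .add_buffer(offsets.into())
            .add_buffer(values.into())
            .build();
        StringArray::from(array_data)
    }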