diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index d8d9e7b67949..894c54ddb153 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -25,10 +25,10 @@ use std::sync::Arc;
 use std::vec::Vec;
 
 use arrow::array::{
-    new_empty_array, Array, ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray,
-    BinaryBuilder, BooleanArray, BooleanBufferBuilder, BooleanBuilder, DecimalBuilder,
-    FixedSizeBinaryArray, FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder,
-    Int32Array, Int64Array, MapArray, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder,
+    Array, ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray, BinaryBuilder,
+    BooleanArray, BooleanBufferBuilder, BooleanBuilder, DecimalBuilder, FixedSizeBinaryArray,
+    FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder, Int32Array,
+    Int64Array, MapArray, new_empty_array, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder,
     StringArray, StringBuilder, StructArray,
 };
 use arrow::buffer::{Buffer, MutableBuffer};
@@ -45,33 +45,34 @@ use arrow::datatypes::{
     Time32MillisecondType as ArrowTime32MillisecondType,
     Time32SecondType as ArrowTime32SecondType,
     Time64MicrosecondType as ArrowTime64MicrosecondType,
-    Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit,
-    TimestampMicrosecondType as ArrowTimestampMicrosecondType,
+    Time64NanosecondType as ArrowTime64NanosecondType,
+    TimestampMicrosecondType as ArrowTimestampMicrosecondType,
     TimestampMillisecondType as ArrowTimestampMillisecondType,
     TimestampNanosecondType as ArrowTimestampNanosecondType,
-    TimestampSecondType as ArrowTimestampSecondType, ToByteSlice,
+    TimestampSecondType as ArrowTimestampSecondType,
+    TimeUnit as ArrowTimeUnit, ToByteSlice,
     UInt16Type as ArrowUInt16Type, UInt32Type as ArrowUInt32Type,
     UInt64Type as ArrowUInt64Type, UInt8Type as ArrowUInt8Type,
 };
 use arrow::util::bit_util;
+use byte_array::make_byte_array_reader;
+use byte_array_dictionary::make_byte_array_dictionary_reader;
 
 use crate::arrow::converter::{
-    BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter,
-    DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
-    Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
-    IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
-    IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
+    Converter, DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
+    FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
+    IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
+    IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
 };
-use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
 use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
+use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::{ConvertedType, Repetition, Type as PhysicalType};
 use crate::column::page::PageIterator;
-use crate::column::reader::decoder::ColumnValueDecoder;
 use crate::column::reader::ColumnReaderImpl;
+use crate::column::reader::decoder::ColumnValueDecoder;
 use crate::data_type::{
-    BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
-    Int32Type, Int64Type, Int96Type,
+    BoolType, DataType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
+    Int64Type, Int96Type,
 };
 use crate::errors::{ParquetError, ParquetError::ArrowError, Result};
 use crate::file::reader::{FilePageIterator, FileReader};
@@ -81,8 +82,9 @@ use crate::schema::types::{
 use crate::schema::visitor::TypeVisitor;
 
 mod byte_array;
-
-pub use byte_array::ByteArrayReader;
+mod byte_array_dictionary;
+mod dictionary_buffer;
+mod offset_buffer;
 
 /// Array reader reads parquet data into arrow array.
 pub trait ArrayReader {
@@ -270,7 +272,8 @@ where
                 .clone(),
         };
 
-        let record_reader = RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
+        let record_reader =
+            RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
 
         Ok(Self {
             data_type,
@@ -1753,40 +1756,18 @@ impl<'a> ArrayReaderBuilder {
                 )?,
             )),
             PhysicalType::BYTE_ARRAY => match arrow_type {
-                // TODO: Replace with optimised dictionary reader (#171)
-                Some(ArrowType::Dictionary(_, _)) => {
-                    match cur_type.get_basic_info().converted_type() {
-                        ConvertedType::UTF8 => {
-                            let converter = Utf8Converter::new(Utf8ArrayConverter {});
-                            Ok(Box::new(ComplexObjectArrayReader::<
-                                ByteArrayType,
-                                Utf8Converter,
-                            >::new(
-                                page_iterator,
-                                column_desc,
-                                converter,
-                                arrow_type,
-                            )?))
-                        }
-                        _ => {
-                            let converter = BinaryConverter::new(BinaryArrayConverter {});
-                            Ok(Box::new(ComplexObjectArrayReader::<
-                                ByteArrayType,
-                                BinaryConverter,
-                            >::new(
-                                page_iterator,
-                                column_desc,
-                                converter,
-                                arrow_type,
-                            )?))
-                        }
-                    }
-                }
-                _ => Ok(Box::new(ByteArrayReader::new(
+                Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
                     page_iterator,
                     column_desc,
                     arrow_type,
-                )?)),
+                    null_mask_only,
+                ),
+                _ => make_byte_array_reader(
+                    page_iterator,
+                    column_desc,
+                    arrow_type,
+                    null_mask_only,
+                ),
             },
             PhysicalType::FIXED_LEN_BYTE_ARRAY
                 if cur_type.get_basic_info().converted_type()
@@ -1974,8 +1955,8 @@ mod tests {
     use std::collections::VecDeque;
     use std::sync::Arc;
 
+    use rand::{Rng, thread_rng};
     use rand::distributions::uniform::SampleUniform;
-    use rand::{thread_rng, Rng};
 
     use arrow::array::{
         Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray, StringArray,
@@ -1994,15 +1975,15 @@ mod tests {
     use crate::arrow::schema::parquet_to_arrow_schema;
    use crate::basic::{Encoding, Type as PhysicalType};
     use crate::column::page::{Page, PageReader};
-    use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
+    use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
     use crate::errors::Result;
     use crate::file::reader::{FileReader, SerializedFileReader};
     use crate::schema::parser::parse_message_type;
     use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
+    use crate::util::test_common::{get_test_file, make_pages};
     use crate::util::test_common::page_util::{
        DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
     };
-    use crate::util::test_common::{get_test_file, make_pages};
 
     use super::*;
 
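Note: the two `make_*_reader` factory functions imported above replace direct construction of `ByteArrayReader`. The sketch below illustrates the dispatch pattern they use — resolve the Arrow type first, then box a reader monomorphised over the offset width — and is not the parquet crate's actual API; `DataType`, `Reader`, and `OffsetReader` are simplified stand-ins.

```rust
#[derive(Clone, Debug)]
enum DataType {
    Utf8,
    LargeUtf8,
    Binary,
    LargeBinary,
    Int32,
}

trait Reader {
    fn data_type(&self) -> &DataType;
}

// Monomorphised over the offset type, mirroring ByteArrayReader<I>
struct OffsetReader<I> {
    data_type: DataType,
    offsets: Vec<I>,
}

impl<I> Reader for OffsetReader<I> {
    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

fn make_reader(data_type: DataType) -> Result<Box<dyn Reader>, String> {
    match data_type {
        // 32-bit offsets for Utf8/Binary
        DataType::Utf8 | DataType::Binary => Ok(Box::new(OffsetReader::<i32> {
            data_type,
            offsets: vec![0],
        })),
        // 64-bit offsets for the Large variants
        DataType::LargeUtf8 | DataType::LargeBinary => Ok(Box::new(OffsetReader::<i64> {
            data_type,
            offsets: vec![0],
        })),
        _ => Err(format!("invalid data type for byte array reader - {:?}", data_type)),
    }
}

fn main() {
    let reader = make_reader(DataType::Utf8).unwrap();
    println!("{:?}", reader.data_type());
}
```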
diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 93089bf8c1ab..ac9282de8bcc 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -15,15 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
 use crate::arrow::array_reader::{read_records, ArrayReader};
-use crate::arrow::record_reader::buffer::{
-    BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
-};
+use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::{ConvertedType, Encoding};
 use crate::column::page::PageIterator;
-use crate::column::reader::decoder::{ColumnValueDecoder, ValuesBufferSlice};
+use crate::column::reader::decoder::ColumnValueDecoder;
 use crate::data_type::Int32Type;
 use crate::encodings::{
     decoding::{Decoder, DeltaBitPackDecoder},
@@ -32,115 +31,76 @@ use crate::encodings::{
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::memory::ByteBufferPtr;
-use arrow::array::{
-    ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray, LargeBinaryArray,
-    LargeStringArray, OffsetSizeTrait, StringArray,
-};
+use arrow::array::{ArrayRef, OffsetSizeTrait};
 use arrow::buffer::Buffer;
 use arrow::datatypes::DataType as ArrowType;
 use std::any::Any;
 use std::ops::Range;
-use std::sync::Arc;
-
-enum Reader {
-    Binary(GenericRecordReader<OffsetBuffer<i32>, ByteArrayDecoder<i32>>),
-    LargeBinary(GenericRecordReader<OffsetBuffer<i64>, ByteArrayDecoder<i64>>),
-    Utf8(GenericRecordReader<OffsetBuffer<i32>, ByteArrayDecoder<i32>>),
-    LargeUtf8(GenericRecordReader<OffsetBuffer<i64>, ByteArrayDecoder<i64>>),
-}
-
-fn consume_array_data<I: OffsetSizeTrait + ScalarValue>(
-    data_type: ArrowType,
-    reader: &mut GenericRecordReader<OffsetBuffer<I>, ByteArrayDecoder<I>>,
-) -> Result<ArrayData> {
-    let buffer = reader.consume_record_data()?;
-    let mut array_data_builder = ArrayDataBuilder::new(data_type)
-        .len(buffer.len())
-        .add_buffer(buffer.offsets.into())
-        .add_buffer(buffer.values.into());
-
-    if let Some(buffer) = reader.consume_bitmap_buffer()? {
-        array_data_builder = array_data_builder.null_bit_buffer(buffer);
+pub fn make_byte_array_reader(
+    pages: Box<dyn PageIterator>,
+    column_desc: ColumnDescPtr,
+    arrow_type: Option<ArrowType>,
+    null_mask_only: bool,
+) -> Result<Box<dyn ArrayReader>> {
+    // Check if Arrow type is specified, else create it from Parquet type
+    let data_type = match arrow_type {
+        Some(t) => t,
+        None => parquet_to_arrow_field(column_desc.as_ref())?
+            .data_type()
+            .clone(),
+    };
+
+    match data_type {
+        ArrowType::Binary | ArrowType::Utf8 => {
+            let reader =
+                GenericRecordReader::new_with_options(column_desc, null_mask_only);
+            Ok(Box::new(ByteArrayReader::<i32>::new(
+                pages, data_type, reader,
+            )))
+        }
+        ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
+            let reader =
+                GenericRecordReader::new_with_options(column_desc, null_mask_only);
+            Ok(Box::new(ByteArrayReader::<i64>::new(
+                pages, data_type, reader,
+            )))
+        }
+        _ => Err(general_err!(
+            "invalid data type for byte array reader - {}",
+            data_type
+        )),
     }
-    Ok(unsafe { array_data_builder.build_unchecked() })
 }
 
-pub struct ByteArrayReader {
+struct ByteArrayReader<I: OffsetSizeTrait + ScalarValue> {
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
     def_levels_buffer: Option<Buffer>,
     rep_levels_buffer: Option<Buffer>,
-    column_desc: ColumnDescPtr,
-    record_reader: Reader,
+    record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
 }
 
-impl ByteArrayReader {
-    /// Construct primitive array reader.
-    pub fn new(
-        pages: Box<dyn PageIterator>,
-        column_desc: ColumnDescPtr,
-        arrow_type: Option<ArrowType>,
-    ) -> Result<Self> {
-        Self::new_with_options(pages, column_desc, arrow_type, false)
-    }
-
-    /// Construct primitive array reader with ability to only compute null mask and not
-    /// buffer level data
-    pub fn new_with_options(
+impl<I: OffsetSizeTrait + ScalarValue> ByteArrayReader<I> {
+    fn new(
         pages: Box<dyn PageIterator>,
-        column_desc: ColumnDescPtr,
-        arrow_type: Option<ArrowType>,
-        null_mask_only: bool,
-    ) -> Result<Self> {
-        // Check if Arrow type is specified, else create it from Parquet type
-        let data_type = match arrow_type {
-            Some(t) => t,
-            None => parquet_to_arrow_field(column_desc.as_ref())?
-                .data_type()
-                .clone(),
-        };
-
-        let record_reader = match data_type {
-            ArrowType::Binary => Reader::Binary(GenericRecordReader::new_with_options(
-                column_desc.clone(),
-                null_mask_only,
-            )),
-            ArrowType::LargeBinary => {
-                Reader::LargeBinary(GenericRecordReader::new_with_options(
-                    column_desc.clone(),
-                    null_mask_only,
-                ))
-            }
-            ArrowType::Utf8 => Reader::Utf8(GenericRecordReader::new_with_options(
-                column_desc.clone(),
-                null_mask_only,
-            )),
-            ArrowType::LargeUtf8 => {
-                Reader::LargeUtf8(GenericRecordReader::new_with_options(
-                    column_desc.clone(),
-                    null_mask_only,
-                ))
-            }
-            _ => {
-                return Err(general_err!(
-                    "invalid data type for ByteArrayReader - {}",
-                    data_type
-                ))
-            }
-        };
-
-        Ok(Self {
+        data_type: ArrowType,
+        record_reader: GenericRecordReader<
+            OffsetBuffer<I>,
+            ByteArrayColumnValueDecoder<I>,
+        >,
+    ) -> Self {
+        Self {
             data_type,
             pages,
             def_levels_buffer: None,
             rep_levels_buffer: None,
-            column_desc,
             record_reader,
-        })
+        }
     }
 }
 
-impl ArrayReader for ByteArrayReader {
+impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -149,32 +109,15 @@ impl ArrayReader for ByteArrayReader {
         &self.data_type
     }
 
-    fn next_batch(&mut self, batch_size: usize) -> crate::errors::Result<ArrayRef> {
-        let data = match &mut self.record_reader {
-            Reader::Binary(r) | Reader::Utf8(r) => {
-                read_records(r, self.pages.as_mut(), batch_size)?;
-                let data = consume_array_data(self.data_type.clone(), r)?;
-                self.def_levels_buffer = r.consume_def_levels()?;
-                self.rep_levels_buffer = r.consume_rep_levels()?;
-                r.reset();
-                data
-            }
-            Reader::LargeBinary(r) | Reader::LargeUtf8(r) => {
-                read_records(r, self.pages.as_mut(), batch_size)?;
-                let data = consume_array_data(self.data_type.clone(), r)?;
-                self.def_levels_buffer = r.consume_def_levels()?;
-                self.rep_levels_buffer = r.consume_rep_levels()?;
-                r.reset();
-                data
-            }
-        };
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
+        let buffer = self.record_reader.consume_record_data()?;
+        let null_buffer = self.record_reader.consume_bitmap_buffer()?;
+        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.record_reader.reset();
 
-        Ok(match &self.record_reader {
-            Reader::Binary(_) => Arc::new(BinaryArray::from(data)),
-            Reader::LargeBinary(_) => Arc::new(LargeBinaryArray::from(data)),
-            Reader::Utf8(_) => Arc::new(StringArray::from(data)),
-            Reader::LargeUtf8(_) => Arc::new(LargeStringArray::from(data)),
-        })
+        Ok(buffer.into_array(null_buffer, self.data_type.clone()))
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
@@ -190,145 +133,15 @@ impl ArrayReader for ByteArrayReader {
     }
 }
 
-struct OffsetBuffer<I: OffsetSizeTrait + ScalarValue> {
-    offsets: ScalarBuffer<I>,
-    values: ScalarBuffer<u8>,
-}
-
-impl<I: OffsetSizeTrait + ScalarValue> Default for OffsetBuffer<I> {
-    fn default() -> Self {
-        let mut offsets = ScalarBuffer::new();
-        offsets.resize(1);
-        Self {
-            offsets,
-            values: ScalarBuffer::new(),
-        }
-    }
-}
-
-impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
-    fn len(&self) -> usize {
-        self.offsets.len() - 1
-    }
-
-    fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
-        if validate_utf8 {
-            if let Err(e) = std::str::from_utf8(data) {
-                return Err(ParquetError::General(format!(
-                    "encountered non UTF-8 data: {}",
-                    e
-                )));
-            }
-        }
-
-        self.values.extend_from_slice(data);
-
-        let index_offset = I::from_usize(self.values.len())
-            .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
-
-        self.offsets.push(index_offset);
-        Ok(())
-    }
-}
-
-impl<I: OffsetSizeTrait + ScalarValue> BufferQueue for OffsetBuffer<I> {
-    type Output = Self;
-    type Slice = Self;
-
-    fn split_off(&mut self, len: usize) -> Self::Output {
-        let remaining_offsets = self.offsets.len() - len - 1;
-        let offsets = self.offsets.as_slice();
-
-        let end_offset = offsets[len];
-
-        let mut new_offsets = ScalarBuffer::new();
-        new_offsets.reserve(remaining_offsets + 1);
-        for v in &offsets[len..] {
-            new_offsets.push(*v - end_offset)
-        }
-
-        self.offsets.resize(len + 1);
-
-        Self {
-            offsets: std::mem::replace(&mut self.offsets, new_offsets),
-            values: self.values.take(end_offset.to_usize().unwrap()),
-        }
-    }
-
-    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
-        self
-    }
-
-    fn set_len(&mut self, len: usize) {
-        assert_eq!(self.offsets.len(), len + 1);
-    }
-}
-
-impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for OffsetBuffer<I> {
-    fn pad_nulls(
-        &mut self,
-        values_range: Range<usize>,
-        levels_range: Range<usize>,
-        rev_position_iter: impl Iterator<Item = usize>,
-    ) {
-        assert_eq!(self.offsets.len(), values_range.end + 1);
-        self.offsets.resize(levels_range.end + 1);
-
-        let offsets = self.offsets.as_slice_mut();
-
-        let values_start = values_range.start;
-        let mut last_offset = levels_range.end + 1;
-
-        for (value_pos, level_pos) in values_range.rev().zip(rev_position_iter) {
-            assert!(level_pos >= value_pos);
-            assert!(level_pos < last_offset);
-
-            if level_pos == value_pos {
-                // Pad trailing nulls if necessary
-                if level_pos != last_offset && last_offset == levels_range.end + 1 {
-                    let value = offsets[value_pos];
-                    for x in &mut offsets[level_pos + 1..last_offset] {
-                        *x = value;
-                    }
-                }
-
-                // We are done
-                return;
-            }
-
-            // Fill in any nulls
-            let value_end = offsets[value_pos + 1];
-            let value_start = offsets[value_pos];
-
-            for x in &mut offsets[level_pos + 1..last_offset] {
-                *x = value_end;
-            }
-
-            offsets[level_pos] = value_start;
-            last_offset = level_pos;
-        }
-
-        // Pad leading nulls up to `last_offset`
-        let value = offsets[values_start];
-        for x in &mut offsets[values_start + 1..last_offset] {
-            *x = value
-        }
-    }
-}
-
-impl<I: OffsetSizeTrait + ScalarValue> ValuesBufferSlice for OffsetBuffer<I> {
-    fn capacity(&self) -> usize {
-        usize::MAX
-    }
-}
-
-struct ByteArrayDecoder<I: OffsetSizeTrait + ScalarValue> {
+struct ByteArrayColumnValueDecoder<I: OffsetSizeTrait + ScalarValue> {
     dict: Option<OffsetBuffer<I>>,
-    decoder: Option<StringDecoder<I>>,
+    decoder: Option<ByteArrayDecoder>,
     validate_utf8: bool,
 }
 
-impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder for ByteArrayDecoder<I> {
+impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
+    for ByteArrayColumnValueDecoder<I>
+{
     type Slice = OffsetBuffer<I>;
 
     fn new(desc: &ColumnDescPtr) -> Self {
@@ -358,7 +171,8 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder for ByteArrayDecoder<
 
         let mut buffer = OffsetBuffer::default();
-        let mut decoder = PlainDecoder::new(buf, num_values as usize, self.validate_utf8);
+        let mut decoder =
+            ByteArrayDecoderPlain::new(buf, num_values as usize, self.validate_utf8);
         decoder.read(&mut buffer, usize::MAX)?;
         self.dict = Some(buffer);
         Ok(())
@@ -369,20 +183,51 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder for ByteArrayDecoder<
         data: ByteBufferPtr,
         num_values: usize,
     ) -> Result<()> {
+        self.decoder = Some(ByteArrayDecoder::new(
+            encoding,
+            data,
+            num_values,
+            self.validate_utf8,
+        )?);
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+        self.decoder
+            .as_mut()
+            .expect("decoder set")
+            .read(out, range, self.dict.as_ref())
+    }
+}
+
+pub enum ByteArrayDecoder {
+    Plain(ByteArrayDecoderPlain),
+    Dictionary(ByteArrayDecoderDictionary),
+    DeltaLength(ByteArrayDecoderDeltaLength),
+    DeltaByteArray(ByteArrayDecoderDelta),
+}
+
+impl ByteArrayDecoder {
+    pub fn new(
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+        validate_utf8: bool,
+    ) -> Result<Self> {
         let decoder = match encoding {
-            Encoding::PLAIN => StringDecoder::Plain(PlainDecoder::new(
+            Encoding::PLAIN => ByteArrayDecoder::Plain(ByteArrayDecoderPlain::new(
                 data,
                 num_values,
-                self.validate_utf8,
+                validate_utf8,
             )),
             Encoding::RLE_DICTIONARY => {
-                StringDecoder::Dictionary(DictionaryDecoder::new(data))
+                ByteArrayDecoder::Dictionary(ByteArrayDecoderDictionary::new(data))
            }
-            Encoding::DELTA_LENGTH_BYTE_ARRAY => StringDecoder::DeltaLength(
-                DeltaLengthDecoder::new(data, num_values, self.validate_utf8)?,
+            Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength(
+                ByteArrayDecoderDeltaLength::new(data, num_values, validate_utf8)?,
             ),
-            Encoding::DELTA_BYTE_ARRAY => StringDecoder::DeltaByteArray(
-                DeltaByteArrayDecoder::new(data, num_values, self.validate_utf8)?,
+            Encoding::DELTA_BYTE_ARRAY => ByteArrayDecoder::DeltaByteArray(
+                ByteArrayDecoderDelta::new(data, num_values, validate_utf8)?,
             ),
             _ => {
                 return Err(general_err!(
@@ -392,41 +237,39 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder for ByteArrayDecoder<
             }
         };
 
-        self.decoder = Some(decoder);
-        Ok(())
+        Ok(decoder)
     }
 
-    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+    pub fn read<I: OffsetSizeTrait + ScalarValue>(
+        &mut self,
+        out: &mut OffsetBuffer<I>,
+        range: Range<usize>,
+        dict: Option<&OffsetBuffer<I>>,
+    ) -> Result<usize> {
         let len = range.end - range.start;
-        match self.decoder.as_mut().expect("decoder set") {
-            StringDecoder::Plain(d) => d.read(out, len),
-            StringDecoder::Dictionary(d) => {
-                let dict = self.dict.as_ref().expect("dictionary set");
+        match self {
+            ByteArrayDecoder::Plain(d) => d.read(out, len),
+            ByteArrayDecoder::Dictionary(d) => {
+                let dict = dict.expect("dictionary set");
                 d.read(out, dict, len)
             }
-            StringDecoder::DeltaLength(d) => d.read(out, len),
-            StringDecoder::DeltaByteArray(d) => d.read(out, len),
+            ByteArrayDecoder::DeltaLength(d) => d.read(out, len),
+            ByteArrayDecoder::DeltaByteArray(d) => d.read(out, len),
         }
     }
 }
 
-enum StringDecoder {
-    Plain(PlainDecoder),
-    Dictionary(DictionaryDecoder),
-    DeltaLength(DeltaLengthDecoder),
-    DeltaByteArray(DeltaByteArrayDecoder),
-}
-
 /// Decoder for [`Encoding::PLAIN`]
-struct PlainDecoder {
+pub struct ByteArrayDecoderPlain {
     buf: ByteBufferPtr,
     offset: usize,
     remaining_values: usize,
     validate_utf8: bool,
 }
 
-impl PlainDecoder {
-    fn new(buf: ByteBufferPtr, values: usize, validate_utf8: bool) -> Self {
+impl ByteArrayDecoderPlain {
+    pub fn new(buf: ByteBufferPtr, values: usize, validate_utf8: bool) -> Self {
         Self {
             buf,
             validate_utf8,
@@ -435,7 +278,7 @@ impl PlainDecoder {
         }
     }
 
-    fn read<I: OffsetSizeTrait + ScalarValue>(
+    pub fn read<I: OffsetSizeTrait + ScalarValue>(
         &mut self,
         output: &mut OffsetBuffer<I>,
         len: usize,
@@ -483,7 +326,7 @@ impl PlainDecoder {
 }
 
 /// Decoder for [`Encoding::DELTA_LENGTH_BYTE_ARRAY`]
-struct DeltaLengthDecoder {
+pub struct ByteArrayDecoderDeltaLength {
     lengths: Vec<i32>,
     data: ByteBufferPtr,
     length_offset: usize,
@@ -491,7 +334,7 @@ struct DeltaLengthDecoder {
     validate_utf8: bool,
 }
 
-impl DeltaLengthDecoder {
+impl ByteArrayDecoderDeltaLength {
     fn new(data: ByteBufferPtr, values: usize, validate_utf8: bool) -> Result<Self> {
         let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
         len_decoder.set_data(data.all(), values)?;
@@ -543,7 +386,7 @@ impl DeltaLengthDecoder {
 }
 
 /// Decoder for [`Encoding::DELTA_BYTE_ARRAY`]
-struct DeltaByteArrayDecoder {
+pub struct ByteArrayDecoderDelta {
     prefix_lengths: Vec<i32>,
     suffix_lengths: Vec<i32>,
     data: ByteBufferPtr,
@@ -553,7 +396,7 @@ struct DeltaByteArrayDecoder {
     validate_utf8: bool,
 }
 
-impl DeltaByteArrayDecoder {
+impl ByteArrayDecoderDelta {
     fn new(data: ByteBufferPtr, values: usize, validate_utf8: bool) -> Result<Self> {
         let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
         prefix.set_data(data.all(), values)?;
@@ -616,13 +459,13 @@ impl DeltaByteArrayDecoder {
     }
 }
 
-struct DictionaryDecoder {
+pub struct ByteArrayDecoderDictionary {
     decoder: RleDecoder,
     index_buf: Box<[i32; 1024]>,
     index_offset: usize,
 }
 
-impl DictionaryDecoder {
+impl ByteArrayDecoderDictionary {
     fn new(data: ByteBufferPtr) -> Self {
         let bit_width = data[0];
         let mut decoder = RleDecoder::new(bit_width);
@@ -657,20 +500,11 @@ impl DictionaryDecoder {
             let to_read =
                 (len - values_read).min(self.index_buf.len() - self.index_offset);
 
-            let offsets = dict.offsets.as_slice();
-            let values = dict.values.as_slice();
-
-            for index in &self.index_buf[self.index_offset..self.index_offset + to_read] {
-                let index = *index as usize;
-                if index + 1 >= offsets.len() {
-                    return Err(general_err!("invalid offset in byte array: {}", index));
-                }
-                let start_offset = offsets[index].to_usize().unwrap();
-                let end_offset = offsets[index + 1].to_usize().unwrap();
-
-                // Dictionary values are verified when decoding dictionary page
-                output.try_push(&values[start_offset..end_offset], false)?;
-            }
+            output.extend_from_dictionary(
+                &self.index_buf[self.index_offset..self.index_offset + to_read],
+                dict.offsets.as_slice(),
+                dict.values.as_slice(),
+            )?;
 
             self.index_offset += to_read;
             values_read += to_read;
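Note: every decoder above writes into an `OffsetBuffer`, the offsets-plus-values layout Arrow uses for variable-length byte arrays. A minimal, self-contained sketch of that layout (assuming plain `Vec`s, with no UTF-8 validation or offset-overflow checks):

```rust
// Simplified stand-in for OffsetBuffer<i32>
struct MiniOffsetBuffer {
    offsets: Vec<i32>, // always starts with 0; one extra entry per value
    values: Vec<u8>,   // all byte arrays concatenated
}

impl MiniOffsetBuffer {
    fn new() -> Self {
        Self { offsets: vec![0], values: Vec::new() }
    }

    fn push(&mut self, data: &[u8]) {
        self.values.extend_from_slice(data);
        self.offsets.push(self.values.len() as i32);
    }

    // Value i is the byte range between consecutive offsets
    fn get(&self, i: usize) -> &[u8] {
        &self.values[self.offsets[i] as usize..self.offsets[i + 1] as usize]
    }
}

fn main() {
    let mut buf = MiniOffsetBuffer::new();
    buf.push(b"a");
    buf.push(b"bc");
    // offsets: [0, 1, 3], values: b"abc"
    assert_eq!(buf.offsets, vec![0, 1, 3]);
    assert_eq!(buf.get(1), b"bc");
    println!("ok");
}
```

This is why `len()` on the real `OffsetBuffer` is `offsets.len() - 1`: the leading zero is a sentinel, not a value.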
make_reader { + ( + ($pages:expr, $column_desc:expr, $data_type:expr, $null_mask_only:expr) => match ($k:expr, $v:expr) { + $(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, $value_type:ty),)+ + } + ) => { + match (($k, $v)) { + $( + ($key_arrow, $value_arrow) => { + let reader = GenericRecordReader::new_with_options( + $column_desc, + $null_mask_only, + ); + Ok(Box::new(ByteArrayDictionaryReader::<$key_type, $value_type>::new( + $pages, $data_type, reader, + ))) + } + )+ + _ => Err(general_err!( + "unsupported data type for byte array dictionary reader - {}", + $data_type + )), + } + } +} + +pub fn make_byte_array_dictionary_reader( + pages: Box, + column_desc: ColumnDescPtr, + arrow_type: Option, + null_mask_only: bool, +) -> Result> { + // Check if Arrow type is specified, else create it from Parquet type + let data_type = match arrow_type { + Some(t) => t, + None => parquet_to_arrow_field(column_desc.as_ref())? + .data_type() + .clone(), + }; + + match &data_type { + ArrowType::Dictionary(key_type, value_type) => { + make_reader! { + (pages, column_desc, data_type, null_mask_only) => match (key_type.as_ref(), value_type.as_ref()) { + (ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8) => (u8, i32), + (ArrowType::UInt8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u8, i64), + (ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8) => (i8, i32), + (ArrowType::Int8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i8, i64), + (ArrowType::UInt16, ArrowType::Binary | ArrowType::Utf8) => (u16, i32), + (ArrowType::UInt16, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u16, i64), + (ArrowType::Int16, ArrowType::Binary | ArrowType::Utf8) => (i16, i32), + (ArrowType::Int16, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i16, i64), + (ArrowType::UInt32, ArrowType::Binary | ArrowType::Utf8) => (u32, i32), + (ArrowType::UInt32, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u32, i64), + (ArrowType::Int32, ArrowType::Binary | ArrowType::Utf8) => (i32, i32), + (ArrowType::Int32, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i32, i64), + (ArrowType::UInt64, ArrowType::Binary | ArrowType::Utf8) => (u64, i32), + (ArrowType::UInt64, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u64, i64), + (ArrowType::Int64, ArrowType::Binary | ArrowType::Utf8) => (i64, i32), + (ArrowType::Int64, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i64, i64), + } + } + } + _ => Err(general_err!( + "invalid non-dictionary data type for byte array dictionary reader - {}", + data_type + )), + } +} + +struct ByteArrayDictionaryReader { + data_type: ArrowType, + pages: Box, + def_levels_buffer: Option, + rep_levels_buffer: Option, + record_reader: GenericRecordReader, DictionaryDecoder>, +} + +impl ByteArrayDictionaryReader +where + K: FromBytes + ScalarValue + ArrowNativeType, + V: ScalarValue + OffsetSizeTrait, +{ + fn new( + pages: Box, + data_type: ArrowType, + record_reader: GenericRecordReader< + DictionaryBuffer, + DictionaryDecoder, + >, + ) -> Self { + Self { + data_type, + pages, + def_levels_buffer: None, + rep_levels_buffer: None, + record_reader, + } + } +} + +impl ArrayReader for ByteArrayDictionaryReader +where + K: FromBytes + ScalarValue + ArrowNativeType, + V: ScalarValue + OffsetSizeTrait, +{ + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> Result { + read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?; + let buffer = 
self.record_reader.consume_record_data()?; + let null_buffer = self.record_reader.consume_bitmap_buffer()?; + let array = buffer.into_array(null_buffer, &self.data_type); + + self.def_levels_buffer = self.record_reader.consume_def_levels()?; + self.rep_levels_buffer = self.record_reader.consume_rep_levels()?; + self.record_reader.reset(); + + Ok(array) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_levels_buffer + .as_ref() + .map(|buf| unsafe { buf.typed_data() }) + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_levels_buffer + .as_ref() + .map(|buf| unsafe { buf.typed_data() }) + } +} + +enum MaybeRLEDecoder { + RLE(RleDecoder), + Fallback(ByteArrayDecoder), +} + +struct DictionaryDecoder { + /// The current dictionary + dict: Option>, + + /// Dictionary decoder + decoder: Option, + + validate_utf8: bool, + + value_type: ArrowType, + + phantom: PhantomData<(K, V)>, +} + +impl ColumnValueDecoder for DictionaryDecoder +where + K: FromBytes + ScalarValue + ArrowNativeType, + V: ScalarValue + OffsetSizeTrait, +{ + type Slice = DictionaryBuffer; + + fn new(col: &ColumnDescPtr) -> Self { + let validate_utf8 = col.converted_type() == ConvertedType::UTF8; + + let value_type = + match (V::is_large(), col.converted_type() == ConvertedType::UTF8) { + (true, true) => ArrowType::LargeUtf8, + (true, false) => ArrowType::LargeBinary, + (false, true) => ArrowType::Utf8, + (false, false) => ArrowType::Binary, + }; + + Self { + dict: None, + decoder: None, + validate_utf8, + value_type, + phantom: Default::default(), + } + } + + fn set_dict( + &mut self, + buf: ByteBufferPtr, + num_values: u32, + encoding: Encoding, + _is_sorted: bool, + ) -> Result<()> { + if !matches!( + encoding, + Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY + ) { + return Err(nyi_err!( + "Invalid/Unsupported encoding type for dictionary: {}", + encoding + )); + } + + let mut buffer = OffsetBuffer::::default(); + let mut decoder = + ByteArrayDecoderPlain::new(buf, num_values as usize, self.validate_utf8); + decoder.read(&mut buffer, usize::MAX)?; + + let data = ArrayDataBuilder::new(self.value_type.clone()) + .len(buffer.len()) + .add_buffer(buffer.offsets.into()) + .add_buffer(buffer.values.into()) + .build()?; + + self.dict = Some(Arc::new(data)); + Ok(()) + } + + fn set_data( + &mut self, + encoding: Encoding, + data: ByteBufferPtr, + num_values: usize, + ) -> Result<()> { + let decoder = match encoding { + Encoding::RLE_DICTIONARY => { + let bit_width = data[0]; + let mut decoder = RleDecoder::new(bit_width); + decoder.set_data(data.start_from(1)); + MaybeRLEDecoder::RLE(decoder) + } + _ => MaybeRLEDecoder::Fallback(ByteArrayDecoder::new( + encoding, + data, + num_values, + self.validate_utf8, + )?), + }; + + self.decoder = Some(decoder); + Ok(()) + } + + fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result { + let len = range.end - range.start; + match self.decoder.as_mut().expect("decoder set") { + MaybeRLEDecoder::Fallback(decoder) => { + let output = out.data.get_or_insert_with(|| DictOrValues::Values { + values: Default::default(), + }); + decoder.read(output.spill_values()?, range, None) + } + MaybeRLEDecoder::RLE(decoder) => { + let dict = self.dict.as_ref().expect("dictionary set"); + assert_eq!(dict.data_type(), &self.value_type); + + let output = out.data.get_or_insert_with(|| DictOrValues::Dict { + keys: Default::default(), + values: Arc::clone(dict), + }); + + match output { + DictOrValues::Dict { keys, values } if Arc::ptr_eq(values, dict) => { + 
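Note: the decoder above distinguishes a happy path — consecutive pages reference the same dictionary, so only the RLE keys are copied — from a sad path where accumulated keys must be materialised ("spilled") into concrete values before decoding can continue. A self-contained sketch of that idea, with `Vec<String>` standing in for the Arrow key and value buffers:

```rust
// Simplified stand-in for DictOrValues<K, V>
#[derive(Debug)]
enum DictOrValues {
    Dict { keys: Vec<u16>, dict: Vec<String> },
    Values { values: Vec<String> },
}

impl DictOrValues {
    // Resolve each key through the dictionary, switching to plain values
    fn spill(&mut self) {
        if let DictOrValues::Dict { keys, dict } = self {
            let values = keys.iter().map(|k| dict[*k as usize].clone()).collect();
            *self = DictOrValues::Values { values };
        }
    }
}

fn main() {
    let mut buf = DictOrValues::Dict {
        keys: vec![1, 0, 1],
        dict: vec!["a".to_string(), "b".to_string()],
    };
    // A new page arrived with a different dictionary: spill before appending
    buf.spill();
    // Now: Values { values: ["b", "a", "b"] }
    println!("{:?}", buf);
}
```

The happy path is what makes the reader "dictionary preserving": as long as a column chunk uses one dictionary, the output Arrow array shares it and no values are copied.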
diff --git a/parquet/src/arrow/array_reader/dictionary_buffer.rs b/parquet/src/arrow/array_reader/dictionary_buffer.rs
new file mode 100644
index 000000000000..c9420f89f21f
--- /dev/null
+++ b/parquet/src/arrow/array_reader/dictionary_buffer.rs
@@ -0,0 +1,176 @@
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::record_reader::buffer::{
+    BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
+};
+use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::errors::Result;
+use arrow::array::{make_array, ArrayData, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+use std::ops::Range;
+use std::sync::Arc;
+
+pub enum DictOrValues<K: ScalarValue, V: ScalarValue> {
+    Dict {
+        keys: ScalarBuffer<K>,
+        values: Arc<ArrayData>,
+    },
+    Values {
+        values: OffsetBuffer<V>,
+    },
+}
+
+impl<K: ScalarValue + ArrowNativeType, V: ScalarValue + OffsetSizeTrait>
+    DictOrValues<K, V>
+{
+    pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
+        match self {
+            Self::Values { values } => Ok(values),
+            Self::Dict { keys, values } => {
+                let mut spilled = OffsetBuffer::default();
+                let dict_offsets = unsafe { values.buffers()[0].typed_data::<V>() };
+                let dict_values = &values.buffers()[1].as_slice();
+
+                spilled.extend_from_dictionary(
+                    keys.as_slice(),
+                    dict_offsets,
+                    dict_values,
+                )?;
+
+                *self = Self::Values { values: spilled };
+                match self {
+                    Self::Values { values } => Ok(values),
+                    _ => unreachable!(),
+                }
+            }
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct DictionaryBuffer<K: ScalarValue, V: ScalarValue> {
+    /// Whilst decoding we may encounter new dictionaries or pages
+    /// that aren't dictionary encoded
+    ///
+    /// The output buffer therefore needs to be able to accommodate this
+    pub data: Option<DictOrValues<K, V>>,
+}
+
+impl<K: ScalarValue, V: ScalarValue> DictionaryBuffer<K, V> {
+    pub fn len(&self) -> usize {
+        match &self.data {
+            Some(DictOrValues::Dict { keys, .. }) => keys.len(),
+            Some(DictOrValues::Values { values }) => values.len(),
+            None => 0,
+        }
+    }
+
+    pub fn into_array(
+        self,
+        null_buffer: Option<Buffer>,
+        data_type: &ArrowType,
+    ) -> ArrayRef {
+        match self.data {
+            Some(DictOrValues::Dict { keys, values }) => {
+                let mut builder = ArrayDataBuilder::new(data_type.clone())
+                    .len(keys.len())
+                    .add_buffer(keys.into())
+                    .add_child_data(values.as_ref().clone());
+
+                if let Some(buffer) = null_buffer {
+                    builder = builder.null_bit_buffer(buffer);
+                }
+
+                let data = match cfg!(debug_assertions) {
+                    true => builder.build().unwrap(),
+                    false => unsafe { builder.build_unchecked() },
+                };
+
+                make_array(data)
+            }
+            Some(DictOrValues::Values { values }) => {
+                let value_type = match data_type {
+                    ArrowType::Dictionary(_, v) => v.as_ref().clone(),
+                    _ => unreachable!(),
+                };
+
+                arrow::compute::cast(
+                    &values.into_array(null_buffer, value_type),
+                    data_type,
+                )
+                .expect("cast should be infallible")
+            }
+            None => Arc::new(make_array(ArrayData::new_empty(data_type))),
+        }
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue> ValuesBufferSlice for DictionaryBuffer<K, V> {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+}
+
+impl<K: ScalarValue + ArrowNativeType, V: ScalarValue + OffsetSizeTrait> ValuesBuffer
+    for DictionaryBuffer<K, V>
+{
+    fn pad_nulls(
+        &mut self,
+        values_range: Range<usize>,
+        levels_range: Range<usize>,
+        rev_valid_position_iter: impl Iterator<Item = usize>,
+    ) {
+        match self.data.as_mut() {
+            Some(DictOrValues::Dict { keys, .. }) => {
+                keys.resize(levels_range.end);
+                keys.pad_nulls(values_range, levels_range, rev_valid_position_iter)
+            }
+            Some(DictOrValues::Values { values, .. }) => {
+                values.pad_nulls(values_range, levels_range, rev_valid_position_iter)
+            }
+            None => {}
+        }
+    }
+}
+
+impl<K: ScalarValue + ArrowNativeType, V: ScalarValue + OffsetSizeTrait> BufferQueue
+    for DictionaryBuffer<K, V>
+{
+    type Output = Self;
+    type Slice = Self;
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        let ret = match &mut self.data {
+            Some(DictOrValues::Dict { keys, values }) => Self {
+                data: Some(DictOrValues::Dict {
+                    keys: keys.take(len),
+                    values: values.clone(),
+                }),
+            },
+            Some(DictOrValues::Values { values }) => Self {
+                data: Some(DictOrValues::Values {
+                    values: values.split_off(len),
+                }),
+            },
+            None => Self { data: None },
+        };
+
+        if self.len() == 0 {
+            self.data = None
+        }
+
+        ret
+    }
+
+    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+        self
+    }
+
+    fn set_len(&mut self, len: usize) {
+        match &mut self.data {
+            Some(DictOrValues::Dict { keys, .. }) => keys.set_len(len),
+            Some(DictOrValues::Values { values }) => values.set_len(len),
+            None => assert_eq!(len, 0),
+        }
+    }
+}
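Note: `into_array` above validates the assembled `ArrayData` in debug builds but skips validation via `build_unchecked` in release builds. A standalone sketch of that pattern, with hypothetical `build_checked`/`build_unchecked` helpers standing in for `ArrayDataBuilder`'s `build()`/`build_unchecked()`:

```rust
// Stand-in for a buffer with an invariant (offsets must be non-decreasing)
struct Payload(Vec<i32>);

fn build_checked(v: Vec<i32>) -> Result<Payload, String> {
    if v.windows(2).all(|w| w[0] <= w[1]) {
        Ok(Payload(v))
    } else {
        Err("offsets not monotonically increasing".to_string())
    }
}

fn build_unchecked(v: Vec<i32>) -> Payload {
    Payload(v)
}

fn build(v: Vec<i32>) -> Payload {
    // Debug builds pay for validation; release builds trust the decoder,
    // mirroring build().unwrap() vs build_unchecked() in the diff
    match cfg!(debug_assertions) {
        true => build_checked(v).unwrap(),
        false => build_unchecked(v),
    }
}

fn main() {
    let p = build(vec![0, 1, 3]);
    println!("{} offsets", p.0.len());
}
```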
diff --git a/parquet/src/arrow/array_reader/offset_buffer.rs b/parquet/src/arrow/array_reader/offset_buffer.rs
new file mode 100644
index 000000000000..d490e378b3c8
--- /dev/null
+++ b/parquet/src/arrow/array_reader/offset_buffer.rs
@@ -0,0 +1,189 @@
+use crate::arrow::record_reader::buffer::{
+    BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
+};
+use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{make_array, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+use std::ops::Range;
+
+pub struct OffsetBuffer<I: ScalarValue> {
+    pub offsets: ScalarBuffer<I>,
+    pub values: ScalarBuffer<u8>,
+}
+
+impl<I: ScalarValue> Default for OffsetBuffer<I> {
+    fn default() -> Self {
+        let mut offsets = ScalarBuffer::new();
+        offsets.resize(1);
+        Self {
+            offsets,
+            values: ScalarBuffer::new(),
+        }
+    }
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
+    pub fn len(&self) -> usize {
+        self.offsets.len() - 1
+    }
+
+    pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
+        if validate_utf8 {
+            if let Err(e) = std::str::from_utf8(data) {
+                return Err(ParquetError::General(format!(
+                    "encountered non UTF-8 data: {}",
+                    e
+                )));
+            }
+        }
+
+        self.values.extend_from_slice(data);
+
+        let index_offset = I::from_usize(self.values.len())
+            .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
+
+        self.offsets.push(index_offset);
+        Ok(())
+    }
+
+    pub fn extend_from_dictionary<K: ArrowNativeType, V: ArrowNativeType>(
+        &mut self,
+        keys: &[K],
+        dict_offsets: &[V],
+        dict_values: &[u8],
+    ) -> Result<()> {
+        for key in keys {
+            let index = key.to_usize().unwrap();
+            if index + 1 >= dict_offsets.len() {
+                return Err(general_err!("invalid offset in byte array: {}", index));
+            }
+            let start_offset = dict_offsets[index].to_usize().unwrap();
+            let end_offset = dict_offsets[index + 1].to_usize().unwrap();
+
+            // Dictionary values are verified when decoding dictionary page
+            self.try_push(&dict_values[start_offset..end_offset], false)?;
+        }
+        Ok(())
+    }
+
+    pub fn into_array(
+        self,
+        null_buffer: Option<Buffer>,
+        data_type: ArrowType,
+    ) -> ArrayRef {
+        let mut array_data_builder = ArrayDataBuilder::new(data_type)
+            .len(self.len())
+            .add_buffer(self.offsets.into())
+            .add_buffer(self.values.into());
+
+        if let Some(buffer) = null_buffer {
+            array_data_builder = array_data_builder.null_bit_buffer(buffer);
+        }
+
+        let data = match cfg!(debug_assertions) {
+            true => array_data_builder.build().unwrap(),
+            false => unsafe { array_data_builder.build_unchecked() },
+        };
+
+        make_array(data)
+    }
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> BufferQueue for OffsetBuffer<I> {
+    type Output = Self;
+    type Slice = Self;
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        let remaining_offsets = self.offsets.len() - len - 1;
+        let offsets = self.offsets.as_slice();
+
+        let end_offset = offsets[len];
+
+        let mut new_offsets = ScalarBuffer::new();
+        new_offsets.reserve(remaining_offsets + 1);
+        for v in &offsets[len..] {
+            new_offsets.push(*v - end_offset)
+        }
+
+        self.offsets.resize(len + 1);
+
+        Self {
+            offsets: std::mem::replace(&mut self.offsets, new_offsets),
+            values: self.values.take(end_offset.to_usize().unwrap()),
+        }
+    }
+
+    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+        self
+    }
+
+    fn set_len(&mut self, len: usize) {
+        assert_eq!(self.offsets.len(), len + 1);
+        self.assert_valid();
+    }
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for OffsetBuffer<I> {
+    fn pad_nulls(
+        &mut self,
+        values_range: Range<usize>,
+        levels_range: Range<usize>,
+        rev_position_iter: impl Iterator<Item = usize>,
+    ) {
+        assert_eq!(values_range.start, levels_range.start);
+        assert_eq!(self.offsets.len(), values_range.end + 1);
+        self.offsets.resize(levels_range.end + 1);
+
+        let offsets = self.offsets.as_slice_mut();
+
+        let values_start = values_range.start;
+        let mut last_offset = levels_range.end + 1;
+
+        for (value_pos, level_pos) in values_range.rev().zip(rev_position_iter) {
+            assert!(level_pos >= value_pos);
+            assert!(level_pos < last_offset);
+
+            if level_pos == value_pos {
+                // Pad leading nulls if necessary
+                if level_pos != last_offset {
+                    let value = offsets[last_offset];
+                    for x in &mut offsets[level_pos + 1..last_offset] {
+                        *x = value;
+                    }
+                }
+
+                self.assert_valid();
+
+                // We are done
+                return;
+            }
+
+            // Fill in any nulls
+            let value_end = offsets[value_pos + 1];
+            let value_start = offsets[value_pos];
+
+            for x in &mut offsets[level_pos + 1..last_offset] {
+                *x = value_end;
+            }
+
+            offsets[level_pos] = value_start;
+            last_offset = level_pos;
+        }
+
+        // Pad leading nulls up to `last_offset`
+        let value = offsets[values_start];
+        for x in &mut offsets[values_start + 1..last_offset] {
+            *x = value
+        }
+
+        self.assert_valid();
+    }
+}
+
+impl<I: ScalarValue> ValuesBufferSlice for OffsetBuffer<I> {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+}
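Note: `pad_nulls` above rewrites the offsets in place, back to front, so that each null level repeats the preceding offset and therefore decodes as a zero-length value. A simplified forward-direction sketch of the same transformation (assuming a separate output vector rather than the in-place rewrite the real code uses to avoid reallocating):

```rust
// Expand dense offsets (one per non-null value, with a leading 0) into
// one offset per level, repeating the previous offset at null positions.
fn pad_offsets(dense: &[i32], valid: &[bool]) -> Vec<i32> {
    let mut out = Vec::with_capacity(valid.len() + 1);
    out.push(0);
    let mut next = 1; // index into `dense`, skipping its leading 0
    for &v in valid {
        if v {
            out.push(dense[next]);
            next += 1;
        } else {
            // Null slots repeat the previous offset => zero-length value
            out.push(*out.last().unwrap());
        }
    }
    out
}

fn main() {
    // Two dense values of length 1 and 2, interleaved with nulls
    let dense = [0, 1, 3];
    let valid = [true, false, true, false];
    assert_eq!(pad_offsets(&dense, &valid), vec![0, 1, 1, 3, 3]);
    println!("ok");
}
```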
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index a6fc6a0f8850..62ccf0219376 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -391,13 +391,13 @@ mod tests {
             RandUtf8Gen,
         >(2, ConvertedType::NONE, None, &converter);
 
-        let converter = Utf8ArrayConverter {};
+        let utf8_converter = Utf8ArrayConverter {};
         run_single_column_reader_tests::<
             ByteArrayType,
             StringArray,
             Utf8ArrayConverter,
             RandUtf8Gen,
-        >(2, ConvertedType::UTF8, None, &converter);
+        >(2, ConvertedType::UTF8, None, &utf8_converter);
 
         run_single_column_reader_tests::<
             ByteArrayType,
@@ -408,25 +408,10 @@ mod tests {
             2,
             ConvertedType::UTF8,
             Some(ArrowDataType::Utf8),
-            &converter,
+            &utf8_converter,
         );
 
-        run_single_column_reader_tests::<
-            ByteArrayType,
-            StringArray,
-            Utf8ArrayConverter,
-            RandUtf8Gen,
-        >(
-            2,
-            ConvertedType::UTF8,
-            Some(ArrowDataType::Dictionary(
-                Box::new(ArrowDataType::Int32),
-                Box::new(ArrowDataType::Utf8),
-            )),
-            &converter,
-        );
-
-        let converter = LargeUtf8ArrayConverter {};
+        let large_utf8_converter = LargeUtf8ArrayConverter {};
         run_single_column_reader_tests::<
             ByteArrayType,
             LargeStringArray,
@@ -436,8 +421,70 @@ mod tests {
             2,
             ConvertedType::UTF8,
             Some(ArrowDataType::LargeUtf8),
-            &converter,
+            &large_utf8_converter,
         );
+
+        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
+        for key in &small_key_types {
+            // Cannot run full test suite as keys overflow, run small test instead
+            single_column_reader_test::<
+                ByteArrayType,
+                StringArray,
+                Utf8ArrayConverter,
+                RandUtf8Gen,
+            >(
+                TestOptions::new(2, 20, 15).with_null_percent(50),
+                2,
+                ConvertedType::UTF8,
+                Some(ArrowDataType::Dictionary(
+                    Box::new(key.clone()),
+                    Box::new(ArrowDataType::Utf8),
+                )),
+                &utf8_converter,
+            );
+        }
+
+        let key_types = [
+            ArrowDataType::Int16,
+            ArrowDataType::UInt16,
+            ArrowDataType::Int32,
+            ArrowDataType::UInt32,
+            ArrowDataType::Int64,
+            ArrowDataType::UInt64,
+        ];
+
+        for key in &key_types {
+            run_single_column_reader_tests::<
+                ByteArrayType,
+                StringArray,
+                Utf8ArrayConverter,
+                RandUtf8Gen,
+            >(
+                2,
+                ConvertedType::UTF8,
+                Some(ArrowDataType::Dictionary(
+                    Box::new(key.clone()),
+                    Box::new(ArrowDataType::Utf8),
+                )),
+                &utf8_converter,
+            );
+
+            // https://github.com/apache/arrow-rs/issues/1179
+            // run_single_column_reader_tests::<
+            //     ByteArrayType,
+            //     LargeStringArray,
+            //     LargeUtf8ArrayConverter,
+            //     RandUtf8Gen,
+            // >(
+            //     2,
+            //     ConvertedType::UTF8,
+            //     Some(ArrowDataType::Dictionary(
+            //         Box::new(key.clone()),
+            //         Box::new(ArrowDataType::LargeUtf8),
+            //     )),
+            //     &large_utf8_converter,
+            // );
+        }
     }
 
     #[test]
@@ -735,7 +782,7 @@ mod tests {
                 }
             }
             assert_eq!(a.data_type(), b.data_type());
-            assert_eq!(a.data(), b.data());
+            assert_eq!(a.data(), b.data(), "{:#?} vs {:#?}", a.data(), b.data());
 
             total_read = end;
         } else {
diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs
index 1f9baadd0b7b..fbf261591e7d 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -76,8 +76,12 @@ pub trait BufferQueue: Sized {
 pub trait ScalarValue {}
 impl ScalarValue for bool {}
 impl ScalarValue for u8 {}
+impl ScalarValue for i8 {}
+impl ScalarValue for u16 {}
 impl ScalarValue for i16 {}
+impl ScalarValue for u32 {}
 impl ScalarValue for i32 {}
+impl ScalarValue for u64 {}
 impl ScalarValue for i64 {}
 impl ScalarValue for f32 {}
 impl ScalarValue for f64 {}
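Note: the test changes above run only a reduced suite for `Int8`/`UInt8` keys ("Cannot run full test suite as keys overflow") because such key types can index at most 128/256 distinct dictionary values. A small sketch of the failure mode, using a hypothetical checked conversion rather than the crate's actual key-building code:

```rust
use std::convert::TryFrom;

// A u8 dictionary key can address at most 256 distinct values;
// index 256 and beyond cannot be represented and must error (or spill)
fn to_key(index: usize) -> Result<u8, String> {
    u8::try_from(index).map_err(|_| format!("dictionary key overflow: {}", index))
}

fn main() {
    assert_eq!(to_key(255), Ok(255));
    assert!(to_key(256).is_err());
    println!("ok");
}
```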