Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fix parquet binary dict page reading (#791)
Browse files Browse the repository at this point in the history
  • Loading branch information
danburkert authored Jan 27, 2022
1 parent c80b39d commit 601fa08
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 40 deletions.
15 changes: 4 additions & 11 deletions src/io/parquet/read/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,14 @@ fn read_dict_buffer<O: Offset>(
values: &mut Binary<O>,
validity: &mut MutableBitmap,
) {
let length = values.len() + additional;

let values_iterator = values_iter(indices_buffer, dict, additional);

let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);

extend_from_decoder(
validity,
&mut validity_iterator,
length,
additional,
values,
values_iterator,
);
Expand All @@ -69,12 +67,11 @@ fn read_dict_required<O: Offset>(
values: &mut Binary<O>,
validity: &mut MutableBitmap,
) {
debug_assert_eq!(0, validity.len());
let values_iterator = values_iter(indices_buffer, dict, additional);

for value in values_iterator {
values.push(value);
}
validity.extend_constant(additional, true);
}

struct Offsets<'a, O: Offset>(pub &'a mut Vec<O>);
Expand Down Expand Up @@ -108,8 +105,6 @@ fn read_delta_optional<O: Offset>(
values: &mut Binary<O>,
validity: &mut MutableBitmap,
) {
let length = values.len() + additional;

let Binary {
offsets,
values,
Expand All @@ -129,7 +124,7 @@ fn read_delta_optional<O: Offset>(
extend_from_decoder(
validity,
&mut validity_iterator,
length,
additional,
&mut Offsets::<O>(offsets),
offsets_iterator,
);
Expand All @@ -146,8 +141,6 @@ fn read_plain_optional<O: Offset>(
values: &mut Binary<O>,
validity: &mut MutableBitmap,
) {
let length = values.len() + additional;

// values_buffer: first 4 bytes are len, remaining is values
let values_iterator = utils::BinaryIter::new(values_buffer);

Expand All @@ -156,7 +149,7 @@ fn read_plain_optional<O: Offset>(
extend_from_decoder(
validity,
&mut validity_iterator,
length,
additional,
values,
values_iterator,
)
Expand Down
7 changes: 4 additions & 3 deletions src/io/parquet/read/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@ where
ArrowError: From<E>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let is_nullable = nested.pop().unwrap().is_nullable();
let capacity = metadata.num_values() as usize;
let mut values = Binary::<O>::with_capacity(capacity);
let mut validity = MutableBitmap::with_capacity(capacity);

let is_nullable = nested.pop().unwrap().is_nullable();
let mut validity = MutableBitmap::with_capacity(capacity * usize::from(is_nullable));

if nested.is_empty() {
while let Some(page) = iter.next()? {
basic::extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)?
}
debug_assert_eq!(values.len(), capacity);
debug_assert_eq!(validity.len(), capacity * usize::from(is_nullable));
} else {
while let Some(page) = iter.next()? {
nested::extend_from_page(
Expand Down
7 changes: 4 additions & 3 deletions src/io/parquet/read/boolean/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ where
ArrowError: From<E>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let is_nullable = nested.pop().unwrap().is_nullable();
let capacity = metadata.num_values() as usize;
let mut values = MutableBitmap::with_capacity(capacity);
let mut validity = MutableBitmap::with_capacity(capacity);

let is_nullable = nested.pop().unwrap().is_nullable();
let mut validity = MutableBitmap::with_capacity(capacity * usize::from(is_nullable));

if nested.is_empty() {
while let Some(page) = iter.next()? {
basic::extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)?
}
debug_assert_eq!(values.len(), capacity);
debug_assert_eq!(validity.len(), capacity * usize::from(is_nullable));
} else {
while let Some(page) = iter.next()? {
nested::extend_from_page(
Expand Down
11 changes: 6 additions & 5 deletions src/io/parquet/read/fixed_size_binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,12 @@ pub(crate) fn read_dict_required(
values: &mut FixedSizeBinary,
validity: &mut MutableBitmap,
) {
let size = values.size;
debug_assert!(validity.is_empty());

let values_iter = values_iter(indices_buffer, dict.values(), values.size, additional);

for value in values_iter {
values.push(value);
}
validity.extend_constant(additional * size, true);
}

pub(crate) fn read_optional(
Expand Down Expand Up @@ -114,14 +112,17 @@ where
ArrowError: From<E>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let is_nullable = metadata.descriptor().max_def_level() == 1;
let size = FixedSizeBinaryArray::get_size(&data_type);

let capacity = metadata.num_values() as usize;
let mut values = FixedSizeBinary::with_capacity(capacity, size);
let mut validity = MutableBitmap::with_capacity(capacity);
let mut validity = MutableBitmap::with_capacity(capacity * usize::from(is_nullable));

while let Some(page) = iter.next()? {
extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)?
}
debug_assert_eq!(values.len(), capacity);
debug_assert_eq!(validity.len(), capacity * usize::from(is_nullable));

Ok(FixedSizeBinaryArray::from_data(
data_type,
Expand Down
5 changes: 5 additions & 0 deletions src/io/parquet/read/fixed_size_binary/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ impl FixedSizeBinary {
self.values
.resize(self.values.len() + additional * self.size, 0);
}

#[inline]
pub fn len(&mut self) -> usize {
self.values.len() / self.size
}
}

impl Pushable<&[u8]> for FixedSizeBinary {
Expand Down
10 changes: 3 additions & 7 deletions src/io/parquet/read/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,17 @@ fn read_dict_buffer_optional<T, A, F>(
A: ArrowNativeType,
F: Fn(T) -> A,
{
let length = additional + values.len();

let values_iterator = values_iter(indices_buffer, dict.values(), additional, op);

let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);

extend_from_decoder(
validity,
&mut validity_iterator,
length,
additional,
values,
values_iterator,
)
);
}

fn read_dict_buffer_required<T, A, F>(
Expand All @@ -73,11 +71,9 @@ fn read_dict_buffer_required<T, A, F>(
A: ArrowNativeType,
F: Fn(T) -> A,
{
debug_assert_eq!(0, validity.len());
let values_iterator = values_iter(indices_buffer, dict.values(), additional, op);

values.extend(values_iterator);

validity.extend_constant(additional, true);
}

fn read_nullable<T, A, F>(
Expand Down
7 changes: 4 additions & 3 deletions src/io/parquet/read/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,17 @@ where
F: Copy + Fn(T) -> A,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
{
let is_nullable = nested.pop().unwrap().is_nullable();
let capacity = metadata.num_values() as usize;
let mut values = Vec::<A>::with_capacity(capacity);
let mut validity = MutableBitmap::with_capacity(capacity);

let is_nullable = nested.pop().unwrap().is_nullable();
let mut validity = MutableBitmap::with_capacity(capacity * usize::from(is_nullable));

if nested.is_empty() {
while let Some(page) = iter.next()? {
basic::extend_from_page(page, metadata.descriptor(), &mut values, &mut validity, op)?
}
debug_assert_eq!(values.len(), capacity);
debug_assert_eq!(validity.len(), capacity * usize::from(is_nullable));
} else {
while let Some(page) = iter.next()? {
nested::extend_from_page(
Expand Down
17 changes: 9 additions & 8 deletions src/io/parquet/read/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,13 @@ pub(super) fn extend_from_decoder<'a, T: Default, C: Pushable<T>, I: Iterator<It
values: &mut C,
mut values_iter: I,
) {
let remaining = page_length;
let mut remaining = page_length;
for run in decoder {
match run {
hybrid_rle::HybridEncoded::Bitpacked(pack) => {
// compute the length of the pack
let pack_size = pack.len() * 8;
let pack_remaining = page_length;
let length = std::cmp::min(pack_size, pack_remaining);

let additional = remaining.min(length);
let additional = pack_size.min(remaining);

// extend validity
validity.extend_from_slice(pack, 0, additional);
Expand All @@ -140,13 +137,13 @@ pub(super) fn extend_from_decoder<'a, T: Default, C: Pushable<T>, I: Iterator<It
values.push_null()
};
}

remaining -= additional;
}
hybrid_rle::HybridEncoded::Rle(value, length) => {
hybrid_rle::HybridEncoded::Rle(value, additional) => {
let is_set = value[0] == 1;

// extend validity
let length = length;
let additional = remaining.min(length);
validity.extend_constant(additional, is_set);

// extend values
Expand All @@ -155,9 +152,13 @@ pub(super) fn extend_from_decoder<'a, T: Default, C: Pushable<T>, I: Iterator<It
} else {
values.extend_constant(additional, T::default());
}

remaining -= additional;
}
}
}

debug_assert_eq!(remaining, 0);
}

pub(super) fn read_dict_optional<K>(
Expand Down

0 comments on commit 601fa08

Please sign in to comment.