Commit a54672b

Merge branch 'jorgecarleitao:main' into mutable-struct-push-unchecked

hohav authored Aug 17, 2022
2 parents 1b3ffc9 + 0b345ae
Showing 23 changed files with 742 additions and 229 deletions.
2 changes: 1 addition & 1 deletion benches/read_parquet.rs
@@ -41,7 +41,7 @@ fn read_chunk(buffer: &[u8], size: usize, column: usize) -> Result<()> {

     let schema = schema.filter(|index, _| index == column);

-    let reader = read::FileReader::new(reader, metadata.row_groups, schema, None, None);
+    let reader = read::FileReader::new(reader, metadata.row_groups, schema, None, None, None);

     for maybe_chunk in reader {
         let columns = maybe_chunk?;
2 changes: 1 addition & 1 deletion examples/parquet_read.rs
@@ -35,7 +35,7 @@ fn main() -> Result<(), Error> {
         .collect();

     // we can then read the row groups into chunks
-    let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None);
+    let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None, None);

     let start = SystemTime::now();
     for maybe_chunk in chunks {
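Both hunks above thread one extra trailing argument through `read::FileReader::new`. For reference, here is a minimal, hedged sketch of the six-argument call shape; it assumes the arrow2 `io_parquet` API of this era, and the role of the new trailing `Option` (where `None` preserves the old behaviour) is inferred from the diff rather than documented here.

```rust
// Minimal sketch, assuming arrow2's `io_parquet` feature of this era.
// The sixth argument is the one added by this commit; its exact meaning is
// not named in the diff, so it is treated here as an opaque `Option` where
// `None` keeps the previous behaviour.
use std::fs::File;

use arrow2::error::Result;
use arrow2::io::parquet::read;

fn read_all(path: &str) -> Result<()> {
    let mut file = File::open(path)?;
    let metadata = read::read_metadata(&mut file)?;
    let schema = read::infer_schema(&metadata)?;

    // Two trailing `Option`s before this commit, three after.
    let chunks = read::FileReader::new(file, metadata.row_groups, schema, None, None, None);

    for maybe_chunk in chunks {
        let chunk = maybe_chunk?;
        println!("read a chunk with {} rows", chunk.len());
    }
    Ok(())
}
```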
11 changes: 9 additions & 2 deletions examples/parquet_read_async.rs
@@ -35,8 +35,15 @@ async fn main() -> Result<()> {
     for row_group in &metadata.row_groups {
         // A row group is consumed in two steps: the first step is to read the (compressed)
         // columns into memory, which is IO-bounded.
-        let column_chunks =
-            read::read_columns_many_async(factory, row_group, schema.fields.clone(), None).await?;
+        let column_chunks = read::read_columns_many_async(
+            factory,
+            row_group,
+            schema.fields.clone(),
+            None,
+            None,
+            None,
+        )
+        .await?;

         // the second step is to iterate over the columns in chunks.
         // this operation is CPU-bounded and should be sent to a separate thread pool (e.g. `tokio_rayon`) to not block
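The comments in this example describe a two-step consumption: the IO-bounded read above, then a CPU-bounded decode. For context, a hedged sketch of that second step, assuming the `RowGroupDeserializer` API arrow2 exposed at the time (its exact signature is an assumption, not shown in this diff):

```rust
// Hedged sketch of the CPU-bounded second step; `column_chunks` is the value
// produced above, and `RowGroupDeserializer::new`'s signature is assumed from
// the arrow2 API of this era rather than taken from this diff.
let deserializer = read::RowGroupDeserializer::new(
    column_chunks,
    row_group.num_rows() as usize,
    None, // optional limit on the number of rows
);
// This loop decompresses and decodes pages; on an async runtime it belongs on
// a compute pool (e.g. `tokio_rayon`) rather than the IO executor.
for maybe_chunk in deserializer {
    let chunk = maybe_chunk?;
    println!("{} rows", chunk.len());
}
```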
2 changes: 1 addition & 1 deletion src/bitmap/immutable.rs
@@ -151,7 +151,7 @@ impl Bitmap {

     /// Returns the number of unset bits on this [`Bitmap`].
     ///
-    /// Guaranted to be `<= self.len()`.
+    /// Guaranteed to be `<= self.len()`.
     /// # Implementation
     /// This function is `O(1)` - the number of unset bits is computed when the bitmap is
     /// created
1 change: 1 addition & 0 deletions src/io/ipc/read/mod.rs
@@ -29,6 +29,7 @@ pub mod stream_async;
 pub mod file_async;

 pub(crate) use common::first_dict_field;
+#[cfg(feature = "io_flight")]
 pub(crate) use common::{read_dictionary, read_record_batch};
 pub use file::{read_batch, read_file_dictionaries, read_file_metadata, FileMetadata};
 pub use reader::FileReader;
170 changes: 131 additions & 39 deletions src/io/parquet/read/deserialize/binary/basic.rs
@@ -3,7 +3,7 @@ use std::default::Default;

 use parquet2::{
     deserialize::SliceFilteredIter,
-    encoding::{hybrid_rle, Encoding},
+    encoding::{delta_length_byte_array, hybrid_rle, Encoding},
     page::{split_buffer, DataPage, DictPage},
     schema::Repetition,
 };
@@ -23,44 +23,6 @@ use super::super::utils::{
 use super::super::Pages;
 use super::{super::utils, utils::*};

-/*
-fn read_delta_optional<O: Offset>(
-    validity_buffer: &[u8],
-    values_buffer: &[u8],
-    additional: usize,
-    values: &mut Binary<O>,
-    validity: &mut MutableBitmap,
-) {
-    let Binary {
-        offsets,
-        values,
-        last_offset,
-    } = values;
-
-    // values_buffer: first 4 bytes are len, remaining is values
-    let mut values_iterator = delta_length_byte_array::Decoder::new(values_buffer);
-    let offsets_iterator = values_iterator.by_ref().map(|x| {
-        *last_offset += O::from_usize(x as usize).unwrap();
-        *last_offset
-    });
-
-    let mut page_validity = OptionalPageValidity::new(validity_buffer, additional);
-
-    // offsets:
-    extend_from_decoder(
-        validity,
-        &mut page_validity,
-        None,
-        offsets,
-        offsets_iterator,
-    );
-
-    // values:
-    let new_values = values_iterator.into_values();
-    values.extend_from_slice(new_values);
-}
-*/
-
 #[derive(Debug)]
 pub(super) struct Required<'a> {
     pub values: SizedBinaryIter<'a>,
@@ -79,6 +41,52 @@ impl<'a> Required<'a> {
     }
 }

+#[derive(Debug)]
+pub(super) struct Delta<'a> {
+    pub lengths: std::vec::IntoIter<usize>,
+    pub values: &'a [u8],
+}
+
+impl<'a> Delta<'a> {
+    pub fn try_new(page: &'a DataPage) -> Result<Self> {
+        let (_, _, values) = split_buffer(page)?;
+
+        let mut lengths_iter = delta_length_byte_array::Decoder::new(values);
+
+        #[allow(clippy::needless_collect)] // we need to consume it to get the values
+        let lengths = lengths_iter
+            .by_ref()
+            .map(|x| x as usize)
+            .collect::<Vec<_>>();
+
+        let values = lengths_iter.into_values();
+        Ok(Self {
+            lengths: lengths.into_iter(),
+            values,
+        })
+    }
+
+    pub fn len(&self) -> usize {
+        self.lengths.size_hint().0
+    }
+}
+
+impl<'a> Iterator for Delta<'a> {
+    type Item = &'a [u8];
+
+    #[inline]
+    fn next(&mut self) -> Option<Self::Item> {
+        let length = self.lengths.next()?;
+        let (item, remaining) = self.values.split_at(length);
+        self.values = remaining;
+        Some(item)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.lengths.size_hint()
+    }
+}
+
 #[derive(Debug)]
 pub(super) struct FilteredRequired<'a> {
     pub values: SliceFilteredIter<SizedBinaryIter<'a>>,
@@ -99,6 +107,26 @@ impl<'a> FilteredRequired<'a> {
     }
 }

+#[derive(Debug)]
+pub(super) struct FilteredDelta<'a> {
+    pub values: SliceFilteredIter<Delta<'a>>,
+}
+
+impl<'a> FilteredDelta<'a> {
+    pub fn try_new(page: &'a DataPage) -> Result<Self> {
+        let values = Delta::try_new(page)?;
+
+        let rows = get_selected_rows(page);
+        let values = SliceFilteredIter::new(values, rows);
+
+        Ok(Self { values })
+    }
+
+    pub fn len(&self) -> usize {
+        self.values.size_hint().0
+    }
+}
+
 pub(super) type Dict = Vec<Vec<u8>>;

 #[derive(Debug)]
@@ -167,7 +195,11 @@ enum State<'a> {
     Required(Required<'a>),
     RequiredDictionary(RequiredDictionary<'a>),
     OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a>),
+    Delta(Delta<'a>),
+    OptionalDelta(OptionalPageValidity<'a>, Delta<'a>),
     FilteredRequired(FilteredRequired<'a>),
+    FilteredDelta(FilteredDelta<'a>),
+    FilteredOptionalDelta(FilteredOptionalPageValidity<'a>, Delta<'a>),
     FilteredOptional(FilteredOptionalPageValidity<'a>, BinaryIter<'a>),
     FilteredRequiredDictionary(FilteredRequiredDictionary<'a>),
     FilteredOptionalDictionary(FilteredOptionalPageValidity<'a>, ValuesDictionary<'a>),
@@ -178,10 +210,14 @@ impl<'a> utils::PageState<'a> for State<'a> {
         match self {
             State::Optional(validity, _) => validity.len(),
             State::Required(state) => state.len(),
+            State::Delta(state) => state.len(),
+            State::OptionalDelta(state, _) => state.len(),
             State::RequiredDictionary(values) => values.len(),
             State::OptionalDictionary(optional, _) => optional.len(),
             State::FilteredRequired(state) => state.len(),
             State::FilteredOptional(validity, _) => validity.len(),
+            State::FilteredDelta(state) => state.len(),
+            State::FilteredOptionalDelta(state, _) => state.len(),
             State::FilteredRequiredDictionary(values) => values.len(),
             State::FilteredOptionalDictionary(optional, _) => optional.len(),
         }
@@ -284,6 +320,20 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
                     BinaryIter::new(values),
                 ))
             }
+            (Encoding::DeltaLengthByteArray, _, false, false) => {
+                Delta::try_new(page).map(State::Delta)
+            }
+            (Encoding::DeltaLengthByteArray, _, true, false) => Ok(State::OptionalDelta(
+                OptionalPageValidity::try_new(page)?,
+                Delta::try_new(page)?,
+            )),
+            (Encoding::DeltaLengthByteArray, _, false, true) => {
+                FilteredDelta::try_new(page).map(State::FilteredDelta)
+            }
+            (Encoding::DeltaLengthByteArray, _, true, true) => Ok(State::FilteredOptionalDelta(
+                FilteredOptionalPageValidity::try_new(page)?,
+                Delta::try_new(page)?,
+            )),
             _ => Err(utils::not_implemented(page)),
         }
     }
@@ -315,11 +365,44 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
                     values.push(x)
                 }
             }
+            State::Delta(page) => {
+                values.extend_lengths(page.lengths.by_ref().take(additional), &mut page.values);
+            }
+            State::OptionalDelta(page_validity, page_values) => {
+                let Binary {
+                    offsets,
+                    values: values_,
+                    last_offset,
+                } = values;
+
+                let offset = *last_offset;
+                extend_from_decoder(
+                    validity,
+                    page_validity,
+                    Some(additional),
+                    offsets,
+                    page_values.lengths.by_ref().map(|x| {
+                        *last_offset += O::from_usize(x).unwrap();
+                        *last_offset
+                    }),
+                );
+
+                let length = *last_offset - offset;
+
+                let (consumed, remaining) = page_values.values.split_at(length.to_usize());
+                page_values.values = remaining;
+                values_.extend_from_slice(consumed);
+            }
             State::FilteredRequired(page) => {
                 for x in page.values.by_ref().take(additional) {
                     values.push(x)
                 }
             }
+            State::FilteredDelta(page) => {
+                for x in page.values.by_ref().take(additional) {
+                    values.push(x)
+                }
+            }
             State::OptionalDictionary(page_validity, page_values) => {
                 let page_dict = &page_values.dict;
                 let op = move |index: u32| page_dict[index as usize].as_ref();
@@ -348,6 +431,15 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
                     page_values.by_ref(),
                 );
             }
+            State::FilteredOptionalDelta(page_validity, page_values) => {
+                utils::extend_from_decoder(
+                    validity,
+                    page_validity,
+                    Some(additional),
+                    values,
+                    page_values.by_ref(),
+                );
+            }
             State::FilteredRequiredDictionary(page) => {
                 let page_dict = &page.dict;
                 let op = move |index: u32| page_dict[index as usize].as_ref();
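The `Delta` reader added in this file targets Parquet's DELTA_LENGTH_BYTE_ARRAY encoding, where all value lengths come first (delta-encoded), followed by the concatenated value bytes. Below is a self-contained sketch of the slicing strategy with the lengths already materialized; the `DeltaValues` name is illustrative, not part of the crate.

```rust
// Standalone illustration of the iteration strategy used by `Delta` above:
// consume one length at a time and split the shared byte slice accordingly.
struct DeltaValues<'a> {
    lengths: std::vec::IntoIter<usize>,
    values: &'a [u8],
}

impl<'a> Iterator for DeltaValues<'a> {
    type Item = &'a [u8];

    fn next(&mut self) -> Option<Self::Item> {
        let length = self.lengths.next()?;
        let (item, remaining) = self.values.split_at(length);
        self.values = remaining;
        Some(item)
    }
}

fn main() {
    // Three values "ab", "cde", "" stored as lengths [2, 3, 0] plus bytes b"abcde".
    let iter = DeltaValues {
        lengths: vec![2, 3, 0].into_iter(),
        values: b"abcde",
    };
    let items: Vec<&[u8]> = iter.collect();
    assert_eq!(items, [b"ab".as_ref(), b"cde".as_ref(), b"".as_ref()]);
}
```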
22 changes: 22 additions & 0 deletions src/io/parquet/read/deserialize/binary/utils.rs
@@ -13,6 +13,17 @@ pub struct Binary<O: Offset> {
 #[derive(Debug)]
 pub struct Offsets<O: Offset>(pub Vec<O>);

+impl<O: Offset> Offsets<O> {
+    #[inline]
+    pub fn extend_lengths<I: Iterator<Item = usize>>(&mut self, lengths: I) {
+        let mut last_offset = *self.0.last().unwrap();
+        self.0.extend(lengths.map(|length| {
+            last_offset += O::from_usize(length).unwrap();
+            last_offset
+        }));
+    }
+}
+
 impl<O: Offset> Pushable<O> for Offsets<O> {
     #[inline]
     fn len(&self) -> usize {
@@ -63,6 +74,17 @@ impl<O: Offset> Binary<O> {
     pub fn len(&self) -> usize {
         self.offsets.len()
     }
+
+    #[inline]
+    pub fn extend_lengths<I: Iterator<Item = usize>>(&mut self, lengths: I, values: &mut &[u8]) {
+        let current_offset = self.last_offset;
+        self.offsets.extend_lengths(lengths);
+        self.last_offset = *self.offsets.0.last().unwrap(); // guaranteed to have one
+        let length = self.last_offset.to_usize() - current_offset.to_usize();
+        let (consumed, remaining) = values.split_at(length);
+        *values = remaining;
+        self.values.extend_from_slice(consumed);
+    }
 }

 impl<'a, O: Offset> Pushable<&'a [u8]> for Binary<O> {
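`extend_lengths` turns a run of lengths into a running sum of offsets and moves the same number of bytes out of a shared input slice. A minimal standalone sketch of that bookkeeping, using plain `i32` offsets in place of the generic `O: Offset`:

```rust
// Sketch of the offsets/values bookkeeping behind `Binary::extend_lengths`,
// with concrete i32 offsets standing in for the `O: Offset` generic.
fn extend_lengths(offsets: &mut Vec<i32>, buffer: &mut Vec<u8>, lengths: &[usize], values: &mut &[u8]) {
    let mut last = *offsets.last().unwrap(); // offsets always start non-empty, e.g. [0]
    offsets.extend(lengths.iter().map(|&len| {
        last += len as i32;
        last
    }));
    let total: usize = lengths.iter().sum();
    let (consumed, remaining) = values.split_at(total);
    *values = remaining; // advance the shared input slice
    buffer.extend_from_slice(consumed);
}

fn main() {
    let mut offsets = vec![0i32];
    let mut buffer = Vec::new();
    let mut values: &[u8] = b"abcde";
    extend_lengths(&mut offsets, &mut buffer, &[2, 3], &mut values);
    assert_eq!(offsets, [0, 2, 5]);
    assert_eq!(buffer, b"abcde");
    assert!(values.is_empty());
}
```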