Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed limit pushdown in parquet (#1180)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Jul 25, 2022
1 parent 4fc223d commit e810d39
Show file tree
Hide file tree
Showing 23 changed files with 474 additions and 192 deletions.
5 changes: 4 additions & 1 deletion src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,16 +426,18 @@ pub struct Iter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
data_type: DataType,
items: VecDeque<(Binary<O>, MutableBitmap)>,
chunk_size: Option<usize>,
remaining: usize,
phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iter<O, A, I> {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, num_rows: usize) -> Self {
Self {
iter,
data_type,
items: VecDeque::new(),
chunk_size,
remaining: num_rows,
phantom_a: Default::default(),
}
}
Expand All @@ -448,6 +450,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for Iter<O, A, I>
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.remaining,
self.chunk_size,
&BinaryDecoder::<O>::default(),
);
Expand Down
12 changes: 10 additions & 2 deletions src/io/parquet/read/deserialize/binary/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ where
data_type: DataType,
values: Dict,
items: VecDeque<(Vec<K>, MutableBitmap)>,
remaining: usize,
chunk_size: Option<usize>,
phantom: std::marker::PhantomData<O>,
}
Expand All @@ -36,12 +37,13 @@ where
O: Offset,
I: DataPages,
{
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option<usize>) -> Self {
Self {
iter,
data_type,
values: Dict::Empty,
items: VecDeque::new(),
remaining: num_rows,
chunk_size,
phantom: std::marker::PhantomData,
}
Expand Down Expand Up @@ -93,6 +95,7 @@ where
&mut self.items,
&mut self.values,
self.data_type.clone(),
&mut self.remaining,
self.chunk_size,
|dict| read_dict::<O>(self.data_type.clone(), dict),
);
Expand All @@ -117,6 +120,7 @@ where
data_type: DataType,
values: Dict,
items: VecDeque<(NestedState, (Vec<K>, MutableBitmap))>,
remaining: usize,
chunk_size: Option<usize>,
phantom: std::marker::PhantomData<O>,
}
Expand All @@ -131,6 +135,7 @@ where
iter: I,
init: Vec<InitNested>,
data_type: DataType,
num_rows: usize,
chunk_size: Option<usize>,
) -> Self {
Self {
Expand All @@ -139,6 +144,7 @@ where
data_type,
values: Dict::Empty,
items: VecDeque::new(),
remaining: num_rows,
chunk_size,
phantom: Default::default(),
}
Expand All @@ -157,6 +163,7 @@ where
let maybe_state = nested_next_dict(
&mut self.iter,
&mut self.items,
&mut self.remaining,
&self.init,
&mut self.values,
self.data_type.clone(),
Expand All @@ -177,6 +184,7 @@ pub fn iter_to_arrays_nested<'a, K, O, I>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
num_rows: usize,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
Expand All @@ -185,7 +193,7 @@ where
K: DictionaryKey,
{
Box::new(
NestedDictIter::<K, O, I>::new(iter, init, data_type, chunk_size).map(|result| {
NestedDictIter::<K, O, I>::new(iter, init, data_type, num_rows, chunk_size).map(|result| {
let (mut nested, array) = result?;
let _ = nested.nested.pop().unwrap(); // the primitive
Ok((nested, array.boxed()))
Expand Down
16 changes: 11 additions & 5 deletions src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ pub struct NestedIter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
init: Vec<InitNested>,
items: VecDeque<(NestedState, (Binary<O>, MutableBitmap))>,
chunk_size: Option<usize>,
remaining: usize,
phantom_a: std::marker::PhantomData<A>,
}

Expand All @@ -156,6 +157,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> NestedIter<O, A, I> {
iter: I,
init: Vec<InitNested>,
data_type: DataType,
num_rows: usize,
chunk_size: Option<usize>,
) -> Self {
Self {
Expand All @@ -164,6 +166,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> NestedIter<O, A, I> {
init,
items: VecDeque::new(),
chunk_size,
remaining: num_rows,
phantom_a: Default::default(),
}
}
Expand All @@ -176,6 +179,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for NestedIter<O,
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.remaining,
&self.init,
self.chunk_size,
&BinaryDecoder::<O>::default(),
Expand All @@ -196,6 +200,7 @@ pub fn iter_to_arrays_nested<'a, O, A, I>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
num_rows: usize,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
Expand All @@ -204,11 +209,12 @@ where
O: Offset,
{
Box::new(
NestedIter::<O, A, I>::new(iter, init, data_type, chunk_size).map(|result| {
let (mut nested, array) = result?;
let _ = nested.nested.pop().unwrap(); // the primitive
let array = Box::new(array) as Box<dyn Array>;
Ok((nested, array))
NestedIter::<O, A, I>::new(iter, init, data_type, num_rows, chunk_size).map(|x| {
x.map(|(mut nested, array)| {
let _ = nested.nested.pop().unwrap(); // the primitive
let values = Box::new(array) as Box<dyn Array>;
(nested, values)
})
}),
)
}
5 changes: 4 additions & 1 deletion src/io/parquet/read/deserialize/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,17 @@ pub struct Iter<I: DataPages> {
data_type: DataType,
items: VecDeque<(MutableBitmap, MutableBitmap)>,
chunk_size: Option<usize>,
remaining: usize,
}

impl<I: DataPages> Iter<I> {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, num_rows: usize) -> Self {
Self {
iter,
data_type,
items: VecDeque::new(),
chunk_size,
remaining: num_rows,
}
}
}
Expand All @@ -212,6 +214,7 @@ impl<I: DataPages> Iterator for Iter<I> {
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.remaining,
self.chunk_size,
&BooleanDecoder::default(),
);
Expand Down
16 changes: 11 additions & 5 deletions src/io/parquet/read/deserialize/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,17 @@ pub struct NestedIter<I: DataPages> {
iter: I,
init: Vec<InitNested>,
items: VecDeque<(NestedState, (MutableBitmap, MutableBitmap))>,
remaining: usize,
chunk_size: Option<usize>,
}

impl<I: DataPages> NestedIter<I> {
pub fn new(iter: I, init: Vec<InitNested>, chunk_size: Option<usize>) -> Self {
pub fn new(iter: I, init: Vec<InitNested>, num_rows: usize, chunk_size: Option<usize>) -> Self {
Self {
iter,
init,
items: VecDeque::new(),
remaining: num_rows,
chunk_size,
}
}
Expand All @@ -130,6 +132,7 @@ impl<I: DataPages> Iterator for NestedIter<I> {
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.remaining,
&self.init,
self.chunk_size,
&BooleanDecoder::default(),
Expand All @@ -149,14 +152,17 @@ impl<I: DataPages> Iterator for NestedIter<I> {
pub fn iter_to_arrays_nested<'a, I: 'a>(
iter: I,
init: Vec<InitNested>,
num_rows: usize,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: DataPages,
{
Box::new(NestedIter::new(iter, init, chunk_size).map(|result| {
let (mut nested, array) = result?;
let _ = nested.nested.pop().unwrap(); // the primitive
Ok((nested, array.boxed()))
Box::new(NestedIter::new(iter, init, num_rows, chunk_size).map(|x| {
x.map(|(mut nested, array)| {
let _ = nested.nested.pop().unwrap(); // the primitive
let values = array.boxed();
(nested, values)
})
}))
}
9 changes: 8 additions & 1 deletion src/io/parquet/read/deserialize/dictionary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ pub(super) fn next_dict<
items: &mut VecDeque<(Vec<K>, MutableBitmap)>,
dict: &mut Dict,
data_type: DataType,
remaining: &mut usize,
chunk_size: Option<usize>,
read_dict: F,
) -> MaybeNext<Result<DictionaryArray<K>>> {
Expand Down Expand Up @@ -288,7 +289,13 @@ pub(super) fn next_dict<
Err(e) => return MaybeNext::Some(Err(e)),
};

utils::extend_from_new_page(page, chunk_size, items, &PrimitiveDecoder::<K>::default());
utils::extend_from_new_page(
page,
chunk_size,
items,
remaining,
&PrimitiveDecoder::<K>::default(),
);

if items.front().unwrap().len() < chunk_size.unwrap_or(usize::MAX) {
MaybeNext::More
Expand Down
3 changes: 3 additions & 0 deletions src/io/parquet/read/deserialize/dictionary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,11 @@ impl<'a, K: DictionaryKey> NestedDecoder<'a> for DictionaryDecoder<K> {
}
}

#[allow(clippy::too_many_arguments)]
pub fn next_dict<'a, K: DictionaryKey, I: DataPages, F: Fn(&dyn DictPage) -> Box<dyn Array>>(
iter: &'a mut I,
items: &mut VecDeque<(NestedState, (Vec<K>, MutableBitmap))>,
remaining: &mut usize,
init: &[InitNested],
dict: &mut Dict,
data_type: DataType,
Expand Down Expand Up @@ -171,6 +173,7 @@ pub fn next_dict<'a, K: DictionaryKey, I: DataPages, F: Fn(&dyn DictPage) -> Box
page,
init,
items,
remaining,
&DictionaryDecoder::<K>::default(),
chunk_size,
);
Expand Down
5 changes: 4 additions & 1 deletion src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,17 +286,19 @@ pub struct Iter<I: DataPages> {
size: usize,
items: VecDeque<(FixedSizeBinary, MutableBitmap)>,
chunk_size: Option<usize>,
remaining: usize,
}

impl<I: DataPages> Iter<I> {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option<usize>) -> Self {
let size = FixedSizeBinaryArray::get_size(&data_type);
Self {
iter,
data_type,
size,
items: VecDeque::new(),
chunk_size,
remaining: num_rows,
}
}
}
Expand All @@ -308,6 +310,7 @@ impl<I: DataPages> Iterator for Iter<I> {
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.remaining,
self.chunk_size,
&BinaryDecoder { size: self.size },
);
Expand Down
12 changes: 10 additions & 2 deletions src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ where
data_type: DataType,
values: Dict,
items: VecDeque<(Vec<K>, MutableBitmap)>,
remaining: usize,
chunk_size: Option<usize>,
}

Expand All @@ -33,12 +34,13 @@ where
K: DictionaryKey,
I: DataPages,
{
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option<usize>) -> Self {
Self {
iter,
data_type,
values: Dict::Empty,
items: VecDeque::new(),
remaining: num_rows,
chunk_size,
}
}
Expand Down Expand Up @@ -75,6 +77,7 @@ where
&mut self.items,
&mut self.values,
self.data_type.clone(),
&mut self.remaining,
self.chunk_size,
|dict| read_dict(self.data_type.clone(), dict),
);
Expand All @@ -98,6 +101,7 @@ where
data_type: DataType,
values: Dict,
items: VecDeque<(NestedState, (Vec<K>, MutableBitmap))>,
remaining: usize,
chunk_size: Option<usize>,
}

Expand All @@ -110,13 +114,15 @@ where
iter: I,
init: Vec<InitNested>,
data_type: DataType,
num_rows: usize,
chunk_size: Option<usize>,
) -> Self {
Self {
iter,
init,
data_type,
values: Dict::Empty,
remaining: num_rows,
items: VecDeque::new(),
chunk_size,
}
Expand All @@ -134,6 +140,7 @@ where
let maybe_state = nested_next_dict(
&mut self.iter,
&mut self.items,
&mut self.remaining,
&self.init,
&mut self.values,
self.data_type.clone(),
Expand All @@ -154,14 +161,15 @@ pub fn iter_to_arrays_nested<'a, K, I>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
num_rows: usize,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: 'a + DataPages,
K: DictionaryKey,
{
Box::new(
NestedDictIter::<K, I>::new(iter, init, data_type, chunk_size).map(|result| {
NestedDictIter::<K, I>::new(iter, init, data_type, num_rows, chunk_size).map(|result| {
let (mut nested, array) = result?;
let _ = nested.nested.pop().unwrap(); // the primitive
Ok((nested, array.boxed()))
Expand Down
Loading

0 comments on commit e810d39

Please sign in to comment.