diff --git a/src/io/parquet/read/deserialize/binary/basic.rs b/src/io/parquet/read/deserialize/binary/basic.rs
index 0f10f0578d5..f678ba204a9 100644
--- a/src/io/parquet/read/deserialize/binary/basic.rs
+++ b/src/io/parquet/read/deserialize/binary/basic.rs
@@ -423,12 +423,12 @@ pub struct Iter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
     iter: I,
     data_type: DataType,
     items: VecDeque<(Binary<O>, MutableBitmap)>,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
     phantom_a: std::marker::PhantomData<A>,
 }
 
 impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iter<O, A, I> {
-    pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
+    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
         Self {
             iter,
             data_type,
diff --git a/src/io/parquet/read/deserialize/binary/dictionary.rs b/src/io/parquet/read/deserialize/binary/dictionary.rs
index 4f8adc70268..3cdc179947c 100644
--- a/src/io/parquet/read/deserialize/binary/dictionary.rs
+++ b/src/io/parquet/read/deserialize/binary/dictionary.rs
@@ -25,7 +25,7 @@ where
     data_type: DataType,
     values: Dict,
     items: VecDeque<(Vec<K>, MutableBitmap)>,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
     phantom: std::marker::PhantomData<O>,
 }
 
@@ -35,7 +35,7 @@ where
     O: Offset,
     I: DataPages,
 {
-    pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
+    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
         let data_type = match data_type {
             DataType::Dictionary(_, values, _) => values.as_ref().clone(),
             _ => unreachable!(),
diff --git a/src/io/parquet/read/deserialize/binary/mod.rs b/src/io/parquet/read/deserialize/binary/mod.rs
index 103c6c5fcab..a351434d4ad 100644
--- a/src/io/parquet/read/deserialize/binary/mod.rs
+++ b/src/io/parquet/read/deserialize/binary/mod.rs
@@ -25,7 +25,7 @@ pub fn iter_to_arrays_nested<'a, O, A, I>(
     iter: I,
     init: Vec<InitNested>,
     data_type: DataType,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
 ) -> NestedArrayIter<'a>
 where
     I: 'a + DataPages,
diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs
index 2cc11e5b95b..aeb87a352c5 100644
--- a/src/io/parquet/read/deserialize/binary/nested.rs
+++ b/src/io/parquet/read/deserialize/binary/nested.rs
@@ -146,12 +146,17 @@ pub struct ArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
     init: Vec<InitNested>,
     items: VecDeque<(Binary<O>, MutableBitmap)>,
     nested: VecDeque<NestedState>,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
     phantom_a: std::marker::PhantomData<A>,
 }
 
 impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
-    pub fn new(iter: I, init: Vec<InitNested>, data_type: DataType, chunk_size: usize) -> Self {
+    pub fn new(
+        iter: I,
+        init: Vec<InitNested>,
+        data_type: DataType,
+        chunk_size: Option<usize>,
+    ) -> Self {
         Self {
             iter,
             data_type,
diff --git a/src/io/parquet/read/deserialize/boolean/basic.rs b/src/io/parquet/read/deserialize/boolean/basic.rs
index 7ef7a5de89d..cca21bae28e 100644
--- a/src/io/parquet/read/deserialize/boolean/basic.rs
+++ b/src/io/parquet/read/deserialize/boolean/basic.rs
@@ -188,11 +188,11 @@ pub struct Iter<I: DataPages> {
     iter: I,
     data_type: DataType,
     items: VecDeque<(MutableBitmap, MutableBitmap)>,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
 }
 
 impl<I: DataPages> Iter<I> {
-    pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
+    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
         Self {
             iter,
             data_type,
diff --git a/src/io/parquet/read/deserialize/boolean/mod.rs b/src/io/parquet/read/deserialize/boolean/mod.rs
index e928c86b56a..9caa2dadfe2 100644
--- a/src/io/parquet/read/deserialize/boolean/mod.rs
+++ b/src/io/parquet/read/deserialize/boolean/mod.rs
@@ -13,7 +13,7 @@ pub use self::basic::Iter;
 pub fn iter_to_arrays_nested<'a, I: 'a>(
     iter: I,
     init: Vec<InitNested>,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
 ) -> NestedArrayIter<'a>
 where
     I: DataPages,
diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs
index b2d83d57e7a..8b162908cb3 100644
--- a/src/io/parquet/read/deserialize/boolean/nested.rs
+++ b/src/io/parquet/read/deserialize/boolean/nested.rs
@@ -118,11 +118,11 @@ pub struct ArrayIterator<I: DataPages> {
     // invariant: items.len() == nested.len()
     items: VecDeque<(MutableBitmap, MutableBitmap)>,
     nested: VecDeque<NestedState>,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
 }
 
 impl<I: DataPages> ArrayIterator<I> {
-    pub fn new(iter: I, init: Vec<InitNested>, chunk_size: usize) -> Self {
+    pub fn new(iter: I, init: Vec<InitNested>, chunk_size: Option<usize>) -> Self {
         Self {
             iter,
             init,
diff --git a/src/io/parquet/read/deserialize/dictionary.rs b/src/io/parquet/read/deserialize/dictionary.rs
index b37cb46d7fe..a6b545d6b01 100644
--- a/src/io/parquet/read/deserialize/dictionary.rs
+++ b/src/io/parquet/read/deserialize/dictionary.rs
@@ -216,7 +216,7 @@ pub(super) fn next_dict<
     iter: &'a mut I,
     items: &mut VecDeque<(Vec<K>, MutableBitmap)>,
     dict: &mut Dict,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
     read_dict: F,
 ) -> MaybeNext<Result<DictionaryArray<K>>> {
     if items.len() > 1 {
@@ -249,7 +249,7 @@ pub(super) fn next_dict<
             utils::extend_from_new_page(page, chunk_size, items, &PrimitiveDecoder::<K>::default());
 
-            if items.front().unwrap().len() < chunk_size {
+            if items.front().unwrap().len() < chunk_size.unwrap_or(usize::MAX) {
                 MaybeNext::More
             } else {
                 let (values, validity) = items.pop_front().unwrap();
@@ -262,7 +262,7 @@
         if let Some((values, validity)) = items.pop_front() {
             // we have a populated item and no more pages
             // the only case where an item's length may be smaller than chunk_size
-            debug_assert!(values.len() <= chunk_size);
+            debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX));
 
             let keys = finish_key(values, validity);
diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
index afcb72bf4e4..bbf8abb00bc 100644
--- a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
+++ b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
@@ -289,11 +289,11 @@ pub struct Iter<I: DataPages> {
     data_type: DataType,
     size: usize,
     items: VecDeque<(FixedSizeBinary, MutableBitmap)>,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
 }
 
 impl<I: DataPages> Iter<I> {
-    pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
+    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
         let size = FixedSizeBinaryArray::get_size(&data_type);
         Self {
             iter,
diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs
index 4dc4fbcc81c..d3150d7e819 100644
--- a/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs
+++ b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs
@@ -24,7 +24,7 @@ where
     data_type: DataType,
     values: Dict,
     items: VecDeque<(Vec<K>, MutableBitmap)>,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
 }
 
 impl<K, I> DictIter<K, I>
@@ -32,7 +32,7 @@ where
     K: DictionaryKey,
     I: DataPages,
 {
-    pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
+    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
         let data_type = match data_type {
             DataType::Dictionary(_, values, _) => values.as_ref().clone(),
             _ => unreachable!(),
diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs
index 3d157916977..f80a7becad7 100644
--- a/src/io/parquet/read/deserialize/mod.rs
+++ b/src/io/parquet/read/deserialize/mod.rs
@@ -10,6 +10,9 @@ mod simple;
 mod struct_;
 mod utils;
 
+use parquet2::read::get_page_iterator as _get_page_iterator;
+use parquet2::schema::types::PrimitiveType;
+
 use crate::{
     array::{Array, BinaryArray, FixedSizeListArray, ListArray, MapArray, Utf8Array},
     datatypes::{DataType, Field},
@@ -17,7 +20,6 @@ use crate::{
 };
 
 use self::nested_utils::{InitNested, NestedArrayIter, NestedState};
-use parquet2::schema::types::PrimitiveType;
 use simple::page_iter_to_arrays;
 
 use super::*;
@@ -94,7 +96,7 @@ fn columns_to_iter_recursive<'a, I: 'a>(
     mut types: Vec<&PrimitiveType>,
     field: Field,
     mut init: Vec<InitNested>,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
 ) -> Result<NestedArrayIter<'a>>
 where
     I: DataPages,
@@ -359,12 +361,15 @@ fn n_columns(data_type: &DataType) -> usize {
 
 /// An iterator adapter that maps multiple iterators of [`DataPages`] into an iterator of [`Array`]s.
 ///
+/// For non-nested datatypes such as [`DataType::Int32`], this function requires a single element in `columns` and `types`.
+/// For nested datatypes, `columns` must be composed of all parquet columns with associated types `types`.
+///
 /// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.data_type`.
 pub fn column_iter_to_arrays<'a, I: 'a>(
     columns: Vec<I>,
     types: Vec<&PrimitiveType>,
     field: Field,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
 ) -> Result<ArrayIter<'a>>
 where
     I: DataPages,
diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs
index 5fbfd51cf38..613f6106e4e 100644
--- a/src/io/parquet/read/deserialize/nested_utils.rs
+++ b/src/io/parquet/read/deserialize/nested_utils.rs
@@ -363,8 +363,11 @@ pub fn extend_offsets1<'a>(
     page: &mut NestedPage<'a>,
     init: &[InitNested],
     items: &mut VecDeque<NestedState>,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
 ) {
+    let capacity = chunk_size.unwrap_or(0);
+    let chunk_size = chunk_size.unwrap_or(usize::MAX);
+
     let mut nested = if let Some(nested) = items.pop_back() {
         // there is a already a state => it must be incomplete...
         debug_assert!(
@@ -374,7 +377,7 @@ pub fn extend_offsets1<'a>(
         nested
     } else {
         // there is no state => initialize it
-        init_nested(init, chunk_size)
+        init_nested(init, capacity)
     };
 
     let remaining = chunk_size - nested.len();
@@ -384,7 +387,7 @@ pub fn extend_offsets1<'a>(
     items.push_back(nested);
 
     while page.len() > 0 {
-        let mut nested = init_nested(init, chunk_size);
+        let mut nested = init_nested(init, capacity);
         extend_offsets2(page, &mut nested, chunk_size);
         items.push_back(nested);
     }
@@ -425,7 +428,7 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi
 
         let next_rep = page.iter.peek().map(|x| x.0).unwrap_or(0);
 
-        if next_rep == 0 && rows == additional + 1 {
+        if next_rep == 0 && rows == additional.saturating_add(1) {
             break;
         }
     }
@@ -478,7 +481,7 @@ pub(super) fn next<'a, I, D>(
     items: &mut VecDeque<D::DecodedState>,
     nested_items: &mut VecDeque<NestedState>,
     init: &[InitNested],
-    chunk_size: usize,
+    chunk_size: Option<usize>,
     decoder: &D,
 ) -> MaybeNext<Result<(NestedState, D::DecodedState)>>
 where
@@ -517,7 +520,7 @@ where
 
         extend_from_new_page(page, items, nested_items, decoder);
 
-        if nested_items.front().unwrap().len() < chunk_size {
+        if nested_items.front().unwrap().len() < chunk_size.unwrap_or(usize::MAX) {
             MaybeNext::More
         } else {
             let nested = nested_items.pop_front().unwrap();
diff --git a/src/io/parquet/read/deserialize/null.rs b/src/io/parquet/read/deserialize/null.rs
index d8e050676be..b3c159f0b86 100644
--- a/src/io/parquet/read/deserialize/null.rs
+++ b/src/io/parquet/read/deserialize/null.rs
@@ -3,7 +3,11 @@ use crate::{array::NullArray, datatypes::DataType};
 use super::super::{ArrayIter, DataPages};
 
 /// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
-pub fn iter_to_arrays<'a, I>(mut iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a>
+pub fn iter_to_arrays<'a, I>(
+    mut iter: I,
+    data_type: DataType,
+    chunk_size: Option<usize>,
+) -> ArrayIter<'a>
 where
     I: 'a + DataPages,
 {
@@ -13,6 +17,8 @@ where
         len += x.num_values()
     }
 
+    let chunk_size = chunk_size.unwrap_or(len);
+
     let complete_chunks = len / chunk_size;
     let remainder = len % chunk_size;
     let i_data_type = data_type.clone();
diff --git a/src/io/parquet/read/deserialize/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs
index abb766a6968..51aac9736bf 100644
--- a/src/io/parquet/read/deserialize/primitive/basic.rs
+++ b/src/io/parquet/read/deserialize/primitive/basic.rs
@@ -296,7 +296,7 @@ where
     iter: I,
     data_type: DataType,
     items: VecDeque<(Vec<T>, MutableBitmap)>,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
     op: F,
     phantom: std::marker::PhantomData<P>,
 }
@@ -309,7 +309,7 @@ where
     P: ParquetNativeType,
     F: Copy + Fn(P) -> T,
 {
-    pub fn new(iter: I, data_type: DataType, chunk_size: usize, op: F) -> Self {
+    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, op: F) -> Self {
         Self {
             iter,
             data_type,
diff --git a/src/io/parquet/read/deserialize/primitive/dictionary.rs b/src/io/parquet/read/deserialize/primitive/dictionary.rs
index b3a9a9cc521..ade0330a53b 100644
--- a/src/io/parquet/read/deserialize/primitive/dictionary.rs
+++ b/src/io/parquet/read/deserialize/primitive/dictionary.rs
@@ -47,7 +47,7 @@ where
     data_type: DataType,
     values: Dict,
     items: VecDeque<(Vec<K>, MutableBitmap)>,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
     op: F,
     phantom: std::marker::PhantomData<P>,
 }
@@ -61,7 +61,7 @@ where
     P: ParquetNativeType,
     F: Copy + Fn(P) -> T,
 {
-    pub fn new(iter: I, data_type: DataType, chunk_size: usize, op: F) -> Self {
+    pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, op: F) -> Self {
         let data_type = match data_type {
             DataType::Dictionary(_, values, _) => *values,
             _ => data_type,
diff --git a/src/io/parquet/read/deserialize/primitive/mod.rs b/src/io/parquet/read/deserialize/primitive/mod.rs
index 44ab76fa4d0..5fcaef5a53c 100644
--- a/src/io/parquet/read/deserialize/primitive/mod.rs
+++ b/src/io/parquet/read/deserialize/primitive/mod.rs
@@ -16,7 +16,7 @@ pub fn iter_to_arrays_nested<'a, I, T, P, F>(
     iter: I,
     init: Vec<InitNested>,
     data_type: DataType,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
     op: F,
 ) -> NestedArrayIter<'a>
 where
diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs
index 4b6e01ab40f..841ef1719b9 100644
--- a/src/io/parquet/read/deserialize/primitive/nested.rs
+++ b/src/io/parquet/read/deserialize/primitive/nested.rs
@@ -179,7 +179,7 @@ where
     // invariant: items.len() == nested.len()
     items: VecDeque<(Vec<T>, MutableBitmap)>,
     nested: VecDeque<NestedState>,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
     decoder: PrimitiveDecoder<T, P, F>,
 }
 
@@ -195,7 +195,7 @@ where
         iter: I,
         init: Vec<InitNested>,
         data_type: DataType,
-        chunk_size: usize,
+        chunk_size: Option<usize>,
         op: F,
     ) -> Self {
         Self {
diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs
index fe895c326f1..43d79250f3f 100644
--- a/src/io/parquet/read/deserialize/simple.rs
+++ b/src/io/parquet/read/deserialize/simple.rs
@@ -62,7 +62,7 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
     pages: I,
     type_: &PrimitiveType,
     data_type: DataType,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
 ) -> Result<ArrayIter<'a>> {
     use DataType::*;
 
@@ -232,7 +232,7 @@ fn timestamp<'a, I: 'a + DataPages>(
     physical_type: &PhysicalType,
     logical_type: &Option<PrimitiveLogicalType>,
     data_type: DataType,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
     time_unit: TimeUnit,
 ) -> Result<ArrayIter<'a>> {
     if physical_type == &PhysicalType::Int96 {
@@ -291,7 +291,7 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>(
     physical_type: &PhysicalType,
     logical_type: &Option<PrimitiveLogicalType>,
     data_type: DataType,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
     time_unit: TimeUnit,
 ) -> Result<ArrayIter<'a>> {
     if physical_type == &PhysicalType::Int96 {
@@ -426,7 +426,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
     physical_type: &PhysicalType,
     logical_type: &Option<PrimitiveLogicalType>,
     data_type: DataType,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
 ) -> Result<ArrayIter<'a>> {
     use DataType::*;
     let values_data_type = if let Dictionary(_, v, _) = &data_type {
diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs
index 851f8ffd4f7..9e560e43014 100644
--- a/src/io/parquet/read/deserialize/utils.rs
+++ b/src/io/parquet/read/deserialize/utils.rs
@@ -374,10 +374,13 @@ pub(super) trait Decoder<'a> {
 
 pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>(
     mut page: T::State,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
     items: &mut VecDeque<T::DecodedState>,
     decoder: &T,
 ) {
+    let capacity = chunk_size.unwrap_or(0);
+    let chunk_size = chunk_size.unwrap_or(usize::MAX);
+
     let mut decoded = if let Some(decoded) = items.pop_back() {
         // there is a already a state => it must be incomplete...
         debug_assert!(
@@ -387,7 +390,7 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>(
         decoded
     } else {
         // there is no state => initialize it
-        decoder.with_capacity(chunk_size)
+        decoder.with_capacity(capacity)
     };
 
     let remaining = chunk_size - decoded.len();
@@ -398,7 +401,7 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>(
     items.push_back(decoded);
 
     while page.len() > 0 {
-        let mut decoded = decoder.with_capacity(chunk_size);
+        let mut decoded = decoder.with_capacity(capacity);
         decoder.extend_from_state(&mut page, &mut decoded, chunk_size);
         items.push_back(decoded)
     }
@@ -415,7 +418,7 @@ pub enum MaybeNext<P> {
 pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>(
     iter: &'a mut I,
     items: &mut VecDeque<D::DecodedState>,
-    chunk_size: usize,
+    chunk_size: Option<usize>,
     decoder: &D,
 ) -> MaybeNext<Result<D::DecodedState>> {
     // front[a1, a2, a3, ...]back
@@ -435,7 +438,7 @@ pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>(
 
             extend_from_new_page(page, chunk_size, items, decoder);
 
-            if (items.len() == 1) && items.front().unwrap().len() < chunk_size {
+            if (items.len() == 1) && items.front().unwrap().len() < chunk_size.unwrap_or(usize::MAX) {
                 MaybeNext::More
             } else {
                 let decoded = items.pop_front().unwrap();
@@ -446,7 +449,7 @@ pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>(
         if let Some(decoded) = items.pop_front() {
             // we have a populated item and no more pages
             // the only case where an item's length may be smaller than chunk_size
-            debug_assert!(decoded.len() <= chunk_size);
+            debug_assert!(decoded.len() <= chunk_size.unwrap_or(usize::MAX));
             MaybeNext::Some(Ok(decoded))
         } else {
             MaybeNext::None
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index dfac4c38df4..19192390643 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -22,11 +22,11 @@ pub use parquet2::{
     metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData},
     page::{CompressedDataPage, DataPage, DataPageHeader},
     read::{
-        decompress, get_column_iterator, get_page_iterator as _get_page_iterator,
-        get_page_stream as _get_page_stream, read_columns_indexes as _read_columns_indexes,
-        read_metadata as _read_metadata, read_metadata_async as _read_metadata_async,
-        read_pages_locations, BasicDecompressor, ColumnChunkIter, Decompressor,
-        MutStreamingIterator, PageFilter, PageReader, ReadColumnIterator, State,
+        decompress, get_column_iterator, get_page_stream,
+        read_columns_indexes as _read_columns_indexes, read_metadata as _read_metadata,
+        read_metadata_async as _read_metadata_async, read_pages_locations, BasicDecompressor,
+        ColumnChunkIter, Decompressor, MutStreamingIterator, PageFilter, PageReader,
+        ReadColumnIterator, State,
     },
     schema::types::{
         GroupLogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, PrimitiveLogicalType,
diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs
index fea9638e5e2..38dd4520bd7 100644
--- a/src/io/parquet/read/row_group.rs
+++ b/src/io/parquet/read/row_group.rs
@@ -196,7 +196,7 @@ pub fn to_deserializer<'a>(
         })
         .unzip();
 
-    column_iter_to_arrays(columns, types, field, chunk_size)
+    column_iter_to_arrays(columns, types, field, Some(chunk_size))
 }
 
 /// Returns a vector of iterators of [`Array`] ([`ArrayIter`]) corresponding to the top
diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs
index 3f794a1cc56..181e3d2702e 100644
--- a/tests/it/io/parquet/read_indexes.rs
+++ b/tests/it/io/parquet/read_indexes.rs
@@ -125,7 +125,7 @@ fn read_with_indexes(
         vec![pages],
         vec![&c1.descriptor().descriptor.primitive_type],
         schema.fields[1].clone(),
-        row_group.num_rows() as usize,
+        Some(row_group.num_rows() as usize),
     )?;
 
     let arrays = arrays.collect::<Result<Vec<_>>>()?;
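
Usage sketch: after this change, the last argument of `column_iter_to_arrays` is an `Option<usize>`. `Some(n)` keeps the previous behaviour (the returned iterator yields arrays of at most `n` rows), while `None` deserializes the whole column chunk into a single array; the `row_group.rs` and test call sites above keep the old behaviour by passing `Some(...)`. A minimal sketch of both conventions, mirroring the call in `tests/it/io/parquet/read_indexes.rs`; the helper name `deserialize_column` and its arguments are illustrative, not part of this diff:

use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::parquet::read::{column_iter_to_arrays, ArrayIter, DataPages};
use parquet2::schema::types::PrimitiveType;

// `pages` is any page iterator over a single column chunk (e.g. a
// `BasicDecompressor`), built from the file metadata as in the existing tests.
fn deserialize_column<'a, I: 'a + DataPages>(
    pages: I,
    primitive_type: &'a PrimitiveType,
    field: Field,
    chunk_size: Option<usize>, // Some(n): arrays of at most n rows; None: one array
) -> Result<ArrayIter<'a>> {
    column_iter_to_arrays(vec![pages], vec![primitive_type], field, chunk_size)
}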
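
Internally, `extend_from_new_page` and `extend_offsets1` fold the `Option` into two plain integers at the top of the function: `unwrap_or(0)` as the pre-allocation capacity (nothing is reserved up front when no chunk size is given) and `unwrap_or(usize::MAX)` as the effective chunk size, so `None` means "a chunk is never full" and all pages accumulate into one state. This is also why `additional + 1` becomes `additional.saturating_add(1)`: with the `usize::MAX` sentinel a plain `+ 1` would overflow. A standalone illustration of the pattern (not library code; `chunk_rows` is a made-up helper):

// Split rows into chunks the way the deserializers do, using both sentinels.
fn chunk_rows(rows: &[i32], chunk_size: Option<usize>) -> Vec<Vec<i32>> {
    // 0: when unbounded, nothing is reserved up front (cf. `with_capacity(capacity)`).
    let capacity = chunk_size.unwrap_or(0);
    // usize::MAX: a chunk is never considered full, so everything lands in one chunk.
    let chunk_size = chunk_size.unwrap_or(usize::MAX);

    let mut chunks: Vec<Vec<i32>> = vec![Vec::with_capacity(capacity)];
    for &row in rows {
        if chunks.last().unwrap().len() >= chunk_size {
            chunks.push(Vec::with_capacity(capacity));
        }
        chunks.last_mut().unwrap().push(row);
    }
    chunks
}

fn main() {
    let rows: Vec<i32> = (0..10).collect();
    let lengths: Vec<usize> = chunk_rows(&rows, Some(4)).iter().map(Vec::len).collect();
    assert_eq!(lengths, [4, 4, 2]); // bounded: at most 4 rows per chunk
    assert_eq!(chunk_rows(&rows, None).len(), 1); // unbounded: a single chunk
}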