Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Made chunk_size optional in parquet's column_iter_to_arrays #1055

Merged
merged 1 commit into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,12 @@ pub struct Iter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
iter: I,
data_type: DataType,
items: VecDeque<(Binary<O>, MutableBitmap)>,
chunk_size: usize,
chunk_size: Option<usize>,
phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iter<O, A, I> {
pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
Self {
iter,
data_type,
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/binary/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ where
data_type: DataType,
values: Dict,
items: VecDeque<(Vec<K>, MutableBitmap)>,
chunk_size: usize,
chunk_size: Option<usize>,
phantom: std::marker::PhantomData<O>,
}

Expand All @@ -35,7 +35,7 @@ where
O: Offset,
I: DataPages,
{
pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
let data_type = match data_type {
DataType::Dictionary(_, values, _) => values.as_ref().clone(),
_ => unreachable!(),
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub fn iter_to_arrays_nested<'a, O, A, I>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: usize,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: 'a + DataPages,
Expand Down
9 changes: 7 additions & 2 deletions src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,17 @@ pub struct ArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
init: Vec<InitNested>,
items: VecDeque<(Binary<O>, MutableBitmap)>,
nested: VecDeque<NestedState>,
chunk_size: usize,
chunk_size: Option<usize>,
phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
pub fn new(iter: I, init: Vec<InitNested>, data_type: DataType, chunk_size: usize) -> Self {
pub fn new(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: Option<usize>,
) -> Self {
Self {
iter,
data_type,
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,11 @@ pub struct Iter<I: DataPages> {
iter: I,
data_type: DataType,
items: VecDeque<(MutableBitmap, MutableBitmap)>,
chunk_size: usize,
chunk_size: Option<usize>,
}

impl<I: DataPages> Iter<I> {
pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
Self {
iter,
data_type,
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/boolean/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub use self::basic::Iter;
pub fn iter_to_arrays_nested<'a, I: 'a>(
iter: I,
init: Vec<InitNested>,
chunk_size: usize,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: DataPages,
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ pub struct ArrayIterator<I: DataPages> {
// invariant: items.len() == nested.len()
items: VecDeque<(MutableBitmap, MutableBitmap)>,
nested: VecDeque<NestedState>,
chunk_size: usize,
chunk_size: Option<usize>,
}

impl<I: DataPages> ArrayIterator<I> {
pub fn new(iter: I, init: Vec<InitNested>, chunk_size: usize) -> Self {
pub fn new(iter: I, init: Vec<InitNested>, chunk_size: Option<usize>) -> Self {
Self {
iter,
init,
Expand Down
6 changes: 3 additions & 3 deletions src/io/parquet/read/deserialize/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ pub(super) fn next_dict<
iter: &'a mut I,
items: &mut VecDeque<(Vec<K>, MutableBitmap)>,
dict: &mut Dict,
chunk_size: usize,
chunk_size: Option<usize>,
read_dict: F,
) -> MaybeNext<Result<DictionaryArray<K>>> {
if items.len() > 1 {
Expand Down Expand Up @@ -249,7 +249,7 @@ pub(super) fn next_dict<

utils::extend_from_new_page(page, chunk_size, items, &PrimitiveDecoder::<K>::default());

if items.front().unwrap().len() < chunk_size {
if items.front().unwrap().len() < chunk_size.unwrap_or(usize::MAX) {
MaybeNext::More
} else {
let (values, validity) = items.pop_front().unwrap();
Expand All @@ -262,7 +262,7 @@ pub(super) fn next_dict<
if let Some((values, validity)) = items.pop_front() {
// we have a populated item and no more pages
// the only case where an item's length may be smaller than chunk_size
debug_assert!(values.len() <= chunk_size);
debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX));

let keys = finish_key(values, validity);

Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,11 @@ pub struct Iter<I: DataPages> {
data_type: DataType,
size: usize,
items: VecDeque<(FixedSizeBinary, MutableBitmap)>,
chunk_size: usize,
chunk_size: Option<usize>,
}

impl<I: DataPages> Iter<I> {
pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
let size = FixedSizeBinaryArray::get_size(&data_type);
Self {
iter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ where
data_type: DataType,
values: Dict,
items: VecDeque<(Vec<K>, MutableBitmap)>,
chunk_size: usize,
chunk_size: Option<usize>,
}

impl<K, I> DictIter<K, I>
where
K: DictionaryKey,
I: DataPages,
{
pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
let data_type = match data_type {
DataType::Dictionary(_, values, _) => values.as_ref().clone(),
_ => unreachable!(),
Expand Down
11 changes: 8 additions & 3 deletions src/io/parquet/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ mod simple;
mod struct_;
mod utils;

use parquet2::read::get_page_iterator as _get_page_iterator;
use parquet2::schema::types::PrimitiveType;

use crate::{
array::{Array, BinaryArray, FixedSizeListArray, ListArray, MapArray, Utf8Array},
datatypes::{DataType, Field},
error::{Error, Result},
};

use self::nested_utils::{InitNested, NestedArrayIter, NestedState};
use parquet2::schema::types::PrimitiveType;
use simple::page_iter_to_arrays;

use super::*;
Expand Down Expand Up @@ -94,7 +96,7 @@ fn columns_to_iter_recursive<'a, I: 'a>(
mut types: Vec<&PrimitiveType>,
field: Field,
mut init: Vec<InitNested>,
chunk_size: usize,
chunk_size: Option<usize>,
) -> Result<NestedArrayIter<'a>>
where
I: DataPages,
Expand Down Expand Up @@ -359,12 +361,15 @@ fn n_columns(data_type: &DataType) -> usize {

/// An iterator adapter that maps multiple iterators of [`DataPages`] into an iterator of [`Array`]s.
///
/// For non-nested datatypes such as [`DataType::Int32`], this function requires a single element in `columns` and `types`.
/// For nested types, `columns` must be composed of all parquet columns with associated types `types`.
///
/// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.data_type`.
pub fn column_iter_to_arrays<'a, I: 'a>(
columns: Vec<I>,
types: Vec<&PrimitiveType>,
field: Field,
chunk_size: usize,
chunk_size: Option<usize>,
) -> Result<ArrayIter<'a>>
where
I: DataPages,
Expand Down
15 changes: 9 additions & 6 deletions src/io/parquet/read/deserialize/nested_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,11 @@ pub fn extend_offsets1<'a>(
page: &mut NestedPage<'a>,
init: &[InitNested],
items: &mut VecDeque<NestedState>,
chunk_size: usize,
chunk_size: Option<usize>,
) {
let capacity = chunk_size.unwrap_or(0);
let chunk_size = chunk_size.unwrap_or(usize::MAX);

let mut nested = if let Some(nested) = items.pop_back() {
// there is already a state => it must be incomplete...
debug_assert!(
Expand All @@ -374,7 +377,7 @@ pub fn extend_offsets1<'a>(
nested
} else {
// there is no state => initialize it
init_nested(init, chunk_size)
init_nested(init, capacity)
};

let remaining = chunk_size - nested.len();
Expand All @@ -384,7 +387,7 @@ pub fn extend_offsets1<'a>(
items.push_back(nested);

while page.len() > 0 {
let mut nested = init_nested(init, chunk_size);
let mut nested = init_nested(init, capacity);
extend_offsets2(page, &mut nested, chunk_size);
items.push_back(nested);
}
Expand Down Expand Up @@ -425,7 +428,7 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi

let next_rep = page.iter.peek().map(|x| x.0).unwrap_or(0);

if next_rep == 0 && rows == additional + 1 {
if next_rep == 0 && rows == additional.saturating_add(1) {
break;
}
}
Expand Down Expand Up @@ -478,7 +481,7 @@ pub(super) fn next<'a, I, D>(
items: &mut VecDeque<D::DecodedState>,
nested_items: &mut VecDeque<NestedState>,
init: &[InitNested],
chunk_size: usize,
chunk_size: Option<usize>,
decoder: &D,
) -> MaybeNext<Result<(NestedState, D::DecodedState)>>
where
Expand Down Expand Up @@ -517,7 +520,7 @@ where

extend_from_new_page(page, items, nested_items, decoder);

if nested_items.front().unwrap().len() < chunk_size {
if nested_items.front().unwrap().len() < chunk_size.unwrap_or(0) {
MaybeNext::More
} else {
let nested = nested_items.pop_front().unwrap();
Expand Down
8 changes: 7 additions & 1 deletion src/io/parquet/read/deserialize/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ use crate::{array::NullArray, datatypes::DataType};
use super::super::{ArrayIter, DataPages};

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays<'a, I>(mut iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a>
pub fn iter_to_arrays<'a, I>(
mut iter: I,
data_type: DataType,
chunk_size: Option<usize>,
) -> ArrayIter<'a>
where
I: 'a + DataPages,
{
Expand All @@ -13,6 +17,8 @@ where
len += x.num_values()
}

let chunk_size = chunk_size.unwrap_or(len);

let complete_chunks = chunk_size / len;
let remainder = chunk_size % len;
let i_data_type = data_type.clone();
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ where
iter: I,
data_type: DataType,
items: VecDeque<(Vec<T>, MutableBitmap)>,
chunk_size: usize,
chunk_size: Option<usize>,
op: F,
phantom: std::marker::PhantomData<P>,
}
Expand All @@ -309,7 +309,7 @@ where
P: ParquetNativeType,
F: Copy + Fn(P) -> T,
{
pub fn new(iter: I, data_type: DataType, chunk_size: usize, op: F) -> Self {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, op: F) -> Self {
Self {
iter,
data_type,
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/primitive/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ where
data_type: DataType,
values: Dict,
items: VecDeque<(Vec<K>, MutableBitmap)>,
chunk_size: usize,
chunk_size: Option<usize>,
op: F,
phantom: std::marker::PhantomData<P>,
}
Expand All @@ -61,7 +61,7 @@ where
P: ParquetNativeType,
F: Copy + Fn(P) -> T,
{
pub fn new(iter: I, data_type: DataType, chunk_size: usize, op: F) -> Self {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, op: F) -> Self {
let data_type = match data_type {
DataType::Dictionary(_, values, _) => *values,
_ => data_type,
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub fn iter_to_arrays_nested<'a, I, T, P, F>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: usize,
chunk_size: Option<usize>,
op: F,
) -> NestedArrayIter<'a>
where
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/primitive/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ where
// invariant: items.len() == nested.len()
items: VecDeque<(Vec<T>, MutableBitmap)>,
nested: VecDeque<NestedState>,
chunk_size: usize,
chunk_size: Option<usize>,
decoder: PrimitiveDecoder<T, P, F>,
}

Expand All @@ -195,7 +195,7 @@ where
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: usize,
chunk_size: Option<usize>,
op: F,
) -> Self {
Self {
Expand Down
8 changes: 4 additions & 4 deletions src/io/parquet/read/deserialize/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
pages: I,
type_: &PrimitiveType,
data_type: DataType,
chunk_size: usize,
chunk_size: Option<usize>,
) -> Result<ArrayIter<'a>> {
use DataType::*;

Expand Down Expand Up @@ -232,7 +232,7 @@ fn timestamp<'a, I: 'a + DataPages>(
physical_type: &PhysicalType,
logical_type: &Option<PrimitiveLogicalType>,
data_type: DataType,
chunk_size: usize,
chunk_size: Option<usize>,
time_unit: TimeUnit,
) -> Result<ArrayIter<'a>> {
if physical_type == &PhysicalType::Int96 {
Expand Down Expand Up @@ -291,7 +291,7 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>(
physical_type: &PhysicalType,
logical_type: &Option<PrimitiveLogicalType>,
data_type: DataType,
chunk_size: usize,
chunk_size: Option<usize>,
time_unit: TimeUnit,
) -> Result<ArrayIter<'a>> {
if physical_type == &PhysicalType::Int96 {
Expand Down Expand Up @@ -426,7 +426,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
physical_type: &PhysicalType,
logical_type: &Option<PrimitiveLogicalType>,
data_type: DataType,
chunk_size: usize,
chunk_size: Option<usize>,
) -> Result<ArrayIter<'a>> {
use DataType::*;
let values_data_type = if let Dictionary(_, v, _) = &data_type {
Expand Down
Loading