From b14cd617480913129b9b3136d9b88363046f3247 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Mon, 27 Jun 2022 13:36:10 -0700 Subject: [PATCH] Removed some panics reading invalid parquet files (#1106) --- Cargo.toml | 2 +- .../parquet/read/deserialize/binary/basic.rs | 56 ++++++++++--------- .../parquet/read/deserialize/binary/nested.rs | 20 ++++--- .../parquet/read/deserialize/boolean/basic.rs | 29 +++++----- .../read/deserialize/boolean/nested.rs | 20 ++++--- src/io/parquet/read/deserialize/dictionary.rs | 32 +++++------ .../deserialize/fixed_size_binary/basic.rs | 52 ++++++++--------- .../parquet/read/deserialize/nested_utils.rs | 26 +++++---- .../read/deserialize/primitive/basic.rs | 42 +++++++------- .../read/deserialize/primitive/nested.rs | 15 ++--- src/io/parquet/read/deserialize/utils.rs | 33 ++++++----- 11 files changed, 172 insertions(+), 155 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5d355c6a0c0..4bc21d59487 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,7 +67,7 @@ futures = { version = "0.3", optional = true } ahash = { version = "0.7", optional = true } # parquet support -parquet2 = { version = "0.13.1", optional = true, default_features = false } +parquet2 = { version = "0.14.0", optional = true, default_features = false } # avro support avro-schema = { version = "0.2", optional = true } diff --git a/src/io/parquet/read/deserialize/binary/basic.rs b/src/io/parquet/read/deserialize/binary/basic.rs index f678ba204a9..63aa8ff642a 100644 --- a/src/io/parquet/read/deserialize/binary/basic.rs +++ b/src/io/parquet/read/deserialize/binary/basic.rs @@ -4,7 +4,7 @@ use std::default::Default; use parquet2::{ deserialize::SliceFilteredIter, encoding::{hybrid_rle, Encoding}, - page::{BinaryPageDict, DataPage}, + page::{split_buffer, BinaryPageDict, DataPage}, schema::Repetition, }; @@ -67,11 +67,11 @@ pub(super) struct Required<'a> { } impl<'a> Required<'a> { - pub fn new(page: &'a DataPage) -> Self { - let (_, _, values) = utils::split_buffer(page); + pub fn try_new(page: &'a DataPage) -> Result { + let (_, _, values) = split_buffer(page)?; let values = SizedBinaryIter::new(values, page.num_values()); - Self { values } + Ok(Self { values }) } pub fn len(&self) -> usize { @@ -106,10 +106,10 @@ pub(super) struct RequiredDictionary<'a> { } impl<'a> RequiredDictionary<'a> { - pub fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self { - let values = utils::dict_indices_decoder(page); + pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result { + let values = utils::dict_indices_decoder(page)?; - Self { dict, values } + Ok(Self { dict, values }) } #[inline] @@ -125,13 +125,13 @@ pub(super) struct FilteredRequiredDictionary<'a> { } impl<'a> FilteredRequiredDictionary<'a> { - pub fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self { - let values = utils::dict_indices_decoder(page); + pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result { + let values = utils::dict_indices_decoder(page)?; let rows = get_selected_rows(page); let values = SliceFilteredIter::new(values, rows); - Self { values, dict } + Ok(Self { values, dict }) } #[inline] @@ -147,10 +147,10 @@ pub(super) struct ValuesDictionary<'a> { } impl<'a> ValuesDictionary<'a> { - pub fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self { - let values = utils::dict_indices_decoder(page); + pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result { + let values = utils::dict_indices_decoder(page)?; - Self { dict, values } + Ok(Self { dict, values }) } #[inline] @@ -246,50 +246,52 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { is_filtered, ) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => { - Ok(State::RequiredDictionary(RequiredDictionary::new( + Ok(State::RequiredDictionary(RequiredDictionary::try_new( page, dict.as_any().downcast_ref().unwrap(), - ))) + )?)) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { let dict = dict.as_any().downcast_ref().unwrap(); Ok(State::OptionalDictionary( - OptionalPageValidity::new(page), - ValuesDictionary::new(page, dict), + OptionalPageValidity::try_new(page)?, + ValuesDictionary::try_new(page, dict)?, )) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, true) => { let dict = dict.as_any().downcast_ref().unwrap(); - Ok(State::FilteredRequiredDictionary( - FilteredRequiredDictionary::new(page, dict), - )) + FilteredRequiredDictionary::try_new(page, dict) + .map(State::FilteredRequiredDictionary) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, true) => { let dict = dict.as_any().downcast_ref().unwrap(); Ok(State::FilteredOptionalDictionary( - FilteredOptionalPageValidity::new(page), - ValuesDictionary::new(page, dict), + FilteredOptionalPageValidity::try_new(page)?, + ValuesDictionary::try_new(page, dict)?, )) } (Encoding::Plain, _, true, false) => { - let (_, _, values) = utils::split_buffer(page); + let (_, _, values) = split_buffer(page)?; let values = BinaryIter::new(values); - Ok(State::Optional(OptionalPageValidity::new(page), values)) + Ok(State::Optional( + OptionalPageValidity::try_new(page)?, + values, + )) } - (Encoding::Plain, _, false, false) => Ok(State::Required(Required::new(page))), + (Encoding::Plain, _, false, false) => Ok(State::Required(Required::try_new(page)?)), (Encoding::Plain, _, false, true) => { Ok(State::FilteredRequired(FilteredRequired::new(page))) } (Encoding::Plain, _, true, true) => { - let (_, _, values) = utils::split_buffer(page); + let (_, _, values) = split_buffer(page)?; Ok(State::FilteredOptional( - FilteredOptionalPageValidity::new(page), + FilteredOptionalPageValidity::try_new(page)?, BinaryIter::new(values), )) } diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index aeb87a352c5..8dbf1ba22d0 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -1,6 +1,10 @@ use std::collections::VecDeque; -use parquet2::{encoding::Encoding, page::DataPage, schema::Repetition}; +use parquet2::{ + encoding::Encoding, + page::{split_buffer, DataPage}, + schema::Repetition, +}; use crate::{ array::Offset, bitmap::MutableBitmap, datatypes::DataType, error::Result, @@ -58,23 +62,25 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { ) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => { let dict = dict.as_any().downcast_ref().unwrap(); - Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict))) + Ok(State::RequiredDictionary(ValuesDictionary::try_new( + page, dict, + )?)) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { let dict = dict.as_any().downcast_ref().unwrap(); Ok(State::OptionalDictionary( - Optional::new(page), - ValuesDictionary::new(page, dict), + Optional::try_new(page)?, + ValuesDictionary::try_new(page, dict)?, )) } (Encoding::Plain, _, true, false) => { - let (_, _, values) = utils::split_buffer(page); + let (_, _, values) = split_buffer(page)?; let values = BinaryIter::new(values); - Ok(State::Optional(Optional::new(page), values)) + Ok(State::Optional(Optional::try_new(page)?, values)) } - (Encoding::Plain, _, false, false) => Ok(State::Required(Required::new(page))), + (Encoding::Plain, _, false, false) => Ok(State::Required(Required::try_new(page)?)), _ => Err(utils::not_implemented(page)), } } diff --git a/src/io/parquet/read/deserialize/boolean/basic.rs b/src/io/parquet/read/deserialize/boolean/basic.rs index cca21bae28e..d3189b23f64 100644 --- a/src/io/parquet/read/deserialize/boolean/basic.rs +++ b/src/io/parquet/read/deserialize/boolean/basic.rs @@ -1,7 +1,10 @@ use std::collections::VecDeque; use parquet2::{ - deserialize::SliceFilteredIter, encoding::Encoding, page::DataPage, schema::Repetition, + deserialize::SliceFilteredIter, + encoding::Encoding, + page::{split_buffer, DataPage}, + schema::Repetition, }; use crate::{ @@ -13,7 +16,7 @@ use crate::{ use super::super::utils; use super::super::utils::{ - extend_from_decoder, get_selected_rows, next, split_buffer, DecodedState, Decoder, + extend_from_decoder, get_selected_rows, next, DecodedState, Decoder, FilteredOptionalPageValidity, MaybeNext, OptionalPageValidity, }; use super::super::DataPages; @@ -22,10 +25,10 @@ use super::super::DataPages; struct Values<'a>(BitmapIter<'a>); impl<'a> Values<'a> { - pub fn new(page: &'a DataPage) -> Self { - let (_, _, values) = split_buffer(page); + pub fn try_new(page: &'a DataPage) -> Result { + let (_, _, values) = split_buffer(page)?; - Self(BitmapIter::new(values, 0, values.len() * 8)) + Ok(Self(BitmapIter::new(values, 0, values.len() * 8))) } } @@ -54,15 +57,15 @@ struct FilteredRequired<'a> { } impl<'a> FilteredRequired<'a> { - pub fn new(page: &'a DataPage) -> Self { - let (_, _, values) = utils::split_buffer(page); + pub fn try_new(page: &'a DataPage) -> Result { + let (_, _, values) = split_buffer(page)?; // todo: replace this by an iterator over slices, for faster deserialization let values = BitmapIter::new(values, 0, page.num_values()); let rows = get_selected_rows(page); let values = SliceFilteredIter::new(values, rows); - Self { values } + Ok(Self { values }) } #[inline] @@ -117,16 +120,16 @@ impl<'a> Decoder<'a> for BooleanDecoder { match (page.encoding(), is_optional, is_filtered) { (Encoding::Plain, true, false) => Ok(State::Optional( - OptionalPageValidity::new(page), - Values::new(page), + OptionalPageValidity::try_new(page)?, + Values::try_new(page)?, )), (Encoding::Plain, false, false) => Ok(State::Required(Required::new(page))), (Encoding::Plain, true, true) => Ok(State::FilteredOptional( - FilteredOptionalPageValidity::new(page), - Values::new(page), + FilteredOptionalPageValidity::try_new(page)?, + Values::try_new(page)?, )), (Encoding::Plain, false, true) => { - Ok(State::FilteredRequired(FilteredRequired::new(page))) + Ok(State::FilteredRequired(FilteredRequired::try_new(page)?)) } _ => Err(utils::not_implemented(page)), } diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs index 8b162908cb3..163f23a9f30 100644 --- a/src/io/parquet/read/deserialize/boolean/nested.rs +++ b/src/io/parquet/read/deserialize/boolean/nested.rs @@ -1,6 +1,10 @@ use std::collections::VecDeque; -use parquet2::{encoding::Encoding, page::DataPage, schema::Repetition}; +use parquet2::{ + encoding::Encoding, + page::{split_buffer, DataPage}, + schema::Repetition, +}; use crate::{ array::BooleanArray, @@ -24,13 +28,13 @@ struct Required<'a> { } impl<'a> Required<'a> { - pub fn new(page: &'a DataPage) -> Self { - let (_, _, values) = utils::split_buffer(page); - Self { + pub fn try_new(page: &'a DataPage) -> Result { + let (_, _, values) = split_buffer(page)?; + Ok(Self { values, offset: 0, length: page.num_values(), - } + }) } } @@ -71,12 +75,12 @@ impl<'a> Decoder<'a> for BooleanDecoder { match (page.encoding(), is_optional, is_filtered) { (Encoding::Plain, true, false) => { - let (_, _, values) = utils::split_buffer(page); + let (_, _, values) = split_buffer(page)?; let values = BitmapIter::new(values, 0, values.len() * 8); - Ok(State::Optional(Optional::new(page), values)) + Ok(State::Optional(Optional::try_new(page)?, values)) } - (Encoding::Plain, false, false) => Ok(State::Required(Required::new(page))), + (Encoding::Plain, false, false) => Ok(State::Required(Required::try_new(page)?)), _ => Err(utils::not_implemented(page)), } } diff --git a/src/io/parquet/read/deserialize/dictionary.rs b/src/io/parquet/read/deserialize/dictionary.rs index d62260f5b53..62ef51adc9c 100644 --- a/src/io/parquet/read/deserialize/dictionary.rs +++ b/src/io/parquet/read/deserialize/dictionary.rs @@ -36,9 +36,9 @@ pub struct Required<'a> { } impl<'a> Required<'a> { - fn new(page: &'a DataPage) -> Self { - let values = dict_indices_decoder(page); - Self { values } + fn try_new(page: &'a DataPage) -> Result { + let values = dict_indices_decoder(page)?; + Ok(Self { values }) } } @@ -48,13 +48,13 @@ pub struct FilteredRequired<'a> { } impl<'a> FilteredRequired<'a> { - fn new(page: &'a DataPage) -> Self { - let values = dict_indices_decoder(page); + fn try_new(page: &'a DataPage) -> Result { + let values = dict_indices_decoder(page)?; let rows = get_selected_rows(page); let values = SliceFilteredIter::new(values, rows); - Self { values } + Ok(Self { values }) } } @@ -65,13 +65,13 @@ pub struct Optional<'a> { } impl<'a> Optional<'a> { - fn new(page: &'a DataPage) -> Self { - let values = dict_indices_decoder(page); + fn try_new(page: &'a DataPage) -> Result { + let values = dict_indices_decoder(page)?; - Self { + Ok(Self { values, - validity: OptionalPageValidity::new(page), - } + validity: OptionalPageValidity::try_new(page)?, + }) } } @@ -120,18 +120,18 @@ where match (page.encoding(), is_optional, is_filtered) { (Encoding::PlainDictionary | Encoding::RleDictionary, false, false) => { - Ok(State::Required(Required::new(page))) + Required::try_new(page).map(State::Required) } (Encoding::PlainDictionary | Encoding::RleDictionary, true, false) => { - Ok(State::Optional(Optional::new(page))) + Optional::try_new(page).map(State::Optional) } (Encoding::PlainDictionary | Encoding::RleDictionary, false, true) => { - Ok(State::FilteredRequired(FilteredRequired::new(page))) + FilteredRequired::try_new(page).map(State::FilteredRequired) } (Encoding::PlainDictionary | Encoding::RleDictionary, true, true) => { Ok(State::FilteredOptional( - FilteredOptionalPageValidity::new(page), - dict_indices_decoder(page), + FilteredOptionalPageValidity::try_new(page)?, + dict_indices_decoder(page)?, )) } _ => Err(utils::not_implemented(page)), diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs index bbf8abb00bc..5f260967e71 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs @@ -3,7 +3,7 @@ use std::collections::VecDeque; use parquet2::{ deserialize::SliceFilteredIter, encoding::{hybrid_rle, Encoding}, - page::{DataPage, FixedLenByteArrayPageDict}, + page::{split_buffer, DataPage, FixedLenByteArrayPageDict}, schema::Repetition, }; @@ -13,8 +13,8 @@ use crate::{ use super::super::utils::{ dict_indices_decoder, extend_from_decoder, get_selected_rows, next, not_implemented, - split_buffer, DecodedState, Decoder, FilteredOptionalPageValidity, MaybeNext, - OptionalPageValidity, PageState, Pushable, + DecodedState, Decoder, FilteredOptionalPageValidity, MaybeNext, OptionalPageValidity, + PageState, Pushable, }; use super::super::DataPages; use super::utils::FixedSizeBinary; @@ -26,15 +26,15 @@ struct Optional<'a> { } impl<'a> Optional<'a> { - fn new(page: &'a DataPage, size: usize) -> Self { - let (_, _, values_buffer) = split_buffer(page); + fn try_new(page: &'a DataPage, size: usize) -> Result { + let (_, _, values) = split_buffer(page)?; - let values = values_buffer.chunks_exact(size); + let values = values.chunks_exact(size); - Self { + Ok(Self { values, - validity: OptionalPageValidity::new(page), - } + validity: OptionalPageValidity::try_new(page)?, + }) } } @@ -87,10 +87,10 @@ struct RequiredDictionary<'a> { } impl<'a> RequiredDictionary<'a> { - fn new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Self { - let values = dict_indices_decoder(page); + fn try_new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Result { + let values = dict_indices_decoder(page)?; - Self { dict, values } + Ok(Self { dict, values }) } #[inline] @@ -107,14 +107,14 @@ struct OptionalDictionary<'a> { } impl<'a> OptionalDictionary<'a> { - fn new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Self { - let values = dict_indices_decoder(page); + fn try_new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Result { + let values = dict_indices_decoder(page)?; - Self { + Ok(Self { values, - validity: OptionalPageValidity::new(page), + validity: OptionalPageValidity::try_new(page)?, dict, - } + }) } } @@ -170,31 +170,27 @@ impl<'a> Decoder<'a> for BinaryDecoder { is_filtered, ) { (Encoding::Plain, None, true, false) => { - Ok(State::Optional(Optional::new(page, self.size))) + Ok(State::Optional(Optional::try_new(page, self.size)?)) } (Encoding::Plain, None, false, false) => { Ok(State::Required(Required::new(page, self.size))) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => { - Ok(State::RequiredDictionary(RequiredDictionary::new( - page, - dict.as_any().downcast_ref().unwrap(), - ))) + RequiredDictionary::try_new(page, dict.as_any().downcast_ref().unwrap()) + .map(State::RequiredDictionary) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { - Ok(State::OptionalDictionary(OptionalDictionary::new( - page, - dict.as_any().downcast_ref().unwrap(), - ))) + OptionalDictionary::try_new(page, dict.as_any().downcast_ref().unwrap()) + .map(State::OptionalDictionary) } (Encoding::Plain, None, false, true) => Ok(State::FilteredRequired( FilteredRequired::new(page, self.size), )), (Encoding::Plain, _, true, true) => { - let (_, _, values) = split_buffer(page); + let (_, _, values) = split_buffer(page)?; Ok(State::FilteredOptional( - FilteredOptionalPageValidity::new(page), + FilteredOptionalPageValidity::try_new(page)?, values.chunks_exact(self.size), )) } diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs index c829ca43ea6..5c7f062f969 100644 --- a/src/io/parquet/read/deserialize/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -1,14 +1,16 @@ use std::collections::VecDeque; use parquet2::{ - encoding::hybrid_rle::HybridRleDecoder, page::DataPage, read::levels::get_bit_width, + encoding::hybrid_rle::HybridRleDecoder, + page::{split_buffer, DataPage}, + read::levels::get_bit_width, }; use crate::{array::Array, bitmap::MutableBitmap, error::Result}; use super::super::DataPages; pub use super::utils::Zip; -use super::utils::{split_buffer, DecodedState, Decoder, MaybeNext, Pushable}; +use super::utils::{DecodedState, Decoder, MaybeNext, Pushable}; /// trait describing deserialized repetition and definition levels pub trait Nested: std::fmt::Debug + Send + Sync { @@ -271,8 +273,8 @@ pub struct NestedPage<'a> { } impl<'a> NestedPage<'a> { - pub fn new(page: &'a DataPage) -> Self { - let (rep_levels, def_levels, _) = split_buffer(page); + pub fn try_new(page: &'a DataPage) -> Result { + let (rep_levels, def_levels, _) = split_buffer(page)?; let max_rep_level = page.descriptor.max_rep_level; let max_def_level = page.descriptor.max_def_level; @@ -284,7 +286,7 @@ impl<'a> NestedPage<'a> { let iter = reps.zip(defs).peekable(); - Self { iter } + Ok(Self { iter }) } // number of values (!= number of rows) @@ -458,15 +460,15 @@ impl<'a> Iterator for Optional<'a> { } impl<'a> Optional<'a> { - pub fn new(page: &'a DataPage) -> Self { - let (_, def_levels, _) = split_buffer(page); + pub fn try_new(page: &'a DataPage) -> Result { + let (_, def_levels, _) = split_buffer(page)?; let max_def = page.descriptor.max_def_level; - Self { + Ok(Self { iter: HybridRleDecoder::new(def_levels, get_bit_width(max_def), page.num_values()), max_def: max_def as u32, - } + }) } #[inline] @@ -508,7 +510,11 @@ where } Ok(Some(page)) => { // there is a new page => consume the page from the start - let mut nested_page = NestedPage::new(page); + let nested_page = NestedPage::try_new(page); + let mut nested_page = match nested_page { + Ok(page) => page, + Err(e) => return MaybeNext::Some(Err(e)), + }; extend_offsets1(&mut nested_page, init, nested_items, chunk_size); diff --git a/src/io/parquet/read/deserialize/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs index 51aac9736bf..6e947b5643b 100644 --- a/src/io/parquet/read/deserialize/primitive/basic.rs +++ b/src/io/parquet/read/deserialize/primitive/basic.rs @@ -3,7 +3,7 @@ use std::collections::VecDeque; use parquet2::{ deserialize::SliceFilteredIter, encoding::{hybrid_rle, Encoding}, - page::{DataPage, PrimitivePageDict}, + page::{split_buffer, DataPage, PrimitivePageDict}, schema::Repetition, types::decode, types::NativeType as ParquetNativeType, @@ -24,8 +24,8 @@ struct FilteredRequiredValues<'a> { } impl<'a> FilteredRequiredValues<'a> { - pub fn new(page: &'a DataPage) -> Self { - let (_, _, values) = utils::split_buffer(page); + pub fn try_new(page: &'a DataPage) -> Result { + let (_, _, values) = split_buffer(page)?; assert_eq!(values.len() % std::mem::size_of::

(), 0); let values = values.chunks_exact(std::mem::size_of::

()); @@ -33,7 +33,7 @@ impl<'a> FilteredRequiredValues<'a> { let rows = get_selected_rows(page); let values = SliceFilteredIter::new(values, rows); - Self { values } + Ok(Self { values }) } #[inline] @@ -48,12 +48,12 @@ pub(super) struct Values<'a> { } impl<'a> Values<'a> { - pub fn new(page: &'a DataPage) -> Self { - let (_, _, values) = utils::split_buffer(page); + pub fn try_new(page: &'a DataPage) -> Result { + let (_, _, values) = split_buffer(page)?; assert_eq!(values.len() % std::mem::size_of::

(), 0); - Self { + Ok(Self { values: values.chunks_exact(std::mem::size_of::

()), - } + }) } #[inline] @@ -75,13 +75,13 @@ impl<'a, P> ValuesDictionary<'a, P> where P: ParquetNativeType, { - pub fn new(page: &'a DataPage, dict: &'a PrimitivePageDict

) -> Self { - let values = utils::dict_indices_decoder(page); + pub fn try_new(page: &'a DataPage, dict: &'a PrimitivePageDict

) -> Result { + let values = utils::dict_indices_decoder(page)?; - Self { + Ok(Self { dict: dict.values(), values, - } + }) } #[inline] @@ -176,29 +176,29 @@ where ) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => { let dict = dict.as_any().downcast_ref().unwrap(); - Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict))) + ValuesDictionary::try_new(page, dict).map(State::RequiredDictionary) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { let dict = dict.as_any().downcast_ref().unwrap(); Ok(State::OptionalDictionary( - OptionalPageValidity::new(page), - ValuesDictionary::new(page, dict), + OptionalPageValidity::try_new(page)?, + ValuesDictionary::try_new(page, dict)?, )) } (Encoding::Plain, _, true, false) => { - let validity = OptionalPageValidity::new(page); - let values = Values::new::

(page); + let validity = OptionalPageValidity::try_new(page)?; + let values = Values::try_new::

(page)?; Ok(State::Optional(validity, values)) } - (Encoding::Plain, _, false, false) => Ok(State::Required(Values::new::

(page))), + (Encoding::Plain, _, false, false) => Ok(State::Required(Values::try_new::

(page)?)), (Encoding::Plain, _, false, true) => Ok(State::FilteredRequired( - FilteredRequiredValues::new::

(page), + FilteredRequiredValues::try_new::

(page)?, )), (Encoding::Plain, _, true, true) => Ok(State::FilteredOptional( - FilteredOptionalPageValidity::new(page), - Values::new::

(page), + FilteredOptionalPageValidity::try_new(page)?, + Values::try_new::

(page)?, )), _ => Err(utils::not_implemented(page)), } diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index 841ef1719b9..92631372989 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -92,19 +92,20 @@ where ) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => { let dict = dict.as_any().downcast_ref().unwrap(); - Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict))) + ValuesDictionary::try_new(page, dict).map(State::RequiredDictionary) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { let dict = dict.as_any().downcast_ref().unwrap(); Ok(State::OptionalDictionary( - Optional::new(page), - ValuesDictionary::new(page, dict), + Optional::try_new(page)?, + ValuesDictionary::try_new(page, dict)?, )) } - (Encoding::Plain, _, true, false) => { - Ok(State::Optional(Optional::new(page), Values::new::

(page))) - } - (Encoding::Plain, _, false, false) => Ok(State::Required(Values::new::

(page))), + (Encoding::Plain, _, true, false) => Ok(State::Optional( + Optional::try_new(page)?, + Values::try_new::

(page)?, + )), + (Encoding::Plain, _, false, false) => Ok(State::Required(Values::try_new::

(page)?)), _ => Err(utils::not_implemented(page)), } } diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs index dd2174c4180..b29bc48ba4c 100644 --- a/src/io/parquet/read/deserialize/utils.rs +++ b/src/io/parquet/read/deserialize/utils.rs @@ -5,7 +5,7 @@ use parquet2::deserialize::{ }; use parquet2::encoding::hybrid_rle; use parquet2::indexes::Interval; -use parquet2::page::{split_buffer as _split_buffer, DataPage}; +use parquet2::page::{split_buffer, DataPage}; use parquet2::schema::Repetition; use crate::bitmap::utils::BitmapIter; @@ -34,11 +34,6 @@ pub fn not_implemented(page: &DataPage) -> Error { )) } -#[inline] -pub fn split_buffer(page: &DataPage) -> (&[u8], &[u8], &[u8]) { - _split_buffer(page) -} - /// A private trait representing structs that can receive elements. pub(super) trait Pushable: Sized { //fn reserve(&mut self, additional: usize); @@ -104,18 +99,18 @@ pub struct FilteredOptionalPageValidity<'a> { } impl<'a> FilteredOptionalPageValidity<'a> { - pub fn new(page: &'a DataPage) -> Self { - let (_, validity, _) = split_buffer(page); + pub fn try_new(page: &'a DataPage) -> Result { + let (_, validity, _) = split_buffer(page)?; let iter = hybrid_rle::Decoder::new(validity, 1); let iter = HybridDecoderBitmapIter::new(iter, page.num_values()); let selected_rows = get_selected_rows(page); let iter = FilteredHybridRleDecoderIter::new(iter, selected_rows); - Self { + Ok(Self { iter, current: None, - } + }) } pub fn len(&self) -> usize { @@ -219,15 +214,15 @@ pub struct OptionalPageValidity<'a> { } impl<'a> OptionalPageValidity<'a> { - pub fn new(page: &'a DataPage) -> Self { - let (_, validity, _) = split_buffer(page); + pub fn try_new(page: &'a DataPage) -> Result { + let (_, validity, _) = split_buffer(page)?; let iter = hybrid_rle::Decoder::new(validity, 1); let iter = HybridDecoderBitmapIter::new(iter, page.num_values()); - Self { + Ok(Self { iter, current: None, - } + }) } /// Number of items remaining @@ -459,13 +454,17 @@ pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>( } #[inline] -pub(super) fn dict_indices_decoder(page: &DataPage) -> hybrid_rle::HybridRleDecoder { - let (_, _, indices_buffer) = split_buffer(page); +pub(super) fn dict_indices_decoder(page: &DataPage) -> Result { + let (_, _, indices_buffer) = split_buffer(page)?; // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). let bit_width = indices_buffer[0]; let indices_buffer = &indices_buffer[1..]; - hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, page.num_values()) + Ok(hybrid_rle::HybridRleDecoder::new( + indices_buffer, + bit_width as u32, + page.num_values(), + )) }