diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 05e802b97199..4be5269a37bf 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -120,7 +120,16 @@ impl ArrayReader for ByteArrayReader { } fn skip_records(&mut self, num_records: usize) -> Result { - self.record_reader.skip_records(num_records, self.pages.as_mut()) + if self.record_reader.column_reader().is_none() { + // If we skip records before all read operation + // we need set `column_reader` by `set_page_reader` + if let Some(page_reader) = self.pages.next() { + self.record_reader.set_page_reader(page_reader?)?; + } else { + return Ok(0); + } + } + self.record_reader.skip_records(num_records) } fn get_def_levels(&self) -> Option<&[i16]> { diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index c5b2aad98fbf..3afbcaa61a74 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -181,7 +181,16 @@ where } fn skip_records(&mut self, num_records: usize) -> Result { - self.record_reader.skip_records(num_records, self.pages.as_mut()) + if self.record_reader.column_reader().is_none() { + // If we skip records before all read operation + // we need set `column_reader` by `set_page_reader` + if let Some(page_reader) = self.pages.next() { + self.record_reader.set_page_reader(page_reader?)?; + } else { + return Ok(0); + } + } + self.record_reader.skip_records(num_records) } fn get_def_levels(&self) -> Option<&[i16]> { diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs index a25fc08a791a..6d7ed1bb547d 100644 --- a/parquet/src/arrow/array_reader/null_array.rs +++ b/parquet/src/arrow/array_reader/null_array.rs @@ -97,7 +97,16 @@ where } fn skip_records(&mut self, num_records: usize) -> Result { - self.record_reader.skip_records(num_records, self.pages.as_mut()) + if self.record_reader.column_reader().is_none() { + // If we skip records before all read operation + // we need set `column_reader` by `set_page_reader` + if let Some(page_reader) = self.pages.next() { + self.record_reader.set_page_reader(page_reader?)?; + } else { + return Ok(0); + } + } + self.record_reader.skip_records(num_records) } fn get_def_levels(&self) -> Option<&[i16]> { diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index fcd2fc24f7df..a3c6cc7b1b10 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -222,7 +222,16 @@ where } fn skip_records(&mut self, num_records: usize) -> Result { - self.record_reader.skip_records(num_records, self.pages.as_mut()) + if self.record_reader.column_reader().is_none() { + // If we skip records before all read operation + // we need set `column_reader` by `set_page_reader` + if let Some(page_reader) = self.pages.next() { + self.record_reader.set_page_reader(page_reader?)?; + } else { + return Ok(0); + } + } + self.record_reader.skip_records(num_records) } fn get_def_levels(&self) -> Option<&[i16]> { diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index f03a44c511eb..9dcf5d7bf8da 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -309,14 +309,13 @@ impl Iterator for ParquetRecordBatchReader { // try to read record let to_read = match front.row_count.checked_sub(self.batch_size) { - Some(remaining) => { + Some(remaining) if remaining != 0 => { // if page row count less than batch_size we must set batch size to page row count. // add check avoid dead loop - if remaining != 0 { - selection.push_front(RowSelection::select(remaining)); - } + selection.push_front(RowSelection::select(remaining)); self.batch_size } + Some(_) => self.batch_size, None => front.row_count, }; diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs index 195671db1cd5..b68f59d514f2 100644 --- a/parquet/src/arrow/record_reader/mod.rs +++ b/parquet/src/arrow/record_reader/mod.rs @@ -24,7 +24,6 @@ use crate::arrow::record_reader::{ buffer::{BufferQueue, ScalarBuffer, ValuesBuffer}, definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder}, }; -use crate::column::page::PageIterator; use crate::column::{ page::PageReader, reader::{ @@ -46,6 +45,9 @@ pub(crate) const MIN_BATCH_SIZE: usize = 1024; pub type RecordReader = GenericRecordReader::T>, ColumnValueDecoderImpl>; +pub(crate) type ColumnReader = + GenericColumnReader; + /// A generic stateful column reader that delimits semantic records /// /// This type is hidden from the docs, and relies on private traits with no @@ -57,9 +59,7 @@ pub struct GenericRecordReader { records: V, def_levels: Option, rep_levels: Option>, - column_reader: Option< - GenericColumnReader, - >, + column_reader: Option>, /// Number of records accumulated in records num_records: usize, @@ -185,24 +185,11 @@ where /// # Returns /// /// Number of records skipped - pub fn skip_records( - &mut self, - num_records: usize, - pages: &mut dyn PageIterator, - ) -> Result { + pub fn skip_records(&mut self, num_records: usize) -> Result { // First need to clear the buffer let end_of_column = match self.column_reader.as_mut() { Some(reader) => !reader.has_next()?, - None => { - // If we skip records before all read operation - // we need set `column_reader` by `set_page_reader` - if let Some(page_reader) = pages.next() { - self.set_page_reader(page_reader?)?; - false - } else { - return Ok(0); - } - } + None => return Ok(0), }; let (buffered_records, buffered_values) = @@ -292,6 +279,11 @@ where .map(|levels| levels.split_bitmask(self.num_values)) } + /// Returns column reader. + pub(crate) fn column_reader(&self) -> Option<&ColumnReader> { + self.column_reader.as_ref() + } + /// Try to read one batch of data. fn read_one_batch(&mut self, batch_size: usize) -> Result { let rep_levels = self