Skip to content

Commit

Permalink
Cleanup record skipping logic and tests (apache#2158)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jul 27, 2022
1 parent e096ec7 commit 2423211
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 178 deletions.
5 changes: 2 additions & 3 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::GenericRecordReader;
Expand Down Expand Up @@ -120,8 +120,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
Expand Down
5 changes: 2 additions & 3 deletions parquet/src/arrow/array_reader/byte_array_dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use arrow::buffer::Buffer;
use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};

use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
use crate::arrow::array_reader::{read_records, ArrayReader, skip_records};
use crate::arrow::buffer::{
dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
};
Expand Down Expand Up @@ -181,8 +181,7 @@ where
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
Expand Down
49 changes: 28 additions & 21 deletions parquet/src/arrow/array_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl RowGroupCollection for Arc<dyn FileReader> {

/// Uses `record_reader` to read up to `batch_size` records from `pages`
///
/// Returns the number of records read, which can be less than batch_size if
/// Returns the number of records read, which can be less than `batch_size` if
/// pages is exhausted.
fn read_records<V, CV>(
record_reader: &mut GenericRecordReader<V, CV>,
Expand Down Expand Up @@ -145,29 +145,36 @@ where
Ok(records_read)
}

/// Uses `pages` to set up to `record_reader` 's `column_reader`
/// Uses `record_reader` to skip up to `batch_size` records from`pages`
///
/// If we skip records before all read operation,
/// need set `column_reader` by `set_page_reader`
/// for constructing `def_level_decoder` and `rep_level_decoder`.
fn set_column_reader<V, CV>(
/// Returns the number of records skipped, which can be less than `batch_size` if
/// pages is exhausted
fn skip_records<V, CV>(
record_reader: &mut GenericRecordReader<V, CV>,
pages: &mut dyn PageIterator,
) -> Result<bool>
where
V: ValuesBuffer + Default,
CV: ColumnValueDecoder<Slice = V::Slice>,
batch_size: usize,
) -> Result<usize>
where
V: ValuesBuffer + Default,
CV: ColumnValueDecoder<Slice = V::Slice>,
{
return if record_reader.column_reader().is_none() {
// If we skip records before all read operation
// we need set `column_reader` by `set_page_reader`
if let Some(page_reader) = pages.next() {
record_reader.set_page_reader(page_reader?)?;
Ok(true)
} else {
Ok(false)
let mut records_skipped = 0usize;
while records_skipped < batch_size {
let records_to_read = batch_size - records_skipped;

let records_skipped_once = record_reader.skip_records(records_to_read)?;
records_skipped += records_skipped_once;

// Record reader exhausted
if records_skipped_once < records_to_read {
if let Some(page_reader) = pages.next() {
// Read from new page reader (i.e. column chunk)
record_reader.set_page_reader(page_reader?)?;
} else {
// Page reader also exhausted
break;
}
}
} else {
Ok(true)
};
}
Ok(records_skipped)
}
5 changes: 2 additions & 3 deletions parquet/src/arrow/array_reader/null_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
use crate::arrow::array_reader::{read_records, ArrayReader, skip_records};
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::RecordReader;
use crate::column::page::PageIterator;
Expand Down Expand Up @@ -97,8 +97,7 @@ where
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
Expand Down
5 changes: 2 additions & 3 deletions parquet/src/arrow/array_reader/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::{read_records, set_column_reader, ArrayReader};
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::RecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
Expand Down Expand Up @@ -222,8 +222,7 @@ where
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
Expand Down
Loading

0 comments on commit 2423211

Please sign in to comment.