Commit 881752c

fix comment.
Ted-Jiang committed Jul 26, 2022
1 parent 25cb93d commit 881752c
Showing 6 changed files with 54 additions and 27 deletions.
11 changes: 10 additions & 1 deletion parquet/src/arrow/array_reader/byte_array.rs
@@ -120,7 +120,16 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        self.record_reader.skip_records(num_records, self.pages.as_mut())
+        if self.record_reader.column_reader().is_none() {
+            // If we skip records before any read operation,
+            // we need to set `column_reader` via `set_page_reader`
+            if let Some(page_reader) = self.pages.next() {
+                self.record_reader.set_page_reader(page_reader?)?;
+            } else {
+                return Ok(0);
+            }
+        }
+        self.record_reader.skip_records(num_records)
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
11 changes: 10 additions & 1 deletion parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -181,7 +181,16 @@ where
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        self.record_reader.skip_records(num_records, self.pages.as_mut())
+        if self.record_reader.column_reader().is_none() {
+            // If we skip records before any read operation,
+            // we need to set `column_reader` via `set_page_reader`
+            if let Some(page_reader) = self.pages.next() {
+                self.record_reader.set_page_reader(page_reader?)?;
+            } else {
+                return Ok(0);
+            }
+        }
+        self.record_reader.skip_records(num_records)
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
11 changes: 10 additions & 1 deletion parquet/src/arrow/array_reader/null_array.rs
@@ -97,7 +97,16 @@ where
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        self.record_reader.skip_records(num_records, self.pages.as_mut())
+        if self.record_reader.column_reader().is_none() {
+            // If we skip records before any read operation,
+            // we need to set `column_reader` via `set_page_reader`
+            if let Some(page_reader) = self.pages.next() {
+                self.record_reader.set_page_reader(page_reader?)?;
+            } else {
+                return Ok(0);
+            }
+        }
+        self.record_reader.skip_records(num_records)
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
11 changes: 10 additions & 1 deletion parquet/src/arrow/array_reader/primitive_array.rs
@@ -222,7 +222,16 @@ where
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        self.record_reader.skip_records(num_records, self.pages.as_mut())
+        if self.record_reader.column_reader().is_none() {
+            // If we skip records before any read operation,
+            // we need to set `column_reader` via `set_page_reader`
+            if let Some(page_reader) = self.pages.next() {
+                self.record_reader.set_page_reader(page_reader?)?;
+            } else {
+                return Ok(0);
+            }
+        }
+        self.record_reader.skip_records(num_records)
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
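
Note: all four array readers above gain the same lazy-initialization step: if no column reader has been installed yet, `skip_records` first pulls the next page from `self.pages` and installs it with `set_page_reader` before delegating to the record reader. Below is a minimal, self-contained Rust sketch of that pattern; the `Pages` and `Reader` types are hypothetical stand-ins for the crate's `PageIterator` and `GenericRecordReader`, not the real API.

// Hypothetical stand-ins for PageIterator / GenericRecordReader, for illustration only.
struct Page;

struct Pages {
    remaining: Vec<Page>,
}

impl Pages {
    // Yields the next page, if any (mirrors the shape of `PageIterator::next`).
    fn next_page(&mut self) -> Option<Result<Page, String>> {
        self.remaining.pop().map(Ok)
    }
}

#[derive(Default)]
struct Reader {
    column_reader: Option<Page>, // Some(..) once a page has been installed
    skipped: usize,
}

impl Reader {
    fn column_reader(&self) -> Option<&Page> {
        self.column_reader.as_ref()
    }

    fn set_page_reader(&mut self, page: Page) -> Result<(), String> {
        self.column_reader = Some(page);
        Ok(())
    }

    fn skip_records(&mut self, n: usize) -> Result<usize, String> {
        self.skipped += n;
        Ok(n)
    }
}

// The pattern from the diffs above: lazily install a page before skipping.
fn skip_records(reader: &mut Reader, pages: &mut Pages, n: usize) -> Result<usize, String> {
    if reader.column_reader().is_none() {
        match pages.next_page() {
            Some(page) => reader.set_page_reader(page?)?,
            None => return Ok(0), // no pages at all: nothing to skip
        }
    }
    reader.skip_records(n)
}

fn main() {
    let mut pages = Pages { remaining: vec![Page] };
    let mut reader = Reader::default();
    assert_eq!(skip_records(&mut reader, &mut pages, 8).unwrap(), 8);
}
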
7 changes: 3 additions & 4 deletions parquet/src/arrow/arrow_reader.rs
@@ -309,14 +309,13 @@ impl Iterator for ParquetRecordBatchReader {

         // try to read record
         let to_read = match front.row_count.checked_sub(self.batch_size) {
-            Some(remaining) => {
+            Some(remaining) if remaining != 0 => {
                 // if the page row count is less than batch_size we must set the batch size to the page row count.
                 // add this check to avoid a dead loop
-                if remaining != 0 {
-                    selection.push_front(RowSelection::select(remaining));
-                }
+                selection.push_front(RowSelection::select(remaining));
                 self.batch_size
             }
+            Some(_) => self.batch_size,
             None => front.row_count,
         };
 
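
The arrow_reader.rs change above keeps the remainder of a partially consumed selection at the front of the queue, and the `remaining != 0` guard ensures a zero-length selection is never pushed back (per the in-code comment, this avoids a dead loop). A small self-contained sketch of that queue logic follows; `Sel` is a hypothetical stand-in for `RowSelection`, not the crate's type.

use std::collections::VecDeque;

// Hypothetical stand-in for RowSelection, for illustration only.
#[derive(Debug, PartialEq)]
struct Sel {
    row_count: usize,
}

impl Sel {
    fn select(row_count: usize) -> Self {
        Sel { row_count }
    }
}

// Pops the front selection and returns how many rows to read for this batch,
// pushing any non-empty remainder back onto the front of the queue.
fn to_read(selection: &mut VecDeque<Sel>, batch_size: usize) -> Option<usize> {
    let front = selection.pop_front()?;
    Some(match front.row_count.checked_sub(batch_size) {
        // More rows than one batch: keep the remainder queued.
        Some(remaining) if remaining != 0 => {
            selection.push_front(Sel::select(remaining));
            batch_size
        }
        // Exactly one batch left: nothing to push back.
        Some(_) => batch_size,
        // Fewer rows than a batch: read whatever is left.
        None => front.row_count,
    })
}

fn main() {
    let mut queue = VecDeque::from(vec![Sel::select(2048)]);
    assert_eq!(to_read(&mut queue, 1024), Some(1024));
    assert_eq!(queue.front(), Some(&Sel::select(1024)));
    assert_eq!(to_read(&mut queue, 1024), Some(1024));
    assert!(queue.is_empty()); // no zero-length selection was pushed back
}
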
30 changes: 11 additions & 19 deletions parquet/src/arrow/record_reader/mod.rs
@@ -24,7 +24,6 @@ use crate::arrow::record_reader::{
     buffer::{BufferQueue, ScalarBuffer, ValuesBuffer},
     definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
 };
-use crate::column::page::PageIterator;
 use crate::column::{
     page::PageReader,
     reader::{
@@ -46,6 +45,9 @@ pub(crate) const MIN_BATCH_SIZE: usize = 1024;
 pub type RecordReader<T> =
     GenericRecordReader<ScalarBuffer<<T as DataType>::T>, ColumnValueDecoderImpl<T>>;
 
+pub(crate) type ColumnReader<CV> =
+    GenericColumnReader<ColumnLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>;
+
 /// A generic stateful column reader that delimits semantic records
 ///
 /// This type is hidden from the docs, and relies on private traits with no
@@ -57,9 +59,7 @@ pub struct GenericRecordReader<V, CV> {
     records: V,
     def_levels: Option<DefinitionLevelBuffer>,
     rep_levels: Option<ScalarBuffer<i16>>,
-    column_reader: Option<
-        GenericColumnReader<ColumnLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>,
-    >,
+    column_reader: Option<ColumnReader<CV>>,
 
     /// Number of records accumulated in records
     num_records: usize,
@@ -185,24 +185,11 @@
     /// # Returns
     ///
     /// Number of records skipped
-    pub fn skip_records(
-        &mut self,
-        num_records: usize,
-        pages: &mut dyn PageIterator,
-    ) -> Result<usize> {
+    pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
         // First need to clear the buffer
         let end_of_column = match self.column_reader.as_mut() {
             Some(reader) => !reader.has_next()?,
-            None => {
-                // If we skip records before all read operation
-                // we need set `column_reader` by `set_page_reader`
-                if let Some(page_reader) = pages.next() {
-                    self.set_page_reader(page_reader?)?;
-                    false
-                } else {
-                    return Ok(0);
-                }
-            }
+            None => return Ok(0),
         };
 
         let (buffered_records, buffered_values) =
@@ -292,6 +279,11 @@
             .map(|levels| levels.split_bitmask(self.num_values))
     }
 
+    /// Returns the column reader, if one has been set.
+    pub(crate) fn column_reader(&self) -> Option<&ColumnReader<CV>> {
+        self.column_reader.as_ref()
+    }
+
     /// Try to read one batch of data.
     fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
         let rep_levels = self
