Skip to content

Commit

Permalink
Fix remaining page skipping bug and add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkharderdev committed Sep 3, 2022
1 parent 76837dc commit b2d6da2
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 1 deletion.
124 changes: 124 additions & 0 deletions parquet/src/arrow/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,7 @@ mod tests {
use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
use arrow::error::Result as ArrowResult;
use futures::TryStreamExt;
use rand::{thread_rng, Rng};
use std::sync::Mutex;

struct TestReader {
Expand Down Expand Up @@ -936,6 +937,129 @@ mod tests {
assert_eq!(async_batches, sync_batches);
}

#[tokio::test]
async fn test_async_reader_skip_pages() {
    // Verify that page-level skipping in the async stream produces exactly
    // the batches the synchronous reader produces for the same selection.
    let test_dir = arrow::util::test_util::parquet_test_data();
    let file_path = format!("{}/alltypes_tiny_pages_plain.parquet", test_dir);
    let file_bytes = Bytes::from(std::fs::read(file_path).unwrap());

    let parquet_meta = Arc::new(parse_metadata(&file_bytes).unwrap());
    // The fixture file is expected to contain a single row group.
    assert_eq!(parquet_meta.num_row_groups(), 1);

    let reader = TestReader {
        data: file_bytes.clone(),
        metadata: parquet_meta.clone(),
        requests: Default::default(),
    };

    // Page index is required so the reader can skip whole pages.
    let opts = ArrowReaderOptions::new().with_page_index(true);
    let stream_builder =
        ParquetRecordBatchStreamBuilder::new_with_options(reader, opts)
            .await
            .unwrap();

    // Selectors deliberately chosen to exercise every boundary case:
    // whole-page skips, exact-boundary selects, and spans that straddle pages.
    let selectors = vec![
        RowSelector::skip(21),   // skip the entire first page
        RowSelector::select(21), // select exactly up to a page boundary
        RowSelector::skip(41),   // skip across several pages
        RowSelector::select(41), // select across several pages
        RowSelector::skip(25),   // skip straddling a page boundary
        RowSelector::select(25), // select straddling a page boundary
        RowSelector::skip(7116), // jump ahead to the final page boundary
        RowSelector::select(10), // select rows from the final page
    ];
    let row_selection = RowSelection::from(selectors);

    // Project a single leaf column.
    let projection = ProjectionMask::leaves(stream_builder.parquet_schema(), vec![9]);

    let record_stream = stream_builder
        .with_projection(projection.clone())
        .with_row_selection(row_selection.clone())
        .build()
        .expect("building stream");

    let batches_async: Vec<_> = record_stream.try_collect().await.unwrap();

    // The synchronous reader serves as the reference implementation.
    let batches_sync = ParquetRecordBatchReaderBuilder::try_new(file_bytes)
        .unwrap()
        .with_projection(projection)
        .with_batch_size(1024)
        .with_row_selection(row_selection)
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();

    assert_eq!(batches_async, batches_sync);
}

#[tokio::test]
async fn test_fuzz_async_reader_selection() {
    // Fuzz the async reader with random alternating skip/select runs over a
    // file with many small pages, checking the total selected row count.
    let test_dir = arrow::util::test_util::parquet_test_data();
    let file_path = format!("{}/alltypes_tiny_pages_plain.parquet", test_dir);
    let file_bytes = Bytes::from(std::fs::read(file_path).unwrap());

    let parquet_meta = Arc::new(parse_metadata(&file_bytes).unwrap());
    // The fixture file is expected to contain a single row group.
    assert_eq!(parquet_meta.num_row_groups(), 1);

    let mut rng = thread_rng();

    for _ in 0..100 {
        // Build a random selection that covers all 7300 rows of the file,
        // alternating between select and skip runs of random length.
        let mut remaining: usize = 7300;
        let mut expected_rows = 0;
        let mut skip_next = false;
        let mut selectors = Vec::new();

        while remaining > 0 {
            let row_count: usize = rng.gen_range(0..100);
            // Clamp so the final run lands exactly on the row count.
            let row_count = row_count.min(remaining);

            selectors.push(RowSelector {
                row_count,
                skip: skip_next,
            });

            remaining -= row_count;
            if !skip_next {
                expected_rows += row_count;
            }

            skip_next = !skip_next;
        }

        let row_selection = RowSelection::from(selectors);

        let reader = TestReader {
            data: file_bytes.clone(),
            metadata: parquet_meta.clone(),
            requests: Default::default(),
        };

        // Page index enables whole-page skipping.
        let opts = ArrowReaderOptions::new().with_page_index(true);
        let stream_builder =
            ParquetRecordBatchStreamBuilder::new_with_options(reader, opts)
                .await
                .unwrap();

        // Pick a random leaf column each round.
        let col_idx: usize = rng.gen_range(0..13);
        let projection =
            ProjectionMask::leaves(stream_builder.parquet_schema(), vec![col_idx]);

        let record_stream = stream_builder
            .with_projection(projection.clone())
            .with_row_selection(row_selection.clone())
            .build()
            .expect("building stream");

        let batches: Vec<_> = record_stream.try_collect().await.unwrap();

        let actual_rows: usize = batches.into_iter().map(|b| b.num_rows()).sum();

        assert_eq!(actual_rows, expected_rows);
    }
}

#[tokio::test]
async fn test_row_filter() {
let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/column/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ where
}

/// Read the next page as a dictionary page. If the next page is not a dictionary page,
/// this will return an error.
/// this will return an error.
fn read_dictionary_page(&mut self) -> Result<()> {
match self.page_reader.get_next_page()? {
Some(Page::DictionaryPage {
Expand Down

0 comments on commit b2d6da2

Please sign in to comment.