Support peek_next_page and skip_next_page in InMemoryPageReader #2407

Merged
merged 2 commits into from
Aug 12, 2022
Changes from 1 commit
182 changes: 182 additions & 0 deletions parquet/src/arrow/record_reader/mod.rs
@@ -786,4 +786,186 @@ mod tests {
         assert_eq!(record_reader.num_records(), 8);
         assert_eq!(record_reader.num_values(), 14);
     }
+
+    #[test]
+    fn test_skip_required_records() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          REQUIRED INT32 leaf;
+        }
+        ";
+        let desc = parse_message_type(message_type)
+            .map(|t| SchemaDescriptor::new(Arc::new(t)))
+            .map(|s| s.column(0))
+            .unwrap();
+
+        // Construct record reader
+        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+
+        // First page
+
+        // Records data:
+        // test_schema
+        //   leaf: 4
+        // test_schema
+        //   leaf: 7
+        // test_schema
+        //   leaf: 6
+        // test_schema
+        //   leaf: 3
+        // test_schema
+        //   leaf: 2
+        {
+            let values = [4, 7, 6, 3, 2];
+            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+            assert_eq!(2, record_reader.skip_records(2).unwrap());
+            assert_eq!(0, record_reader.num_records());
+            assert_eq!(0, record_reader.num_values());
+            assert_eq!(3, record_reader.read_records(3).unwrap());
+            assert_eq!(3, record_reader.num_records());
+            assert_eq!(3, record_reader.num_values());
+        }
+
+        // Second page
+
+        // Records data:
+        // test_schema
+        //   leaf: 8
+        // test_schema
+        //   leaf: 9
+        {
+            let values = [8, 9];
+            let mut pb = DataPageBuilderImpl::new(desc, 2, true);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+            assert_eq!(2, record_reader.skip_records(10).unwrap());
+            assert_eq!(3, record_reader.num_records());
+            assert_eq!(3, record_reader.num_values());
+            assert_eq!(0, record_reader.read_records(10).unwrap());
+        }
+
+        let mut bb = Int32BufferBuilder::new(3);
+        bb.append_slice(&[6, 3, 2]);
+        let expected_buffer = bb.finish();
+        assert_eq!(expected_buffer, record_reader.consume_record_data());
+        assert_eq!(None, record_reader.consume_def_levels());
+        assert_eq!(None, record_reader.consume_bitmap());
+    }
+
+    #[test]
+    fn test_skip_optional_records() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          OPTIONAL Group test_struct {
+            OPTIONAL INT32 leaf;
+          }
+        }
+        ";
+
+        let desc = parse_message_type(message_type)
+            .map(|t| SchemaDescriptor::new(Arc::new(t)))
+            .map(|s| s.column(0))
+            .unwrap();
+
+        // Construct record reader
+        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+
+        // First page
+
+        // Records data:
+        // test_schema
+        //   test_struct
+        // test_schema
+        //   test_struct
+        //     leaf: 7
+        // test_schema
+        // test_schema
+        //   test_struct
+        //     leaf: 6
+        // test_schema
+        //   test_struct
+        //     leaf: 3
+        {
+            let values = [7, 6, 3];
+            //empty, non-empty, empty, non-empty, non-empty
+            let def_levels = [1i16, 2i16, 0i16, 2i16, 2i16];
+            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
+            pb.add_def_levels(2, &def_levels);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+            assert_eq!(2, record_reader.skip_records(2).unwrap());
+            assert_eq!(0, record_reader.num_records());
+            assert_eq!(0, record_reader.num_values());
+            assert_eq!(3, record_reader.read_records(3).unwrap());
+            assert_eq!(3, record_reader.num_records());
+            assert_eq!(3, record_reader.num_values());
+        }
+
+        // Second page
+
+        // Records data:
+        // test_schema
+        // test_schema
+        //   test_struct
+        //     leaf: 8
+        {
+            let values = [8];
+            //empty, non-empty
+            let def_levels = [0i16, 2i16];
+            let mut pb = DataPageBuilderImpl::new(desc, 2, true);
+            pb.add_def_levels(2, &def_levels);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+            assert_eq!(2, record_reader.skip_records(10).unwrap());
+            assert_eq!(3, record_reader.num_records());
+            assert_eq!(3, record_reader.num_values());
+            assert_eq!(0, record_reader.read_records(10).unwrap());
+        }
+
+        // Verify result def levels
+        let mut bb = Int16BufferBuilder::new(7);
+        bb.append_slice(&[0i16, 2i16, 2i16]);
+        let expected_def_levels = bb.finish();
+        assert_eq!(
+            Some(expected_def_levels),
+            record_reader.consume_def_levels()
+        );
+
+        // Verify bitmap
+        let expected_valid = &[false, true, true];
+        let expected_buffer = Buffer::from_iter(expected_valid.iter().cloned());
+        let expected_bitmap = Bitmap::from(expected_buffer);
+        assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
+
+        // Verify result record data
+        let actual = record_reader.consume_record_data();
+        let actual_values = actual.typed_data::<i32>();
+
+        let expected = &[0, 6, 3];
+        assert_eq!(actual_values.len(), expected.len());
+
+        // Only validate valid values are equal
+        let iter = expected_valid.iter().zip(actual_values).zip(expected);
+        for ((valid, actual), expected) in iter {
+            if *valid {
+                assert_eq!(actual, expected)
+            }
+        }
+    }
 }
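As a reading aid for the two tests in this file, here is a minimal, self-contained sketch of the skip-then-read bookkeeping they assert on. `ToyReader` and all of its fields are hypothetical and are not the parquet crate's `RecordReader`. The sketch assumes a non-repeated column, so one definition level corresponds to one record, and it pads null slots with 0.

```rust
/// Toy stand-in for a record reader over a non-repeated column.
/// Hypothetical: this is NOT the parquet crate's `RecordReader`.
struct ToyReader {
    def_levels: Vec<i16>, // definition levels of the current page
    values: Vec<i32>,     // non-null values of the current page
    max_def_level: i16,   // a level equal to this means "value present"
    level_pos: usize,     // index of the next level to consume
    value_pos: usize,     // index of the next value to consume
    out_values: Vec<i32>, // buffered output, null slots padded with 0
    out_valid: Vec<bool>, // buffered validity, one entry per record read
}

impl ToyReader {
    fn new(max_def_level: i16) -> Self {
        Self {
            def_levels: Vec::new(),
            values: Vec::new(),
            max_def_level,
            level_pos: 0,
            value_pos: 0,
            out_values: Vec::new(),
            out_valid: Vec::new(),
        }
    }

    /// Replace the current page; buffered output is kept.
    fn set_page(&mut self, def_levels: Vec<i16>, values: Vec<i32>) {
        self.def_levels = def_levels;
        self.values = values;
        self.level_pos = 0;
        self.value_pos = 0;
    }

    /// Skip up to `n` records and return how many were actually skipped.
    /// Skipped records never reach the output buffers.
    fn skip_records(&mut self, n: usize) -> usize {
        let end = (self.level_pos + n).min(self.def_levels.len());
        let max = self.max_def_level;
        // Only records at the max definition level own a stored value.
        let skipped_values = self.def_levels[self.level_pos..end]
            .iter()
            .filter(|&&level| level == max)
            .count();
        let skipped = end - self.level_pos;
        self.value_pos += skipped_values;
        self.level_pos = end;
        skipped
    }

    /// Read up to `n` records into the output buffers and return the count.
    fn read_records(&mut self, n: usize) -> usize {
        let end = (self.level_pos + n).min(self.def_levels.len());
        let read = end - self.level_pos;
        for i in self.level_pos..end {
            let valid = self.def_levels[i] == self.max_def_level;
            self.out_valid.push(valid);
            if valid {
                self.out_values.push(self.values[self.value_pos]);
                self.value_pos += 1;
            } else {
                self.out_values.push(0); // padding for a null slot
            }
        }
        self.level_pos = end;
        read
    }
}

fn main() {
    // Mirrors the data in test_skip_optional_records.
    let mut reader = ToyReader::new(2);
    reader.set_page(vec![1, 2, 0, 2, 2], vec![7, 6, 3]);
    assert_eq!(reader.skip_records(2), 2);
    assert_eq!(reader.read_records(3), 3);

    reader.set_page(vec![0, 2], vec![8]);
    assert_eq!(reader.skip_records(10), 2); // only 2 records were available
    assert_eq!(reader.read_records(10), 0);

    assert_eq!(reader.out_valid, vec![false, true, true]);
    assert_eq!(reader.out_values, vec![0, 6, 3]);
}
```

In this toy model a REQUIRED column can be expressed with `max_def_level = 0` and every level equal to 0, in which case each record is valid and skipping two of `[4, 7, 6, 3, 2]` and then reading three leaves `[6, 3, 2]` buffered.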
30 changes: 24 additions & 6 deletions parquet/src/util/test_common/page_util.rs
@@ -24,6 +24,7 @@ use crate::encodings::levels::LevelEncoder;
 use crate::errors::Result;
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
 use crate::util::memory::ByteBufferPtr;
+use std::iter::Peekable;
 use std::mem;
 
 pub trait DataPageBuilder {
@@ -127,8 +128,8 @@ impl DataPageBuilder for DataPageBuilderImpl {
                 encoding: self.encoding.unwrap(),
                 num_nulls: 0, /* set to dummy value - don't need this when reading
                                * data page */
-                num_rows: self.num_values, /* also don't need this when reading
-                                            * data page */
+                num_rows: self.num_values, /* num_rows is only needed by skip_records; skipping REPEATED
+                                            * fields is not supported yet, so we can assume num_values == num_rows */
                 def_levels_byte_len: self.def_levels_byte_len,
                 rep_levels_byte_len: self.rep_levels_byte_len,
                 is_compressed: false,
@@ -149,13 +150,13 @@ impl DataPageBuilder for DataPageBuilderImpl {
 
 /// A utility page reader which stores pages in memory.
 pub struct InMemoryPageReader<P: Iterator<Item = Page>> {
-    page_iter: P,
+    page_iter: Peekable<P>,
 }
 
 impl<P: Iterator<Item = Page>> InMemoryPageReader<P> {
     pub fn new(pages: impl IntoIterator<Item = Page, IntoIter = P>) -> Self {
         Self {
-            page_iter: pages.into_iter(),
+            page_iter: pages.into_iter().peekable(),
         }
     }
 }
@@ -166,11 +167,28 @@ impl<P: Iterator<Item = Page> + Send> PageReader for InMemoryPageReader<P> {
     }
 
     fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        unimplemented!()
+        if let Some(x) = self.page_iter.peek() {
+            match x {
+                Page::DataPage { .. } => {
+                    unreachable!()
Contributor

Why is this unreachable?

Member Author

I found that all test pages read by InMemoryPageReader use DataPageV2, and DataPage v1 does not have the 'num_rows' info.

Member Author

@Ted-Jiang, Aug 12, 2022

In my mind, most applications (like Spark) still use v1 pages (which do not support DeltaBitPack). Is IOx using DataPageV2?

Contributor

@tustvold, Aug 12, 2022


Yeah, most applications have yet to update to parquet v2 despite it being released almost a decade ago now 😆

IOx isn't currently, but may in future. DeltaBitPack is very good for storing sorted timestamps, as are common for our workload. Although there are some questionable decisions in the DeltaBitPack encoding scheme that make it more expensive than it really should be... Perhaps that is why most applications haven't bothered to upgrade 😅

That was why I was surprised to see this isn't being exercised 😅

I can't remember if column index support requires v2 🤔

> DataPagev1 not have the 'num_rows' info.

Just use num_values (which also includes nulls); it is a best-effort number that is only relied upon if there aren't any levels (in which case it is the row count).

Member Author

Cool! Thanks for the info.

Member Author

@Ted-Jiang, Aug 12, 2022

> I can't remember if column index support requires v2 🤔

It is supported for both v1 and v2; the index info is stored in ColumnChunk:

https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L783-L808
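The suggestion in this thread (fall back to `num_values` for v1 data pages rather than hitting `unreachable!()`) could look roughly like the sketch below. The `Page` and `PageMetadata` types here are reduced, hypothetical stand-ins that keep only the fields the sketch needs, not the parquet crate's definitions, and `page_metadata` is an illustrative helper name that does not appear in this PR.

```rust
/// Reduced, hypothetical stand-in for the crate's page enum.
#[allow(dead_code)]
enum Page {
    DataPage { num_values: u32 },
    DataPageV2 { num_values: u32, num_rows: u32 },
    DictionaryPage { num_values: u32 },
}

/// Reduced, hypothetical stand-in for the crate's page metadata.
#[derive(Debug, PartialEq)]
struct PageMetadata {
    num_rows: usize,
    is_dict: bool,
}

/// Derive metadata from a page without consuming it.
fn page_metadata(page: &Page) -> PageMetadata {
    match page {
        // v1 pages carry no row count. Per the review comment, num_values
        // (which includes nulls) is used as a best-effort number; it equals
        // the row count when the column has no levels.
        Page::DataPage { num_values } => PageMetadata {
            num_rows: *num_values as usize,
            is_dict: false,
        },
        // v2 pages record the row count explicitly.
        Page::DataPageV2 { num_rows, .. } => PageMetadata {
            num_rows: *num_rows as usize,
            is_dict: false,
        },
        // Dictionary pages hold no rows.
        Page::DictionaryPage { .. } => PageMetadata {
            num_rows: 0,
            is_dict: true,
        },
    }
}

fn main() {
    let v1 = Page::DataPage { num_values: 5 };
    assert_eq!(
        page_metadata(&v1),
        PageMetadata { num_rows: 5, is_dict: false }
    );
}
```

The v1 arm is only an approximation for repeated columns, where one row can span several values; that matches the "best effort" caveat in the comment above.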

+                }
+                Page::DataPageV2 { num_rows, .. } => Ok(Some(PageMetadata {
+                    num_rows: *num_rows as usize,
+                    is_dict: false,
+                })),
+                Page::DictionaryPage { .. } => Ok(Some(PageMetadata {
+                    num_rows: 0,
+                    is_dict: true,
+                })),
+            }
+        } else {
+            Ok(None)
+        }
     }
 
     fn skip_next_page(&mut self) -> Result<()> {
-        unimplemented!()
+        self.page_iter.next();
+        Ok(())
     }
 }
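The change to `Peekable<P>` is what makes both methods cheap: peeking borrows the next item without consuming it, and skipping simply advances the iterator. A minimal sketch of that pattern, using a hypothetical generic `InMemoryReader` with string items standing in for pages (none of these names come from the parquet crate):

```rust
use std::iter::Peekable;

/// Hypothetical generic reader; items stand in for pages.
struct InMemoryReader<I: Iterator> {
    iter: Peekable<I>,
}

impl<I: Iterator> InMemoryReader<I> {
    fn new(items: impl IntoIterator<Item = I::Item, IntoIter = I>) -> Self {
        Self {
            iter: items.into_iter().peekable(),
        }
    }

    /// Look at the next item without consuming it.
    fn peek_next(&mut self) -> Option<&I::Item> {
        self.iter.peek()
    }

    /// Drop the next item, if any.
    fn skip_next(&mut self) {
        self.iter.next();
    }

    /// Consume and return the next item.
    fn get_next(&mut self) -> Option<I::Item> {
        self.iter.next()
    }
}

fn main() {
    let mut reader = InMemoryReader::new(vec!["dict", "data-1", "data-2"]);

    // Peeking twice returns the same item: nothing is consumed.
    assert_eq!(reader.peek_next(), Some(&"dict"));
    assert_eq!(reader.peek_next(), Some(&"dict"));

    // Skipping drops "dict", so the next read yields "data-1".
    reader.skip_next();
    assert_eq!(reader.get_next(), Some("data-1"));

    // Skipping past the end is harmless.
    reader.skip_next();
    reader.skip_next();
    assert_eq!(reader.peek_next(), None);
}
```

`peek` needs `&mut self` because `Peekable` may have to pull one item from the underlying iterator and cache it, which is consistent with `peek_next_page` taking `&mut self` in the diff above.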
