From 2185ce22043953b3185001a04b46f50fbd6956d7 Mon Sep 17 00:00:00 2001
From: Dan Harris <1327726+thinkharderdev@users.noreply.github.com>
Date: Wed, 17 Aug 2022 09:38:33 -0400
Subject: [PATCH] Use OffsetIndex to prune IO with RowSelection (#2473)

* Add struct for in-memory row group with only selected pages
* Read only pages required for row selection
* Remove InMemoryColumnChunk and prune IO for row selection
* Review comments
* Unignore test
* Avoid copies
* Fix docs
* Linting
---
 parquet/src/arrow/arrow_reader/selection.rs | 147 +++++-
 parquet/src/arrow/async_reader.rs           | 530 +++++++++-------
 parquet/src/column/page.rs                  |   1 +
 parquet/src/file/page_index/index_reader.rs |   2 +-
 4 files changed, 384 insertions(+), 296 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 8e129f5667ec..72867e8916de 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -17,12 +17,14 @@
 use arrow::array::{Array, BooleanArray};
 use arrow::compute::SlicesIterator;
+use parquet_format::PageLocation;
 use std::cmp::Ordering;
 use std::collections::VecDeque;
 use std::ops::Range;
 
-/// [`RowSelector`] represents a range of rows to scan from a parquet file
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+/// [`RowSelector`] represents a range of rows to either scan or skip when
+/// reading a parquet file
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
 pub struct RowSelector {
     /// The number of rows
     pub row_count: usize,
@@ -116,6 +118,57 @@ impl RowSelection {
         Self { selectors }
     }
 
+    /// Given an offset index, return the offset ranges for all data pages selected by `self`
+    pub(crate) fn scan_ranges(
+        &self,
+        page_locations: &[PageLocation],
+    ) -> Vec<Range<usize>> {
+        let mut ranges = vec![];
+        let mut row_offset = 0;
+
+        let mut pages = page_locations.iter().peekable();
+        let mut selectors = self.selectors.iter().cloned();
+        let mut current_selector = selectors.next();
+        let mut current_page = pages.next();
+
+        let mut current_page_included = false;
+
+        while let Some((selector, page)) = current_selector.as_mut().zip(current_page) {
+            if !(selector.skip || current_page_included) {
+                let start = page.offset as usize;
+                let end = start + page.compressed_page_size as usize;
+                ranges.push(start..end);
+                current_page_included = true;
+            }
+
+            if let Some(next_page) = pages.peek() {
+                if row_offset + selector.row_count > next_page.first_row_index as usize {
+                    let remaining_in_page =
+                        next_page.first_row_index as usize - row_offset;
+                    selector.row_count -= remaining_in_page;
+                    row_offset += remaining_in_page;
+                    current_page = pages.next();
+                    current_page_included = false;
+
+                    continue;
+                } else {
+                    if row_offset + selector.row_count
+                        == next_page.first_row_index as usize
+                    {
+                        current_page = pages.next();
+                        current_page_included = false;
+                    }
+                    row_offset += selector.row_count;
+                    current_selector = selectors.next();
+                }
+            } else {
+                break;
+            }
+        }
+
+        ranges
+    }
+
     /// Splits off the first `row_count` from this [`RowSelection`]
     pub fn split_off(&mut self, row_count: usize) -> Self {
         let mut total_count = 0;
@@ -162,7 +215,7 @@ impl RowSelection {
     /// self:     NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
     /// other:                YYYYYNNNNYYYYYYYYYYYYY          YYNNN
     ///
-    /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYYNNYNNNN
+    /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYNNNYYNNN
    ///
    ///
    pub fn and_then(&self, other: &Self) -> Self {
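To make the lockstep walk in `scan_ranges` concrete, here is a small worked example (an editor's sketch with a hypothetical two-page layout; `scan_ranges` is `pub(crate)`, so this only compiles inside the parquet crate). A selector that spans a page boundary causes both pages' byte ranges to be emitted; the unit test in the next hunk exercises richer cases:

    use parquet_format::PageLocation;

    // Two 10-row data pages, 10 bytes each, at byte offsets 0 and 10
    let pages = vec![
        PageLocation { offset: 0, compressed_page_size: 10, first_row_index: 0 },
        PageLocation { offset: 10, compressed_page_size: 10, first_row_index: 10 },
    ];
    // Rows 5..15 straddle the page boundary at row 10
    let selection = RowSelection::from(vec![
        RowSelector::skip(5),
        RowSelector::select(10),
        RowSelector::skip(5),
    ]);
    // Both pages must therefore be fetched
    assert_eq!(selection.scan_ranges(&pages), vec![0..10, 10..20]);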
@@ -423,4 +476,92 @@ mod tests {
             assert_eq!(a.and_then(&b), expected);
         }
     }
+
+    #[test]
+    fn test_scan_ranges() {
+        let index = vec![
+            PageLocation {
+                offset: 0,
+                compressed_page_size: 10,
+                first_row_index: 0,
+            },
+            PageLocation {
+                offset: 10,
+                compressed_page_size: 10,
+                first_row_index: 10,
+            },
+            PageLocation {
+                offset: 20,
+                compressed_page_size: 10,
+                first_row_index: 20,
+            },
+            PageLocation {
+                offset: 30,
+                compressed_page_size: 10,
+                first_row_index: 30,
+            },
+            PageLocation {
+                offset: 40,
+                compressed_page_size: 10,
+                first_row_index: 40,
+            },
+            PageLocation {
+                offset: 50,
+                compressed_page_size: 10,
+                first_row_index: 50,
+            },
+            PageLocation {
+                offset: 60,
+                compressed_page_size: 10,
+                first_row_index: 60,
+            },
+        ];
+
+        let selection = RowSelection::from(vec![
+            // Skip first page
+            RowSelector::skip(10),
+            // Multiple selects in same page
+            RowSelector::select(3),
+            RowSelector::skip(3),
+            RowSelector::select(4),
+            // Select to page boundary
+            RowSelector::skip(5),
+            RowSelector::select(5),
+            // Skip full page past page boundary
+            RowSelector::skip(12),
+            // Select across page boundaries
+            RowSelector::select(12),
+            // Skip final page
+            RowSelector::skip(12),
+        ]);
+
+        let ranges = selection.scan_ranges(&index);
+
+        // assert_eq!(mask, vec![false, true, true, false, true, true, false]);
+        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);
+
+        let selection = RowSelection::from(vec![
+            // Skip first page
+            RowSelector::skip(10),
+            // Multiple selects in same page
+            RowSelector::select(3),
+            RowSelector::skip(3),
+            RowSelector::select(4),
+            // Select to page boundary
+            RowSelector::skip(5),
+            RowSelector::select(5),
+            // Skip full page past page boundary
+            RowSelector::skip(12),
+            // Select across page boundaries
+            RowSelector::select(12),
+            RowSelector::skip(1),
+            // Select across page boundaries including final page
+            RowSelector::select(8),
+        ]);
+
+        let ranges = selection.scan_ranges(&index);
+
+        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
+        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
+    }
 }
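Before the async_reader changes, it is worth seeing how the two pieces connect: the ranges produced by `scan_ranges` feed directly into the vectored-read API used by `InMemoryRowGroup::fetch` below. A minimal sketch of that call pattern, as if written inside async_reader.rs (editor's sketch; `fetch_selected_pages` is a hypothetical helper, not part of the patch):

    use bytes::Bytes;
    use parquet_format::PageLocation;

    use crate::arrow::arrow_reader::RowSelection;
    use crate::errors::Result;

    // One Bytes buffer comes back per selected page, in file order
    async fn fetch_selected_pages<T: AsyncFileReader>(
        input: &mut T,
        selection: &RowSelection,
        page_locations: &[PageLocation], // offset index of one column chunk
    ) -> Result<Vec<Bytes>> {
        input
            .get_byte_ranges(selection.scan_ranges(page_locations))
            .await
    }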
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 6c449bef49ee..090b9514dc2d 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -78,17 +78,17 @@
 use std::collections::VecDeque;
 use std::fmt::Formatter;
-use std::io::{Cursor, SeekFrom};
+use std::io::SeekFrom;
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
 use futures::stream::Stream;
-use parquet_format::{PageHeader, PageType};
+
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 
 use arrow::datatypes::SchemaRef;
@@ -100,14 +100,16 @@ use crate::arrow::arrow_reader::{
     RowFilter, RowSelection,
 };
 use crate::arrow::ProjectionMask;
-use crate::basic::Compression;
-use crate::column::page::{Page, PageIterator, PageMetadata, PageReader};
-use crate::compression::{create_codec, Codec};
+
+use crate::column::page::{PageIterator, PageReader};
+
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
-use crate::file::serialized_reader::{decode_page, read_page_header};
+use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
+
 use crate::file::FOOTER_SIZE;
+
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
 
 /// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
@@ -286,7 +288,7 @@ where
         let meta = self.metadata.row_group(row_group_idx);
         let mut row_group = InMemoryRowGroup {
-            schema: meta.schema_descr_ptr(),
+            metadata: meta,
             row_count: meta.num_rows() as usize,
             column_chunks: vec![None; meta.columns().len()],
         };
@@ -299,12 +302,7 @@ where
             let predicate_projection = predicate.projection().clone();
 
             row_group
-                .fetch(
-                    &mut self.input,
-                    meta,
-                    &predicate_projection,
-                    selection.as_ref(),
-                )
+                .fetch(&mut self.input, &predicate_projection, selection.as_ref())
                 .await?;
 
             let array_reader = build_array_reader(
@@ -327,7 +325,7 @@ where
         }
 
         row_group
-            .fetch(&mut self.input, meta, &projection, selection.as_ref())
+            .fetch(&mut self.input, &projection, selection.as_ref())
             .await?;
 
         let reader = ParquetRecordBatchReader::new(
@@ -471,62 +469,101 @@ where
 }
 
 /// An in-memory collection of column chunks
-struct InMemoryRowGroup {
-    schema: SchemaDescPtr,
-    column_chunks: Vec<Option<InMemoryColumnChunk>>,
+struct InMemoryRowGroup<'a> {
+    metadata: &'a RowGroupMetaData,
+    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
     row_count: usize,
 }
 
-impl InMemoryRowGroup {
+impl<'a> InMemoryRowGroup<'a> {
     /// Fetches the necessary column data into memory
     async fn fetch<T: AsyncFileReader + Send>(
         &mut self,
         input: &mut T,
-        metadata: &RowGroupMetaData,
         projection: &ProjectionMask,
-        _selection: Option<&RowSelection>,
+        selection: Option<&RowSelection>,
     ) -> Result<()> {
-        // TODO: Use OffsetIndex and selection to prune pages
-
-        let fetch_ranges = self
-            .column_chunks
-            .iter()
-            .enumerate()
-            .into_iter()
-            .filter_map(|(idx, chunk)| {
-                (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                    let column = metadata.column(idx);
-                    let (start, length) = column.byte_range();
-                    start as usize..(start + length) as usize
+        if let Some((selection, page_locations)) =
+            selection.zip(self.metadata.page_offset_index().as_ref())
+        {
+            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
+            // `RowSelection`
+            let mut page_start_offsets: Vec<Vec<usize>> = vec![];
+
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .into_iter()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+                        let ranges = selection.scan_ranges(&page_locations[idx]);
+                        page_start_offsets
+                            .push(ranges.iter().map(|range| range.start).collect());
+                        ranges
+                    })
                 })
-            })
-            .collect();
+                .flatten()
+                .collect();
 
-        let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut page_start_offsets = page_start_offsets.into_iter();
 
-        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
-            if chunk.is_some() || !projection.leaf_included(idx) {
-                continue;
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(offsets) = page_start_offsets.next() {
+                    let mut chunks = Vec::with_capacity(offsets.len());
+                    for _ in 0..offsets.len() {
+                        chunks.push(chunk_data.next().unwrap());
+                    }
+
+                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
+                        length: self.metadata.column(idx).byte_range().1 as usize,
+                        data: offsets.into_iter().zip(chunks.into_iter()).collect(),
+                    }))
+                }
             }
+        } else {
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .into_iter()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+                        let column = self.metadata.column(idx);
+                        let (start, length) = column.byte_range();
+                        start as usize..(start + length) as usize
+                    })
+                })
+                .collect();
 
-            let column = metadata.column(idx);
+            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
 
-            if let Some(data) = chunk_data.next() {
-                *chunk = Some(InMemoryColumnChunk {
-                    num_values: column.num_values(),
-                    compression: column.compression(),
-                    physical_type: column.column_type(),
-                    data,
-                });
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(data) = chunk_data.next() {
+                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
+                        offset: self.metadata.column(idx).byte_range().0 as usize,
+                        data,
+                    }));
+                }
             }
         }
+
         Ok(())
     }
 }
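The pairing logic in the sparse branch above is easy to miss: `get_byte_ranges` returns buffers in request order, and `page_start_offsets` recorded the start of each requested range per column, so zipping the two rebuilds a per-column map from absolute page offset to page bytes. A toy illustration (editor's sketch, hypothetical values):

    use bytes::Bytes;

    // For one column: scan_ranges selected pages at byte offsets 10 and 40...
    let offsets: Vec<usize> = vec![10, 40];
    // ...and get_byte_ranges returned their bytes in the same order
    let chunks: Vec<Bytes> =
        vec![Bytes::from_static(b"page0"), Bytes::from_static(b"page1")];

    // The Sparse variant stores (page offset, page bytes), sorted by offset,
    // which is what get_read's binary search (below) relies on
    let data: Vec<(usize, Bytes)> = offsets.into_iter().zip(chunks).collect();
    assert_eq!(data[1], (40, Bytes::from_static(b"page1")));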
-impl RowGroupCollection for InMemoryRowGroup {
+impl<'a> RowGroupCollection for InMemoryRowGroup<'a> {
     fn schema(&self) -> SchemaDescPtr {
-        self.schema.clone()
+        self.metadata.schema_descr_ptr()
     }
 
     fn num_rows(&self) -> usize {
@@ -534,158 +571,79 @@ impl RowGroupCollection for InMemoryRowGroup {
     }
 
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
-        let page_reader = self.column_chunks[i].as_ref().unwrap().pages();
-
-        Ok(Box::new(ColumnChunkIterator {
-            schema: self.schema.clone(),
-            column_schema: self.schema.columns()[i].clone(),
-            reader: Some(page_reader),
-        }))
+        match &self.column_chunks[i] {
+            None => Err(ParquetError::General(format!(
+                "Invalid column index {}, column was not fetched",
+                i
+            ))),
+            Some(data) => {
+                let page_locations = self
+                    .metadata
+                    .page_offset_index()
+                    .as_ref()
+                    .map(|index| index[i].clone());
+                let page_reader: Box<dyn PageReader> =
+                    Box::new(SerializedPageReader::new(
+                        data.clone(),
+                        self.metadata.column(i),
+                        self.row_count,
+                        page_locations,
+                    )?);
+
+                Ok(Box::new(ColumnChunkIterator {
+                    schema: self.metadata.schema_descr_ptr(),
+                    column_schema: self.metadata.schema_descr_ptr().columns()[i].clone(),
+                    reader: Some(Ok(page_reader)),
+                }))
+            }
+        }
     }
 }
 
-/// Data for a single column chunk
+/// An in-memory column chunk
 #[derive(Clone)]
-struct InMemoryColumnChunk {
-    num_values: i64,
-    compression: Compression,
-    physical_type: crate::basic::Type,
-    data: Bytes,
-}
-
-impl InMemoryColumnChunk {
-    fn pages(&self) -> Result<Box<dyn PageReader>> {
-        let page_reader = InMemoryColumnChunkReader::new(self.clone())?;
-        Ok(Box::new(page_reader))
-    }
-}
-
-// A serialized implementation for Parquet [`PageReader`].
-struct InMemoryColumnChunkReader {
-    chunk: InMemoryColumnChunk,
-    decompressor: Option<Box<dyn Codec>>,
-    offset: usize,
-    seen_num_values: i64,
-    // If the next page header has already been "peeked", we will cache it here
-    next_page_header: Option<PageHeader>,
-}
-
-impl InMemoryColumnChunkReader {
-    /// Creates a new serialized page reader from file source.
-    fn new(chunk: InMemoryColumnChunk) -> Result<Self> {
-        let decompressor = create_codec(chunk.compression)?;
-        let result = Self {
-            chunk,
-            decompressor,
-            offset: 0,
-            seen_num_values: 0,
-            next_page_header: None,
-        };
-        Ok(result)
-    }
-}
-
-impl Iterator for InMemoryColumnChunkReader {
-    type Item = Result<Page>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.get_next_page().transpose()
-    }
+enum ColumnChunkData {
+    /// Column chunk data representing only a subset of data pages
+    Sparse {
+        /// Length of the full column chunk
+        length: usize,
+        /// Set of data pages included in this sparse chunk. Each element is a tuple
+        /// of (page offset, page data)
+        data: Vec<(usize, Bytes)>,
+    },
+    /// Full column chunk and its offset
+    Dense { offset: usize, data: Bytes },
 }
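One subtlety worth calling out: the `Length` implementation that follows reports the length of the full column chunk for `Sparse`, not the number of bytes actually resident, so all downstream offset arithmetic stays in real file coordinates even when most pages were never fetched. Illustration (editor's sketch with toy values; `Length` comes from crate::file::reader):

    use bytes::Bytes;

    use crate::file::reader::Length;

    // A 100-byte column chunk of which only one 10-byte page was fetched
    let sparse = ColumnChunkData::Sparse {
        length: 100,
        data: vec![(40, Bytes::from_static(&[0u8; 10]))],
    };
    // Reports the full chunk length, not the 10 resident bytes
    assert_eq!(sparse.len(), 100);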
-impl PageReader for InMemoryColumnChunkReader {
-    fn get_next_page(&mut self) -> Result<Option<Page>> {
-        while self.seen_num_values < self.chunk.num_values {
-            let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-            let page_header = if let Some(page_header) = self.next_page_header.take() {
-                // The next page header has already been peeked, so use the cached value
-                page_header
-            } else {
-                let page_header = read_page_header(&mut cursor)?;
-                self.offset += cursor.position() as usize;
-                page_header
-            };
-
-            let compressed_size = page_header.compressed_page_size as usize;
-
-            let start_offset = self.offset;
-            let end_offset = self.offset + compressed_size;
-            self.offset = end_offset;
-
-            let buffer = self.chunk.data.slice(start_offset..end_offset);
-
-            let result = match page_header.type_ {
-                PageType::DataPage | PageType::DataPageV2 => {
-                    let decoded = decode_page(
-                        page_header,
-                        buffer.into(),
-                        self.chunk.physical_type,
-                        self.decompressor.as_mut(),
-                    )?;
-                    self.seen_num_values += decoded.num_values() as i64;
-                    decoded
-                }
-                PageType::DictionaryPage => decode_page(
-                    page_header,
-                    buffer.into(),
-                    self.chunk.physical_type,
-                    self.decompressor.as_mut(),
-                )?,
-                _ => {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    continue;
-                }
-            };
-
-            return Ok(Some(result));
-        }
-
-        // We are at the end of this column chunk and no more page left. Return None.
-        Ok(None)
-    }
+impl Length for ColumnChunkData {
+    fn len(&self) -> u64 {
+        match &self {
+            ColumnChunkData::Sparse { length, .. } => *length as u64,
+            ColumnChunkData::Dense { data, .. } => data.len() as u64,
+        }
+    }
+}
 
-    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        while self.seen_num_values < self.chunk.num_values {
-            return if let Some(buffered_header) = self.next_page_header.as_ref() {
-                if let Ok(page_metadata) = buffered_header.try_into() {
-                    Ok(Some(page_metadata))
-                } else {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    self.next_page_header = None;
-                    continue;
-                }
-            } else {
-                let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-                let page_header = read_page_header(&mut cursor)?;
-                self.offset += cursor.position() as usize;
-
-                let page_metadata = if let Ok(page_metadata) = (&page_header).try_into() {
-                    Ok(Some(page_metadata))
-                } else {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    continue;
-                };
-
-                self.next_page_header = Some(page_header);
-                page_metadata
-            };
-        }
-
-        Ok(None)
-    }
-
-    fn skip_next_page(&mut self) -> Result<()> {
-        if let Some(buffered_header) = self.next_page_header.take() {
-            // The next page header has already been peeked, so just advance the offset
-            self.offset += buffered_header.compressed_page_size as usize;
-        } else {
-            let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-            let page_header = read_page_header(&mut cursor)?;
-            self.offset += cursor.position() as usize;
-            self.offset += page_header.compressed_page_size as usize;
-        }
-
-        Ok(())
-    }
-}
+impl ChunkReader for ColumnChunkData {
+    type T = bytes::buf::Reader<Bytes>;
+
+    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
+        match &self {
+            ColumnChunkData::Sparse { data, .. } => data
+                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
+                .map(|idx| data[idx].1.slice(0..length).reader())
+                .map_err(|_| {
+                    ParquetError::General(format!(
+                        "Invalid offset in sparse column chunk data: {}",
+                        start
+                    ))
+                }),
+            ColumnChunkData::Dense { offset, data } => {
+                let start = start as usize - *offset;
+                let end = start + length;
+                Ok(data.slice(start..end).reader())
+            }
+        }
+    }
+}
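Reads arrive at `get_read` in absolute file coordinates: a `Dense` chunk rebases the request against its own start offset, while a `Sparse` chunk only succeeds when the request starts exactly at a fetched page boundary, found by binary search. An illustration of the dense arithmetic (editor's sketch, toy values; `ChunkReader` comes from crate::file::reader):

    use bytes::Bytes;
    use std::io::Read;

    use crate::file::reader::ChunkReader;

    // Dense chunk whose first byte sits at file offset 100
    let dense = ColumnChunkData::Dense {
        offset: 100,
        data: Bytes::from_static(b"0123456789"),
    };
    // File offsets 104..107 map to local bytes 4..7
    let mut reader = dense.get_read(104, 3).unwrap();
    let mut buf = [0u8; 3];
    reader.read_exact(&mut buf).unwrap();
    assert_eq!(&buf, b"456");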
@@ -717,11 +675,15 @@ impl PageIterator for ColumnChunkIterator {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder};
-    use crate::arrow::ArrowWriter;
+    use crate::arrow::arrow_reader::{
+        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
+    };
+    use crate::arrow::{parquet_to_arrow_schema, ArrowWriter};
     use crate::file::footer::parse_metadata;
+    use crate::file::page_index::index_reader;
     use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
     use arrow::error::Result as ArrowResult;
+    use futures::TryStreamExt;
 
     use std::sync::Mutex;
@@ -797,105 +759,6 @@ mod tests {
         );
     }
 
-    #[tokio::test]
-    async fn test_in_memory_column_chunk_reader() {
-        let testdata = arrow::util::test_util::parquet_test_data();
-        let path = format!("{}/alltypes_plain.parquet", testdata);
-        let data = Bytes::from(std::fs::read(path).unwrap());
-
-        let metadata = crate::file::footer::parse_metadata(&data).unwrap();
-
-        let column_metadata = metadata.row_group(0).column(0);
-
-        let (start, length) = column_metadata.byte_range();
-
-        let column_data = data.slice(start as usize..(start + length) as usize);
-
-        let mut reader = InMemoryColumnChunkReader::new(InMemoryColumnChunk {
-            num_values: column_metadata.num_values(),
-            compression: column_metadata.compression(),
-            physical_type: column_metadata.column_type(),
-            data: column_data,
-        })
-        .expect("building reader");
-
-        let first_page = reader
-            .peek_next_page()
-            .expect("peeking first page")
-            .expect("first page is empty");
-
-        assert!(first_page.is_dict);
-        assert_eq!(first_page.num_rows, 0);
-
-        let first_page = reader
-            .get_next_page()
-            .expect("getting first page")
-            .expect("first page is empty");
-
-        assert_eq!(
-            first_page.page_type(),
-            crate::basic::PageType::DICTIONARY_PAGE
-        );
-        assert_eq!(first_page.num_values(), 8);
-
-        let second_page = reader
-            .peek_next_page()
-            .expect("peeking second page")
-            .expect("second page is empty");
-
-        assert!(!second_page.is_dict);
-        assert_eq!(second_page.num_rows, 8);
-
-        let second_page = reader
-            .get_next_page()
-            .expect("getting second page")
-            .expect("second page is empty");
-
-        assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
-        assert_eq!(second_page.num_values(), 8);
-
-        let third_page = reader.peek_next_page().expect("getting third page");
-
-        assert!(third_page.is_none());
-
-        let third_page = reader.get_next_page().expect("getting third page");
-
-        assert!(third_page.is_none());
-    }
-
-    #[tokio::test]
-    async fn test_in_memory_column_chunk_reader_skip_page() {
-        let testdata = arrow::util::test_util::parquet_test_data();
-        let path = format!("{}/alltypes_plain.parquet", testdata);
-        let data = Bytes::from(std::fs::read(path).unwrap());
-
-        let metadata = crate::file::footer::parse_metadata(&data).unwrap();
-
-        let column_metadata = metadata.row_group(0).column(0);
-
-        let (start, length) = column_metadata.byte_range();
-
-        let column_data = data.slice(start as usize..(start + length) as usize);
-
-        let mut reader = InMemoryColumnChunkReader::new(InMemoryColumnChunk {
-            num_values: column_metadata.num_values(),
-            compression: column_metadata.compression(),
-            physical_type: column_metadata.column_type(),
-            data: column_data,
-        })
-        .expect("building reader");
-
-        reader.skip_next_page().expect("skipping first page");
-
-        let second_page = reader
-            .get_next_page()
-            .expect("getting second page")
-            .expect("second page is empty");
-
-        assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
-        assert_eq!(second_page.num_values(), 8);
-    }
-
     #[tokio::test]
     async fn test_row_filter() {
         let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
@@ -964,4 +827,87 @@ mod tests {
         // Should only have made 3 requests
         assert_eq!(requests.lock().unwrap().len(), 3);
     }
+
+    #[tokio::test]
+    async fn test_in_memory_row_group_sparse() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_tiny_pages.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = parse_metadata(&data).unwrap();
+
+        let offset_index =
+            index_reader::read_pages_locations(&data, metadata.row_group(0).columns())
+                .expect("reading offset index");
+
+        let mut row_group_meta = metadata.row_group(0).clone();
+        row_group_meta.set_page_offset(offset_index.clone());
+        let metadata =
+            ParquetMetaData::new(metadata.file_metadata().clone(), vec![row_group_meta]);
+
+        let metadata = Arc::new(metadata);
+
+        let num_rows = metadata.row_group(0).num_rows();
+
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+
+        let requests = async_reader.requests.clone();
+        let schema = Arc::new(
+            parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
+                .expect("building arrow schema"),
+        );
+
+        let _schema_desc = metadata.file_metadata().schema_descr();
+
+        let projection =
+            ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
+
+        let reader_factory = ReaderFactory {
+            metadata,
+            schema,
+            input: async_reader,
+            filter: None,
+        };
+
+        let mut skip = true;
+        let mut pages = offset_index[0].iter().peekable();
+
+        // Setup `RowSelection` so that we can skip every other page
+        let mut selectors = vec![];
+        let mut expected_page_requests: Vec<Range<usize>> = vec![];
+        while let Some(page) = pages.next() {
+            let num_rows = if let Some(next_page) = pages.peek() {
+                next_page.first_row_index - page.first_row_index
+            } else {
+                num_rows - page.first_row_index
+            };
+
+            if skip {
+                selectors.push(RowSelector::skip(num_rows as usize));
+            } else {
+                selectors.push(RowSelector::select(num_rows as usize));
+                let start = page.offset as usize;
+                let end = start + page.compressed_page_size as usize;
+                expected_page_requests.push(start..end);
+            }
+            skip = !skip;
+        }
+
+        let selection = RowSelection::from(selectors);
+
+        let (_factory, _reader) = reader_factory
+            .read_row_group(0, Some(selection), projection, 48)
+            .await
+            .expect("reading row group");
+
+        let requests = requests.lock().unwrap();
+
+        assert_eq!(&requests[..], &expected_page_requests)
+    }
 }
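As the new test shows, the sparse path only engages when the row group metadata carries an offset index, so a caller that wants page-level IO pruning must load the page locations and attach them before building the stream. The essential steps, extracted from the test above (editor's sketch):

    // Load the offset index from the file and attach it to the row group
    let metadata = parse_metadata(&data)?;
    let locations =
        index_reader::read_pages_locations(&data, metadata.row_group(0).columns())?;
    let mut row_group = metadata.row_group(0).clone();
    row_group.set_page_offset(locations);

    // The metadata now satisfies `page_offset_index().is_some()`, so a
    // RowSelection passed to read_row_group is turned into per-page reads
    let metadata =
        ParquetMetaData::new(metadata.file_metadata().clone(), vec![row_group]);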
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 1658797cee7d..ab2d885a23f7 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -195,6 +195,7 @@ impl PageWriteSpec {
 }
 
 /// Contains metadata for a page
+#[derive(Clone)]
 pub struct PageMetadata {
     /// The number of rows in this page
     pub num_rows: usize,
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index 33499e7426a5..e3f37fbc6613 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -65,7 +65,7 @@ pub fn read_pages_locations(
     let (offset, total_length) = get_location_offset_and_total_length(chunks)?;
 
     //read all need data into buffer
-    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut reader = reader.get_read(offset, total_length)?;
     let mut data = vec![0; total_length];
     reader.read_exact(&mut data)?;