From dfb8c769606efd4fd8731706b287993479b339ca Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
Date: Tue, 14 Mar 2023 08:43:57 +0000
Subject: [PATCH] Add offset pushdown to parquet (#3848)

---
 parquet/src/arrow/arrow_reader/mod.rs       |  92 ++++++++++++---
 parquet/src/arrow/arrow_reader/selection.rs | 124 +++++++++++++++++---
 parquet/src/arrow/async_reader/mod.rs       | 114 ++++++++++++++----
 3 files changed, 280 insertions(+), 50 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index c4b645da7ce5..6c8d08de251d 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -71,6 +71,8 @@ pub struct ArrowReaderBuilder<T> {
     pub(crate) selection: Option<RowSelection>,
 
     pub(crate) limit: Option<usize>,
+
+    pub(crate) offset: Option<usize>,
 }
 
 impl<T> ArrowReaderBuilder<T> {
@@ -101,6 +103,7 @@ impl<T> ArrowReaderBuilder<T> {
             filter: None,
             selection: None,
             limit: None,
+            offset: None,
         })
     }
 
@@ -181,6 +184,17 @@ impl<T> ArrowReaderBuilder<T> {
             ..self
         }
     }
+
+    /// Provide an offset to skip over the given number of rows
+    ///
+    /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`],
+    /// allowing it to skip rows after any pushed down predicates
+    pub fn with_offset(self, offset: usize) -> Self {
+        Self {
+            offset: Some(offset),
+            ..self
+        }
+    }
 }
 
 /// Arrow reader api.
@@ -467,23 +481,10 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
             selection = Some(RowSelection::from(vec![]));
         }
 
-        // If a limit is defined, apply it to the final `RowSelection`
-        if let Some(limit) = self.limit {
-            selection = Some(
-                selection
-                    .map(|selection| selection.limit(limit))
-                    .unwrap_or_else(|| {
-                        RowSelection::from(vec![RowSelector::select(
-                            limit.min(reader.num_rows()),
-                        )])
-                    }),
-            );
-        }
-
         Ok(ParquetRecordBatchReader::new(
             batch_size,
             array_reader,
-            selection,
+            apply_range(selection, reader.num_rows(), self.offset, self.limit),
         ))
     }
 }
@@ -620,6 +621,41 @@ pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
     selection.map(|x| x.selects_any()).unwrap_or(true)
 }
 
+/// Applies an optional offset and limit to an optional [`RowSelection`]
+pub(crate) fn apply_range(
+    mut selection: Option<RowSelection>,
+    row_count: usize,
+    offset: Option<usize>,
+    limit: Option<usize>,
+) -> Option<RowSelection> {
+    // If an offset is defined, apply it to the `selection`
+    if let Some(offset) = offset {
+        selection = Some(match row_count.checked_sub(offset) {
+            None => RowSelection::from(vec![]),
+            Some(remaining) => selection
+                .map(|selection| selection.offset(offset))
+                .unwrap_or_else(|| {
+                    RowSelection::from(vec![
+                        RowSelector::skip(offset),
+                        RowSelector::select(remaining),
+                    ])
+                }),
+        });
+    }
+
+    // If a limit is defined, apply it to the final `selection`
+    if let Some(limit) = limit {
+        selection = Some(
+            selection
+                .map(|selection| selection.limit(limit))
+                .unwrap_or_else(|| {
+                    RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
+                }),
+        );
+    }
+    selection
+}
+
 /// Evaluates an [`ArrowPredicate`] returning the [`RowSelection`]
 ///
 /// If this [`ParquetRecordBatchReader`] has a [`RowSelection`], the
@@ -1244,6 +1280,8 @@ mod tests {
         row_filter: Option<Vec<bool>>,
         /// limit
         limit: Option<usize>,
+        /// offset
+        offset: Option<usize>,
     }
 
     /// Manually implement this to avoid printing entire contents of row_selections and row_filter
@@ -1263,6 +1301,7 @@ mod tests {
                 .field("row_selections", &self.row_selections.is_some())
                 .field("row_filter", &self.row_filter.is_some())
                 .field("limit", &self.limit)
+                .field("offset", &self.offset)
                 .finish()
         }
     }
@@ -1283,6 +1322,7 @@ mod tests {
                 row_selections: None,
                 row_filter: None,
                 limit: None,
+                offset: None,
             }
         }
     }
@@ -1361,6 +1401,13 @@ mod tests {
            }
        }
 
+        fn with_offset(self, offset: usize) -> Self {
+            Self {
+                offset: Some(offset),
+                ..self
+            }
+        }
+
        fn writer_props(&self) -> WriterProperties {
            let builder = WriterProperties::builder()
                .set_data_pagesize_limit(self.max_data_page_size)
@@ -1427,6 +1474,12 @@ mod tests {
            TestOptions::new(4, 100, 25).with_limit(10),
            // Test with limit larger than number of rows
            TestOptions::new(4, 100, 25).with_limit(101),
+            // Test with limit + offset less than number of rows
+            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
+            // Test with limit + offset equal to number of rows
+            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
+            // Test with limit + offset larger than number of rows
+            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
            // Test with no page-level statistics
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
@@ -1474,6 +1527,12 @@ mod tests {
                .with_null_percent(25)
                .with_row_selections()
                .with_limit(10),
+            // Test optional with nulls
+            TestOptions::new(2, 256, 93)
+                .with_null_percent(25)
+                .with_row_selections()
+                .with_offset(20)
+                .with_limit(10),
            // Test filter
 
            // Test with row filter
@@ -1673,6 +1732,11 @@ mod tests {
            None => expected_data,
        };
 
+        if let Some(offset) = opts.offset {
+            builder = builder.with_offset(offset);
+            expected_data = expected_data.into_iter().skip(offset).collect();
+        }
+
        if let Some(limit) = opts.limit {
            builder = builder.with_limit(limit);
            expected_data = expected_data.into_iter().take(limit).collect();
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index d2af4516dd08..d3abf968b3b2 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -19,7 +19,6 @@ use arrow_array::{Array, BooleanArray};
 use arrow_select::filter::SlicesIterator;
 use std::cmp::Ordering;
 use std::collections::VecDeque;
-use std::mem;
 use std::ops::Range;
 
 /// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
@@ -236,13 +235,13 @@ impl RowSelection {
         let mut total_count = 0;
 
         // Find the index where the selector exceeds the row count
-        let find = self.selectors.iter().enumerate().find(|(_, selector)| {
+        let find = self.selectors.iter().position(|selector| {
             total_count += selector.row_count;
             total_count > row_count
         });
 
         let split_idx = match find {
-            Some((idx, _)) => idx,
+            Some(idx) => idx,
             None => {
                 let selectors = std::mem::take(&mut self.selectors);
                 return Self { selectors };
             }
@@ -372,29 +371,63 @@ impl RowSelection {
         self
     }
 
+    /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows
+    pub(crate) fn offset(mut self, offset: usize) -> Self {
+        if offset == 0 {
+            return self;
+        }
+
+        let mut selected_count = 0;
+        let mut skipped_count = 0;
+
+        // Find the index where the selector exceeds the row count
+        let find = self
+            .selectors
+            .iter()
+            .position(|selector| match selector.skip {
+                true => {
+                    skipped_count += selector.row_count;
+                    false
+                }
+                false => {
+                    selected_count += selector.row_count;
+                    selected_count > offset
+                }
+            });
+
+        let split_idx = match find {
+            Some(idx) => idx,
+            None => {
+                self.selectors.clear();
+                return self;
+            }
+        };
+
+        let mut selectors = Vec::with_capacity(self.selectors.len() - split_idx + 1);
+        selectors.push(RowSelector::skip(skipped_count + offset));
+        selectors.push(RowSelector::select(selected_count - offset));
+        selectors.extend_from_slice(&self.selectors[split_idx + 1..]);
+
+        Self { selectors }
+    }
+
     /// Limit this [`RowSelection`] to only select `limit` rows
     pub(crate) fn limit(mut self, mut limit: usize) -> Self {
-        let mut new_selectors = Vec::with_capacity(self.selectors.len());
-        for mut selection in mem::take(&mut self.selectors) {
-            if limit == 0 {
-                break;
-            }
+        if limit == 0 {
+            self.selectors.clear();
+        }
 
+        for (idx, selection) in self.selectors.iter_mut().enumerate() {
             if !selection.skip {
                 if selection.row_count >= limit {
                     selection.row_count = limit;
-                    new_selectors.push(selection);
+                    self.selectors.truncate(idx + 1);
                     break;
                 } else {
                     limit -= selection.row_count;
-                    new_selectors.push(selection);
                 }
-            } else {
-                new_selectors.push(selection);
             }
         }
-
-        self.selectors = new_selectors;
         self
     }
 
@@ -403,6 +436,11 @@ impl RowSelection {
     pub fn iter(&self) -> impl Iterator<Item = &RowSelector> {
         self.selectors.iter()
     }
+
+    /// Returns the number of selected rows
+    pub fn row_count(&self) -> usize {
+        self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum()
+    }
 }
 
 impl From<Vec<RowSelector>> for RowSelection {
@@ -593,6 +631,64 @@ mod tests {
         assert!(selection.selectors.is_empty());
     }
 
+    #[test]
+    fn test_offset() {
+        let selection = RowSelection::from(vec![
+            RowSelector::select(5),
+            RowSelector::skip(23),
+            RowSelector::select(7),
+            RowSelector::skip(33),
+            RowSelector::select(6),
+        ]);
+
+        let selection = selection.offset(2);
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::skip(2),
+                RowSelector::select(3),
+                RowSelector::skip(23),
+                RowSelector::select(7),
+                RowSelector::skip(33),
+                RowSelector::select(6),
+            ]
+        );
+
+        let selection = selection.offset(5);
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::skip(30),
+                RowSelector::select(5),
+                RowSelector::skip(33),
+                RowSelector::select(6),
+            ]
+        );
+
+        let selection = selection.offset(3);
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::skip(33),
+                RowSelector::select(2),
+                RowSelector::skip(33),
+                RowSelector::select(6),
+            ]
+        );
+
+        let selection = selection.offset(2);
+        assert_eq!(
+            selection.selectors,
+            vec![RowSelector::skip(68), RowSelector::select(6)]
+        );
+
+        let selection = selection.offset(3);
+        assert_eq!(
+            selection.selectors,
+            vec![RowSelector::skip(71), RowSelector::select(3)]
+        );
+    }
+
     #[test]
     fn test_and() {
         let mut a = RowSelection::from(vec![
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 213f61818c15..99fe650695a0 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -98,8 +98,8 @@ use arrow_schema::SchemaRef;
 
 use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
 use crate::arrow::arrow_reader::{
-    evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions,
-    ParquetRecordBatchReader, RowFilter, RowSelection, RowSelector,
+    apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions,
+    ParquetRecordBatchReader, RowFilter, RowSelection,
 };
 use crate::arrow::schema::ParquetField;
 use crate::arrow::ProjectionMask;
@@ -347,12 +347,13 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
             filter: self.filter,
             metadata: self.metadata.clone(),
             fields: self.fields,
+            limit: self.limit,
+            offset: self.offset,
         };
 
         Ok(ParquetRecordBatchStream {
             metadata: self.metadata,
             batch_size,
-            limit: self.limit,
             row_groups,
             projection: self.projection,
             selection: self.selection,
@@ -375,6 +376,10 @@ struct ReaderFactory<T> {
     input: T,
 
     filter: Option<RowFilter>,
+
+    limit: Option<usize>,
+
+    offset: Option<usize>,
 }
 
 impl<T> ReaderFactory<T>
 where
@@ -390,7 +395,6 @@ where
         mut selection: Option<RowSelection>,
         projection: ProjectionMask,
         batch_size: usize,
-        limit: Option<usize>,
     ) -> ReadResult<T> {
         // TODO: calling build_array multiple times is wasteful
@@ -428,19 +432,37 @@ where
             }
         }
 
-        if !selects_any(selection.as_ref()) {
+        // Compute the number of rows in the selection before applying limit and offset
+        let rows_before = selection
+            .as_ref()
+            .map(|s| s.row_count())
+            .unwrap_or(row_group.row_count);
+
+        if rows_before == 0 {
+            return Ok((self, None));
+        }
+
+        selection = apply_range(selection, row_group.row_count, self.offset, self.limit);
+
+        // Compute the number of rows in the selection after applying limit and offset
+        let rows_after = selection
+            .as_ref()
+            .map(|s| s.row_count())
+            .unwrap_or(row_group.row_count);
+
+        // Update offset if necessary
+        if let Some(offset) = &mut self.offset {
+            // Reduction is either because of offset or limit; as limit is applied
+            // after offset has been "exhausted", we can just use saturating sub here
+            *offset = offset.saturating_sub(rows_before - rows_after)
+        }
+
+        if rows_after == 0 {
             return Ok((self, None));
         }
 
-        // If a limit is defined, apply it to the final `RowSelection`
-        if let Some(limit) = limit {
-            selection = Some(
-                selection
-                    .map(|selection| selection.limit(limit))
-                    .unwrap_or_else(|| {
-                        RowSelection::from(vec![RowSelector::select(limit)])
-                    }),
-            );
+        if let Some(limit) = &mut self.limit {
+            *limit -= rows_after;
         }
 
         row_group
@@ -492,8 +514,6 @@ pub struct ParquetRecordBatchStream<T> {
 
     batch_size: usize,
 
-    limit: Option<usize>,
-
     selection: Option<RowSelection>,
 
     /// This is an option so it can be moved into a future
@@ -535,9 +555,6 @@ where
             match &mut self.state {
                 StreamState::Decoding(batch_reader) => match batch_reader.next() {
                     Some(Ok(batch)) => {
-                        if let Some(limit) = self.limit.as_mut() {
-                            *limit -= batch.num_rows();
-                        }
                         return Poll::Ready(Some(Ok(batch)));
                     }
                     Some(Err(e)) => {
@@ -568,7 +585,6 @@ where
                             selection,
                             self.projection.clone(),
                             self.batch_size,
-                            self.limit,
                         )
                         .boxed();
@@ -824,11 +840,14 @@ mod tests {
     use crate::file::page_index::index_reader;
     use crate::file::properties::WriterProperties;
     use arrow::error::Result as ArrowResult;
+    use arrow_array::cast::as_primitive_array;
+    use arrow_array::types::Int32Type;
     use arrow_array::{Array, ArrayRef, Int32Array, StringArray};
     use futures::TryStreamExt;
     use rand::{thread_rng, Rng};
     use std::sync::Mutex;
 
+    #[derive(Clone)]
     struct TestReader {
         data: Bytes,
         metadata: Arc<ParquetMetaData>,
@@ -1320,7 +1339,7 @@ mod tests {
            requests: Default::default(),
        };
 
-        let stream = ParquetRecordBatchStreamBuilder::new(test)
+        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
            .await
            .unwrap()
            .with_batch_size(1024)
@@ -1336,11 +1355,60 @@ mod tests {
        // First batch should contain all rows
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.num_columns(), 3);
+        let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+        assert_eq!(col2.values(), &[0, 1, 2]);
 
        let batch = &batches[1];
        // Second batch should trigger the limit and only have one row
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);
+        let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+        assert_eq!(col2.values(), &[3]);
+
+        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
+            .await
+            .unwrap()
+            .with_offset(2)
+            .with_limit(3)
+            .build()
+            .unwrap();
+
+        let batches: Vec<_> = stream.try_collect().await.unwrap();
+        // Expect one batch for each row group
+        assert_eq!(batches.len(), 2);
+
+        let batch = &batches[0];
+        // First batch should contain one row
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 3);
+        let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+        assert_eq!(col2.values(), &[2]);
+
+        let batch = &batches[1];
+        // Second batch should contain two rows
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 3);
+        let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+        assert_eq!(col2.values(), &[3, 4]);
+
+        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
+            .await
+            .unwrap()
+            .with_offset(4)
+            .with_limit(20)
+            .build()
+            .unwrap();
+
+        let batches: Vec<_> = stream.try_collect().await.unwrap();
+        // Should skip first row group
+        assert_eq!(batches.len(), 1);
+
+        let batch = &batches[0];
+        // First batch should contain two rows
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 3);
+        let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+        assert_eq!(col2.values(), &[4, 5]);
    }
 
    #[tokio::test]
@@ -1440,6 +1508,8 @@ mod tests {
            fields,
            input: async_reader,
            filter: None,
+            limit: None,
+            offset: None,
        };
 
        let mut skip = true;
@@ -1469,7 +1539,7 @@ mod tests {
        let selection = RowSelection::from(selectors);
 
        let (_factory, _reader) = reader_factory
-            .read_row_group(0, Some(selection), projection.clone(), 48, None)
+            .read_row_group(0, Some(selection), projection.clone(), 48)
            .await
            .expect("reading row group");
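
Note (not part of the patch above): a minimal usage sketch of the new offset pushdown from the
synchronous reader API, assuming a local file at "data.parquet" (hypothetical path) and a build
of the parquet crate that includes this change. The offset is applied after any row selection or
row filter, and the limit then caps the rows that remain.

    use std::fs::File;

    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::errors::Result;

    fn read_window() -> Result<()> {
        // Hypothetical input file for illustration only
        let file = File::open("data.parquet")?;

        // Skip the first 100 selected rows, then read at most 10 rows
        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
            .with_offset(100)
            .with_limit(10)
            .build()?;

        for batch in reader {
            let batch = batch.expect("reading batch");
            println!("read {} rows", batch.num_rows());
        }
        Ok(())
    }

Because the offset and limit are pushed into the reader (and, for the async reader, into
per-row-group planning via apply_range), row groups and pages that fall entirely before the
offset or after the limit can be skipped rather than decoded and discarded.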