Skip to content

Commit

Permalink
Add offset pushdown to parquet (#3848)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Mar 14, 2023
1 parent c156715 commit dfb8c76
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 50 deletions.
92 changes: 78 additions & 14 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub struct ArrowReaderBuilder<T> {
pub(crate) selection: Option<RowSelection>,

pub(crate) limit: Option<usize>,

pub(crate) offset: Option<usize>,
}

impl<T> ArrowReaderBuilder<T> {
Expand Down Expand Up @@ -101,6 +103,7 @@ impl<T> ArrowReaderBuilder<T> {
filter: None,
selection: None,
limit: None,
offset: None,
})
}

Expand Down Expand Up @@ -181,6 +184,17 @@ impl<T> ArrowReaderBuilder<T> {
..self
}
}

/// Provide an offset, i.e. a number of rows to skip over
///
/// The offset is applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`],
/// which allows it to skip rows after any pushed down predicates
pub fn with_offset(mut self, offset: usize) -> Self {
    // Only the offset changes; every other builder field is carried over untouched
    self.offset = Some(offset);
    self
}
}

/// Arrow reader api.
Expand Down Expand Up @@ -467,23 +481,10 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
selection = Some(RowSelection::from(vec![]));
}

// If a limit is defined, apply it to the final `RowSelection`
if let Some(limit) = self.limit {
selection = Some(
selection
.map(|selection| selection.limit(limit))
.unwrap_or_else(|| {
RowSelection::from(vec![RowSelector::select(
limit.min(reader.num_rows()),
)])
}),
);
}

Ok(ParquetRecordBatchReader::new(
batch_size,
array_reader,
selection,
apply_range(selection, reader.num_rows(), self.offset, self.limit),
))
}
}
Expand Down Expand Up @@ -620,6 +621,41 @@ pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
selection.map(|x| x.selects_any()).unwrap_or(true)
}

/// Applies an optional offset and limit to an optional [`RowSelection`]
///
/// * `selection` - the selection computed so far, if any
/// * `row_count` - the row count used when no selection exists yet
/// * `offset` - if set, applied first
/// * `limit` - if set, applied to the result of the offset step
///
/// Returns `None` only when `offset` and `limit` are both `None` and no
/// selection was passed in.
pub(crate) fn apply_range(
    mut selection: Option<RowSelection>,
    row_count: usize,
    offset: Option<usize>,
    limit: Option<usize>,
) -> Option<RowSelection> {
    // The offset must be applied before the limit
    if let Some(offset) = offset {
        selection = Some(if offset > row_count {
            // The offset is past `row_count`: select nothing
            RowSelection::from(vec![])
        } else {
            match selection {
                Some(existing) => existing.offset(offset),
                // No prior selection: skip `offset` rows, select the remainder
                None => RowSelection::from(vec![
                    RowSelector::skip(offset),
                    RowSelector::select(row_count - offset),
                ]),
            }
        });
    }

    // The limit is applied to whatever selection the offset step produced
    if let Some(limit) = limit {
        selection = Some(match selection {
            Some(existing) => existing.limit(limit),
            None => RowSelection::from(vec![RowSelector::select(limit.min(row_count))]),
        });
    }

    selection
}

/// Evaluates an [`ArrowPredicate`] returning the [`RowSelection`]
///
/// If this [`ParquetRecordBatchReader`] has a [`RowSelection`], the
Expand Down Expand Up @@ -1244,6 +1280,8 @@ mod tests {
row_filter: Option<Vec<bool>>,
/// limit
limit: Option<usize>,
/// offset
offset: Option<usize>,
}

/// Manually implement this to avoid printing entire contents of row_selections and row_filter
Expand All @@ -1263,6 +1301,7 @@ mod tests {
.field("row_selections", &self.row_selections.is_some())
.field("row_filter", &self.row_filter.is_some())
.field("limit", &self.limit)
.field("offset", &self.offset)
.finish()
}
}
Expand All @@ -1283,6 +1322,7 @@ mod tests {
row_selections: None,
row_filter: None,
limit: None,
offset: None,
}
}
}
Expand Down Expand Up @@ -1361,6 +1401,13 @@ mod tests {
}
}

/// Sets the offset for this test case, leaving all other options unchanged
fn with_offset(mut self, offset: usize) -> Self {
    self.offset = Some(offset);
    self
}

fn writer_props(&self) -> WriterProperties {
let builder = WriterProperties::builder()
.set_data_pagesize_limit(self.max_data_page_size)
Expand Down Expand Up @@ -1427,6 +1474,12 @@ mod tests {
TestOptions::new(4, 100, 25).with_limit(10),
// Test with limit larger than number of rows
TestOptions::new(4, 100, 25).with_limit(101),
// Test with limit + offset smaller than number of rows
TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
// Test with limit + offset equal to number of rows
TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
// Test with limit + offset larger than number of rows
TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
// Test with no page-level statistics
TestOptions::new(2, 256, 91)
.with_null_percent(25)
Expand Down Expand Up @@ -1474,6 +1527,12 @@ mod tests {
.with_null_percent(25)
.with_row_selections()
.with_limit(10),
// Test optional with nulls, row selections, offset and limit
TestOptions::new(2, 256, 93)
.with_null_percent(25)
.with_row_selections()
.with_offset(20)
.with_limit(10),
// Test filter

// Test with row filter
Expand Down Expand Up @@ -1673,6 +1732,11 @@ mod tests {
None => expected_data,
};

if let Some(offset) = opts.offset {
builder = builder.with_offset(offset);
expected_data = expected_data.into_iter().skip(offset).collect();
}

if let Some(limit) = opts.limit {
builder = builder.with_limit(limit);
expected_data = expected_data.into_iter().take(limit).collect();
Expand Down
124 changes: 110 additions & 14 deletions parquet/src/arrow/arrow_reader/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use arrow_array::{Array, BooleanArray};
use arrow_select::filter::SlicesIterator;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::mem;
use std::ops::Range;

/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
Expand Down Expand Up @@ -236,13 +235,13 @@ impl RowSelection {
let mut total_count = 0;

// Find the index where the selector exceeds the row count
let find = self.selectors.iter().enumerate().find(|(_, selector)| {
let find = self.selectors.iter().position(|selector| {
total_count += selector.row_count;
total_count > row_count
});

let split_idx = match find {
Some((idx, _)) => idx,
Some(idx) => idx,
None => {
let selectors = std::mem::take(&mut self.selectors);
return Self { selectors };
Expand Down Expand Up @@ -372,29 +371,63 @@ impl RowSelection {
self
}

/// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows
///
/// Rows already skipped by the selection do not count towards `offset`. If the
/// selection contains `offset` or fewer selected rows, the result is empty.
pub(crate) fn offset(mut self, offset: usize) -> Self {
    if offset == 0 {
        return self;
    }

    let mut selected_rows = 0;
    let mut skipped_rows = 0;
    let mut split_at = None;

    // Walk the selectors until the running total of selected rows exceeds
    // `offset`, tracking how many rows were skipped along the way
    for (idx, selector) in self.selectors.iter().enumerate() {
        if selector.skip {
            skipped_rows += selector.row_count;
        } else {
            selected_rows += selector.row_count;
            if selected_rows > offset {
                split_at = Some(idx);
                break;
            }
        }
    }

    let split_at = match split_at {
        Some(idx) => idx,
        None => {
            // Every selected row is consumed by the offset
            self.selectors.clear();
            return self;
        }
    };

    // Everything up to and including the split selector collapses into one
    // skip followed by the still-selected remainder of the split selector
    let tail = &self.selectors[split_at + 1..];
    let mut selectors = Vec::with_capacity(tail.len() + 2);
    selectors.push(RowSelector::skip(skipped_rows + offset));
    selectors.push(RowSelector::select(selected_rows - offset));
    selectors.extend_from_slice(tail);

    Self { selectors }
}

/// Limit this [`RowSelection`] to only select `limit` rows
pub(crate) fn limit(mut self, mut limit: usize) -> Self {
let mut new_selectors = Vec::with_capacity(self.selectors.len());
for mut selection in mem::take(&mut self.selectors) {
if limit == 0 {
break;
}
if limit == 0 {
self.selectors.clear();
}

for (idx, selection) in self.selectors.iter_mut().enumerate() {
if !selection.skip {
if selection.row_count >= limit {
selection.row_count = limit;
new_selectors.push(selection);
self.selectors.truncate(idx + 1);
break;
} else {
limit -= selection.row_count;
new_selectors.push(selection);
}
} else {
new_selectors.push(selection);
}
}

self.selectors = new_selectors;
self
}

Expand All @@ -403,6 +436,11 @@ impl RowSelection {
pub fn iter(&self) -> impl Iterator<Item = &RowSelector> {
self.selectors.iter()
}

/// Returns the number of selected rows, i.e. the sum of the row counts
/// of all selectors that are not skips
pub fn row_count(&self) -> usize {
    self.iter()
        .map(|selector| if selector.skip { 0 } else { selector.row_count })
        .sum()
}
}

impl From<Vec<RowSelector>> for RowSelection {
Expand Down Expand Up @@ -593,6 +631,64 @@ mod tests {
assert!(selection.selectors.is_empty());
}

// Exercises `RowSelection::offset` by applying successive offsets; each call
// operates on the result of the previous one, so the order of steps matters.
#[test]
fn test_offset() {
let selection = RowSelection::from(vec![
RowSelector::select(5),
RowSelector::skip(23),
RowSelector::select(7),
RowSelector::skip(33),
RowSelector::select(6),
]);

// Offset falls inside the leading select(5): it becomes skip(2) + select(3)
let selection = selection.offset(2);
assert_eq!(
selection.selectors,
vec![
RowSelector::skip(2),
RowSelector::select(3),
RowSelector::skip(23),
RowSelector::select(7),
RowSelector::skip(33),
RowSelector::select(6),
]
);

// Offset consumes select(3) and 2 rows of select(7); everything before the
// split merges into a single skip of 2 + 3 + 23 + 2 = 30
let selection = selection.offset(5);
assert_eq!(
selection.selectors,
vec![
RowSelector::skip(30),
RowSelector::select(5),
RowSelector::skip(33),
RowSelector::select(6),
]
);

// Offset falls inside select(5): the skip grows to 33 and 2 rows remain
let selection = selection.offset(3);
assert_eq!(
selection.selectors,
vec![
RowSelector::skip(33),
RowSelector::select(2),
RowSelector::skip(33),
RowSelector::select(6),
]
);

// Offset exactly consumes select(2), so the following skip(33) is folded
// in as well: 33 + 2 + 33 = 68
let selection = selection.offset(2);
assert_eq!(
selection.selectors,
vec![RowSelector::skip(68), RowSelector::select(6),]
);

// Offset falls inside the final select(6): skip(68 + 3) + select(3)
let selection = selection.offset(3);
assert_eq!(
selection.selectors,
vec![RowSelector::skip(71), RowSelector::select(3),]
);
}

#[test]
fn test_and() {
let mut a = RowSelection::from(vec![
Expand Down
Loading

0 comments on commit dfb8c76

Please sign in to comment.