Add integration test for scan rows with selection #2158
Codecov Report
@@ Coverage Diff @@
## master #2158 +/- ##
==========================================
+ Coverage 82.85% 83.06% +0.21%
==========================================
Files 237 237
Lines 61381 61620 +239
==========================================
+ Hits 50856 51186 +330
+ Misses 10525 10434 -91
let to_read = match front.row_count.checked_sub(self.batch_size) {
    Some(remaining) => {
        selection.push_front(RowSelection::skip(remaining));
        // if page row count less than batch_size we must set batch size to page row count.
        // add check avoid dead loop
Fix wrong logic: the `remaining` records still need to be read.
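For illustration, here is a minimal sketch of the intended split: when the selector at the front of the queue covers more rows than one batch, read a full batch and push the rest back as rows still to be read. The `RowSelection` struct below is a simplified, hypothetical stand-in, `rows_to_read` is a name made up for this example, and the catch-all arm returning `front.row_count` is an assumption about how the short and exact-fit cases should behave, not a copy of the PR's code.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the `RowSelection` type discussed in this PR.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct RowSelection {
    row_count: usize,
    skip: bool,
}

impl RowSelection {
    fn select(row_count: usize) -> Self {
        Self { row_count, skip: false }
    }
}

/// Decide how many rows to read from `front` for a batch of `batch_size`
/// rows. Whatever does not fit is pushed back as rows still to be read.
fn rows_to_read(
    front: RowSelection,
    batch_size: usize,
    selection: &mut VecDeque<RowSelection>,
) -> usize {
    match front.row_count.checked_sub(batch_size) {
        // More rows selected than fit in one batch: read a full batch and
        // keep the rest selected. The `!= 0` guard avoids re-queuing an
        // empty selector, which would never make progress.
        Some(remaining) if remaining != 0 => {
            selection.push_front(RowSelection::select(remaining));
            batch_size
        }
        // Exactly one batch, or fewer rows than a batch: read them all.
        _ => front.row_count,
    }
}
```

With a batch size of 4, a selector of 10 rows yields 4 rows to read and leaves a `select(6)` at the front of the queue; a selector of 3 rows is read in full and nothing is pushed back.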
None => {
    // If we skip records before all read operation
    // we need set `column_reader` by `set_page_reader`
    if let Some(page_reader) = pages.next() {
Fix skipping before any read operation: the `column_reader` needs to be set first.
@tustvold @thinkharderdev PTAL😊
Had a brief look, will review in more detail later (flying today)
// we need set `column_reader` by `set_page_reader`
if let Some(page_reader) = pages.next() {
    self.set_page_reader(page_reader?)?;
    false
This is wrong, as it will now only mark end_of_column when it reaches the end of the file, instead of the end of a column chunk within a row group. This will break record delimiting for repeated fields.
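To make the record-delimiting concern concrete, here is a minimal sketch assuming the usual Parquet convention that a repetition level of 0 starts a new record. The function name `complete_records` is hypothetical, and the sketch assumes the buffered levels begin at a record boundary. It shows why the reader needs an accurate end-of-column signal per column chunk: without it, the trailing record cannot be counted as complete.

```rust
/// Count the records that are known to be complete in `rep_levels`.
///
/// A repetition level of 0 starts a new record, so a record is only known
/// to be finished once the next 0 appears, or once the caller states that
/// the column chunk has ended.
fn complete_records(rep_levels: &[i16], end_of_column: bool) -> usize {
    let starts = rep_levels.iter().filter(|&&level| level == 0).count();
    if end_of_column {
        // No more values can follow, so the trailing record is complete.
        starts
    } else {
        // The trailing record may continue in the next page.
        starts.saturating_sub(1)
    }
}
```

For the levels `[0, 1, 1, 0, 1]`, only the first record is known to be complete while more pages may follow; both records are complete once the end of the column chunk is signalled.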
@tustvold I moved it out to:
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
    if self.record_reader.column_reader().is_none() {
        // If we skip records before all read operation
        // we need set `column_reader` by `set_page_reader`
        if let Some(page_reader) = self.pages.next() {
            self.record_reader.set_page_reader(page_reader?)?;
        } else {
            return Ok(0);
        }
    }
    self.record_reader.skip_records(num_records)
}
I think that in this situation the `column_reader` is `None` only when we skip the first page without having read any records. Related: #2171. If we create it in the column chunk, we can then remove this check.
parquet/src/column/reader.rs
Outdated
@@ -312,13 +312,20 @@ where

  // If page has less rows than the remaining records to
  // be skipped, skip entire page
- if metadata.num_rows < remaining {
+ while metadata.num_rows < remaining {
Why is this necessary, there is already an outer while loop?
Because I first added the following below it:
// because self.num_buffered_values == self.num_decoded_values means
// we need reads a new page and set up the decoders for levels
self.read_new_page()?;
If we still use `if`, we may read a needless page header.
Shouldn't this `while` loop result in the same behaviour as the previous `continue`?
Oh... it's a useless loop.
Thank you @Ted-Jiang -- the project to add page index and skipping is really coming along very nicely. It is a very nice piece of work.
@@ -120,6 +120,15 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
    }

    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        if self.record_reader.column_reader().is_none() {
This now behaves differently from `next_batch`, which will potentially read from multiple column chunks for the same "batch". Can we extract this logic into a free function, similar to `read_records`, that performs the same loop?
This would also avoid duplicating this code in every `ArrayReader`.
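A rough sketch of what such a shared free function could look like, using toy types rather than the crate's real ones. `ChunkReader` and `ToyChunk` are hypothetical stand-ins for a per-column-chunk record reader, and error handling is omitted for brevity. The point is the loop: keep skipping, and whenever the current chunk is missing or exhausted, move on to the next one.

```rust
/// Hypothetical, simplified stand-in for a per-column-chunk record reader.
trait ChunkReader {
    /// Skip up to `n` records in the current chunk; returns how many were skipped.
    fn skip_records(&mut self, n: usize) -> usize;
}

/// Toy chunk that just holds a number of remaining records.
struct ToyChunk(usize);

impl ChunkReader for ToyChunk {
    fn skip_records(&mut self, n: usize) -> usize {
        let skipped = n.min(self.0);
        self.0 -= skipped;
        skipped
    }
}

/// Skip `num_records` records, advancing to the next chunk whenever the
/// current one is exhausted. `current` is `None` until the first chunk is set.
fn skip_records<R: ChunkReader>(
    current: &mut Option<R>,
    chunks: &mut impl Iterator<Item = R>,
    num_records: usize,
) -> usize {
    let mut skipped = 0;
    while skipped < num_records {
        let to_skip = num_records - skipped;
        let skipped_once = match current.as_mut() {
            Some(reader) => reader.skip_records(to_skip),
            None => 0,
        };
        skipped += skipped_once;
        if skipped_once < to_skip {
            // Current chunk is missing or exhausted: move on to the next one.
            match chunks.next() {
                Some(next) => *current = Some(next),
                None => break, // no chunks left
            }
        }
    }
    skipped
}
```

Because the "no reader set yet" case falls out of the same loop, each reader would not need its own `is_none()` check, and a single skip can span several column chunks in the way a batch read can.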
parquet/src/arrow/arrow_reader.rs
Outdated
        self.batch_size
    }
    Some(_) => self.batch_size,
- Some(_) => self.batch_size,
+ _ => self.batch_size,
And remove the `None` case below. If `remaining == 0` then `front.row_count == self.batch_size`.
Yes, more elegant 👍
parquet/src/column/reader.rs
Outdated
}
// because self.num_buffered_values == self.num_decoded_values means
// we need reads a new page and set up the decoders for levels
self.read_new_page()?;
Perhaps we could check the return type of this, and short-circuit if it returns false?
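As a sketch of that short-circuit, assuming `read_new_page` reports whether a page was actually read: `ToyColumnReader` below is a hypothetical, simplified reader in which each page is just a record count, not the crate's real column reader.

```rust
/// Hypothetical, simplified column reader: each page holds some records.
struct ToyColumnReader {
    pages: Vec<usize>, // remaining pages, popped from the end
    buffered: usize,   // records left in the current page
}

impl ToyColumnReader {
    /// Load the next page. Returns `Ok(false)` when no pages remain.
    fn read_new_page(&mut self) -> Result<bool, String> {
        match self.pages.pop() {
            Some(rows) => {
                self.buffered = rows;
                Ok(true)
            }
            None => Ok(false),
        }
    }

    /// Skip up to `num_records`, returning how many were actually skipped.
    fn skip_records(&mut self, num_records: usize) -> Result<usize, String> {
        let mut remaining = num_records;
        while remaining != 0 {
            if self.buffered == 0 && !self.read_new_page()? {
                // Column exhausted: stop early instead of looping again.
                return Ok(num_records - remaining);
            }
            let skipped = remaining.min(self.buffered);
            self.buffered -= skipped;
            remaining -= skipped;
        }
        Ok(num_records - remaining)
    }
}
```

Returning as soon as `read_new_page` yields `false` makes the exhausted-column case explicit, and the caller learns how many records were really skipped.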
Benchmark runs are scheduled for baseline = e96ae8a and contender = d10d962. d10d962 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
@tustvold thanks for your patient review 👍
FYI I'm working on a follow up PR to address some stuff, e.g. get this integrated into the fuzz tests
Which issue does this PR close?
Closes #2106.
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?