Simplify parquet arrow RecordReader
#1021
Codecov Report
@@            Coverage Diff             @@
##           master    #1021      +/-   ##
==========================================
- Coverage   82.31%   82.30%   -0.01%
==========================================
  Files         168      168
  Lines       49031    49026       -5
==========================================
- Hits        40359    40350       -9
- Misses       8672     8676       +4
Filed #1022 to track CI failure in "nightly" builds
I fixed the nightly failures in #1023 -- will merge to this PR to get that to pass too
I think we should run the parquet performance benchmark for this change -- I will do so
I read the code carefully and it looks good to me. I am running the benchmarks on a GCP machine and will report the numbers shortly.
@@ -75,9 +73,7 @@ impl<T: DataType> RecordReader<T> {
    column_desc: column_schema,
    num_records: 0,
    num_values: 0,
    values_seen: 0,
These fields look like they have been here since the initial implementation by @liurenjie1024 in apache/arrow#4292
My performance tests showed no significant performance difference.
Test command: cargo bench -p parquet --bench arrow_array_reader --features=test_common -- --save-baseline <name>
Result:
It looks like this PR needs some clippy appeasement: https://github.com/apache/arrow-rs/runs/4485244206?check_suite_focus=true
But otherwise looks good from my perspective
This looks like a nice simplification @tustvold 👍 I didn't see any discernible performance difference.
let (record_count, value_count) =
    self.count_records(num_records - records_read);

self.num_records += record_count;
nit: maybe we can update this only once before returning from the method?
I think this would leave RecordReader in a strange state if read_one_batch returned an error, as self.num_values would have been updated and not self.num? I can't pull self.num_values out to match as it is used by count_records.
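To illustrate the concern, here is a minimal sketch, not the actual RecordReader code. The Counters struct and this read_records function are hypothetical stand-ins, and each slice element plays the role of one read_one_batch call followed by count_records. Updating both counters inside the loop means an early return through `?` leaves them describing the same set of completed batches.

```rust
/// Hypothetical reader state holding only the two counters under discussion.
#[derive(Debug, Default)]
struct Counters {
    num_records: usize,
    num_values: usize,
}

/// Hypothetical stand-in for the read loop. Each element of `batches` is
/// either the (records, values) counted for one batch or a read error.
fn read_records(
    state: &mut Counters,
    batches: &[Result<(usize, usize), String>],
) -> Result<usize, String> {
    let mut records_read = 0;
    for batch in batches {
        // `?` returns early on error. Nothing has been updated for the
        // failing batch, so both counters still agree with each other.
        let (record_count, value_count) = batch.clone()?;
        state.num_records += record_count;
        state.num_values += value_count;
        records_read += record_count;
    }
    Ok(records_read)
}
```

If num_records were only bumped once before returning, the error path would skip that update while num_values had already advanced.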
parquet/src/arrow/record_reader.rs
Outdated
let mut end_of_last_record = self.num_values;

for current in self.num_values..self.values_written {
    if buf[current] == 0 && current != end_of_last_record {
Hmm, what if you haven't finished the current repeated list, and it continues to the next batch? It seems we'll return here and count as if the repeated list has been read completely (since we'll increment the records_read here)?
what if you haven't finished the current repeated list
I'm not sure I follow, buf[current] == 0 implies we've reached the end of the list. Perhaps it would be more clear if the second condition were current != self.num_values, as it's only false on the first iteration? 🤔
Updated
Ah sorry, my bad. Yea this looks OK. I think the downside is we could potentially read a batch of repLevels multiple times if, say, the repLevels are all non-zero values.
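For readers following the thread, a rough standalone sketch of the counting loop under discussion, assuming a repetition level of 0 marks the start of a new record. The free function and its parameters are hypothetical (the PR works on fields of RecordReader), and the early return for a zero request is an addition for the sketch.

```rust
/// Hypothetical standalone version of the record-counting loop.
/// `rep_levels` holds the buffered repetition levels and `start` is the
/// index of the first value not yet assigned to a record.
/// Returns (complete records found, number of values they span).
fn count_records(
    rep_levels: &[i16],
    start: usize,
    records_to_read: usize,
) -> (usize, usize) {
    if records_to_read == 0 {
        return (0, 0);
    }

    let mut records_read = 0;
    let mut end_of_last_record = start;

    for current in start..rep_levels.len() {
        // A level of 0 opens a new record, so the previous record ended
        // just before `current`. The value at `start` only opens the first
        // record and therefore closes nothing.
        if rep_levels[current] == 0 && current != start {
            records_read += 1;
            end_of_last_record = current;
            if records_read == records_to_read {
                break;
            }
        }
    }

    (records_read, end_of_last_record - start)
}
```

A trailing run with no following 0 is not counted, since the record may continue in the next batch. With levels that are all non-zero after the first value, the function reports no complete records, which is the rescanning downside mentioned above.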
It's also strange that we initialize the repLevels to be the min batch size but keep growing it as we read more batches, until it hits the total number of levels for the entire column chunk.
Users of RecordReader call read_records and then call consume_rep_levels and friends to split data out. The result being it should only buffer a little bit more than the batch_size passed to read_records.
I agree this API is not particularly intuitive, I created #1032 in part because I felt these APIs were clearly not designed for external consumption. I believe the funkiness arises because ArrayReader wants to be able to stitch together multiple column chunks from different row groups (i.e. PageReader) into the same RecordBatch.
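A toy sketch of the read-then-consume pattern described above, assuming a plain Vec as the buffer. LevelBuffer and its methods are hypothetical and are not the crate's API. The point is only that consuming splits off what the caller asked for and keeps the small remainder buffered for the next read.

```rust
/// Hypothetical buffer of repetition levels (not the crate's actual type).
struct LevelBuffer {
    levels: Vec<i16>,
}

impl LevelBuffer {
    /// Append newly decoded levels (stand-in for what a read would buffer).
    fn extend(&mut self, decoded: &[i16]) {
        self.levels.extend_from_slice(decoded);
    }

    /// Hand back the first `n` levels and keep the rest buffered
    /// (stand-in for the role of `consume_rep_levels`).
    fn consume(&mut self, n: usize) -> Vec<i16> {
        // Clamp so that asking for more than is buffered does not panic.
        let n = n.min(self.levels.len());
        let remainder = self.levels.split_off(n);
        std::mem::replace(&mut self.levels, remainder)
    }
}
```

After each consume the buffer holds only the levels past the last complete record, so it should stay close to the requested batch size rather than growing with the column chunk.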
Thanks for the context. Yea I think consume_rep_levels and friends are for assembling complex records like array, list and map. It'd be nice if we can simplify the APIs.
Further context for this PR can be found in #1041 as it was what motivated me to juggle the logic a bit, so that I could traitify it
}

if (records_read >= num_records) || end_of_column {
    if end_of_column {
        // Since page reader contains complete records, if we reached end of a
I'm not sure if this is true though. Take parquet-mr as an example: this is true for the latest version, but in versions before 1.11.0 it seems there is no such guarantee: https://github.com/apache/parquet-mr/blob/apache-parquet-1.10.1/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java#L106, and a repeated list could span multiple pages.
See comment below, page reader is a column chunk. So this is effectively saying that a record can't be split across row groups, which I think is guaranteed?
}

if (records_read >= num_records) || end_of_column {
    if end_of_column {
I'm wondering if this should be called end_of_page since read_records consumes at most a page? A new page is set in ArrayReader.next_batch.
Ehehe, PageReader is actually a column chunk... So the end of a PageReader is the end of a row group, not the end of a page. Confusingly PageIterator is an iterator of PageReader which are themselves iterators of Page 😆
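The nesting is easier to see in a small sketch. The types below are hypothetical stand-ins with made-up names, and the crate's real PageReader and PageIterator are defined differently. ChunkPages plays the PageReader role (one column chunk yielding pages) and ColumnChunks plays the PageIterator role (one column yielding a chunk per row group).

```rust
/// Hypothetical page carrying only a value count.
struct Page {
    num_values: usize,
}

/// Plays the role of PageReader: one column chunk, yielding its pages.
struct ChunkPages {
    pages: std::vec::IntoIter<Page>,
}

impl Iterator for ChunkPages {
    type Item = Page;
    fn next(&mut self) -> Option<Page> {
        self.pages.next()
    }
}

/// Plays the role of PageIterator: one column, yielding a chunk per row group.
struct ColumnChunks {
    chunks: std::vec::IntoIter<ChunkPages>,
}

impl Iterator for ColumnChunks {
    type Item = ChunkPages;
    fn next(&mut self) -> Option<ChunkPages> {
        self.chunks.next()
    }
}

/// Sum the values of every page in each column chunk. Exhausting one inner
/// iterator corresponds to reaching the end of a row group for this column.
fn values_per_row_group(column: ColumnChunks) -> Vec<usize> {
    column
        .map(|chunk| chunk.map(|page| page.num_values).sum::<usize>())
        .collect()
}
```

Under this reading, end_of_column fires when an inner iterator runs dry, which happens once per row group rather than once per page.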
Ah got it, thanks 🤦. It all makes sense now!
LGTM
Merged, thanks!
Which issue does this PR close?
Closes #1020. Related to #171 (better performance reading dictionary encoded strings)
Rationale for this change
See ticket
What changes are included in this PR?
This alters RecordReader to remove some shared mutable state, along with the concept of being in the middle of a record.
Are there any user-facing changes?
No