[WIP] Adaptive fragment sizes in Parquet writer #12627
Conversation
@vuule you had mentioned a while back that you thought there was a function somewhere that calculates a column's size. I couldn't find it, so I implemented my own. If you can point me to something better I'll use it.
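For reference, a minimal sketch of the kind of size helper being described, under stated assumptions: the name `column_size_approx`, the recursion over children, and the handling of sliced string columns are my own illustration, not the PR's actual `column_size()`.

```cpp
// Hypothetical helper: approximate a column's in-memory data size in bytes.
// Assumes the cudf APIs used elsewhere in this diff (detail::get_value, etc.).
#include <cudf/column/column_view.hpp>
#include <cudf/detail/get_value.cuh>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/utilities/traits.hpp>
#include <cudf/utilities/type_dispatcher.hpp>
#include <rmm/cuda_stream_view.hpp>

size_t column_size_approx(cudf::column_view const& column, rmm::cuda_stream_view stream)
{
  if (column.is_empty()) { return 0; }

  if (cudf::is_fixed_width(column.type())) {
    // Fixed-width types: element size times row count.
    return cudf::size_of(column.type()) * column.size();
  }

  if (column.type().id() == cudf::type_id::STRING) {
    // Strings: difference between the last and first offsets of this view's
    // rows (also correct for sliced views), plus the offsets themselves.
    auto const scv   = cudf::strings_column_view(column);
    auto const first = cudf::detail::get_value<cudf::size_type>(scv.offsets(), scv.offset(), stream);
    auto const last  = cudf::detail::get_value<cudf::size_type>(
      scv.offsets(), scv.offset() + scv.size(), stream);
    return static_cast<size_t>(last - first) + sizeof(cudf::size_type) * (column.size() + 1);
  }

  // Nested types (lists/structs): sum the children. Offsets children are fixed
  // width and handled by the branch above. Note: children of a sliced nested
  // column are not themselves sliced, so this overestimates for slices.
  size_t total = 0;
  for (auto child = column.child_begin(); child != column.child_end(); ++child) {
    total += column_size_approx(*child, stream);
  }
  return total;
}
```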
A side benefit is better performance in the list-column benchmarks, without the performance regression seen when simply setting the default fragment size to 1000. Before this PR:
With these changes:
Another side benefit is faster reading of files with list columns, when written with this change :)
😃 Right you are. Before:
This PR:
auto const avg_len = column_size(column, stream) / num_rows;

if (avg_len > 0) {
  size_type frag_size = max_page_size_bytes / avg_len;
Is this too large? IIUC, max_page_size_bytes / avg_len is the average number of rows in each page, which means any deviation in size between rows would cause us to overshoot the max page size.
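To put rough numbers on the concern (figures are purely illustrative): with max_page_size_bytes = 512 KiB and avg_len = 512 bytes, frag_size comes out to 1024 rows. If the rows that happen to land in one fragment average 2 KiB instead, that single fragment holds about 2 MiB of data, roughly four times the page size limit.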
Yes, avg will tend to overshoot. But with the deeply nested cases, we overshoot anyway, now just by much less. 😅 I could perhaps try max row length, but that will be trickier to calculate than total size for the nested case.
I tried to suggest a better option, but anything small enough to get really precise page sizes in the largest column would significantly degrade performance for other columns.
The current implementation is actually a good compromise.
if (avg_len > 0) {
  size_type frag_size = max_page_size_bytes / avg_len;
  max_page_fragment_size_ = std::min(frag_size, max_page_fragment_size_);
I see, so we use the fragment size based on the largest column.
Do you expect (perf) issues when we have a single large column and many small columns? The benchmarks show the best case scenario, where each table has columns of similar size.
When we talked about dynamic fragment sizes I envisioned a per-column fragment size, which seems better than the static size in all cases. I'm trying to figure out whether we can claim that this PR is also always better than the (current) static option.
Yes, this will probably be bad in the single large column/many fixed length columns case. I'm interested to see what this does with @jbrennan333's user data, which seems more mixed than the test data he generated.
I really see this as a POC to demonstrate the value of changing up the fragment size. I agree that a per-column fragment size would be best, but that's a heavier lift too. Maybe with these numbers it can be justified.
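To make the per-column idea concrete, a rough hypothetical sketch follows; the function name, signature, and the reuse of a `column_size()`-style helper are assumptions for illustration, not something this PR implements:

```cpp
// Hypothetical per-column fragment sizes; names and structure are illustrative.
#include <algorithm>
#include <vector>

#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <rmm/cuda_stream_view.hpp>

// Assumes a helper that approximates a column's data size in bytes.
size_t column_size_approx(cudf::column_view const& column, rmm::cuda_stream_view stream);

std::vector<cudf::size_type> per_column_fragment_sizes(cudf::table_view const& table,
                                                       size_t max_page_size_bytes,
                                                       cudf::size_type default_fragment_size,
                                                       rmm::cuda_stream_view stream)
{
  std::vector<cudf::size_type> frag_sizes;
  frag_sizes.reserve(table.num_columns());
  for (auto const& col : table) {
    auto const num_rows = col.size();
    auto const avg_len  = num_rows > 0 ? column_size_approx(col, stream) / num_rows : size_t{0};
    // Narrow columns keep the default fragment size; wide columns get a
    // smaller one so a single fragment still fits in the page size limit.
    auto frag_size = default_fragment_size;
    if (avg_len > 0) {
      auto const rows_per_page = std::max<size_t>(size_t{1}, max_page_size_bytes / avg_len);
      frag_size = std::min<cudf::size_type>(default_fragment_size,
                                            static_cast<cudf::size_type>(rows_per_page));
    }
    frag_sizes.push_back(frag_size);
  }
  return frag_sizes;
}
```

Each column would then build its pages from fragments sized for its own rows, so one very wide column would no longer force a small fragment size onto every narrow column in the table.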
Have you run the "parquet_write_io_compression" group of benchmarks? That one has an even mix of all supported types.
I have, but my workstation is shut down now. I can post tomorrow, but IIRC there was not a big difference, with this code being maybe a percent or two faster in most cases. I actually want to run that benchmark with ZSTD too, since that has issues with the run length=32 cases.
I will try this with the customer data. Have you already verified it with the test data I provided in #12613?
Here is the parquet-tools inspect output for the gpu file:
############ file meta data ############
created_by:
num_columns: 7
num_rows: 17105
num_row_groups: 1
format_version: 1.0
serialized_size: 5922
############ Columns ############
format
hash
data
id
part
offset
relayTs
############ Column(format) ############
name: format
path: format
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: None
converted_type (legacy): NONE
compression: ZSTD (space_saved: 100%)
############ Column(hash) ############
name: hash
path: hash
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: None
converted_type (legacy): NONE
compression: ZSTD (space_saved: 47%)
############ Column(data) ############
name: data
path: data
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: None
converted_type (legacy): NONE
compression: ZSTD (space_saved: 85%)
############ Column(id) ############
name: id
path: origin.id
max_definition_level: 2
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)
############ Column(part) ############
name: part
path: origin.part
max_definition_level: 2
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)
############ Column(offset) ############
name: offset
path: origin.offset
max_definition_level: 2
max_repetition_level: 0
physical_type: INT64
logical_type: None
converted_type (legacy): NONE
compression: ZSTD (space_saved: 89%)
############ Column(relayTs) ############
name: relayTs
path: relayTs
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: None
converted_type (legacy): NONE
compression: ZSTD (space_saved: 54%)
Hmm, that's somewhat disappointing. Can you also run parquet-tools dump -d -n on the gpu and cpu files?
I don't have those options in my version of parquet-tools. Here is the washed inspect --detail output for each.
cust-inspect-detail-cpu.txt
cust-inspect-detail-gpu.txt
Sorry, I didn't realize parquet-tools was an overloaded name. I was referring to the (now deprecated, it seems) jar that comes with parquet-mr. Thanks for the extra details... combing through them now.
@jbrennan333 So compression is worse for the 'data' column ("only" 85% vs 92%). That may be down to there being fewer pages, given the 1 MB default page size for parquet-mr vs 512 KB for libcudf. But it seems the fragment sizes are allowing ZSTD to compress things, so that's good news.
@vuule here are the parquet_write_io_compression benchmarks. Before:
This PR:
cpp/src/io/parquet/writer_impl.cu
if (column.type().id() == type_id::STRING) {
  auto scol = strings_column_view(column);
  size_type colsize = cudf::detail::get_value<size_type>(scol.offsets(), column.size(), stream);
What if there are sliced rows at the start of a column? I think we would need to subtract the first offset from this.
I'm frankly amazed this code worked at all 😉 Yes, I'll subtract offsets[0].
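A sketch of what the adjusted calculation might look like; it subtracts the first offset as agreed above, and additionally (my assumption, not stated in the thread) indexes the offsets relative to column.offset() so a sliced view measures only its own rows:

```cpp
// Illustrative only; mirrors the fix discussed above, not the final code.
if (column.type().id() == cudf::type_id::STRING) {
  auto const scol = cudf::strings_column_view(column);
  // offsets() is the un-sliced offsets child, so index it with the view's
  // own offset to get the byte range covered by just these rows.
  auto const first_offset = cudf::detail::get_value<cudf::size_type>(
    scol.offsets(), column.offset(), stream);
  auto const last_offset = cudf::detail::get_value<cudf::size_type>(
    scol.offsets(), column.offset() + column.size(), stream);
  size_type colsize = last_offset - first_offset;
  // ... colsize then feeds into the average row length as before ...
}
```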
Ack. The size calculation is all wrong... it should use the leaf columns rather than the whole nested mess. I'm closing this to look at ways to do the per-column fragment sizes we were talking about.
Description
Writing Parquet files with very wide rows can produce pages that are much too large, due to the default fragment size of 5000 rows. This in turn can have an adverse effect on file size when using Zstandard compression. This PR addresses the problem by reducing the fragment size to a value at which each fragment still fits within the desired page size.
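For context, a minimal sketch of the scenario this targets, written against the public libcudf API; the file name is made up, and the availability of the compression and max_page_size_bytes builder options in the reader's libcudf version is an assumption:

```cpp
// Rough sketch: write a table with very wide rows using ZSTD compression.
#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>

void write_wide_rows(cudf::table_view const& wide_table)
{
  auto const sink = cudf::io::sink_info{"wide_rows.parquet"};  // hypothetical output path
  auto const opts = cudf::io::parquet_writer_options::builder(sink, wide_table)
                      .compression(cudf::io::compression_type::ZSTD)
                      .max_page_size_bytes(512 * 1024)  // libcudf's default page size limit
                      .build();
  // With the fixed default of 5000 rows per fragment, very wide rows make each
  // fragment (and thus each page) far exceed this limit; the adaptive fragment
  // size shrinks the fragment so pages stay near the requested size.
  cudf::io::write_parquet(opts);
}
```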
Fixes #12613
Checklist