-
Notifications
You must be signed in to change notification settings - Fork 915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix maximum page size estimate in Parquet writer #11962
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -217,6 +217,14 @@ __global__ void __launch_bounds__(128) | |
if (frag_id < num_fragments_per_column and lane_id == 0) groups[column_id][frag_id] = *g; | ||
} | ||
|
||
/**
 * @brief Estimates an upper bound, in bytes, for the RLE/bit-packed encoding of a set of values.
 *
 * The estimate is the sum of a 4-byte run length, a 5-byte maximum RLE/bit-pack header, the
 * bit-packed payload rounded up to whole bytes, and one byte of overhead per 256 values.
 *
 * @param value_bit_width Number of bits per encoded value; 0 means nothing is encoded
 * @param num_values Number of values to encode
 * @return Estimated maximum encoded size in bytes, or 0 when `value_bit_width` is 0
 */
constexpr uint32_t max_RLE_page_size(uint8_t value_bit_width, uint32_t num_values)
{
  if (value_bit_width == 0) return 0;

  // Do the payload math in 64 bits: `num_values * value_bit_width` (and the +7 used to round up
  // to whole bytes) can wrap a 32-bit unsigned integer, which would silently shrink the estimate.
  uint64_t const payload_bits  = static_cast<uint64_t>(num_values) * value_bit_width;
  uint64_t const payload_bytes = (payload_bits + 7) / 8;

  // Run length = 4, max(rle/bitpack header) = 5, add one byte per 256 values for overhead.
  // The total is narrowed back to the 32-bit return type expected by callers.
  return static_cast<uint32_t>(4 + 5 + payload_bytes + (num_values / 256));
}
|
||
// blockDim {128,1,1} | ||
__global__ void __launch_bounds__(128) | ||
gpuInitPages(device_2dspan<EncColumnChunk> chunks, | ||
|
@@ -329,7 +337,7 @@ __global__ void __launch_bounds__(128) | |
__syncwarp(); | ||
uint32_t fragment_data_size = | ||
(ck_g.use_dictionary) | ||
? frag_g.num_leaf_values * 2 // Assume worst-case of 2-bytes per dictionary index | ||
? frag_g.num_leaf_values * util::div_rounding_up_unsafe(ck_g.dict_rle_bits, 8) | ||
: frag_g.fragment_data_size; | ||
// TODO (dm): this convoluted logic to limit page size needs refactoring | ||
size_t this_max_page_size = (values_in_page * 2 >= ck_g.num_values) ? 256 * 1024 | ||
|
@@ -343,8 +351,8 @@ __global__ void __launch_bounds__(128) | |
(values_in_page > 0 && (page_size + fragment_data_size > this_max_page_size)) || | ||
rows_in_page >= max_page_size_rows) { | ||
if (ck_g.use_dictionary) { | ||
page_size = | ||
1 + 5 + ((values_in_page * ck_g.dict_rle_bits + 7) >> 3) + (values_in_page >> 8); | ||
// Additional byte to store entry bit width | ||
page_size = 1 + max_RLE_page_size(ck_g.dict_rle_bits, values_in_page); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Based on Apache Parquet format docs:
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is the bug fix and the rest of the diff is refactoring, right? Is a C++ test or Python comparison to another reader/writer needed? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Technically there is already a gtest for this. The error was an OOB write and only manifested with a memcheck without an rmm pool, which is tested nightly. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good call. Approving. |
||
} | ||
if (!t) { | ||
page_g.num_fragments = fragments_in_chunk - page_start; | ||
|
@@ -367,23 +375,13 @@ __global__ void __launch_bounds__(128) | |
if (not comp_page_sizes.empty()) { | ||
page_g.compressed_data = ck_g.compressed_bfr + comp_page_offset; | ||
} | ||
page_g.start_row = cur_row; | ||
page_g.num_rows = rows_in_page; | ||
page_g.num_leaf_values = leaf_values_in_page; | ||
page_g.num_values = values_in_page; | ||
uint32_t def_level_bits = col_g.num_def_level_bits(); | ||
uint32_t rep_level_bits = col_g.num_rep_level_bits(); | ||
// Run length = 4, max(rle/bitpack header) = 5, add one byte per 256 values for overhead | ||
// TODO (dm): Improve readability of these calculations. | ||
uint32_t def_level_size = | ||
(def_level_bits != 0) | ||
? 4 + 5 + ((def_level_bits * page_g.num_values + 7) >> 3) + (page_g.num_values >> 8) | ||
: 0; | ||
uint32_t rep_level_size = | ||
(rep_level_bits != 0) | ||
? 4 + 5 + ((rep_level_bits * page_g.num_values + 7) >> 3) + (page_g.num_values >> 8) | ||
: 0; | ||
page_g.max_data_size = page_size + def_level_size + rep_level_size; | ||
page_g.start_row = cur_row; | ||
page_g.num_rows = rows_in_page; | ||
page_g.num_leaf_values = leaf_values_in_page; | ||
page_g.num_values = values_in_page; | ||
auto const def_level_size = max_RLE_page_size(col_g.num_def_level_bits(), values_in_page); | ||
auto const rep_level_size = max_RLE_page_size(col_g.num_rep_level_bits(), values_in_page); | ||
page_g.max_data_size = page_size + def_level_size + rep_level_size; | ||
|
||
pagestats_g.start_chunk = ck_g.first_fragment + page_start; | ||
pagestats_g.num_chunks = page_g.num_fragments; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice refactoring here.