From 2784f5890d107cbace7c6059ca25504fb8569e43 Mon Sep 17 00:00:00 2001 From: nvdbaranec <56695930+nvdbaranec@users.noreply.github.com> Date: Wed, 25 Jan 2023 14:17:03 -0600 Subject: [PATCH] Parquet reader optimization to address V100 regression. (#12577) Addresses https://github.com/rapidsai/cudf/issues/12316 Some recent changes caused a performance regression in the parquet reader benchmarks for lists. The culprit ended up being slightly different code generation happening for arch 70. In several memory hotspots, the code was reading values from global, modifying them and then storing them. Previously it had done a better job of loading and keeping them in registers and the L2 cache was helping keep things fast. But the extra store was causing twice as many L2 access in these places and causing many long scoreboard stalls. Ultimately the issue is that these values shouldn't be kept in global memory. The initial implementation did it this way because the data was variable in size (based on depth of column nesting). But in practice, we never see more than 2 or 3 levels of nesting. So the solution is: - Keep these values (in a struct called `PageNestingDecodeInfo`) that is kept in shared memory for up to N nesting levels. N is currently 10. - If the nesting information for the incoming column fits in the cache, use it. Otherwise fall back to the arrays in global memory. In practice, it is exceedingly rare to see columns nested >= 10 deep. This addresses the performance regression and actually gives some performance increases. Some comparisons for LIST benchmarks. ``` cudf 22.10 (prior to regression) | data_type | cardinality | run_length | bytes_per_second | |-----------|-------------|------------|------------------| | LIST | 0 | 1 | 892901208 | | LIST | 1000 | 1 | 952863876 | | LIST | 0 | 32 | 1246033395 | | LIST | 1000 | 32 | 1232884866 | ``` ``` cudf 22.12 (where the regression occurred) | data_type | cardinality | run_length | bytes_per_second | |-----------|-------------|------------|------------------| | LIST | 0 | 1 | 747758436 | | LIST | 1000 | 1 | 827763260 | | LIST | 0 | 32 | 1026048576 | | LIST | 1000 | 32 | 1022928119 | ``` ``` This PR | data_type | cardinality | run_length | bytes_per_second | |-----------|-------------|------------|------------------| | LIST | 0 | 1 | 927347737 | | LIST | 1000 | 1 | 1024566150 | | LIST | 0 | 32 | 1315972881 | | LIST | 1000 | 32 | 1303995168 | ``` Authors: - https://github.com/nvdbaranec Approvers: - Vukasin Milovanovic (https://github.com/vuule) - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/12577 --- cpp/src/io/parquet/page_data.cu | 204 ++++++++++++------- cpp/src/io/parquet/parquet_gpu.hpp | 69 ++++--- cpp/src/io/parquet/reader_impl.cpp | 16 +- cpp/src/io/parquet/reader_impl_preprocess.cu | 27 ++- cpp/tests/io/parquet_test.cpp | 44 ++++ 5 files changed, 245 insertions(+), 115 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 70176392ee9..23d130e1585 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -90,6 +90,13 @@ struct page_state_s { const uint8_t* lvl_start[NUM_LEVEL_TYPES]; // [def,rep] int32_t lvl_count[NUM_LEVEL_TYPES]; // how many of each of the streams we've decoded int32_t row_index_lower_bound; // lower bound of row indices we should process + + // a shared-memory cache of frequently used data when decoding. The source of this data is + // normally stored in global memory which can yield poor performance. So, when possible + // we copy that info here prior to decoding + PageNestingDecodeInfo nesting_decode_cache[max_cacheable_nesting_decode_info]; + // points to either nesting_decode_cache above when possible, or to the global source otherwise + PageNestingDecodeInfo* nesting_info; }; /** @@ -927,23 +934,49 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, int chunk_idx; // Fetch page info - if (t == 0) s->page = *p; + if (!t) s->page = *p; __syncthreads(); if (s->page.flags & PAGEINFO_FLAGS_DICTIONARY) { return false; } // Fetch column chunk info chunk_idx = s->page.chunk_idx; - if (t == 0) { s->col = chunks[chunk_idx]; } - - // zero nested value and valid counts - int d = 0; - while (d < s->page.num_output_nesting_levels) { - if (d + t < s->page.num_output_nesting_levels) { - s->page.nesting[d + t].valid_count = 0; - s->page.nesting[d + t].value_count = 0; - s->page.nesting[d + t].null_count = 0; + if (!t) { s->col = chunks[chunk_idx]; } + + // if we can use the decode cache, set it up now + auto const can_use_decode_cache = s->page.nesting_info_size <= max_cacheable_nesting_decode_info; + if (can_use_decode_cache) { + int depth = 0; + while (depth < s->page.nesting_info_size) { + int const thread_depth = depth + t; + if (thread_depth < s->page.nesting_info_size) { + // these values need to be copied over from global + s->nesting_decode_cache[thread_depth].max_def_level = + s->page.nesting_decode[thread_depth].max_def_level; + s->nesting_decode_cache[thread_depth].page_start_value = + s->page.nesting_decode[thread_depth].page_start_value; + s->nesting_decode_cache[thread_depth].start_depth = + s->page.nesting_decode[thread_depth].start_depth; + s->nesting_decode_cache[thread_depth].end_depth = + s->page.nesting_decode[thread_depth].end_depth; + } + depth += blockDim.x; + } + } + if (!t) { + s->nesting_info = can_use_decode_cache ? s->nesting_decode_cache : s->page.nesting_decode; + } + __syncthreads(); + + // zero counts + int depth = 0; + while (depth < s->page.num_output_nesting_levels) { + int const thread_depth = depth + t; + if (thread_depth < s->page.num_output_nesting_levels) { + s->nesting_info[thread_depth].valid_count = 0; + s->nesting_info[thread_depth].value_count = 0; + s->nesting_info[thread_depth].null_count = 0; } - d += blockDim.x; + depth += blockDim.x; } __syncthreads(); @@ -1076,7 +1109,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, if (is_decode_step) { int max_depth = s->col.max_nesting_depth; for (int idx = 0; idx < max_depth; idx++) { - PageNestingInfo* pni = &s->page.nesting[idx]; + PageNestingDecodeInfo* nesting_info = &s->nesting_info[idx]; size_t output_offset; // schemas without lists @@ -1085,21 +1118,21 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, } // for schemas with lists, we've already got the exact value precomputed else { - output_offset = pni->page_start_value; + output_offset = nesting_info->page_start_value; } - pni->data_out = static_cast(s->col.column_data_base[idx]); + nesting_info->data_out = static_cast(s->col.column_data_base[idx]); - if (pni->data_out != nullptr) { + if (nesting_info->data_out != nullptr) { // anything below max depth with a valid data pointer must be a list, so the // element size is the size of the offset type. uint32_t len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len; - pni->data_out += (output_offset * len); + nesting_info->data_out += (output_offset * len); } - pni->valid_map = s->col.valid_map_base[idx]; - if (pni->valid_map != nullptr) { - pni->valid_map += output_offset >> 5; - pni->valid_map_offset = (int32_t)(output_offset & 0x1f); + nesting_info->valid_map = s->col.valid_map_base[idx]; + if (nesting_info->valid_map != nullptr) { + nesting_info->valid_map += output_offset >> 5; + nesting_info->valid_map_offset = (int32_t)(output_offset & 0x1f); } } } @@ -1217,26 +1250,26 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, * @brief Store a validity mask containing value_count bits into the output validity buffer of the * page. * - * @param[in,out] pni The page/nesting information to store the mask in. The validity map offset is - * also updated + * @param[in,out] nesting_info The page/nesting information to store the mask in. The validity map + * offset is also updated * @param[in] valid_mask The validity mask to be stored * @param[in] value_count # of bits in the validity mask */ -static __device__ void store_validity(PageNestingInfo* pni, +static __device__ void store_validity(PageNestingDecodeInfo* nesting_info, uint32_t valid_mask, int32_t value_count) { - int word_offset = pni->valid_map_offset / 32; - int bit_offset = pni->valid_map_offset % 32; + int word_offset = nesting_info->valid_map_offset / 32; + int bit_offset = nesting_info->valid_map_offset % 32; // if we fit entirely in the output word if (bit_offset + value_count <= 32) { auto relevant_mask = static_cast((static_cast(1) << value_count) - 1); if (relevant_mask == ~0) { - pni->valid_map[word_offset] = valid_mask; + nesting_info->valid_map[word_offset] = valid_mask; } else { - atomicAnd(pni->valid_map + word_offset, ~(relevant_mask << bit_offset)); - atomicOr(pni->valid_map + word_offset, (valid_mask & relevant_mask) << bit_offset); + atomicAnd(nesting_info->valid_map + word_offset, ~(relevant_mask << bit_offset)); + atomicOr(nesting_info->valid_map + word_offset, (valid_mask & relevant_mask) << bit_offset); } } // we're going to spill over into the next word. @@ -1250,17 +1283,17 @@ static __device__ void store_validity(PageNestingInfo* pni, // first word. strip bits_left bits off the beginning and store that uint32_t relevant_mask = ((1 << bits_left) - 1); uint32_t mask_word0 = valid_mask & relevant_mask; - atomicAnd(pni->valid_map + word_offset, ~(relevant_mask << bit_offset)); - atomicOr(pni->valid_map + word_offset, mask_word0 << bit_offset); + atomicAnd(nesting_info->valid_map + word_offset, ~(relevant_mask << bit_offset)); + atomicOr(nesting_info->valid_map + word_offset, mask_word0 << bit_offset); // second word. strip the remainder of the bits off the end and store that relevant_mask = ((1 << (value_count - bits_left)) - 1); uint32_t mask_word1 = valid_mask & (relevant_mask << bits_left); - atomicAnd(pni->valid_map + word_offset + 1, ~(relevant_mask)); - atomicOr(pni->valid_map + word_offset + 1, mask_word1 >> bits_left); + atomicAnd(nesting_info->valid_map + word_offset + 1, ~(relevant_mask)); + atomicOr(nesting_info->valid_map + word_offset + 1, mask_word1 >> bits_left); } - pni->valid_map_offset += value_count; + nesting_info->valid_map_offset += value_count; } /** @@ -1294,8 +1327,8 @@ inline __device__ void get_nesting_bounds(int& start_depth, // bound what nesting levels we apply values to if (s->col.max_level[level_type::REPETITION] > 0) { int r = s->rep[index]; - start_depth = s->page.nesting[r].start_depth; - end_depth = s->page.nesting[d].end_depth; + start_depth = s->nesting_info[r].start_depth; + end_depth = s->nesting_info[d].end_depth; } // for columns without repetition (even ones involving structs) we always // traverse the entire hierarchy. @@ -1326,6 +1359,8 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // how many rows we've processed in the page so far int input_row_count = s->input_row_count; + PageNestingDecodeInfo* nesting_info_base = s->nesting_info; + // process until we've reached the target while (input_value_count < target_input_value_count) { // determine the nesting bounds for this thread (the range of nesting depths we @@ -1367,14 +1402,14 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // walk from 0 to max_depth uint32_t next_thread_value_count, next_warp_value_count; for (int s_idx = 0; s_idx < max_depth; s_idx++) { - PageNestingInfo* pni = &s->page.nesting[s_idx]; + PageNestingDecodeInfo* nesting_info = &nesting_info_base[s_idx]; // if we are within the range of nesting levels we should be adding value indices for int const in_nesting_bounds = ((s_idx >= start_depth && s_idx <= end_depth) && in_row_bounds) ? 1 : 0; // everything up to the max_def_level is a non-null value - uint32_t const is_valid = d >= pni->max_def_level && in_nesting_bounds ? 1 : 0; + uint32_t const is_valid = d >= nesting_info->max_def_level && in_nesting_bounds ? 1 : 0; // compute warp and thread valid counts uint32_t const warp_valid_mask = @@ -1395,8 +1430,8 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // if this is the value column emit an index for value decoding if (is_valid && s_idx == max_depth - 1) { - int const src_pos = pni->valid_count + thread_valid_count; - int const dst_pos = pni->value_count + thread_value_count; + int const src_pos = nesting_info->valid_count + thread_valid_count; + int const dst_pos = nesting_info->value_count + thread_value_count; // nz_idx is a mapping of src buffer indices to destination buffer indices s->nz_idx[rolling_index(src_pos)] = dst_pos; } @@ -1414,12 +1449,12 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // if we're -not- at a leaf column and we're within nesting/row bounds // and we have a valid data_out pointer, it implies this is a list column, so // emit an offset. - if (in_nesting_bounds && pni->data_out != nullptr) { - int const idx = pni->value_count + thread_value_count; - cudf::size_type const ofs = s->page.nesting[s_idx + 1].value_count + + if (in_nesting_bounds && nesting_info->data_out != nullptr) { + int const idx = nesting_info->value_count + thread_value_count; + cudf::size_type const ofs = nesting_info_base[s_idx + 1].value_count + next_thread_value_count + - s->page.nesting[s_idx + 1].page_start_value; - (reinterpret_cast(pni->data_out))[idx] = ofs; + nesting_info_base[s_idx + 1].page_start_value; + (reinterpret_cast(nesting_info->data_out))[idx] = ofs; } } @@ -1441,14 +1476,14 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // increment count of valid values, count of total values, and update validity mask if (!t) { - if (pni->valid_map != nullptr && warp_valid_mask_bit_count > 0) { + if (nesting_info->valid_map != nullptr && warp_valid_mask_bit_count > 0) { uint32_t const warp_output_valid_mask = warp_valid_mask >> first_thread_in_write_range; - store_validity(pni, warp_output_valid_mask, warp_valid_mask_bit_count); + store_validity(nesting_info, warp_output_valid_mask, warp_valid_mask_bit_count); - pni->null_count += warp_valid_mask_bit_count - __popc(warp_output_valid_mask); + nesting_info->null_count += warp_valid_mask_bit_count - __popc(warp_output_valid_mask); } - pni->valid_count += warp_valid_count; - pni->value_count += warp_value_count; + nesting_info->valid_count += warp_valid_count; + nesting_info->value_count += warp_value_count; } // propagate value counts for the next level @@ -1463,7 +1498,7 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // update if (!t) { // update valid value count for decoding and total # of values we've processed - s->nz_count = s->page.nesting[max_depth - 1].valid_count; + s->nz_count = nesting_info_base[max_depth - 1].valid_count; s->input_value_count = input_value_count; s->input_row_count = input_row_count; } @@ -1545,7 +1580,7 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, // count rows and leaf values int const is_new_row = start_depth == 0 ? 1 : 0; uint32_t const warp_row_count_mask = ballot(is_new_row); - int const is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0; + int const is_new_leaf = (d >= s->nesting_info[max_depth - 1].max_def_level) ? 1 : 0; uint32_t const warp_leaf_count_mask = ballot(is_new_leaf); // is this thread within row bounds? on the first pass we don't know the bounds, so we will be // computing the full size of the column. on the second pass, we will know our actual row @@ -1673,14 +1708,14 @@ __global__ void __launch_bounds__(block_size) // to do the expensive work of traversing the level data to determine sizes. we can just compute // it directly. if (!has_repetition && !compute_string_sizes) { - int d = 0; - while (d < s->page.num_output_nesting_levels) { - auto const i = d + t; - if (i < s->page.num_output_nesting_levels) { - if (is_base_pass) { pp->nesting[i].size = pp->num_input_values; } - pp->nesting[i].batch_size = pp->num_input_values; + int depth = 0; + while (depth < s->page.num_output_nesting_levels) { + auto const thread_depth = depth + t; + if (thread_depth < s->page.num_output_nesting_levels) { + if (is_base_pass) { pp->nesting[thread_depth].size = pp->num_input_values; } + pp->nesting[thread_depth].batch_size = pp->num_input_values; } - d += blockDim.x; + depth += blockDim.x; } return; } @@ -1688,25 +1723,29 @@ __global__ void __launch_bounds__(block_size) // in the trim pass, for anything with lists, we only need to fully process bounding pages (those // at the beginning or the end of the row bounds) if (!is_base_pass && !is_bounds_page(s, min_row, num_rows)) { - int d = 0; - while (d < s->page.num_output_nesting_levels) { - auto const i = d + t; - if (i < s->page.num_output_nesting_levels) { + int depth = 0; + while (depth < s->page.num_output_nesting_levels) { + auto const thread_depth = depth + t; + if (thread_depth < s->page.num_output_nesting_levels) { // if we are not a bounding page (as checked above) then we are either // returning 0 rows from the page (completely outside the bounds) or all // rows in the page (completely within the bounds) - pp->nesting[i].batch_size = s->num_rows == 0 ? 0 : pp->nesting[i].size; + pp->nesting[thread_depth].batch_size = + s->num_rows == 0 ? 0 : pp->nesting[thread_depth].size; } - d += blockDim.x; + depth += blockDim.x; } return; } // zero sizes - int d = 0; - while (d < s->page.num_output_nesting_levels) { - if (d + t < s->page.num_output_nesting_levels) { s->page.nesting[d + t].batch_size = 0; } - d += blockDim.x; + int depth = 0; + while (depth < s->page.num_output_nesting_levels) { + auto const thread_depth = depth + t; + if (thread_depth < s->page.num_output_nesting_levels) { + s->page.nesting[thread_depth].batch_size = 0; + } + depth += blockDim.x; } __syncthreads(); @@ -1754,13 +1793,13 @@ __global__ void __launch_bounds__(block_size) if (!t) { pp->num_rows = s->page.nesting[0].batch_size; } // store off this batch size as the "full" size - int d = 0; - while (d < s->page.num_output_nesting_levels) { - auto const i = d + t; - if (i < s->page.num_output_nesting_levels) { - pp->nesting[i].size = pp->nesting[i].batch_size; + int depth = 0; + while (depth < s->page.num_output_nesting_levels) { + auto const thread_depth = depth + t; + if (thread_depth < s->page.num_output_nesting_levels) { + pp->nesting[thread_depth].size = pp->nesting[thread_depth].batch_size; } - d += blockDim.x; + depth += blockDim.x; } } @@ -1808,6 +1847,8 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData( ((s->col.data_type & 7) == BOOLEAN || (s->col.data_type & 7) == BYTE_ARRAY) ? 64 : 32; } + PageNestingDecodeInfo* nesting_info_base = s->nesting_info; + // skipped_leaf_values will always be 0 for flat hierarchies. uint32_t skipped_leaf_values = s->page.skipped_leaf_values; while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { @@ -1876,7 +1917,7 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData( uint32_t dtype_len = s->dtype_len; void* dst = - s->page.nesting[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; + nesting_info_base[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; if (dtype == BYTE_ARRAY) { if (s->col.converted_type == DECIMAL) { auto const [ptr, len] = gpuGetStringData(s, val_src_pos); @@ -1931,6 +1972,19 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData( } __syncthreads(); } + + // if we are using the nesting decode cache, copy null count back + if (s->nesting_info == s->nesting_decode_cache) { + int depth = 0; + while (depth < s->page.num_output_nesting_levels) { + int const thread_depth = depth + t; + if (thread_depth < s->page.num_output_nesting_levels) { + s->page.nesting_decode[thread_depth].null_count = + s->nesting_decode_cache[thread_depth].null_count; + } + depth += blockDim.x; + } + } } } // anonymous namespace diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 33a189cdf87..9b156745e41 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -83,6 +83,40 @@ enum level_type { NUM_LEVEL_TYPES }; +/** + * @brief Nesting information specifically needed by the decode and preprocessing + * kernels. + * + * This data is kept separate from PageNestingInfo to keep it as small as possible. + * It is used in a cached form in shared memory when possible. + */ +struct PageNestingDecodeInfo { + // set up prior to decoding + int32_t max_def_level; + // input repetition/definition levels are remapped with these values + // into the corresponding real output nesting depths. + int32_t start_depth; + int32_t end_depth; + + // computed during preprocessing + int32_t page_start_value; + + // computed during decoding + int32_t null_count; + + // used internally during decoding + int32_t valid_map_offset; + int32_t valid_count; + int32_t value_count; + uint8_t* data_out; + bitmask_type* valid_map; +}; + +// Use up to 512 bytes of shared memory as a cache for nesting information. +// As of 1/20/23, this gives us a max nesting depth of 10 (after which it falls back to +// global memory). This handles all but the most extreme cases. +constexpr int max_cacheable_nesting_decode_info = (512) / sizeof(PageNestingDecodeInfo); + /** * @brief Nesting information * @@ -94,30 +128,15 @@ enum level_type { * */ struct PageNestingInfo { - // input repetition/definition levels are remapped with these values - // into the corresponding real output nesting depths. - int32_t start_depth; - int32_t end_depth; - - // set at initialization - int32_t max_def_level; - int32_t max_rep_level; + // set at initialization (see start_offset_output_iterator in reader_impl_preprocess.cu) cudf::type_id type; // type of the corresponding cudf output column bool nullable; - // set during preprocessing + // TODO: these fields might make sense to move into PageNestingDecodeInfo for memory performance + // reasons. int32_t size; // this page/nesting-level's row count contribution to the output column, if fully // decoded - int32_t batch_size; // the size of the page for this batch - int32_t page_start_value; // absolute output start index in output column data - - // set during data decoding - int32_t valid_count; // # of valid values decoded in this page/nesting-level - int32_t value_count; // total # of values decoded in this page/nesting-level - int32_t null_count; // null count - int32_t valid_map_offset; // current offset in bits relative to valid_map - uint8_t* data_out; // pointer into output buffer - uint32_t* valid_map; // pointer into output validity buffer + int32_t batch_size; // the size of the page for this batch }; /** @@ -159,9 +178,9 @@ struct PageInfo { // skipped_leaf_values will always be 0. // // # of values skipped in the repetition/definition level stream - int skipped_values; + int32_t skipped_values; // # of values skipped in the actual data stream. - int skipped_leaf_values; + int32_t skipped_leaf_values; // for string columns only, the size of all the chars in the string for // this page. only valid/computed during the base preprocess pass int32_t str_bytes; @@ -170,9 +189,10 @@ struct PageInfo { // input column nesting information, output column nesting information and // mappings between the two. the length of the array, nesting_info_size is // max(num_output_nesting_levels, max_definition_levels + 1) - int num_output_nesting_levels; - int nesting_info_size; + int32_t num_output_nesting_levels; + int32_t nesting_info_size; PageNestingInfo* nesting; + PageNestingDecodeInfo* nesting_decode; }; /** @@ -242,7 +262,7 @@ struct ColumnChunkDesc { PageInfo* page_info; // output page info for up to num_dict_pages + // num_data_pages (dictionary pages first) string_index_pair* str_dict_index; // index for string dictionary - uint32_t** valid_map_base; // base pointers of valid bit map for this column + bitmask_type** valid_map_base; // base pointers of valid bit map for this column void** column_data_base; // base pointers of column data int8_t codec; // compressed codec enum int8_t converted_type; // converted type enum @@ -263,6 +283,7 @@ struct file_intermediate_data { hostdevice_vector chunks{}; hostdevice_vector pages_info{}; hostdevice_vector page_nesting_info{}; + hostdevice_vector page_nesting_decode_info{}; }; /** diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index d5dac10b8f6..b1c4dd22c0d 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -24,9 +24,10 @@ namespace cudf::io::detail::parquet { void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) { - auto& chunks = _file_itm_data.chunks; - auto& pages = _file_itm_data.pages_info; - auto& page_nesting = _file_itm_data.page_nesting_info; + auto& chunks = _file_itm_data.chunks; + auto& pages = _file_itm_data.pages_info; + auto& page_nesting = _file_itm_data.page_nesting_info; + auto& page_nesting_decode = _file_itm_data.page_nesting_decode_info; // Should not reach here if there is no page data. CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); @@ -39,7 +40,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) // In order to reduce the number of allocations of hostdevice_vector, we allocate a single vector // to store all per-chunk pointers to nested data/nullmask. `chunk_offsets[i]` will store the // offset into `chunk_nested_data`/`chunk_nested_valids` for the array of pointers for chunk `i` - auto chunk_nested_valids = hostdevice_vector(sum_max_depths, _stream); + auto chunk_nested_valids = hostdevice_vector(sum_max_depths, _stream); auto chunk_nested_data = hostdevice_vector(sum_max_depths, _stream); auto chunk_offsets = std::vector(); @@ -124,6 +125,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) pages.device_to_host(_stream); page_nesting.device_to_host(_stream); + page_nesting_decode.device_to_host(_stream); _stream.synchronize(); // for list columns, add the final offset to every offset buffer. @@ -166,8 +168,8 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) gpu::ColumnChunkDesc* col = &chunks[pi->chunk_idx]; input_column_info const& input_col = _input_columns[col->src_col_index]; - int index = pi->nesting - page_nesting.device_ptr(); - gpu::PageNestingInfo* pni = &page_nesting[index]; + int index = pi->nesting_decode - page_nesting_decode.device_ptr(); + gpu::PageNestingDecodeInfo* pndi = &page_nesting_decode[index]; auto* cols = &_output_buffers; for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) { @@ -178,7 +180,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) if (chunk_nested_valids.host_ptr(chunk_offsets[pi->chunk_idx])[l_idx] == nullptr) { continue; } - out_buf.null_count() += pni[l_idx].null_count; + out_buf.null_count() += pndi[l_idx].null_count; } } diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 435fdb1a411..6577a1a3f0f 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -523,9 +523,10 @@ void decode_page_headers(hostdevice_vector& chunks, void reader::impl::allocate_nesting_info() { - auto const& chunks = _file_itm_data.chunks; - auto& pages = _file_itm_data.pages_info; - auto& page_nesting_info = _file_itm_data.page_nesting_info; + auto const& chunks = _file_itm_data.chunks; + auto& pages = _file_itm_data.pages_info; + auto& page_nesting_info = _file_itm_data.page_nesting_info; + auto& page_nesting_decode_info = _file_itm_data.page_nesting_decode_info; // compute total # of page_nesting infos needed and allocate space. doing this in one // buffer to keep it to a single gpu allocation @@ -539,6 +540,8 @@ void reader::impl::allocate_nesting_info() }); page_nesting_info = hostdevice_vector{total_page_nesting_infos, _stream}; + page_nesting_decode_info = + hostdevice_vector{total_page_nesting_infos, _stream}; // retrieve from the gpu so we can update pages.device_to_host(_stream, true); @@ -556,6 +559,9 @@ void reader::impl::allocate_nesting_info() target_page_index += chunks[idx].num_dict_pages; for (int p_idx = 0; p_idx < chunks[idx].num_data_pages; p_idx++) { pages[target_page_index + p_idx].nesting = page_nesting_info.device_ptr() + src_info_index; + pages[target_page_index + p_idx].nesting_decode = + page_nesting_decode_info.device_ptr() + src_info_index; + pages[target_page_index + p_idx].nesting_info_size = per_page_nesting_info_size; pages[target_page_index + p_idx].num_output_nesting_levels = _metadata->get_output_nesting_depth(src_col_schema); @@ -601,6 +607,9 @@ void reader::impl::allocate_nesting_info() gpu::PageNestingInfo* pni = &page_nesting_info[nesting_info_index + (p_idx * per_page_nesting_info_size)]; + gpu::PageNestingDecodeInfo* nesting_info = + &page_nesting_decode_info[nesting_info_index + (p_idx * per_page_nesting_info_size)]; + // if we have lists, set our start and end depth remappings if (schema.max_repetition_level > 0) { auto remap = depth_remapping.find(src_col_schema); @@ -610,17 +619,16 @@ void reader::impl::allocate_nesting_info() std::vector const& def_depth_remap = (remap->second.second); for (size_t m = 0; m < rep_depth_remap.size(); m++) { - pni[m].start_depth = rep_depth_remap[m]; + nesting_info[m].start_depth = rep_depth_remap[m]; } for (size_t m = 0; m < def_depth_remap.size(); m++) { - pni[m].end_depth = def_depth_remap[m]; + nesting_info[m].end_depth = def_depth_remap[m]; } } // values indexed by output column index - pni[cur_depth].max_def_level = cur_schema.max_definition_level; - pni[cur_depth].max_rep_level = cur_schema.max_repetition_level; - pni[cur_depth].size = 0; + nesting_info[cur_depth].max_def_level = cur_schema.max_definition_level; + pni[cur_depth].size = 0; pni[cur_depth].type = to_type_id(cur_schema, _strings_to_categorical, _timestamp_type.id()); pni[cur_depth].nullable = cur_schema.repetition_type == OPTIONAL; @@ -640,6 +648,7 @@ void reader::impl::allocate_nesting_info() // copy nesting info to the device page_nesting_info.host_to_device(_stream); + page_nesting_decode_info.host_to_device(_stream); } void reader::impl::load_and_decompress_data(std::vector const& row_groups_info, @@ -1256,7 +1265,7 @@ struct start_offset_output_iterator { if (p.src_col_schema != src_col_schema || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) { return empty; } - return p.nesting[nesting_depth].page_start_value; + return p.nesting_decode[nesting_depth].page_start_value; } }; diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 2cd6e49d7bb..21752196430 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -38,6 +38,7 @@ #include #include +#include #include @@ -4826,6 +4827,49 @@ TEST_F(ParquetReaderTest, StructByteArray) CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); } +TEST_F(ParquetReaderTest, NestingOptimizationTest) +{ + // test nesting levels > cudf::io::parquet::gpu::max_cacheable_nesting_decode_info deep. + constexpr cudf::size_type num_nesting_levels = 16; + static_assert(num_nesting_levels > cudf::io::parquet::gpu::max_cacheable_nesting_decode_info); + constexpr cudf::size_type rows_per_level = 2; + + constexpr cudf::size_type num_values = (1 << num_nesting_levels) * rows_per_level; + auto value_iter = thrust::make_counting_iterator(0); + auto validity = + cudf::detail::make_counting_transform_iterator(0, [](cudf::size_type i) { return i % 2; }); + cudf::test::fixed_width_column_wrapper values(value_iter, value_iter + num_values, validity); + + // ~256k values with num_nesting_levels = 16 + int total_values_produced = num_values; + auto prev_col = values.release(); + for (int idx = 0; idx < num_nesting_levels; idx++) { + auto const depth = num_nesting_levels - idx; + auto const num_rows = (1 << (num_nesting_levels - idx)); + + auto offsets_iter = cudf::detail::make_counting_transform_iterator( + 0, [depth, rows_per_level](cudf::size_type i) { return i * rows_per_level; }); + total_values_produced += (num_rows + 1); + + cudf::test::fixed_width_column_wrapper offsets(offsets_iter, + offsets_iter + num_rows + 1); + auto c = cudf::make_lists_column(num_rows, offsets.release(), std::move(prev_col), 0, {}); + prev_col = std::move(c); + } + auto const& expect = prev_col; + + auto filepath = temp_env->get_temp_filepath("NestingDecodeCache.parquet"); + cudf::io::parquet_writer_options opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, table_view{{*expect}}); + cudf::io::write_parquet(opts); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(in_opts); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expect, result.tbl->get_column(0)); +} + TEST_F(ParquetWriterTest, SingleValueDictionaryTest) { constexpr unsigned int expected_bits = 1;