From 794c330f413d9bfaa85254ba240afc62d7020555 Mon Sep 17 00:00:00 2001 From: Dave Baranec Date: Wed, 18 Jan 2023 15:45:38 -0600 Subject: [PATCH 1/8] Use a shared-memory cache of frequently accessed values during parquet decoding. --- cpp/src/io/parquet/page_data.cu | 134 ++++++++++++------- cpp/src/io/parquet/parquet_gpu.hpp | 37 +++-- cpp/src/io/parquet/reader_impl.cpp | 14 +- cpp/src/io/parquet/reader_impl_preprocess.cu | 24 ++-- 4 files changed, 133 insertions(+), 76 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 326407c2073..5492d9c59d4 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -51,6 +51,8 @@ constexpr int non_zero_buffer_size = block_size * 2; constexpr int rolling_index(int index) { return index & (non_zero_buffer_size - 1); } +constexpr int max_cacheable_nesting_decode_info = 8; + struct page_state_s { const uint8_t* data_start; const uint8_t* data_end; @@ -90,6 +92,13 @@ struct page_state_s { const uint8_t* lvl_start[NUM_LEVEL_TYPES]; // [def,rep] int32_t lvl_count[NUM_LEVEL_TYPES]; // how many of each of the streams we've decoded int32_t row_index_lower_bound; // lower bound of row indices we should process + + // a shared-memory cache of frequently used data when decoding. The source of this data is + // normally stored in global memory which can yield poor performance. So, when possible + // we copy that info here prior to decoding + PageNestingDecodeInfo nesting_decode[max_cacheable_nesting_decode_info]; + // points to either nesting_decode above when possible, or to the global source otherwise + PageNestingDecodeInfo* pndi; }; /** @@ -908,21 +917,37 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, int chunk_idx; // Fetch page info - if (t == 0) s->page = *p; + if (!t) s->page = *p; __syncthreads(); if (s->page.flags & PAGEINFO_FLAGS_DICTIONARY) { return false; } // Fetch column chunk info chunk_idx = s->page.chunk_idx; - if (t == 0) { s->col = chunks[chunk_idx]; } + if (!t) { s->col = chunks[chunk_idx]; } - // zero nested value and valid counts + // if we can use the decode cache, set it up now + auto const can_use_decode_cache = s->page.num_nesting_levels <= max_cacheable_nesting_decode_info; + if (can_use_decode_cache) { + int d = 0; + while (d < s->page.num_nesting_levels) { + if (d + t < s->page.num_nesting_levels) { + // these values need to be copied over from global + s->nesting_decode[d + t].max_def_level = s->page.nesting_decode[d + t].max_def_level; + s->nesting_decode[d + t].page_start_value = s->page.nesting_decode[d + t].page_start_value; + } + d += blockDim.x; + } + } + if (!t) { s->pndi = can_use_decode_cache ? 
s->nesting_decode : s->page.nesting_decode; }
  __syncthreads();

+  // zero counts
   int d = 0;
   while (d < s->page.num_nesting_levels) {
     if (d + t < s->page.num_nesting_levels) {
-      s->page.nesting[d + t].valid_count = 0;
-      s->page.nesting[d + t].value_count = 0;
-      s->page.nesting[d + t].null_count  = 0;
+      s->pndi[d + t].valid_count = 0;
+      s->pndi[d + t].value_count = 0;
+      s->pndi[d + t].null_count  = 0;
     }
     d += blockDim.x;
   }
@@ -1057,7 +1082,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
   if (is_decode_step) {
     int max_depth = s->col.max_nesting_depth;
     for (int idx = 0; idx < max_depth; idx++) {
-      PageNestingInfo* pni = &s->page.nesting[idx];
+      PageNestingDecodeInfo* pndi = &s->pndi[idx];
       size_t output_offset;
       // schemas without lists
@@ -1066,21 +1091,21 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
       }
       // for schemas with lists, we've already got the exact value precomputed
       else {
-        output_offset = pni->page_start_value;
+        output_offset = pndi->page_start_value;
       }

-      pni->data_out = static_cast<uint8_t*>(s->col.column_data_base[idx]);
+      pndi->data_out = static_cast<uint8_t*>(s->col.column_data_base[idx]);

-      if (pni->data_out != nullptr) {
+      if (pndi->data_out != nullptr) {
         // anything below max depth with a valid data pointer must be a list, so the
         // element size is the size of the offset type.
         uint32_t len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len;
-        pni->data_out += (output_offset * len);
+        pndi->data_out += (output_offset * len);
       }
-      pni->valid_map = s->col.valid_map_base[idx];
-      if (pni->valid_map != nullptr) {
-        pni->valid_map += output_offset >> 5;
-        pni->valid_map_offset = (int32_t)(output_offset & 0x1f);
+      pndi->valid_map = s->col.valid_map_base[idx];
+      if (pndi->valid_map != nullptr) {
+        pndi->valid_map += output_offset >> 5;
+        pndi->valid_map_offset = (int32_t)(output_offset & 0x1f);
       }
     }
   }
@@ -1198,26 +1223,26 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
 * @brief Store a validity mask containing value_count bits into the output validity buffer of the
 * page.
 *
- * @param[in,out] pni The page/nesting information to store the mask in. The validity map offset is
+ * @param[in,out] pndi The page/nesting information to store the mask in. The validity map offset is
 * also updated
 * @param[in] valid_mask The validity mask to be stored
 * @param[in] value_count # of bits in the validity mask
 */
-static __device__ void store_validity(PageNestingInfo* pni,
+static __device__ void store_validity(PageNestingDecodeInfo* pndi,
                                       uint32_t valid_mask,
                                       int32_t value_count)
 {
-  int word_offset = pni->valid_map_offset / 32;
-  int bit_offset  = pni->valid_map_offset % 32;
+  int word_offset = pndi->valid_map_offset / 32;
+  int bit_offset  = pndi->valid_map_offset % 32;
   // if we fit entirely in the output word
   if (bit_offset + value_count <= 32) {
     auto relevant_mask = static_cast<uint32_t>((static_cast<uint64_t>(1) << value_count) - 1);

     if (relevant_mask == ~0) {
-      pni->valid_map[word_offset] = valid_mask;
+      pndi->valid_map[word_offset] = valid_mask;
     } else {
-      atomicAnd(pni->valid_map + word_offset, ~(relevant_mask << bit_offset));
-      atomicOr(pni->valid_map + word_offset, (valid_mask & relevant_mask) << bit_offset);
+      atomicAnd(pndi->valid_map + word_offset, ~(relevant_mask << bit_offset));
+      atomicOr(pndi->valid_map + word_offset, (valid_mask & relevant_mask) << bit_offset);
     }
   }
  // we're going to spill over into the next word.
@@ -1231,17 +1256,17 @@ static __device__ void store_validity(PageNestingInfo* pni,
    // first word. 
strip bits_left bits off the beginning and store that uint32_t relevant_mask = ((1 << bits_left) - 1); uint32_t mask_word0 = valid_mask & relevant_mask; - atomicAnd(pni->valid_map + word_offset, ~(relevant_mask << bit_offset)); - atomicOr(pni->valid_map + word_offset, mask_word0 << bit_offset); + atomicAnd(pndi->valid_map + word_offset, ~(relevant_mask << bit_offset)); + atomicOr(pndi->valid_map + word_offset, mask_word0 << bit_offset); // second word. strip the remainder of the bits off the end and store that relevant_mask = ((1 << (value_count - bits_left)) - 1); uint32_t mask_word1 = valid_mask & (relevant_mask << bits_left); - atomicAnd(pni->valid_map + word_offset + 1, ~(relevant_mask)); - atomicOr(pni->valid_map + word_offset + 1, mask_word1 >> bits_left); + atomicAnd(pndi->valid_map + word_offset + 1, ~(relevant_mask)); + atomicOr(pndi->valid_map + word_offset + 1, mask_word1 >> bits_left); } - pni->valid_map_offset += value_count; + pndi->valid_map_offset += value_count; } /** @@ -1307,6 +1332,8 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // how many rows we've processed in the page so far int input_row_count = s->input_row_count; + PageNestingDecodeInfo* pndi_base = s->pndi; + // process until we've reached the target while (input_value_count < target_input_value_count) { // determine the nesting bounds for this thread (the range of nesting depths we @@ -1348,14 +1375,14 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // walk from 0 to max_depth uint32_t next_thread_value_count, next_warp_value_count; for (int s_idx = 0; s_idx < max_depth; s_idx++) { - PageNestingInfo* pni = &s->page.nesting[s_idx]; + PageNestingDecodeInfo* pndi = &pndi_base[s_idx]; // if we are within the range of nesting levels we should be adding value indices for int const in_nesting_bounds = ((s_idx >= start_depth && s_idx <= end_depth) && in_row_bounds) ? 1 : 0; // everything up to the max_def_level is a non-null value - uint32_t const is_valid = d >= pni->max_def_level && in_nesting_bounds ? 1 : 0; + uint32_t const is_valid = d >= pndi->max_def_level && in_nesting_bounds ? 1 : 0; // compute warp and thread valid counts uint32_t const warp_valid_mask = @@ -1376,8 +1403,8 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // if this is the value column emit an index for value decoding if (is_valid && s_idx == max_depth - 1) { - int const src_pos = pni->valid_count + thread_valid_count; - int const dst_pos = pni->value_count + thread_value_count; + int const src_pos = pndi->valid_count + thread_valid_count; + int const dst_pos = pndi->value_count + thread_value_count; // nz_idx is a mapping of src buffer indices to destination buffer indices s->nz_idx[rolling_index(src_pos)] = dst_pos; } @@ -1395,12 +1422,11 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // if we're -not- at a leaf column and we're within nesting/row bounds // and we have a valid data_out pointer, it implies this is a list column, so // emit an offset. 
-        if (in_nesting_bounds && pni->data_out != nullptr) {
-          int const idx             = pni->value_count + thread_value_count;
-          cudf::size_type const ofs = s->page.nesting[s_idx + 1].value_count +
-                                      next_thread_value_count +
-                                      s->page.nesting[s_idx + 1].page_start_value;
-          (reinterpret_cast<cudf::size_type*>(pni->data_out))[idx] = ofs;
+        if (in_nesting_bounds && pndi->data_out != nullptr) {
+          int const idx             = pndi->value_count + thread_value_count;
+          cudf::size_type const ofs = pndi_base[s_idx + 1].value_count + next_thread_value_count +
+                                      pndi_base[s_idx + 1].page_start_value;
+          (reinterpret_cast<cudf::size_type*>(pndi->data_out))[idx] = ofs;
         }
       }
@@ -1422,14 +1448,14 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
       // increment count of valid values, count of total values, and update validity mask
       if (!t) {
-        if (pni->valid_map != nullptr && warp_valid_mask_bit_count > 0) {
+        if (pndi->valid_map != nullptr && warp_valid_mask_bit_count > 0) {
           uint32_t const warp_output_valid_mask = warp_valid_mask >> first_thread_in_write_range;
-          store_validity(pni, warp_output_valid_mask, warp_valid_mask_bit_count);
+          store_validity(pndi, warp_output_valid_mask, warp_valid_mask_bit_count);

-          pni->null_count += warp_valid_mask_bit_count - __popc(warp_output_valid_mask);
+          pndi->null_count += warp_valid_mask_bit_count - __popc(warp_output_valid_mask);
         }
-        pni->valid_count += warp_valid_count;
-        pni->value_count += warp_value_count;
+        pndi->valid_count += warp_valid_count;
+        pndi->value_count += warp_value_count;
       }

       // propagate value counts for the next level
@@ -1444,7 +1470,7 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
   // update
   if (!t) {
     // update valid value count for decoding and total # of values we've processed
-    s->nz_count          = s->page.nesting[max_depth - 1].valid_count;
+    s->nz_count          = pndi_base[max_depth - 1].valid_count;
     s->input_value_count = input_value_count;
     s->input_row_count   = input_row_count;
   }
@@ -1524,9 +1550,9 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s,
       start_depth, end_depth, d, s, input_value_count, target_input_value_count, t);

     // count rows and leaf values
-    int const is_new_row               = start_depth == 0 ? 1 : 0;
-    uint32_t const warp_row_count_mask = ballot(is_new_row);
-    int const is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0;
+    int const is_new_row               = start_depth == 0 ? 1 : 0;
+    uint32_t const warp_row_count_mask = ballot(is_new_row);
+    int const is_new_leaf = (d >= s->pndi[max_depth - 1].max_def_level) ? 1 : 0;
     uint32_t const warp_leaf_count_mask = ballot(is_new_leaf);
     // is this thread within row bounds? on the first pass we don't know the bounds, so we will be
     // computing the full size of the column. on the second pass, we will know our actual row
@@ -1796,6 +1822,8 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(

   bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;

+  PageNestingDecodeInfo* pni_base = s->pndi;
+
   // skipped_leaf_values will always be 0 for flat hierarchies. 
uint32_t skipped_leaf_values = s->page.skipped_leaf_values;

  while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
@@ -1863,8 +1891,7 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
         int leaf_level_index = s->col.max_nesting_depth - 1;

         uint32_t dtype_len = s->dtype_len;
-        void* dst =
-          s->page.nesting[leaf_level_index].data_out + static_cast<size_t>(dst_pos) * dtype_len;
+        void* dst = pni_base[leaf_level_index].data_out + static_cast<size_t>(dst_pos) * dtype_len;
         if (dtype == BYTE_ARRAY) {
           if (s->col.converted_type == DECIMAL) {
             auto const [ptr, len] = gpuGetStringData(s, val_src_pos);
@@ -1919,6 +1946,17 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
     }
     __syncthreads();
   }
+
+  // if we are using the nesting decode cache, copy null count back
+  if (s->pndi == s->nesting_decode) {
+    int d = 0;
+    while (d < s->page.num_nesting_levels) {
+      if (d + t < s->page.num_nesting_levels) {
+        s->page.nesting_decode[d + t].null_count = s->pndi[d + t].null_count;
+      }
+      d += blockDim.x;
+    }
+  }
 }

 }  // anonymous namespace
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index 9f55d9b4210..fb184d28f64 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -83,6 +83,24 @@ enum level_type {
   NUM_LEVEL_TYPES
 };

+struct PageNestingDecodeInfo {
+  // set up prior to decoding
+  int32_t max_def_level;
+
+  // computed during preprocessing
+  int32_t page_start_value;
+
+  // computed during decoding
+  int32_t null_count;
+
+  // used internally during decoding
+  int32_t valid_map_offset;
+  int32_t valid_count;
+  int32_t value_count;
+  uint8_t* data_out;
+  uint32_t* valid_map;
+};
+
 /**
 * @brief Nesting information
 */
@@ -92,25 +110,14 @@ struct PageNestingInfo {
   int32_t start_depth;
   int32_t end_depth;

-  // set at initialization
-  int32_t max_def_level;
-  int32_t max_rep_level;
+  // set at initialization (see start_offset_output_iterator in reader_impl_preprocess.cu)
   cudf::type_id type;  // type of the corresponding cudf output column
   bool nullable;

   // set during preprocessing
   int32_t size;  // this page/nesting-level's row count contribution to the output column, if fully
                  // decoded
-  int32_t batch_size;        // the size of the page for this batch
-  int32_t page_start_value;  // absolute output start index in output column data
-
-  // set during data decoding
-  int32_t valid_count;       // # of valid values decoded in this page/nesting-level
-  int32_t value_count;       // total # of values decoded in this page/nesting-level
-  int32_t null_count;        // null count
-  int32_t valid_map_offset;  // current offset in bits relative to valid_map
-  uint8_t* data_out;         // pointer into output buffer
-  uint32_t* valid_map;       // pointer into output validity buffer
+  int32_t batch_size;  // the size of the page for this batch
 };

 /**
@@ -160,8 +167,9 @@ struct PageInfo {
   int32_t str_bytes;

   // nesting information (input/output) for each page
-  int num_nesting_levels;
   PageNestingInfo* nesting;
+  PageNestingDecodeInfo* nesting_decode;
+  int num_nesting_levels;
 };

 /**
@@ -252,6 +260,7 @@ struct file_intermediate_data {
   hostdevice_vector<gpu::ColumnChunkDesc> chunks{};
   hostdevice_vector<gpu::PageInfo> pages_info{};
   hostdevice_vector<gpu::PageNestingInfo> page_nesting_info{};
+  hostdevice_vector<gpu::PageNestingDecodeInfo> page_nesting_decode_info{};
 };

 /**
diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp
index fb2a34bbcf2..d9d123ef899 100644
--- a/cpp/src/io/parquet/reader_impl.cpp
+++ b/cpp/src/io/parquet/reader_impl.cpp
@@ -24,9 +24,10 @@ namespace cudf::io::detail::parquet 
void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
 {
-  auto& chunks       = _file_itm_data.chunks;
-  auto& pages        = _file_itm_data.pages_info;
-  auto& page_nesting = _file_itm_data.page_nesting_info;
+  auto& chunks              = _file_itm_data.chunks;
+  auto& pages               = _file_itm_data.pages_info;
+  auto& page_nesting        = _file_itm_data.page_nesting_info;
+  auto& page_nesting_decode = _file_itm_data.page_nesting_decode_info;

   // Should not reach here if there is no page data.
   CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
@@ -124,6 +125,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)

   pages.device_to_host(_stream);
   page_nesting.device_to_host(_stream);
+  page_nesting_decode.device_to_host(_stream);
   _stream.synchronize();

   // for list columns, add the final offset to every offset buffer.
@@ -166,8 +168,8 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
     gpu::ColumnChunkDesc* col          = &chunks[pi->chunk_idx];
     input_column_info const& input_col = _input_columns[col->src_col_index];

-    int index                 = pi->nesting - page_nesting.device_ptr();
-    gpu::PageNestingInfo* pni = &page_nesting[index];
+    int index                        = pi->nesting_decode - page_nesting_decode.device_ptr();
+    gpu::PageNestingDecodeInfo* pndi = &page_nesting_decode[index];

     auto* cols = &_output_buffers;
     for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) {
@@ -178,7 +180,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
       if (chunk_nested_valids.host_ptr(chunk_offsets[pi->chunk_idx])[l_idx] == nullptr) {
         continue;
       }
-      out_buf.null_count() += pni[l_idx].null_count;
+      out_buf.null_count() += pndi[l_idx].null_count;
     }
   }

diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu
index 934ea98c7bb..13ae11afc70 100644
--- a/cpp/src/io/parquet/reader_impl_preprocess.cu
+++ b/cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -523,9 +523,10 @@ void decode_page_headers(hostdevice_vector<gpu::ColumnChunkDesc>& chunks,

 void reader::impl::allocate_nesting_info()
 {
-  auto const& chunks      = _file_itm_data.chunks;
-  auto& pages             = _file_itm_data.pages_info;
-  auto& page_nesting_info = _file_itm_data.page_nesting_info;
+  auto const& chunks             = _file_itm_data.chunks;
+  auto& pages                    = _file_itm_data.pages_info;
+  auto& page_nesting_info        = _file_itm_data.page_nesting_info;
+  auto& page_nesting_decode_info = _file_itm_data.page_nesting_decode_info;

   // compute total # of page_nesting infos needed and allocate space. 
doing this in one
   // buffer to keep it to a single gpu allocation
@@ -539,6 +540,8 @@ void reader::impl::allocate_nesting_info()
     });

   page_nesting_info = hostdevice_vector<gpu::PageNestingInfo>{total_page_nesting_infos, _stream};
+  page_nesting_decode_info =
+    hostdevice_vector<gpu::PageNestingDecodeInfo>{total_page_nesting_infos, _stream};

   // retrieve from the gpu so we can update
   pages.device_to_host(_stream, true);
@@ -556,6 +559,8 @@ void reader::impl::allocate_nesting_info()
     target_page_index += chunks[idx].num_dict_pages;
     for (int p_idx = 0; p_idx < chunks[idx].num_data_pages; p_idx++) {
       pages[target_page_index + p_idx].nesting = page_nesting_info.device_ptr() + src_info_index;
+      pages[target_page_index + p_idx].nesting_decode =
+        page_nesting_decode_info.device_ptr() + src_info_index;
       pages[target_page_index + p_idx].num_nesting_levels = per_page_nesting_info_size;

       src_info_index += per_page_nesting_info_size;
@@ -599,6 +604,9 @@ void reader::impl::allocate_nesting_info()
         gpu::PageNestingInfo* pni =
           &page_nesting_info[nesting_info_index + (p_idx * per_page_nesting_info_size)];

+        gpu::PageNestingDecodeInfo* pndi =
+          &page_nesting_decode_info[nesting_info_index + (p_idx * per_page_nesting_info_size)];
+
         // if we have lists, set our start and end depth remappings
         if (schema.max_repetition_level > 0) {
           auto remap = depth_remapping.find(src_col_schema);
@@ -616,9 +624,8 @@ void reader::impl::allocate_nesting_info()
         }

         // values indexed by output column index
-        pni[cur_depth].max_def_level = cur_schema.max_definition_level;
-        pni[cur_depth].max_rep_level = cur_schema.max_repetition_level;
-        pni[cur_depth].size          = 0;
+        pndi[cur_depth].max_def_level = cur_schema.max_definition_level;
+        pni[cur_depth].size           = 0;
         pni[cur_depth].type = to_type_id(cur_schema, _strings_to_categorical, _timestamp_type.id());
         pni[cur_depth].nullable = cur_schema.repetition_type == OPTIONAL;
@@ -638,6 +645,7 @@ void reader::impl::allocate_nesting_info()

   // copy nesting info to the device
   page_nesting_info.host_to_device(_stream);
+  page_nesting_decode_info.host_to_device(_stream);
 }

 void reader::impl::load_and_decompress_data(std::vector<row_group_info> const& row_groups_info,
@@ -1252,7 +1260,7 @@ struct start_offset_output_iterator {
     if (p.src_col_schema != src_col_schema || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) {
       return empty;
     }
-    return p.nesting[nesting_depth].page_start_value;
+    return p.nesting_decode[nesting_depth].page_start_value;
   }
 };

@@ -1625,4 +1633,4 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses
   }
 }

-}  // namespace cudf::io::detail::parquet
+}  // namespace cudf::io::detail::parquet
\ No newline at end of file

From fd81cca9e5c9050a7b1e1ac7fe3fc59b8d073014 Mon Sep 17 00:00:00 2001
From: Dave Baranec
Date: Wed, 18 Jan 2023 17:59:15 -0600
Subject: [PATCH 2/8] Move several fields used during the preprocess step into the decode cache. 
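The moved fields (start_depth and end_depth) are read by get_nesting_bounds() for
every input value, so they benefit from the shared-memory copy at least as much as
max_def_level and page_start_value do. For reference, a minimal standalone CUDA
sketch of the copy-then-fall-back pattern this series uses; Info, kMaxCached and
decode_kernel are illustrative stand-ins, not the actual cudf symbols:

    // simplified stand-in for PageNestingDecodeInfo
    struct Info {
      int start_depth, end_depth, max_def_level;
    };
    constexpr int kMaxCached = 10;  // shared-memory budget, in entries

    __global__ void decode_kernel(Info* global_info, int num_levels)
    {
      __shared__ Info cache[kMaxCached];
      __shared__ Info* info;

      // cooperatively copy the hot fields into shared memory when they fit
      if (num_levels <= kMaxCached) {
        for (int d = threadIdx.x; d < num_levels; d += blockDim.x) {
          cache[d] = global_info[d];
        }
      }
      // one thread publishes the pointer the whole block will read through
      if (threadIdx.x == 0) { info = (num_levels <= kMaxCached) ? cache : global_info; }
      __syncthreads();
      // ... decode, reading info[depth].start_depth etc. from fast memory ...
    }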
--- cpp/src/io/parquet/page_data.cu | 25 +++++++++++++------- cpp/src/io/parquet/parquet_gpu.hpp | 19 ++++++++++----- cpp/src/io/parquet/reader_impl_preprocess.cu | 4 ++-- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 5492d9c59d4..82806d2e828 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -51,7 +51,11 @@ constexpr int non_zero_buffer_size = block_size * 2; constexpr int rolling_index(int index) { return index & (non_zero_buffer_size - 1); } -constexpr int max_cacheable_nesting_decode_info = 8; +// the PageNestingDecodeInfo struct is 48 bytes (as of 1/18/23) so +// this cache costs 480 bytes per block. Columns nested more than 2 or 3 deep are exceedingly +// rare. If we encounter one that exceeds the below limit, the code falls back on the uncached +// PageNestingDecodeInfo stored in global memory. +constexpr int max_cacheable_nesting_decode_info = 10; struct page_state_s { const uint8_t* data_start; @@ -96,7 +100,7 @@ struct page_state_s { // a shared-memory cache of frequently used data when decoding. The source of this data is // normally stored in global memory which can yield poor performance. So, when possible // we copy that info here prior to decoding - PageNestingDecodeInfo nesting_decode[max_cacheable_nesting_decode_info]; + PageNestingDecodeInfo nesting_decode_cache[max_cacheable_nesting_decode_info]; // points to either nesting_decode above when possible, or to the global source otherwise PageNestingDecodeInfo* pndi; }; @@ -932,13 +936,16 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, while (d < s->page.num_nesting_levels) { if (d + t < s->page.num_nesting_levels) { // these values need to be copied over from global - s->nesting_decode[d + t].max_def_level = s->page.nesting_decode[d + t].max_def_level; - s->nesting_decode[d + t].page_start_value = s->page.nesting_decode[d + t].page_start_value; + s->nesting_decode_cache[d + t].max_def_level = s->page.nesting_decode[d + t].max_def_level; + s->nesting_decode_cache[d + t].page_start_value = + s->page.nesting_decode[d + t].page_start_value; + s->nesting_decode_cache[d + t].start_depth = s->page.nesting_decode[d + t].start_depth; + s->nesting_decode_cache[d + t].end_depth = s->page.nesting_decode[d + t].end_depth; } d += blockDim.x; } } - if (!t) { s->pndi = can_use_decode_cache ? s->nesting_decode : s->page.nesting_decode; } + if (!t) { s->pndi = can_use_decode_cache ? s->nesting_decode_cache : s->page.nesting_decode; } __syncthreads(); // zero counts @@ -1300,8 +1307,8 @@ inline __device__ void get_nesting_bounds(int& start_depth, // bound what nesting levels we apply values to if (s->col.max_level[level_type::REPETITION] > 0) { int r = s->rep[index]; - start_depth = s->page.nesting[r].start_depth; - end_depth = s->page.nesting[d].end_depth; + start_depth = s->pndi[r].start_depth; + end_depth = s->pndi[d].end_depth; } // for columns without repetition (even ones involving structs) we always // traverse the entire hierarchy. 
@@ -1948,11 +1955,11 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
   }

   // if we are using the nesting decode cache, copy null count back
-  if (s->pndi == s->nesting_decode) {
+  if (s->pndi == s->nesting_decode_cache) {
     int d = 0;
     while (d < s->page.num_nesting_levels) {
       if (d + t < s->page.num_nesting_levels) {
-        s->page.nesting_decode[d + t].null_count = s->pndi[d + t].null_count;
+        s->page.nesting_decode[d + t].null_count = s->nesting_decode_cache[d + t].null_count;
       }
       d += blockDim.x;
     }
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index fb184d28f64..fc4d6abd83e 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -83,9 +83,20 @@ enum level_type {
   NUM_LEVEL_TYPES
 };

+/**
+ * @brief Nesting information specifically needed by the decode and preprocessing
+ * kernels.
+ *
+ * This data is kept separate from PageNestingInfo to keep it as small as possible.
+ * It is used in a cached form in shared memory when possible.
+ */
 struct PageNestingDecodeInfo {
   // set up prior to decoding
   int32_t max_def_level;
+  // input repetition/definition levels are remapped with these values
+  // into the corresponding real output nesting depths.
+  int32_t start_depth;
+  int32_t end_depth;

   // computed during preprocessing
   int32_t page_start_value;
@@ -105,16 +116,12 @@ struct PageNestingDecodeInfo {
 * @brief Nesting information
 */
 struct PageNestingInfo {
-  // input repetition/definition levels are remapped with these values
-  // into the corresponding real output nesting depths.
-  int32_t start_depth;
-  int32_t end_depth;
-
   // set at initialization (see start_offset_output_iterator in reader_impl_preprocess.cu)
   cudf::type_id type;  // type of the corresponding cudf output column
   bool nullable;

-  // set during preprocessing
+  // TODO: these fields might make sense to move into PageNestingDecodeInfo for memory performance
+  // reasons.
   int32_t size;  // this page/nesting-level's row count contribution to the output column, if fully
                  // decoded
   int32_t batch_size;  // the size of the page for this batch
 };
diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu
index 13ae11afc70..f383547d1f3 100644
--- a/cpp/src/io/parquet/reader_impl_preprocess.cu
+++ b/cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -616,10 +616,10 @@ void reader::impl::allocate_nesting_info()
           std::vector<int> const& def_depth_remap = (remap->second.second);

           for (size_t m = 0; m < rep_depth_remap.size(); m++) {
-            pni[m].start_depth = rep_depth_remap[m];
+            pndi[m].start_depth = rep_depth_remap[m];
           }
           for (size_t m = 0; m < def_depth_remap.size(); m++) {
-            pni[m].end_depth = def_depth_remap[m];
+            pndi[m].end_depth = def_depth_remap[m];
           }
         }

From fe5ad35d8909ea98256d7ebf7fa594402c9267d8 Mon Sep 17 00:00:00 2001
From: Dave Baranec
Date: Thu, 19 Jan 2023 16:54:47 -0600
Subject: [PATCH 3/8] Code formatting changes.

---
 cpp/src/io/parquet/page_data.cu              | 2 +-
 cpp/src/io/parquet/reader_impl_preprocess.cu | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu
index 82806d2e828..78ac974d17e 100644
--- a/cpp/src/io/parquet/page_data.cu
+++ b/cpp/src/io/parquet/page_data.cu
@@ -98,7 +98,7 @@ struct page_state_s {
   int32_t row_index_lower_bound;  // lower bound of row indices we should process

   // a shared-memory cache of frequently used data when decoding. 
The source of this data is - // normally stored in global memory which can yield poor performance. So, when possible + // normally stored in global memory which can yield poor performance. So, when possible // we copy that info here prior to decoding PageNestingDecodeInfo nesting_decode_cache[max_cacheable_nesting_decode_info]; // points to either nesting_decode above when possible, or to the global source otherwise diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index f383547d1f3..e870e9b8745 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1633,4 +1633,4 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses } } -} // namespace cudf::io::detail::parquet \ No newline at end of file +} // namespace cudf::io::detail::parquet From 8920e762b96dea8220cfecc629015e2ed63b2876 Mon Sep 17 00:00:00 2001 From: Dave Baranec Date: Fri, 20 Jan 2023 10:01:09 -0600 Subject: [PATCH 4/8] Change computation of max decode cache size. --- cpp/src/io/parquet/page_data.cu | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 78ac974d17e..9f4cadcc952 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -51,11 +51,10 @@ constexpr int non_zero_buffer_size = block_size * 2; constexpr int rolling_index(int index) { return index & (non_zero_buffer_size - 1); } -// the PageNestingDecodeInfo struct is 48 bytes (as of 1/18/23) so -// this cache costs 480 bytes per block. Columns nested more than 2 or 3 deep are exceedingly -// rare. If we encounter one that exceeds the below limit, the code falls back on the uncached -// PageNestingDecodeInfo stored in global memory. -constexpr int max_cacheable_nesting_decode_info = 10; +// Use up to 512 bytes of shared memory as a cache for nesting information. +// As of 1/20/23, this gives us a max nesting depth of 10 (after which it falls back to +// global memory). This handles all but the most extreme cases. +constexpr int max_cacheable_nesting_decode_info = (512) / sizeof(PageNestingDecodeInfo); struct page_state_s { const uint8_t* data_start; From 24acc5010814694257bf4210101dda25622e1455 Mon Sep 17 00:00:00 2001 From: Dave Baranec Date: Tue, 24 Jan 2023 13:48:02 -0600 Subject: [PATCH 5/8] PR review feedback. Cleaned up some errant int fields to be int32_t. Renamed PageNestingDecodeInfo::pndi to PageNestingDecodeInfo::nesting_info for clarity. Improved some variable names. --- cpp/src/io/parquet/page_data.cu | 186 +++++++++++++++-------------- cpp/src/io/parquet/parquet_gpu.hpp | 12 +- cpp/src/io/parquet/reader_impl.cpp | 2 +- 3 files changed, 106 insertions(+), 94 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 5f0822a32e5..5c5d491a98e 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -100,8 +100,8 @@ struct page_state_s { // normally stored in global memory which can yield poor performance. 
So, when possible // we copy that info here prior to decoding PageNestingDecodeInfo nesting_decode_cache[max_cacheable_nesting_decode_info]; - // points to either nesting_decode above when possible, or to the global source otherwise - PageNestingDecodeInfo* pndi; + // points to either nesting_decode_cache above when possible, or to the global source otherwise + PageNestingDecodeInfo* nesting_info; }; /** @@ -950,31 +950,38 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, // if we can use the decode cache, set it up now auto const can_use_decode_cache = s->page.nesting_info_size <= max_cacheable_nesting_decode_info; if (can_use_decode_cache) { - int d = 0; - while (d < s->page.nesting_info_size) { - if (d + t < s->page.nesting_info_size) { + int depth = 0; + while (depth < s->page.nesting_info_size) { + int const thread_depth = depth + t; + if (thread_depth < s->page.nesting_info_size) { // these values need to be copied over from global - s->nesting_decode_cache[d + t].max_def_level = s->page.nesting_decode[d + t].max_def_level; - s->nesting_decode_cache[d + t].page_start_value = - s->page.nesting_decode[d + t].page_start_value; - s->nesting_decode_cache[d + t].start_depth = s->page.nesting_decode[d + t].start_depth; - s->nesting_decode_cache[d + t].end_depth = s->page.nesting_decode[d + t].end_depth; + s->nesting_decode_cache[thread_depth].max_def_level = + s->page.nesting_decode[thread_depth].max_def_level; + s->nesting_decode_cache[thread_depth].page_start_value = + s->page.nesting_decode[thread_depth].page_start_value; + s->nesting_decode_cache[thread_depth].start_depth = + s->page.nesting_decode[thread_depth].start_depth; + s->nesting_decode_cache[thread_depth].end_depth = + s->page.nesting_decode[thread_depth].end_depth; } - d += blockDim.x; + depth += blockDim.x; } } - if (!t) { s->pndi = can_use_decode_cache ? s->nesting_decode_cache : s->page.nesting_decode; } + if (!t) { + s->nesting_info = can_use_decode_cache ? 
s->nesting_decode_cache : s->page.nesting_decode;
+  }
   __syncthreads();

   // zero counts
-  int d = 0;
-  while (d < s->page.num_output_nesting_levels) {
-    if (d + t < s->page.num_output_nesting_levels) {
-      s->pndi[d + t].valid_count = 0;
-      s->pndi[d + t].value_count = 0;
-      s->pndi[d + t].null_count  = 0;
+  int depth = 0;
+  while (depth < s->page.num_output_nesting_levels) {
+    int const thread_depth = depth + t;
+    if (thread_depth < s->page.num_output_nesting_levels) {
+      s->nesting_info[thread_depth].valid_count = 0;
+      s->nesting_info[thread_depth].value_count = 0;
+      s->nesting_info[thread_depth].null_count  = 0;
     }
-    d += blockDim.x;
+    depth += blockDim.x;
   }
   __syncthreads();
@@ -1107,7 +1114,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
   if (is_decode_step) {
     int max_depth = s->col.max_nesting_depth;
     for (int idx = 0; idx < max_depth; idx++) {
-      PageNestingDecodeInfo* pndi = &s->pndi[idx];
+      PageNestingDecodeInfo* nesting_info = &s->nesting_info[idx];
       size_t output_offset;
       // schemas without lists
@@ -1116,21 +1123,21 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
       }
       // for schemas with lists, we've already got the exact value precomputed
       else {
-        output_offset = pndi->page_start_value;
+        output_offset = nesting_info->page_start_value;
       }

-      pndi->data_out = static_cast<uint8_t*>(s->col.column_data_base[idx]);
+      nesting_info->data_out = static_cast<uint8_t*>(s->col.column_data_base[idx]);

-      if (pndi->data_out != nullptr) {
+      if (nesting_info->data_out != nullptr) {
         // anything below max depth with a valid data pointer must be a list, so the
         // element size is the size of the offset type.
         uint32_t len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len;
-        pndi->data_out += (output_offset * len);
+        nesting_info->data_out += (output_offset * len);
       }
-      pndi->valid_map = s->col.valid_map_base[idx];
-      if (pndi->valid_map != nullptr) {
-        pndi->valid_map += output_offset >> 5;
-        pndi->valid_map_offset = (int32_t)(output_offset & 0x1f);
+      nesting_info->valid_map = s->col.valid_map_base[idx];
+      if (nesting_info->valid_map != nullptr) {
+        nesting_info->valid_map += output_offset >> 5;
+        nesting_info->valid_map_offset = (int32_t)(output_offset & 0x1f);
       }
     }
   }
@@ -1248,26 +1255,26 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
 * @brief Store a validity mask containing value_count bits into the output validity buffer of the
 * page.
 *
- * @param[in,out] pndi The page/nesting information to store the mask in. The validity map offset is
- * also updated
+ * @param[in,out] nesting_info The page/nesting information to store the mask in. 
The validity map
+ * offset is also updated
 * @param[in] valid_mask The validity mask to be stored
 * @param[in] value_count # of bits in the validity mask
 */
-static __device__ void store_validity(PageNestingDecodeInfo* pndi,
+static __device__ void store_validity(PageNestingDecodeInfo* nesting_info,
                                       uint32_t valid_mask,
                                       int32_t value_count)
 {
-  int word_offset = pndi->valid_map_offset / 32;
-  int bit_offset  = pndi->valid_map_offset % 32;
+  int word_offset = nesting_info->valid_map_offset / 32;
+  int bit_offset  = nesting_info->valid_map_offset % 32;
   // if we fit entirely in the output word
   if (bit_offset + value_count <= 32) {
     auto relevant_mask = static_cast<uint32_t>((static_cast<uint64_t>(1) << value_count) - 1);

     if (relevant_mask == ~0) {
-      pndi->valid_map[word_offset] = valid_mask;
+      nesting_info->valid_map[word_offset] = valid_mask;
     } else {
-      atomicAnd(pndi->valid_map + word_offset, ~(relevant_mask << bit_offset));
-      atomicOr(pndi->valid_map + word_offset, (valid_mask & relevant_mask) << bit_offset);
+      atomicAnd(nesting_info->valid_map + word_offset, ~(relevant_mask << bit_offset));
+      atomicOr(nesting_info->valid_map + word_offset, (valid_mask & relevant_mask) << bit_offset);
     }
   }
   // we're going to spill over into the next word.
@@ -1281,17 +1288,17 @@ static __device__ void store_validity(PageNestingDecodeInfo* pndi,
     // first word. strip bits_left bits off the beginning and store that
     uint32_t relevant_mask = ((1 << bits_left) - 1);
     uint32_t mask_word0    = valid_mask & relevant_mask;
-    atomicAnd(pndi->valid_map + word_offset, ~(relevant_mask << bit_offset));
-    atomicOr(pndi->valid_map + word_offset, mask_word0 << bit_offset);
+    atomicAnd(nesting_info->valid_map + word_offset, ~(relevant_mask << bit_offset));
+    atomicOr(nesting_info->valid_map + word_offset, mask_word0 << bit_offset);

     // second word. strip the remainder of the bits off the end and store that
     relevant_mask       = ((1 << (value_count - bits_left)) - 1);
     uint32_t mask_word1 = valid_mask & (relevant_mask << bits_left);
-    atomicAnd(pndi->valid_map + word_offset + 1, ~(relevant_mask));
-    atomicOr(pndi->valid_map + word_offset + 1, mask_word1 >> bits_left);
+    atomicAnd(nesting_info->valid_map + word_offset + 1, ~(relevant_mask));
+    atomicOr(nesting_info->valid_map + word_offset + 1, mask_word1 >> bits_left);
   }

-  pndi->valid_map_offset += value_count;
+  nesting_info->valid_map_offset += value_count;
 }

 /**
@@ -1325,8 +1332,8 @@ inline __device__ void get_nesting_bounds(int& start_depth,
   // bound what nesting levels we apply values to
   if (s->col.max_level[level_type::REPETITION] > 0) {
     int r       = s->rep[index];
-    start_depth = s->pndi[r].start_depth;
-    end_depth   = s->pndi[d].end_depth;
+    start_depth = s->nesting_info[r].start_depth;
+    end_depth   = s->nesting_info[d].end_depth;
   }
   // for columns without repetition (even ones involving structs) we always
   // traverse the entire hierarchy. 
@@ -1357,7 +1364,7 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // how many rows we've processed in the page so far int input_row_count = s->input_row_count; - PageNestingDecodeInfo* pndi_base = s->pndi; + PageNestingDecodeInfo* nesting_info_base = s->nesting_info; // process until we've reached the target while (input_value_count < target_input_value_count) { @@ -1400,14 +1407,14 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // walk from 0 to max_depth uint32_t next_thread_value_count, next_warp_value_count; for (int s_idx = 0; s_idx < max_depth; s_idx++) { - PageNestingDecodeInfo* pndi = &pndi_base[s_idx]; + PageNestingDecodeInfo* nesting_info = &nesting_info_base[s_idx]; // if we are within the range of nesting levels we should be adding value indices for int const in_nesting_bounds = ((s_idx >= start_depth && s_idx <= end_depth) && in_row_bounds) ? 1 : 0; // everything up to the max_def_level is a non-null value - uint32_t const is_valid = d >= pndi->max_def_level && in_nesting_bounds ? 1 : 0; + uint32_t const is_valid = d >= nesting_info->max_def_level && in_nesting_bounds ? 1 : 0; // compute warp and thread valid counts uint32_t const warp_valid_mask = @@ -1428,8 +1435,8 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // if this is the value column emit an index for value decoding if (is_valid && s_idx == max_depth - 1) { - int const src_pos = pndi->valid_count + thread_valid_count; - int const dst_pos = pndi->value_count + thread_value_count; + int const src_pos = nesting_info->valid_count + thread_valid_count; + int const dst_pos = nesting_info->value_count + thread_value_count; // nz_idx is a mapping of src buffer indices to destination buffer indices s->nz_idx[rolling_index(src_pos)] = dst_pos; } @@ -1447,11 +1454,12 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // if we're -not- at a leaf column and we're within nesting/row bounds // and we have a valid data_out pointer, it implies this is a list column, so // emit an offset. 
-        if (in_nesting_bounds && pndi->data_out != nullptr) {
-          int const idx             = pndi->value_count + thread_value_count;
-          cudf::size_type const ofs = pndi_base[s_idx + 1].value_count + next_thread_value_count +
-                                      pndi_base[s_idx + 1].page_start_value;
-          (reinterpret_cast<cudf::size_type*>(pndi->data_out))[idx] = ofs;
+        if (in_nesting_bounds && nesting_info->data_out != nullptr) {
+          int const idx             = nesting_info->value_count + thread_value_count;
+          cudf::size_type const ofs = nesting_info_base[s_idx + 1].value_count +
+                                      next_thread_value_count +
+                                      nesting_info_base[s_idx + 1].page_start_value;
+          (reinterpret_cast<cudf::size_type*>(nesting_info->data_out))[idx] = ofs;
         }
       }
@@ -1473,14 +1481,14 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
       // increment count of valid values, count of total values, and update validity mask
       if (!t) {
-        if (pndi->valid_map != nullptr && warp_valid_mask_bit_count > 0) {
+        if (nesting_info->valid_map != nullptr && warp_valid_mask_bit_count > 0) {
           uint32_t const warp_output_valid_mask = warp_valid_mask >> first_thread_in_write_range;
-          store_validity(pndi, warp_output_valid_mask, warp_valid_mask_bit_count);
+          store_validity(nesting_info, warp_output_valid_mask, warp_valid_mask_bit_count);

-          pndi->null_count += warp_valid_mask_bit_count - __popc(warp_output_valid_mask);
+          nesting_info->null_count += warp_valid_mask_bit_count - __popc(warp_output_valid_mask);
         }
-        pndi->valid_count += warp_valid_count;
-        pndi->value_count += warp_value_count;
+        nesting_info->valid_count += warp_valid_count;
+        nesting_info->value_count += warp_value_count;
       }

       // propagate value counts for the next level
@@ -1495,7 +1503,7 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
   // update
   if (!t) {
     // update valid value count for decoding and total # of values we've processed
-    s->nz_count          = pndi_base[max_depth - 1].valid_count;
+    s->nz_count          = nesting_info_base[max_depth - 1].valid_count;
     s->input_value_count = input_value_count;
     s->input_row_count   = input_row_count;
   }
@@ -1575,9 +1583,9 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s,
       start_depth, end_depth, d, s, input_value_count, target_input_value_count, t);

     // count rows and leaf values
-    int const is_new_row               = start_depth == 0 ? 1 : 0;
-    uint32_t const warp_row_count_mask = ballot(is_new_row);
-    int const is_new_leaf = (d >= s->pndi[max_depth - 1].max_def_level) ? 1 : 0;
+    int const is_new_row               = start_depth == 0 ? 1 : 0;
+    uint32_t const warp_row_count_mask = ballot(is_new_row);
+    int const is_new_leaf = (d >= s->nesting_info[max_depth - 1].max_def_level) ? 1 : 0;
     uint32_t const warp_leaf_count_mask = ballot(is_new_leaf);
     // is this thread within row bounds? on the first pass we don't know the bounds, so we will be
     // computing the full size of the column. on the second pass, we will know our actual row
@@ -1705,14 +1713,14 @@ __global__ void __launch_bounds__(block_size)
   // to do the expensive work of traversing the level data to determine sizes. we can just compute
   // it directly. 
if (!has_repetition && !compute_string_sizes) { - int d = 0; - while (d < s->page.num_output_nesting_levels) { - auto const i = d + t; - if (i < s->page.num_output_nesting_levels) { - if (is_base_pass) { pp->nesting[i].size = pp->num_input_values; } - pp->nesting[i].batch_size = pp->num_input_values; + int depth = 0; + while (depth < s->page.num_output_nesting_levels) { + auto const thread_depth = depth + t; + if (thread_depth < s->page.num_output_nesting_levels) { + if (is_base_pass) { pp->nesting[thread_depth].size = pp->num_input_values; } + pp->nesting[thread_depth].batch_size = pp->num_input_values; } - d += blockDim.x; + depth += blockDim.x; } return; } @@ -1720,25 +1728,29 @@ __global__ void __launch_bounds__(block_size) // in the trim pass, for anything with lists, we only need to fully process bounding pages (those // at the beginning or the end of the row bounds) if (!is_base_pass && !is_bounds_page(s, min_row, num_rows)) { - int d = 0; - while (d < s->page.num_output_nesting_levels) { - auto const i = d + t; - if (i < s->page.num_output_nesting_levels) { + int depth = 0; + while (depth < s->page.num_output_nesting_levels) { + auto const thread_depth = depth + t; + if (thread_depth < s->page.num_output_nesting_levels) { // if we are not a bounding page (as checked above) then we are either // returning 0 rows from the page (completely outside the bounds) or all // rows in the page (completely within the bounds) - pp->nesting[i].batch_size = s->num_rows == 0 ? 0 : pp->nesting[i].size; + pp->nesting[thread_depth].batch_size = + s->num_rows == 0 ? 0 : pp->nesting[thread_depth].size; } - d += blockDim.x; + depth += blockDim.x; } return; } // zero sizes - int d = 0; - while (d < s->page.num_output_nesting_levels) { - if (d + t < s->page.num_output_nesting_levels) { s->page.nesting[d + t].batch_size = 0; } - d += blockDim.x; + int depth = 0; + while (depth < s->page.num_output_nesting_levels) { + auto const thread_depth = depth + t; + if (thread_depth < s->page.num_output_nesting_levels) { + s->page.nesting[thread_depth].batch_size = 0; + } + depth += blockDim.x; } __syncthreads(); @@ -1786,13 +1798,13 @@ __global__ void __launch_bounds__(block_size) if (!t) { pp->num_rows = s->page.nesting[0].batch_size; } // store off this batch size as the "full" size - int d = 0; - while (d < s->page.num_output_nesting_levels) { - auto const i = d + t; - if (i < s->page.num_output_nesting_levels) { - pp->nesting[i].size = pp->nesting[i].batch_size; + int depth = 0; + while (depth < s->page.num_output_nesting_levels) { + auto const thread_depth = depth + t; + if (thread_depth < s->page.num_output_nesting_levels) { + pp->nesting[thread_depth].size = pp->nesting[thread_depth].batch_size; } - d += blockDim.x; + depth += blockDim.x; } } @@ -1840,7 +1852,7 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData( ((s->col.data_type & 7) == BOOLEAN || (s->col.data_type & 7) == BYTE_ARRAY) ? 64 : 32; } - PageNestingDecodeInfo* pni_base = s->pndi; + PageNestingDecodeInfo* pni_base = s->nesting_info; // skipped_leaf_values will always be 0 for flat hierarchies. 
uint32_t skipped_leaf_values = s->page.skipped_leaf_values;
@@ -1966,7 +1978,7 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
   }

   // if we are using the nesting decode cache, copy null count back
-  if (s->pndi == s->nesting_decode_cache) {
+  if (s->nesting_info == s->nesting_decode_cache) {
     int d = 0;
     while (d < s->page.num_output_nesting_levels) {
       if (d + t < s->page.num_output_nesting_levels) {
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index d24cf693216..360c65353ee 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -109,7 +109,7 @@ struct PageNestingDecodeInfo {
   int32_t valid_count;
   int32_t value_count;
   uint8_t* data_out;
-  uint32_t* valid_map;
+  bitmask_type* valid_map;
 };

 /**
@@ -173,9 +173,9 @@ struct PageInfo {
   // skipped_leaf_values will always be 0.
   //
   // # of values skipped in the repetition/definition level stream
-  int skipped_values;
+  int32_t skipped_values;
   // # of values skipped in the actual data stream.
-  int skipped_leaf_values;
+  int32_t skipped_leaf_values;
   // for string columns only, the size of all the chars in the string for
   // this page. only valid/computed during the base preprocess pass
   int32_t str_bytes;
@@ -184,8 +184,8 @@ struct PageInfo {
   // input column nesting information, output column nesting information and
   // mappings between the two. the length of the array, nesting_info_size is
   // max(num_output_nesting_levels, max_definition_levels + 1)
-  int num_output_nesting_levels;
-  int nesting_info_size;
+  int32_t num_output_nesting_levels;
+  int32_t nesting_info_size;
   PageNestingInfo* nesting;
   PageNestingDecodeInfo* nesting_decode;
 };

 /**
@@ -257,7 +257,7 @@ struct ColumnChunkDesc {
   PageInfo* page_info;                // output page info for up to num_dict_pages +
                                       // num_data_pages (dictionary pages first)
   string_index_pair* str_dict_index;  // index for string dictionary
-  uint32_t** valid_map_base;          // base pointers of valid bit map for this column
+  bitmask_type** valid_map_base;      // base pointers of valid bit map for this column
   void** column_data_base;            // base pointers of column data
   int8_t codec;                       // compressed codec enum
   int8_t converted_type;              // converted type enum
diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp
index 4163d4e8ebd..1c5c89992ac 100644
--- a/cpp/src/io/parquet/reader_impl.cpp
+++ b/cpp/src/io/parquet/reader_impl.cpp
@@ -40,7 +40,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
   // In order to reduce the number of allocations of hostdevice_vector, we allocate a single vector
   // to store all per-chunk pointers to nested data/nullmask. `chunk_offsets[i]` will store the
   // offset into `chunk_nested_data`/`chunk_nested_valids` for the array of pointers for chunk `i`
-  auto chunk_nested_valids = hostdevice_vector<uint32_t*>(sum_max_depths, _stream);
+  auto chunk_nested_valids = hostdevice_vector<bitmask_type*>(sum_max_depths, _stream);
   auto chunk_nested_data   = hostdevice_vector<void*>(sum_max_depths, _stream);
   auto chunk_offsets       = std::vector<size_t>();

From 7ab44b60070f2398fa4fcda0d49cc7580cd126a0 Mon Sep 17 00:00:00 2001
From: Dave Baranec
Date: Wed, 25 Jan 2023 10:27:20 -0600
Subject: [PATCH 6/8] Some more variable name cleanup. 
---
 cpp/src/io/parquet/page_data.cu              |  5 +++--
 cpp/src/io/parquet/reader_impl_preprocess.cu | 10 +++++-----
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu
index 5c5d491a98e..e70cb112d00 100644
--- a/cpp/src/io/parquet/page_data.cu
+++ b/cpp/src/io/parquet/page_data.cu
@@ -1852,7 +1852,7 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
       ((s->col.data_type & 7) == BOOLEAN || (s->col.data_type & 7) == BYTE_ARRAY) ? 64 : 32;
   }

-  PageNestingDecodeInfo* pni_base = s->nesting_info;
+  PageNestingDecodeInfo* nesting_info_base = s->nesting_info;

   // skipped_leaf_values will always be 0 for flat hierarchies.
   uint32_t skipped_leaf_values = s->page.skipped_leaf_values;
@@ -1921,7 +1921,8 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
         int leaf_level_index = s->col.max_nesting_depth - 1;

         uint32_t dtype_len = s->dtype_len;
-        void* dst = pni_base[leaf_level_index].data_out + static_cast<size_t>(dst_pos) * dtype_len;
+        void* dst =
+          nesting_info_base[leaf_level_index].data_out + static_cast<size_t>(dst_pos) * dtype_len;
         if (dtype == BYTE_ARRAY) {
           if (s->col.converted_type == DECIMAL) {
             auto const [ptr, len] = gpuGetStringData(s, val_src_pos);
diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu
index dbfdf90916e..6577a1a3f0f 100644
--- a/cpp/src/io/parquet/reader_impl_preprocess.cu
+++ b/cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -607,7 +607,7 @@ void reader::impl::allocate_nesting_info()
         gpu::PageNestingInfo* pni =
           &page_nesting_info[nesting_info_index + (p_idx * per_page_nesting_info_size)];

-        gpu::PageNestingDecodeInfo* pndi =
+        gpu::PageNestingDecodeInfo* nesting_info =
           &page_nesting_decode_info[nesting_info_index + (p_idx * per_page_nesting_info_size)];

         // if we have lists, set our start and end depth remappings
@@ -619,16 +619,16 @@ void reader::impl::allocate_nesting_info()
           std::vector<int> const& def_depth_remap = (remap->second.second);

           for (size_t m = 0; m < rep_depth_remap.size(); m++) {
-            pndi[m].start_depth = rep_depth_remap[m];
+            nesting_info[m].start_depth = rep_depth_remap[m];
           }
           for (size_t m = 0; m < def_depth_remap.size(); m++) {
-            pndi[m].end_depth = def_depth_remap[m];
+            nesting_info[m].end_depth = def_depth_remap[m];
           }
         }

         // values indexed by output column index
-        pndi[cur_depth].max_def_level = cur_schema.max_definition_level;
-        pni[cur_depth].size = 0;
+        nesting_info[cur_depth].max_def_level = cur_schema.max_definition_level;
+        pni[cur_depth].size                   = 0;
         pni[cur_depth].type = to_type_id(cur_schema, _strings_to_categorical, _timestamp_type.id());
         pni[cur_depth].nullable = cur_schema.repetition_type == OPTIONAL;

From cf691e0b306f58e1874b43d0e9534767a6f1644a Mon Sep 17 00:00:00 2001
From: Dave Baranec
Date: Wed, 25 Jan 2023 11:42:52 -0600
Subject: [PATCH 7/8] Add a test which specifically forces the parquet reader to go past the nesting decode cache limit. 
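With the 512-byte budget introduced two commits ago, sizeof(PageNestingDecodeInfo)
(eight int32_t fields plus two pointers, 48 bytes on a 64-bit build) yields
512 / 48 = 10 cacheable nesting levels, so the 16-level column built by this test
is guaranteed to exercise the global-memory fallback path. A standalone
back-of-the-envelope check of that arithmetic (struct layout mirrored from
parquet_gpu.hpp; bitmask_type written as its underlying uint32_t to keep the
sketch self-contained):

    #include <cstdint>
    #include <cstdio>

    struct PageNestingDecodeInfo {  // mirror of the decode-cache entry
      int32_t max_def_level, start_depth, end_depth, page_start_value;
      int32_t null_count, valid_map_offset, valid_count, value_count;
      uint8_t* data_out;
      uint32_t* valid_map;  // bitmask_type* in cudf
    };

    int main()
    {
      // expect: sizeof = 48, levels = 10 (< the 16 levels the test builds)
      std::printf("sizeof = %zu, max cacheable levels = %zu\n",
                  sizeof(PageNestingDecodeInfo),
                  512 / sizeof(PageNestingDecodeInfo));
      return 0;
    }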
---
 cpp/src/io/parquet/page_data.cu    |  5 ----
 cpp/src/io/parquet/parquet_gpu.hpp |  5 ++++
 cpp/tests/io/parquet_test.cpp      | 44 ++++++++++++++++++++++++++++++
 3 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu
index e70cb112d00..4fafad4350b 100644
--- a/cpp/src/io/parquet/page_data.cu
+++ b/cpp/src/io/parquet/page_data.cu
@@ -51,11 +51,6 @@ constexpr int non_zero_buffer_size = block_size * 2;

 constexpr int rolling_index(int index) { return index & (non_zero_buffer_size - 1); }

-// Use up to 512 bytes of shared memory as a cache for nesting information.
-// As of 1/20/23, this gives us a max nesting depth of 10 (after which it falls back to
-// global memory). This handles all but the most extreme cases.
-constexpr int max_cacheable_nesting_decode_info = (512) / sizeof(PageNestingDecodeInfo);
-
 struct page_state_s {
   const uint8_t* data_start;
   const uint8_t* data_end;
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index 360c65353ee..9b156745e41 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -112,6 +112,11 @@ struct PageNestingDecodeInfo {
   bitmask_type* valid_map;
 };

+// Use up to 512 bytes of shared memory as a cache for nesting information.
+// As of 1/20/23, this gives us a max nesting depth of 10 (after which it falls back to
+// global memory). This handles all but the most extreme cases.
+constexpr int max_cacheable_nesting_decode_info = (512) / sizeof(PageNestingDecodeInfo);
+
 /**
 * @brief Nesting information
 *
diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp
index 2cd6e49d7bb..1017716c69b 100644
--- a/cpp/tests/io/parquet_test.cpp
+++ b/cpp/tests/io/parquet_test.cpp
@@ -38,6 +38,7 @@
 #include <src/io/parquet/compact_protocol_reader.hpp>
 #include <src/io/parquet/parquet.hpp>
+#include <src/io/parquet/parquet_gpu.hpp>

 #include <fstream>

@@ -3638,6 +3639,49 @@ TEST_F(ParquetWriterTest, DeepEmptyList)
   CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->view().column(0), *L0);
 }

+TEST_F(ParquetWriterTest, NestingOptimizationTest)
+{
+  // test nesting levels > cudf::io::parquet::gpu::max_cacheable_nesting_decode_info deep. 
+  constexpr cudf::size_type num_nesting_levels = 16;
+  static_assert(num_nesting_levels > cudf::io::parquet::gpu::max_cacheable_nesting_decode_info);
+  constexpr cudf::size_type rows_per_level = 2;
+
+  constexpr cudf::size_type num_values = (1 << num_nesting_levels) * rows_per_level;
+  auto value_iter                      = thrust::make_counting_iterator(0);
+  auto validity =
+    cudf::detail::make_counting_transform_iterator(0, [](cudf::size_type i) { return i % 2; });
+  cudf::test::fixed_width_column_wrapper<int> values(value_iter, value_iter + num_values, validity);
+
+  // ~256k values with num_nesting_levels = 16
+  int total_values_produced = num_values;
+  auto prev_col             = values.release();
+  for (int idx = 0; idx < num_nesting_levels; idx++) {
+    auto const depth    = num_nesting_levels - idx;
+    auto const num_rows = (1 << (num_nesting_levels - idx));
+
+    auto offsets_iter = cudf::detail::make_counting_transform_iterator(
+      0, [depth, rows_per_level](cudf::size_type i) { return i * rows_per_level; });
+    total_values_produced += (num_rows + 1);
+
+    cudf::test::fixed_width_column_wrapper<cudf::size_type> offsets(offsets_iter,
+                                                                    offsets_iter + num_rows + 1);
+    auto c   = cudf::make_lists_column(num_rows, offsets.release(), std::move(prev_col), 0, {});
+    prev_col = std::move(c);
+  }
+  auto const& expect = prev_col;
+
+  auto filepath = temp_env->get_temp_filepath("NestingDecodeCache.parquet");
+  cudf::io::parquet_writer_options opts =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, table_view{{*expect}});
+  cudf::io::write_parquet(opts);
+
+  cudf::io::parquet_reader_options in_opts =
+    cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
+  auto result = cudf::io::read_parquet(in_opts);
+
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expect, result.tbl->get_column(0));
+}
+
 TEST_F(ParquetWriterTest, EmptyListWithStruct)
 {
   auto L2 = cudf::make_lists_column(0,

From 6d269a95d7d4dc42bb88c6dc430bfe44c4ce0d2b Mon Sep 17 00:00:00 2001
From: Dave Baranec
Date: Wed, 25 Jan 2023 11:55:03 -0600
Subject: [PATCH 8/8] Move NestingOptimizationTest from ParquetWriterTest to ParquetReaderTest. Cleaned up some more variable names. 
---
 cpp/src/io/parquet/page_data.cu | 12 +++--
 cpp/tests/io/parquet_test.cpp   | 86 ++++++++++++++++-----------------
 2 files changed, 50 insertions(+), 48 deletions(-)

diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu
index 4fafad4350b..23d130e1585 100644
--- a/cpp/src/io/parquet/page_data.cu
+++ b/cpp/src/io/parquet/page_data.cu
@@ -1975,12 +1975,14 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(

   // if we are using the nesting decode cache, copy null count back
   if (s->nesting_info == s->nesting_decode_cache) {
-    int d = 0;
-    while (d < s->page.num_output_nesting_levels) {
-      if (d + t < s->page.num_output_nesting_levels) {
-        s->page.nesting_decode[d + t].null_count = s->nesting_decode_cache[d + t].null_count;
+    int depth = 0;
+    while (depth < s->page.num_output_nesting_levels) {
+      int const thread_depth = depth + t;
+      if (thread_depth < s->page.num_output_nesting_levels) {
+        s->page.nesting_decode[thread_depth].null_count =
+          s->nesting_decode_cache[thread_depth].null_count;
       }
-      d += blockDim.x;
+      depth += blockDim.x;
     }
   }
 }
diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp
index 1017716c69b..21752196430 100644
--- a/cpp/tests/io/parquet_test.cpp
+++ b/cpp/tests/io/parquet_test.cpp
@@ -3639,49 +3639,6 @@ TEST_F(ParquetWriterTest, DeepEmptyList)
   CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->view().column(0), *L0);
 }

-TEST_F(ParquetWriterTest, NestingOptimizationTest)
-{
-  // test nesting levels > cudf::io::parquet::gpu::max_cacheable_nesting_decode_info deep.
-  constexpr cudf::size_type num_nesting_levels = 16;
-  static_assert(num_nesting_levels > cudf::io::parquet::gpu::max_cacheable_nesting_decode_info);
-  constexpr cudf::size_type rows_per_level = 2;
-
-  constexpr cudf::size_type num_values = (1 << num_nesting_levels) * rows_per_level;
-  auto value_iter                      = thrust::make_counting_iterator(0);
-  auto validity =
-    cudf::detail::make_counting_transform_iterator(0, [](cudf::size_type i) { return i % 2; });
-  cudf::test::fixed_width_column_wrapper<int> values(value_iter, value_iter + num_values, validity);
-
-  // ~256k values with num_nesting_levels = 16
-  int total_values_produced = num_values;
-  auto prev_col             = values.release();
-  for (int idx = 0; idx < num_nesting_levels; idx++) {
-    auto const depth    = num_nesting_levels - idx;
-    auto const num_rows = (1 << (num_nesting_levels - idx));
-
-    auto offsets_iter = cudf::detail::make_counting_transform_iterator(
-      0, [depth, rows_per_level](cudf::size_type i) { return i * rows_per_level; });
-    total_values_produced += (num_rows + 1);
-
-    cudf::test::fixed_width_column_wrapper<cudf::size_type> offsets(offsets_iter,
-                                                                    offsets_iter + num_rows + 1);
-    auto c   = cudf::make_lists_column(num_rows, offsets.release(), std::move(prev_col), 0, {});
-    prev_col = std::move(c);
-  }
-  auto const& expect = prev_col;
-
-  auto filepath = temp_env->get_temp_filepath("NestingDecodeCache.parquet");
-  cudf::io::parquet_writer_options opts =
-    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, table_view{{*expect}});
-  cudf::io::write_parquet(opts);
-
-  cudf::io::parquet_reader_options in_opts =
-    cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
-  auto result = cudf::io::read_parquet(in_opts);
-
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expect, result.tbl->get_column(0));
-}
-
 TEST_F(ParquetWriterTest, EmptyListWithStruct)
 {
   auto L2 = cudf::make_lists_column(0,
@@ -4870,6 +4827,49 @@ TEST_F(ParquetReaderTest, StructByteArray)
   CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); 
}

+TEST_F(ParquetReaderTest, NestingOptimizationTest)
+{
+  // test nesting levels > cudf::io::parquet::gpu::max_cacheable_nesting_decode_info deep.
+  constexpr cudf::size_type num_nesting_levels = 16;
+  static_assert(num_nesting_levels > cudf::io::parquet::gpu::max_cacheable_nesting_decode_info);
+  constexpr cudf::size_type rows_per_level = 2;
+
+  constexpr cudf::size_type num_values = (1 << num_nesting_levels) * rows_per_level;
+  auto value_iter                      = thrust::make_counting_iterator(0);
+  auto validity =
+    cudf::detail::make_counting_transform_iterator(0, [](cudf::size_type i) { return i % 2; });
+  cudf::test::fixed_width_column_wrapper<int> values(value_iter, value_iter + num_values, validity);
+
+  // ~256k values with num_nesting_levels = 16
+  int total_values_produced = num_values;
+  auto prev_col             = values.release();
+  for (int idx = 0; idx < num_nesting_levels; idx++) {
+    auto const depth    = num_nesting_levels - idx;
+    auto const num_rows = (1 << (num_nesting_levels - idx));
+
+    auto offsets_iter = cudf::detail::make_counting_transform_iterator(
+      0, [depth, rows_per_level](cudf::size_type i) { return i * rows_per_level; });
+    total_values_produced += (num_rows + 1);
+
+    cudf::test::fixed_width_column_wrapper<cudf::size_type> offsets(offsets_iter,
+                                                                    offsets_iter + num_rows + 1);
+    auto c   = cudf::make_lists_column(num_rows, offsets.release(), std::move(prev_col), 0, {});
+    prev_col = std::move(c);
+  }
+  auto const& expect = prev_col;
+
+  auto filepath = temp_env->get_temp_filepath("NestingDecodeCache.parquet");
+  cudf::io::parquet_writer_options opts =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, table_view{{*expect}});
+  cudf::io::write_parquet(opts);
+
+  cudf::io::parquet_reader_options in_opts =
+    cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
+  auto result = cudf::io::read_parquet(in_opts);
+
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expect, result.tbl->get_column(0));
+}
+
 TEST_F(ParquetWriterTest, SingleValueDictionaryTest)
 {
   constexpr unsigned int expected_bits = 1;