diff --git a/cpp/src/io/parquet/decode_preprocess.cu b/cpp/src/io/parquet/decode_preprocess.cu index e49801e6172..62f1ee88036 100644 --- a/cpp/src/io/parquet/decode_preprocess.cu +++ b/cpp/src/io/parquet/decode_preprocess.cu @@ -26,6 +26,8 @@ namespace cudf::io::parquet::detail { +namespace cg = cooperative_groups; + namespace { // # of threads we're decoding with @@ -163,7 +165,8 @@ __device__ size_type gpuDecodeTotalPageStringSize(page_state_s* s, int t) // For V1, the choice is an overestimate (s->dict_size), or an exact number that's // expensive to compute. For now we're going with the latter. else { - str_len = gpuInitStringDescriptors(s, nullptr, target_pos, t); + str_len = gpuInitStringDescriptors( + s, nullptr, target_pos, cg::this_thread_block()); } break; diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 7207173b82f..e0d50d7ccf9 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -23,6 +23,8 @@ namespace cudf::io::parquet::detail { +namespace cg = cooperative_groups; + namespace { constexpr int decode_block_size = 128; @@ -277,6 +279,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) } // this needs to be here to prevent warp 3 modifying src_pos before all threads have read it __syncthreads(); + auto const tile_warp = cg::tiled_partition(cg::this_thread_block()); if (t < 32) { // decode repetition and definition levels. // - update validity vectors @@ -298,9 +301,9 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) src_target_pos = gpuDecodeRleBooleans(s, sb, src_target_pos, t & 0x1f); } else if (s->col.physical_type == BYTE_ARRAY or s->col.physical_type == FIXED_LEN_BYTE_ARRAY) { - gpuInitStringDescriptors(s, sb, src_target_pos, t & 0x1f); + gpuInitStringDescriptors(s, sb, src_target_pos, tile_warp); } - if (t == 32) { s->dict_pos = src_target_pos; } + if (tile_warp.thread_rank() == 0) { s->dict_pos = src_target_pos; } } else { // WARP1..WARP3: Decode values int const dtype = s->col.physical_type; diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index b1f8e6dd5fe..a3f91f6859b 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -21,6 +21,7 @@ #include "parquet_gpu.hpp" #include "rle_stream.cuh" +#include #include #include @@ -420,46 +421,62 @@ inline __device__ int gpuDecodeRleBooleans(page_state_s* s, state_buf* sb, int t * @param[in,out] s Page state input/output * @param[out] sb Page state buffer output * @param[in] target_pos Target output position - * @param[in] t Thread ID + * @param[in] g Cooperative group (thread block or tile) * @tparam sizes_only True if only sizes are to be calculated * @tparam state_buf Typename of the `state_buf` (usually inferred) + * @tparam thread_group Typename of the cooperative group (inferred) * * @return Total length of strings processed */ -template -__device__ size_type -gpuInitStringDescriptors(page_state_s* s, [[maybe_unused]] state_buf* sb, int target_pos, int t) +template +__device__ size_type gpuInitStringDescriptors(page_state_s* s, + [[maybe_unused]] state_buf* sb, + int target_pos, + thread_group const& g) { - int pos = s->dict_pos; - int total_len = 0; + int const t = g.thread_rank(); + int const dict_size = s->dict_size; + int k = s->dict_val; + int pos = s->dict_pos; + int total_len = 0; + + // All group threads can participate for fixed len byte arrays. + if (s->col.physical_type == FIXED_LEN_BYTE_ARRAY) { + int const dtype_len_in = s->dtype_len_in; + total_len = min((target_pos - pos) * dtype_len_in, dict_size - s->dict_val); + if constexpr (!sizes_only) { + for (pos += t, k += t * dtype_len_in; pos < target_pos; pos += g.size()) { + sb->str_len[rolling_index(pos)] = + (k < dict_size) ? dtype_len_in : 0; + // dict_idx is upperbounded by dict_size. + sb->dict_idx[rolling_index(pos)] = k; + // Increment k if needed. + if (k < dict_size) { k = min(k + (g.size() * dtype_len_in), dict_size); } + } + } + // Only thread_rank = 0 updates the s->dict_val + if (!t) { s->dict_val += total_len; } + } + // This step is purely serial for byte arrays + else { + if (!t) { + uint8_t const* cur = s->data_start; - // This step is purely serial - if (!t) { - uint8_t const* cur = s->data_start; - int dict_size = s->dict_size; - int k = s->dict_val; - - while (pos < target_pos) { - int len = 0; - if (s->col.physical_type == FIXED_LEN_BYTE_ARRAY) { - if (k < dict_size) { len = s->dtype_len_in; } - } else { + for (int len = 0; pos < target_pos; pos++, len = 0) { if (k + 4 <= dict_size) { len = (cur[k]) | (cur[k + 1] << 8) | (cur[k + 2] << 16) | (cur[k + 3] << 24); k += 4; if (k + len > dict_size) { len = 0; } } + if constexpr (!sizes_only) { + sb->dict_idx[rolling_index(pos)] = k; + sb->str_len[rolling_index(pos)] = len; + } + k += len; + total_len += len; } - if constexpr (!sizes_only) { - sb->dict_idx[rolling_index(pos)] = k; - sb->str_len[rolling_index(pos)] = len; - } - k += len; - total_len += len; - pos++; + s->dict_val = k; } - s->dict_val = k; - __threadfence_block(); } return total_len; diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 58e8a09d5b6..ca74a1c2ba0 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -31,6 +31,8 @@ namespace cudf::io::parquet::detail { +namespace cg = cooperative_groups; + namespace { constexpr int preprocess_block_size = 512; @@ -1006,6 +1008,10 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) } // this needs to be here to prevent warp 1/2 modifying src_pos before all threads have read it __syncthreads(); + + // Create a warp sized thread block tile + auto const tile_warp = cg::tiled_partition(cg::this_thread_block()); + if (t < 32) { // decode repetition and definition levels. // - update validity vectors @@ -1020,9 +1026,9 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) if (s->dict_base) { src_target_pos = gpuDecodeDictionaryIndices(s, sb, src_target_pos, lane_id).first; } else { - gpuInitStringDescriptors(s, sb, src_target_pos, lane_id); + gpuInitStringDescriptors(s, sb, src_target_pos, tile_warp); } - if (t == 32) { s->dict_pos = src_target_pos; } + if (tile_warp.thread_rank() == 0) { s->dict_pos = src_target_pos; } } else { int const me = t - out_thread0;