From f34b5e9e7e660ce658e152bb07e8825c9977f14b Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Thu, 27 Jun 2024 01:42:02 +0000 Subject: [PATCH 1/9] parallelize gpuInitStringDescriptors for FLBA data --- cpp/src/io/parquet/decode_preprocess.cu | 5 +- cpp/src/io/parquet/page_data.cu | 5 +- cpp/src/io/parquet/page_decode.cuh | 72 +++++++++++++++--------- cpp/src/io/parquet/page_string_decode.cu | 8 ++- 4 files changed, 61 insertions(+), 29 deletions(-) diff --git a/cpp/src/io/parquet/decode_preprocess.cu b/cpp/src/io/parquet/decode_preprocess.cu index e49801e6172..62f1ee88036 100644 --- a/cpp/src/io/parquet/decode_preprocess.cu +++ b/cpp/src/io/parquet/decode_preprocess.cu @@ -26,6 +26,8 @@ namespace cudf::io::parquet::detail { +namespace cg = cooperative_groups; + namespace { // # of threads we're decoding with @@ -163,7 +165,8 @@ __device__ size_type gpuDecodeTotalPageStringSize(page_state_s* s, int t) // For V1, the choice is an overestimate (s->dict_size), or an exact number that's // expensive to compute. For now we're going with the latter. else { - str_len = gpuInitStringDescriptors(s, nullptr, target_pos, t); + str_len = gpuInitStringDescriptors( + s, nullptr, target_pos, cg::this_thread_block()); } break; diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 7207173b82f..171422a5e38 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -23,6 +23,8 @@ namespace cudf::io::parquet::detail { +namespace cg = cooperative_groups; + namespace { constexpr int decode_block_size = 128; @@ -277,6 +279,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) } // this needs to be here to prevent warp 3 modifying src_pos before all threads have read it __syncthreads(); + auto tile32 = cg::tiled_partition(cg::this_thread_block()); if (t < 32) { // decode repetition and definition levels. // - update validity vectors @@ -298,7 +301,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) src_target_pos = gpuDecodeRleBooleans(s, sb, src_target_pos, t & 0x1f); } else if (s->col.physical_type == BYTE_ARRAY or s->col.physical_type == FIXED_LEN_BYTE_ARRAY) { - gpuInitStringDescriptors(s, sb, src_target_pos, t & 0x1f); + gpuInitStringDescriptors(s, sb, src_target_pos, tile32); } if (t == 32) { s->dict_pos = src_target_pos; } } else { diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index b1f8e6dd5fe..02f5f16cb8b 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -21,6 +21,7 @@ #include "parquet_gpu.hpp" #include "rle_stream.cuh" +#include #include #include @@ -420,46 +421,65 @@ inline __device__ int gpuDecodeRleBooleans(page_state_s* s, state_buf* sb, int t * @param[in,out] s Page state input/output * @param[out] sb Page state buffer output * @param[in] target_pos Target output position - * @param[in] t Thread ID + * @param[in] g Cooperative group (thread block or tile) * @tparam sizes_only True if only sizes are to be calculated * @tparam state_buf Typename of the `state_buf` (usually inferred) + * @tparam thread_group Typename of the cooperative group (inferred) * * @return Total length of strings processed */ -template -__device__ size_type -gpuInitStringDescriptors(page_state_s* s, [[maybe_unused]] state_buf* sb, int target_pos, int t) +template +__device__ size_type gpuInitStringDescriptors(page_state_s* s, + [[maybe_unused]] state_buf* sb, + int target_pos, + thread_group g) { - int pos = s->dict_pos; - int total_len = 0; + int const t = g.thread_rank(); + int const dict_size = s->dict_size; + int k = s->dict_val; + int pos = s->dict_pos; + int total_len = 0; + + // All group threads can participate for fixed len byte arrays. + if (s->col.physical_type == FIXED_LEN_BYTE_ARRAY) { + int const dtype_len_in = s->dtype_len_in; + total_len = min((target_pos - pos) * dtype_len_in, dict_size - s->dict_val); + if constexpr (!sizes_only) { + for (pos += t, k += t * dtype_len_in; pos < target_pos; pos += g.size()) { + int const len = (k < dict_size) ? dtype_len_in : 0; + k = min(k, dict_size); + sb->dict_idx[rolling_index(pos)] = k; + sb->str_len[rolling_index(pos)] = len; + if (k < dict_size) { k += g.size() * dtype_len_in; } + } + } + // Only thread_rank = 0 updates the s->dict_val + if (!t) { + s->dict_val += total_len; + __threadfence_block(); + } + } + // This step is purely serial for byte arrays + else { + if (!t) { + uint8_t const* cur = s->data_start; - // This step is purely serial - if (!t) { - uint8_t const* cur = s->data_start; - int dict_size = s->dict_size; - int k = s->dict_val; - - while (pos < target_pos) { - int len = 0; - if (s->col.physical_type == FIXED_LEN_BYTE_ARRAY) { - if (k < dict_size) { len = s->dtype_len_in; } - } else { + for (int len = 0; pos < target_pos; pos++, len = 0) { if (k + 4 <= dict_size) { len = (cur[k]) | (cur[k + 1] << 8) | (cur[k + 2] << 16) | (cur[k + 3] << 24); k += 4; if (k + len > dict_size) { len = 0; } } + if constexpr (!sizes_only) { + sb->dict_idx[rolling_index(pos)] = k; + sb->str_len[rolling_index(pos)] = len; + } + k += len; + total_len += len; } - if constexpr (!sizes_only) { - sb->dict_idx[rolling_index(pos)] = k; - sb->str_len[rolling_index(pos)] = len; - } - k += len; - total_len += len; - pos++; + s->dict_val = k; + __threadfence_block(); } - s->dict_val = k; - __threadfence_block(); } return total_len; diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 58e8a09d5b6..d3d4a629d7b 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -31,6 +31,8 @@ namespace cudf::io::parquet::detail { +namespace cg = cooperative_groups; + namespace { constexpr int preprocess_block_size = 512; @@ -1006,6 +1008,10 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) } // this needs to be here to prevent warp 1/2 modifying src_pos before all threads have read it __syncthreads(); + + // Create a warp sized thread block tile + auto tile32 = cg::tiled_partition(cg::this_thread_block()); + if (t < 32) { // decode repetition and definition levels. // - update validity vectors @@ -1020,7 +1026,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) if (s->dict_base) { src_target_pos = gpuDecodeDictionaryIndices(s, sb, src_target_pos, lane_id).first; } else { - gpuInitStringDescriptors(s, sb, src_target_pos, lane_id); + gpuInitStringDescriptors(s, sb, src_target_pos, tile32); } if (t == 32) { s->dict_pos = src_target_pos; } } else { From b8a1ae37520d8e2afa4ec7ba766ef5316d62e6d8 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Thu, 27 Jun 2024 04:42:53 +0000 Subject: [PATCH 2/9] minor improvements --- cpp/src/io/parquet/page_data.cu | 2 +- cpp/src/io/parquet/page_decode.cuh | 6 +----- cpp/src/io/parquet/page_string_decode.cu | 2 +- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 171422a5e38..4c510f88b39 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -303,7 +303,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) s->col.physical_type == FIXED_LEN_BYTE_ARRAY) { gpuInitStringDescriptors(s, sb, src_target_pos, tile32); } - if (t == 32) { s->dict_pos = src_target_pos; } + if (!tile32.thread_rank()) { s->dict_pos = src_target_pos; } } else { // WARP1..WARP3: Decode values int const dtype = s->col.physical_type; diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 02f5f16cb8b..ba6e39e50c5 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -454,10 +454,7 @@ __device__ size_type gpuInitStringDescriptors(page_state_s* s, } } // Only thread_rank = 0 updates the s->dict_val - if (!t) { - s->dict_val += total_len; - __threadfence_block(); - } + if (!t) { s->dict_val += total_len; } } // This step is purely serial for byte arrays else { @@ -478,7 +475,6 @@ __device__ size_type gpuInitStringDescriptors(page_state_s* s, total_len += len; } s->dict_val = k; - __threadfence_block(); } } diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index d3d4a629d7b..42060aaf2f1 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -1028,7 +1028,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) } else { gpuInitStringDescriptors(s, sb, src_target_pos, tile32); } - if (t == 32) { s->dict_pos = src_target_pos; } + if (!tile32.thread_rank()) { s->dict_pos = src_target_pos; } } else { int const me = t - out_thread0; From 65e74c1aa8229ad1e38c202436b508debfba7ccd Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Thu, 27 Jun 2024 18:12:37 +0000 Subject: [PATCH 3/9] Add const where possible --- cpp/src/io/parquet/page_data.cu | 2 +- cpp/src/io/parquet/page_decode.cuh | 2 +- cpp/src/io/parquet/page_string_decode.cu | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 4c510f88b39..284a5355928 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -279,7 +279,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) } // this needs to be here to prevent warp 3 modifying src_pos before all threads have read it __syncthreads(); - auto tile32 = cg::tiled_partition(cg::this_thread_block()); + auto const tile32 = cg::tiled_partition(cg::this_thread_block()); if (t < 32) { // decode repetition and definition levels. // - update validity vectors diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index ba6e39e50c5..1dc76c66f54 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -432,7 +432,7 @@ template __device__ size_type gpuInitStringDescriptors(page_state_s* s, [[maybe_unused]] state_buf* sb, int target_pos, - thread_group g) + thread_group const g) { int const t = g.thread_rank(); int const dict_size = s->dict_size; diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 42060aaf2f1..131b528a91b 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -1010,7 +1010,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) __syncthreads(); // Create a warp sized thread block tile - auto tile32 = cg::tiled_partition(cg::this_thread_block()); + auto const tile32 = cg::tiled_partition(cg::this_thread_block()); if (t < 32) { // decode repetition and definition levels. From 423985a769df4cd9f550085925c49340e88ab9d7 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Thu, 27 Jun 2024 20:09:34 +0000 Subject: [PATCH 4/9] remove temp variable `len` and directly assign to `str_len` --- cpp/src/io/parquet/page_decode.cuh | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 1dc76c66f54..1d612b7b5c6 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -446,10 +446,12 @@ __device__ size_type gpuInitStringDescriptors(page_state_s* s, total_len = min((target_pos - pos) * dtype_len_in, dict_size - s->dict_val); if constexpr (!sizes_only) { for (pos += t, k += t * dtype_len_in; pos < target_pos; pos += g.size()) { - int const len = (k < dict_size) ? dtype_len_in : 0; - k = min(k, dict_size); + sb->str_len[rolling_index(pos)] = + (k < dict_size) ? dtype_len_in : 0; + // k is upperbounded by dict_size. + k = min(k, dict_size); sb->dict_idx[rolling_index(pos)] = k; - sb->str_len[rolling_index(pos)] = len; + // Increment k if needed. if (k < dict_size) { k += g.size() * dtype_len_in; } } } From d4ee15f118d65d12249ca30f219f0649d0ce7a8a Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Thu, 27 Jun 2024 20:24:12 +0000 Subject: [PATCH 5/9] Simplify `dict_idx` and `k` computation. --- cpp/src/io/parquet/page_decode.cuh | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 1d612b7b5c6..cb73ae29a71 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -448,11 +448,10 @@ __device__ size_type gpuInitStringDescriptors(page_state_s* s, for (pos += t, k += t * dtype_len_in; pos < target_pos; pos += g.size()) { sb->str_len[rolling_index(pos)] = (k < dict_size) ? dtype_len_in : 0; - // k is upperbounded by dict_size. - k = min(k, dict_size); + // dict_idx is upperbounded by dict_size. sb->dict_idx[rolling_index(pos)] = k; // Increment k if needed. - if (k < dict_size) { k += g.size() * dtype_len_in; } + if (k < dict_size) { k = min(k + (g.size() * dtype_len_in), dict_size); } } } // Only thread_rank = 0 updates the s->dict_val From c674b4abfd32022971d55a930e8c4793bf5d8df0 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 9 Jul 2024 10:49:16 -0700 Subject: [PATCH 6/9] Apply suggestions from code review Co-authored-by: Vukasin Milovanovic Co-authored-by: Shruti Shivakumar --- cpp/src/io/parquet/page_data.cu | 2 +- cpp/src/io/parquet/page_string_decode.cu | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 284a5355928..729c5c9a8a8 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -303,7 +303,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) s->col.physical_type == FIXED_LEN_BYTE_ARRAY) { gpuInitStringDescriptors(s, sb, src_target_pos, tile32); } - if (!tile32.thread_rank()) { s->dict_pos = src_target_pos; } + if (tile32.thread_rank() == 0) { s->dict_pos = src_target_pos; } } else { // WARP1..WARP3: Decode values int const dtype = s->col.physical_type; diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 131b528a91b..4f66ca7b630 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -1010,7 +1010,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) __syncthreads(); // Create a warp sized thread block tile - auto const tile32 = cg::tiled_partition(cg::this_thread_block()); + auto const tile_warp = cg::tiled_partition(cg::this_thread_block()); if (t < 32) { // decode repetition and definition levels. @@ -1028,7 +1028,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) } else { gpuInitStringDescriptors(s, sb, src_target_pos, tile32); } - if (!tile32.thread_rank()) { s->dict_pos = src_target_pos; } + if (tile32.thread_rank() == 0) { s->dict_pos = src_target_pos; } } else { int const me = t - out_thread0; From d207dc52579c6cc49b87c8cd217954d46c70dc9d Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 9 Jul 2024 10:50:40 -0700 Subject: [PATCH 7/9] Rename tile32 to tile_warp --- cpp/src/io/parquet/page_data.cu | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 729c5c9a8a8..e0d50d7ccf9 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -279,7 +279,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) } // this needs to be here to prevent warp 3 modifying src_pos before all threads have read it __syncthreads(); - auto const tile32 = cg::tiled_partition(cg::this_thread_block()); + auto const tile_warp = cg::tiled_partition(cg::this_thread_block()); if (t < 32) { // decode repetition and definition levels. // - update validity vectors @@ -301,9 +301,9 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) src_target_pos = gpuDecodeRleBooleans(s, sb, src_target_pos, t & 0x1f); } else if (s->col.physical_type == BYTE_ARRAY or s->col.physical_type == FIXED_LEN_BYTE_ARRAY) { - gpuInitStringDescriptors(s, sb, src_target_pos, tile32); + gpuInitStringDescriptors(s, sb, src_target_pos, tile_warp); } - if (tile32.thread_rank() == 0) { s->dict_pos = src_target_pos; } + if (tile_warp.thread_rank() == 0) { s->dict_pos = src_target_pos; } } else { // WARP1..WARP3: Decode values int const dtype = s->col.physical_type; From 64f36ba2eafc490e96011745b2feb1862c0e4016 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 9 Jul 2024 10:51:36 -0700 Subject: [PATCH 8/9] Rename tile32 to tile_warp --- cpp/src/io/parquet/page_string_decode.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 4f66ca7b630..ca74a1c2ba0 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -1026,9 +1026,9 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) if (s->dict_base) { src_target_pos = gpuDecodeDictionaryIndices(s, sb, src_target_pos, lane_id).first; } else { - gpuInitStringDescriptors(s, sb, src_target_pos, tile32); + gpuInitStringDescriptors(s, sb, src_target_pos, tile_warp); } - if (tile32.thread_rank() == 0) { s->dict_pos = src_target_pos; } + if (tile_warp.thread_rank() == 0) { s->dict_pos = src_target_pos; } } else { int const me = t - out_thread0; From 4d20aeb1a23117f1668715126c56335cab0f9e82 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 9 Jul 2024 10:59:03 -0700 Subject: [PATCH 9/9] Pass cg via const reference instead of value Co-authored-by: Yunsong Wang --- cpp/src/io/parquet/page_decode.cuh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index cb73ae29a71..a3f91f6859b 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -432,7 +432,7 @@ template __device__ size_type gpuInitStringDescriptors(page_state_s* s, [[maybe_unused]] state_buf* sb, int target_pos, - thread_group const g) + thread_group const& g) { int const t = g.thread_rank(); int const dict_size = s->dict_size;