Parallelize gpuInitStringDescriptors for fixed length byte array data #16109

Merged
5 changes: 4 additions & 1 deletion cpp/src/io/parquet/decode_preprocess.cu
@@ -26,6 +26,8 @@

namespace cudf::io::parquet::detail {

namespace cg = cooperative_groups;

namespace {

// # of threads we're decoding with
@@ -163,7 +165,8 @@ __device__ size_type gpuDecodeTotalPageStringSize(page_state_s* s, int t)
// For V1, the choice is an overestimate (s->dict_size), or an exact number that's
// expensive to compute. For now we're going with the latter.
else {
- str_len = gpuInitStringDescriptors<true, unused_state_buf>(s, nullptr, target_pos, t);
+ str_len = gpuInitStringDescriptors<true, unused_state_buf>(
+   s, nullptr, target_pos, cg::this_thread_block());
}
break;

7 changes: 5 additions & 2 deletions cpp/src/io/parquet/page_data.cu
@@ -23,6 +23,8 @@

namespace cudf::io::parquet::detail {

namespace cg = cooperative_groups;

namespace {

constexpr int decode_block_size = 128;
@@ -277,6 +279,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
}
// this needs to be here to prevent warp 3 modifying src_pos before all threads have read it
__syncthreads();
auto const tile_warp = cg::tiled_partition<cudf::detail::warp_size>(cg::this_thread_block());
if (t < 32) {
Contributor:
It would be nice to use tile32 here since we already have it, but I'm not convinced it can be done in a simple way.

mhaseeb123 (Member, Author), Jul 9, 2024:
Yes, you are right that we could replace this t < 32 with tile_warp.meta_group_rank() == 0 and it should be fine, but the logic at L289 is messier to replace since it may be tile_warp.meta_group_rank() == 0 or 1 depending on whether out_thread0 is 32 or 64, so I left this as is for simplicity.

Contributor:
We could/should probably port the whole logic to thread_groups and avoid the magic multiples of 32. I'd expect that it would not be more complex than the current logic.
Not something for this PR.
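
For readers skimming this thread, here is a minimal, self-contained sketch of the idea discussed above: selecting a warp by cooperative-groups ranks instead of comparing the raw thread index against 32. The kernel name and the out parameter are hypothetical and only for illustration; this is not code from the PR.

```cuda
#include <cooperative_groups.h>

namespace cg = cooperative_groups;

// Hypothetical kernel, not from this PR: partition the block into 32-thread tiles,
// then pick the first warp by meta_group_rank() and its leader by thread_rank(),
// instead of testing the raw thread index against 32.
__global__ void tile_dispatch_example(int* out)
{
  auto const block     = cg::this_thread_block();
  auto const tile_warp = cg::tiled_partition<32>(block);

  if (tile_warp.meta_group_rank() == 0) {  // plays the role of `if (t < 32)`
    if (tile_warp.thread_rank() == 0) {    // plays the role of a per-warp `t == 0` check
      out[blockIdx.x] = 1;                 // only the warp leader writes
    }
  }
}
```

As the author notes above, the branch guarded by out_thread0 is harder to express this way, because the relevant warp may be meta group 0 or 1 depending on the kernel configuration.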

// decode repetition and definition levels.
// - update validity vectors
@@ -298,9 +301,9 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
src_target_pos = gpuDecodeRleBooleans(s, sb, src_target_pos, t & 0x1f);
} else if (s->col.physical_type == BYTE_ARRAY or
s->col.physical_type == FIXED_LEN_BYTE_ARRAY) {
- gpuInitStringDescriptors<false>(s, sb, src_target_pos, t & 0x1f);
+ gpuInitStringDescriptors<false>(s, sb, src_target_pos, tile_warp);
}
- if (t == 32) { s->dict_pos = src_target_pos; }
+ if (tile_warp.thread_rank() == 0) { s->dict_pos = src_target_pos; }
} else {
// WARP1..WARP3: Decode values
int const dtype = s->col.physical_type;
69 changes: 43 additions & 26 deletions cpp/src/io/parquet/page_decode.cuh
@@ -21,6 +21,7 @@
#include "parquet_gpu.hpp"
#include "rle_stream.cuh"

#include <cooperative_groups.h>
#include <cuda/atomic>
#include <cuda/std/tuple>

@@ -420,46 +421,62 @@ inline __device__ int gpuDecodeRleBooleans(page_state_s* s, state_buf* sb, int t
* @param[in,out] s Page state input/output
* @param[out] sb Page state buffer output
* @param[in] target_pos Target output position
- * @param[in] t Thread ID
+ * @param[in] g Cooperative group (thread block or tile)
* @tparam sizes_only True if only sizes are to be calculated
* @tparam state_buf Typename of the `state_buf` (usually inferred)
* @tparam thread_group Typename of the cooperative group (inferred)
*
* @return Total length of strings processed
*/
-template <bool sizes_only, typename state_buf>
-__device__ size_type
-gpuInitStringDescriptors(page_state_s* s, [[maybe_unused]] state_buf* sb, int target_pos, int t)
+template <bool sizes_only, typename state_buf, typename thread_group>
+__device__ size_type gpuInitStringDescriptors(page_state_s* s,
+                                              [[maybe_unused]] state_buf* sb,
+                                              int target_pos,
+                                              thread_group const g)
{
-  int pos       = s->dict_pos;
-  int total_len = 0;
+  int const t         = g.thread_rank();
+  int const dict_size = s->dict_size;
+  int k               = s->dict_val;
+  int pos             = s->dict_pos;
+  int total_len       = 0;

+  // All group threads can participate for fixed len byte arrays.
+  if (s->col.physical_type == FIXED_LEN_BYTE_ARRAY) {
+    int const dtype_len_in = s->dtype_len_in;
+    total_len = min((target_pos - pos) * dtype_len_in, dict_size - s->dict_val);
+    if constexpr (!sizes_only) {
+      for (pos += t, k += t * dtype_len_in; pos < target_pos; pos += g.size()) {
+        sb->str_len[rolling_index<state_buf::str_buf_size>(pos)] =
+          (k < dict_size) ? dtype_len_in : 0;
+        // dict_idx is upperbounded by dict_size.
+        sb->dict_idx[rolling_index<state_buf::dict_buf_size>(pos)] = k;
+        // Increment k if needed.
+        if (k < dict_size) { k = min(k + (g.size() * dtype_len_in), dict_size); }
+      }
+    }
+    // Only thread_rank = 0 updates the s->dict_val
+    if (!t) { s->dict_val += total_len; }
+  }
+  // This step is purely serial for byte arrays
+  else {
+    if (!t) {
+      uint8_t const* cur = s->data_start;
-  // This step is purely serial
-  if (!t) {
-    uint8_t const* cur = s->data_start;
-    int dict_size      = s->dict_size;
-    int k              = s->dict_val;
-
-    while (pos < target_pos) {
-      int len = 0;
-      if (s->col.physical_type == FIXED_LEN_BYTE_ARRAY) {
-        if (k < dict_size) { len = s->dtype_len_in; }
-      } else {
+      for (int len = 0; pos < target_pos; pos++, len = 0) {
         if (k + 4 <= dict_size) {
           len = (cur[k]) | (cur[k + 1] << 8) | (cur[k + 2] << 16) | (cur[k + 3] << 24);
           k += 4;
           if (k + len > dict_size) { len = 0; }
         }
-      }
-      if constexpr (!sizes_only) {
-        sb->dict_idx[rolling_index<state_buf::dict_buf_size>(pos)] = k;
-        sb->str_len[rolling_index<state_buf::str_buf_size>(pos)]  = len;
-      }
-      k += len;
-      total_len += len;
-      pos++;
-    }
-    s->dict_val = k;
-    __threadfence_block();
-  }
+        if constexpr (!sizes_only) {
+          sb->dict_idx[rolling_index<state_buf::dict_buf_size>(pos)] = k;
+          sb->str_len[rolling_index<state_buf::str_buf_size>(pos)]  = len;
+        }
+        k += len;
+        total_len += len;
+      }
+      s->dict_val = k;
+      __threadfence_block();
+    }
+  }

return total_len;
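
To make the new group-templated interface above concrete, here is a minimal, self-contained sketch (not code from this PR) of how the same routine can be driven by either a whole thread block or a single 32-thread tile, mirroring the two kinds of call sites in this change: gpuDecodeTotalPageStringSize passes cg::this_thread_block(), while the page-data and string-decode kernels pass a warp-sized tile. The process, block_wide, and warp_wide names and the simplified str_len buffer are hypothetical stand-ins.

```cuda
#include <cooperative_groups.h>

namespace cg = cooperative_groups;

// Hypothetical stand-in for the group-templated descriptor routine: every thread
// of the group handles one position and then strides by the group size.
template <typename thread_group>
__device__ void process(int* str_len, int target_pos, int dtype_len_in, thread_group const g)
{
  for (int pos = g.thread_rank(); pos < target_pos; pos += g.size()) {
    str_len[pos] = dtype_len_in;  // fixed-length elements all get the same size
  }
}

// Preprocessing-style call site: the whole thread block participates.
__global__ void block_wide(int* str_len, int target_pos, int dtype_len_in)
{
  process(str_len, target_pos, dtype_len_in, cg::this_thread_block());
}

// Decode-style call site: only the first 32-thread tile participates.
__global__ void warp_wide(int* str_len, int target_pos, int dtype_len_in)
{
  auto const tile_warp = cg::tiled_partition<32>(cg::this_thread_block());
  if (tile_warp.meta_group_rank() == 0) { process(str_len, target_pos, dtype_len_in, tile_warp); }
}
```

Note how the fixed-length branch in the actual change clamps the total to the remaining dictionary bytes: with, say, dtype_len_in = 4, ten remaining positions, and only 30 bytes left, total_len = min(10 * 4, 30) = 30, and positions whose running offset k reaches dict_size get a string length of 0.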
10 changes: 8 additions & 2 deletions cpp/src/io/parquet/page_string_decode.cu
@@ -31,6 +31,8 @@

namespace cudf::io::parquet::detail {

namespace cg = cooperative_groups;

namespace {

constexpr int preprocess_block_size = 512;
@@ -1006,6 +1008,10 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
}
// this needs to be here to prevent warp 1/2 modifying src_pos before all threads have read it
__syncthreads();

// Create a warp sized thread block tile
auto const tile_warp = cg::tiled_partition<cudf::detail::warp_size>(cg::this_thread_block());

if (t < 32) {
// decode repetition and definition levels.
// - update validity vectors
@@ -1020,9 +1026,9 @@
if (s->dict_base) {
src_target_pos = gpuDecodeDictionaryIndices<false>(s, sb, src_target_pos, lane_id).first;
} else {
- gpuInitStringDescriptors<false>(s, sb, src_target_pos, lane_id);
+ gpuInitStringDescriptors<false>(s, sb, src_target_pos, tile_warp);
}
- if (t == 32) { s->dict_pos = src_target_pos; }
+ if (tile_warp.thread_rank() == 0) { s->dict_pos = src_target_pos; }
} else {
int const me = t - out_thread0;
