diff --git a/cpp/include/cudf_test/column_wrapper.hpp b/cpp/include/cudf_test/column_wrapper.hpp
index b2339cf6a65..caedbcc8f03 100644
--- a/cpp/include/cudf_test/column_wrapper.hpp
+++ b/cpp/include/cudf_test/column_wrapper.hpp
@@ -94,7 +94,7 @@ struct fixed_width_type_converter {
   template <typename FromT = From,
             typename ToT   = To,
             typename std::enable_if<std::is_same<FromT, ToT>::value, void>::type* = nullptr>
-  __host__ __device__ ToT operator()(FromT element) const
+  constexpr ToT operator()(FromT element) const
   {
     return element;
   }
@@ -106,7 +106,7 @@ struct fixed_width_type_converter {
             (cudf::is_convertible<FromT, ToT>::value ||
              std::is_constructible<ToT, FromT>::value),
             void>::type* = nullptr>
-  __host__ __device__ ToT operator()(FromT element) const
+  constexpr ToT operator()(FromT element) const
   {
     return static_cast<ToT>(element);
   }
@@ -117,7 +117,7 @@ struct fixed_width_type_converter {
     typename ToT                                       = To,
     typename std::enable_if<std::is_integral<FromT>::value && cudf::is_timestamp_t<ToT>::value,
                             void>::type* = nullptr>
-  __host__ __device__ ToT operator()(FromT element) const
+  constexpr ToT operator()(FromT element) const
  {
    return ToT{typename ToT::duration{element}};
  }
diff --git a/cpp/src/io/csv/csv_gpu.cu b/cpp/src/io/csv/csv_gpu.cu
index ef1c17aa817..041d1de3404 100644
--- a/cpp/src/io/csv/csv_gpu.cu
+++ b/cpp/src/io/csv/csv_gpu.cu
@@ -860,13 +860,11 @@ __global__ void __launch_bounds__(rowofs_block_dim)
                                        int escapechar,
                                        int commentchar)
 {
-  auto start = data.begin();
-  __shared__ __align__(8) uint64_t ctxtree[rowofs_block_dim * 2];
-  using warp_reduce      = typename cub::WarpReduce<uint32_t>;
-  using half_warp_reduce = typename cub::WarpReduce<uint32_t, 16>;
+  auto start         = data.begin();
+  using block_reduce = typename cub::BlockReduce<uint32_t, rowofs_block_dim>;
   __shared__ union {
-    typename warp_reduce::TempStorage full;
-    typename half_warp_reduce::TempStorage half[rowofs_block_dim / 32];
+    typename block_reduce::TempStorage bk_storage;
+    __align__(8) uint64_t ctxtree[rowofs_block_dim * 2];
   } temp_storage;
 
   const char *end = start + (min(parse_pos + chunk_size, data_size) - start_offset);
@@ -936,16 +934,16 @@ __global__ void __launch_bounds__(rowofs_block_dim)
   // Convert the long-form {rowmap,outctx}[inctx] version into packed version
   // {rowcount,outctx}[inctx], then merge the row contexts of the 32-character blocks into
   // a single 16K-character block context
-  rowctx_merge_transform(ctxtree, pack_rowmaps(ctx_map), t);
+  rowctx_merge_transform(temp_storage.ctxtree, pack_rowmaps(ctx_map), t);
 
   // If this is the second phase, get the block's initial parser state and row counter
   if (offsets_out.data()) {
-    if (t == 0) { ctxtree[0] = row_ctx[blockIdx.x]; }
+    if (t == 0) { temp_storage.ctxtree[0] = row_ctx[blockIdx.x]; }
     __syncthreads();
 
     // Walk back the transform tree with the known initial parser state
-    rowctx32_t ctx = rowctx_inverse_merge_transform(ctxtree, t);
-    uint64_t row   = (ctxtree[0] >> 2) + (ctx >> 2);
+    rowctx32_t ctx = rowctx_inverse_merge_transform(temp_storage.ctxtree, t);
+    uint64_t row   = (temp_storage.ctxtree[0] >> 2) + (ctx >> 2);
     uint32_t rows_out_of_range = 0;
     uint32_t rowmap            = select_rowmap(ctx_map, ctx & 3);
     // Output row positions
@@ -960,18 +958,13 @@ __global__ void __launch_bounds__(rowofs_block_dim)
       row++;
      rowmap >>= pos;
    }
-    // Return the number of rows out of range
-    rows_out_of_range = half_warp_reduce(temp_storage.half[t / 32]).Sum(rows_out_of_range);
-    __syncthreads();
-    if (!(t & 0xf)) { ctxtree[t >> 4] = rows_out_of_range; }
     __syncthreads();
-    if (t < 32) {
-      rows_out_of_range = warp_reduce(temp_storage.full).Sum(static_cast<uint32_t>(ctxtree[t]));
-      if (t == 0) { row_ctx[blockIdx.x] = rows_out_of_range; }
-    }
+    // Return the number of rows out of range
+    rows_out_of_range = block_reduce(temp_storage.bk_storage).Sum(rows_out_of_range);
+    if (t == 0) { row_ctx[blockIdx.x] = rows_out_of_range; }
   } else {
     // Just store the row counts and output contexts
-    if (t == 0) { row_ctx[blockIdx.x] = ctxtree[1]; }
+    if (t == 0) { row_ctx[blockIdx.x] = temp_storage.ctxtree[1]; }
   }
 }
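// ---------------------------------------------------------------------------
// Illustration (not part of the patch): a minimal sketch of the
// cub::BlockReduce pattern the csv_gpu.cu hunks above adopt. The kernel name
// and block size here are invented for the example.
#include <cub/cub.cuh>
#include <cstdint>

constexpr int example_block_dim = 128;  // stand-in for rowofs_block_dim

__global__ void block_sum_example(const uint32_t *in, uint32_t *out)
{
  using block_reduce = cub::BlockReduce<uint32_t, example_block_dim>;
  // TempStorage may share a union with other shared data (as with ctxtree
  // above), as long as a __syncthreads() separates the two uses.
  __shared__ typename block_reduce::TempStorage temp_storage;
  uint32_t v   = in[blockIdx.x * example_block_dim + threadIdx.x];
  uint32_t sum = block_reduce(temp_storage).Sum(v);  // valid on thread 0 only
  if (threadIdx.x == 0) { out[blockIdx.x] = sum; }
}
// ---------------------------------------------------------------------------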
diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu
index b814f5364ca..1ee57f4a2fd 100644
--- a/cpp/src/io/orc/dict_enc.cu
+++ b/cpp/src/io/orc/dict_enc.cu
@@ -62,13 +62,17 @@ static inline __device__ uint32_t nvstr_init_hash(char const *ptr, uint32_t len)
  *
  * @param[in,out] s dictionary builder state
  * @param[in] t thread id
+ * @param[in] temp_storage shared memory storage to scan non-null positions
  */
-static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s, int t)
+template <int block_size, typename Storage>
+static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s,
+                                          int t,
+                                          Storage &temp_storage)
 {
   if (t == 0) { s->nnz = 0; }
   for (uint32_t i = 0; i < s->chunk.num_rows; i += 512) {
     const uint32_t *valid_map = s->chunk.valid_map_base;
-    uint32_t is_valid, nz_map, nz_pos;
+    uint32_t is_valid, nz_pos;
     if (t < 16) {
       if (!valid_map) {
         s->scratch_red[t] = 0xffffffffu;
@@ -88,18 +92,13 @@ static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s, int t)
     }
     __syncthreads();
     is_valid = (i + t < s->chunk.num_rows) ? (s->scratch_red[t >> 5] >> (t & 0x1f)) & 1 : 0;
-    nz_map   = ballot(is_valid);
-    nz_pos   = s->nnz + __popc(nz_map & (0x7fffffffu >> (0x1fu - ((uint32_t)t & 0x1f))));
-    if (!(t & 0x1f)) { s->scratch_red[16 + (t >> 5)] = __popc(nz_map); }
+    uint32_t tmp_nnz;
+    cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>(temp_storage)
+      .ExclusiveSum(is_valid, nz_pos, tmp_nnz);
+    nz_pos += s->nnz;
     __syncthreads();
-    if (t < 32) {
-      uint32_t nnz     = s->scratch_red[16 + (t & 0xf)];
-      uint32_t nnz_pos = WarpReducePos16(nnz, t);
-      if (t == 0xf) { s->nnz += nnz_pos; }
-      if (t <= 0xf) { s->scratch_red[t] = nnz_pos - nnz; }
-    }
-    __syncthreads();
-    if (is_valid) { s->dict[nz_pos + s->scratch_red[t >> 5]] = i + t; }
+    if (!t) { s->nnz += tmp_nnz; }
+    if (is_valid) { s->dict[nz_pos] = i + t; }
     __syncthreads();
   }
 }
@@ -116,11 +115,13 @@ __global__ void __launch_bounds__(block_size, 2)
   gpuInitDictionaryIndices(DictionaryChunk *chunks, uint32_t num_columns)
 {
   __shared__ __align__(16) dictinit_state_s state_g;
-  using warp_reduce      = cub::WarpReduce<uint32_t>;
-  using half_warp_reduce = cub::WarpReduce<uint32_t, 16>;
+
+  using block_reduce = cub::BlockReduce<uint32_t, block_size>;
+  using block_scan   = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
+
   __shared__ union {
-    typename warp_reduce::TempStorage full[block_size / 32];
-    typename half_warp_reduce::TempStorage half[block_size / 32];
+    typename block_reduce::TempStorage reduce_storage;
+    typename block_scan::TempStorage scan_storage;
   } temp_storage;
 
   dictinit_state_s *const s = &state_g;
@@ -138,7 +139,7 @@ __global__ void __launch_bounds__(block_size, 2)
   __syncthreads();
   // First, take care of NULLs, and count how many strings we have (TODO: bypass this step when
   // there are no nulls)
-  LoadNonNullIndices(s, t);
+  LoadNonNullIndices<block_size>(s, t, temp_storage.scan_storage);
   // Sum the lengths of all the strings
   if (t == 0) {
     s->chunk.string_char_count = 0;
@@ -157,13 +158,8 @@ __global__ void __launch_bounds__(block_size, 2)
       len  = static_cast<uint32_t>(ck_data[ck_row].count);
       hash = nvstr_init_hash(ck_data[ck_row].ptr, len);
     }
-    len = half_warp_reduce(temp_storage.half[t / 32]).Sum(len);
-    if (!(t & 0xf)) { s->scratch_red[t >> 4] = len; }
-    __syncthreads();
-    if (t < 32) {
-      len = warp_reduce(temp_storage.full[t / 32]).Sum(s->scratch_red[t]);
-      if (t == 0) s->chunk.string_char_count += len;
-    }
+    len = block_reduce(temp_storage.reduce_storage).Sum(len);
+    if (t == 0) s->chunk.string_char_count += len;
     if (i + t < nnz) {
       atomicAdd(&s->map.u32[hash >> 1], 1 << ((hash & 1) ? 16 : 0));
       dict_data[i + t] = start_row + ck_row;
@@ -182,21 +178,13 @@
     uint32_t sum23 = count23 + (count23 << 16);
     uint32_t sum45 = count45 + (count45 << 16);
     uint32_t sum67 = count67 + (count67 << 16);
-    uint32_t sum_w, tmp;
     sum23 += (sum01 >> 16) * 0x10001;
     sum45 += (sum23 >> 16) * 0x10001;
     sum67 += (sum45 >> 16) * 0x10001;
-    sum_w = sum67 >> 16;
-    sum_w = WarpReducePos16(sum_w, t);
-    if ((t & 0xf) == 0xf) { s->scratch_red[t >> 4] = sum_w; }
-    __syncthreads();
-    if (t < 32) {
-      uint32_t sum_b    = WarpReducePos32(s->scratch_red[t], t);
-      s->scratch_red[t] = sum_b;
-    }
+    uint32_t sum_w = sum67 >> 16;
+    block_scan(temp_storage.scan_storage).InclusiveSum(sum_w, sum_w);
     __syncthreads();
-    tmp   = (t >= 16) ? s->scratch_red[(t >> 4) - 1] : 0;
-    sum_w = (sum_w - (sum67 >> 16) + tmp) * 0x10001;
+    sum_w = (sum_w - (sum67 >> 16)) * 0x10001;
     s->map.u32[t * 4 + 0] = sum_w + sum01 - count01;
     s->map.u32[t * 4 + 1] = sum_w + sum23 - count23;
     s->map.u32[t * 4 + 2] = sum_w + sum45 - count45;
@@ -239,7 +227,7 @@
   // map, the position of the first string can be inferred from the hash map counts
   dict_char_count = 0;
   for (uint32_t i = 0; i < nnz; i += block_size) {
-    uint32_t ck_row = 0, ck_row_ref = 0, is_dupe = 0, dupe_mask, dupes_before;
+    uint32_t ck_row = 0, ck_row_ref = 0, is_dupe = 0;
     if (i + t < nnz) {
       const char *str1, *str2;
       uint32_t len1, len2, hash;
@@ -255,33 +243,23 @@
         dict_char_count += (is_dupe) ? 0 : len1;
       }
     }
-    dupe_mask    = ballot(is_dupe);
-    dupes_before = s->total_dupes + __popc(dupe_mask & ((2 << (t & 0x1f)) - 1));
-    if (!(t & 0x1f)) { s->scratch_red[t >> 5] = __popc(dupe_mask); }
-    __syncthreads();
-    if (t < 32) {
-      uint32_t warp_dupes = (t < 16) ? s->scratch_red[t] : 0;
-      uint32_t warp_pos   = WarpReducePos16(warp_dupes, t);
-      if (t == 0xf) { s->total_dupes += warp_pos; }
-      if (t < 16) { s->scratch_red[t] = warp_pos - warp_dupes; }
-    }
+    uint32_t dupes_in_block;
+    uint32_t dupes_before;
+    block_scan(temp_storage.scan_storage).InclusiveSum(is_dupe, dupes_before, dupes_in_block);
+    dupes_before += s->total_dupes;
     __syncthreads();
+    if (!t) { s->total_dupes += dupes_in_block; }
     if (i + t < nnz) {
       if (!is_dupe) {
-        dupes_before += s->scratch_red[t >> 5];
         dict_data[i + t - dupes_before] = ck_row + start_row;
       } else {
         s->chunk.dict_index[ck_row + start_row] = (ck_row_ref + start_row) | (1u << 31);
       }
     }
   }
-  dict_char_count = warp_reduce(temp_storage.full[t / 32]).Sum(dict_char_count);
-  if (!(t & 0x1f)) { s->scratch_red[t >> 5] = dict_char_count; }
-  __syncthreads();
-  if (t < 32) {
-    dict_char_count =
-      half_warp_reduce(temp_storage.half[t / 32]).Sum((t < 16) ? s->scratch_red[t] : 0);
-  }
+  // temp_storage is used twice above, so make sure a `__syncthreads()` stays
+  // between the two uses when making any future changes.
+  dict_char_count = block_reduce(temp_storage.reduce_storage).Sum(dict_char_count);
   if (!t) {
     chunks[group_id * num_columns + col_id].num_strings       = nnz;
     chunks[group_id * num_columns + col_id].string_char_count = s->chunk.string_char_count;
@@ -362,8 +340,12 @@ __global__ void __launch_bounds__(block_size)
   gpuBuildStripeDictionaries(StripeDictionary *stripes, uint32_t num_columns)
 {
   __shared__ __align__(16) build_state_s state_g;
-  using warp_reduce = cub::WarpReduce<uint32_t>;
-  __shared__ typename warp_reduce::TempStorage temp_storage[block_size / 32];
+  using block_reduce = cub::BlockReduce<uint32_t, block_size>;
+  using block_scan   = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
+  __shared__ union {
+    typename block_reduce::TempStorage reduce_storage;
+    typename block_scan::TempStorage scan_storage;
+  } temp_storage;
 
   build_state_s *const s = &state_g;
   uint32_t col_id        = blockIdx.x;
@@ -384,8 +366,8 @@ __global__ void __launch_bounds__(block_size)
     str_data        = static_cast<const nvstrdesc_s *>(s->stripe.column_data_base);
     dict_char_count = 0;
     for (uint32_t i = 0; i < num_strings; i += block_size) {
-      uint32_t cur = (i + t < num_strings) ? dict_data[i + t] : 0;
-      uint32_t dupe_mask, dupes_before, cur_len = 0;
+      uint32_t cur     = (i + t < num_strings) ? dict_data[i + t] : 0;
+      uint32_t cur_len = 0;
       const char *cur_ptr;
       bool is_dupe = false;
       if (i + t < num_strings) {
@@ -397,28 +379,19 @@ __global__ void __launch_bounds__(block_size)
        is_dupe = nvstr_is_equal(cur_ptr, cur_len, str_data[prev].ptr, str_data[prev].count);
      }
      dict_char_count += (is_dupe) ? 0 : cur_len;
-      dupe_mask    = ballot(is_dupe);
-      dupes_before = s->total_dupes + __popc(dupe_mask & ((2 << (t & 0x1f)) - 1));
-      if (!(t & 0x1f)) { s->scratch_red[t >> 5] = __popc(dupe_mask); }
-      __syncthreads();
-      if (t < 32) {
-        uint32_t warp_dupes = s->scratch_red[t];
-        uint32_t warp_pos   = WarpReducePos32(warp_dupes, t);
-        if (t == 0x1f) { s->total_dupes += warp_pos; }
-        s->scratch_red[t] = warp_pos - warp_dupes;
-      }
+      uint32_t dupes_in_block;
+      uint32_t dupes_before;
+      block_scan(temp_storage.scan_storage).InclusiveSum(is_dupe, dupes_before, dupes_in_block);
+      dupes_before += s->total_dupes;
       __syncthreads();
+      if (!t) { s->total_dupes += dupes_in_block; }
       if (i + t < num_strings) {
-        dupes_before += s->scratch_red[t >> 5];
         dict_index[cur] = i + t - dupes_before;
         if (!is_dupe && dupes_before != 0) { dict_data[i + t - dupes_before] = cur; }
       }
       __syncthreads();
     }
-    dict_char_count = warp_reduce(temp_storage[t / 32]).Sum(dict_char_count);
-    if (!(t & 0x1f)) { s->scratch_red[t >> 5] = dict_char_count; }
-    __syncthreads();
-    if (t < 32) { dict_char_count = warp_reduce(temp_storage[t / 32]).Sum(s->scratch_red[t]); }
+    dict_char_count = block_reduce(temp_storage.reduce_storage).Sum(dict_char_count);
     if (t == 0) {
       stripes[stripe_id * num_columns + col_id].num_strings     = num_strings - s->total_dupes;
       stripes[stripe_id * num_columns + col_id].dict_char_count = dict_char_count;
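// ---------------------------------------------------------------------------
// Illustration (not part of the patch): the cub::BlockScan ExclusiveSum +
// block-aggregate idiom that LoadNonNullIndices uses for stream compaction.
// Kernel name and block size are invented; single-block for brevity.
#include <cub/cub.cuh>
#include <cstdint>

constexpr int scan_block_dim = 512;  // illustrative

__global__ void compact_example(const uint32_t *flags, uint32_t *out, uint32_t *out_count, uint32_t n)
{
  using block_scan = cub::BlockScan<uint32_t, scan_block_dim>;
  __shared__ typename block_scan::TempStorage temp_storage;
  uint32_t t        = threadIdx.x;
  uint32_t is_valid = (t < n) ? (flags[t] != 0) : 0;
  uint32_t pos, total;
  // pos = count of valid elements before this thread; total = block-wide count
  block_scan(temp_storage).ExclusiveSum(is_valid, pos, total);
  if (is_valid) { out[pos] = t; }
  if (t == 0) { *out_count = total; }
}
// ---------------------------------------------------------------------------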
diff --git a/cpp/src/io/orc/stats_enc.cu b/cpp/src/io/orc/stats_enc.cu
index 50f8457d05b..a30c61d6ef7 100644
--- a/cpp/src/io/orc/stats_enc.cu
+++ b/cpp/src/io/orc/stats_enc.cu
@@ -66,8 +66,7 @@ __global__ void __launch_bounds__(init_threads_per_block)
  * @param[in] statistics_count Number of statistics buffers
  */
 constexpr unsigned int buffersize_reduction_dim = 32;
-constexpr unsigned int buffersize_threads_per_block =
-  buffersize_reduction_dim * buffersize_reduction_dim;
+constexpr unsigned int block_size      = buffersize_reduction_dim * buffersize_reduction_dim;
 constexpr unsigned int pb_fld_hdrlen   = 1;
 constexpr unsigned int pb_fld_hdrlen16 = 2;  // > 127-byte length
 constexpr unsigned int pb_fld_hdrlen32 = 5;  // > 16KB length
@@ -77,19 +76,18 @@ constexpr unsigned int pb_fldlen_decimal = 40;  // Assume decimal2string fits in 40 characters
 constexpr unsigned int pb_fldlen_bucket1 = 1 + pb_fldlen_int64;
 constexpr unsigned int pb_fldlen_common  = 2 * pb_fld_hdrlen + pb_fldlen_int64;
 
-__global__ void __launch_bounds__(buffersize_threads_per_block, 1)
+template <unsigned int block_size>
+__global__ void __launch_bounds__(block_size, 1)
   gpu_init_statistics_buffersize(statistics_merge_group *groups,
                                  const statistics_chunk *chunks,
                                  uint32_t statistics_count)
 {
-  __shared__ volatile uint32_t scratch_red[buffersize_reduction_dim];
-  __shared__ volatile uint32_t stats_size;
-  uint32_t tx = threadIdx.x;
-  uint32_t ty = threadIdx.y;
-  uint32_t t  = ty * buffersize_reduction_dim + tx;
-  if (!t) { stats_size = 0; }
+  using block_scan = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
+  __shared__ typename block_scan::TempStorage temp_storage;
+  volatile uint32_t stats_size = 0;
+  uint32_t t                   = threadIdx.x;
   __syncthreads();
-  for (uint32_t start = 0; start < statistics_count; start += buffersize_threads_per_block) {
+  for (uint32_t start = 0; start < statistics_count; start += block_size) {
     uint32_t stats_len = 0, stats_pos;
     uint32_t idx = start + t;
     if (idx < statistics_count) {
@@ -120,19 +118,15 @@
       default: break;
     }
   }
-  stats_pos = WarpReducePos32(stats_len, tx);
-  if (tx == buffersize_reduction_dim - 1) { scratch_red[ty] = stats_pos; }
-  __syncthreads();
-  if (ty == 0) { scratch_red[tx] = WarpReducePos32(scratch_red[tx], tx); }
-  __syncthreads();
-  if (ty != 0) { stats_pos += scratch_red[ty - 1]; }
+  uint32_t tmp_stats_size;
+  block_scan(temp_storage).ExclusiveSum(stats_len, stats_pos, tmp_stats_size);
   stats_pos += stats_size;
+  stats_size += tmp_stats_size;
   if (idx < statistics_count) {
-    groups[idx].start_chunk = stats_pos - stats_len;
+    groups[idx].start_chunk = stats_pos;
     groups[idx].num_chunks  = stats_len;
   }
   __syncthreads();
-  if (t == buffersize_threads_per_block - 1) { stats_size = stats_pos; }
 }
}
@@ -405,9 +399,8 @@ void orc_init_statistics_buffersize(statistics_merge_group *groups,
                                     uint32_t statistics_count,
                                     rmm::cuda_stream_view stream)
 {
-  dim3 dim_block(buffersize_reduction_dim, buffersize_reduction_dim);
-  gpu_init_statistics_buffersize<<<1, dim_block, 0, stream.value()>>>(
-    groups, chunks, statistics_count);
+  gpu_init_statistics_buffersize<block_size>
+    <<<1, block_size, 0, stream.value()>>>(groups, chunks, statistics_count);
 }
 
 /**
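// ---------------------------------------------------------------------------
// Illustration (not part of the patch): carrying a running offset across
// grid-size loop iterations, as gpu_init_statistics_buffersize does with
// stats_size. ExclusiveSum returns the same block aggregate to every thread,
// so each thread can keep an identical private copy of the running total.
// Names and the block size are invented for the example.
#include <cub/cub.cuh>
#include <cstdint>

constexpr unsigned int offsets_block_dim = 256;  // illustrative

__global__ void running_offset_example(const uint32_t *lens, uint32_t *offsets, uint32_t count)
{
  using block_scan = cub::BlockScan<uint32_t, offsets_block_dim>;
  __shared__ typename block_scan::TempStorage temp_storage;
  uint32_t running = 0;  // identical in every thread, like stats_size above
  for (uint32_t start = 0; start < count; start += offsets_block_dim) {
    uint32_t idx = start + threadIdx.x;
    uint32_t len = (idx < count) ? lens[idx] : 0;
    uint32_t pos, block_total;
    block_scan(temp_storage).ExclusiveSum(len, pos, block_total);
    if (idx < count) { offsets[idx] = running + pos; }
    running += block_total;  // block_total is returned to all threads
    __syncthreads();         // temp_storage is reused by the next iteration
  }
}
// ---------------------------------------------------------------------------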
diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu
index 856c23c0f55..6f326fc0576 100644
--- a/cpp/src/io/orc/stripe_data.cu
+++ b/cpp/src/io/orc/stripe_data.cu
@@ -87,7 +87,6 @@ struct orc_byterle_state_s {
 
 struct orc_rowdec_state_s {
   uint32_t nz_count;
-  uint32_t last_row[num_warps];
   uint32_t row[row_decoder_buffer_size];  // 0=skip, >0: row position relative to cur_row
 };
 
@@ -97,11 +96,6 @@ struct orc_strdict_state_s {
   uint32_t dict_len;
 };
 
-struct orc_nulldec_state_s {
-  uint32_t row;
-  uint32_t null_count[num_warps];
-};
-
 struct orc_datadec_state_s {
   uint32_t cur_row;  // starting row of current batch
   uint32_t end_row;  // ending row of this chunk (start_row + num_rows)
@@ -119,7 +113,7 @@ struct orcdec_state_s {
   int is_string;
   union {
     orc_strdict_state_s dict;
-    orc_nulldec_state_s nulls;
+    uint32_t nulls_desc_row;  // number of rows processed for nulls
     orc_datadec_state_s data;
   } top;
   union {
@@ -1120,8 +1114,12 @@
                                            size_t first_row)
 {
   __shared__ __align__(16) orcdec_state_s state_g;
-  using warp_reduce = cub::WarpReduce<uint32_t>;
-  __shared__ typename warp_reduce::TempStorage temp_storage[block_size / 32];
+  using warp_reduce  = cub::WarpReduce<uint32_t>;
+  using block_reduce = cub::BlockReduce<uint32_t, block_size>;
+  __shared__ union {
+    typename warp_reduce::TempStorage wr_storage[block_size / 32];
+    typename block_reduce::TempStorage bk_storage;
+  } temp_storage;
 
   orcdec_state_s *const s = &state_g;
   bool is_nulldec         = (blockIdx.y >= num_stripes);
@@ -1136,8 +1134,8 @@
     uint32_t null_count = 0;
     // Decode NULLs
     if (t == 0) {
-      s->chunk.skip_count = 0;
-      s->top.nulls.row    = 0;
+      s->chunk.skip_count   = 0;
+      s->top.nulls_desc_row = 0;
       bytestream_init(&s->bs, s->chunk.streams[CI_PRESENT], s->chunk.strm_len[CI_PRESENT]);
     }
     __syncthreads();
@@ -1145,8 +1143,8 @@
       // No present stream: all rows are valid
       s->vals.u32[t] = ~0;
     }
-    while (s->top.nulls.row < s->chunk.num_rows) {
-      uint32_t nrows_max = min(s->chunk.num_rows - s->top.nulls.row, blockDim.x * 32);
+    while (s->top.nulls_desc_row < s->chunk.num_rows) {
+      uint32_t nrows_max = min(s->chunk.num_rows - s->top.nulls_desc_row, blockDim.x * 32);
       uint32_t nrows;
       size_t row_in;
 
@@ -1164,7 +1162,7 @@
         nrows = nrows_max;
       }
       __syncthreads();
-      row_in = s->chunk.start_row + s->top.nulls.row;
+      row_in = s->chunk.start_row + s->top.nulls_desc_row;
       if (row_in + nrows > first_row && row_in < first_row + max_num_rows &&
           s->chunk.valid_map_base != NULL) {
         int64_t dst_row   = row_in - first_row;
@@ -1215,25 +1213,19 @@
            if (i + 32 > skippedrows) { bits &= (1 << (skippedrows - i)) - 1; }
            skip_count += __popc(bits);
          }
-          skip_count = warp_reduce(temp_storage[t / 32]).Sum(skip_count);
+          skip_count = warp_reduce(temp_storage.wr_storage[t / 32]).Sum(skip_count);
           if (t == 0) { s->chunk.skip_count += skip_count; }
         }
         __syncthreads();
-        if (t == 0) { s->top.nulls.row += nrows; }
+        if (t == 0) { s->top.nulls_desc_row += nrows; }
       __syncthreads();
     }
     __syncthreads();
     // Sum up the valid counts and infer null_count
-    null_count = warp_reduce(temp_storage[t / 32]).Sum(null_count);
-    if (!(t & 0x1f)) { s->top.nulls.null_count[t >> 5] = null_count; }
-    __syncthreads();
-    if (t < 32) {
-      null_count = (t < num_warps) ? s->top.nulls.null_count[t] : 0;
-      null_count = warp_reduce(temp_storage[t / 32]).Sum(null_count);
-      if (t == 0) {
-        chunks[chunk_id].null_count = null_count;
-        chunks[chunk_id].skip_count = s->chunk.skip_count;
-      }
+    null_count = block_reduce(temp_storage.bk_storage).Sum(null_count);
+    if (t == 0) {
+      chunks[chunk_id].null_count = null_count;
+      chunks[chunk_id].skip_count = s->chunk.skip_count;
     }
   } else {
     // Decode string dictionary
@@ -1289,7 +1281,7 @@
  * @param[in,out] s Column chunk decoder state
  * @param[in] first_row crop all rows below first_row
  * @param[in] t thread id
- * @param[in] temp_storage shared memory storage to perform warp reduce
+ * @param[in] temp_storage shared memory storage to perform block reduce
  */
 template <typename Storage>
 static __device__ void DecodeRowPositions(orcdec_state_s *s,
@@ -1297,7 +1289,8 @@
                                           size_t first_row,
                                           int t,
                                           Storage &temp_storage)
 {
-  using warp_reduce = cub::WarpReduce<uint32_t>;
+  using block_reduce = cub::BlockReduce<uint32_t, block_size>;
+
   if (t == 0) {
     if (s->chunk.skip_count != 0) {
       s->u.rowdec.nz_count = min(min(s->chunk.skip_count, s->top.data.max_vals), blockDim.x);
@@ -1338,15 +1331,9 @@
       // TBD: Brute-forcing this, there might be a more efficient way to find the thread with the
       // last row
       last_row = (nz_count == s->u.rowdec.nz_count) ? row_plus1 : 0;
-      last_row = warp_reduce(temp_storage[t / 32]).Reduce(last_row, cub::Max());
-      if (!(t & 0x1f)) { *(volatile uint32_t *)&s->u.rowdec.last_row[t >> 5] = last_row; }
-      nz_pos = (valid) ? nz_count : 0;
-      __syncthreads();
-      if (t < 32) {
-        last_row = (t < num_warps) ? *(volatile uint32_t *)&s->u.rowdec.last_row[t] : 0;
-        last_row = warp_reduce(temp_storage[t / 32]).Reduce(last_row, cub::Max());
-        if (t == 0) { s->top.data.nrows = last_row; }
-      }
+      last_row = block_reduce(temp_storage).Reduce(last_row, cub::Max());
+      nz_pos   = (valid) ? nz_count : 0;
+      if (t == 0) { s->top.data.nrows = last_row; }
       if (valid && nz_pos - 1 < s->u.rowdec.nz_count) { s->u.rowdec.row[nz_pos - 1] = row_plus1; }
       __syncthreads();
     } else {
@@ -1396,7 +1383,7 @@
                   uint32_t rowidx_stride)
 {
   __shared__ __align__(16) orcdec_state_s state_g;
-  __shared__ typename cub::WarpReduce<uint32_t>::TempStorage temp_storage[block_size / 32];
+  __shared__ typename cub::BlockReduce<uint32_t, block_size>::TempStorage temp_storage;
 
   orcdec_state_s *const s = &state_g;
   uint32_t chunk_id;
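// ---------------------------------------------------------------------------
// Illustration (not part of the patch): a block-wide Max reduction like the
// last_row search in DecodeRowPositions. cub::Max() is the stock functor; the
// result is valid on thread 0 only, which is why t == 0 writes nrows above.
// Kernel name and block size are invented.
#include <cub/cub.cuh>
#include <cstdint>

constexpr int max_block_dim = 128;  // illustrative

__global__ void block_max_example(const uint32_t *in, uint32_t *out)
{
  using block_reduce = cub::BlockReduce<uint32_t, max_block_dim>;
  __shared__ typename block_reduce::TempStorage temp_storage;
  uint32_t v         = in[blockIdx.x * max_block_dim + threadIdx.x];
  uint32_t block_max = block_reduce(temp_storage).Reduce(v, cub::Max());
  if (threadIdx.x == 0) { out[blockIdx.x] = block_max; }
}
// ---------------------------------------------------------------------------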
diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu
index fcad42ba1be..6e2b7b2ab89 100644
--- a/cpp/src/io/orc/stripe_enc.cu
+++ b/cpp/src/io/orc/stripe_enc.cu
@@ -49,10 +49,6 @@ struct intrle_enc_state_s {
   uint32_t hdr_bytes;
   uint32_t pl_bytes;
   volatile uint32_t delta_map[(512 / 32) + 1];
-  volatile union {
-    uint32_t u32[(512 / 32) * 2];
-    uint64_t u64[(512 / 32) * 2];
-  } scratch;
 };
 
 struct strdata_enc_state_s {
@@ -79,7 +75,7 @@ struct orcenc_state_s {
     StripeDictionary dict_stripe;
   } u;
   union {
     uint8_t u8[scratch_buffer_size];  // general scratch buffer
     uint32_t u32[scratch_buffer_size / 4];
   } buf;
   union {
@@ -349,8 +345,7 @@ static inline __device__ void StoreBitsBigEndian(
  * @param[in] numvals max number of values to encode
  * @param[in] flush encode all remaining values if nonzero
  * @param[in] t thread id
- * @param[in] temp_storage_full shared memory storage to perform warp reduce
- * @param[in] temp_storage_half shared memory storage to perform half warp reduce
+ * @param[in] temp_storage shared memory storage to perform block reduce
  *
  * @return number of input values encoded
 */
@@ -358,21 +353,20 @@ template <StreamIndexType cid,
          class T,
          bool is_signed,
          uint32_t inmask,
-          typename FullStorage,
-          typename HalfStorage>
+          int block_size,
+          typename Storage>
 static __device__ uint32_t IntegerRLE(orcenc_state_s *s,
                                       const T *inbuf,
                                       uint32_t inpos,
                                       uint32_t numvals,
                                       uint32_t flush,
                                       int t,
-                                      FullStorage &temp_storage_full,
-                                      HalfStorage &temp_storage_half)
+                                      Storage &temp_storage)
 {
-  using warp_reduce      = cub::WarpReduce<T>;
-  using half_warp_reduce = cub::WarpReduce<T, 16>;
-  uint8_t *dst     = s->chunk.streams[cid] + s->strm_pos[cid];
-  uint32_t out_cnt = 0;
+  using block_reduce = cub::BlockReduce<T, block_size>;
+  uint8_t *dst       = s->chunk.streams[cid] + s->strm_pos[cid];
+  uint32_t out_cnt   = 0;
+  __shared__ volatile uint64_t block_vmin;
 
   while (numvals > 0) {
     T v0 = (t < numvals) ? inbuf[(inpos + t) & inmask] : 0;
@@ -421,69 +415,55 @@ static __device__ uint32_t IntegerRLE(orcenc_state_s *s,
     } else {
       intrle_minmax(vmax, vmin);
     }
-    vmin = warp_reduce(temp_storage_full[t / 32]).Reduce(vmin, cub::Min());
-    __syncwarp();
-    vmax = warp_reduce(temp_storage_full[t / 32]).Reduce(vmax, cub::Max());
-    __syncwarp();
-    if (!(t & 0x1f)) {
-      s->u.intrle.scratch.u64[(t >> 5) * 2 + 0] = vmin;
-      s->u.intrle.scratch.u64[(t >> 5) * 2 + 1] = vmax;
-    }
+    vmin = block_reduce(temp_storage).Reduce(vmin, cub::Min());
     __syncthreads();
-    if (t < 32) {
-      vmin = (T)s->u.intrle.scratch.u64[(t & 0xf) * 2 + 0];
-      vmax = (T)s->u.intrle.scratch.u64[(t & 0xf) * 2 + 1];
-      vmin = half_warp_reduce(temp_storage_half[t / 32]).Reduce(vmin, cub::Min());
-      __syncwarp();
-      vmax = half_warp_reduce(temp_storage_half[t / 32]).Reduce(vmax, cub::Max());
-      __syncwarp();
-      if (t == 0) {
-        uint32_t mode1_w, mode2_w;
-        typename std::make_unsigned<T>::type vrange_mode1, vrange_mode2;
-        s->u.intrle.scratch.u64[0] = (uint64_t)vmin;
-        if (sizeof(T) > 4) {
-          vrange_mode1 = (is_signed) ? max(zigzag(vmin), zigzag(vmax)) : vmax;
-          vrange_mode2 = vmax - vmin;
-          mode1_w      = 8 - min(CountLeadingBytes64(vrange_mode1), 7);
-          mode2_w      = 8 - min(CountLeadingBytes64(vrange_mode2), 7);
-        } else {
-          vrange_mode1 = (is_signed) ? max(zigzag(vmin), zigzag(vmax)) : vmax;
-          vrange_mode2 = vmax - vmin;
-          mode1_w      = 4 - min(CountLeadingBytes32(vrange_mode1), 3);
-          mode2_w      = 4 - min(CountLeadingBytes32(vrange_mode2), 3);
-        }
-        // Decide between mode1 & mode2 (also mode3 for length=2 repeat)
-        if (vrange_mode2 == 0 && mode1_w > 1) {
-          // Should only occur if literal_run==2 (otherwise would have resulted in repeat_run >=
-          // 3)
-          uint32_t bytecnt = 2;
-          dst[0]           = 0xC0 + ((literal_run - 1) >> 8);
-          dst[1]           = (literal_run - 1) & 0xff;
-          bytecnt += StoreVarint(dst + 2, vrange_mode1);
-          dst[bytecnt++]           = 0;  // Zero delta
-          s->u.intrle.literal_mode = 3;
-          s->u.intrle.literal_w    = bytecnt;
+    vmax = block_reduce(temp_storage).Reduce(vmax, cub::Max());
+    if (t == 0) {
+      uint32_t mode1_w, mode2_w;
+      typename std::make_unsigned<T>::type vrange_mode1, vrange_mode2;
+      block_vmin = static_cast<uint64_t>(vmin);
+      if (sizeof(T) > 4) {
+        vrange_mode1 = (is_signed) ? max(zigzag(vmin), zigzag(vmax)) : vmax;
+        vrange_mode2 = vmax - vmin;
+        mode1_w      = 8 - min(CountLeadingBytes64(vrange_mode1), 7);
+        mode2_w      = 8 - min(CountLeadingBytes64(vrange_mode2), 7);
+      } else {
+        vrange_mode1 = (is_signed) ? max(zigzag(vmin), zigzag(vmax)) : vmax;
+        vrange_mode2 = vmax - vmin;
+        mode1_w      = 4 - min(CountLeadingBytes32(vrange_mode1), 3);
+        mode2_w      = 4 - min(CountLeadingBytes32(vrange_mode2), 3);
+      }
+      // Decide between mode1 & mode2 (also mode3 for length=2 repeat)
+      if (vrange_mode2 == 0 && mode1_w > 1) {
+        // Should only occur if literal_run==2 (otherwise would have resulted in repeat_run >=
+        // 3)
+        uint32_t bytecnt = 2;
+        dst[0]           = 0xC0 + ((literal_run - 1) >> 8);
+        dst[1]           = (literal_run - 1) & 0xff;
+        bytecnt += StoreVarint(dst + 2, vrange_mode1);
+        dst[bytecnt++]           = 0;  // Zero delta
+        s->u.intrle.literal_mode = 3;
+        s->u.intrle.literal_w    = bytecnt;
+      } else {
+        uint32_t range, w;
+        if (mode1_w > mode2_w && (literal_run - 1) * (mode1_w - mode2_w) > 4) {
+          s->u.intrle.literal_mode = 2;
+          w                        = mode2_w;
+          range                    = (uint32_t)vrange_mode2;
         } else {
-          uint32_t range, w;
-          if (mode1_w > mode2_w && (literal_run - 1) * (mode1_w - mode2_w) > 4) {
-            s->u.intrle.literal_mode = 2;
-            w                        = mode2_w;
-            range                    = (uint32_t)vrange_mode2;
-          } else {
-            s->u.intrle.literal_mode = 1;
-            w                        = mode1_w;
-            range                    = (uint32_t)vrange_mode1;
-          }
-          if (w == 1)
-            w = (range >= 16) ? w << 3 : (range >= 4) ? 4 : (range >= 2) ? 2 : 1;
-          else
-            w <<= 3;  // bytes -> bits
-          s->u.intrle.literal_w = w;
+          s->u.intrle.literal_mode = 1;
+          w                        = mode1_w;
+          range                    = (uint32_t)vrange_mode1;
         }
+        if (w == 1)
+          w = (range >= 16) ? w << 3 : (range >= 4) ? 4 : (range >= 2) ? 2 : 1;
+        else
+          w <<= 3;  // bytes -> bits
+        s->u.intrle.literal_w = w;
       }
     }
     __syncthreads();
-    vmin = (T)s->u.intrle.scratch.u64[0];
+    vmin         = static_cast<T>(block_vmin);
     literal_mode = s->u.intrle.literal_mode;
     literal_w    = s->u.intrle.literal_w;
     if (literal_mode == 1) {
@@ -665,12 +645,9 @@
 {
   __shared__ __align__(16) orcenc_state_s state_g;
   __shared__ union {
-    typename cub::WarpReduce<int32_t>::TempStorage full_i32[block_size / 32];
-    typename cub::WarpReduce<int64_t>::TempStorage full_i64[block_size / 32];
-    typename cub::WarpReduce<uint32_t>::TempStorage full_u32[block_size / 32];
-    typename cub::WarpReduce<int32_t, 16>::TempStorage half_i32[block_size / 32];
-    typename cub::WarpReduce<int64_t, 16>::TempStorage half_i64[block_size / 32];
-    typename cub::WarpReduce<uint32_t, 16>::TempStorage half_u32[block_size / 32];
+    typename cub::BlockReduce<int32_t, block_size>::TempStorage i32;
+    typename cub::BlockReduce<int64_t, block_size>::TempStorage i64;
+    typename cub::BlockReduce<uint32_t, block_size>::TempStorage u32;
   } temp_storage;
 
   orcenc_state_s *const s = &state_g;
@@ -867,25 +844,13 @@
       case SHORT:
       case INT:
      case DATE:
-        n = IntegerRLE<CI_DATA, int32_t, true, 0x3ff>(s,
-                                                      s->vals.i32,
-                                                      s->nnz - s->numvals,
-                                                      s->numvals,
-                                                      flush,
-                                                      t,
-                                                      temp_storage.full_i32,
-                                                      temp_storage.half_i32);
+        n = IntegerRLE<CI_DATA, int32_t, true, 0x3ff, block_size>(
+          s, s->vals.i32, s->nnz - s->numvals, s->numvals, flush, t, temp_storage.i32);
         break;
       case LONG:
      case TIMESTAMP:
-        n = IntegerRLE<CI_DATA, int64_t, true, 0x3ff>(s,
-                                                      s->vals.i64,
-                                                      s->nnz - s->numvals,
-                                                      s->numvals,
-                                                      flush,
-                                                      t,
-                                                      temp_storage.full_i64,
-                                                      temp_storage.half_i64);
+        n = IntegerRLE<CI_DATA, int64_t, true, 0x3ff, block_size>(
+          s, s->vals.i64, s->nnz - s->numvals, s->numvals, flush, t, temp_storage.i64);
         break;
       case BYTE:
         n = ByteRLE<CI_DATA, 0x3ff>(s, s->vals.u8, s->nnz - s->numvals, s->numvals, flush, t);
@@ -910,14 +875,8 @@
       case STRING:
         if (s->chunk.encoding_kind == DICTIONARY_V2) {
-          n = IntegerRLE<CI_DATA, uint32_t, false, 0x3ff>(s,
-                                                          s->vals.u32,
-                                                          s->nnz - s->numvals,
-                                                          s->numvals,
-                                                          flush,
-                                                          t,
-                                                          temp_storage.full_u32,
-                                                          temp_storage.half_u32);
+          n = IntegerRLE<CI_DATA, uint32_t, false, 0x3ff, block_size>(
+            s, s->vals.u32, s->nnz - s->numvals, s->numvals, flush, t, temp_storage.u32);
         } else {
           n = s->numvals;
         }
@@ -933,14 +892,8 @@
       switch (s->chunk.type_kind) {
         case TIMESTAMP:
        case STRING:
-          n = IntegerRLE<CI_DATA2, uint32_t, false, 0x3ff>(s,
-                                                           s->lengths.u32,
-                                                           s->nnz - s->numlengths,
-                                                           s->numlengths,
-                                                           flush,
-                                                           t,
-                                                           temp_storage.full_u32,
-                                                           temp_storage.half_u32);
+          n = IntegerRLE<CI_DATA2, uint32_t, false, 0x3ff, block_size>(
+            s, s->lengths.u32, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u32);
           break;
         default: n = s->numlengths; break;
       }
@@ -975,10 +928,7 @@
   gpuEncodeStringDictionaries(StripeDictionary *stripes, EncChunk *chunks, uint32_t num_columns)
 {
   __shared__ __align__(16) orcenc_state_s state_g;
-  __shared__ union {
-    typename cub::WarpReduce<uint32_t>::TempStorage full_u32[block_size / 32];
-    typename cub::WarpReduce<uint32_t, 16>::TempStorage half_u32[block_size / 32];
-  } temp_storage;
+  __shared__ typename cub::BlockReduce<uint32_t, block_size>::TempStorage temp_storage;
 
   orcenc_state_s *const s = &state_g;
   uint32_t stripe_id      = blockIdx.x;
@@ -1027,14 +977,8 @@
     __syncthreads();
     if (s->numlengths + numvals > 0) {
       uint32_t flush = (s->cur_row + numvals == s->nrows) ? 1 : 0;
-      uint32_t n     = IntegerRLE<CI_DATA2, uint32_t, false, 0x3ff>(s,
-                                                                s->lengths.u32,
-                                                                s->cur_row,
-                                                                s->numlengths + numvals,
-                                                                flush,
-                                                                t,
-                                                                temp_storage.full_u32,
-                                                                temp_storage.half_u32);
+      uint32_t n = IntegerRLE<CI_DATA2, uint32_t, false, 0x3ff, block_size>(
+        s, s->lengths.u32, s->cur_row, s->numlengths + numvals, flush, t, temp_storage);
       __syncthreads();
       if (!t) {
         s->numlengths += numvals;
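// ---------------------------------------------------------------------------
// Illustration (not part of the patch): broadcasting a reduction result to the
// whole block via a shared variable, the role block_vmin plays in IntegerRLE.
// BlockReduce leaves the result on thread 0 only, so one thread stores it and
// a __syncthreads() makes it visible to all. Names and sizes are invented.
#include <cub/cub.cuh>
#include <cstdint>

constexpr int rle_block_dim = 256;  // illustrative

__global__ void broadcast_min_example(const int64_t *in, int64_t *out)
{
  using block_reduce = cub::BlockReduce<int64_t, rle_block_dim>;
  __shared__ typename block_reduce::TempStorage temp_storage;
  __shared__ int64_t block_min;  // plays the role of block_vmin above
  int64_t v = in[blockIdx.x * rle_block_dim + threadIdx.x];
  int64_t r = block_reduce(temp_storage).Reduce(v, cub::Min());
  if (threadIdx.x == 0) { block_min = r; }
  __syncthreads();  // after this, every thread may read block_min
  out[blockIdx.x * rle_block_dim + threadIdx.x] = v - block_min;
}
// ---------------------------------------------------------------------------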
diff --git a/cpp/src/io/parquet/page_dict.cu b/cpp/src/io/parquet/page_dict.cu
index c7babef1a20..d984cc1e44f 100644
--- a/cpp/src/io/parquet/page_dict.cu
+++ b/cpp/src/io/parquet/page_dict.cu
@@ -105,8 +105,11 @@ __device__ void FetchDictionaryFragment(dict_state_s *s,
 }
 
 /// Generate dictionary indices in ascending row order
+template <int block_size>
 __device__ void GenerateDictionaryIndices(dict_state_s *s, uint32_t t)
 {
+  using block_scan = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
+  __shared__ typename block_scan::TempStorage temp_storage;
   uint32_t *dict_index      = s->col.dict_index;
   uint32_t *dict_data       = s->col.dict_data + s->ck.start_row;
   uint32_t num_dict_entries = 0;
@@ -120,13 +123,11 @@
       (is_valid && dict_idx == row);
 
     // Any value that doesn't have bit31 set should have dict_idx=row at this point
-    uint32_t umask = ballot(is_unique);
-    uint32_t pos   = num_dict_entries + __popc(umask & ((1 << (t & 0x1f)) - 1));
-    if (!(t & 0x1f)) { s->scratch_red[t >> 5] = __popc(umask); }
-    num_dict_entries += __syncthreads_count(is_unique);
-    if (t < 32) { s->scratch_red[t] = WarpReducePos32(s->scratch_red[t], t); }
-    __syncthreads();
-    if (t >= 32) { pos += s->scratch_red[(t - 32) >> 5]; }
+    uint32_t block_num_dict_entries;
+    uint32_t pos;
+    block_scan(temp_storage).ExclusiveSum(is_unique, pos, block_num_dict_entries);
+    pos += num_dict_entries;
+    num_dict_entries += block_num_dict_entries;
     if (is_valid && is_unique) {
       dict_data[pos]  = row;
       dict_index[row] = pos;
@@ -150,8 +151,8 @@ __global__ void __launch_bounds__(block_size, 1)
   gpuBuildChunkDictionaries(EncColumnChunk *chunks, uint32_t *dev_scratch)
 {
   __shared__ __align__(8) dict_state_s state_g;
-  using warp_reduce = cub::WarpReduce<uint32_t>;
-  __shared__ typename warp_reduce::TempStorage temp_storage[block_size / 32];
+  using block_reduce = cub::BlockReduce<uint32_t, block_size>;
+  __shared__ typename block_reduce::TempStorage temp_storage;
 
   dict_state_s *const s = &state_g;
   uint32_t t            = threadIdx.x;
@@ -250,15 +251,11 @@
        }
      }
      // Count the non-duplicate entries
-      frag_dict_size = warp_reduce(temp_storage[t / 32]).Sum((is_valid && !is_dupe) ? len : 0);
-      if (!(t & 0x1f)) { s->scratch_red[t >> 5] = frag_dict_size; }
+      frag_dict_size   = block_reduce(temp_storage).Sum((is_valid && !is_dupe) ? len : 0);
       new_dict_entries = __syncthreads_count(is_valid && !is_dupe);
-      if (t < 32) {
-        frag_dict_size = warp_reduce(temp_storage[t / 32]).Sum(s->scratch_red[t]);
-        if (t == 0) {
-          s->frag_dict_size += frag_dict_size;
-          s->num_dict_entries += new_dict_entries;
-        }
+      if (t == 0) {
+        s->frag_dict_size += frag_dict_size;
+        s->num_dict_entries += new_dict_entries;
       }
       if (is_valid) {
         if (!is_dupe) {
@@ -309,7 +306,7 @@
       __syncthreads();
     }
     __syncthreads();
-    GenerateDictionaryIndices(s, t);
+    GenerateDictionaryIndices<block_size>(s, t);
     if (!t) {
       chunks[blockIdx.x].num_dict_fragments = s->ck.num_dict_fragments;
       chunks[blockIdx.x].dictionary_size    = s->dictionary_size;
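// ---------------------------------------------------------------------------
// Illustration (not part of the patch): GenerateDictionaryIndices declares its
// cub TempStorage inside a __device__ helper. A __shared__ declaration in a
// device function is still block-scoped, so the helper must be reached by all
// threads of the block. The helper and kernel below are invented examples.
#include <cub/cub.cuh>
#include <cstdint>

template <int block_size>
__device__ uint32_t assign_slot(uint32_t flag, uint32_t &block_total)
{
  using block_scan = cub::BlockScan<uint32_t, block_size>;
  __shared__ typename block_scan::TempStorage temp_storage;
  uint32_t pos;
  block_scan(temp_storage).ExclusiveSum(flag, pos, block_total);
  return pos;  // this thread's output slot among the flagged elements
}

template <int block_size>
__global__ void __launch_bounds__(block_size) slot_example(const uint32_t *flags, uint32_t *slots)
{
  uint32_t total;
  slots[threadIdx.x] = assign_slot<block_size>(flags[threadIdx.x], total);
}
// ---------------------------------------------------------------------------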
diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu
index 07d460e4bd5..d7d47c07354 100644
--- a/cpp/src/io/parquet/page_enc.cu
+++ b/cpp/src/io/parquet/page_enc.cu
@@ -120,11 +120,11 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment *frag,
 {
   __shared__ __align__(16) frag_init_state_s state_g;
 
-  using warp_reduce      = cub::WarpReduce<uint32_t>;
-  using half_warp_reduce = cub::WarpReduce<uint32_t, 16>;
+  using block_reduce = cub::BlockReduce<uint32_t, block_size>;
+  using block_scan   = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
   __shared__ union {
-    typename warp_reduce::TempStorage full[block_size / 32];
-    typename half_warp_reduce::TempStorage half;
+    typename block_reduce::TempStorage reduce_storage;
+    typename block_scan::TempStorage scan_storage;
   } temp_storage;
 
   frag_init_state_s *const s = &state_g;
@@ -204,7 +204,6 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment *frag,
     uint32_t is_valid = (i + t < nvals && val_idx < s->col.leaf_column->size())
                           ? s->col.leaf_column->is_valid(val_idx)
                           : 0;
-    uint32_t valid_warp = ballot(is_valid);
     uint32_t len, nz_pos, hash;
     if (is_valid) {
       len = dtype_len;
@@ -227,28 +226,18 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment *frag,
       len = 0;
     }
 
-    nz_pos =
-      s->frag.non_nulls + __popc(valid_warp & (0x7fffffffu >> (0x1fu - ((uint32_t)t & 0x1f))));
-    len = warp_reduce(temp_storage.full[t / 32]).Sum(len);
-    if (!(t & 0x1f)) {
-      s->scratch_red[(t >> 5) + 0]  = __popc(valid_warp);
-      s->scratch_red[(t >> 5) + 16] = len;
-    }
+    uint32_t non_nulls;
+    block_scan(temp_storage.scan_storage).ExclusiveSum(is_valid, nz_pos, non_nulls);
+    nz_pos += s->frag.non_nulls;
     __syncthreads();
-    if (t < 32) {
-      uint32_t warp_pos  = WarpReducePos16((t < 16) ? s->scratch_red[t] : 0, t);
-      uint32_t non_nulls = shuffle(warp_pos, 0xf);
-      len = half_warp_reduce(temp_storage.half).Sum((t < 16) ? s->scratch_red[t + 16] : 0);
-      if (t < 16) { s->scratch_red[t] = warp_pos; }
-      if (!t) {
-        s->frag.non_nulls = s->frag.non_nulls + non_nulls;
-        s->frag.fragment_data_size += len;
-      }
+    len = block_reduce(temp_storage.reduce_storage).Sum(len);
+    if (!t) {
+      s->frag.non_nulls += non_nulls;
+      s->frag.fragment_data_size += len;
     }
     __syncthreads();
     if (is_valid && dtype != BOOLEAN) {
       uint32_t *dict_index = s->col.dict_index;
-      if (t >= 32) { nz_pos += s->scratch_red[(t - 32) >> 5]; }
       if (dict_index) {
         atomicAdd(&s->map.u32[hash >> 1], (hash & 1) ? 1 << 16 : 1);
         dict_index[start_value_idx + nz_pos] =
@@ -271,27 +260,18 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment *frag,
     uint32_t sum23 = count23 + (count23 << 16);
     uint32_t sum45 = count45 + (count45 << 16);
     uint32_t sum67 = count67 + (count67 << 16);
-    uint32_t sum_w, tmp;
     sum23 += (sum01 >> 16) * 0x10001;
     sum45 += (sum23 >> 16) * 0x10001;
     sum67 += (sum45 >> 16) * 0x10001;
-    sum_w = sum67 >> 16;
-    sum_w = WarpReducePos16(sum_w, t);
-    if ((t & 0xf) == 0xf) { s->scratch_red[t >> 4] = sum_w; }
-    __syncthreads();
-    if (t < 32) {
-      uint32_t sum_b    = WarpReducePos32(s->scratch_red[t], t);
-      s->scratch_red[t] = sum_b;
-    }
-    __syncthreads();
-    tmp   = (t >= 16) ? s->scratch_red[(t >> 4) - 1] : 0;
-    sum_w = (sum_w - (sum67 >> 16) + tmp) * 0x10001;
+    uint32_t sum_w = sum67 >> 16;
+    block_scan(temp_storage.scan_storage).InclusiveSum(sum_w, sum_w);
+    sum_w = (sum_w - (sum67 >> 16)) * 0x10001;
     s->map.u32[t * 4 + 0] = sum_w + sum01 - count01;
     s->map.u32[t * 4 + 1] = sum_w + sum23 - count23;
     s->map.u32[t * 4 + 2] = sum_w + sum45 - count45;
     s->map.u32[t * 4 + 3] = sum_w + sum67 - count67;
-    __syncthreads();
   }
+  __syncthreads();
   // Put the indices back in hash order
   if (s->col.dict_index) {
     uint32_t *dict_index = s->col.dict_index + start_row;
@@ -330,7 +310,7 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment *frag,
   // map, the position of the first entry can be inferred from the hash map counts
   uint32_t dupe_data_size = 0;
   for (uint32_t i = 0; i < nnz; i += block_size) {
-    uint32_t ck_row = 0, ck_row_ref = 0, is_dupe = 0, dupe_mask, dupes_before;
+    uint32_t ck_row = 0, ck_row_ref = 0, is_dupe = 0;
     if (i + t < nnz) {
       uint32_t dict_val = s->dict[i + t];
       uint32_t hash     = dict_val & ((1 << init_hash_bits) - 1);
@@ -366,20 +346,14 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment *frag,
         }
       }
     }
-    dupe_mask    = ballot(is_dupe);
-    dupes_before = s->total_dupes + __popc(dupe_mask & ((2 << (t & 0x1f)) - 1));
-    if (!(t & 0x1f)) { s->scratch_red[t >> 5] = __popc(dupe_mask); }
-    __syncthreads();
-    if (t < 32) {
-      uint32_t warp_dupes = (t < 16) ? s->scratch_red[t] : 0;
-      uint32_t warp_pos   = WarpReducePos16(warp_dupes, t);
-      if (t == 0xf) { s->total_dupes += warp_pos; }
-      if (t < 16) { s->scratch_red[t] = warp_pos - warp_dupes; }
-    }
+    uint32_t dupes_in_block;
+    uint32_t dupes_before;
+    block_scan(temp_storage.scan_storage).InclusiveSum(is_dupe, dupes_before, dupes_in_block);
+    dupes_before += s->total_dupes;
     __syncthreads();
+    if (t == 0) { s->total_dupes += dupes_in_block; }
     if (i + t < nnz) {
       if (!is_dupe) {
-        dupes_before += s->scratch_red[t >> 5];
         s->col.dict_data[start_row + i + t - dupes_before] = ck_row;
       } else {
         s->col.dict_index[ck_row] = ck_row_ref | (1u << 31);
@@ -387,15 +361,10 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment *frag,
       }
     }
     __syncthreads();
-    dupe_data_size = warp_reduce(temp_storage.full[t / 32]).Sum(dupe_data_size);
-    if (!(t & 0x1f)) { s->scratch_red[t >> 5] = dupe_data_size; }
-    __syncthreads();
-    if (t < 32) {
-      dupe_data_size = half_warp_reduce(temp_storage.half).Sum((t < 16) ? s->scratch_red[t] : 0);
-      if (!t) {
-        s->frag.dict_data_size = s->frag.fragment_data_size - dupe_data_size;
-        s->frag.num_dict_vals  = s->frag.non_nulls - s->total_dupes;
-      }
+    dupe_data_size = block_reduce(temp_storage.reduce_storage).Sum(dupe_data_size);
+    if (!t) {
+      s->frag.dict_data_size = s->frag.fragment_data_size - dupe_data_size;
+      s->frag.num_dict_vals  = s->frag.non_nulls - s->total_dupes;
     }
   }
   __syncthreads();
@@ -942,6 +911,7 @@ convert_nanoseconds(cuda::std::chrono::sys_time<cuda::std::chrono::nanoseconds> const ns)
 }
 
 // blockDim(128, 1, 1)
+template <int block_size>
 __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages,
                                                          const EncColumnChunk *chunks,
                                                          gpu_inflate_input_s *comp_in,
@@ -949,6 +919,8 @@
                                                          uint32_t start_page)
 {
   __shared__ __align__(8) page_enc_state_s state_g;
+  using block_scan = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
+  __shared__ typename block_scan::TempStorage temp_storage;
 
   page_enc_state_s *const s = &state_g;
   uint32_t t                = threadIdx.x;
@@ -1081,7 +1053,7 @@
     for (uint32_t cur_val_idx = 0; cur_val_idx < s->page.num_leaf_values;) {
       uint32_t nvals   = min(s->page.num_leaf_values - cur_val_idx, 128);
       uint32_t val_idx = s->page_start_val + cur_val_idx + t;
-      uint32_t is_valid, warp_valids, len, pos;
+      uint32_t is_valid, len, pos;
       if (s->page.page_type == PageType::DICTIONARY_PAGE) {
         is_valid = (cur_val_idx + t < s->page.num_leaf_values);
@@ -1091,19 +1063,13 @@
                      ? s->col.leaf_column->is_valid(val_idx)
                      : 0;
       }
-      warp_valids = ballot(is_valid);
       cur_val_idx += nvals;
       if (dict_bits >= 0) {
         // Dictionary encoding
         if (dict_bits > 0) {
           uint32_t rle_numvals;
-
-          pos = __popc(warp_valids & ((1 << (t & 0x1f)) - 1));
-          if (!(t & 0x1f)) { s->scratch_red[t >> 5] = __popc(warp_valids); }
-          __syncthreads();
-          if (t < 32) { s->scratch_red[t] = WarpReducePos4((t < 4) ? s->scratch_red[t] : 0, t); }
-          __syncthreads();
-          pos = pos + ((t >= 32) ? s->scratch_red[(t - 32) >> 5] : 0);
+          uint32_t rle_numvals_in_block;
+          block_scan(temp_storage).ExclusiveSum(is_valid, pos, rle_numvals_in_block);
           rle_numvals = s->rle_numvals;
           if (is_valid) {
             uint32_t v;
@@ -1114,7 +1080,7 @@
            }
            s->vals[(rle_numvals + pos) & (rle_buffer_size - 1)] = v;
          }
-          rle_numvals += s->scratch_red[3];
+          rle_numvals += rle_numvals_in_block;
           __syncthreads();
           if ((!enable_bool_rle) && (dtype == BOOLEAN)) {
             PlainBoolEncode(s, rle_numvals, (cur_val_idx == s->page.num_leaf_values), t);
@@ -1137,13 +1103,10 @@
       } else {
         len = 0;
       }
-      pos = WarpReducePos32(len, t);
-      if ((t & 0x1f) == 0x1f) { s->scratch_red[t >> 5] = pos; }
-      __syncthreads();
-      if (t < 32) { s->scratch_red[t] = WarpReducePos4((t < 4) ? s->scratch_red[t] : 0, t); }
+      uint32_t total_len = 0;
+      block_scan(temp_storage).ExclusiveSum(len, pos, total_len);
       __syncthreads();
-      if (t == 0) { s->cur = dst + s->scratch_red[3]; }
-      pos = pos + ((t >= 32) ? s->scratch_red[(t - 32) >> 5] : 0) - len;
+      if (t == 0) { s->cur = dst + total_len; }
       if (is_valid) {
         switch (dtype) {
           case INT32:
@@ -2104,8 +2067,8 @@ void EncodePages(EncPage *pages,
 {
   // A page is part of one column. This is launching 1 block per page. 1 block will exclusively
   // deal with one datatype.
-  gpuEncodePages<<<num_pages, 128, 0, stream.value()>>>(
-    pages, chunks, comp_in, comp_out, start_page);
+  gpuEncodePages<128>
+    <<<num_pages, 128, 0, stream.value()>>>(pages, chunks, comp_in, comp_out, start_page);
 }
 
 /**
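// ---------------------------------------------------------------------------
// Illustration (not part of the patch): launching a kernel templated on its
// block size, matching the gpuEncodePages<128> launch above; the body can then
// size its cub primitives at compile time. Names here are invented.
#include <cub/cub.cuh>
#include <cstdint>

template <int block_size>
__global__ void __launch_bounds__(block_size) offsets_example(uint32_t *data)
{
  using block_scan = cub::BlockScan<uint32_t, block_size>;
  __shared__ typename block_scan::TempStorage temp_storage;
  uint32_t pos;
  block_scan(temp_storage).ExclusiveSum(data[threadIdx.x], pos);
  data[threadIdx.x] = pos;
}

void launch_offsets_example(uint32_t *data, cudaStream_t stream)
{
  // Template argument pinned to the launch configuration.
  offsets_example<128><<<1, 128, 0, stream>>>(data);
}
// ---------------------------------------------------------------------------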
warp_reduce(storage.integer_stats[t / 32]).Sum(s->warp_sum[t & 0x1f].i_val); - if (!(t & 0x1f)) { - s->ck.sum.i_val = vsum; - // TODO: For now, don't set the sum flag with 64-bit values so we don't have to check for - // 64-bit sum overflow - s->ck.has_sum = (dtype <= dtype_int32 && has_minmax); + s->ck.max_value.i_val = vmax; + s->ck.sum.i_val = vsum; } + s->ck.has_minmax = has_minmax; + // TODO: For now, don't set the sum flag with 64-bit values so we don't have to check for + // 64-bit sum overflow + s->ck.has_sum = (dtype <= dtype_int32 && has_minmax); } } @@ -236,20 +221,20 @@ gatherIntColumnStats(stats_state_s *s, statistics_dtype dtype, uint32_t t, Stora * @param s shared block state * @param dtype data type * @param t thread id - * @param storage temporary storage for warp reduction + * @param storage temporary storage for reduction */ template void __device__ gatherFloatColumnStats(stats_state_s *s, statistics_dtype dtype, uint32_t t, Storage &storage) { - using warp_reduce = cub::WarpReduce; - double vmin = CUDART_INF; - double vmax = -CUDART_INF; - double vsum = 0; + using block_reduce = cub::BlockReduce; + double vmin = CUDART_INF; + double vmax = -CUDART_INF; + double vsum = 0; double v; uint32_t nn_cnt = 0; - bool has_minmax; - for (uint32_t i = 0; i < s->group.num_rows; i += 1024) { + __shared__ volatile bool has_minmax; + for (uint32_t i = 0; i < s->group.num_rows; i += block_size) { uint32_t r = i + t; uint32_t row = r + s->group.start_row; const uint32_t *valid_map = s->col.valid_map_base; @@ -275,34 +260,20 @@ gatherFloatColumnStats(stats_state_s *s, statistics_dtype dtype, uint32_t t, Sto s->ck.non_nulls = nn_cnt; s->ck.null_count = s->group.num_rows - nn_cnt; } - vmin = warp_reduce(storage.float_stats[t / 32]).Reduce(vmin, cub::Min()); - vmin = shuffle(vmin); - vmax = warp_reduce(storage.float_stats[t / 32]).Reduce(vmax, cub::Max()); - vmax = shuffle(vmax); - vsum = warp_reduce(storage.float_stats[t / 32]).Reduce(vsum, IgnoreNaNSum()); - if (!(t & 0x1f)) { - s->warp_min[t >> 5].fp_val = vmin; - s->warp_max[t >> 5].fp_val = vmax; - s->warp_sum[t >> 5].fp_val = vsum; - } - has_minmax = __syncthreads_or(vmin <= vmax); - if (t < 32 * 1) { - vmin = warp_reduce(storage.float_stats[t / 32]).Reduce(s->warp_min[t].fp_val, cub::Min()); - if (!(t & 0x1f)) { + vmin = block_reduce(storage.float_stats).Reduce(vmin, cub::Min()); + __syncthreads(); + vmax = block_reduce(storage.float_stats).Reduce(vmax, cub::Max()); + if (!t) { has_minmax = (vmin <= vmax); } + __syncthreads(); + if (has_minmax) { vsum = block_reduce(storage.float_stats).Reduce(vsum, IgnoreNaNSum()); } + if (!t) { + if (has_minmax) { s->ck.min_value.fp_val = (vmin != 0.0) ? vmin : CUDART_NEG_ZERO; - s->ck.has_minmax = (has_minmax); - } - } else if (t < 32 * 2) { - vmax = - warp_reduce(storage.float_stats[t / 32]).Reduce(s->warp_max[t & 0x1f].fp_val, cub::Max()); - if (!(t & 0x1f)) { s->ck.max_value.fp_val = (vmax != 0.0) ? vmax : CUDART_ZERO; } - } else if (t < 32 * 3) { - vsum = - warp_reduce(storage.float_stats[t / 32]).Reduce(s->warp_sum[t & 0x1f].fp_val, IgnoreNaNSum()); - if (!(t & 0x1f)) { - s->ck.sum.fp_val = vsum; - s->ck.has_sum = (has_minmax); // Implies sum is valid as well + s->ck.max_value.fp_val = (vmax != 0.0) ? 
vmax : CUDART_ZERO; + s->ck.sum.fp_val = vsum; } + s->ck.has_minmax = has_minmax; + s->ck.has_sum = has_minmax; // Implies sum is valid as well } } @@ -317,22 +288,22 @@ struct nvstrdesc_s { * * @param s shared block state * @param t thread id - * @param storage temporary storage for warp reduction + * @param storage temporary storage for reduction */ template void __device__ gatherStringColumnStats(stats_state_s *s, uint32_t t, Storage &storage) { - using warp_reduce = cub::WarpReduce; - uint32_t len_sum = 0; - const char *smin = nullptr; - const char *smax = nullptr; - uint32_t lmin = 0; - uint32_t lmax = 0; - uint32_t nn_cnt = 0; + using block_reduce = cub::BlockReduce; + uint32_t len_sum = 0; + const char *smin = nullptr; + const char *smax = nullptr; + uint32_t lmin = 0; + uint32_t lmax = 0; + uint32_t nn_cnt = 0; bool has_minmax; string_stats minval, maxval; - for (uint32_t i = 0; i < s->group.num_rows; i += 1024) { + for (uint32_t i = 0; i < s->group.num_rows; i += block_size) { uint32_t r = i + t; uint32_t row = r + s->group.start_row; const uint32_t *valid_map = s->col.valid_map_base; @@ -362,38 +333,35 @@ void __device__ gatherStringColumnStats(stats_state_s *s, uint32_t t, Storage &s s->ck.non_nulls = nn_cnt; s->ck.null_count = s->group.num_rows - nn_cnt; } - minval = WarpReduceMinString(smin, lmin); - maxval = WarpReduceMaxString(smax, lmax); - len_sum = warp_reduce(storage.string_stats[t / 32]).Sum(len_sum); + minval = WarpReduceMinString(smin, lmin); + maxval = WarpReduceMaxString(smax, lmax); __syncwarp(); if (!(t & 0x1f)) { s->warp_min[t >> 5].str_val.ptr = minval.ptr; s->warp_min[t >> 5].str_val.length = minval.length; s->warp_max[t >> 5].str_val.ptr = maxval.ptr; s->warp_max[t >> 5].str_val.length = maxval.length; - s->warp_sum[t >> 5].str_val.length = len_sum; } has_minmax = __syncthreads_or(smin != nullptr); + if (has_minmax) { len_sum = block_reduce(storage.string_stats).Sum(len_sum); } if (t < 32 * 1) { minval = WarpReduceMinString(s->warp_min[t].str_val.ptr, s->warp_min[t].str_val.length); if (!(t & 0x1f)) { - s->ck.min_value.str_val.ptr = minval.ptr; - s->ck.min_value.str_val.length = minval.length; - s->ck.has_minmax = has_minmax; + if (has_minmax) { + s->ck.min_value.str_val.ptr = minval.ptr; + s->ck.min_value.str_val.length = minval.length; + s->ck.sum.i_val = len_sum; + } + s->ck.has_minmax = has_minmax; + s->ck.has_sum = has_minmax; } - } else if (t < 32 * 2) { + } else if (t < 32 * 2 and has_minmax) { maxval = WarpReduceMaxString(s->warp_max[t & 0x1f].str_val.ptr, s->warp_max[t & 0x1f].str_val.length); if (!(t & 0x1f)) { s->ck.max_value.str_val.ptr = maxval.ptr; s->ck.max_value.str_val.length = maxval.length; } - } else if (t < 32 * 3) { - len_sum = warp_reduce(storage.string_stats[t / 32]).Sum(s->warp_sum[t & 0x1f].str_val.length); - if (!(t & 0x1f)) { - s->ck.sum.i_val = len_sum; - s->ck.has_sum = has_minmax; - } } } @@ -412,9 +380,9 @@ __global__ void __launch_bounds__(block_size, 1) { __shared__ __align__(8) stats_state_s state_g; __shared__ union { - typename cub::WarpReduce::TempStorage integer_stats[block_size / 32]; - typename cub::WarpReduce::TempStorage float_stats[block_size / 32]; - typename cub::WarpReduce::TempStorage string_stats[block_size / 32]; + typename cub::BlockReduce::TempStorage integer_stats; + typename cub::BlockReduce::TempStorage float_stats; + typename cub::BlockReduce::TempStorage string_stats; } temp_storage; stats_state_s *const s = &state_g; @@ -455,7 +423,7 @@ __global__ void __launch_bounds__(block_size, 1) * @param ck_in 
pointer to first statistic chunk * @param num_chunks number of statistic chunks to merge * @param t thread id - * @param storage temporary storage for warp reduction + * @param storage temporary storage for reduction */ template void __device__ mergeIntColumnStats(merge_state_s *s, @@ -470,8 +438,8 @@ void __device__ mergeIntColumnStats(merge_state_s *s, int64_t vsum = 0; uint32_t non_nulls = 0; uint32_t null_count = 0; - bool has_minmax; - for (uint32_t i = t; i < num_chunks; i += 1024) { + __shared__ volatile bool has_minmax; + for (uint32_t i = t; i < num_chunks; i += block_size) { const statistics_chunk *ck = &ck_in[i]; if (ck->has_minmax) { vmin = min(vmin, ck->min_value.i_val); @@ -481,52 +449,29 @@ void __device__ mergeIntColumnStats(merge_state_s *s, non_nulls += ck->non_nulls; null_count += ck->null_count; } - non_nulls = cub::WarpReduce(storage.u32[t / 32]).Sum(non_nulls); - __syncwarp(); - vmin = cub::WarpReduce(storage.i64[t / 32]).Reduce(vmin, cub::Min()); - __syncwarp(); - vmin = shuffle(vmin); - - null_count = cub::WarpReduce(storage.u32[t / 32]).Sum(null_count); - __syncwarp(); - vmax = cub::WarpReduce(storage.i64[t / 32]).Reduce(vmax, cub::Max()); - __syncwarp(); - vmax = shuffle(vmax); - - vsum = cub::WarpReduce(storage.i64[t / 32]).Sum(vsum); + vmin = cub::BlockReduce(storage.i64).Reduce(vmin, cub::Min()); + __syncthreads(); + vmax = cub::BlockReduce(storage.i64).Reduce(vmax, cub::Max()); + if (!t) { has_minmax = (vmin <= vmax); } + __syncthreads(); + non_nulls = cub::BlockReduce(storage.u32).Sum(non_nulls); + __syncthreads(); + null_count = cub::BlockReduce(storage.u32).Sum(null_count); + __syncthreads(); + if (has_minmax) { vsum = cub::BlockReduce(storage.i64).Sum(vsum); } - if (!(t & 0x1f)) { - s->warp_non_nulls[t >> 5] = non_nulls; - s->warp_nulls[t >> 5] = null_count; - s->warp_min[t >> 5].i_val = vmin; - s->warp_max[t >> 5].i_val = vmax; - s->warp_sum[t >> 5].i_val = vsum; - } - has_minmax = __syncthreads_or(vmin <= vmax); - if (t < 32 * 1) { - vmin = cub::WarpReduce(storage.i64[t / 32]).Reduce(s->warp_min[t].i_val, cub::Min()); - if (!(t & 0x1f)) { + if (!t) { + if (has_minmax) { s->ck.min_value.i_val = vmin; - s->ck.has_minmax = (has_minmax); + s->ck.max_value.i_val = vmax; + s->ck.sum.i_val = vsum; } - } else if (t < 32 * 2) { - vmax = - cub::WarpReduce(storage.i64[t / 32]).Reduce(s->warp_max[t & 0x1f].i_val, cub::Max()); - if (!(t & 0x1f)) { s->ck.max_value.i_val = vmax; } - } else if (t < 32 * 3) { - vsum = cub::WarpReduce(storage.i64[t / 32]).Sum(s->warp_sum[t & 0x1f].i_val); - if (!(t & 0x1f)) { - s->ck.sum.i_val = vsum; - // TODO: For now, don't set the sum flag with 64-bit values so we don't have to check for - // 64-bit sum overflow - s->ck.has_sum = (dtype <= dtype_int32 && has_minmax); - } - } else if (t < 32 * 4) { - non_nulls = cub::WarpReduce(storage.u32[t / 32]).Sum(s->warp_non_nulls[t & 0x1f]); - if (!(t & 0x1f)) { s->ck.non_nulls = non_nulls; } - } else if (t < 32 * 5) { - null_count = cub::WarpReduce(storage.u32[t / 32]).Sum(s->warp_nulls[t & 0x1f]); - if (!(t & 0x1f)) { s->ck.null_count = null_count; } + s->ck.has_minmax = has_minmax; + // TODO: For now, don't set the sum flag with 64-bit values so we don't have to check for + // 64-bit sum overflow + s->ck.has_sum = (dtype <= dtype_int32 && has_minmax); + s->ck.non_nulls = non_nulls; + s->ck.null_count = null_count; } } @@ -538,7 +483,7 @@ void __device__ mergeIntColumnStats(merge_state_s *s, * @param ck_in pointer to first statistic chunk * @param num_chunks number of statistic chunks to merge 
@@ -538,7 +483,7 @@ void __device__ mergeIntColumnStats(merge_state_s *s,
  * @param ck_in pointer to first statistic chunk
  * @param num_chunks number of statistic chunks to merge
  * @param t thread id
- * @param storage temporary storage for warp reduction
+ * @param storage temporary storage for reduction
  */
 template <int block_size, typename Storage>
 void __device__ mergeFloatColumnStats(merge_state_s *s,
@@ -552,8 +497,8 @@ void __device__ mergeFloatColumnStats(merge_state_s *s,
   double vsum = 0;
   uint32_t non_nulls = 0;
   uint32_t null_count = 0;
-  bool has_minmax;
-  for (uint32_t i = t; i < num_chunks; i += 1024) {
+  __shared__ volatile bool has_minmax;
+  for (uint32_t i = t; i < num_chunks; i += block_size) {
     const statistics_chunk *ck = &ck_in[i];
     if (ck->has_minmax) {
       double v0 = ck->min_value.fp_val;
@@ -566,51 +511,29 @@ void __device__ mergeFloatColumnStats(merge_state_s *s,
     null_count += ck->null_count;
   }
-  non_nulls = cub::WarpReduce<uint32_t>(storage.u32[t / 32]).Sum(non_nulls);
-  __syncwarp();
-  vmin = cub::WarpReduce<double>(storage.f64[t / 32]).Reduce(vmin, cub::Min());
-  __syncwarp();
-  vmin = shuffle(vmin);
-
-  null_count = cub::WarpReduce<uint32_t>(storage.u32[t / 32]).Sum(null_count);
-  __syncwarp();
-  vmax = cub::WarpReduce<double>(storage.f64[t / 32]).Reduce(vmax, cub::Max());
-  __syncwarp();
-  vmax = shuffle(vmax);
-
-  vsum = cub::WarpReduce<double>(storage.f64[t / 32]).Reduce(vsum, IgnoreNaNSum());
-
-  if (!(t & 0x1f)) {
-    s->warp_non_nulls[t >> 5] = non_nulls;
-    s->warp_nulls[t >> 5] = null_count;
-    s->warp_min[t >> 5].fp_val = vmin;
-    s->warp_max[t >> 5].fp_val = vmax;
-    s->warp_sum[t >> 5].fp_val = vsum;
+  vmin = cub::BlockReduce<double, block_size>(storage.f64).Reduce(vmin, cub::Min());
+  __syncthreads();
+  vmax = cub::BlockReduce<double, block_size>(storage.f64).Reduce(vmax, cub::Max());
+  if (!t) { has_minmax = (vmin <= vmax); }
+  __syncthreads();
+  non_nulls = cub::BlockReduce<uint32_t, block_size>(storage.u32).Sum(non_nulls);
+  __syncthreads();
+  null_count = cub::BlockReduce<uint32_t, block_size>(storage.u32).Sum(null_count);
+  __syncthreads();
+  if (has_minmax) {
+    vsum = cub::BlockReduce<double, block_size>(storage.f64).Reduce(vsum, IgnoreNaNSum());
   }
-  has_minmax = __syncthreads_or(vmin <= vmax);
-  if (t < 32 * 1) {
-    vmin = cub::WarpReduce<double>(storage.f64[t / 32]).Reduce(s->warp_min[t].fp_val, cub::Min());
-    if (!(t & 0x1f)) {
+
+  if (!t) {
+    if (has_minmax) {
       s->ck.min_value.fp_val = (vmin != 0.0) ? vmin : CUDART_NEG_ZERO;
-      s->ck.has_minmax = (has_minmax);
-    }
-  } else if (t < 32 * 2) {
-    vmax =
-      cub::WarpReduce<double>(storage.f64[t / 32]).Reduce(s->warp_max[t & 0x1f].fp_val, cub::Max());
-    if (!(t & 0x1f)) { s->ck.max_value.fp_val = (vmax != 0.0) ? vmax : CUDART_ZERO; }
-  } else if (t < 32 * 3) {
-    vsum = cub::WarpReduce<double>(storage.f64[t / 32])
-             .Reduce(s->warp_sum[t & 0x1f].fp_val, IgnoreNaNSum());
-    if (!(t & 0x1f)) {
-      s->ck.sum.fp_val = vsum;
-      s->ck.has_sum = (has_minmax);  // Implies sum is valid as well
+      s->ck.max_value.fp_val = (vmax != 0.0) ? vmax : CUDART_ZERO;
+      s->ck.sum.fp_val = vsum;
     }
-  } else if (t < 32 * 4) {
-    non_nulls = cub::WarpReduce<uint32_t>(storage.u32[t / 32]).Sum(s->warp_non_nulls[t & 0x1f]);
-    if (!(t & 0x1f)) { s->ck.non_nulls = non_nulls; }
-  } else if (t < 32 * 5) {
-    null_count = cub::WarpReduce<uint32_t>(storage.u32[t / 32]).Sum(s->warp_nulls[t & 0x1f]);
-    if (!(t & 0x1f)) { s->ck.null_count = null_count; }
+    s->ck.has_minmax = has_minmax;
+    s->ck.has_sum = has_minmax;  // Implies sum is valid as well
+    s->ck.non_nulls = non_nulls;
+    s->ck.null_count = null_count;
  }
 }
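Two details of the float path are worth noting. First, the (vmin != 0.0) ? vmin : CUDART_NEG_ZERO and (vmax != 0.0) ? vmax : CUDART_ZERO guards normalize a zero bound to the conservative signed zero: a stored minimum of 0.0 is written as -0.0 and a stored maximum as +0.0. Second, IgnoreNaNSum is defined elsewhere in this file and not shown in the diff; a functor of roughly the following shape would satisfy the Reduce(vsum, IgnoreNaNSum()) calls in this hunk (an assumption about its behavior, not a copy of the real definition):

#include <cmath>

struct IgnoreNaNSumSketch {
  __device__ double operator()(double a, double b) const
  {
    // Treat NaN partials as 0 so a single NaN chunk cannot poison the block-wide sum.
    if (isnan(a)) { a = 0.0; }
    if (isnan(b)) { b = 0.0; }
    return a + b;
  }
};

Guarding the sum reduction behind if (has_minmax) is safe because has_minmax is a shared variable published by a __syncthreads() before the branch, so the whole block skips or enters the reduction together.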
@@ -621,7 +544,7 @@ void __device__ mergeFloatColumnStats(merge_state_s *s,
  * @param ck_in pointer to first statistic chunk
  * @param num_chunks number of statistic chunks to merge
  * @param t thread id
- * @param storage temporary storage for warp reduction
+ * @param storage temporary storage for reduction
  */
 template <int block_size, typename Storage>
 void __device__ mergeStringColumnStats(merge_state_s *s,
@@ -640,7 +563,7 @@ void __device__ mergeStringColumnStats(merge_state_s *s,
   bool has_minmax;
   string_stats minval, maxval;
 
-  for (uint32_t i = t; i < num_chunks; i += 1024) {
+  for (uint32_t i = t; i < num_chunks; i += block_size) {
     const statistics_chunk *ck = &ck_in[i];
     if (ck->has_minmax) {
       uint32_t len0 = ck->min_value.str_val.length;
@@ -660,50 +583,41 @@ void __device__ mergeStringColumnStats(merge_state_s *s,
     non_nulls += ck->non_nulls;
     null_count += ck->null_count;
   }
-  non_nulls = cub::WarpReduce<uint32_t>(storage.u32[t / 32]).Sum(non_nulls);
-  __syncwarp();
-  null_count = cub::WarpReduce<uint32_t>(storage.u32[t / 32]).Sum(null_count);
-  __syncwarp();
-  minval = WarpReduceMinString(smin, lmin);
-  maxval = WarpReduceMaxString(smax, lmax);
-  len_sum = cub::WarpReduce<uint32_t>(storage.u32[t / 32]).Sum(len_sum);
+  minval = WarpReduceMinString(smin, lmin);
+  maxval = WarpReduceMaxString(smax, lmax);
   if (!(t & 0x1f)) {
-    s->warp_non_nulls[t >> 5] = non_nulls;
-    s->warp_nulls[t >> 5] = null_count;
     s->warp_min[t >> 5].str_val.ptr = minval.ptr;
     s->warp_min[t >> 5].str_val.length = minval.length;
     s->warp_max[t >> 5].str_val.ptr = maxval.ptr;
     s->warp_max[t >> 5].str_val.length = maxval.length;
-    s->warp_sum[t >> 5].str_val.length = len_sum;
   }
   has_minmax = __syncthreads_or(smin != nullptr);
+
+  non_nulls = cub::BlockReduce<uint32_t, block_size>(storage.u32).Sum(non_nulls);
+  __syncthreads();
+  null_count = cub::BlockReduce<uint32_t, block_size>(storage.u32).Sum(null_count);
+  __syncthreads();
+  if (has_minmax) { len_sum = cub::BlockReduce<uint32_t, block_size>(storage.u32).Sum(len_sum); }
   if (t < 32 * 1) {
     minval = WarpReduceMinString(s->warp_min[t].str_val.ptr, s->warp_min[t].str_val.length);
     if (!(t & 0x1f)) {
-      s->ck.min_value.str_val.ptr = minval.ptr;
-      s->ck.min_value.str_val.length = minval.length;
-      s->ck.has_minmax = has_minmax;
+      if (has_minmax) {
+        s->ck.min_value.str_val.ptr = minval.ptr;
+        s->ck.min_value.str_val.length = minval.length;
+        s->ck.sum.i_val = len_sum;
+      }
+      s->ck.has_minmax = has_minmax;
+      s->ck.has_sum = has_minmax;
+      s->ck.non_nulls = non_nulls;
+      s->ck.null_count = null_count;
     }
   } else if (t < 32 * 2) {
     maxval = WarpReduceMaxString(s->warp_max[t & 0x1f].str_val.ptr,
                                  s->warp_max[t & 0x1f].str_val.length);
-    if (!(t & 0x1f)) {
+    if (!(t & 0x1f) and has_minmax) {
       s->ck.max_value.str_val.ptr = maxval.ptr;
       s->ck.max_value.str_val.length = maxval.length;
     }
-  } else if (t < 32 * 3) {
-    len_sum =
-      cub::WarpReduce<uint32_t>(storage.u32[t / 32]).Sum(s->warp_sum[t & 0x1f].str_val.length);
-    if (!(t & 0x1f)) {
-      s->ck.sum.i_val = len_sum;
-      s->ck.has_sum = has_minmax;
-    }
-  } else if (t < 32 * 4) {
-    non_nulls = cub::WarpReduce<uint32_t>(storage.u32[t / 32]).Sum(s->warp_non_nulls[t & 0x1f]);
-    if (!(t & 0x1f)) { s->ck.non_nulls = non_nulls; }
-  } else if (t < 32 * 5) {
-    null_count = cub::WarpReduce<uint32_t>(storage.u32[t / 32]).Sum(s->warp_nulls[t & 0x1f]);
-    if (!(t & 0x1f)) { s->ck.null_count = null_count; }
   }
 }
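Unlike the numeric paths, the string path keeps its two-stage shape: min/max are reduced with the warp-level WarpReduceMinString / WarpReduceMaxString helpers (defined elsewhere in this file) and staged through s->warp_min / s->warp_max, because the value being reduced is a (pointer, length) pair rather than a scalar that cub can reduce directly; only the counters and the length sum move to cub::BlockReduce. A hedged sketch of what such a shuffle-based pair reduction can look like follows; the struct, helper names, and byte-wise comparison are illustrative, not the file's actual implementation:

#include <cstdint>

struct StrVal {
  const char *ptr;  // nullptr means "no value seen"
  uint32_t length;
};

__device__ bool str_less(const StrVal &a, const StrVal &b)
{
  if (a.ptr == nullptr) { return false; }
  if (b.ptr == nullptr) { return true; }
  uint32_t n = a.length < b.length ? a.length : b.length;
  for (uint32_t i = 0; i < n; ++i) {
    if (a.ptr[i] != b.ptr[i]) {
      return static_cast<unsigned char>(a.ptr[i]) < static_cast<unsigned char>(b.ptr[i]);
    }
  }
  return a.length < b.length;
}

__device__ StrVal warp_min_string(StrVal v)
{
  // Tree reduction over the warp; pointers travel through shuffles as 64-bit integers.
  for (int offset = 16; offset > 0; offset >>= 1) {
    StrVal other;
    other.ptr = reinterpret_cast<const char *>(
      __shfl_down_sync(0xffffffff, reinterpret_cast<uint64_t>(v.ptr), offset));
    other.length = __shfl_down_sync(0xffffffff, v.length, offset);
    if (str_less(other, v)) { v = other; }
  }
  return v;  // the warp-wide minimum ends up in lane 0
}

This also explains the lane-0 guards in the hunk above: after a shuffle-down reduction only lane 0 of the reducing warp holds a meaningful value, and nothing should be published when has_minmax reports that no chunk carried min/max data at all.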
@@ -724,9 +638,9 @@ __global__ void __launch_bounds__(block_size, 1)
 {
   __shared__ __align__(8) merge_state_s state_g;
   __shared__ struct {
-    typename cub::WarpReduce<uint32_t>::TempStorage u32[block_size / 32];
-    typename cub::WarpReduce<int64_t>::TempStorage i64[block_size / 32];
-    typename cub::WarpReduce<double>::TempStorage f64[block_size / 32];
+    typename cub::BlockReduce<uint32_t, block_size>::TempStorage u32;
+    typename cub::BlockReduce<int64_t, block_size>::TempStorage i64;
+    typename cub::BlockReduce<double, block_size>::TempStorage f64;
   } storage;
 
   merge_state_s *const s = &state_g;
@@ -773,7 +687,8 @@ void GatherColumnStatistics(statistics_chunk *chunks,
                             uint32_t num_chunks,
                             rmm::cuda_stream_view stream)
 {
-  gpuGatherColumnStatistics<1024><<<num_chunks, 1024, 0, stream.value()>>>(chunks, groups);
+  gpuGatherColumnStatistics<block_size>
+    <<<num_chunks, block_size, 0, stream.value()>>>(chunks, groups);
 }
 
 /**
@@ -791,8 +706,8 @@ void MergeColumnStatistics(statistics_chunk *chunks_out,
                            uint32_t num_chunks,
                            rmm::cuda_stream_view stream)
 {
-  gpuMergeColumnStatistics<1024>
-    <<<num_chunks, 1024, 0, stream.value()>>>(chunks_out, chunks_in, groups);
+  gpuMergeColumnStatistics<block_size>
+    <<<num_chunks, block_size, 0, stream.value()>>>(chunks_out, chunks_in, groups);
 }
 
 }  // namespace io
diff --git a/cpp/src/io/utilities/block_utils.cuh b/cpp/src/io/utilities/block_utils.cuh
index 9046eebcb02..4c03f9a9ca0 100644
--- a/cpp/src/io/utilities/block_utils.cuh
+++ b/cpp/src/io/utilities/block_utils.cuh
@@ -47,36 +47,6 @@ inline __device__ void nanosleep(T d)
 }
 
 // Warp reduction helpers
-template <typename T>
-inline __device__ T WarpReduceSum2(T acc)
-{
-  return acc + shuffle_xor(acc, 1);
-}
-template <typename T>
-inline __device__ T WarpReduceSum4(T acc)
-{
-  acc = WarpReduceSum2(acc);
-  return acc + shuffle_xor(acc, 2);
-}
-template <typename T>
-inline __device__ T WarpReduceSum8(T acc)
-{
-  acc = WarpReduceSum4(acc);
-  return acc + shuffle_xor(acc, 4);
-}
-template <typename T>
-inline __device__ T WarpReduceSum16(T acc)
-{
-  acc = WarpReduceSum8(acc);
-  return acc + shuffle_xor(acc, 8);
-}
-template <typename T>
-inline __device__ T WarpReduceSum32(T acc)
-{
-  acc = WarpReduceSum16(acc);
-  return acc + shuffle_xor(acc, 16);
-}
-
 template <typename T>
 inline __device__ T WarpReduceOr2(T acc)
 {
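The deleted WarpReduceSum2/4/8/16/32 helpers implemented a classic butterfly reduction, doubling the XOR stride at each step (shuffle_xor in this codebase is a thin wrapper over __shfl_xor_sync). For reference, here is a self-contained equivalent of WarpReduceSum32 next to the cub::WarpReduce form this patch standardizes on; the demo kernel and its single-warp launch are illustrative, not part of the codebase:

#include <cub/cub.cuh>

template <typename T>
__device__ T warp_reduce_sum32(T acc)
{
  // log2(32) = 5 butterfly steps; afterwards every lane holds the warp-wide sum.
  for (int lane_mask = 1; lane_mask < 32; lane_mask <<= 1) {
    acc += __shfl_xor_sync(0xffffffff, acc, lane_mask);
  }
  return acc;
}

__global__ void warp_sum_demo(const int *in, int *out)  // launch as <<<1, 32>>>
{
  int v = in[threadIdx.x];
  int s0 = warp_reduce_sum32(v);  // hand-rolled: result valid in every lane
  using WarpReduce = cub::WarpReduce<int>;
  __shared__ typename WarpReduce::TempStorage temp;  // one warp, so one storage slot
  int s1 = WarpReduce(temp).Sum(v);                  // cub: result valid in lane 0 only
  if (threadIdx.x == 0) {
    out[0] = s0;
    out[1] = s1;
  }
}

The behavioral difference matters: the butterfly form leaves the sum in all lanes, whereas cub only defines the result on lane 0 (or thread 0 for BlockReduce), which is why the removed call sites above followed cub::WarpReduce with an explicit shuffle(...) broadcast when all lanes needed the value.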