From 7c609d2b02ba17588b6e61ed0260fc656db029a2 Mon Sep 17 00:00:00 2001 From: "Ram (Ramakrishna Prabhu)" <42624703+rgsl888prabhu@users.noreply.github.com> Date: Fri, 12 Feb 2021 11:39:43 -0600 Subject: [PATCH] Abstracting block reduce and block scan from cuIO kernels with `cub` apis (#7278) closes #6238 This PR replaces existing usages of `warp_reduce` or `warp_scans` which were used for block reduction/scan with `cub::BlockReduce/cub::BlockScan`. The changes has positive effect on mostly on numerical data processing, but seems to be little slower in case of string type. [all files.zip](https://github.com/rapidsai/cudf/files/5921314/all.files.zip) Update: Graphs have been updated after fixing a bug which also resolved several other performance issues.
Perf plots **Benchmark Performance** y-axis is in `ms` and there are three sets of plot, one which compares mean performance change next to each other, which also has error bars which is standard deviation calculated using five sets of benchmarks. Next one is difference of performance between cub::block_reduce/cub::block_scan with generic approach and the last one is percentage change in performance compared to branch-0.18. If the value is positive, then test is taking less time, else it is taking more time compared to main branch. **CSV READER** ![CSV_READER_comp_0_9](https://user-images.githubusercontent.com/42624703/106804864-90914b00-662b-11eb-8e5c-10510e4d4666.png) ![CSV_READER_comp_10_19](https://user-images.githubusercontent.com/42624703/106804866-9129e180-662b-11eb-8deb-a5090e4e2d9a.png) ![CSV_READER_diff_0_39](https://user-images.githubusercontent.com/42624703/106804867-9129e180-662b-11eb-95d3-5415cfe2a58d.png) ![CSV_READER_per_0_39](https://user-images.githubusercontent.com/42624703/106804869-91c27800-662b-11eb-8ecf-2bf16ccc8c00.png) ![ORC_WRITER_comp_40_49](https://user-images.githubusercontent.com/42624703/106804870-91c27800-662b-11eb-8cad-8d06fd299ce8.png) **ORC READER** ![ORC_READER_comp_0_9](https://user-images.githubusercontent.com/42624703/107102359-f0c8ee00-67df-11eb-9140-4f9f1147ee02.png) ![ORC_READER_comp_10_19](https://user-images.githubusercontent.com/42624703/107102360-f1618480-67df-11eb-80d5-eab360c80d85.png) ![ORC_READER_comp_20_29](https://user-images.githubusercontent.com/42624703/107102361-f1618480-67df-11eb-8729-d725a8beb91e.png) ![ORC_READER_comp_30_39](https://user-images.githubusercontent.com/42624703/107102362-f1fa1b00-67df-11eb-8541-59e6643791b5.png) ![ORC_READER_comp_40_49](https://user-images.githubusercontent.com/42624703/107102363-f1fa1b00-67df-11eb-864f-f99f2346404b.png) ![ORC_READER_comp_50_59](https://user-images.githubusercontent.com/42624703/107102364-f1fa1b00-67df-11eb-962d-9f71db2b6caf.png) ![ORC_READER_comp_60_69](https://user-images.githubusercontent.com/42624703/107102365-f1fa1b00-67df-11eb-8447-5cc17af20e51.png) ![ORC_READER_comp_70_79](https://user-images.githubusercontent.com/42624703/107102366-f292b180-67df-11eb-9d0c-730be0741eac.png) ![ORC_READER_diff_0_39](https://user-images.githubusercontent.com/42624703/107102368-f292b180-67df-11eb-8b0b-0ab94f06276b.png) ![ORC_READER_diff_40_79](https://user-images.githubusercontent.com/42624703/107102370-f292b180-67df-11eb-9d57-ce820dd586c6.png) ![ORC_READER_per_0_39](https://user-images.githubusercontent.com/42624703/107102371-f32b4800-67df-11eb-80ad-317ee8b29a51.png) ![ORC_READER_per_40_79](https://user-images.githubusercontent.com/42624703/107102372-f32b4800-67df-11eb-8180-fe59a4c2d5da.png) **ORC WRITER** ![ORC_WRITER_comp_0_9](https://user-images.githubusercontent.com/42624703/107102389-03432780-67e0-11eb-8782-a71ddadb72f3.png) ![ORC_WRITER_comp_10_19](https://user-images.githubusercontent.com/42624703/107102390-03dbbe00-67e0-11eb-8825-4896cffabb49.png) ![ORC_WRITER_comp_20_29](https://user-images.githubusercontent.com/42624703/107102392-03dbbe00-67e0-11eb-94dd-6380105f25e8.png) ![ORC_WRITER_comp_30_39](https://user-images.githubusercontent.com/42624703/107102393-03dbbe00-67e0-11eb-853f-6d2d518c038f.png) ![ORC_WRITER_comp_40_49](https://user-images.githubusercontent.com/42624703/107102394-04745480-67e0-11eb-8be4-48a3efdbf446.png) ![ORC_WRITER_comp_50_59](https://user-images.githubusercontent.com/42624703/107102395-04745480-67e0-11eb-8272-4563f28e9bf6.png) ![ORC_WRITER_comp_60_69](https://user-images.githubusercontent.com/42624703/107102396-04745480-67e0-11eb-9597-58142b1dfa99.png) ![ORC_WRITER_comp_70_79](https://user-images.githubusercontent.com/42624703/107102397-050ceb00-67e0-11eb-8e9f-56305d11cfbc.png) ![ORC_WRITER_comp_80_89](https://user-images.githubusercontent.com/42624703/107102398-050ceb00-67e0-11eb-912e-1cea0a96721b.png) ![ORC_WRITER_comp_90_99](https://user-images.githubusercontent.com/42624703/107102399-050ceb00-67e0-11eb-86f4-98de207ec87d.png) ![ORC_WRITER_diff_0_39](https://user-images.githubusercontent.com/42624703/107102401-05a58180-67e0-11eb-911c-66ea27d49633.png) ![ORC_WRITER_diff_40_79](https://user-images.githubusercontent.com/42624703/107102402-05a58180-67e0-11eb-8d3b-5a9f51620ae2.png) ![ORC_WRITER_diff_80_119](https://user-images.githubusercontent.com/42624703/107102403-05a58180-67e0-11eb-9e6c-9a0d3873502b.png) ![ORC_WRITER_per_0_39](https://user-images.githubusercontent.com/42624703/107102404-063e1800-67e0-11eb-9534-646db30944b6.png) ![ORC_WRITER_per_40_79](https://user-images.githubusercontent.com/42624703/107102406-063e1800-67e0-11eb-89bb-b77e5c627f68.png) ![ORC_WRITER_per_80_119](https://user-images.githubusercontent.com/42624703/107102407-063e1800-67e0-11eb-9be3-3d5bb106525c.png) **PARQUET CHUNKED WRITER** ![PQ_CHUNK_WRITER_comp_0_9](https://user-images.githubusercontent.com/42624703/107102412-0b02cc00-67e0-11eb-9038-162b4bcfe2a7.png) ![PQ_CHUNK_WRITER_diff_0_39](https://user-images.githubusercontent.com/42624703/107102413-0b9b6280-67e0-11eb-8af1-a06c1b15f987.png) ![PQ_CHUNK_WRITER_per_0_39](https://user-images.githubusercontent.com/42624703/107102415-0b9b6280-67e0-11eb-860b-f8ac579af9c8.png) **PARQUET READER** ![PQ_READER_comp_0_9](https://user-images.githubusercontent.com/42624703/107102421-13f39d80-67e0-11eb-9bd9-5d45094a4cc8.png) ![PQ_READER_comp_10_19](https://user-images.githubusercontent.com/42624703/107102422-148c3400-67e0-11eb-93d7-d37b6f8f0346.png) ![PQ_READER_comp_20_29](https://user-images.githubusercontent.com/42624703/107102423-148c3400-67e0-11eb-9ed0-3c613311164b.png) ![PQ_READER_comp_30_39](https://user-images.githubusercontent.com/42624703/107102424-1524ca80-67e0-11eb-8773-328871b75e6f.png) ![PQ_READER_comp_40_49](https://user-images.githubusercontent.com/42624703/107102425-1524ca80-67e0-11eb-9841-dd2fae2e00e3.png) ![PQ_READER_comp_50_59](https://user-images.githubusercontent.com/42624703/107102426-1524ca80-67e0-11eb-8cb3-36cde66909c0.png) ![PQ_READER_comp_60_69](https://user-images.githubusercontent.com/42624703/107102429-15bd6100-67e0-11eb-9825-5eb65d066343.png) ![PQ_READER_comp_70_79](https://user-images.githubusercontent.com/42624703/107102430-15bd6100-67e0-11eb-9570-e3f3f4df6e0a.png) ![PQ_READER_comp_80_89](https://user-images.githubusercontent.com/42624703/107102431-15bd6100-67e0-11eb-825e-633f33713844.png) ![PQ_READER_comp_90_99](https://user-images.githubusercontent.com/42624703/107102432-1655f780-67e0-11eb-9e51-4fbca0c640fe.png) ![PQ_READER_diff_0_39](https://user-images.githubusercontent.com/42624703/107102433-1655f780-67e0-11eb-9c61-457d7bdca759.png) ![PQ_READER_diff_40_79](https://user-images.githubusercontent.com/42624703/107102434-1655f780-67e0-11eb-9471-7b071bfc8e72.png) ![PQ_READER_diff_80_119](https://user-images.githubusercontent.com/42624703/107102435-16ee8e00-67e0-11eb-83a5-5ee304fdf8b7.png) ![PQ_READER_per_0_39](https://user-images.githubusercontent.com/42624703/107102436-16ee8e00-67e0-11eb-9df2-dbe74defd858.png) ![PQ_READER_per_40_79](https://user-images.githubusercontent.com/42624703/107102438-17872480-67e0-11eb-81d0-9611b2f554c4.png) ![PQ_READER_per_80_119](https://user-images.githubusercontent.com/42624703/107102441-17872480-67e0-11eb-81c7-a30ffc3d74a5.png) **PARQUET WRITER** ![PQ_WRITER_comp_0_9](https://user-images.githubusercontent.com/42624703/107102450-22da5000-67e0-11eb-8cfe-8198bdfef3b5.png) ![PQ_WRITER_comp_10_19](https://user-images.githubusercontent.com/42624703/107102451-2372e680-67e0-11eb-9036-f10cdf577e7e.png) ![PQ_WRITER_comp_20_29](https://user-images.githubusercontent.com/42624703/107102452-2372e680-67e0-11eb-966a-dd584ba24aa6.png) ![PQ_WRITER_comp_30_39](https://user-images.githubusercontent.com/42624703/107102453-240b7d00-67e0-11eb-9cd6-f2ee98eb679e.png) ![PQ_WRITER_comp_40_49](https://user-images.githubusercontent.com/42624703/107102454-240b7d00-67e0-11eb-87bd-0ed78398394b.png) ![PQ_WRITER_comp_50_59](https://user-images.githubusercontent.com/42624703/107102455-240b7d00-67e0-11eb-8769-ebf545d1f37d.png) ![PQ_WRITER_comp_60_69](https://user-images.githubusercontent.com/42624703/107102456-24a41380-67e0-11eb-82be-00729da0fad0.png) ![PQ_WRITER_comp_70_79](https://user-images.githubusercontent.com/42624703/107102457-24a41380-67e0-11eb-8117-7704b7c7b085.png) ![PQ_WRITER_comp_80_89](https://user-images.githubusercontent.com/42624703/107102458-24a41380-67e0-11eb-99de-df5b077b6b5e.png) ![PQ_WRITER_comp_90_99](https://user-images.githubusercontent.com/42624703/107102459-253caa00-67e0-11eb-9393-2f2c54021bab.png) ![PQ_WRITER_comp_100_109](https://user-images.githubusercontent.com/42624703/107102462-253caa00-67e0-11eb-9c22-d7c48b2bbb02.png) ![PQ_WRITER_comp_110_119](https://user-images.githubusercontent.com/42624703/107102464-253caa00-67e0-11eb-896d-52375e61c528.png) ![PQ_WRITER_comp_120_129](https://user-images.githubusercontent.com/42624703/107102465-25d54080-67e0-11eb-9ab7-eb37f1986c7c.png) ![PQ_WRITER_comp_130_139](https://user-images.githubusercontent.com/42624703/107102466-25d54080-67e0-11eb-8b68-9eb660a81a9b.png) ![PQ_WRITER_diff_0_39](https://user-images.githubusercontent.com/42624703/107102467-25d54080-67e0-11eb-989d-4c1994bb127b.png) ![PQ_WRITER_diff_40_79](https://user-images.githubusercontent.com/42624703/107102468-266dd700-67e0-11eb-98c1-7b6af9155cb4.png) ![PQ_WRITER_diff_80_119](https://user-images.githubusercontent.com/42624703/107102469-266dd700-67e0-11eb-921d-f7e83009c68f.png) ![PQ_WRITER_diff_120_159](https://user-images.githubusercontent.com/42624703/107102470-266dd700-67e0-11eb-8f4f-42f920d30bbc.png) ![PQ_WRITER_per_0_39](https://user-images.githubusercontent.com/42624703/107102472-27066d80-67e0-11eb-9e40-aa06098bbdae.png) ![PQ_WRITER_per_40_79](https://user-images.githubusercontent.com/42624703/107102474-27066d80-67e0-11eb-9227-aff94e67ecfc.png) ![PQ_WRITER_per_80_119](https://user-images.githubusercontent.com/42624703/107102476-279f0400-67e0-11eb-91c5-ad01f1aa09ea.png) ![PQ_WRITER_per_120_159](https://user-images.githubusercontent.com/42624703/107102477-279f0400-67e0-11eb-89ea-c9a5e113c628.png) Authors: - Ram (Ramakrishna Prabhu) (@rgsl888prabhu) Approvers: - Devavret Makkar (@devavret) - Vukasin Milovanovic (@vuule) URL: https://github.com/rapidsai/cudf/pull/7278 --- cpp/src/io/csv/csv_gpu.cu | 31 +-- cpp/src/io/orc/dict_enc.cu | 121 ++++----- cpp/src/io/orc/stats_enc.cu | 35 +-- cpp/src/io/orc/stripe_data.cu | 63 ++--- cpp/src/io/orc/stripe_enc.cu | 188 +++++--------- cpp/src/io/parquet/page_dict.cu | 33 ++- cpp/src/io/parquet/page_enc.cu | 111 +++----- cpp/src/io/statistics/column_stats.cu | 361 ++++++++++---------------- cpp/src/io/utilities/block_utils.cuh | 30 --- 9 files changed, 354 insertions(+), 619 deletions(-) diff --git a/cpp/src/io/csv/csv_gpu.cu b/cpp/src/io/csv/csv_gpu.cu index ef1c17aa817..041d1de3404 100644 --- a/cpp/src/io/csv/csv_gpu.cu +++ b/cpp/src/io/csv/csv_gpu.cu @@ -860,13 +860,11 @@ __global__ void __launch_bounds__(rowofs_block_dim) int escapechar, int commentchar) { - auto start = data.begin(); - __shared__ __align__(8) uint64_t ctxtree[rowofs_block_dim * 2]; - using warp_reduce = typename cub::WarpReduce; - using half_warp_reduce = typename cub::WarpReduce; + auto start = data.begin(); + using block_reduce = typename cub::BlockReduce; __shared__ union { - typename warp_reduce::TempStorage full; - typename half_warp_reduce::TempStorage half[rowofs_block_dim / 32]; + typename block_reduce::TempStorage bk_storage; + __align__(8) uint64_t ctxtree[rowofs_block_dim * 2]; } temp_storage; const char *end = start + (min(parse_pos + chunk_size, data_size) - start_offset); @@ -936,16 +934,16 @@ __global__ void __launch_bounds__(rowofs_block_dim) // Convert the long-form {rowmap,outctx}[inctx] version into packed version // {rowcount,ouctx}[inctx], then merge the row contexts of the 32-character blocks into // a single 16K-character block context - rowctx_merge_transform(ctxtree, pack_rowmaps(ctx_map), t); + rowctx_merge_transform(temp_storage.ctxtree, pack_rowmaps(ctx_map), t); // If this is the second phase, get the block's initial parser state and row counter if (offsets_out.data()) { - if (t == 0) { ctxtree[0] = row_ctx[blockIdx.x]; } + if (t == 0) { temp_storage.ctxtree[0] = row_ctx[blockIdx.x]; } __syncthreads(); // Walk back the transform tree with the known initial parser state - rowctx32_t ctx = rowctx_inverse_merge_transform(ctxtree, t); - uint64_t row = (ctxtree[0] >> 2) + (ctx >> 2); + rowctx32_t ctx = rowctx_inverse_merge_transform(temp_storage.ctxtree, t); + uint64_t row = (temp_storage.ctxtree[0] >> 2) + (ctx >> 2); uint32_t rows_out_of_range = 0; uint32_t rowmap = select_rowmap(ctx_map, ctx & 3); // Output row positions @@ -960,18 +958,13 @@ __global__ void __launch_bounds__(rowofs_block_dim) row++; rowmap >>= pos; } - // Return the number of rows out of range - rows_out_of_range = half_warp_reduce(temp_storage.half[t / 32]).Sum(rows_out_of_range); - __syncthreads(); - if (!(t & 0xf)) { ctxtree[t >> 4] = rows_out_of_range; } __syncthreads(); - if (t < 32) { - rows_out_of_range = warp_reduce(temp_storage.full).Sum(static_cast(ctxtree[t])); - if (t == 0) { row_ctx[blockIdx.x] = rows_out_of_range; } - } + // Return the number of rows out of range + rows_out_of_range = block_reduce(temp_storage.bk_storage).Sum(rows_out_of_range); + if (t == 0) { row_ctx[blockIdx.x] = rows_out_of_range; } } else { // Just store the row counts and output contexts - if (t == 0) { row_ctx[blockIdx.x] = ctxtree[1]; } + if (t == 0) { row_ctx[blockIdx.x] = temp_storage.ctxtree[1]; } } } diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index b814f5364ca..1ee57f4a2fd 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -62,13 +62,17 @@ static inline __device__ uint32_t nvstr_init_hash(char const *ptr, uint32_t len) * * @param[in,out] s dictionary builder state * @param[in] t thread id + * @param[in] temp_storage shared memory storage to scan non-null positions */ -static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s, int t) +template +static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s, + int t, + Storage &temp_storage) { if (t == 0) { s->nnz = 0; } for (uint32_t i = 0; i < s->chunk.num_rows; i += 512) { const uint32_t *valid_map = s->chunk.valid_map_base; - uint32_t is_valid, nz_map, nz_pos; + uint32_t is_valid, nz_pos; if (t < 16) { if (!valid_map) { s->scratch_red[t] = 0xffffffffu; @@ -88,18 +92,13 @@ static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s, int t) } __syncthreads(); is_valid = (i + t < s->chunk.num_rows) ? (s->scratch_red[t >> 5] >> (t & 0x1f)) & 1 : 0; - nz_map = ballot(is_valid); - nz_pos = s->nnz + __popc(nz_map & (0x7fffffffu >> (0x1fu - ((uint32_t)t & 0x1f)))); - if (!(t & 0x1f)) { s->scratch_red[16 + (t >> 5)] = __popc(nz_map); } + uint32_t tmp_nnz; + cub::BlockScan(temp_storage) + .ExclusiveSum(is_valid, nz_pos, tmp_nnz); + nz_pos += s->nnz; __syncthreads(); - if (t < 32) { - uint32_t nnz = s->scratch_red[16 + (t & 0xf)]; - uint32_t nnz_pos = WarpReducePos16(nnz, t); - if (t == 0xf) { s->nnz += nnz_pos; } - if (t <= 0xf) { s->scratch_red[t] = nnz_pos - nnz; } - } - __syncthreads(); - if (is_valid) { s->dict[nz_pos + s->scratch_red[t >> 5]] = i + t; } + if (!t) { s->nnz += tmp_nnz; } + if (is_valid) { s->dict[nz_pos] = i + t; } __syncthreads(); } } @@ -116,11 +115,13 @@ __global__ void __launch_bounds__(block_size, 2) gpuInitDictionaryIndices(DictionaryChunk *chunks, uint32_t num_columns) { __shared__ __align__(16) dictinit_state_s state_g; - using warp_reduce = cub::WarpReduce; - using half_warp_reduce = cub::WarpReduce; + + using block_reduce = cub::BlockReduce; + using block_scan = cub::BlockScan; + __shared__ union { - typename warp_reduce::TempStorage full[block_size / 32]; - typename half_warp_reduce::TempStorage half[block_size / 32]; + typename block_reduce::TempStorage reduce_storage; + typename block_scan::TempStorage scan_storage; } temp_storage; dictinit_state_s *const s = &state_g; @@ -138,7 +139,7 @@ __global__ void __launch_bounds__(block_size, 2) __syncthreads(); // First, take care of NULLs, and count how many strings we have (TODO: bypass this step when // there are no nulls) - LoadNonNullIndices(s, t); + LoadNonNullIndices(s, t, temp_storage.scan_storage); // Sum the lengths of all the strings if (t == 0) { s->chunk.string_char_count = 0; @@ -157,13 +158,8 @@ __global__ void __launch_bounds__(block_size, 2) len = static_cast(ck_data[ck_row].count); hash = nvstr_init_hash(ck_data[ck_row].ptr, len); } - len = half_warp_reduce(temp_storage.half[t / 32]).Sum(len); - if (!(t & 0xf)) { s->scratch_red[t >> 4] = len; } - __syncthreads(); - if (t < 32) { - len = warp_reduce(temp_storage.full[t / 32]).Sum(s->scratch_red[t]); - if (t == 0) s->chunk.string_char_count += len; - } + len = block_reduce(temp_storage.reduce_storage).Sum(len); + if (t == 0) s->chunk.string_char_count += len; if (i + t < nnz) { atomicAdd(&s->map.u32[hash >> 1], 1 << ((hash & 1) ? 16 : 0)); dict_data[i + t] = start_row + ck_row; @@ -182,21 +178,13 @@ __global__ void __launch_bounds__(block_size, 2) uint32_t sum23 = count23 + (count23 << 16); uint32_t sum45 = count45 + (count45 << 16); uint32_t sum67 = count67 + (count67 << 16); - uint32_t sum_w, tmp; sum23 += (sum01 >> 16) * 0x10001; sum45 += (sum23 >> 16) * 0x10001; sum67 += (sum45 >> 16) * 0x10001; - sum_w = sum67 >> 16; - sum_w = WarpReducePos16(sum_w, t); - if ((t & 0xf) == 0xf) { s->scratch_red[t >> 4] = sum_w; } - __syncthreads(); - if (t < 32) { - uint32_t sum_b = WarpReducePos32(s->scratch_red[t], t); - s->scratch_red[t] = sum_b; - } + uint32_t sum_w = sum67 >> 16; + block_scan(temp_storage.scan_storage).InclusiveSum(sum_w, sum_w); __syncthreads(); - tmp = (t >= 16) ? s->scratch_red[(t >> 4) - 1] : 0; - sum_w = (sum_w - (sum67 >> 16) + tmp) * 0x10001; + sum_w = (sum_w - (sum67 >> 16)) * 0x10001; s->map.u32[t * 4 + 0] = sum_w + sum01 - count01; s->map.u32[t * 4 + 1] = sum_w + sum23 - count23; s->map.u32[t * 4 + 2] = sum_w + sum45 - count45; @@ -239,7 +227,7 @@ __global__ void __launch_bounds__(block_size, 2) // map, the position of the first string can be inferred from the hash map counts dict_char_count = 0; for (uint32_t i = 0; i < nnz; i += block_size) { - uint32_t ck_row = 0, ck_row_ref = 0, is_dupe = 0, dupe_mask, dupes_before; + uint32_t ck_row = 0, ck_row_ref = 0, is_dupe = 0; if (i + t < nnz) { const char *str1, *str2; uint32_t len1, len2, hash; @@ -255,33 +243,23 @@ __global__ void __launch_bounds__(block_size, 2) dict_char_count += (is_dupe) ? 0 : len1; } } - dupe_mask = ballot(is_dupe); - dupes_before = s->total_dupes + __popc(dupe_mask & ((2 << (t & 0x1f)) - 1)); - if (!(t & 0x1f)) { s->scratch_red[t >> 5] = __popc(dupe_mask); } - __syncthreads(); - if (t < 32) { - uint32_t warp_dupes = (t < 16) ? s->scratch_red[t] : 0; - uint32_t warp_pos = WarpReducePos16(warp_dupes, t); - if (t == 0xf) { s->total_dupes += warp_pos; } - if (t < 16) { s->scratch_red[t] = warp_pos - warp_dupes; } - } + uint32_t dupes_in_block; + uint32_t dupes_before; + block_scan(temp_storage.scan_storage).InclusiveSum(is_dupe, dupes_before, dupes_in_block); + dupes_before += s->total_dupes; __syncthreads(); + if (!t) { s->total_dupes += dupes_in_block; } if (i + t < nnz) { if (!is_dupe) { - dupes_before += s->scratch_red[t >> 5]; dict_data[i + t - dupes_before] = ck_row + start_row; } else { s->chunk.dict_index[ck_row + start_row] = (ck_row_ref + start_row) | (1u << 31); } } } - dict_char_count = warp_reduce(temp_storage.full[t / 32]).Sum(dict_char_count); - if (!(t & 0x1f)) { s->scratch_red[t >> 5] = dict_char_count; } - __syncthreads(); - if (t < 32) { - dict_char_count = - half_warp_reduce(temp_storage.half[t / 32]).Sum((t < 16) ? s->scratch_red[t] : 0); - } + // temp_storage is being used twice, so make sure there is `__syncthreads()` between them + // while making any future changes. + dict_char_count = block_reduce(temp_storage.reduce_storage).Sum(dict_char_count); if (!t) { chunks[group_id * num_columns + col_id].num_strings = nnz; chunks[group_id * num_columns + col_id].string_char_count = s->chunk.string_char_count; @@ -362,8 +340,12 @@ __global__ void __launch_bounds__(block_size) gpuBuildStripeDictionaries(StripeDictionary *stripes, uint32_t num_columns) { __shared__ __align__(16) build_state_s state_g; - using warp_reduce = cub::WarpReduce; - __shared__ typename warp_reduce::TempStorage temp_storage[block_size / 32]; + using block_reduce = cub::BlockReduce; + using block_scan = cub::BlockScan; + __shared__ union { + typename block_reduce::TempStorage reduce_storage; + typename block_scan::TempStorage scan_storage; + } temp_storage; build_state_s *const s = &state_g; uint32_t col_id = blockIdx.x; @@ -384,8 +366,8 @@ __global__ void __launch_bounds__(block_size) str_data = static_cast(s->stripe.column_data_base); dict_char_count = 0; for (uint32_t i = 0; i < num_strings; i += block_size) { - uint32_t cur = (i + t < num_strings) ? dict_data[i + t] : 0; - uint32_t dupe_mask, dupes_before, cur_len = 0; + uint32_t cur = (i + t < num_strings) ? dict_data[i + t] : 0; + uint32_t cur_len = 0; const char *cur_ptr; bool is_dupe = false; if (i + t < num_strings) { @@ -397,28 +379,19 @@ __global__ void __launch_bounds__(block_size) is_dupe = nvstr_is_equal(cur_ptr, cur_len, str_data[prev].ptr, str_data[prev].count); } dict_char_count += (is_dupe) ? 0 : cur_len; - dupe_mask = ballot(is_dupe); - dupes_before = s->total_dupes + __popc(dupe_mask & ((2 << (t & 0x1f)) - 1)); - if (!(t & 0x1f)) { s->scratch_red[t >> 5] = __popc(dupe_mask); } - __syncthreads(); - if (t < 32) { - uint32_t warp_dupes = s->scratch_red[t]; - uint32_t warp_pos = WarpReducePos32(warp_dupes, t); - if (t == 0x1f) { s->total_dupes += warp_pos; } - s->scratch_red[t] = warp_pos - warp_dupes; - } + uint32_t dupes_in_block; + uint32_t dupes_before; + block_scan(temp_storage.scan_storage).InclusiveSum(is_dupe, dupes_before, dupes_in_block); + dupes_before += s->total_dupes; __syncthreads(); + if (!t) { s->total_dupes += dupes_in_block; } if (i + t < num_strings) { - dupes_before += s->scratch_red[t >> 5]; dict_index[cur] = i + t - dupes_before; if (!is_dupe && dupes_before != 0) { dict_data[i + t - dupes_before] = cur; } } __syncthreads(); } - dict_char_count = warp_reduce(temp_storage[t / 32]).Sum(dict_char_count); - if (!(t & 0x1f)) { s->scratch_red[t >> 5] = dict_char_count; } - __syncthreads(); - if (t < 32) { dict_char_count = warp_reduce(temp_storage[t / 32]).Sum(s->scratch_red[t]); } + dict_char_count = block_reduce(temp_storage.reduce_storage).Sum(dict_char_count); if (t == 0) { stripes[stripe_id * num_columns + col_id].num_strings = num_strings - s->total_dupes; stripes[stripe_id * num_columns + col_id].dict_char_count = dict_char_count; diff --git a/cpp/src/io/orc/stats_enc.cu b/cpp/src/io/orc/stats_enc.cu index 50f8457d05b..a30c61d6ef7 100644 --- a/cpp/src/io/orc/stats_enc.cu +++ b/cpp/src/io/orc/stats_enc.cu @@ -66,8 +66,7 @@ __global__ void __launch_bounds__(init_threads_per_block) * @param[in] statistics_count Number of statistics buffers */ constexpr unsigned int buffersize_reduction_dim = 32; -constexpr unsigned int buffersize_threads_per_block = - buffersize_reduction_dim * buffersize_reduction_dim; +constexpr unsigned int block_size = buffersize_reduction_dim * buffersize_reduction_dim; constexpr unsigned int pb_fld_hdrlen = 1; constexpr unsigned int pb_fld_hdrlen16 = 2; // > 127-byte length constexpr unsigned int pb_fld_hdrlen32 = 5; // > 16KB length @@ -77,19 +76,18 @@ constexpr unsigned int pb_fldlen_decimal = 40; // Assume decimal2string fits in constexpr unsigned int pb_fldlen_bucket1 = 1 + pb_fldlen_int64; constexpr unsigned int pb_fldlen_common = 2 * pb_fld_hdrlen + pb_fldlen_int64; -__global__ void __launch_bounds__(buffersize_threads_per_block, 1) +template +__global__ void __launch_bounds__(block_size, 1) gpu_init_statistics_buffersize(statistics_merge_group *groups, const statistics_chunk *chunks, uint32_t statistics_count) { - __shared__ volatile uint32_t scratch_red[buffersize_reduction_dim]; - __shared__ volatile uint32_t stats_size; - uint32_t tx = threadIdx.x; - uint32_t ty = threadIdx.y; - uint32_t t = ty * buffersize_reduction_dim + tx; - if (!t) { stats_size = 0; } + using block_scan = cub::BlockScan; + __shared__ typename block_scan::TempStorage temp_storage; + volatile uint32_t stats_size = 0; + uint32_t t = threadIdx.x; __syncthreads(); - for (uint32_t start = 0; start < statistics_count; start += buffersize_threads_per_block) { + for (uint32_t start = 0; start < statistics_count; start += block_size) { uint32_t stats_len = 0, stats_pos; uint32_t idx = start + t; if (idx < statistics_count) { @@ -120,19 +118,15 @@ __global__ void __launch_bounds__(buffersize_threads_per_block, 1) default: break; } } - stats_pos = WarpReducePos32(stats_len, tx); - if (tx == buffersize_reduction_dim - 1) { scratch_red[ty] = stats_pos; } - __syncthreads(); - if (ty == 0) { scratch_red[tx] = WarpReducePos32(scratch_red[tx], tx); } - __syncthreads(); - if (ty != 0) { stats_pos += scratch_red[ty - 1]; } + uint32_t tmp_stats_size; + block_scan(temp_storage).ExclusiveSum(stats_len, stats_pos, tmp_stats_size); stats_pos += stats_size; + stats_size += tmp_stats_size; if (idx < statistics_count) { - groups[idx].start_chunk = stats_pos - stats_len; + groups[idx].start_chunk = stats_pos; groups[idx].num_chunks = stats_len; } __syncthreads(); - if (t == buffersize_threads_per_block - 1) { stats_size = stats_pos; } } } @@ -405,9 +399,8 @@ void orc_init_statistics_buffersize(statistics_merge_group *groups, uint32_t statistics_count, rmm::cuda_stream_view stream) { - dim3 dim_block(buffersize_reduction_dim, buffersize_reduction_dim); - gpu_init_statistics_buffersize<<<1, dim_block, 0, stream.value()>>>( - groups, chunks, statistics_count); + gpu_init_statistics_buffersize + <<<1, block_size, 0, stream.value()>>>(groups, chunks, statistics_count); } /** diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 856c23c0f55..6f326fc0576 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -87,7 +87,6 @@ struct orc_byterle_state_s { struct orc_rowdec_state_s { uint32_t nz_count; - uint32_t last_row[num_warps]; uint32_t row[row_decoder_buffer_size]; // 0=skip, >0: row position relative to cur_row }; @@ -97,11 +96,6 @@ struct orc_strdict_state_s { uint32_t dict_len; }; -struct orc_nulldec_state_s { - uint32_t row; - uint32_t null_count[num_warps]; -}; - struct orc_datadec_state_s { uint32_t cur_row; // starting row of current batch uint32_t end_row; // ending row of this chunk (start_row + num_rows) @@ -119,7 +113,7 @@ struct orcdec_state_s { int is_string; union { orc_strdict_state_s dict; - orc_nulldec_state_s nulls; + uint32_t nulls_desc_row; // number of rows processed for nulls. orc_datadec_state_s data; } top; union { @@ -1120,8 +1114,12 @@ __global__ void __launch_bounds__(block_size) size_t first_row) { __shared__ __align__(16) orcdec_state_s state_g; - using warp_reduce = cub::WarpReduce; - __shared__ typename warp_reduce::TempStorage temp_storage[block_size / 32]; + using warp_reduce = cub::WarpReduce; + using block_reduce = cub::BlockReduce; + __shared__ union { + typename warp_reduce::TempStorage wr_storage[block_size / 32]; + typename block_reduce::TempStorage bk_storage; + } temp_storage; orcdec_state_s *const s = &state_g; bool is_nulldec = (blockIdx.y >= num_stripes); @@ -1136,8 +1134,8 @@ __global__ void __launch_bounds__(block_size) uint32_t null_count = 0; // Decode NULLs if (t == 0) { - s->chunk.skip_count = 0; - s->top.nulls.row = 0; + s->chunk.skip_count = 0; + s->top.nulls_desc_row = 0; bytestream_init(&s->bs, s->chunk.streams[CI_PRESENT], s->chunk.strm_len[CI_PRESENT]); } __syncthreads(); @@ -1145,8 +1143,8 @@ __global__ void __launch_bounds__(block_size) // No present stream: all rows are valid s->vals.u32[t] = ~0; } - while (s->top.nulls.row < s->chunk.num_rows) { - uint32_t nrows_max = min(s->chunk.num_rows - s->top.nulls.row, blockDim.x * 32); + while (s->top.nulls_desc_row < s->chunk.num_rows) { + uint32_t nrows_max = min(s->chunk.num_rows - s->top.nulls_desc_row, blockDim.x * 32); uint32_t nrows; size_t row_in; @@ -1164,7 +1162,7 @@ __global__ void __launch_bounds__(block_size) nrows = nrows_max; } __syncthreads(); - row_in = s->chunk.start_row + s->top.nulls.row; + row_in = s->chunk.start_row + s->top.nulls_desc_row; if (row_in + nrows > first_row && row_in < first_row + max_num_rows && s->chunk.valid_map_base != NULL) { int64_t dst_row = row_in - first_row; @@ -1215,25 +1213,19 @@ __global__ void __launch_bounds__(block_size) if (i + 32 > skippedrows) { bits &= (1 << (skippedrows - i)) - 1; } skip_count += __popc(bits); } - skip_count = warp_reduce(temp_storage[t / 32]).Sum(skip_count); + skip_count = warp_reduce(temp_storage.wr_storage[t / 32]).Sum(skip_count); if (t == 0) { s->chunk.skip_count += skip_count; } } __syncthreads(); - if (t == 0) { s->top.nulls.row += nrows; } + if (t == 0) { s->top.nulls_desc_row += nrows; } __syncthreads(); } __syncthreads(); // Sum up the valid counts and infer null_count - null_count = warp_reduce(temp_storage[t / 32]).Sum(null_count); - if (!(t & 0x1f)) { s->top.nulls.null_count[t >> 5] = null_count; } - __syncthreads(); - if (t < 32) { - null_count = (t < num_warps) ? s->top.nulls.null_count[t] : 0; - null_count = warp_reduce(temp_storage[t / 32]).Sum(null_count); - if (t == 0) { - chunks[chunk_id].null_count = null_count; - chunks[chunk_id].skip_count = s->chunk.skip_count; - } + null_count = block_reduce(temp_storage.bk_storage).Sum(null_count); + if (t == 0) { + chunks[chunk_id].null_count = null_count; + chunks[chunk_id].skip_count = s->chunk.skip_count; } } else { // Decode string dictionary @@ -1289,7 +1281,7 @@ __global__ void __launch_bounds__(block_size) * @param[in,out] s Column chunk decoder state * @param[in] first_row crop all rows below first rows * @param[in] t thread id - * @param[in] temp_storage shared memory storage to performance warp reduce + * @param[in] temp_storage shared memory storage to perform block reduce */ template static __device__ void DecodeRowPositions(orcdec_state_s *s, @@ -1297,7 +1289,8 @@ static __device__ void DecodeRowPositions(orcdec_state_s *s, int t, Storage &temp_storage) { - using warp_reduce = cub::WarpReduce; + using block_reduce = cub::BlockReduce; + if (t == 0) { if (s->chunk.skip_count != 0) { s->u.rowdec.nz_count = min(min(s->chunk.skip_count, s->top.data.max_vals), blockDim.x); @@ -1338,15 +1331,9 @@ static __device__ void DecodeRowPositions(orcdec_state_s *s, // TBD: Brute-forcing this, there might be a more efficient way to find the thread with the // last row last_row = (nz_count == s->u.rowdec.nz_count) ? row_plus1 : 0; - last_row = warp_reduce(temp_storage[t / 32]).Reduce(last_row, cub::Max()); - if (!(t & 0x1f)) { *(volatile uint32_t *)&s->u.rowdec.last_row[t >> 5] = last_row; } - nz_pos = (valid) ? nz_count : 0; - __syncthreads(); - if (t < 32) { - last_row = (t < num_warps) ? *(volatile uint32_t *)&s->u.rowdec.last_row[t] : 0; - last_row = warp_reduce(temp_storage[t / 32]).Reduce(last_row, cub::Max()); - if (t == 0) { s->top.data.nrows = last_row; } - } + last_row = block_reduce(temp_storage).Reduce(last_row, cub::Max()); + nz_pos = (valid) ? nz_count : 0; + if (t == 0) { s->top.data.nrows = last_row; } if (valid && nz_pos - 1 < s->u.rowdec.nz_count) { s->u.rowdec.row[nz_pos - 1] = row_plus1; } __syncthreads(); } else { @@ -1396,7 +1383,7 @@ __global__ void __launch_bounds__(block_size) uint32_t rowidx_stride) { __shared__ __align__(16) orcdec_state_s state_g; - __shared__ typename cub::WarpReduce::TempStorage temp_storage[block_size / 32]; + __shared__ typename cub::BlockReduce::TempStorage temp_storage; orcdec_state_s *const s = &state_g; uint32_t chunk_id; diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index fcad42ba1be..6e2b7b2ab89 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -49,10 +49,6 @@ struct intrle_enc_state_s { uint32_t hdr_bytes; uint32_t pl_bytes; volatile uint32_t delta_map[(512 / 32) + 1]; - volatile union { - uint32_t u32[(512 / 32) * 2]; - uint64_t u64[(512 / 32) * 2]; - } scratch; }; struct strdata_enc_state_s { @@ -79,7 +75,7 @@ struct orcenc_state_s { StripeDictionary dict_stripe; } u; union { - uint8_t u8[scratch_buffer_size]; // general scratch buffer + uint8_t u8[scratch_buffer_size]; // gblock_vminscratch buffer uint32_t u32[scratch_buffer_size / 4]; } buf; union { @@ -349,8 +345,7 @@ static inline __device__ void StoreBitsBigEndian( * @param[in] numvals max number of values to encode * @param[in] flush encode all remaining values if nonzero * @param[in] t thread id - * @param[in] temp_storage_full shared memory storage to performance warp reduce - * @param[in] temp_storage_half shared memory storage to performance half warp reduce + * @param[in] temp_storage shared memory storage to perform block reduce * * @return number of input values encoded */ @@ -358,21 +353,20 @@ template + int block_size, + typename Storage> static __device__ uint32_t IntegerRLE(orcenc_state_s *s, const T *inbuf, uint32_t inpos, uint32_t numvals, uint32_t flush, int t, - FullStorage &temp_storage_full, - HalfStorage &temp_storage_half) + Storage &temp_storage) { - using warp_reduce = cub::WarpReduce; - using half_warp_reduce = cub::WarpReduce; - uint8_t *dst = s->chunk.streams[cid] + s->strm_pos[cid]; - uint32_t out_cnt = 0; + using block_reduce = cub::BlockReduce; + uint8_t *dst = s->chunk.streams[cid] + s->strm_pos[cid]; + uint32_t out_cnt = 0; + __shared__ volatile uint64_t block_vmin; while (numvals > 0) { T v0 = (t < numvals) ? inbuf[(inpos + t) & inmask] : 0; @@ -421,69 +415,55 @@ static __device__ uint32_t IntegerRLE(orcenc_state_s *s, } else { intrle_minmax(vmax, vmin); } - vmin = warp_reduce(temp_storage_full[t / 32]).Reduce(vmin, cub::Min()); - __syncwarp(); - vmax = warp_reduce(temp_storage_full[t / 32]).Reduce(vmax, cub::Max()); - __syncwarp(); - if (!(t & 0x1f)) { - s->u.intrle.scratch.u64[(t >> 5) * 2 + 0] = vmin; - s->u.intrle.scratch.u64[(t >> 5) * 2 + 1] = vmax; - } + vmin = block_reduce(temp_storage).Reduce(vmin, cub::Min()); __syncthreads(); - if (t < 32) { - vmin = (T)s->u.intrle.scratch.u64[(t & 0xf) * 2 + 0]; - vmax = (T)s->u.intrle.scratch.u64[(t & 0xf) * 2 + 1]; - vmin = half_warp_reduce(temp_storage_half[t / 32]).Reduce(vmin, cub::Min()); - __syncwarp(); - vmax = half_warp_reduce(temp_storage_half[t / 32]).Reduce(vmax, cub::Max()); - __syncwarp(); - if (t == 0) { - uint32_t mode1_w, mode2_w; - typename std::make_unsigned::type vrange_mode1, vrange_mode2; - s->u.intrle.scratch.u64[0] = (uint64_t)vmin; - if (sizeof(T) > 4) { - vrange_mode1 = (is_signed) ? max(zigzag(vmin), zigzag(vmax)) : vmax; - vrange_mode2 = vmax - vmin; - mode1_w = 8 - min(CountLeadingBytes64(vrange_mode1), 7); - mode2_w = 8 - min(CountLeadingBytes64(vrange_mode2), 7); - } else { - vrange_mode1 = (is_signed) ? max(zigzag(vmin), zigzag(vmax)) : vmax; - vrange_mode2 = vmax - vmin; - mode1_w = 4 - min(CountLeadingBytes32(vrange_mode1), 3); - mode2_w = 4 - min(CountLeadingBytes32(vrange_mode2), 3); - } - // Decide between mode1 & mode2 (also mode3 for length=2 repeat) - if (vrange_mode2 == 0 && mode1_w > 1) { - // Should only occur if literal_run==2 (otherwise would have resulted in repeat_run >= - // 3) - uint32_t bytecnt = 2; - dst[0] = 0xC0 + ((literal_run - 1) >> 8); - dst[1] = (literal_run - 1) & 0xff; - bytecnt += StoreVarint(dst + 2, vrange_mode1); - dst[bytecnt++] = 0; // Zero delta - s->u.intrle.literal_mode = 3; - s->u.intrle.literal_w = bytecnt; + vmax = block_reduce(temp_storage).Reduce(vmax, cub::Max()); + if (t == 0) { + uint32_t mode1_w, mode2_w; + typename std::make_unsigned::type vrange_mode1, vrange_mode2; + block_vmin = static_cast(vmin); + if (sizeof(T) > 4) { + vrange_mode1 = (is_signed) ? max(zigzag(vmin), zigzag(vmax)) : vmax; + vrange_mode2 = vmax - vmin; + mode1_w = 8 - min(CountLeadingBytes64(vrange_mode1), 7); + mode2_w = 8 - min(CountLeadingBytes64(vrange_mode2), 7); + } else { + vrange_mode1 = (is_signed) ? max(zigzag(vmin), zigzag(vmax)) : vmax; + vrange_mode2 = vmax - vmin; + mode1_w = 4 - min(CountLeadingBytes32(vrange_mode1), 3); + mode2_w = 4 - min(CountLeadingBytes32(vrange_mode2), 3); + } + // Decide between mode1 & mode2 (also mode3 for length=2 repeat) + if (vrange_mode2 == 0 && mode1_w > 1) { + // Should only occur if literal_run==2 (otherwise would have resulted in repeat_run >= + // 3) + uint32_t bytecnt = 2; + dst[0] = 0xC0 + ((literal_run - 1) >> 8); + dst[1] = (literal_run - 1) & 0xff; + bytecnt += StoreVarint(dst + 2, vrange_mode1); + dst[bytecnt++] = 0; // Zero delta + s->u.intrle.literal_mode = 3; + s->u.intrle.literal_w = bytecnt; + } else { + uint32_t range, w; + if (mode1_w > mode2_w && (literal_run - 1) * (mode1_w - mode2_w) > 4) { + s->u.intrle.literal_mode = 2; + w = mode2_w; + range = (uint32_t)vrange_mode2; } else { - uint32_t range, w; - if (mode1_w > mode2_w && (literal_run - 1) * (mode1_w - mode2_w) > 4) { - s->u.intrle.literal_mode = 2; - w = mode2_w; - range = (uint32_t)vrange_mode2; - } else { - s->u.intrle.literal_mode = 1; - w = mode1_w; - range = (uint32_t)vrange_mode1; - } - if (w == 1) - w = (range >= 16) ? w << 3 : (range >= 4) ? 4 : (range >= 2) ? 2 : 1; - else - w <<= 3; // bytes -> bits - s->u.intrle.literal_w = w; + s->u.intrle.literal_mode = 1; + w = mode1_w; + range = (uint32_t)vrange_mode1; } + if (w == 1) + w = (range >= 16) ? w << 3 : (range >= 4) ? 4 : (range >= 2) ? 2 : 1; + else + w <<= 3; // bytes -> bits + s->u.intrle.literal_w = w; } } __syncthreads(); - vmin = (T)s->u.intrle.scratch.u64[0]; + vmin = static_cast(block_vmin); literal_mode = s->u.intrle.literal_mode; literal_w = s->u.intrle.literal_w; if (literal_mode == 1) { @@ -665,12 +645,9 @@ __global__ void __launch_bounds__(block_size) { __shared__ __align__(16) orcenc_state_s state_g; __shared__ union { - typename cub::WarpReduce::TempStorage full_i32[block_size / 32]; - typename cub::WarpReduce::TempStorage full_i64[block_size / 32]; - typename cub::WarpReduce::TempStorage full_u32[block_size / 32]; - typename cub::WarpReduce::TempStorage half_i32[block_size / 32]; - typename cub::WarpReduce::TempStorage half_i64[block_size / 32]; - typename cub::WarpReduce::TempStorage half_u32[block_size / 32]; + typename cub::BlockReduce::TempStorage i32; + typename cub::BlockReduce::TempStorage i64; + typename cub::BlockReduce::TempStorage u32; } temp_storage; orcenc_state_s *const s = &state_g; @@ -867,25 +844,13 @@ __global__ void __launch_bounds__(block_size) case SHORT: case INT: case DATE: - n = IntegerRLE(s, - s->vals.i32, - s->nnz - s->numvals, - s->numvals, - flush, - t, - temp_storage.full_i32, - temp_storage.half_i32); + n = IntegerRLE( + s, s->vals.i32, s->nnz - s->numvals, s->numvals, flush, t, temp_storage.i32); break; case LONG: case TIMESTAMP: - n = IntegerRLE(s, - s->vals.i64, - s->nnz - s->numvals, - s->numvals, - flush, - t, - temp_storage.full_i64, - temp_storage.half_i64); + n = IntegerRLE( + s, s->vals.i64, s->nnz - s->numvals, s->numvals, flush, t, temp_storage.i64); break; case BYTE: n = ByteRLE(s, s->vals.u8, s->nnz - s->numvals, s->numvals, flush, t); @@ -910,14 +875,8 @@ __global__ void __launch_bounds__(block_size) break; case STRING: if (s->chunk.encoding_kind == DICTIONARY_V2) { - n = IntegerRLE(s, - s->vals.u32, - s->nnz - s->numvals, - s->numvals, - flush, - t, - temp_storage.full_u32, - temp_storage.half_u32); + n = IntegerRLE( + s, s->vals.u32, s->nnz - s->numvals, s->numvals, flush, t, temp_storage.u32); } else { n = s->numvals; } @@ -933,14 +892,8 @@ __global__ void __launch_bounds__(block_size) switch (s->chunk.type_kind) { case TIMESTAMP: case STRING: - n = IntegerRLE(s, - s->lengths.u32, - s->nnz - s->numlengths, - s->numlengths, - flush, - t, - temp_storage.full_u32, - temp_storage.half_u32); + n = IntegerRLE( + s, s->lengths.u32, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u32); break; default: n = s->numlengths; break; } @@ -975,10 +928,7 @@ __global__ void __launch_bounds__(block_size) gpuEncodeStringDictionaries(StripeDictionary *stripes, EncChunk *chunks, uint32_t num_columns) { __shared__ __align__(16) orcenc_state_s state_g; - __shared__ union { - typename cub::WarpReduce::TempStorage full_u32[block_size / 32]; - typename cub::WarpReduce::TempStorage half_u32[block_size / 32]; - } temp_storage; + __shared__ typename cub::BlockReduce::TempStorage temp_storage; orcenc_state_s *const s = &state_g; uint32_t stripe_id = blockIdx.x; @@ -1027,14 +977,8 @@ __global__ void __launch_bounds__(block_size) __syncthreads(); if (s->numlengths + numvals > 0) { uint32_t flush = (s->cur_row + numvals == s->nrows) ? 1 : 0; - uint32_t n = IntegerRLE(s, - s->lengths.u32, - s->cur_row, - s->numlengths + numvals, - flush, - t, - temp_storage.full_u32, - temp_storage.half_u32); + uint32_t n = IntegerRLE( + s, s->lengths.u32, s->cur_row, s->numlengths + numvals, flush, t, temp_storage); __syncthreads(); if (!t) { s->numlengths += numvals; diff --git a/cpp/src/io/parquet/page_dict.cu b/cpp/src/io/parquet/page_dict.cu index c7babef1a20..d984cc1e44f 100644 --- a/cpp/src/io/parquet/page_dict.cu +++ b/cpp/src/io/parquet/page_dict.cu @@ -105,8 +105,11 @@ __device__ void FetchDictionaryFragment(dict_state_s *s, } /// Generate dictionary indices in ascending row order +template __device__ void GenerateDictionaryIndices(dict_state_s *s, uint32_t t) { + using block_scan = cub::BlockScan; + __shared__ typename block_scan::TempStorage temp_storage; uint32_t *dict_index = s->col.dict_index; uint32_t *dict_data = s->col.dict_data + s->ck.start_row; uint32_t num_dict_entries = 0; @@ -120,13 +123,11 @@ __device__ void GenerateDictionaryIndices(dict_state_s *s, uint32_t t) (is_valid && dict_idx == row); // Any value that doesn't have bit31 set should have dict_idx=row at this point - uint32_t umask = ballot(is_unique); - uint32_t pos = num_dict_entries + __popc(umask & ((1 << (t & 0x1f)) - 1)); - if (!(t & 0x1f)) { s->scratch_red[t >> 5] = __popc(umask); } - num_dict_entries += __syncthreads_count(is_unique); - if (t < 32) { s->scratch_red[t] = WarpReducePos32(s->scratch_red[t], t); } - __syncthreads(); - if (t >= 32) { pos += s->scratch_red[(t - 32) >> 5]; } + uint32_t block_num_dict_entries; + uint32_t pos; + block_scan(temp_storage).ExclusiveSum(is_unique, pos, block_num_dict_entries); + pos += num_dict_entries; + num_dict_entries += block_num_dict_entries; if (is_valid && is_unique) { dict_data[pos] = row; dict_index[row] = pos; @@ -150,8 +151,8 @@ __global__ void __launch_bounds__(block_size, 1) gpuBuildChunkDictionaries(EncColumnChunk *chunks, uint32_t *dev_scratch) { __shared__ __align__(8) dict_state_s state_g; - using warp_reduce = cub::WarpReduce; - __shared__ typename warp_reduce::TempStorage temp_storage[block_size / 32]; + using block_reduce = cub::BlockReduce; + __shared__ typename block_reduce::TempStorage temp_storage; dict_state_s *const s = &state_g; uint32_t t = threadIdx.x; @@ -250,15 +251,11 @@ __global__ void __launch_bounds__(block_size, 1) } } // Count the non-duplicate entries - frag_dict_size = warp_reduce(temp_storage[t / 32]).Sum((is_valid && !is_dupe) ? len : 0); - if (!(t & 0x1f)) { s->scratch_red[t >> 5] = frag_dict_size; } + frag_dict_size = block_reduce(temp_storage).Sum((is_valid && !is_dupe) ? len : 0); new_dict_entries = __syncthreads_count(is_valid && !is_dupe); - if (t < 32) { - frag_dict_size = warp_reduce(temp_storage[t / 32]).Sum(s->scratch_red[t]); - if (t == 0) { - s->frag_dict_size += frag_dict_size; - s->num_dict_entries += new_dict_entries; - } + if (t == 0) { + s->frag_dict_size += frag_dict_size; + s->num_dict_entries += new_dict_entries; } if (is_valid) { if (!is_dupe) { @@ -309,7 +306,7 @@ __global__ void __launch_bounds__(block_size, 1) __syncthreads(); } __syncthreads(); - GenerateDictionaryIndices(s, t); + GenerateDictionaryIndices(s, t); if (!t) { chunks[blockIdx.x].num_dict_fragments = s->ck.num_dict_fragments; chunks[blockIdx.x].dictionary_size = s->dictionary_size; diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 07d460e4bd5..d7d47c07354 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -120,11 +120,11 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment { __shared__ __align__(16) frag_init_state_s state_g; - using warp_reduce = cub::WarpReduce; - using half_warp_reduce = cub::WarpReduce; + using block_reduce = cub::BlockReduce; + using block_scan = cub::BlockScan; __shared__ union { - typename warp_reduce::TempStorage full[block_size / 32]; - typename half_warp_reduce::TempStorage half; + typename block_reduce::TempStorage reduce_storage; + typename block_scan::TempStorage scan_storage; } temp_storage; frag_init_state_s *const s = &state_g; @@ -204,7 +204,6 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment uint32_t is_valid = (i + t < nvals && val_idx < s->col.leaf_column->size()) ? s->col.leaf_column->is_valid(val_idx) : 0; - uint32_t valid_warp = ballot(is_valid); uint32_t len, nz_pos, hash; if (is_valid) { len = dtype_len; @@ -227,28 +226,18 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment len = 0; } - nz_pos = - s->frag.non_nulls + __popc(valid_warp & (0x7fffffffu >> (0x1fu - ((uint32_t)t & 0x1f)))); - len = warp_reduce(temp_storage.full[t / 32]).Sum(len); - if (!(t & 0x1f)) { - s->scratch_red[(t >> 5) + 0] = __popc(valid_warp); - s->scratch_red[(t >> 5) + 16] = len; - } + uint32_t non_nulls; + block_scan(temp_storage.scan_storage).ExclusiveSum(is_valid, nz_pos, non_nulls); + nz_pos += s->frag.non_nulls; __syncthreads(); - if (t < 32) { - uint32_t warp_pos = WarpReducePos16((t < 16) ? s->scratch_red[t] : 0, t); - uint32_t non_nulls = shuffle(warp_pos, 0xf); - len = half_warp_reduce(temp_storage.half).Sum((t < 16) ? s->scratch_red[t + 16] : 0); - if (t < 16) { s->scratch_red[t] = warp_pos; } - if (!t) { - s->frag.non_nulls = s->frag.non_nulls + non_nulls; - s->frag.fragment_data_size += len; - } + len = block_reduce(temp_storage.reduce_storage).Sum(len); + if (!t) { + s->frag.non_nulls += non_nulls; + s->frag.fragment_data_size += len; } __syncthreads(); if (is_valid && dtype != BOOLEAN) { uint32_t *dict_index = s->col.dict_index; - if (t >= 32) { nz_pos += s->scratch_red[(t - 32) >> 5]; } if (dict_index) { atomicAdd(&s->map.u32[hash >> 1], (hash & 1) ? 1 << 16 : 1); dict_index[start_value_idx + nz_pos] = @@ -271,27 +260,18 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment uint32_t sum23 = count23 + (count23 << 16); uint32_t sum45 = count45 + (count45 << 16); uint32_t sum67 = count67 + (count67 << 16); - uint32_t sum_w, tmp; sum23 += (sum01 >> 16) * 0x10001; sum45 += (sum23 >> 16) * 0x10001; sum67 += (sum45 >> 16) * 0x10001; - sum_w = sum67 >> 16; - sum_w = WarpReducePos16(sum_w, t); - if ((t & 0xf) == 0xf) { s->scratch_red[t >> 4] = sum_w; } - __syncthreads(); - if (t < 32) { - uint32_t sum_b = WarpReducePos32(s->scratch_red[t], t); - s->scratch_red[t] = sum_b; - } - __syncthreads(); - tmp = (t >= 16) ? s->scratch_red[(t >> 4) - 1] : 0; - sum_w = (sum_w - (sum67 >> 16) + tmp) * 0x10001; + uint32_t sum_w = sum67 >> 16; + block_scan(temp_storage.scan_storage).InclusiveSum(sum_w, sum_w); + sum_w = (sum_w - (sum67 >> 16)) * 0x10001; s->map.u32[t * 4 + 0] = sum_w + sum01 - count01; s->map.u32[t * 4 + 1] = sum_w + sum23 - count23; s->map.u32[t * 4 + 2] = sum_w + sum45 - count45; s->map.u32[t * 4 + 3] = sum_w + sum67 - count67; - __syncthreads(); } + __syncthreads(); // Put the indices back in hash order if (s->col.dict_index) { uint32_t *dict_index = s->col.dict_index + start_row; @@ -330,7 +310,7 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment // map, the position of the first entry can be inferred from the hash map counts uint32_t dupe_data_size = 0; for (uint32_t i = 0; i < nnz; i += block_size) { - uint32_t ck_row = 0, ck_row_ref = 0, is_dupe = 0, dupe_mask, dupes_before; + uint32_t ck_row = 0, ck_row_ref = 0, is_dupe = 0; if (i + t < nnz) { uint32_t dict_val = s->dict[i + t]; uint32_t hash = dict_val & ((1 << init_hash_bits) - 1); @@ -366,20 +346,14 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment } } } - dupe_mask = ballot(is_dupe); - dupes_before = s->total_dupes + __popc(dupe_mask & ((2 << (t & 0x1f)) - 1)); - if (!(t & 0x1f)) { s->scratch_red[t >> 5] = __popc(dupe_mask); } - __syncthreads(); - if (t < 32) { - uint32_t warp_dupes = (t < 16) ? s->scratch_red[t] : 0; - uint32_t warp_pos = WarpReducePos16(warp_dupes, t); - if (t == 0xf) { s->total_dupes += warp_pos; } - if (t < 16) { s->scratch_red[t] = warp_pos - warp_dupes; } - } + uint32_t dupes_in_block; + uint32_t dupes_before; + block_scan(temp_storage.scan_storage).InclusiveSum(is_dupe, dupes_before, dupes_in_block); + dupes_before += s->total_dupes; __syncthreads(); + if (t == 0) { s->total_dupes += dupes_in_block; } if (i + t < nnz) { if (!is_dupe) { - dupes_before += s->scratch_red[t >> 5]; s->col.dict_data[start_row + i + t - dupes_before] = ck_row; } else { s->col.dict_index[ck_row] = ck_row_ref | (1u << 31); @@ -387,15 +361,10 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment } } __syncthreads(); - dupe_data_size = warp_reduce(temp_storage.full[t / 32]).Sum(dupe_data_size); - if (!(t & 0x1f)) { s->scratch_red[t >> 5] = dupe_data_size; } - __syncthreads(); - if (t < 32) { - dupe_data_size = half_warp_reduce(temp_storage.half).Sum((t < 16) ? s->scratch_red[t] : 0); - if (!t) { - s->frag.dict_data_size = s->frag.fragment_data_size - dupe_data_size; - s->frag.num_dict_vals = s->frag.non_nulls - s->total_dupes; - } + dupe_data_size = block_reduce(temp_storage.reduce_storage).Sum(dupe_data_size); + if (!t) { + s->frag.dict_data_size = s->frag.fragment_data_size - dupe_data_size; + s->frag.num_dict_vals = s->frag.non_nulls - s->total_dupes; } } __syncthreads(); @@ -942,6 +911,7 @@ convert_nanoseconds(cuda::std::chrono::sys_time } // blockDim(128, 1, 1) +template __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, const EncColumnChunk *chunks, gpu_inflate_input_s *comp_in, @@ -949,6 +919,8 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, uint32_t start_page) { __shared__ __align__(8) page_enc_state_s state_g; + using block_scan = cub::BlockScan; + __shared__ typename block_scan::TempStorage temp_storage; page_enc_state_s *const s = &state_g; uint32_t t = threadIdx.x; @@ -1081,7 +1053,7 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, for (uint32_t cur_val_idx = 0; cur_val_idx < s->page.num_leaf_values;) { uint32_t nvals = min(s->page.num_leaf_values - cur_val_idx, 128); uint32_t val_idx = s->page_start_val + cur_val_idx + t; - uint32_t is_valid, warp_valids, len, pos; + uint32_t is_valid, len, pos; if (s->page.page_type == PageType::DICTIONARY_PAGE) { is_valid = (cur_val_idx + t < s->page.num_leaf_values); @@ -1091,19 +1063,13 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, ? s->col.leaf_column->is_valid(val_idx) : 0; } - warp_valids = ballot(is_valid); cur_val_idx += nvals; if (dict_bits >= 0) { // Dictionary encoding if (dict_bits > 0) { uint32_t rle_numvals; - - pos = __popc(warp_valids & ((1 << (t & 0x1f)) - 1)); - if (!(t & 0x1f)) { s->scratch_red[t >> 5] = __popc(warp_valids); } - __syncthreads(); - if (t < 32) { s->scratch_red[t] = WarpReducePos4((t < 4) ? s->scratch_red[t] : 0, t); } - __syncthreads(); - pos = pos + ((t >= 32) ? s->scratch_red[(t - 32) >> 5] : 0); + uint32_t rle_numvals_in_block; + block_scan(temp_storage).ExclusiveSum(is_valid, pos, rle_numvals_in_block); rle_numvals = s->rle_numvals; if (is_valid) { uint32_t v; @@ -1114,7 +1080,7 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, } s->vals[(rle_numvals + pos) & (rle_buffer_size - 1)] = v; } - rle_numvals += s->scratch_red[3]; + rle_numvals += rle_numvals_in_block; __syncthreads(); if ((!enable_bool_rle) && (dtype == BOOLEAN)) { PlainBoolEncode(s, rle_numvals, (cur_val_idx == s->page.num_leaf_values), t); @@ -1137,13 +1103,10 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, } else { len = 0; } - pos = WarpReducePos32(len, t); - if ((t & 0x1f) == 0x1f) { s->scratch_red[t >> 5] = pos; } - __syncthreads(); - if (t < 32) { s->scratch_red[t] = WarpReducePos4((t < 4) ? s->scratch_red[t] : 0, t); } + uint32_t total_len = 0; + block_scan(temp_storage).ExclusiveSum(len, pos, total_len); __syncthreads(); - if (t == 0) { s->cur = dst + s->scratch_red[3]; } - pos = pos + ((t >= 32) ? s->scratch_red[(t - 32) >> 5] : 0) - len; + if (t == 0) { s->cur = dst + total_len; } if (is_valid) { switch (dtype) { case INT32: @@ -2104,8 +2067,8 @@ void EncodePages(EncPage *pages, { // A page is part of one column. This is launching 1 block per page. 1 block will exclusively // deal with one datatype. - gpuEncodePages<<>>( - pages, chunks, comp_in, comp_out, start_page); + gpuEncodePages<128> + <<>>(pages, chunks, comp_in, comp_out, start_page); } /** diff --git a/cpp/src/io/statistics/column_stats.cu b/cpp/src/io/statistics/column_stats.cu index f89151ca31b..5d9d41412a4 100644 --- a/cpp/src/io/statistics/column_stats.cu +++ b/cpp/src/io/statistics/column_stats.cu @@ -24,6 +24,8 @@ #include +constexpr int block_size = 1024; + namespace cudf { namespace io { /** @@ -35,7 +37,6 @@ struct stats_state_s { statistics_chunk ck; ///< Output statistics chunk volatile statistics_val warp_min[32]; ///< Min reduction scratch volatile statistics_val warp_max[32]; ///< Max reduction scratch - volatile statistics_val warp_sum[32]; ///< Sum reduction scratch }; /** @@ -47,9 +48,6 @@ struct merge_state_s { statistics_chunk ck; ///< Resulting statistics chunk volatile statistics_val warp_min[32]; ///< Min reduction scratch volatile statistics_val warp_max[32]; ///< Max reduction scratch - volatile statistics_val warp_sum[32]; ///< Sum reduction scratch - volatile uint32_t warp_non_nulls[32]; ///< Non-nulls reduction scratch - volatile uint32_t warp_nulls[32]; ///< Nulls reduction scratch }; /** @@ -148,20 +146,20 @@ inline __device__ string_stats WarpReduceMaxString(const char *smax, uint32_t lm * @param s shared block state * @param dtype data type * @param t thread id - * @param storage temporary storage for warp reduction + * @param storage temporary storage for reduction */ template void __device__ gatherIntColumnStats(stats_state_s *s, statistics_dtype dtype, uint32_t t, Storage &storage) { - using warp_reduce = cub::WarpReduce; - int64_t vmin = INT64_MAX; - int64_t vmax = INT64_MIN; - int64_t vsum = 0; + using block_reduce = cub::BlockReduce; + int64_t vmin = INT64_MAX; + int64_t vmax = INT64_MIN; + int64_t vsum = 0; int64_t v; uint32_t nn_cnt = 0; - bool has_minmax; - for (uint32_t i = 0; i < s->group.num_rows; i += 1024) { + __shared__ volatile bool has_minmax; + for (uint32_t i = 0; i < s->group.num_rows; i += block_size) { uint32_t r = i + t; uint32_t row = r + s->group.start_row; const uint32_t *valid_map = s->col.valid_map_base; @@ -198,35 +196,22 @@ gatherIntColumnStats(stats_state_s *s, statistics_dtype dtype, uint32_t t, Stora s->ck.non_nulls = nn_cnt; s->ck.null_count = s->group.num_rows - nn_cnt; } - vmin = warp_reduce(storage.integer_stats[t / 32]).Reduce(vmin, cub::Min()); - vmin = shuffle(vmin); - vmax = warp_reduce(storage.integer_stats[t / 32]).Reduce(vmax, cub::Max()); - vmax = shuffle(vmax); - vsum = warp_reduce(storage.integer_stats[t / 32]).Sum(vsum); - if (!(t & 0x1f)) { - s->warp_min[t >> 5].i_val = vmin; - s->warp_max[t >> 5].i_val = vmax; - s->warp_sum[t >> 5].i_val = vsum; - } - has_minmax = __syncthreads_or(vmin <= vmax); - if (t < 32 * 1) { - vmin = warp_reduce(storage.integer_stats[t / 32]).Reduce(s->warp_min[t].i_val, cub::Min()); - if (!(t & 0x1f)) { + vmin = block_reduce(storage.integer_stats).Reduce(vmin, cub::Min()); + __syncthreads(); + vmax = block_reduce(storage.integer_stats).Reduce(vmax, cub::Max()); + if (!t) { has_minmax = (vmin <= vmax); } + __syncthreads(); + if (has_minmax) { vsum = block_reduce(storage.integer_stats).Sum(vsum); } + if (!t) { + if (has_minmax) { s->ck.min_value.i_val = vmin; - s->ck.has_minmax = (has_minmax); - } - } else if (t < 32 * 2) { - vmax = - warp_reduce(storage.integer_stats[t / 32]).Reduce(s->warp_max[t & 0x1f].i_val, cub::Max()); - if (!(t & 0x1f)) { s->ck.max_value.i_val = vmax; } - } else if (t < 32 * 3) { - vsum = warp_reduce(storage.integer_stats[t / 32]).Sum(s->warp_sum[t & 0x1f].i_val); - if (!(t & 0x1f)) { - s->ck.sum.i_val = vsum; - // TODO: For now, don't set the sum flag with 64-bit values so we don't have to check for - // 64-bit sum overflow - s->ck.has_sum = (dtype <= dtype_int32 && has_minmax); + s->ck.max_value.i_val = vmax; + s->ck.sum.i_val = vsum; } + s->ck.has_minmax = has_minmax; + // TODO: For now, don't set the sum flag with 64-bit values so we don't have to check for + // 64-bit sum overflow + s->ck.has_sum = (dtype <= dtype_int32 && has_minmax); } } @@ -236,20 +221,20 @@ gatherIntColumnStats(stats_state_s *s, statistics_dtype dtype, uint32_t t, Stora * @param s shared block state * @param dtype data type * @param t thread id - * @param storage temporary storage for warp reduction + * @param storage temporary storage for reduction */ template void __device__ gatherFloatColumnStats(stats_state_s *s, statistics_dtype dtype, uint32_t t, Storage &storage) { - using warp_reduce = cub::WarpReduce; - double vmin = CUDART_INF; - double vmax = -CUDART_INF; - double vsum = 0; + using block_reduce = cub::BlockReduce; + double vmin = CUDART_INF; + double vmax = -CUDART_INF; + double vsum = 0; double v; uint32_t nn_cnt = 0; - bool has_minmax; - for (uint32_t i = 0; i < s->group.num_rows; i += 1024) { + __shared__ volatile bool has_minmax; + for (uint32_t i = 0; i < s->group.num_rows; i += block_size) { uint32_t r = i + t; uint32_t row = r + s->group.start_row; const uint32_t *valid_map = s->col.valid_map_base; @@ -275,34 +260,20 @@ gatherFloatColumnStats(stats_state_s *s, statistics_dtype dtype, uint32_t t, Sto s->ck.non_nulls = nn_cnt; s->ck.null_count = s->group.num_rows - nn_cnt; } - vmin = warp_reduce(storage.float_stats[t / 32]).Reduce(vmin, cub::Min()); - vmin = shuffle(vmin); - vmax = warp_reduce(storage.float_stats[t / 32]).Reduce(vmax, cub::Max()); - vmax = shuffle(vmax); - vsum = warp_reduce(storage.float_stats[t / 32]).Reduce(vsum, IgnoreNaNSum()); - if (!(t & 0x1f)) { - s->warp_min[t >> 5].fp_val = vmin; - s->warp_max[t >> 5].fp_val = vmax; - s->warp_sum[t >> 5].fp_val = vsum; - } - has_minmax = __syncthreads_or(vmin <= vmax); - if (t < 32 * 1) { - vmin = warp_reduce(storage.float_stats[t / 32]).Reduce(s->warp_min[t].fp_val, cub::Min()); - if (!(t & 0x1f)) { + vmin = block_reduce(storage.float_stats).Reduce(vmin, cub::Min()); + __syncthreads(); + vmax = block_reduce(storage.float_stats).Reduce(vmax, cub::Max()); + if (!t) { has_minmax = (vmin <= vmax); } + __syncthreads(); + if (has_minmax) { vsum = block_reduce(storage.float_stats).Reduce(vsum, IgnoreNaNSum()); } + if (!t) { + if (has_minmax) { s->ck.min_value.fp_val = (vmin != 0.0) ? vmin : CUDART_NEG_ZERO; - s->ck.has_minmax = (has_minmax); - } - } else if (t < 32 * 2) { - vmax = - warp_reduce(storage.float_stats[t / 32]).Reduce(s->warp_max[t & 0x1f].fp_val, cub::Max()); - if (!(t & 0x1f)) { s->ck.max_value.fp_val = (vmax != 0.0) ? vmax : CUDART_ZERO; } - } else if (t < 32 * 3) { - vsum = - warp_reduce(storage.float_stats[t / 32]).Reduce(s->warp_sum[t & 0x1f].fp_val, IgnoreNaNSum()); - if (!(t & 0x1f)) { - s->ck.sum.fp_val = vsum; - s->ck.has_sum = (has_minmax); // Implies sum is valid as well + s->ck.max_value.fp_val = (vmax != 0.0) ? vmax : CUDART_ZERO; + s->ck.sum.fp_val = vsum; } + s->ck.has_minmax = has_minmax; + s->ck.has_sum = has_minmax; // Implies sum is valid as well } } @@ -317,22 +288,22 @@ struct nvstrdesc_s { * * @param s shared block state * @param t thread id - * @param storage temporary storage for warp reduction + * @param storage temporary storage for reduction */ template void __device__ gatherStringColumnStats(stats_state_s *s, uint32_t t, Storage &storage) { - using warp_reduce = cub::WarpReduce; - uint32_t len_sum = 0; - const char *smin = nullptr; - const char *smax = nullptr; - uint32_t lmin = 0; - uint32_t lmax = 0; - uint32_t nn_cnt = 0; + using block_reduce = cub::BlockReduce; + uint32_t len_sum = 0; + const char *smin = nullptr; + const char *smax = nullptr; + uint32_t lmin = 0; + uint32_t lmax = 0; + uint32_t nn_cnt = 0; bool has_minmax; string_stats minval, maxval; - for (uint32_t i = 0; i < s->group.num_rows; i += 1024) { + for (uint32_t i = 0; i < s->group.num_rows; i += block_size) { uint32_t r = i + t; uint32_t row = r + s->group.start_row; const uint32_t *valid_map = s->col.valid_map_base; @@ -362,38 +333,35 @@ void __device__ gatherStringColumnStats(stats_state_s *s, uint32_t t, Storage &s s->ck.non_nulls = nn_cnt; s->ck.null_count = s->group.num_rows - nn_cnt; } - minval = WarpReduceMinString(smin, lmin); - maxval = WarpReduceMaxString(smax, lmax); - len_sum = warp_reduce(storage.string_stats[t / 32]).Sum(len_sum); + minval = WarpReduceMinString(smin, lmin); + maxval = WarpReduceMaxString(smax, lmax); __syncwarp(); if (!(t & 0x1f)) { s->warp_min[t >> 5].str_val.ptr = minval.ptr; s->warp_min[t >> 5].str_val.length = minval.length; s->warp_max[t >> 5].str_val.ptr = maxval.ptr; s->warp_max[t >> 5].str_val.length = maxval.length; - s->warp_sum[t >> 5].str_val.length = len_sum; } has_minmax = __syncthreads_or(smin != nullptr); + if (has_minmax) { len_sum = block_reduce(storage.string_stats).Sum(len_sum); } if (t < 32 * 1) { minval = WarpReduceMinString(s->warp_min[t].str_val.ptr, s->warp_min[t].str_val.length); if (!(t & 0x1f)) { - s->ck.min_value.str_val.ptr = minval.ptr; - s->ck.min_value.str_val.length = minval.length; - s->ck.has_minmax = has_minmax; + if (has_minmax) { + s->ck.min_value.str_val.ptr = minval.ptr; + s->ck.min_value.str_val.length = minval.length; + s->ck.sum.i_val = len_sum; + } + s->ck.has_minmax = has_minmax; + s->ck.has_sum = has_minmax; } - } else if (t < 32 * 2) { + } else if (t < 32 * 2 and has_minmax) { maxval = WarpReduceMaxString(s->warp_max[t & 0x1f].str_val.ptr, s->warp_max[t & 0x1f].str_val.length); if (!(t & 0x1f)) { s->ck.max_value.str_val.ptr = maxval.ptr; s->ck.max_value.str_val.length = maxval.length; } - } else if (t < 32 * 3) { - len_sum = warp_reduce(storage.string_stats[t / 32]).Sum(s->warp_sum[t & 0x1f].str_val.length); - if (!(t & 0x1f)) { - s->ck.sum.i_val = len_sum; - s->ck.has_sum = has_minmax; - } } } @@ -412,9 +380,9 @@ __global__ void __launch_bounds__(block_size, 1) { __shared__ __align__(8) stats_state_s state_g; __shared__ union { - typename cub::WarpReduce::TempStorage integer_stats[block_size / 32]; - typename cub::WarpReduce::TempStorage float_stats[block_size / 32]; - typename cub::WarpReduce::TempStorage string_stats[block_size / 32]; + typename cub::BlockReduce::TempStorage integer_stats; + typename cub::BlockReduce::TempStorage float_stats; + typename cub::BlockReduce::TempStorage string_stats; } temp_storage; stats_state_s *const s = &state_g; @@ -455,7 +423,7 @@ __global__ void __launch_bounds__(block_size, 1) * @param ck_in pointer to first statistic chunk * @param num_chunks number of statistic chunks to merge * @param t thread id - * @param storage temporary storage for warp reduction + * @param storage temporary storage for reduction */ template void __device__ mergeIntColumnStats(merge_state_s *s, @@ -470,8 +438,8 @@ void __device__ mergeIntColumnStats(merge_state_s *s, int64_t vsum = 0; uint32_t non_nulls = 0; uint32_t null_count = 0; - bool has_minmax; - for (uint32_t i = t; i < num_chunks; i += 1024) { + __shared__ volatile bool has_minmax; + for (uint32_t i = t; i < num_chunks; i += block_size) { const statistics_chunk *ck = &ck_in[i]; if (ck->has_minmax) { vmin = min(vmin, ck->min_value.i_val); @@ -481,52 +449,29 @@ void __device__ mergeIntColumnStats(merge_state_s *s, non_nulls += ck->non_nulls; null_count += ck->null_count; } - non_nulls = cub::WarpReduce(storage.u32[t / 32]).Sum(non_nulls); - __syncwarp(); - vmin = cub::WarpReduce(storage.i64[t / 32]).Reduce(vmin, cub::Min()); - __syncwarp(); - vmin = shuffle(vmin); - - null_count = cub::WarpReduce(storage.u32[t / 32]).Sum(null_count); - __syncwarp(); - vmax = cub::WarpReduce(storage.i64[t / 32]).Reduce(vmax, cub::Max()); - __syncwarp(); - vmax = shuffle(vmax); - - vsum = cub::WarpReduce(storage.i64[t / 32]).Sum(vsum); + vmin = cub::BlockReduce(storage.i64).Reduce(vmin, cub::Min()); + __syncthreads(); + vmax = cub::BlockReduce(storage.i64).Reduce(vmax, cub::Max()); + if (!t) { has_minmax = (vmin <= vmax); } + __syncthreads(); + non_nulls = cub::BlockReduce(storage.u32).Sum(non_nulls); + __syncthreads(); + null_count = cub::BlockReduce(storage.u32).Sum(null_count); + __syncthreads(); + if (has_minmax) { vsum = cub::BlockReduce(storage.i64).Sum(vsum); } - if (!(t & 0x1f)) { - s->warp_non_nulls[t >> 5] = non_nulls; - s->warp_nulls[t >> 5] = null_count; - s->warp_min[t >> 5].i_val = vmin; - s->warp_max[t >> 5].i_val = vmax; - s->warp_sum[t >> 5].i_val = vsum; - } - has_minmax = __syncthreads_or(vmin <= vmax); - if (t < 32 * 1) { - vmin = cub::WarpReduce(storage.i64[t / 32]).Reduce(s->warp_min[t].i_val, cub::Min()); - if (!(t & 0x1f)) { + if (!t) { + if (has_minmax) { s->ck.min_value.i_val = vmin; - s->ck.has_minmax = (has_minmax); + s->ck.max_value.i_val = vmax; + s->ck.sum.i_val = vsum; } - } else if (t < 32 * 2) { - vmax = - cub::WarpReduce(storage.i64[t / 32]).Reduce(s->warp_max[t & 0x1f].i_val, cub::Max()); - if (!(t & 0x1f)) { s->ck.max_value.i_val = vmax; } - } else if (t < 32 * 3) { - vsum = cub::WarpReduce(storage.i64[t / 32]).Sum(s->warp_sum[t & 0x1f].i_val); - if (!(t & 0x1f)) { - s->ck.sum.i_val = vsum; - // TODO: For now, don't set the sum flag with 64-bit values so we don't have to check for - // 64-bit sum overflow - s->ck.has_sum = (dtype <= dtype_int32 && has_minmax); - } - } else if (t < 32 * 4) { - non_nulls = cub::WarpReduce(storage.u32[t / 32]).Sum(s->warp_non_nulls[t & 0x1f]); - if (!(t & 0x1f)) { s->ck.non_nulls = non_nulls; } - } else if (t < 32 * 5) { - null_count = cub::WarpReduce(storage.u32[t / 32]).Sum(s->warp_nulls[t & 0x1f]); - if (!(t & 0x1f)) { s->ck.null_count = null_count; } + s->ck.has_minmax = has_minmax; + // TODO: For now, don't set the sum flag with 64-bit values so we don't have to check for + // 64-bit sum overflow + s->ck.has_sum = (dtype <= dtype_int32 && has_minmax); + s->ck.non_nulls = non_nulls; + s->ck.null_count = null_count; } } @@ -538,7 +483,7 @@ void __device__ mergeIntColumnStats(merge_state_s *s, * @param ck_in pointer to first statistic chunk * @param num_chunks number of statistic chunks to merge * @param t thread id - * @param storage temporary storage for warp reduction + * @param storage temporary storage for reduction */ template void __device__ mergeFloatColumnStats(merge_state_s *s, @@ -552,8 +497,8 @@ void __device__ mergeFloatColumnStats(merge_state_s *s, double vsum = 0; uint32_t non_nulls = 0; uint32_t null_count = 0; - bool has_minmax; - for (uint32_t i = t; i < num_chunks; i += 1024) { + __shared__ volatile bool has_minmax; + for (uint32_t i = t; i < num_chunks; i += block_size) { const statistics_chunk *ck = &ck_in[i]; if (ck->has_minmax) { double v0 = ck->min_value.fp_val; @@ -566,51 +511,29 @@ void __device__ mergeFloatColumnStats(merge_state_s *s, null_count += ck->null_count; } - non_nulls = cub::WarpReduce(storage.u32[t / 32]).Sum(non_nulls); - __syncwarp(); - vmin = cub::WarpReduce(storage.f64[t / 32]).Reduce(vmin, cub::Min()); - __syncwarp(); - vmin = shuffle(vmin); - - null_count = cub::WarpReduce(storage.u32[t / 32]).Sum(null_count); - __syncwarp(); - vmax = cub::WarpReduce(storage.f64[t / 32]).Reduce(vmax, cub::Max()); - __syncwarp(); - vmax = shuffle(vmax); - - vsum = cub::WarpReduce(storage.f64[t / 32]).Reduce(vsum, IgnoreNaNSum()); - - if (!(t & 0x1f)) { - s->warp_non_nulls[t >> 5] = non_nulls; - s->warp_nulls[t >> 5] = null_count; - s->warp_min[t >> 5].fp_val = vmin; - s->warp_max[t >> 5].fp_val = vmax; - s->warp_sum[t >> 5].fp_val = vsum; + vmin = cub::BlockReduce(storage.f64).Reduce(vmin, cub::Min()); + __syncthreads(); + vmax = cub::BlockReduce(storage.f64).Reduce(vmax, cub::Max()); + if (!t) { has_minmax = (vmin <= vmax); } + __syncthreads(); + non_nulls = cub::BlockReduce(storage.u32).Sum(non_nulls); + __syncthreads(); + null_count = cub::BlockReduce(storage.u32).Sum(null_count); + __syncthreads(); + if (has_minmax) { + vsum = cub::BlockReduce(storage.f64).Reduce(vsum, IgnoreNaNSum()); } - has_minmax = __syncthreads_or(vmin <= vmax); - if (t < 32 * 1) { - vmin = cub::WarpReduce(storage.f64[t / 32]).Reduce(s->warp_min[t].fp_val, cub::Min()); - if (!(t & 0x1f)) { + + if (!t) { + if (has_minmax) { s->ck.min_value.fp_val = (vmin != 0.0) ? vmin : CUDART_NEG_ZERO; - s->ck.has_minmax = (has_minmax); - } - } else if (t < 32 * 2) { - vmax = - cub::WarpReduce(storage.f64[t / 32]).Reduce(s->warp_max[t & 0x1f].fp_val, cub::Max()); - if (!(t & 0x1f)) { s->ck.max_value.fp_val = (vmax != 0.0) ? vmax : CUDART_ZERO; } - } else if (t < 32 * 3) { - vsum = cub::WarpReduce(storage.f64[t / 32]) - .Reduce(s->warp_sum[t & 0x1f].fp_val, IgnoreNaNSum()); - if (!(t & 0x1f)) { - s->ck.sum.fp_val = vsum; - s->ck.has_sum = (has_minmax); // Implies sum is valid as well + s->ck.max_value.fp_val = (vmax != 0.0) ? vmax : CUDART_ZERO; + s->ck.sum.fp_val = vsum; } - } else if (t < 32 * 4) { - non_nulls = cub::WarpReduce(storage.u32[t / 32]).Sum(s->warp_non_nulls[t & 0x1f]); - if (!(t & 0x1f)) { s->ck.non_nulls = non_nulls; } - } else if (t < 32 * 5) { - null_count = cub::WarpReduce(storage.u32[t / 32]).Sum(s->warp_nulls[t & 0x1f]); - if (!(t & 0x1f)) { s->ck.null_count = null_count; } + s->ck.has_minmax = has_minmax; + s->ck.has_sum = has_minmax; // Implies sum is valid as well + s->ck.non_nulls = non_nulls; + s->ck.null_count = null_count; } } @@ -621,7 +544,7 @@ void __device__ mergeFloatColumnStats(merge_state_s *s, * @param ck_in pointer to first statistic chunk * @param num_chunks number of statistic chunks to merge * @param t thread id - * @param storage temporary storage for warp reduction + * @param storage temporary storage for reduction */ template void __device__ mergeStringColumnStats(merge_state_s *s, @@ -640,7 +563,7 @@ void __device__ mergeStringColumnStats(merge_state_s *s, bool has_minmax; string_stats minval, maxval; - for (uint32_t i = t; i < num_chunks; i += 1024) { + for (uint32_t i = t; i < num_chunks; i += block_size) { const statistics_chunk *ck = &ck_in[i]; if (ck->has_minmax) { uint32_t len0 = ck->min_value.str_val.length; @@ -660,50 +583,41 @@ void __device__ mergeStringColumnStats(merge_state_s *s, non_nulls += ck->non_nulls; null_count += ck->null_count; } - non_nulls = cub::WarpReduce(storage.u32[t / 32]).Sum(non_nulls); - __syncwarp(); - null_count = cub::WarpReduce(storage.u32[t / 32]).Sum(null_count); - __syncwarp(); - minval = WarpReduceMinString(smin, lmin); - maxval = WarpReduceMaxString(smax, lmax); - len_sum = cub::WarpReduce(storage.u32[t / 32]).Sum(len_sum); + minval = WarpReduceMinString(smin, lmin); + maxval = WarpReduceMaxString(smax, lmax); if (!(t & 0x1f)) { - s->warp_non_nulls[t >> 5] = non_nulls; - s->warp_nulls[t >> 5] = null_count; s->warp_min[t >> 5].str_val.ptr = minval.ptr; s->warp_min[t >> 5].str_val.length = minval.length; s->warp_max[t >> 5].str_val.ptr = maxval.ptr; s->warp_max[t >> 5].str_val.length = maxval.length; - s->warp_sum[t >> 5].str_val.length = len_sum; } has_minmax = __syncthreads_or(smin != nullptr); + + non_nulls = cub::BlockReduce(storage.u32).Sum(non_nulls); + __syncthreads(); + null_count = cub::BlockReduce(storage.u32).Sum(null_count); + __syncthreads(); + if (has_minmax) { len_sum = cub::BlockReduce(storage.u32).Sum(len_sum); } if (t < 32 * 1) { minval = WarpReduceMinString(s->warp_min[t].str_val.ptr, s->warp_min[t].str_val.length); if (!(t & 0x1f)) { - s->ck.min_value.str_val.ptr = minval.ptr; - s->ck.min_value.str_val.length = minval.length; - s->ck.has_minmax = has_minmax; + if (has_minmax) { + s->ck.min_value.str_val.ptr = minval.ptr; + s->ck.min_value.str_val.length = minval.length; + s->ck.sum.i_val = len_sum; + } + s->ck.has_minmax = has_minmax; + s->ck.has_sum = has_minmax; + s->ck.non_nulls = non_nulls; + s->ck.null_count = null_count; } } else if (t < 32 * 2) { maxval = WarpReduceMaxString(s->warp_max[t & 0x1f].str_val.ptr, s->warp_max[t & 0x1f].str_val.length); - if (!(t & 0x1f)) { + if (!((t & 0x1f) and has_minmax)) { s->ck.max_value.str_val.ptr = maxval.ptr; s->ck.max_value.str_val.length = maxval.length; } - } else if (t < 32 * 3) { - len_sum = - cub::WarpReduce(storage.u32[t / 32]).Sum(s->warp_sum[t & 0x1f].str_val.length); - if (!(t & 0x1f)) { - s->ck.sum.i_val = len_sum; - s->ck.has_sum = has_minmax; - } - } else if (t < 32 * 4) { - non_nulls = cub::WarpReduce(storage.u32[t / 32]).Sum(s->warp_non_nulls[t & 0x1f]); - if (!(t & 0x1f)) { s->ck.non_nulls = non_nulls; } - } else if (t < 32 * 5) { - null_count = cub::WarpReduce(storage.u32[t / 32]).Sum(s->warp_nulls[t & 0x1f]); - if (!(t & 0x1f)) { s->ck.null_count = null_count; } } } @@ -724,9 +638,9 @@ __global__ void __launch_bounds__(block_size, 1) { __shared__ __align__(8) merge_state_s state_g; __shared__ struct { - typename cub::WarpReduce::TempStorage u32[block_size / 32]; - typename cub::WarpReduce::TempStorage i64[block_size / 32]; - typename cub::WarpReduce::TempStorage f64[block_size / 32]; + typename cub::BlockReduce::TempStorage u32; + typename cub::BlockReduce::TempStorage i64; + typename cub::BlockReduce::TempStorage f64; } storage; merge_state_s *const s = &state_g; @@ -773,7 +687,8 @@ void GatherColumnStatistics(statistics_chunk *chunks, uint32_t num_chunks, rmm::cuda_stream_view stream) { - gpuGatherColumnStatistics<1024><<>>(chunks, groups); + gpuGatherColumnStatistics + <<>>(chunks, groups); } /** @@ -791,8 +706,8 @@ void MergeColumnStatistics(statistics_chunk *chunks_out, uint32_t num_chunks, rmm::cuda_stream_view stream) { - gpuMergeColumnStatistics<1024> - <<>>(chunks_out, chunks_in, groups); + gpuMergeColumnStatistics + <<>>(chunks_out, chunks_in, groups); } } // namespace io diff --git a/cpp/src/io/utilities/block_utils.cuh b/cpp/src/io/utilities/block_utils.cuh index 9046eebcb02..4c03f9a9ca0 100644 --- a/cpp/src/io/utilities/block_utils.cuh +++ b/cpp/src/io/utilities/block_utils.cuh @@ -47,36 +47,6 @@ inline __device__ void nanosleep(T d) } // Warp reduction helpers -template -inline __device__ T WarpReduceSum2(T acc) -{ - return acc + shuffle_xor(acc, 1); -} -template -inline __device__ T WarpReduceSum4(T acc) -{ - acc = WarpReduceSum2(acc); - return acc + shuffle_xor(acc, 2); -} -template -inline __device__ T WarpReduceSum8(T acc) -{ - acc = WarpReduceSum4(acc); - return acc + shuffle_xor(acc, 4); -} -template -inline __device__ T WarpReduceSum16(T acc) -{ - acc = WarpReduceSum8(acc); - return acc + shuffle_xor(acc, 8); -} -template -inline __device__ T WarpReduceSum32(T acc) -{ - acc = WarpReduceSum16(acc); - return acc + shuffle_xor(acc, 16); -} - template inline __device__ T WarpReduceOr2(T acc) {