From 979365fe7912b5996f3143a9e6a46d85486f43f9 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 8 Apr 2022 14:38:00 -0400 Subject: [PATCH 01/14] Remove unused arguments --- cpp/src/io/parquet/chunk_dict.cu | 18 ++++++------------ cpp/src/io/parquet/parquet_gpu.hpp | 8 ++------ cpp/src/io/parquet/writer_impl.cu | 4 ++-- 3 files changed, 10 insertions(+), 20 deletions(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index f61cfa83579..6a30b627269 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -92,8 +92,7 @@ struct map_find_fn { template __global__ void __launch_bounds__(block_size, 1) - populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan chunks, - cudf::detail::device_2dspan frags) + populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan frags) { auto col_idx = blockIdx.y; auto block_x = blockIdx.x; @@ -215,8 +214,7 @@ __global__ void __launch_bounds__(block_size, 1) template __global__ void __launch_bounds__(block_size, 1) - get_dictionary_indices_kernel(cudf::detail::device_2dspan chunks, - cudf::detail::device_2dspan frags) + get_dictionary_indices_kernel(cudf::detail::device_2dspan frags) { auto col_idx = blockIdx.y; auto block_x = blockIdx.x; @@ -276,15 +274,13 @@ void initialize_chunk_hash_maps(device_span chunks, rmm::cuda_st <<>>(chunks); } -void populate_chunk_hash_maps(cudf::detail::device_2dspan chunks, - cudf::detail::device_2dspan frags, +void populate_chunk_hash_maps(cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream) { constexpr int block_size = 256; dim3 const dim_grid(frags.size().second, frags.size().first); - populate_chunk_hash_maps_kernel - <<>>(chunks, frags); + populate_chunk_hash_maps_kernel<<>>(frags); } void collect_map_entries(device_span chunks, rmm::cuda_stream_view stream) @@ -293,15 +289,13 @@ void collect_map_entries(device_span chunks, rmm::cuda_stream_vi collect_map_entries_kernel<<>>(chunks); } -void get_dictionary_indices(cudf::detail::device_2dspan chunks, - cudf::detail::device_2dspan frags, +void get_dictionary_indices(cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream) { constexpr int block_size = 256; dim3 const dim_grid(frags.size().second, frags.size().first); - get_dictionary_indices_kernel - <<>>(chunks, frags); + get_dictionary_indices_kernel<<>>(frags); } } // namespace gpu } // namespace parquet diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 8d0aa8881c3..53b82c73a35 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -529,12 +529,10 @@ void initialize_chunk_hash_maps(device_span chunks, rmm::cuda_st /** * @brief Insert chunk values into their respective hash maps * - * @param chunks Column chunks [rowgroup][column] * @param frags Column fragments * @param stream CUDA stream to use */ -void populate_chunk_hash_maps(cudf::detail::device_2dspan chunks, - cudf::detail::device_2dspan frags, +void populate_chunk_hash_maps(cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream); /** @@ -554,12 +552,10 @@ void collect_map_entries(device_span chunks, rmm::cuda_stream_vi * Since dict_data itself contains indices into the original cudf column, this means that * col[row] == col[dict_data[dict_index[row - chunk.start_row]]] * - * @param chunks Column chunks [rowgroup][column] * @param frags Column fragments * @param stream CUDA stream to use */ -void get_dictionary_indices(cudf::detail::device_2dspan chunks, - cudf::detail::device_2dspan frags, +void get_dictionary_indices(cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 70a594423c9..1c7eeddb58d 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -929,7 +929,7 @@ auto build_chunk_dictionaries(hostdevice_2dvector& chunks, chunks.host_to_device(stream); gpu::initialize_chunk_hash_maps(chunks.device_view().flat_view(), stream); - gpu::populate_chunk_hash_maps(chunks, frags, stream); + gpu::populate_chunk_hash_maps(frags, stream); chunks.device_to_host(stream, true); @@ -978,7 +978,7 @@ auto build_chunk_dictionaries(hostdevice_2dvector& chunks, } chunks.host_to_device(stream); gpu::collect_map_entries(chunks.device_view().flat_view(), stream); - gpu::get_dictionary_indices(chunks.device_view(), frags, stream); + gpu::get_dictionary_indices(frags, stream); return std::make_pair(std::move(dict_data), std::move(dict_index)); } From 53ee30754733bc134d9d199ebe2d5aacc4e95243 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 8 Apr 2022 14:48:41 -0400 Subject: [PATCH 02/14] Remove redundant find --- cpp/src/io/parquet/chunk_dict.cu | 45 ++++++++++++++------------------ 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index 6a30b627269..d26182faf43 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -125,8 +125,6 @@ __global__ void __launch_bounds__(block_size, 1) // Make a view of the hash map auto hash_map_mutable = map_type::device_mutable_view( chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); - auto hash_map = map_type::device_view( - chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); __shared__ int total_num_dict_entries; for (size_type i = 0; i < s_num_values; i += block_size) { @@ -139,29 +137,26 @@ __global__ void __launch_bounds__(block_size, 1) size_type is_unique = 0; size_type uniq_elem_size = 0; if (is_valid) { - auto found_slot = type_dispatcher(data_col.type(), map_find_fn{hash_map}, data_col, val_idx); - if (found_slot == hash_map.end()) { - is_unique = - type_dispatcher(data_col.type(), map_insert_fn{hash_map_mutable}, data_col, val_idx); - uniq_elem_size = [&]() -> size_type { - if (not is_unique) { return 0; } - switch (col->physical_type) { - case Type::INT32: return 4; - case Type::INT64: return 8; - case Type::INT96: return 12; - case Type::FLOAT: return 4; - case Type::DOUBLE: return 8; - case Type::BYTE_ARRAY: - if (data_col.type().id() == type_id::STRING) { - // Strings are stored as 4 byte length + string bytes - return 4 + data_col.element(val_idx).size_bytes(); - } - case Type::FIXED_LEN_BYTE_ARRAY: - if (data_col.type().id() == type_id::DECIMAL128) { return sizeof(__int128_t); } - default: CUDF_UNREACHABLE("Unsupported type for dictionary encoding"); - } - }(); - } + is_unique = + type_dispatcher(data_col.type(), map_insert_fn{hash_map_mutable}, data_col, val_idx); + uniq_elem_size = [&]() -> size_type { + if (not is_unique) { return 0; } + switch (col->physical_type) { + case Type::INT32: return 4; + case Type::INT64: return 8; + case Type::INT96: return 12; + case Type::FLOAT: return 4; + case Type::DOUBLE: return 8; + case Type::BYTE_ARRAY: + if (data_col.type().id() == type_id::STRING) { + // Strings are stored as 4 byte length + string bytes + return 4 + data_col.element(val_idx).size_bytes(); + } + case Type::FIXED_LEN_BYTE_ARRAY: + if (data_col.type().id() == type_id::DECIMAL128) { return sizeof(__int128_t); } + default: CUDF_UNREACHABLE("Unsupported type for dictionary encoding"); + } + }(); } __syncthreads(); From e45bd6b98c989b2741a0228e7342e92a823c570e Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Mon, 11 Apr 2022 11:00:07 -0400 Subject: [PATCH 03/14] size_type instead of size_t --- cpp/src/io/parquet/chunk_dict.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index d26182faf43..d98ec1aa619 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -34,7 +34,7 @@ __global__ void __launch_bounds__(block_size, 1) auto chunk = chunks[blockIdx.x]; auto t = threadIdx.x; // fut: Now that per-chunk dict is same size as ck.num_values, try to not use one block per chunk - for (size_t i = 0; i < chunk.dict_map_size; i += block_size) { + for (size_type i = 0; i < chunk.dict_map_size; i += block_size) { if (t + i < chunk.dict_map_size) { new (&chunk.dict_map_slots[t + i].first) map_type::atomic_key_type{KEY_SENTINEL}; new (&chunk.dict_map_slots[t + i].second) map_type::atomic_mapped_type{VALUE_SENTINEL}; From c824f4b2b8d5e4cb583a3d735022ad2e0ce2aae8 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Mon, 11 Apr 2022 12:02:45 -0400 Subject: [PATCH 04/14] Use relaxed memory order --- cpp/src/io/parquet/chunk_dict.cu | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index d98ec1aa619..518ff6d53f9 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -22,6 +22,8 @@ #include +#include + namespace cudf { namespace io { namespace parquet { @@ -159,7 +161,6 @@ __global__ void __launch_bounds__(block_size, 1) }(); } - __syncthreads(); auto num_unique = block_reduce(reduce_storage).Sum(is_unique); __syncthreads(); auto uniq_data_size = block_reduce(reduce_storage).Sum(uniq_elem_size); @@ -186,22 +187,22 @@ __global__ void __launch_bounds__(block_size, 1) auto map = map_type::device_view(chunk.dict_map_slots, chunk.dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); - __shared__ size_type counter; - if (t == 0) counter = 0; + __shared__ cuda::atomic counter; + using cuda::std::memory_order_relaxed; + if (t == 0) { counter.store(0, memory_order_relaxed); } __syncthreads(); - for (size_t i = 0; i < chunk.dict_map_size; i += block_size) { + for (size_type i = 0; i < chunk.dict_map_size; i += block_size) { if (t + i < chunk.dict_map_size) { - auto slot = map.begin_slot() + t + i; - auto key = static_cast(slot->first); + auto* slot = reinterpret_cast(map.begin_slot() + t + i); + auto key = slot->first; if (key != KEY_SENTINEL) { - auto loc = atomicAdd(&counter, 1); + auto loc = counter.fetch_add(1, memory_order_relaxed); cudf_assert(loc < MAX_DICT_SIZE && "Number of filled slots exceeds max dict size"); chunk.dict_data[loc] = key; // If sorting dict page ever becomes a hard requirement, enable the following statement and // add a dict sorting step before storing into the slot's second field. // chunk.dict_data_idx[loc] = t + i; - slot->second.store(loc); - // TODO: ^ This doesn't need to be atomic. Try casting to value_type ptr and just writing. + slot->second = loc; } } } From da2718d375c59560508bda46c71fb952e46d20ec Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Mon, 11 Apr 2022 17:03:10 -0400 Subject: [PATCH 05/14] Minor cleanups --- cpp/src/io/parquet/chunk_dict.cu | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index 518ff6d53f9..2f219c674ed 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -103,14 +103,14 @@ __global__ void __launch_bounds__(block_size, 1) auto chunk = frag.chunk; auto col = chunk->col_desc; + if (not chunk->use_dictionary) { return; } + size_type start_row = frag.start_row; size_type end_row = frag.start_row + frag.num_rows; __shared__ size_type s_start_value_idx; __shared__ size_type s_num_values; - if (not chunk->use_dictionary) { return; } - if (t == 0) { // Find the bounds of values in leaf column to be inserted into the map for current chunk auto cudf_col = *(col->parent_column); @@ -219,6 +219,8 @@ __global__ void __launch_bounds__(block_size, 1) auto chunk = frag.chunk; auto col = chunk->col_desc; + if (not chunk->use_dictionary) { return; } + size_type start_row = frag.start_row; size_type end_row = frag.start_row + frag.num_rows; @@ -236,14 +238,12 @@ __global__ void __launch_bounds__(block_size, 1) } __syncthreads(); - if (not chunk->use_dictionary) { return; } - column_device_view const& data_col = *col->leaf_column; auto map = map_type::device_view( chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); - for (size_t i = 0; i < s_num_values; i += block_size) { + for (size_type i = 0; i < s_num_values; i += block_size) { if (t + i < s_num_values) { auto val_idx = s_start_value_idx + t + i; bool is_valid = From e43eccc546dd74c66e5ebeec8b3c826bd85013e7 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Tue, 12 Apr 2022 10:58:13 -0400 Subject: [PATCH 06/14] Use CG logical to get rid of block-wide sync --- cpp/src/io/parquet/chunk_dict.cu | 68 +++++++++++++++----------------- 1 file changed, 32 insertions(+), 36 deletions(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index 2f219c674ed..9e203530c00 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -14,6 +14,8 @@ * limitations under the License. */ +#define _CG_ABI_EXPERIMENTAL // enable experimental API + #include #include @@ -24,10 +26,18 @@ #include +#include +#include + namespace cudf { namespace io { namespace parquet { namespace gpu { +namespace { +constexpr int DEFAULT_BLOCK_SIZE = 256; +} + +namespace cg = cooperative_groups; template __global__ void __launch_bounds__(block_size, 1) @@ -108,27 +118,23 @@ __global__ void __launch_bounds__(block_size, 1) size_type start_row = frag.start_row; size_type end_row = frag.start_row + frag.num_rows; - __shared__ size_type s_start_value_idx; - __shared__ size_type s_num_values; + __shared__ cg::experimental::block_tile_memory<4, block_size> shared; + auto thb = cg::experimental::this_thread_block(shared); + auto block = cg::experimental::tiled_partition(thb); - if (t == 0) { - // Find the bounds of values in leaf column to be inserted into the map for current chunk - auto cudf_col = *(col->parent_column); - s_start_value_idx = row_to_value_idx(start_row, cudf_col); - auto end_value_idx = row_to_value_idx(end_row, cudf_col); - s_num_values = end_value_idx - s_start_value_idx; - } - __syncthreads(); + // Find the bounds of values in leaf column to be inserted into the map for current chunk + auto cudf_col = *(col->parent_column); + auto s_start_value_idx = row_to_value_idx(start_row, cudf_col); + auto end_value_idx = row_to_value_idx(end_row, cudf_col); + auto s_num_values = end_value_idx - s_start_value_idx; column_device_view const& data_col = *col->leaf_column; - using block_reduce = cub::BlockReduce; - __shared__ typename block_reduce::TempStorage reduce_storage; // Make a view of the hash map auto hash_map_mutable = map_type::device_mutable_view( chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); - __shared__ int total_num_dict_entries; + int total_num_dict_entries; for (size_type i = 0; i < s_num_values; i += block_size) { // add the value to hash map size_type val_idx = i + t + s_start_value_idx; @@ -161,15 +167,14 @@ __global__ void __launch_bounds__(block_size, 1) }(); } - auto num_unique = block_reduce(reduce_storage).Sum(is_unique); - __syncthreads(); - auto uniq_data_size = block_reduce(reduce_storage).Sum(uniq_elem_size); + auto num_unique = cg::reduce(block, is_unique, cg::plus()); + auto uniq_data_size = cg::reduce(block, uniq_elem_size, cg::plus()); if (t == 0) { total_num_dict_entries = atomicAdd(&chunk->num_dict_entries, num_unique); total_num_dict_entries += num_unique; atomicAdd(&chunk->uniq_data_size, uniq_data_size); } - __syncthreads(); + total_num_dict_entries = block.shfl(total_num_dict_entries, 0); // Check if the num unique values in chunk has already exceeded max dict size and early exit if (total_num_dict_entries > MAX_DICT_SIZE) { return; } @@ -224,19 +229,12 @@ __global__ void __launch_bounds__(block_size, 1) size_type start_row = frag.start_row; size_type end_row = frag.start_row + frag.num_rows; - __shared__ size_type s_start_value_idx; - __shared__ size_type s_ck_start_val_idx; - __shared__ size_type s_num_values; - - if (t == 0) { - // Find the bounds of values in leaf column to be searched in the map for current chunk - auto cudf_col = *(col->parent_column); - s_start_value_idx = row_to_value_idx(start_row, cudf_col); - s_ck_start_val_idx = row_to_value_idx(chunk->start_row, cudf_col); - auto end_value_idx = row_to_value_idx(end_row, cudf_col); - s_num_values = end_value_idx - s_start_value_idx; - } - __syncthreads(); + // Find the bounds of values in leaf column to be searched in the map for current chunk + auto cudf_col = *(col->parent_column); + auto s_start_value_idx = row_to_value_idx(start_row, cudf_col); + auto s_ck_start_val_idx = row_to_value_idx(chunk->start_row, cudf_col); + auto end_value_idx = row_to_value_idx(end_row, cudf_col); + auto s_num_values = end_value_idx - s_start_value_idx; column_device_view const& data_col = *col->leaf_column; @@ -273,10 +271,9 @@ void initialize_chunk_hash_maps(device_span chunks, rmm::cuda_st void populate_chunk_hash_maps(cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream) { - constexpr int block_size = 256; dim3 const dim_grid(frags.size().second, frags.size().first); - - populate_chunk_hash_maps_kernel<<>>(frags); + populate_chunk_hash_maps_kernel + <<>>(frags); } void collect_map_entries(device_span chunks, rmm::cuda_stream_view stream) @@ -288,10 +285,9 @@ void collect_map_entries(device_span chunks, rmm::cuda_stream_vi void get_dictionary_indices(cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream) { - constexpr int block_size = 256; dim3 const dim_grid(frags.size().second, frags.size().first); - - get_dictionary_indices_kernel<<>>(frags); + get_dictionary_indices_kernel + <<>>(frags); } } // namespace gpu } // namespace parquet From 68c45206531e55904789cdedc33eddf9441a05b8 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Tue, 12 Apr 2022 11:19:58 -0400 Subject: [PATCH 07/14] Use while logical --- cpp/src/io/parquet/chunk_dict.cu | 47 +++++++++++++++----------------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index 9e203530c00..d4b67a573ff 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -115,18 +115,17 @@ __global__ void __launch_bounds__(block_size, 1) if (not chunk->use_dictionary) { return; } - size_type start_row = frag.start_row; - size_type end_row = frag.start_row + frag.num_rows; - __shared__ cg::experimental::block_tile_memory<4, block_size> shared; auto thb = cg::experimental::this_thread_block(shared); auto block = cg::experimental::tiled_partition(thb); + size_type start_row = frag.start_row; + size_type end_row = frag.start_row + frag.num_rows; + // Find the bounds of values in leaf column to be inserted into the map for current chunk auto cudf_col = *(col->parent_column); auto s_start_value_idx = row_to_value_idx(start_row, cudf_col); auto end_value_idx = row_to_value_idx(end_row, cudf_col); - auto s_num_values = end_value_idx - s_start_value_idx; column_device_view const& data_col = *col->leaf_column; @@ -135,11 +134,9 @@ __global__ void __launch_bounds__(block_size, 1) chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); int total_num_dict_entries; - for (size_type i = 0; i < s_num_values; i += block_size) { - // add the value to hash map - size_type val_idx = i + t + s_start_value_idx; - bool is_valid = - (i + t < s_num_values && val_idx < data_col.size()) and data_col.is_valid(val_idx); + auto val_idx = s_start_value_idx + t; + while (block.any(val_idx < end_value_idx)) { + auto const is_valid = val_idx < data_col.size() and data_col.is_valid(val_idx); // insert element at val_idx to hash map and count successful insertions size_type is_unique = 0; @@ -178,6 +175,8 @@ __global__ void __launch_bounds__(block_size, 1) // Check if the num unique values in chunk has already exceeded max dict size and early exit if (total_num_dict_entries > MAX_DICT_SIZE) { return; } + + val_idx += block_size; } } @@ -234,30 +233,28 @@ __global__ void __launch_bounds__(block_size, 1) auto s_start_value_idx = row_to_value_idx(start_row, cudf_col); auto s_ck_start_val_idx = row_to_value_idx(chunk->start_row, cudf_col); auto end_value_idx = row_to_value_idx(end_row, cudf_col); - auto s_num_values = end_value_idx - s_start_value_idx; column_device_view const& data_col = *col->leaf_column; auto map = map_type::device_view( chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); - for (size_type i = 0; i < s_num_values; i += block_size) { - if (t + i < s_num_values) { - auto val_idx = s_start_value_idx + t + i; - bool is_valid = - (i + t < s_num_values && val_idx < data_col.size()) ? data_col.is_valid(val_idx) : false; - - if (is_valid) { - auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx); - cudf_assert(found_slot != map.end() && - "Unable to find value in map in dictionary index construction"); - if (found_slot != map.end()) { - // No need for atomic as this is not going to be modified by any other thread - auto* val_ptr = reinterpret_cast(&found_slot->second); - chunk->dict_index[val_idx - s_ck_start_val_idx] = *val_ptr; - } + auto val_idx = s_start_value_idx + t; + while (val_idx < end_value_idx) { + auto const is_valid = val_idx < data_col.size() and data_col.is_valid(val_idx); + + if (is_valid) { + auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx); + cudf_assert(found_slot != map.end() && + "Unable to find value in map in dictionary index construction"); + if (found_slot != map.end()) { + // No need for atomic as this is not going to be modified by any other thread + auto* val_ptr = reinterpret_cast(&found_slot->second); + chunk->dict_index[val_idx - s_ck_start_val_idx] = *val_ptr; } } + + val_idx += block_size; } } From def1c5182f4d225173ac47a61ca27140966aea23 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Tue, 12 Apr 2022 13:36:22 -0400 Subject: [PATCH 08/14] Revert back to cub reduce --- cpp/src/io/parquet/chunk_dict.cu | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index d4b67a573ff..8308e027324 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -27,7 +27,6 @@ #include #include -#include namespace cudf { namespace io { @@ -118,14 +117,17 @@ __global__ void __launch_bounds__(block_size, 1) __shared__ cg::experimental::block_tile_memory<4, block_size> shared; auto thb = cg::experimental::this_thread_block(shared); auto block = cg::experimental::tiled_partition(thb); + // cub::BlockReduce instead of cg::reduce for performance purpose + using block_reduce = cub::BlockReduce; + __shared__ typename block_reduce::TempStorage reduce_storage; size_type start_row = frag.start_row; size_type end_row = frag.start_row + frag.num_rows; // Find the bounds of values in leaf column to be inserted into the map for current chunk - auto cudf_col = *(col->parent_column); - auto s_start_value_idx = row_to_value_idx(start_row, cudf_col); - auto end_value_idx = row_to_value_idx(end_row, cudf_col); + auto const cudf_col = *(col->parent_column); + auto const s_start_value_idx = row_to_value_idx(start_row, cudf_col); + auto const end_value_idx = row_to_value_idx(end_row, cudf_col); column_device_view const& data_col = *col->leaf_column; @@ -133,7 +135,7 @@ __global__ void __launch_bounds__(block_size, 1) auto hash_map_mutable = map_type::device_mutable_view( chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); - int total_num_dict_entries; + __shared__ int total_num_dict_entries; auto val_idx = s_start_value_idx + t; while (block.any(val_idx < end_value_idx)) { auto const is_valid = val_idx < data_col.size() and data_col.is_valid(val_idx); @@ -164,14 +166,15 @@ __global__ void __launch_bounds__(block_size, 1) }(); } - auto num_unique = cg::reduce(block, is_unique, cg::plus()); - auto uniq_data_size = cg::reduce(block, uniq_elem_size, cg::plus()); + auto num_unique = block_reduce(reduce_storage).Sum(is_unique); + block.sync(); + auto uniq_data_size = block_reduce(reduce_storage).Sum(uniq_elem_size); if (t == 0) { total_num_dict_entries = atomicAdd(&chunk->num_dict_entries, num_unique); total_num_dict_entries += num_unique; atomicAdd(&chunk->uniq_data_size, uniq_data_size); } - total_num_dict_entries = block.shfl(total_num_dict_entries, 0); + block.sync(); // Check if the num unique values in chunk has already exceeded max dict size and early exit if (total_num_dict_entries > MAX_DICT_SIZE) { return; } @@ -229,10 +232,10 @@ __global__ void __launch_bounds__(block_size, 1) size_type end_row = frag.start_row + frag.num_rows; // Find the bounds of values in leaf column to be searched in the map for current chunk - auto cudf_col = *(col->parent_column); - auto s_start_value_idx = row_to_value_idx(start_row, cudf_col); - auto s_ck_start_val_idx = row_to_value_idx(chunk->start_row, cudf_col); - auto end_value_idx = row_to_value_idx(end_row, cudf_col); + auto const cudf_col = *(col->parent_column); + auto const s_start_value_idx = row_to_value_idx(start_row, cudf_col); + auto const s_ck_start_val_idx = row_to_value_idx(chunk->start_row, cudf_col); + auto const end_value_idx = row_to_value_idx(end_row, cudf_col); column_device_view const& data_col = *col->leaf_column; From 19a23283a173fdae7b72814ff4162586f9b477cb Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Wed, 13 Apr 2022 10:25:43 -0400 Subject: [PATCH 09/14] Clean up atomic counter --- cpp/src/io/parquet/chunk_dict.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index 8308e027324..6ae7a8c8089 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -194,9 +194,9 @@ __global__ void __launch_bounds__(block_size, 1) auto map = map_type::device_view(chunk.dict_map_slots, chunk.dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); - __shared__ cuda::atomic counter; + __shared__ cuda::atomic counter; using cuda::std::memory_order_relaxed; - if (t == 0) { counter.store(0, memory_order_relaxed); } + if (t == 0) { new (&counter) cuda::atomic{0}; } __syncthreads(); for (size_type i = 0; i < chunk.dict_map_size; i += block_size) { if (t + i < chunk.dict_map_size) { From 592821ceb101ad3f1c4ccbf28da0db0c1b0c8872 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Wed, 13 Apr 2022 10:33:28 -0400 Subject: [PATCH 10/14] Get rid of CG-based loop --- cpp/src/io/parquet/chunk_dict.cu | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index 6ae7a8c8089..b67c1f0d55a 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -14,8 +14,6 @@ * limitations under the License. */ -#define _CG_ABI_EXPERIMENTAL // enable experimental API - #include #include @@ -26,8 +24,6 @@ #include -#include - namespace cudf { namespace io { namespace parquet { @@ -36,8 +32,6 @@ namespace { constexpr int DEFAULT_BLOCK_SIZE = 256; } -namespace cg = cooperative_groups; - template __global__ void __launch_bounds__(block_size, 1) initialize_chunk_hash_maps_kernel(device_span chunks) @@ -114,10 +108,6 @@ __global__ void __launch_bounds__(block_size, 1) if (not chunk->use_dictionary) { return; } - __shared__ cg::experimental::block_tile_memory<4, block_size> shared; - auto thb = cg::experimental::this_thread_block(shared); - auto block = cg::experimental::tiled_partition(thb); - // cub::BlockReduce instead of cg::reduce for performance purpose using block_reduce = cub::BlockReduce; __shared__ typename block_reduce::TempStorage reduce_storage; @@ -137,7 +127,7 @@ __global__ void __launch_bounds__(block_size, 1) __shared__ int total_num_dict_entries; auto val_idx = s_start_value_idx + t; - while (block.any(val_idx < end_value_idx)) { + while (val_idx - block_size < end_value_idx) { auto const is_valid = val_idx < data_col.size() and data_col.is_valid(val_idx); // insert element at val_idx to hash map and count successful insertions @@ -167,14 +157,14 @@ __global__ void __launch_bounds__(block_size, 1) } auto num_unique = block_reduce(reduce_storage).Sum(is_unique); - block.sync(); + __syncthreads(); auto uniq_data_size = block_reduce(reduce_storage).Sum(uniq_elem_size); if (t == 0) { total_num_dict_entries = atomicAdd(&chunk->num_dict_entries, num_unique); total_num_dict_entries += num_unique; atomicAdd(&chunk->uniq_data_size, uniq_data_size); } - block.sync(); + __syncthreads(); // Check if the num unique values in chunk has already exceeded max dict size and early exit if (total_num_dict_entries > MAX_DICT_SIZE) { return; } From 8d51f9514049b14c9a459d466593766748a2895c Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Wed, 13 Apr 2022 10:38:48 -0400 Subject: [PATCH 11/14] Remove min number of CTAs in __launch_bounds__ --- cpp/src/io/parquet/chunk_dict.cu | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index b67c1f0d55a..dfd9b56a005 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -33,7 +33,7 @@ constexpr int DEFAULT_BLOCK_SIZE = 256; } template -__global__ void __launch_bounds__(block_size, 1) +__global__ void __launch_bounds__(block_size) initialize_chunk_hash_maps_kernel(device_span chunks) { auto chunk = chunks[blockIdx.x]; @@ -96,7 +96,7 @@ struct map_find_fn { }; template -__global__ void __launch_bounds__(block_size, 1) +__global__ void __launch_bounds__(block_size) populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan frags) { auto col_idx = blockIdx.y; @@ -174,7 +174,7 @@ __global__ void __launch_bounds__(block_size, 1) } template -__global__ void __launch_bounds__(block_size, 1) +__global__ void __launch_bounds__(block_size) collect_map_entries_kernel(device_span chunks) { auto& chunk = chunks[blockIdx.x]; @@ -206,7 +206,7 @@ __global__ void __launch_bounds__(block_size, 1) } template -__global__ void __launch_bounds__(block_size, 1) +__global__ void __launch_bounds__(block_size) get_dictionary_indices_kernel(cudf::detail::device_2dspan frags) { auto col_idx = blockIdx.y; From d89c37c89db7da919c0a9b7f0496db7f322ed2ed Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Wed, 13 Apr 2022 14:26:33 -0400 Subject: [PATCH 12/14] Revert back to CG-based loop + fix a minor bug --- cpp/src/io/parquet/chunk_dict.cu | 40 ++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index dfd9b56a005..3a1519010f9 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -14,6 +14,8 @@ * limitations under the License. */ +#define _CG_ABI_EXPERIMENTAL // enable experimental API + #include #include @@ -24,6 +26,8 @@ #include +#include + namespace cudf { namespace io { namespace parquet { @@ -32,6 +36,8 @@ namespace { constexpr int DEFAULT_BLOCK_SIZE = 256; } +namespace cg = cooperative_groups; + template __global__ void __launch_bounds__(block_size) initialize_chunk_hash_maps_kernel(device_span chunks) @@ -101,13 +107,16 @@ __global__ void __launch_bounds__(block_size) { auto col_idx = blockIdx.y; auto block_x = blockIdx.x; - auto t = threadIdx.x; auto frag = frags[col_idx][block_x]; auto chunk = frag.chunk; auto col = chunk->col_desc; if (not chunk->use_dictionary) { return; } + __shared__ cg::experimental::block_tile_memory<4, block_size> shared; + auto thb = cg::experimental::this_thread_block(shared); + auto block = cg::experimental::tiled_partition(thb); + // cub::BlockReduce instead of cg::reduce for performance purpose using block_reduce = cub::BlockReduce; __shared__ typename block_reduce::TempStorage reduce_storage; @@ -115,9 +124,9 @@ __global__ void __launch_bounds__(block_size) size_type end_row = frag.start_row + frag.num_rows; // Find the bounds of values in leaf column to be inserted into the map for current chunk - auto const cudf_col = *(col->parent_column); - auto const s_start_value_idx = row_to_value_idx(start_row, cudf_col); - auto const end_value_idx = row_to_value_idx(end_row, cudf_col); + auto const cudf_col = *(col->parent_column); + size_type const s_start_value_idx = row_to_value_idx(start_row, cudf_col); + size_type const end_value_idx = row_to_value_idx(end_row, cudf_col); column_device_view const& data_col = *col->leaf_column; @@ -125,10 +134,15 @@ __global__ void __launch_bounds__(block_size) auto hash_map_mutable = map_type::device_mutable_view( chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); - __shared__ int total_num_dict_entries; - auto val_idx = s_start_value_idx + t; - while (val_idx - block_size < end_value_idx) { - auto const is_valid = val_idx < data_col.size() and data_col.is_valid(val_idx); + __shared__ size_type total_num_dict_entries; + if (block.thread_rank() == 0) { total_num_dict_entries = chunk->num_dict_entries; } + size_type val_idx = s_start_value_idx + t; + while (block.any(val_idx < end_value_idx)) { + // Check if the num unique values in chunk has already exceeded max dict size and early exit + if (total_num_dict_entries > MAX_DICT_SIZE) { return; } + + auto const is_valid = + val_idx < end_value_idx and val_idx < data_col.size() and data_col.is_valid(val_idx); // insert element at val_idx to hash map and count successful insertions size_type is_unique = 0; @@ -157,20 +171,16 @@ __global__ void __launch_bounds__(block_size) } auto num_unique = block_reduce(reduce_storage).Sum(is_unique); - __syncthreads(); + block.sync(); auto uniq_data_size = block_reduce(reduce_storage).Sum(uniq_elem_size); - if (t == 0) { + if (block.thread_rank() == 0) { total_num_dict_entries = atomicAdd(&chunk->num_dict_entries, num_unique); total_num_dict_entries += num_unique; atomicAdd(&chunk->uniq_data_size, uniq_data_size); } - __syncthreads(); - - // Check if the num unique values in chunk has already exceeded max dict size and early exit - if (total_num_dict_entries > MAX_DICT_SIZE) { return; } val_idx += block_size; - } + } // while } template From b513f2d752ae09e6c706f5c5eda04b7b62c58a23 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Wed, 13 Apr 2022 15:40:00 -0400 Subject: [PATCH 13/14] Fix a bug --- cpp/src/io/parquet/chunk_dict.cu | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index 3a1519010f9..ebfd2c9b3bc 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -107,6 +107,7 @@ __global__ void __launch_bounds__(block_size) { auto col_idx = blockIdx.y; auto block_x = blockIdx.x; + auto t = threadIdx.x; auto frag = frags[col_idx][block_x]; auto chunk = frag.chunk; auto col = chunk->col_desc; @@ -135,7 +136,7 @@ __global__ void __launch_bounds__(block_size) chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); __shared__ size_type total_num_dict_entries; - if (block.thread_rank() == 0) { total_num_dict_entries = chunk->num_dict_entries; } + if (t == 0) { total_num_dict_entries = chunk->num_dict_entries; } size_type val_idx = s_start_value_idx + t; while (block.any(val_idx < end_value_idx)) { // Check if the num unique values in chunk has already exceeded max dict size and early exit From 7e5d0bb4de4d08e294ea80258337bc5d572d02bf Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Wed, 13 Apr 2022 15:53:26 -0400 Subject: [PATCH 14/14] Remove CG algos again for performance purpose --- cpp/src/io/parquet/chunk_dict.cu | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index ebfd2c9b3bc..d3ac491f416 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -14,8 +14,6 @@ * limitations under the License. */ -#define _CG_ABI_EXPERIMENTAL // enable experimental API - #include #include @@ -26,8 +24,6 @@ #include -#include - namespace cudf { namespace io { namespace parquet { @@ -36,8 +32,6 @@ namespace { constexpr int DEFAULT_BLOCK_SIZE = 256; } -namespace cg = cooperative_groups; - template __global__ void __launch_bounds__(block_size) initialize_chunk_hash_maps_kernel(device_span chunks) @@ -114,10 +108,6 @@ __global__ void __launch_bounds__(block_size) if (not chunk->use_dictionary) { return; } - __shared__ cg::experimental::block_tile_memory<4, block_size> shared; - auto thb = cg::experimental::this_thread_block(shared); - auto block = cg::experimental::tiled_partition(thb); - // cub::BlockReduce instead of cg::reduce for performance purpose using block_reduce = cub::BlockReduce; __shared__ typename block_reduce::TempStorage reduce_storage; @@ -136,12 +126,8 @@ __global__ void __launch_bounds__(block_size) chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); __shared__ size_type total_num_dict_entries; - if (t == 0) { total_num_dict_entries = chunk->num_dict_entries; } size_type val_idx = s_start_value_idx + t; - while (block.any(val_idx < end_value_idx)) { - // Check if the num unique values in chunk has already exceeded max dict size and early exit - if (total_num_dict_entries > MAX_DICT_SIZE) { return; } - + while (val_idx - block_size < end_value_idx) { auto const is_valid = val_idx < end_value_idx and val_idx < data_col.size() and data_col.is_valid(val_idx); @@ -172,13 +158,17 @@ __global__ void __launch_bounds__(block_size) } auto num_unique = block_reduce(reduce_storage).Sum(is_unique); - block.sync(); + __syncthreads(); auto uniq_data_size = block_reduce(reduce_storage).Sum(uniq_elem_size); - if (block.thread_rank() == 0) { + if (t == 0) { total_num_dict_entries = atomicAdd(&chunk->num_dict_entries, num_unique); total_num_dict_entries += num_unique; atomicAdd(&chunk->uniq_data_size, uniq_data_size); } + __syncthreads(); + + // Check if the num unique values in chunk has already exceeded max dict size and early exit + if (total_num_dict_entries > MAX_DICT_SIZE) { return; } val_idx += block_size; } // while