From 017d52a3c7aae758dbaca5495997db8482933b46 Mon Sep 17 00:00:00 2001
From: Yunsong Wang
Date: Wed, 20 Apr 2022 11:07:29 -0400
Subject: [PATCH] Improve parquet dictionary encoding (#10635)

This PR includes several changes to improve parquet dictionary encoding:

- API cleanups: remove unused arguments
- Remove the minimum block limit in `__launch_bounds__`
- Simplify the grid-stride loop logic by using `while` (see the sketch below)
- Have all threads calculate the start/end value indices, instead of one thread
  doing the calculation and broadcasting the result (no more shared memory or
  block-wide sync)

Other ideas tested but not included in this PR due to zero or negative
performance impact:

- Tuning hash map occupancy
- `cg::shfl` instead of shared memory + sync
- CG-based `insert`/`find`
- Relaxed atomics for `num_dict_entries` and `uniq_data_size`
- `cg::reduce` instead of `cub::BlockReduce`
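The `while`-based loop and the per-thread bounds calculation boil down to the
pattern below. This is a minimal standalone sketch, not the actual cudf kernel:
`count_nonzero_kernel`, `data`, and `total` are illustrative names, and the real
kernel inserts into a per-chunk hash map rather than counting nonzeros. The
`val_idx - block_size < end` condition keeps every thread of the block in the
loop through the final partial iteration, which is required because
`cub::BlockReduce` is a block-wide collective:

```
#include <cub/cub.cuh>

// Sketch: every thread computes its own start index (no shared-memory
// broadcast from thread 0), then strides by the block size.
template <int block_size>
__global__ void count_nonzero_kernel(int const* data, int begin, int end, int* total)
{
  using block_reduce = cub::BlockReduce<int, block_size>;
  __shared__ typename block_reduce::TempStorage reduce_storage;

  int val_idx = begin + static_cast<int>(threadIdx.x);
  while (val_idx - block_size < end) {
    // Threads past the end still participate with a neutral value, so the
    // block-wide reduction below is safe on the last (partial) iteration.
    int const is_valid  = (val_idx < end && data[val_idx] != 0) ? 1 : 0;
    int const num_valid = block_reduce(reduce_storage).Sum(is_valid);
    if (threadIdx.x == 0) { atomicAdd(total, num_valid); }
    __syncthreads();  // reduce_storage is reused in the next iteration
    val_idx += block_size;
  }
}

// Example launch: count_nonzero_kernel<256><<<1, 256>>>(d_data, 0, n, d_total);
```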
Before:
```
-----------------------------------------------------------------------------------------------------------------------
Benchmark                                                         Time             CPU   Iterations UserCounters...
-----------------------------------------------------------------------------------------------------------------------
ParquetWrite/integral_void_output/29/0/1/1/2/manual_time        734 ms          734 ms            1 bytes_per_second=697.128M/s encoded_file_size=530.706M peak_memory_usage=1.7804G
ParquetWrite/integral_void_output/29/1000/1/1/2/manual_time     303 ms          303 ms            2 bytes_per_second=1.65131G/s encoded_file_size=397.998M peak_memory_usage=1.49675G
ParquetWrite/integral_void_output/29/0/32/1/2/manual_time       734 ms          734 ms            1 bytes_per_second=697.713M/s encoded_file_size=530.706M peak_memory_usage=1.7804G
ParquetWrite/integral_void_output/29/1000/32/1/2/manual_time   61.9 ms         61.9 ms           11 bytes_per_second=8.07721G/s encoded_file_size=159.574M peak_memory_usage=1.49675G
ParquetWrite/integral_void_output/29/0/1/0/2/manual_time        690 ms          690 ms            1 bytes_per_second=742.205M/s encoded_file_size=531.066M peak_memory_usage=1.3148G
ParquetWrite/integral_void_output/29/1000/1/0/2/manual_time     282 ms          282 ms            2 bytes_per_second=1.76991G/s encoded_file_size=398.712M peak_memory_usage=1.49675G
ParquetWrite/integral_void_output/29/0/32/0/2/manual_time       690 ms          690 ms            1 bytes_per_second=742.268M/s encoded_file_size=531.066M peak_memory_usage=1.3148G
ParquetWrite/integral_void_output/29/1000/32/0/2/manual_time   59.5 ms         59.5 ms           12 bytes_per_second=8.40878G/s encoded_file_size=199.926M peak_memory_usage=1.49675G
```

Now:
```
-----------------------------------------------------------------------------------------------------------------------
Benchmark                                                         Time             CPU   Iterations UserCounters...
-----------------------------------------------------------------------------------------------------------------------
ParquetWrite/integral_void_output/29/0/1/1/2/manual_time        733 ms          733 ms            1 bytes_per_second=698.24M/s encoded_file_size=530.706M peak_memory_usage=1.7804G
ParquetWrite/integral_void_output/29/1000/1/1/2/manual_time     302 ms          302 ms            2 bytes_per_second=1.65496G/s encoded_file_size=397.998M peak_memory_usage=1.49675G
ParquetWrite/integral_void_output/29/0/32/1/2/manual_time       733 ms          733 ms            1 bytes_per_second=698.701M/s encoded_file_size=530.706M peak_memory_usage=1.7804G
ParquetWrite/integral_void_output/29/1000/32/1/2/manual_time   61.3 ms         61.3 ms           11 bytes_per_second=8.1533G/s encoded_file_size=159.572M peak_memory_usage=1.49675G
ParquetWrite/integral_void_output/29/0/1/0/2/manual_time        688 ms          688 ms            1 bytes_per_second=743.71M/s encoded_file_size=531.066M peak_memory_usage=1.3148G
ParquetWrite/integral_void_output/29/1000/1/0/2/manual_time     282 ms          282 ms            2 bytes_per_second=1.7712G/s encoded_file_size=398.712M peak_memory_usage=1.49675G
ParquetWrite/integral_void_output/29/0/32/0/2/manual_time       688 ms          688 ms            1 bytes_per_second=743.658M/s encoded_file_size=531.066M peak_memory_usage=1.3148G
ParquetWrite/integral_void_output/29/1000/32/0/2/manual_time   58.9 ms         58.9 ms           12 bytes_per_second=8.49093G/s encoded_file_size=199.926M peak_memory_usage=1.49675G
```

Authors:
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - Jake Hemstad (https://github.com/jrhemstad)
  - Mike Wilson (https://github.com/hyperbolic2346)

URL: https://github.com/rapidsai/cudf/pull/10635
---
 cpp/src/io/parquet/chunk_dict.cu   | 191 +++++++++++++----------------
 cpp/src/io/parquet/parquet_gpu.hpp |   8 +-
 cpp/src/io/parquet/writer_impl.cu  |   4 +-
 3 files changed, 88 insertions(+), 115 deletions(-)

diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu
index f61cfa83579..d3ac491f416 100644
--- a/cpp/src/io/parquet/chunk_dict.cu
+++ b/cpp/src/io/parquet/chunk_dict.cu
@@ -22,19 +22,24 @@
 
 #include <rmm/exec_policy.hpp>
 
+#include <cuda/atomic>
+
 namespace cudf {
 namespace io {
 namespace parquet {
 namespace gpu {
+namespace {
+constexpr int DEFAULT_BLOCK_SIZE = 256;
+}
 
 template <int block_size>
-__global__ void __launch_bounds__(block_size, 1)
+__global__ void __launch_bounds__(block_size)
   initialize_chunk_hash_maps_kernel(device_span<EncColumnChunk> chunks)
 {
   auto chunk = chunks[blockIdx.x];
   auto t     = threadIdx.x;
   // fut: Now that per-chunk dict is same size as ck.num_values, try to not use one block per chunk
-  for (size_t i = 0; i < chunk.dict_map_size; i += block_size) {
+  for (size_type i = 0; i < chunk.dict_map_size; i += block_size) {
     if (t + i < chunk.dict_map_size) {
       new (&chunk.dict_map_slots[t + i].first) map_type::atomic_key_type{KEY_SENTINEL};
       new (&chunk.dict_map_slots[t + i].second) map_type::atomic_mapped_type{VALUE_SENTINEL};
@@ -91,9 +96,8 @@ struct map_find_fn {
 };
 
 template <int block_size>
-__global__ void __launch_bounds__(block_size, 1)
-  populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan<EncColumnChunk> chunks,
-                                  cudf::detail::device_2dspan<PageFragment const> frags)
+__global__ void __launch_bounds__(block_size)
+  populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan<PageFragment const> frags)
 {
   auto col_idx = blockIdx.y;
   auto block_x = blockIdx.x;
@@ -102,70 +106,57 @@ __global__ void __launch_bounds__(block_size, 1)
   auto chunk = frag.chunk;
   auto col   = chunk->col_desc;
 
-  size_type start_row = frag.start_row;
-  size_type end_row   = frag.start_row + frag.num_rows;
+  if (not chunk->use_dictionary) { return; }
 
-  __shared__ size_type s_start_value_idx;
-  __shared__ size_type s_num_values;
+  using block_reduce = cub::BlockReduce<size_type, block_size>;
+  __shared__ typename block_reduce::TempStorage reduce_storage;
 
-  if (not chunk->use_dictionary) { return; }
+  size_type start_row = frag.start_row;
+  size_type end_row   = frag.start_row + frag.num_rows;
 
-  if (t == 0) {
-    // Find the bounds of values in leaf column to be inserted into the map for current chunk
-    auto cudf_col      = *(col->parent_column);
-    s_start_value_idx  = row_to_value_idx(start_row, cudf_col);
-    auto end_value_idx = row_to_value_idx(end_row, cudf_col);
-    s_num_values       = end_value_idx - s_start_value_idx;
-  }
-  __syncthreads();
+  // Find the bounds of values in leaf column to be inserted into the map for current chunk
+  auto const cudf_col               = *(col->parent_column);
+  size_type const s_start_value_idx = row_to_value_idx(start_row, cudf_col);
+  size_type const end_value_idx     = row_to_value_idx(end_row, cudf_col);
 
   column_device_view const& data_col = *col->leaf_column;
-  using block_reduce = cub::BlockReduce<size_type, block_size>;
-  __shared__ typename block_reduce::TempStorage reduce_storage;
 
   // Make a view of the hash map
   auto hash_map_mutable = map_type::device_mutable_view(
     chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL);
-  auto hash_map = map_type::device_view(
-    chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL);
 
-  __shared__ int total_num_dict_entries;
+  __shared__ size_type total_num_dict_entries;
 
-  for (size_type i = 0; i < s_num_values; i += block_size) {
-    // add the value to hash map
-    size_type val_idx = i + t + s_start_value_idx;
-    bool is_valid =
-      (i + t < s_num_values && val_idx < data_col.size()) and data_col.is_valid(val_idx);
+  size_type val_idx = s_start_value_idx + t;
+  while (val_idx - block_size < end_value_idx) {
+    auto const is_valid =
+      val_idx < end_value_idx and val_idx < data_col.size() and data_col.is_valid(val_idx);
 
     // insert element at val_idx to hash map and count successful insertions
     size_type is_unique      = 0;
     size_type uniq_elem_size = 0;
     if (is_valid) {
-      auto found_slot = type_dispatcher(data_col.type(), map_find_fn{hash_map}, data_col, val_idx);
-      if (found_slot == hash_map.end()) {
-        is_unique =
-          type_dispatcher(data_col.type(), map_insert_fn{hash_map_mutable}, data_col, val_idx);
-        uniq_elem_size = [&]() -> size_type {
-          if (not is_unique) { return 0; }
-          switch (col->physical_type) {
-            case Type::INT32: return 4;
-            case Type::INT64: return 8;
-            case Type::INT96: return 12;
-            case Type::FLOAT: return 4;
-            case Type::DOUBLE: return 8;
-            case Type::BYTE_ARRAY:
-              if (data_col.type().id() == type_id::STRING) {
-                // Strings are stored as 4 byte length + string bytes
-                return 4 + data_col.element<string_view>(val_idx).size_bytes();
-              }
-            case Type::FIXED_LEN_BYTE_ARRAY:
-              if (data_col.type().id() == type_id::DECIMAL128) { return sizeof(__int128_t); }
-            default: CUDF_UNREACHABLE("Unsupported type for dictionary encoding");
-          }
-        }();
-      }
+      is_unique =
+        type_dispatcher(data_col.type(), map_insert_fn{hash_map_mutable}, data_col, val_idx);
+      uniq_elem_size = [&]() -> size_type {
+        if (not is_unique) { return 0; }
+        switch (col->physical_type) {
+          case Type::INT32: return 4;
+          case Type::INT64: return 8;
+          case Type::INT96: return 12;
+          case Type::FLOAT: return 4;
+          case Type::DOUBLE: return 8;
+          case Type::BYTE_ARRAY:
+            if (data_col.type().id() == type_id::STRING) {
+              // Strings are stored as 4 byte length + string bytes
+              return 4 + data_col.element<string_view>(val_idx).size_bytes();
+            }
+          case Type::FIXED_LEN_BYTE_ARRAY:
+            if (data_col.type().id() == type_id::DECIMAL128) { return sizeof(__int128_t); }
+          default: CUDF_UNREACHABLE("Unsupported type for dictionary encoding");
+        }
+      }();
     }
-    __syncthreads();
     auto num_unique = block_reduce(reduce_storage).Sum(is_unique);
     __syncthreads();
     auto uniq_data_size = block_reduce(reduce_storage).Sum(uniq_elem_size);
@@ -178,11 +169,13 @@ __global__ void __launch_bounds__(block_size, 1)
 
     // Check if the num unique values in chunk has already exceeded max dict size and early exit
     if (total_num_dict_entries > MAX_DICT_SIZE) { return; }
-  }
+
+    val_idx += block_size;
+  }  // while
 }
 
 template <int block_size>
-__global__ void __launch_bounds__(block_size, 1)
+__global__ void __launch_bounds__(block_size)
   collect_map_entries_kernel(device_span<EncColumnChunk> chunks)
 {
   auto& chunk = chunks[blockIdx.x];
@@ -192,31 +185,30 @@ __global__ void __launch_bounds__(block_size, 1)
   auto map = map_type::device_view(
     chunk.dict_map_slots, chunk.dict_map_size, KEY_SENTINEL, VALUE_SENTINEL);
 
-  __shared__ size_type counter;
-  if (t == 0) counter = 0;
+  __shared__ cuda::atomic<size_type, cuda::thread_scope_block> counter;
+  using cuda::std::memory_order_relaxed;
+  if (t == 0) { new (&counter) cuda::atomic<size_type, cuda::thread_scope_block>{0}; }
   __syncthreads();
-  for (size_t i = 0; i < chunk.dict_map_size; i += block_size) {
+  for (size_type i = 0; i < chunk.dict_map_size; i += block_size) {
     if (t + i < chunk.dict_map_size) {
-      auto slot = map.begin_slot() + t + i;
-      auto key  = static_cast<size_type>(slot->first);
+      auto* slot = reinterpret_cast<map_type::value_type*>(map.begin_slot() + t + i);
+      auto key   = slot->first;
       if (key != KEY_SENTINEL) {
-        auto loc = atomicAdd(&counter, 1);
+        auto loc = counter.fetch_add(1, memory_order_relaxed);
         cudf_assert(loc < MAX_DICT_SIZE && "Number of filled slots exceeds max dict size");
         chunk.dict_data[loc] = key;
         // If sorting dict page ever becomes a hard requirement, enable the following statement and
         // add a dict sorting step before storing into the slot's second field.
         // chunk.dict_data_idx[loc] = t + i;
-        slot->second.store(loc);
-        // TODO: ^ This doesn't need to be atomic. Try casting to value_type ptr and just writing.
+        slot->second = loc;
       }
     }
   }
 }
 
 template <int block_size>
-__global__ void __launch_bounds__(block_size, 1)
-  get_dictionary_indices_kernel(cudf::detail::device_2dspan<EncColumnChunk> chunks,
-                                cudf::detail::device_2dspan<PageFragment const> frags)
+__global__ void __launch_bounds__(block_size)
+  get_dictionary_indices_kernel(cudf::detail::device_2dspan<PageFragment const> frags)
 {
   auto col_idx = blockIdx.y;
   auto block_x = blockIdx.x;
@@ -225,47 +217,38 @@ __global__ void __launch_bounds__(block_size, 1)
   auto chunk = frag.chunk;
   auto col   = chunk->col_desc;
 
+  if (not chunk->use_dictionary) { return; }
+
   size_type start_row = frag.start_row;
   size_type end_row   = frag.start_row + frag.num_rows;
 
-  __shared__ size_type s_start_value_idx;
-  __shared__ size_type s_ck_start_val_idx;
-  __shared__ size_type s_num_values;
-
-  if (t == 0) {
-    // Find the bounds of values in leaf column to be searched in the map for current chunk
-    auto cudf_col      = *(col->parent_column);
-    s_start_value_idx  = row_to_value_idx(start_row, cudf_col);
-    s_ck_start_val_idx = row_to_value_idx(chunk->start_row, cudf_col);
-    auto end_value_idx = row_to_value_idx(end_row, cudf_col);
-    s_num_values       = end_value_idx - s_start_value_idx;
-  }
-  __syncthreads();
-
-  if (not chunk->use_dictionary) { return; }
+  // Find the bounds of values in leaf column to be searched in the map for current chunk
+  auto const cudf_col           = *(col->parent_column);
+  auto const s_start_value_idx  = row_to_value_idx(start_row, cudf_col);
+  auto const s_ck_start_val_idx = row_to_value_idx(chunk->start_row, cudf_col);
+  auto const end_value_idx      = row_to_value_idx(end_row, cudf_col);
 
   column_device_view const& data_col = *col->leaf_column;
 
   auto map = map_type::device_view(
     chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL);
 
-  for (size_t i = 0; i < s_num_values; i += block_size) {
-    if (t + i < s_num_values) {
-      auto val_idx = s_start_value_idx + t + i;
-      bool is_valid =
-        (i + t < s_num_values && val_idx < data_col.size()) ? data_col.is_valid(val_idx) : false;
-
-      if (is_valid) {
-        auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx);
-        cudf_assert(found_slot != map.end() &&
-                    "Unable to find value in map in dictionary index construction");
-        if (found_slot != map.end()) {
-          // No need for atomic as this is not going to be modified by any other thread
-          auto* val_ptr = reinterpret_cast<size_type*>(&found_slot->second);
-          chunk->dict_index[val_idx - s_ck_start_val_idx] = *val_ptr;
-        }
+  auto val_idx = s_start_value_idx + t;
+  while (val_idx < end_value_idx) {
+    auto const is_valid = val_idx < data_col.size() and data_col.is_valid(val_idx);
+
+    if (is_valid) {
+      auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx);
+      cudf_assert(found_slot != map.end() &&
+                  "Unable to find value in map in dictionary index construction");
+      if (found_slot != map.end()) {
+        // No need for atomic as this is not going to be modified by any other thread
+        auto* val_ptr = reinterpret_cast<size_type*>(&found_slot->second);
+        chunk->dict_index[val_idx - s_ck_start_val_idx] = *val_ptr;
       }
     }
+
+    val_idx += block_size;
   }
 }
 
@@ -276,15 +259,12 @@ void initialize_chunk_hash_maps(device_span<EncColumnChunk> chunks, rmm::cuda_st
     <<<chunks.size(), block_size, 0, stream.value()>>>(chunks);
 }
 
-void populate_chunk_hash_maps(cudf::detail::device_2dspan<EncColumnChunk> chunks,
-                              cudf::detail::device_2dspan<PageFragment const> frags,
+void populate_chunk_hash_maps(cudf::detail::device_2dspan<PageFragment const> frags,
                               rmm::cuda_stream_view stream)
 {
-  constexpr int block_size = 256;
   dim3 const dim_grid(frags.size().second, frags.size().first);
-
-  populate_chunk_hash_maps_kernel<block_size>
-    <<<dim_grid, block_size, 0, stream.value()>>>(chunks, frags);
+  populate_chunk_hash_maps_kernel<DEFAULT_BLOCK_SIZE>
+    <<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(frags);
 }
 
 void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_view stream)
@@ -293,15 +273,12 @@ void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_vi
   collect_map_entries_kernel<block_size><<<chunks.size(), block_size, 0, stream.value()>>>(chunks);
 }
 
-void get_dictionary_indices(cudf::detail::device_2dspan<EncColumnChunk> chunks,
-                            cudf::detail::device_2dspan<PageFragment const> frags,
+void get_dictionary_indices(cudf::detail::device_2dspan<PageFragment const> frags,
                             rmm::cuda_stream_view stream)
 {
-  constexpr int block_size = 256;
   dim3 const dim_grid(frags.size().second, frags.size().first);
-
-  get_dictionary_indices_kernel<block_size>
-    <<<dim_grid, block_size, 0, stream.value()>>>(chunks, frags);
+  get_dictionary_indices_kernel<DEFAULT_BLOCK_SIZE>
+    <<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(frags);
 }
 }  // namespace gpu
 }  // namespace parquet
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index 8d0aa8881c3..53b82c73a35 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -529,12 +529,10 @@ void initialize_chunk_hash_maps(device_span<EncColumnChunk> chunks, rmm::cuda_st
 /**
  * @brief Insert chunk values into their respective hash maps
  *
- * @param chunks Column chunks [rowgroup][column]
 * @param frags Column fragments
 * @param stream CUDA stream to use
 */
-void populate_chunk_hash_maps(cudf::detail::device_2dspan<EncColumnChunk> chunks,
-                              cudf::detail::device_2dspan<PageFragment const> frags,
+void populate_chunk_hash_maps(cudf::detail::device_2dspan<PageFragment const> frags,
                               rmm::cuda_stream_view stream);
 
 /**
@@ -554,12 +552,10 @@ void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_vi
 * Since dict_data itself contains indices into the original cudf column, this means that
 * col[row] == col[dict_data[dict_index[row - chunk.start_row]]]
 *
- * @param chunks Column chunks [rowgroup][column]
 * @param frags Column fragments
 * @param stream CUDA stream to use
 */
-void get_dictionary_indices(cudf::detail::device_2dspan<EncColumnChunk> chunks,
-                            cudf::detail::device_2dspan<PageFragment const> frags,
+void get_dictionary_indices(cudf::detail::device_2dspan<PageFragment const> frags,
                             rmm::cuda_stream_view stream);
 
 /**
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 4bc084c61d0..92d436e4566 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -895,7 +895,7 @@ auto build_chunk_dictionaries(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
 
   chunks.host_to_device(stream);
   gpu::initialize_chunk_hash_maps(chunks.device_view().flat_view(), stream);
-  gpu::populate_chunk_hash_maps(chunks, frags, stream);
+  gpu::populate_chunk_hash_maps(frags, stream);
 
   chunks.device_to_host(stream, true);
 
@@ -944,7 +944,7 @@ auto build_chunk_dictionaries(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
   }
   chunks.host_to_device(stream);
   gpu::collect_map_entries(chunks.device_view().flat_view(), stream);
-  gpu::get_dictionary_indices(chunks.device_view(), frags, stream);
+  gpu::get_dictionary_indices(frags, stream);
 
   return std::make_pair(std::move(dict_data), std::move(dict_index));
 }