Skip to content

Commit

Permalink
Improve parquet dictionary encoding (#10635)
Browse files Browse the repository at this point in the history
This PR includes several changes to improve parquet dictionary encoding:

- API cleanups: get rid of unused arguments
- Remove min block limit in `__launch_bounds__`
- Simplify the grid-stride loop logic by using `while`
- All threads calculate start/end indices instead of one doing the calculation and broadcasting the result (no more shared memory or block-wide sync).

Other ideas that were tested but ultimately not included in this PR because they had zero or negative performance impact:

- Tuning hash map occupancy
- `cg::shfl` instead of shared memory + sync
- CG based `insert`/`find`
- Relaxed atomic for `num_dict_entries` and `uniq_data_size`
- `cg::reduce` instead of `cub::BlockReduce`

Before:
```
-----------------------------------------------------------------------------------------------------------------------
Benchmark                                                             Time             CPU   Iterations UserCounters...
-----------------------------------------------------------------------------------------------------------------------
ParquetWrite/integral_void_output/29/0/1/1/2/manual_time            734 ms          734 ms            1 bytes_per_second=697.128M/s encoded_file_size=530.706M peak_memory_usage=1.7804G
ParquetWrite/integral_void_output/29/1000/1/1/2/manual_time         303 ms          303 ms            2 bytes_per_second=1.65131G/s encoded_file_size=397.998M peak_memory_usage=1.49675G
ParquetWrite/integral_void_output/29/0/32/1/2/manual_time           734 ms          734 ms            1 bytes_per_second=697.713M/s encoded_file_size=530.706M peak_memory_usage=1.7804G
ParquetWrite/integral_void_output/29/1000/32/1/2/manual_time       61.9 ms         61.9 ms           11 bytes_per_second=8.07721G/s encoded_file_size=159.574M peak_memory_usage=1.49675G
ParquetWrite/integral_void_output/29/0/1/0/2/manual_time            690 ms          690 ms            1 bytes_per_second=742.205M/s encoded_file_size=531.066M peak_memory_usage=1.3148G
ParquetWrite/integral_void_output/29/1000/1/0/2/manual_time         282 ms          282 ms            2 bytes_per_second=1.76991G/s encoded_file_size=398.712M peak_memory_usage=1.49675G
ParquetWrite/integral_void_output/29/0/32/0/2/manual_time           690 ms          690 ms            1 bytes_per_second=742.268M/s encoded_file_size=531.066M peak_memory_usage=1.3148G
ParquetWrite/integral_void_output/29/1000/32/0/2/manual_time       59.5 ms         59.5 ms           12 bytes_per_second=8.40878G/s encoded_file_size=199.926M peak_memory_usage=1.49675G
```

Now:
```
-----------------------------------------------------------------------------------------------------------------------
Benchmark                                                             Time             CPU   Iterations UserCounters...
-----------------------------------------------------------------------------------------------------------------------
ParquetWrite/integral_void_output/29/0/1/1/2/manual_time            733 ms          733 ms            1 bytes_per_second=698.24M/s encoded_file_size=530.706M peak_memory_usage=1.7804G
ParquetWrite/integral_void_output/29/1000/1/1/2/manual_time         302 ms          302 ms            2 bytes_per_second=1.65496G/s encoded_file_size=397.998M peak_memory_usage=1.49675G
ParquetWrite/integral_void_output/29/0/32/1/2/manual_time           733 ms          733 ms            1 bytes_per_second=698.701M/s encoded_file_size=530.706M peak_memory_usage=1.7804G
ParquetWrite/integral_void_output/29/1000/32/1/2/manual_time       61.3 ms         61.3 ms           11 bytes_per_second=8.1533G/s encoded_file_size=159.572M peak_memory_usage=1.49675G
ParquetWrite/integral_void_output/29/0/1/0/2/manual_time            688 ms          688 ms            1 bytes_per_second=743.71M/s encoded_file_size=531.066M peak_memory_usage=1.3148G
ParquetWrite/integral_void_output/29/1000/1/0/2/manual_time         282 ms          282 ms            2 bytes_per_second=1.7712G/s encoded_file_size=398.712M peak_memory_usage=1.49675G
ParquetWrite/integral_void_output/29/0/32/0/2/manual_time           688 ms          688 ms            1 bytes_per_second=743.658M/s encoded_file_size=531.066M peak_memory_usage=1.3148G
ParquetWrite/integral_void_output/29/1000/32/0/2/manual_time       58.9 ms         58.9 ms           12 bytes_per_second=8.49093G/s encoded_file_size=199.926M peak_memory_usage=1.49675G
```

Authors:
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - Jake Hemstad (https://github.com/jrhemstad)
  - Mike Wilson (https://github.com/hyperbolic2346)

URL: #10635
  • Loading branch information
PointKernel authored Apr 20, 2022
1 parent 65b1cbd commit 017d52a
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 115 deletions.
191 changes: 84 additions & 107 deletions cpp/src/io/parquet/chunk_dict.cu
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,24 @@

#include <rmm/exec_policy.hpp>

#include <cuda/atomic>

namespace cudf {
namespace io {
namespace parquet {
namespace gpu {
namespace {
constexpr int DEFAULT_BLOCK_SIZE = 256;
}

template <int block_size>
__global__ void __launch_bounds__(block_size, 1)
__global__ void __launch_bounds__(block_size)
initialize_chunk_hash_maps_kernel(device_span<EncColumnChunk> chunks)
{
auto chunk = chunks[blockIdx.x];
auto t = threadIdx.x;
// fut: Now that per-chunk dict is same size as ck.num_values, try to not use one block per chunk
for (size_t i = 0; i < chunk.dict_map_size; i += block_size) {
for (size_type i = 0; i < chunk.dict_map_size; i += block_size) {
if (t + i < chunk.dict_map_size) {
new (&chunk.dict_map_slots[t + i].first) map_type::atomic_key_type{KEY_SENTINEL};
new (&chunk.dict_map_slots[t + i].second) map_type::atomic_mapped_type{VALUE_SENTINEL};
Expand Down Expand Up @@ -91,9 +96,8 @@ struct map_find_fn {
};

template <int block_size>
__global__ void __launch_bounds__(block_size, 1)
populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<gpu::PageFragment const> frags)
__global__ void __launch_bounds__(block_size)
populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan<gpu::PageFragment const> frags)
{
auto col_idx = blockIdx.y;
auto block_x = blockIdx.x;
Expand All @@ -102,70 +106,57 @@ __global__ void __launch_bounds__(block_size, 1)
auto chunk = frag.chunk;
auto col = chunk->col_desc;

size_type start_row = frag.start_row;
size_type end_row = frag.start_row + frag.num_rows;
if (not chunk->use_dictionary) { return; }

__shared__ size_type s_start_value_idx;
__shared__ size_type s_num_values;
using block_reduce = cub::BlockReduce<size_type, block_size>;
__shared__ typename block_reduce::TempStorage reduce_storage;

if (not chunk->use_dictionary) { return; }
size_type start_row = frag.start_row;
size_type end_row = frag.start_row + frag.num_rows;

if (t == 0) {
// Find the bounds of values in leaf column to be inserted into the map for current chunk
auto cudf_col = *(col->parent_column);
s_start_value_idx = row_to_value_idx(start_row, cudf_col);
auto end_value_idx = row_to_value_idx(end_row, cudf_col);
s_num_values = end_value_idx - s_start_value_idx;
}
__syncthreads();
// Find the bounds of values in leaf column to be inserted into the map for current chunk
auto const cudf_col = *(col->parent_column);
size_type const s_start_value_idx = row_to_value_idx(start_row, cudf_col);
size_type const end_value_idx = row_to_value_idx(end_row, cudf_col);

column_device_view const& data_col = *col->leaf_column;
using block_reduce = cub::BlockReduce<size_type, block_size>;
__shared__ typename block_reduce::TempStorage reduce_storage;

// Make a view of the hash map
auto hash_map_mutable = map_type::device_mutable_view(
chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL);
auto hash_map = map_type::device_view(
chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL);

__shared__ int total_num_dict_entries;
for (size_type i = 0; i < s_num_values; i += block_size) {
// add the value to hash map
size_type val_idx = i + t + s_start_value_idx;
bool is_valid =
(i + t < s_num_values && val_idx < data_col.size()) and data_col.is_valid(val_idx);
__shared__ size_type total_num_dict_entries;
size_type val_idx = s_start_value_idx + t;
while (val_idx - block_size < end_value_idx) {
auto const is_valid =
val_idx < end_value_idx and val_idx < data_col.size() and data_col.is_valid(val_idx);

// insert element at val_idx to hash map and count successful insertions
size_type is_unique = 0;
size_type uniq_elem_size = 0;
if (is_valid) {
auto found_slot = type_dispatcher(data_col.type(), map_find_fn{hash_map}, data_col, val_idx);
if (found_slot == hash_map.end()) {
is_unique =
type_dispatcher(data_col.type(), map_insert_fn{hash_map_mutable}, data_col, val_idx);
uniq_elem_size = [&]() -> size_type {
if (not is_unique) { return 0; }
switch (col->physical_type) {
case Type::INT32: return 4;
case Type::INT64: return 8;
case Type::INT96: return 12;
case Type::FLOAT: return 4;
case Type::DOUBLE: return 8;
case Type::BYTE_ARRAY:
if (data_col.type().id() == type_id::STRING) {
// Strings are stored as 4 byte length + string bytes
return 4 + data_col.element<string_view>(val_idx).size_bytes();
}
case Type::FIXED_LEN_BYTE_ARRAY:
if (data_col.type().id() == type_id::DECIMAL128) { return sizeof(__int128_t); }
default: CUDF_UNREACHABLE("Unsupported type for dictionary encoding");
}
}();
}
is_unique =
type_dispatcher(data_col.type(), map_insert_fn{hash_map_mutable}, data_col, val_idx);
uniq_elem_size = [&]() -> size_type {
if (not is_unique) { return 0; }
switch (col->physical_type) {
case Type::INT32: return 4;
case Type::INT64: return 8;
case Type::INT96: return 12;
case Type::FLOAT: return 4;
case Type::DOUBLE: return 8;
case Type::BYTE_ARRAY:
if (data_col.type().id() == type_id::STRING) {
// Strings are stored as 4 byte length + string bytes
return 4 + data_col.element<string_view>(val_idx).size_bytes();
}
case Type::FIXED_LEN_BYTE_ARRAY:
if (data_col.type().id() == type_id::DECIMAL128) { return sizeof(__int128_t); }
default: CUDF_UNREACHABLE("Unsupported type for dictionary encoding");
}
}();
}

__syncthreads();
auto num_unique = block_reduce(reduce_storage).Sum(is_unique);
__syncthreads();
auto uniq_data_size = block_reduce(reduce_storage).Sum(uniq_elem_size);
Expand All @@ -178,11 +169,13 @@ __global__ void __launch_bounds__(block_size, 1)

// Check if the num unique values in chunk has already exceeded max dict size and early exit
if (total_num_dict_entries > MAX_DICT_SIZE) { return; }
}

val_idx += block_size;
} // while
}

template <int block_size>
__global__ void __launch_bounds__(block_size, 1)
__global__ void __launch_bounds__(block_size)
collect_map_entries_kernel(device_span<EncColumnChunk> chunks)
{
auto& chunk = chunks[blockIdx.x];
Expand All @@ -192,31 +185,30 @@ __global__ void __launch_bounds__(block_size, 1)
auto map =
map_type::device_view(chunk.dict_map_slots, chunk.dict_map_size, KEY_SENTINEL, VALUE_SENTINEL);

__shared__ size_type counter;
if (t == 0) counter = 0;
__shared__ cuda::atomic<size_type, cuda::thread_scope_block> counter;
using cuda::std::memory_order_relaxed;
if (t == 0) { new (&counter) cuda::atomic<size_type, cuda::thread_scope_block>{0}; }
__syncthreads();
for (size_t i = 0; i < chunk.dict_map_size; i += block_size) {
for (size_type i = 0; i < chunk.dict_map_size; i += block_size) {
if (t + i < chunk.dict_map_size) {
auto slot = map.begin_slot() + t + i;
auto key = static_cast<map_type::key_type>(slot->first);
auto* slot = reinterpret_cast<map_type::value_type*>(map.begin_slot() + t + i);
auto key = slot->first;
if (key != KEY_SENTINEL) {
auto loc = atomicAdd(&counter, 1);
auto loc = counter.fetch_add(1, memory_order_relaxed);
cudf_assert(loc < MAX_DICT_SIZE && "Number of filled slots exceeds max dict size");
chunk.dict_data[loc] = key;
// If sorting dict page ever becomes a hard requirement, enable the following statement and
// add a dict sorting step before storing into the slot's second field.
// chunk.dict_data_idx[loc] = t + i;
slot->second.store(loc);
// TODO: ^ This doesn't need to be atomic. Try casting to value_type ptr and just writing.
slot->second = loc;
}
}
}
}

template <int block_size>
__global__ void __launch_bounds__(block_size, 1)
get_dictionary_indices_kernel(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<gpu::PageFragment const> frags)
__global__ void __launch_bounds__(block_size)
get_dictionary_indices_kernel(cudf::detail::device_2dspan<gpu::PageFragment const> frags)
{
auto col_idx = blockIdx.y;
auto block_x = blockIdx.x;
Expand All @@ -225,47 +217,38 @@ __global__ void __launch_bounds__(block_size, 1)
auto chunk = frag.chunk;
auto col = chunk->col_desc;

if (not chunk->use_dictionary) { return; }

size_type start_row = frag.start_row;
size_type end_row = frag.start_row + frag.num_rows;

__shared__ size_type s_start_value_idx;
__shared__ size_type s_ck_start_val_idx;
__shared__ size_type s_num_values;

if (t == 0) {
// Find the bounds of values in leaf column to be searched in the map for current chunk
auto cudf_col = *(col->parent_column);
s_start_value_idx = row_to_value_idx(start_row, cudf_col);
s_ck_start_val_idx = row_to_value_idx(chunk->start_row, cudf_col);
auto end_value_idx = row_to_value_idx(end_row, cudf_col);
s_num_values = end_value_idx - s_start_value_idx;
}
__syncthreads();

if (not chunk->use_dictionary) { return; }
// Find the bounds of values in leaf column to be searched in the map for current chunk
auto const cudf_col = *(col->parent_column);
auto const s_start_value_idx = row_to_value_idx(start_row, cudf_col);
auto const s_ck_start_val_idx = row_to_value_idx(chunk->start_row, cudf_col);
auto const end_value_idx = row_to_value_idx(end_row, cudf_col);

column_device_view const& data_col = *col->leaf_column;

auto map = map_type::device_view(
chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL);

for (size_t i = 0; i < s_num_values; i += block_size) {
if (t + i < s_num_values) {
auto val_idx = s_start_value_idx + t + i;
bool is_valid =
(i + t < s_num_values && val_idx < data_col.size()) ? data_col.is_valid(val_idx) : false;

if (is_valid) {
auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx);
cudf_assert(found_slot != map.end() &&
"Unable to find value in map in dictionary index construction");
if (found_slot != map.end()) {
// No need for atomic as this is not going to be modified by any other thread
auto* val_ptr = reinterpret_cast<map_type::mapped_type*>(&found_slot->second);
chunk->dict_index[val_idx - s_ck_start_val_idx] = *val_ptr;
}
auto val_idx = s_start_value_idx + t;
while (val_idx < end_value_idx) {
auto const is_valid = val_idx < data_col.size() and data_col.is_valid(val_idx);

if (is_valid) {
auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx);
cudf_assert(found_slot != map.end() &&
"Unable to find value in map in dictionary index construction");
if (found_slot != map.end()) {
// No need for atomic as this is not going to be modified by any other thread
auto* val_ptr = reinterpret_cast<map_type::mapped_type*>(&found_slot->second);
chunk->dict_index[val_idx - s_ck_start_val_idx] = *val_ptr;
}
}

val_idx += block_size;
}
}

Expand All @@ -276,15 +259,12 @@ void initialize_chunk_hash_maps(device_span<EncColumnChunk> chunks, rmm::cuda_st
<<<chunks.size(), block_size, 0, stream.value()>>>(chunks);
}

void populate_chunk_hash_maps(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<gpu::PageFragment const> frags,
void populate_chunk_hash_maps(cudf::detail::device_2dspan<gpu::PageFragment const> frags,
rmm::cuda_stream_view stream)
{
constexpr int block_size = 256;
dim3 const dim_grid(frags.size().second, frags.size().first);

populate_chunk_hash_maps_kernel<block_size>
<<<dim_grid, block_size, 0, stream.value()>>>(chunks, frags);
populate_chunk_hash_maps_kernel<DEFAULT_BLOCK_SIZE>
<<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(frags);
}

void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_view stream)
Expand All @@ -293,15 +273,12 @@ void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_vi
collect_map_entries_kernel<block_size><<<chunks.size(), block_size, 0, stream.value()>>>(chunks);
}

void get_dictionary_indices(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<gpu::PageFragment const> frags,
void get_dictionary_indices(cudf::detail::device_2dspan<gpu::PageFragment const> frags,
rmm::cuda_stream_view stream)
{
constexpr int block_size = 256;
dim3 const dim_grid(frags.size().second, frags.size().first);

get_dictionary_indices_kernel<block_size>
<<<dim_grid, block_size, 0, stream.value()>>>(chunks, frags);
get_dictionary_indices_kernel<DEFAULT_BLOCK_SIZE>
<<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(frags);
}
} // namespace gpu
} // namespace parquet
Expand Down
8 changes: 2 additions & 6 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,12 +529,10 @@ void initialize_chunk_hash_maps(device_span<EncColumnChunk> chunks, rmm::cuda_st
/**
* @brief Insert chunk values into their respective hash maps
*
* @param chunks Column chunks [rowgroup][column]
* @param frags Column fragments
* @param stream CUDA stream to use
*/
void populate_chunk_hash_maps(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<gpu::PageFragment const> frags,
void populate_chunk_hash_maps(cudf::detail::device_2dspan<gpu::PageFragment const> frags,
rmm::cuda_stream_view stream);

/**
Expand All @@ -554,12 +552,10 @@ void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_vi
* Since dict_data itself contains indices into the original cudf column, this means that
* col[row] == col[dict_data[dict_index[row - chunk.start_row]]]
*
* @param chunks Column chunks [rowgroup][column]
* @param frags Column fragments
* @param stream CUDA stream to use
*/
void get_dictionary_indices(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<gpu::PageFragment const> frags,
void get_dictionary_indices(cudf::detail::device_2dspan<gpu::PageFragment const> frags,
rmm::cuda_stream_view stream);

/**
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ auto build_chunk_dictionaries(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
chunks.host_to_device(stream);

gpu::initialize_chunk_hash_maps(chunks.device_view().flat_view(), stream);
gpu::populate_chunk_hash_maps(chunks, frags, stream);
gpu::populate_chunk_hash_maps(frags, stream);

chunks.device_to_host(stream, true);

Expand Down Expand Up @@ -944,7 +944,7 @@ auto build_chunk_dictionaries(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
}
chunks.host_to_device(stream);
gpu::collect_map_entries(chunks.device_view().flat_view(), stream);
gpu::get_dictionary_indices(chunks.device_view(), frags, stream);
gpu::get_dictionary_indices(frags, stream);

return std::make_pair(std::move(dict_data), std::move(dict_index));
}
Expand Down

0 comments on commit 017d52a

Please sign in to comment.