Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve parquet dictionary encoding #10635

Merged
191 changes: 84 additions & 107 deletions cpp/src/io/parquet/chunk_dict.cu
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,24 @@

#include <rmm/exec_policy.hpp>

#include <cuda/atomic>

namespace cudf {
namespace io {
namespace parquet {
namespace gpu {
namespace {
constexpr int DEFAULT_BLOCK_SIZE = 256;
}

template <int block_size>
__global__ void __launch_bounds__(block_size, 1)
__global__ void __launch_bounds__(block_size)
initialize_chunk_hash_maps_kernel(device_span<EncColumnChunk> chunks)
{
auto chunk = chunks[blockIdx.x];
auto t = threadIdx.x;
// fut: Now that per-chunk dict is same size as ck.num_values, try to not use one block per chunk
for (size_t i = 0; i < chunk.dict_map_size; i += block_size) {
for (size_type i = 0; i < chunk.dict_map_size; i += block_size) {
if (t + i < chunk.dict_map_size) {
new (&chunk.dict_map_slots[t + i].first) map_type::atomic_key_type{KEY_SENTINEL};
new (&chunk.dict_map_slots[t + i].second) map_type::atomic_mapped_type{VALUE_SENTINEL};
Expand Down Expand Up @@ -91,9 +96,8 @@ struct map_find_fn {
};

template <int block_size>
__global__ void __launch_bounds__(block_size, 1)
populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<gpu::PageFragment const> frags)
__global__ void __launch_bounds__(block_size)
populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan<gpu::PageFragment const> frags)
{
auto col_idx = blockIdx.y;
auto block_x = blockIdx.x;
Expand All @@ -102,70 +106,57 @@ __global__ void __launch_bounds__(block_size, 1)
auto chunk = frag.chunk;
auto col = chunk->col_desc;

size_type start_row = frag.start_row;
size_type end_row = frag.start_row + frag.num_rows;
if (not chunk->use_dictionary) { return; }

__shared__ size_type s_start_value_idx;
__shared__ size_type s_num_values;
using block_reduce = cub::BlockReduce<size_type, block_size>;
__shared__ typename block_reduce::TempStorage reduce_storage;

if (not chunk->use_dictionary) { return; }
size_type start_row = frag.start_row;
size_type end_row = frag.start_row + frag.num_rows;

if (t == 0) {
// Find the bounds of values in leaf column to be inserted into the map for current chunk
auto cudf_col = *(col->parent_column);
s_start_value_idx = row_to_value_idx(start_row, cudf_col);
auto end_value_idx = row_to_value_idx(end_row, cudf_col);
s_num_values = end_value_idx - s_start_value_idx;
}
__syncthreads();
// Find the bounds of values in leaf column to be inserted into the map for current chunk
auto const cudf_col = *(col->parent_column);
size_type const s_start_value_idx = row_to_value_idx(start_row, cudf_col);
size_type const end_value_idx = row_to_value_idx(end_row, cudf_col);

column_device_view const& data_col = *col->leaf_column;
using block_reduce = cub::BlockReduce<size_type, block_size>;
__shared__ typename block_reduce::TempStorage reduce_storage;

// Make a view of the hash map
auto hash_map_mutable = map_type::device_mutable_view(
chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL);
auto hash_map = map_type::device_view(
chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL);

__shared__ int total_num_dict_entries;
for (size_type i = 0; i < s_num_values; i += block_size) {
// add the value to hash map
size_type val_idx = i + t + s_start_value_idx;
bool is_valid =
(i + t < s_num_values && val_idx < data_col.size()) and data_col.is_valid(val_idx);
__shared__ size_type total_num_dict_entries;
size_type val_idx = s_start_value_idx + t;
while (val_idx - block_size < end_value_idx) {
auto const is_valid =
val_idx < end_value_idx and val_idx < data_col.size() and data_col.is_valid(val_idx);

// insert element at val_idx to hash map and count successful insertions
size_type is_unique = 0;
size_type uniq_elem_size = 0;
if (is_valid) {
auto found_slot = type_dispatcher(data_col.type(), map_find_fn{hash_map}, data_col, val_idx);
if (found_slot == hash_map.end()) {
is_unique =
type_dispatcher(data_col.type(), map_insert_fn{hash_map_mutable}, data_col, val_idx);
uniq_elem_size = [&]() -> size_type {
if (not is_unique) { return 0; }
switch (col->physical_type) {
case Type::INT32: return 4;
case Type::INT64: return 8;
case Type::INT96: return 12;
case Type::FLOAT: return 4;
case Type::DOUBLE: return 8;
case Type::BYTE_ARRAY:
if (data_col.type().id() == type_id::STRING) {
// Strings are stored as 4 byte length + string bytes
return 4 + data_col.element<string_view>(val_idx).size_bytes();
}
case Type::FIXED_LEN_BYTE_ARRAY:
if (data_col.type().id() == type_id::DECIMAL128) { return sizeof(__int128_t); }
default: CUDF_UNREACHABLE("Unsupported type for dictionary encoding");
}
}();
}
is_unique =
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
type_dispatcher(data_col.type(), map_insert_fn{hash_map_mutable}, data_col, val_idx);
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
uniq_elem_size = [&]() -> size_type {
if (not is_unique) { return 0; }
switch (col->physical_type) {
case Type::INT32: return 4;
case Type::INT64: return 8;
case Type::INT96: return 12;
case Type::FLOAT: return 4;
case Type::DOUBLE: return 8;
case Type::BYTE_ARRAY:
if (data_col.type().id() == type_id::STRING) {
// Strings are stored as 4 byte length + string bytes
return 4 + data_col.element<string_view>(val_idx).size_bytes();
}
case Type::FIXED_LEN_BYTE_ARRAY:
if (data_col.type().id() == type_id::DECIMAL128) { return sizeof(__int128_t); }
default: CUDF_UNREACHABLE("Unsupported type for dictionary encoding");
}
Comment on lines +140 to +156
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This switch seems redundant with the type_dispatcher. Couldn't the map_insert_fn be made to return the same information and avoid the extra switch?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically, it seems like this could be simplified to:

auto const [is_unique, element_size] = is_valid ? type_dispatcher(...) : {0, 0};

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are parquet types, not cudf types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the parquet type can be derived from the cudf type?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even better would be to push the is_valid check inside map_insert_fn. Then I'd write this as:

while (val_idx - block_size < end_value_idx) {
   thrust::optional<size_type> unique_element_size = type_dispatcher(...);
   ...
   auto const num_unique = block_reduce(reduce_storage).Sum( unique_element_size.has_value() );
   __syncthreads();
   auto const unique_data_size = block_reduce(reduce_storage).Sum(unique_element_size.value_or(0));
   ...
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the parquet type can be derived from the cudf type?

Not trivially. There can be multiple parquet types associated with a cuDF type, and the decision about which one to use is determined by user-passed metadata, which is forwarded down to this point.

If the user wants timestamps to be encoded as int96 instead of int64, that might affect the decision to use dictionary encoding.

}();
}

__syncthreads();
auto num_unique = block_reduce(reduce_storage).Sum(is_unique);
__syncthreads();
auto uniq_data_size = block_reduce(reduce_storage).Sum(uniq_elem_size);
Expand All @@ -178,11 +169,13 @@ __global__ void __launch_bounds__(block_size, 1)

// Check if the num unique values in chunk has already exceeded max dict size and early exit
if (total_num_dict_entries > MAX_DICT_SIZE) { return; }
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
}

val_idx += block_size;
} // while
}

template <int block_size>
__global__ void __launch_bounds__(block_size, 1)
__global__ void __launch_bounds__(block_size)
collect_map_entries_kernel(device_span<EncColumnChunk> chunks)
{
auto& chunk = chunks[blockIdx.x];
Expand All @@ -192,31 +185,30 @@ __global__ void __launch_bounds__(block_size, 1)
auto map =
map_type::device_view(chunk.dict_map_slots, chunk.dict_map_size, KEY_SENTINEL, VALUE_SENTINEL);

__shared__ size_type counter;
if (t == 0) counter = 0;
__shared__ cuda::atomic<size_type, cuda::thread_scope_block> counter;
using cuda::std::memory_order_relaxed;
if (t == 0) { new (&counter) cuda::atomic<size_type, cuda::thread_scope_block>{0}; }
__syncthreads();
for (size_t i = 0; i < chunk.dict_map_size; i += block_size) {
for (size_type i = 0; i < chunk.dict_map_size; i += block_size) {
if (t + i < chunk.dict_map_size) {
auto slot = map.begin_slot() + t + i;
auto key = static_cast<map_type::key_type>(slot->first);
auto* slot = reinterpret_cast<map_type::value_type*>(map.begin_slot() + t + i);
auto key = slot->first;
if (key != KEY_SENTINEL) {
auto loc = atomicAdd(&counter, 1);
auto loc = counter.fetch_add(1, memory_order_relaxed);
cudf_assert(loc < MAX_DICT_SIZE && "Number of filled slots exceeds max dict size");
chunk.dict_data[loc] = key;
// If sorting dict page ever becomes a hard requirement, enable the following statement and
// add a dict sorting step before storing into the slot's second field.
// chunk.dict_data_idx[loc] = t + i;
slot->second.store(loc);
// TODO: ^ This doesn't need to be atomic. Try casting to value_type ptr and just writing.
slot->second = loc;
}
}
}
}

template <int block_size>
__global__ void __launch_bounds__(block_size, 1)
get_dictionary_indices_kernel(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<gpu::PageFragment const> frags)
__global__ void __launch_bounds__(block_size)
get_dictionary_indices_kernel(cudf::detail::device_2dspan<gpu::PageFragment const> frags)
{
auto col_idx = blockIdx.y;
auto block_x = blockIdx.x;
Expand All @@ -225,47 +217,38 @@ __global__ void __launch_bounds__(block_size, 1)
auto chunk = frag.chunk;
auto col = chunk->col_desc;

if (not chunk->use_dictionary) { return; }

size_type start_row = frag.start_row;
size_type end_row = frag.start_row + frag.num_rows;

__shared__ size_type s_start_value_idx;
__shared__ size_type s_ck_start_val_idx;
__shared__ size_type s_num_values;

if (t == 0) {
// Find the bounds of values in leaf column to be searched in the map for current chunk
auto cudf_col = *(col->parent_column);
s_start_value_idx = row_to_value_idx(start_row, cudf_col);
s_ck_start_val_idx = row_to_value_idx(chunk->start_row, cudf_col);
auto end_value_idx = row_to_value_idx(end_row, cudf_col);
s_num_values = end_value_idx - s_start_value_idx;
}
__syncthreads();

if (not chunk->use_dictionary) { return; }
// Find the bounds of values in leaf column to be searched in the map for current chunk
auto const cudf_col = *(col->parent_column);
auto const s_start_value_idx = row_to_value_idx(start_row, cudf_col);
auto const s_ck_start_val_idx = row_to_value_idx(chunk->start_row, cudf_col);
auto const end_value_idx = row_to_value_idx(end_row, cudf_col);

column_device_view const& data_col = *col->leaf_column;

auto map = map_type::device_view(
chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL);

for (size_t i = 0; i < s_num_values; i += block_size) {
if (t + i < s_num_values) {
auto val_idx = s_start_value_idx + t + i;
bool is_valid =
(i + t < s_num_values && val_idx < data_col.size()) ? data_col.is_valid(val_idx) : false;

if (is_valid) {
auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx);
cudf_assert(found_slot != map.end() &&
"Unable to find value in map in dictionary index construction");
if (found_slot != map.end()) {
// No need for atomic as this is not going to be modified by any other thread
auto* val_ptr = reinterpret_cast<map_type::mapped_type*>(&found_slot->second);
chunk->dict_index[val_idx - s_ck_start_val_idx] = *val_ptr;
}
auto val_idx = s_start_value_idx + t;
while (val_idx < end_value_idx) {
auto const is_valid = val_idx < data_col.size() and data_col.is_valid(val_idx);

if (is_valid) {
auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx);
cudf_assert(found_slot != map.end() &&
"Unable to find value in map in dictionary index construction");
if (found_slot != map.end()) {
// No need for atomic as this is not going to be modified by any other thread
auto* val_ptr = reinterpret_cast<map_type::mapped_type*>(&found_slot->second);
chunk->dict_index[val_idx - s_ck_start_val_idx] = *val_ptr;
}
}

val_idx += block_size;
}
}

Expand All @@ -276,15 +259,12 @@ void initialize_chunk_hash_maps(device_span<EncColumnChunk> chunks, rmm::cuda_st
<<<chunks.size(), block_size, 0, stream.value()>>>(chunks);
}

/**
 * @brief Launch the kernel that inserts fragment values into their chunks' hash maps.
 *
 * Grid layout: one block per page fragment — grid.x indexes fragments within a
 * column, grid.y indexes columns (taken from the 2D extent of @p frags).
 *
 * @param frags Column fragments [column][fragment]
 * @param stream CUDA stream on which the kernel is launched
 */
void populate_chunk_hash_maps(cudf::detail::device_2dspan<gpu::PageFragment const> frags,
                              rmm::cuda_stream_view stream)
{
  dim3 const dim_grid(frags.size().second, frags.size().first);
  populate_chunk_hash_maps_kernel<DEFAULT_BLOCK_SIZE>
    <<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(frags);
}

void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_view stream)
Expand All @@ -293,15 +273,12 @@ void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_vi
collect_map_entries_kernel<block_size><<<chunks.size(), block_size, 0, stream.value()>>>(chunks);
}

/**
 * @brief Launch the kernel that looks up each fragment value's dictionary index.
 *
 * Grid layout: one block per page fragment — grid.x indexes fragments within a
 * column, grid.y indexes columns (taken from the 2D extent of @p frags).
 *
 * @param frags Column fragments [column][fragment]
 * @param stream CUDA stream on which the kernel is launched
 */
void get_dictionary_indices(cudf::detail::device_2dspan<gpu::PageFragment const> frags,
                            rmm::cuda_stream_view stream)
{
  dim3 const dim_grid(frags.size().second, frags.size().first);
  get_dictionary_indices_kernel<DEFAULT_BLOCK_SIZE>
    <<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(frags);
}
} // namespace gpu
} // namespace parquet
Expand Down
8 changes: 2 additions & 6 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,12 +529,10 @@ void initialize_chunk_hash_maps(device_span<EncColumnChunk> chunks, rmm::cuda_st
/**
* @brief Insert chunk values into their respective hash maps
*
* @param chunks Column chunks [rowgroup][column]
* @param frags Column fragments
* @param stream CUDA stream to use
*/
void populate_chunk_hash_maps(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<gpu::PageFragment const> frags,
void populate_chunk_hash_maps(cudf::detail::device_2dspan<gpu::PageFragment const> frags,
rmm::cuda_stream_view stream);

/**
Expand All @@ -554,12 +552,10 @@ void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_vi
* Since dict_data itself contains indices into the original cudf column, this means that
* col[row] == col[dict_data[dict_index[row - chunk.start_row]]]
*
* @param chunks Column chunks [rowgroup][column]
* @param frags Column fragments
* @param stream CUDA stream to use
*/
void get_dictionary_indices(cudf::detail::device_2dspan<EncColumnChunk> chunks,
cudf::detail::device_2dspan<gpu::PageFragment const> frags,
void get_dictionary_indices(cudf::detail::device_2dspan<gpu::PageFragment const> frags,
rmm::cuda_stream_view stream);

/**
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ auto build_chunk_dictionaries(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
chunks.host_to_device(stream);

gpu::initialize_chunk_hash_maps(chunks.device_view().flat_view(), stream);
gpu::populate_chunk_hash_maps(chunks, frags, stream);
gpu::populate_chunk_hash_maps(frags, stream);

chunks.device_to_host(stream, true);

Expand Down Expand Up @@ -978,7 +978,7 @@ auto build_chunk_dictionaries(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
}
chunks.host_to_device(stream);
gpu::collect_map_entries(chunks.device_view().flat_view(), stream);
gpu::get_dictionary_indices(chunks.device_view(), frags, stream);
gpu::get_dictionary_indices(frags, stream);

return std::make_pair(std::move(dict_data), std::move(dict_index));
}
Expand Down