Refactor dictionary encoding in PQ writer to migrate to the new cuco::static_map #16541

Merged

39 commits merged on Aug 30, 2024

Commits (39) · Changes from 2 commits
All 39 commits below are by mhaseeb123; dates are 2024.

b09cfa0  Aug 12  Works. Need to figure out equality_fn
089f909  Aug 13  Almost works. Need to do cg split work.
fb742fb  Aug 13  Some updates
b9067b0  Aug 13  Fixes and improvements. 330/346 gtests passing
44a97ce  Aug 13  Fix headers being included
15de062  Aug 13  Merge branch 'branch-24.10' into fea-refactor-dict-pq
ac826ff  Aug 13  Merge branch 'fea-refactor-dict-pq' of https://github.com/mhaseeb123/…
393eaa6  Aug 13  All tests passing
c27c4cf  Aug 13  Cosmetic improvements
76a2d14  Aug 13  Revert changes to tests
ec32c0a  Aug 14  Migrate find_fn to use a tile
0546fcc  Aug 14  Functionally correct working solution
a14007d  Aug 14  Updated insert with tiles
4935c66  Aug 14  Cosmetic updates
63dab9d  Aug 14  Minor improvements
40a2f39  Aug 14  Use span instead of raw pointers.
26a3b77  Aug 14  Merge branch 'branch-24.10' into fea-refactor-dict-pq
8f2e650  Aug 14  Minor improvements
7bcca45  Aug 14  Minor improvements
b0e482e  Aug 15  Perf improvements
4f51253  Aug 15  Cosmetic improvements
16fa57e  Aug 15  Change cg_size to 1 for best perf so far (see the probing sketch after this list)
e394a71  Aug 15  Incorporate for window_size
3e6a0b7  Aug 15  Use double hashing (see the probing sketch after this list)
27bee05  Aug 16  Avoid reconstructing hash_map_ref again and again.
fd2cb7f  Aug 16  Add allocator to map's aow_storage
39c2a35  Aug 17  Perf optimization
1b64540  Aug 17  Update comments
2478ce0  Aug 17  Merge branch 'branch-24.10' into fea-refactor-dict-pq
3f331e0  Aug 19  Merge branch 'branch-24.10' into fea-refactor-dict-pq
4dccdf5  Aug 19  Merge branch 'branch-24.10' into fea-refactor-dict-pq
a0f5aab  Aug 19  Minor improvement
c7b8ddd  Aug 19  Merge branch 'fea-refactor-dict-pq' of https://github.com/mhaseeb123/…
9a16e30  Aug 20  Minor updates
24d16eb  Aug 20  Apply clang-format
f18c1f7  Aug 21  Address reviewer comments
50019fb  Aug 21  Merge branch 'branch-24.10' into fea-refactor-dict-pq
8f94aca  Aug 29  Apply suggestion
9cefe1d  Aug 29  Merge branch 'branch-24.10' into fea-refactor-dict-pq
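Two commits above tune the probing configuration: 16fa57e pins cg_size to 1 and 3e6a0b7 switches to double hashing, while the diff shown below (taken from the first two commits) still uses linear probing. A minimal sketch of what those knobs look like in cuco follows; the hasher and the alias names here are illustrative assumptions, not code from this PR.

#include <cuco/static_map.cuh>  // pulls in cuco's probing schemes and hashers

#include <cstdint>

// Stand-in hasher for illustration only; the PR uses its own hash_functor<T>,
// which hashes a column element selected by row index.
using sketch_hasher = cuco::murmurhash3_32<std::int32_t>;

// cg_size = 1: each probe is performed by a single thread rather than a
// cooperative-group tile (commit 16fa57e found this fastest here).
using linear_scheme = cuco::linear_probing<1 /* cg_size */, sketch_hasher>;

// Double hashing (commit 3e6a0b7): a second hash picks the probe stride,
// which reduces clustering as the map fills up.
using double_scheme = cuco::double_hashing<1 /* cg_size */, sketch_hasher, sketch_hasher>;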
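The diff below leans on aliases defined elsewhere in the PR (in cpp/src/io/parquet/parquet_gpu.cuh, which is not part of this excerpt): key_type, mapped_type, slot_type, cg_size, SCOPE, the two sentinels, and the bulk slot storage that every chunk's map is carved out of via dict_map_offset. A plausible reconstruction is sketched here; treat the exact values, the scope, and the allocator signature as assumptions.

// Assumed shape of the aliases used in the diff; the authoritative
// definitions live in parquet_gpu.cuh.
using key_type    = cudf::size_type;
using mapped_type = cudf::size_type;
using slot_type   = cuco::pair<key_type, mapped_type>;

auto constexpr cg_size     = 1;  // threads cooperating per probe
auto constexpr window_size = 1;  // slots per storage window

auto constexpr KEY_SENTINEL   = key_type{-1};
auto constexpr VALUE_SENTINEL = mapped_type{-1};
// Assumption: device scope, since fragments of one chunk are processed by
// several thread blocks that all insert into the same chunk-level map.
auto constexpr SCOPE = cuda::thread_scope_device;

// One array-of-windows allocation backs the maps of all chunks; each chunk
// addresses its sub-map at map_storage + chunk.dict_map_offset.
using storage_type     = cuco::aow_storage<slot_type,
                                           window_size,
                                           cuco::extent<std::size_t>,
                                           cudf::detail::cuco_allocator>;
using storage_ref_type = storage_type::ref_type;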
222 changes: 134 additions & 88 deletions cpp/src/io/parquet/chunk_dict.cu
@@ -22,75 +22,128 @@

#include <rmm/exec_policy.hpp>

#include <cooperative_groups.h>
#include <cuda/atomic>

namespace cudf::io::parquet::detail {

namespace cg = cooperative_groups;

namespace {
constexpr int DEFAULT_BLOCK_SIZE = 256;
}

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
initialize_chunk_hash_maps_kernel(device_span<EncColumnChunk> chunks)
{
auto const chunk = chunks[blockIdx.x];
auto const t = threadIdx.x;
// fut: Now that per-chunk dict is same size as ck.num_values, try to not use one block per chunk
for (thread_index_type i = 0; i < chunk.dict_map_size; i += block_size) {
if (t + i < chunk.dict_map_size) {
new (&chunk.dict_map_slots[t + i].first) map_type::atomic_key_type{KEY_SENTINEL};
new (&chunk.dict_map_slots[t + i].second) map_type::atomic_mapped_type{VALUE_SENTINEL};
}
}
}

template <typename T>
struct equality_functor {
column_device_view const& col;
__device__ bool operator()(size_type lhs_idx, size_type rhs_idx)
__device__ bool operator()(key_type const lhs_idx, key_type const rhs_idx) const
{
// We don't call this for nulls so this is fine
auto const equal = cudf::experimental::row::equality::nan_equal_physical_equality_comparator{};
return equal(col.element<T>(lhs_idx), col.element<T>(rhs_idx));
// We don't call this for nulls so this is fine
auto const equal = cudf::experimental::row::equality::nan_equal_physical_equality_comparator{};
auto const result = equal(col.element<T>(lhs_idx), col.element<T>(rhs_idx));
printf("col_type_id:%d, equality idx1:%d, idx2:%d, eq:%d\n",
col.type().id(),
lhs_idx,
rhs_idx,
result);
return result;
}
};

template <typename T>
struct hash_functor {
column_device_view const& col;
__device__ auto operator()(size_type idx) const
__device__ auto operator()(key_type idx) const
{
return cudf::hashing::detail::MurmurHash3_x86_32<T>{}(col.element<T>(idx));
auto const hashed = cudf::hashing::detail::MurmurHash3_x86_32<T>{}(col.element<T>(idx));
printf("hashing idx: %d = %d\n", idx, hashed);
return hashed; // cudf::hashing::detail::MurmurHash3_x86_32<T>{}(col.element<T>(idx));
}
};

struct map_insert_fn {
map_type::device_mutable_view& map;
storage_ref_type const& storage_ref;

template <typename T>
__device__ bool operator()(column_device_view const& col, size_type i)
__device__ bool operator()(column_device_view const& col, key_type i)
{
if constexpr (column_device_view::has_element_accessor<T>()) {
auto hash_fn = hash_functor<T>{col};
auto equality_fn = equality_functor<T>{col};
return map.insert(std::pair(i, i), hash_fn, equality_fn);
using equality_fn_type = equality_functor<T>;
using hash_fn_type = hash_functor<T>;
using probing_scheme_type = cuco::linear_probing<cg_size, hash_fn_type>;

// Instantiate hash and equality functors.
auto hash_fn = hash_fn_type{col};
auto equal_fn = equality_fn_type{col};

// Make a view of the hash map
cuco::static_map_ref<key_type,
mapped_type,
SCOPE,
equality_fn_type,
probing_scheme_type,
storage_ref_type>
hash_map_ref{cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL},
{equal_fn},
{hash_fn},
{},
storage_ref};

// Create another map with insert operator
auto map_insert_ref = hash_map_ref.with_operators(cuco::insert_and_find);
// Insert
auto [iter, found] = map_insert_ref.insert_and_find(cuco::pair{i, i});
printf("Inserted k=%d, v=%d, unique=%d\n", iter->first, iter->second, found);
return found;
} else {
CUDF_UNREACHABLE("Unsupported type to insert in map");
}
}
};
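// Note on the insert path above: with cuco's insert_and_find, the bool in
// the returned pair is true only when the key was newly inserted. The
// populate kernel below uses exactly that bool as its per-value "is_unique"
// flag, which then feeds the block reduction that sizes the dictionary.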

struct map_find_fn {
map_type::device_view& map;
storage_ref_type const& storage_ref;

template <typename T>
__device__ map_type::device_view::iterator operator()(column_device_view const& col, size_type i)
__device__ cuco::pair<key_type, mapped_type> operator()(column_device_view const& col, key_type i)
{
if constexpr (column_device_view::has_element_accessor<T>()) {
auto hash_fn = hash_functor<T>{col};
auto equality_fn = equality_functor<T>{col};
return map.find(i, hash_fn, equality_fn);
using equality_fn_type = equality_functor<T>;
using hash_fn_type = hash_functor<T>;
using probing_scheme_type = cuco::linear_probing<cg_size, hash_fn_type>;

// Instantiate hash and equality functors.
auto hash_fn = hash_fn_type{col};
auto equal_fn = equality_fn_type{col};

// Make a view of the hash map
cuco::static_map_ref<key_type,
mapped_type,
SCOPE,
equality_fn_type,
probing_scheme_type,
storage_ref_type>
hash_map_ref{cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL},
{equal_fn},
{hash_fn},
{},
storage_ref};

// Create another map with find operator
auto map_find_ref = hash_map_ref.with_operators(cuco::find);

// Find the key = i
auto found_slot = map_find_ref.find(i);

// Check if we found the previously inserted key.
cudf_assert(found_slot != map_find_ref.end() &&
"Unable to find value in map in dictionary index construction");

// Return a pair of the found key and value.
printf("Find=%d, Found slot: k=%d, v=%d\n", i, found_slot->first, found_slot->second);
return {found_slot->first, found_slot->second};
} else {
CUDF_UNREACHABLE("Unsupported type to find in map");
}
@@ -99,20 +152,23 @@ struct map_find_fn {

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan<PageFragment const> frags)
populate_chunk_hash_maps_kernel(storage_type::window_type* map_storage,
cudf::detail::device_2dspan<PageFragment const> frags)
{
auto col_idx = blockIdx.y;
auto block_x = blockIdx.x;
auto t = threadIdx.x;
auto frag = frags[col_idx][block_x];
auto chunk = frag.chunk;
auto col = chunk->col_desc;
auto const col_idx = blockIdx.y;
auto const block_x = blockIdx.x;
auto const frag = frags[col_idx][block_x];
auto chunk = frag.chunk;
auto col = chunk->col_desc;

if (not chunk->use_dictionary) { return; }

using block_reduce = cub::BlockReduce<size_type, block_size>;
__shared__ typename block_reduce::TempStorage reduce_storage;

[[maybe_unused]] auto const tile = cg::tiled_partition<cg_size>(cg::this_thread_block());
auto const t = cg::this_thread_block().thread_rank();

size_type start_row = frag.start_row;
size_type end_row = frag.start_row + frag.num_rows;

@@ -121,15 +177,11 @@
size_type const end_value_idx = row_to_value_idx(end_row, *col);

column_device_view const& data_col = *col->leaf_column;

// Make a view of the hash map
auto hash_map_mutable = map_type::device_mutable_view(chunk->dict_map_slots,
chunk->dict_map_size,
cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL});
storage_ref_type const storage_ref{chunk->dict_map_size, map_storage + chunk->dict_map_offset};

__shared__ size_type total_num_dict_entries;
thread_index_type val_idx = s_start_value_idx + t;

while (val_idx - block_size < end_value_idx) {
auto const is_valid =
val_idx < end_value_idx and val_idx < data_col.size() and data_col.is_valid(val_idx);
@@ -138,8 +190,8 @@ CUDF_KERNEL void __launch_bounds__(block_size)
size_type is_unique = 0;
size_type uniq_elem_size = 0;
if (is_valid) {
is_unique =
type_dispatcher(data_col.type(), map_insert_fn{hash_map_mutable}, data_col, val_idx);
is_unique =
type_dispatcher(data_col.type(), map_insert_fn{storage_ref}, data_col, val_idx);
uniq_elem_size = [&]() -> size_type {
if (not is_unique) { return 0; }
switch (col->physical_type) {
@@ -170,7 +222,6 @@
}
}();
}

auto num_unique = block_reduce(reduce_storage).Sum(is_unique);
__syncthreads();
auto uniq_data_size = block_reduce(reduce_storage).Sum(uniq_elem_size);
Expand All @@ -190,28 +241,30 @@ CUDF_KERNEL void __launch_bounds__(block_size)

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
collect_map_entries_kernel(device_span<EncColumnChunk> chunks)
collect_map_entries_kernel(storage_type::window_type* map_storage,
device_span<EncColumnChunk> chunks)
{
auto& chunk = chunks[blockIdx.x];
if (not chunk.use_dictionary) { return; }

auto t = threadIdx.x;
auto map = map_type::device_view(chunk.dict_map_slots,
chunk.dict_map_size,
cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL});
[[maybe_unused]] auto const tile = cg::tiled_partition<cg_size>(cg::this_thread_block());
auto const t = cg::this_thread_block().thread_rank();

storage_ref_type const storage_ref{chunk.dict_map_size, map_storage + chunk.dict_map_offset};

__shared__ cuda::atomic<size_type, cuda::thread_scope_block> counter;
using cuda::std::memory_order_relaxed;
if (t == 0) { new (&counter) cuda::atomic<size_type, cuda::thread_scope_block>{0}; }
__syncthreads();

for (size_type i = 0; i < chunk.dict_map_size; i += block_size) {
if (t + i < chunk.dict_map_size) {
auto* slot = reinterpret_cast<map_type::value_type*>(map.begin_slot() + t + i);
// storage_ref already points at this chunk's sub-map; with window_size == 1
// each window holds exactly one slot, so index the windows directly.
auto* slot = reinterpret_cast<slot_type*>(storage_ref.data() + t + i);
auto key = slot->first;
if (key != KEY_SENTINEL) {
auto loc = counter.fetch_add(1, memory_order_relaxed);
cudf_assert(loc < MAX_DICT_SIZE && "Number of filled slots exceeds max dict size");
printf("Writing %d at loc: %d\n", key, loc);
chunk.dict_data[loc] = key;
// If sorting dict page ever becomes a hard requirement, enable the following statement and
// add a dict sorting step before storing into the slot's second field.
@@ -224,17 +277,20 @@

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
get_dictionary_indices_kernel(cudf::detail::device_2dspan<PageFragment const> frags)
get_dictionary_indices_kernel(storage_type::window_type* map_storage,
cudf::detail::device_2dspan<PageFragment const> frags)
{
auto col_idx = blockIdx.y;
auto block_x = blockIdx.x;
auto t = threadIdx.x;
auto frag = frags[col_idx][block_x];
auto chunk = frag.chunk;
auto col = chunk->col_desc;
auto const col_idx = blockIdx.y;
auto const block_x = blockIdx.x;
auto const frag = frags[col_idx][block_x];
auto chunk = frag.chunk;
auto const col = chunk->col_desc;

if (not chunk->use_dictionary) { return; }

[[maybe_unused]] auto const tile = cg::tiled_partition<cg_size>(cg::this_thread_block());
auto const t = cg::this_thread_block().thread_rank();

size_type start_row = frag.start_row;
size_type end_row = frag.start_row + frag.num_rows;

@@ -244,55 +300,45 @@
auto const end_value_idx = row_to_value_idx(end_row, *col);

column_device_view const& data_col = *col->leaf_column;

auto map = map_type::device_view(chunk->dict_map_slots,
chunk->dict_map_size,
cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL});
storage_ref_type const storage_ref{chunk->dict_map_size, map_storage + chunk->dict_map_offset};

thread_index_type val_idx = s_start_value_idx + t;
while (val_idx < end_value_idx) {
if (data_col.is_valid(val_idx)) {
auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx);
cudf_assert(found_slot != map.end() &&
"Unable to find value in map in dictionary index construction");
if (found_slot != map.end()) {
// No need for atomic as this is not going to be modified by any other thread
auto* val_ptr = reinterpret_cast<map_type::mapped_type*>(&found_slot->second);
chunk->dict_index[val_idx - s_ck_start_val_idx] = *val_ptr;
}
auto [found_key, found_value] =
type_dispatcher(data_col.type(), map_find_fn{storage_ref}, data_col, val_idx);
// No need for atomic as this is not going to be modified by any other thread
chunk->dict_index[val_idx - s_ck_start_val_idx] = found_value;
}

val_idx += block_size;
}
}

void initialize_chunk_hash_maps(device_span<EncColumnChunk> chunks, rmm::cuda_stream_view stream)
{
constexpr int block_size = 1024;
initialize_chunk_hash_maps_kernel<block_size>
<<<chunks.size(), block_size, 0, stream.value()>>>(chunks);
}

void populate_chunk_hash_maps(cudf::detail::device_2dspan<PageFragment const> frags,
void populate_chunk_hash_maps(storage_type::window_type* map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream)
{
dim3 const dim_grid(frags.size().second, frags.size().first);
populate_chunk_hash_maps_kernel<DEFAULT_BLOCK_SIZE>
<<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(frags);
<<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(map_storage, frags);
}

void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_view stream)
void collect_map_entries(storage_type::window_type* map_storage,
device_span<EncColumnChunk> chunks,
rmm::cuda_stream_view stream)
{
constexpr int block_size = 1024;
collect_map_entries_kernel<block_size><<<chunks.size(), block_size, 0, stream.value()>>>(chunks);
collect_map_entries_kernel<block_size>
<<<chunks.size(), block_size, 0, stream.value()>>>(map_storage, chunks);
}

void get_dictionary_indices(cudf::detail::device_2dspan<PageFragment const> frags,
void get_dictionary_indices(storage_type::window_type* map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream)
{
dim3 const dim_grid(frags.size().second, frags.size().first);
get_dictionary_indices_kernel<DEFAULT_BLOCK_SIZE>
<<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(frags);
<<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(map_storage, frags);
}
} // namespace cudf::io::parquet::detail
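With the old initialize_chunk_hash_maps entry point removed, the caller now owns a single bulk storage object, initializes its slots to the sentinel pair once, and threads its window pointer through the three kernels above. The merged PR does this in writer_impl.cu, which is not part of this diff; the sketch below is a hypothetical call site using the storage_type alias assumed earlier, with h_chunks, chunks, frags, and stream taken as given.

#include <numeric>

// Total slot count: the sum of every chunk's dict_map_size.
auto const total_map_storage_size =
  std::accumulate(h_chunks.begin(), h_chunks.end(), std::size_t{0},
                  [](std::size_t sum, EncColumnChunk const& c) { return sum + c.dict_map_size; });

// One allocation backs every chunk's hash map. The allocator signature may
// differ across cudf versions; a stream-bound cudf::detail::cuco_allocator
// is assumed here.
storage_type map_storage{total_map_storage_size, cudf::detail::cuco_allocator{stream}};

// Fill all windows with the empty sentinel pair. This replaces the removed
// initialize_chunk_hash_maps kernel.
map_storage.initialize(cuco::pair{KEY_SENTINEL, VALUE_SENTINEL}, {stream.value()});

// Each kernel then indexes into map_storage.data() at a chunk's offset.
populate_chunk_hash_maps(map_storage.data(), frags, stream);
collect_map_entries(map_storage.data(), chunks, stream);
get_dictionary_indices(map_storage.data(), frags, stream);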