Replace the outdated cuco window concept with buckets (#17602)
cuco recently renamed the "window" concept to "bucket," since the latter more accurately describes a contiguous memory region holding one or more hash table slots. This PR makes the corresponding changes, replacing "window" with "bucket" in all relevant uses.
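
For illustration, here is a minimal, hedged sketch (not part of this change) of how the old and new spellings correspond; apart from the cuco identifiers that appear in the diff below, every name here is a hypothetical placeholder.

// Hedged sketch of the rename; header paths and the defaulted allocator parameter are
// assumptions, while the cuco identifiers themselves appear in the diff below.
//   cuco::window<T, N>             ->  cuco::bucket<T, N>
//   cuco::aow_storage(_ref)        ->  cuco::bucket_storage(_ref)
//   cuco::(make_)window_extent     ->  cuco::(make_)bucket_extent
//   storage_type::window_type      ->  storage_type::bucket_type
#include <cuco/static_set.cuh>  // assumed to pull in the bucket-based storage types
#include <cstddef>

auto constexpr SLOTS_PER_BUCKET = 1;  // hypothetical; cudf spells these constants *_BUCKET_SIZE

using storage_t = cuco::bucket_storage<cuco::pair<int, int>,
                                       SLOTS_PER_BUCKET,
                                       cuco::extent<std::size_t>>;  // allocator left defaulted
using bucket_t  = typename storage_t::bucket_type;                  // was storage_t::window_type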

Authors:
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Nghia Truong (https://github.com/ttnghia)

URL: #17602
PointKernel authored Dec 19, 2024
1 parent a95fbc8 commit 88df0ad
Showing 8 changed files with 43 additions and 43 deletions.
2 changes: 1 addition & 1 deletion cpp/src/groupby/hash/compute_groupby.cu
@@ -61,7 +61,7 @@ std::unique_ptr<table> compute_groupby(table_view const& keys,
d_row_equal,
probing_scheme_t{d_row_hash},
cuco::thread_scope_device,
cuco::storage<GROUPBY_WINDOW_SIZE>{},
cuco::storage<GROUPBY_BUCKET_SIZE>{},
cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream},
stream.value()};

6 changes: 3 additions & 3 deletions cpp/src/groupby/hash/compute_mapping_indices.cuh
@@ -106,15 +106,15 @@ CUDF_KERNEL void mapping_indices_kernel(cudf::size_type num_input_rows,
__shared__ cudf::size_type shared_set_indices[GROUPBY_SHM_MAX_ELEMENTS];

// Shared set initialization
__shared__ cuco::window<cudf::size_type, GROUPBY_WINDOW_SIZE> windows[window_extent.value()];
__shared__ cuco::bucket<cudf::size_type, GROUPBY_BUCKET_SIZE> buckets[bucket_extent.value()];

auto raw_set = cuco::static_set_ref{
cuco::empty_key<cudf::size_type>{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
global_set.key_eq(),
probing_scheme_t{global_set.hash_function()},
cuco::thread_scope_block,
cuco::aow_storage_ref<cudf::size_type, GROUPBY_WINDOW_SIZE, decltype(window_extent)>{
window_extent, windows}};
cuco::bucket_storage_ref<cudf::size_type, GROUPBY_BUCKET_SIZE, decltype(bucket_extent)>{
bucket_extent, buckets}};
auto shared_set = raw_set.rebind_operators(cuco::insert_and_find);

auto const block = cooperative_groups::this_thread_block();
16 changes: 8 additions & 8 deletions cpp/src/groupby/hash/helpers.cuh
@@ -27,7 +27,7 @@ namespace cudf::groupby::detail::hash {
CUDF_HOST_DEVICE auto constexpr GROUPBY_CG_SIZE = 1;

/// Number of slots per thread
CUDF_HOST_DEVICE auto constexpr GROUPBY_WINDOW_SIZE = 1;
CUDF_HOST_DEVICE auto constexpr GROUPBY_BUCKET_SIZE = 1;

/// Thread block size
CUDF_HOST_DEVICE auto constexpr GROUPBY_BLOCK_SIZE = 128;
@@ -48,9 +48,9 @@ using shmem_extent_t =
cuco::extent<cudf::size_type,
static_cast<cudf::size_type>(static_cast<double>(GROUPBY_SHM_MAX_ELEMENTS) * 1.43)>;

/// Number of windows needed by each shared memory hash set
CUDF_HOST_DEVICE auto constexpr window_extent =
cuco::make_window_extent<GROUPBY_CG_SIZE, GROUPBY_WINDOW_SIZE>(shmem_extent_t{});
/// Number of buckets needed by each shared memory hash set
CUDF_HOST_DEVICE auto constexpr bucket_extent =
cuco::make_bucket_extent<GROUPBY_CG_SIZE, GROUPBY_BUCKET_SIZE>(shmem_extent_t{});

using row_hash_t =
cudf::experimental::row::hash::device_row_hasher<cudf::hashing::detail::default_hash,
@@ -75,23 +75,23 @@ using global_set_t = cuco::static_set<cudf::size_type,
row_comparator_t,
probing_scheme_t,
cudf::detail::cuco_allocator<char>,
cuco::storage<GROUPBY_WINDOW_SIZE>>;
cuco::storage<GROUPBY_BUCKET_SIZE>>;

using nullable_global_set_t = cuco::static_set<cudf::size_type,
cuco::extent<int64_t>,
cuda::thread_scope_device,
nullable_row_comparator_t,
probing_scheme_t,
cudf::detail::cuco_allocator<char>,
cuco::storage<GROUPBY_WINDOW_SIZE>>;
cuco::storage<GROUPBY_BUCKET_SIZE>>;

template <typename Op>
using hash_set_ref_t = cuco::static_set_ref<
cudf::size_type,
cuda::thread_scope_device,
row_comparator_t,
probing_scheme_t,
cuco::aow_storage_ref<cudf::size_type, GROUPBY_WINDOW_SIZE, cuco::window_extent<int64_t>>,
cuco::bucket_storage_ref<cudf::size_type, GROUPBY_BUCKET_SIZE, cuco::bucket_extent<int64_t>>,
Op>;

template <typename Op>
@@ -100,6 +100,6 @@ using nullable_hash_set_ref_t = cuco::static_set_ref<
cuda::thread_scope_device,
nullable_row_comparator_t,
probing_scheme_t,
cuco::aow_storage_ref<cudf::size_type, GROUPBY_WINDOW_SIZE, cuco::window_extent<int64_t>>,
cuco::bucket_storage_ref<cudf::size_type, GROUPBY_BUCKET_SIZE, cuco::bucket_extent<int64_t>>,
Op>;
} // namespace cudf::groupby::detail::hash
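
As a hedged aside, the bucket extent above can be pictured with the following host-side sketch; the element count is an illustrative assumption rather than the value cudf actually uses, and the header path is assumed.

// Minimal sketch of the bucket-extent computation; assumes cuco::make_bucket_extent rounds
// the requested capacity up to an extent that is valid for the probing scheme.
#include <cuco/extent.cuh>  // header path assumed

auto constexpr CG_SIZE      = 1;     // GROUPBY_CG_SIZE in helpers.cuh
auto constexpr BUCKET_SIZE  = 1;     // GROUPBY_BUCKET_SIZE in helpers.cuh
auto constexpr SHM_ELEMENTS = 1024;  // hypothetical GROUPBY_SHM_MAX_ELEMENTS

// Request ~1.43x the element count (targets ~70% occupancy), then let cuco pick a valid size.
using shmem_extent_t =
  cuco::extent<int, static_cast<int>(static_cast<double>(SHM_ELEMENTS) * 1.43)>;
auto constexpr bucket_extent = cuco::make_bucket_extent<CG_SIZE, BUCKET_SIZE>(shmem_extent_t{});

// In compute_mapping_indices.cuh this extent sizes the block-local set's backing array:
//   __shared__ cuco::bucket<cudf::size_type, BUCKET_SIZE> buckets[bucket_extent.value()];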
6 changes: 3 additions & 3 deletions cpp/src/io/orc/dict_enc.cu
@@ -180,9 +180,9 @@ CUDF_KERNEL void __launch_bounds__(block_size)

for (size_type i = 0; i < dict.map_slots.size(); i += block_size) {
if (t + i < dict.map_slots.size()) {
auto window = dict.map_slots.begin() + t + i;
// Collect all slots from each window.
for (auto& slot : *window) {
auto bucket = dict.map_slots.begin() + t + i;
// Collect all slots from each bucket.
for (auto& slot : *bucket) {
auto const key = slot.first;
if (key != KEY_SENTINEL) {
auto loc = counter.fetch_add(1, memory_order_relaxed);
14 changes: 7 additions & 7 deletions cpp/src/io/orc/orc_gpu.hpp
@@ -47,16 +47,16 @@ using slot_type = cuco::pair<key_type, mapped_type>;
auto constexpr map_cg_size =
1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset.
///< Note: Adjust insert and find loops to use `cg::tile<map_cg_size>` if increasing this.
auto constexpr window_size =
auto constexpr bucket_size =
1; ///< Number of concurrent slots (set for best performance) handled by each thread.
auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size
///< N * (1/0.7) = 1.43 to target a 70% occupancy factor.
using storage_type = cuco::aow_storage<slot_type,
window_size,
cuco::extent<std::size_t>,
cudf::detail::cuco_allocator<char>>;
using storage_type = cuco::bucket_storage<slot_type,
bucket_size,
cuco::extent<std::size_t>,
cudf::detail::cuco_allocator<char>>;
using storage_ref_type = typename storage_type::ref_type;
using window_type = typename storage_type::window_type;
using bucket_type = typename storage_type::bucket_type;
using slot_type = cuco::pair<key_type, mapped_type>;

auto constexpr KEY_SENTINEL = size_type{-1};
@@ -193,7 +193,7 @@ struct StripeStream {
*/
struct stripe_dictionary {
// input
device_span<window_type> map_slots; // hash map (windows) storage
device_span<bucket_type> map_slots; // hash map (buckets) storage
uint32_t column_idx = 0; // column index
size_type start_row = 0; // first row in the stripe
size_type start_rowgroup = 0; // first rowgroup in the stripe
20 changes: 10 additions & 10 deletions cpp/src/io/parquet/chunk_dict.cu
@@ -210,7 +210,7 @@ struct map_find_fn {

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
populate_chunk_hash_maps_kernel(device_span<window_type> const map_storage,
populate_chunk_hash_maps_kernel(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags)
{
auto const col_idx = blockIdx.y;
@@ -239,7 +239,7 @@ CUDF_KERNEL void __launch_bounds__(block_size)

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
collect_map_entries_kernel(device_span<window_type> const map_storage,
collect_map_entries_kernel(device_span<bucket_type> const map_storage,
device_span<EncColumnChunk> chunks)
{
auto& chunk = chunks[blockIdx.x];
@@ -251,11 +251,11 @@ CUDF_KERNEL void __launch_bounds__(block_size)
if (t == 0) { new (&counter) cuda::atomic<size_type, SCOPE>{0}; }
__syncthreads();

// Iterate over all windows in the map.
// Iterate over all buckets in the map.
for (; t < chunk.dict_map_size; t += block_size) {
auto window = map_storage.data() + chunk.dict_map_offset + t;
// Collect all slots from each window.
for (auto& slot : *window) {
auto bucket = map_storage.data() + chunk.dict_map_offset + t;
// Collect all slots from each bucket.
for (auto& slot : *bucket) {
auto const key = slot.first;
if (key != KEY_SENTINEL) {
auto const loc = counter.fetch_add(1, memory_order_relaxed);
@@ -272,7 +272,7 @@ CUDF_KERNEL void __launch_bounds__(block_size)

template <int block_size>
CUDF_KERNEL void __launch_bounds__(block_size)
get_dictionary_indices_kernel(device_span<window_type> const map_storage,
get_dictionary_indices_kernel(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags)
{
auto const col_idx = blockIdx.y;
@@ -302,7 +302,7 @@ CUDF_KERNEL void __launch_bounds__(block_size)
s_ck_start_val_idx);
}

void populate_chunk_hash_maps(device_span<window_type> const map_storage,
void populate_chunk_hash_maps(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream)
{
@@ -311,7 +311,7 @@ void populate_chunk_hash_maps(device_span<window_type> const map_storage,
<<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(map_storage, frags);
}

void collect_map_entries(device_span<window_type> const map_storage,
void collect_map_entries(device_span<bucket_type> const map_storage,
device_span<EncColumnChunk> chunks,
rmm::cuda_stream_view stream)
{
@@ -320,7 +320,7 @@ void collect_map_entries(device_span<window_type> const map_storage,
<<<chunks.size(), block_size, 0, stream.value()>>>(map_storage, chunks);
}

void get_dictionary_indices(device_span<window_type> const map_storage,
void get_dictionary_indices(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream)
{
18 changes: 9 additions & 9 deletions cpp/src/io/parquet/parquet_gpu.cuh
@@ -34,7 +34,7 @@ using slot_type = cuco::pair<key_type, mapped_type>;
auto constexpr map_cg_size =
1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset.
///< Note: Adjust insert and find loops to use `cg::tile<map_cg_size>` if increasing this.
auto constexpr window_size =
auto constexpr bucket_size =
1; ///< Number of concurrent slots (set for best performance) handled by each thread.
auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size
///< N * (1/0.7) = 1.43 to target a 70% occupancy factor.
@@ -43,12 +43,12 @@ auto constexpr KEY_SENTINEL = key_type{-1};
auto constexpr VALUE_SENTINEL = mapped_type{-1};
auto constexpr SCOPE = cuda::thread_scope_block;

using storage_type = cuco::aow_storage<slot_type,
window_size,
cuco::extent<std::size_t>,
cudf::detail::cuco_allocator<char>>;
using storage_type = cuco::bucket_storage<slot_type,
bucket_size,
cuco::extent<std::size_t>,
cudf::detail::cuco_allocator<char>>;
using storage_ref_type = typename storage_type::ref_type;
using window_type = typename storage_type::window_type;
using bucket_type = typename storage_type::bucket_type;

/**
* @brief Return the byte length of parquet dtypes that are physically represented by INT32
@@ -100,7 +100,7 @@ inline size_type __device__ row_to_value_idx(size_type idx,
* @param frags Column fragments
* @param stream CUDA stream to use
*/
void populate_chunk_hash_maps(device_span<window_type> const map_storage,
void populate_chunk_hash_maps(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream);

@@ -111,7 +111,7 @@ void populate_chunk_hash_maps(device_span<window_type> const map_storage,
* @param chunks Flat span of chunks to compact hash maps for
* @param stream CUDA stream to use
*/
void collect_map_entries(device_span<window_type> const map_storage,
void collect_map_entries(device_span<bucket_type> const map_storage,
device_span<EncColumnChunk> chunks,
rmm::cuda_stream_view stream);

@@ -128,7 +128,7 @@ void collect_map_entries(device_span<window_type> const map_storage,
* @param frags Column fragments
* @param stream CUDA stream to use
*/
void get_dictionary_indices(device_span<window_type> const map_storage,
void get_dictionary_indices(device_span<bucket_type> const map_storage,
cudf::detail::device_2dspan<PageFragment const> frags,
rmm::cuda_stream_view stream);

4 changes: 2 additions & 2 deletions cpp/src/io/parquet/writer_impl.cu
@@ -1302,7 +1302,7 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
} else {
chunk.use_dictionary = true;
chunk.dict_map_size =
static_cast<cudf::size_type>(cuco::make_window_extent<map_cg_size, window_size>(
static_cast<cudf::size_type>(cuco::make_bucket_extent<map_cg_size, bucket_size>(
static_cast<cudf::size_type>(occupancy_factor * chunk.num_values)));
chunk.dict_map_offset = total_map_storage_size;
total_map_storage_size += chunk.dict_map_size;
@@ -1317,7 +1317,7 @@
total_map_storage_size,
cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream}};
// Create a span of non-const map_storage as map_storage_ref takes in a non-const pointer.
device_span<window_type> const map_storage_data{map_storage.data(), total_map_storage_size};
device_span<bucket_type> const map_storage_data{map_storage.data(), total_map_storage_size};

// Synchronize
chunks.host_to_device_async(stream);
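
A rough, hedged sketch of the per-chunk dictionary-map sizing shown above; the chunk value count is an illustrative assumption and the header path is assumed.

// Hedged host-side sketch of the dict_map_size computation; assumes make_bucket_extent
// returns the smallest valid extent not below the requested capacity.
#include <cuco/extent.cuh>  // header path assumed
#include <cstdint>

int main()
{
  constexpr int   map_cg_size      = 1;       // as declared in parquet_gpu.cuh
  constexpr int   bucket_size      = 1;       // slots handled per thread
  constexpr float occupancy_factor = 1.43f;   // targets ~70% occupancy
  std::int32_t const num_values    = 10'000;  // hypothetical chunk.num_values

  auto const requested     = static_cast<std::int32_t>(occupancy_factor * num_values);  // ~14'300
  auto const dict_map_size = static_cast<std::int32_t>(
    cuco::make_bucket_extent<map_cg_size, bucket_size>(requested));

  // Each chunk's buckets are then carved out of one contiguous storage_type allocation,
  // with dict_map_offset recording where the chunk's range begins, as in the code above.
  return dict_map_size > 0 ? 0 : 1;
}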
