Replace the outdated cuco window concept with buckets #17602

Merged · 2 commits · Dec 19, 2024
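This PR is a mechanical rename: cuCollections retired its "window" vocabulary in favor of "bucket", and cudf's call sites are updated to match. As a quick orientation (a summary of the renames that appear in the diffs below, not a new API surface), the identifiers map as follows:

cuco::window<T, N>                          ->  cuco::bucket<T, N>
cuco::aow_storage<T, N, Extent, Allocator>  ->  cuco::bucket_storage<T, N, Extent, Allocator>
cuco::aow_storage_ref<T, N, Extent>         ->  cuco::bucket_storage_ref<T, N, Extent>
cuco::window_extent<SizeType>               ->  cuco::bucket_extent<SizeType>
cuco::make_window_extent<CG, N>(extent)     ->  cuco::make_bucket_extent<CG, N>(extent)
storage_type::window_type                   ->  storage_type::bucket_type

Local names (GROUPBY_WINDOW_SIZE, window_size, window_extent, window) are renamed in kind.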
2 changes: 1 addition & 1 deletion cpp/src/groupby/hash/compute_groupby.cu
@@ -61,7 +61,7 @@ std::unique_ptr<table> compute_groupby(table_view const& keys,
 d_row_equal,
 probing_scheme_t{d_row_hash},
 cuco::thread_scope_device,
-cuco::storage<GROUPBY_WINDOW_SIZE>{},
+cuco::storage<GROUPBY_BUCKET_SIZE>{},
 cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream},
 stream.value()};

6 changes: 3 additions & 3 deletions cpp/src/groupby/hash/compute_mapping_indices.cuh
@@ -106,15 +106,15 @@ CUDF_KERNEL void mapping_indices_kernel(cudf::size_type num_input_rows,
 __shared__ cudf::size_type shared_set_indices[GROUPBY_SHM_MAX_ELEMENTS];
 
 // Shared set initialization
-__shared__ cuco::window<cudf::size_type, GROUPBY_WINDOW_SIZE> windows[window_extent.value()];
+__shared__ cuco::bucket<cudf::size_type, GROUPBY_BUCKET_SIZE> buckets[bucket_extent.value()];
 
 auto raw_set = cuco::static_set_ref{
 cuco::empty_key<cudf::size_type>{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
 global_set.key_eq(),
 probing_scheme_t{global_set.hash_function()},
 cuco::thread_scope_block,
-cuco::aow_storage_ref<cudf::size_type, GROUPBY_WINDOW_SIZE, decltype(window_extent)>{
-window_extent, windows}};
+cuco::bucket_storage_ref<cudf::size_type, GROUPBY_BUCKET_SIZE, decltype(bucket_extent)>{
+bucket_extent, buckets}};
 auto shared_set = raw_set.rebind_operators(cuco::insert_and_find);
 
 auto const block = cooperative_groups::this_thread_block();
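The hunk above sizes the __shared__ bucket array from the compile-time bucket_extent and wraps it in a block-scoped cuco::static_set_ref; rebind_operators then equips the ref with insert_and_find, which in cuco's static_set API returns a (slot iterator, inserted) pair whose bool is true only for the thread that first inserts a key. A minimal usage sketch under that assumption, with row_idx as a hypothetical input:

// Sketch only: each thread offers one candidate key to the shared set.
auto const [slot, inserted] = shared_set.insert_and_find(row_idx);
if (inserted) {
  // Exactly one thread per distinct key reaches this branch, so per-block
  // bookkeeping (e.g. appending to shared_set_indices) happens once per key.
}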
16 changes: 8 additions & 8 deletions cpp/src/groupby/hash/helpers.cuh
@@ -27,7 +27,7 @@ namespace cudf::groupby::detail::hash {
 CUDF_HOST_DEVICE auto constexpr GROUPBY_CG_SIZE = 1;
 
 /// Number of slots per thread
-CUDF_HOST_DEVICE auto constexpr GROUPBY_WINDOW_SIZE = 1;
+CUDF_HOST_DEVICE auto constexpr GROUPBY_BUCKET_SIZE = 1;
 
 /// Thread block size
 CUDF_HOST_DEVICE auto constexpr GROUPBY_BLOCK_SIZE = 128;
@@ -48,9 +48,9 @@ using shmem_extent_t =
 cuco::extent<cudf::size_type,
 static_cast<cudf::size_type>(static_cast<double>(GROUPBY_SHM_MAX_ELEMENTS) * 1.43)>;
 
-/// Number of windows needed by each shared memory hash set
-CUDF_HOST_DEVICE auto constexpr window_extent =
-cuco::make_window_extent<GROUPBY_CG_SIZE, GROUPBY_WINDOW_SIZE>(shmem_extent_t{});
+/// Number of buckets needed by each shared memory hash set
+CUDF_HOST_DEVICE auto constexpr bucket_extent =
+cuco::make_bucket_extent<GROUPBY_CG_SIZE, GROUPBY_BUCKET_SIZE>(shmem_extent_t{});
 
 using row_hash_t =
 cudf::experimental::row::hash::device_row_hasher<cudf::hashing::detail::default_hash,
@@ -75,23 +75,23 @@ using global_set_t = cuco::static_set<cudf::size_type,
 row_comparator_t,
 probing_scheme_t,
 cudf::detail::cuco_allocator<char>,
-cuco::storage<GROUPBY_WINDOW_SIZE>>;
+cuco::storage<GROUPBY_BUCKET_SIZE>>;
 
 using nullable_global_set_t = cuco::static_set<cudf::size_type,
 cuco::extent<int64_t>,
 cuda::thread_scope_device,
 nullable_row_comparator_t,
 probing_scheme_t,
 cudf::detail::cuco_allocator<char>,
-cuco::storage<GROUPBY_WINDOW_SIZE>>;
+cuco::storage<GROUPBY_BUCKET_SIZE>>;
 
 template <typename Op>
 using hash_set_ref_t = cuco::static_set_ref<
 cudf::size_type,
 cuda::thread_scope_device,
 row_comparator_t,
 probing_scheme_t,
-cuco::aow_storage_ref<cudf::size_type, GROUPBY_WINDOW_SIZE, cuco::window_extent<int64_t>>,
+cuco::bucket_storage_ref<cudf::size_type, GROUPBY_BUCKET_SIZE, cuco::bucket_extent<int64_t>>,
 Op>;
 
 template <typename Op>
@@ -100,6 +100,6 @@ using nullable_hash_set_ref_t = cuco::static_set_ref<
 cuda::thread_scope_device,
 nullable_row_comparator_t,
 probing_scheme_t,
-cuco::aow_storage_ref<cudf::size_type, GROUPBY_WINDOW_SIZE, cuco::window_extent<int64_t>>,
+cuco::bucket_storage_ref<cudf::size_type, GROUPBY_BUCKET_SIZE, cuco::bucket_extent<int64_t>>,
 Op>;
 }  // namespace cudf::groupby::detail::hash
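shmem_extent_t above encodes the same sizing rule the ORC and Parquet headers below spell out: a set with capacity N / 0.7 ≈ 1.43 · N targets a 70% load factor. Since GROUPBY_CG_SIZE and GROUPBY_BUCKET_SIZE are both 1, each bucket holds a single slot, and bucket_extent.value() is the scaled size rounded up by make_bucket_extent to an extent valid for the probing scheme. A back-of-the-envelope check with an illustrative (hypothetical) element count:

// If GROUPBY_SHM_MAX_ELEMENTS were 1024:
//   scaled capacity       = 1024 * 1.43 = 1464 slots (before rounding)
//   load factor when full = 1024 / 1464 ≈ 0.70
// With one slot per bucket, mapping_indices_kernel would then place about
// 1464 cuco::bucket<cudf::size_type, 1> objects in shared memory.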
6 changes: 3 additions & 3 deletions cpp/src/io/orc/dict_enc.cu
@@ -180,9 +180,9 @@ CUDF_KERNEL void __launch_bounds__(block_size)

 for (size_type i = 0; i < dict.map_slots.size(); i += block_size) {
 if (t + i < dict.map_slots.size()) {
-auto window = dict.map_slots.begin() + t + i;
-// Collect all slots from each window.
-for (auto& slot : *window) {
+auto bucket = dict.map_slots.begin() + t + i;
+// Collect all slots from each bucket.
+for (auto& slot : *bucket) {
 auto const key = slot.first;
 if (key != KEY_SENTINEL) {
 auto loc = counter.fetch_add(1, memory_order_relaxed);
14 changes: 7 additions & 7 deletions cpp/src/io/orc/orc_gpu.hpp
@@ -47,16 +47,16 @@ using slot_type = cuco::pair<key_type, mapped_type>;
 auto constexpr map_cg_size =
 1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset.
 ///< Note: Adjust insert and find loops to use `cg::tile<map_cg_size>` if increasing this.
-auto constexpr window_size =
+auto constexpr bucket_size =
 1; ///< Number of concurrent slots (set for best performance) handled by each thread.
 auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size
 ///< N * (1/0.7) = 1.43 to target a 70% occupancy factor.
-using storage_type = cuco::aow_storage<slot_type,
-window_size,
-cuco::extent<std::size_t>,
-cudf::detail::cuco_allocator<char>>;
+using storage_type = cuco::bucket_storage<slot_type,
+bucket_size,
+cuco::extent<std::size_t>,
+cudf::detail::cuco_allocator<char>>;
 using storage_ref_type = typename storage_type::ref_type;
-using window_type = typename storage_type::window_type;
+using bucket_type = typename storage_type::bucket_type;
 using slot_type = cuco::pair<key_type, mapped_type>;
 
 auto constexpr KEY_SENTINEL = size_type{-1};
@@ -193,7 +193,7 @@ struct StripeStream {
 */
 struct stripe_dictionary {
 // input
-device_span<window_type> map_slots;  // hash map (windows) storage
+device_span<bucket_type> map_slots;  // hash map (buckets) storage
 uint32_t column_idx = 0;             // column index
 size_type start_row = 0;             // first row in the stripe
 size_type start_rowgroup = 0;        // first rowgroup in the stripe
20 changes: 10 additions & 10 deletions cpp/src/io/parquet/chunk_dict.cu
@@ -210,7 +210,7 @@ struct map_find_fn {

 template <int block_size>
 CUDF_KERNEL void __launch_bounds__(block_size)
-populate_chunk_hash_maps_kernel(device_span<window_type> const map_storage,
+populate_chunk_hash_maps_kernel(device_span<bucket_type> const map_storage,
 cudf::detail::device_2dspan<PageFragment const> frags)
 {
 auto const col_idx = blockIdx.y;
@@ -239,7 +239,7 @@

 template <int block_size>
 CUDF_KERNEL void __launch_bounds__(block_size)
-collect_map_entries_kernel(device_span<window_type> const map_storage,
+collect_map_entries_kernel(device_span<bucket_type> const map_storage,
 device_span<EncColumnChunk> chunks)
 {
 auto& chunk = chunks[blockIdx.x];
@@ -251,11 +251,11 @@ CUDF_KERNEL void __launch_bounds__(block_size)
 if (t == 0) { new (&counter) cuda::atomic<size_type, SCOPE>{0}; }
 __syncthreads();
 
-// Iterate over all windows in the map.
+// Iterate over all buckets in the map.
 for (; t < chunk.dict_map_size; t += block_size) {
-auto window = map_storage.data() + chunk.dict_map_offset + t;
-// Collect all slots from each window.
-for (auto& slot : *window) {
+auto bucket = map_storage.data() + chunk.dict_map_offset + t;
+// Collect all slots from each bucket.
+for (auto& slot : *bucket) {
 auto const key = slot.first;
 if (key != KEY_SENTINEL) {
 auto const loc = counter.fetch_add(1, memory_order_relaxed);
@@ -272,7 +272,7 @@ CUDF_KERNEL void __launch_bounds__(block_size)

 template <int block_size>
 CUDF_KERNEL void __launch_bounds__(block_size)
-get_dictionary_indices_kernel(device_span<window_type> const map_storage,
+get_dictionary_indices_kernel(device_span<bucket_type> const map_storage,
 cudf::detail::device_2dspan<PageFragment const> frags)
 {
 auto const col_idx = blockIdx.y;
@@ -302,7 +302,7 @@ CUDF_KERNEL void __launch_bounds__(block_size)
 s_ck_start_val_idx);
 }
 
-void populate_chunk_hash_maps(device_span<window_type> const map_storage,
+void populate_chunk_hash_maps(device_span<bucket_type> const map_storage,
 cudf::detail::device_2dspan<PageFragment const> frags,
 rmm::cuda_stream_view stream)
 {
@@ -311,7 +311,7 @@ void populate_chunk_hash_maps(device_span<window_type> const map_storage,
 <<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(map_storage, frags);
 }
 
-void collect_map_entries(device_span<window_type> const map_storage,
+void collect_map_entries(device_span<bucket_type> const map_storage,
 device_span<EncColumnChunk> chunks,
 rmm::cuda_stream_view stream)
 {
@@ -320,7 +320,7 @@ void collect_map_entries(device_span<window_type> const map_storage,
 <<<chunks.size(), block_size, 0, stream.value()>>>(map_storage, chunks);
 }
 
-void get_dictionary_indices(device_span<window_type> const map_storage,
+void get_dictionary_indices(device_span<bucket_type> const map_storage,
 cudf::detail::device_2dspan<PageFragment const> frags,
 rmm::cuda_stream_view stream)
 {
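The collect_map_entries_kernel hunks above leave the surrounding compaction pattern unchanged: a block-scoped atomic counter hands out dense output positions while each thread strides over its share of buckets and appends every non-sentinel key. A stripped-down, self-contained sketch of that pattern (the kernel name, Slot/Key template parameters, and out buffer are hypothetical; the real kernel writes into the chunk's dictionary state):

#include <cuda/atomic>
#include <cstddef>
#include <new>

// One thread block compacts live keys from sparse slot storage into a
// dense array; output order is arbitrary but gap-free.
template <typename Slot, typename Key>
__global__ void compact_keys(Slot const* slots, std::size_t num_slots,
                             Key sentinel, Key* out)
{
  __shared__ cuda::atomic<unsigned int, cuda::thread_scope_block> counter;
  if (threadIdx.x == 0) { new (&counter) cuda::atomic<unsigned int, cuda::thread_scope_block>{0}; }
  __syncthreads();
  for (std::size_t i = threadIdx.x; i < num_slots; i += blockDim.x) {
    auto const key = slots[i].first;  // one slot per bucket when bucket_size == 1
    if (key != sentinel) {
      auto const loc = counter.fetch_add(1, cuda::memory_order_relaxed);
      out[loc] = key;
    }
  }
}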
18 changes: 9 additions & 9 deletions cpp/src/io/parquet/parquet_gpu.cuh
@@ -34,7 +34,7 @@ using slot_type = cuco::pair<key_type, mapped_type>;
 auto constexpr map_cg_size =
 1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset.
 ///< Note: Adjust insert and find loops to use `cg::tile<map_cg_size>` if increasing this.
-auto constexpr window_size =
+auto constexpr bucket_size =
 1; ///< Number of concurrent slots (set for best performance) handled by each thread.
 auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size
 ///< N * (1/0.7) = 1.43 to target a 70% occupancy factor.
@@ -43,12 +43,12 @@ auto constexpr KEY_SENTINEL = key_type{-1};
 auto constexpr VALUE_SENTINEL = mapped_type{-1};
 auto constexpr SCOPE = cuda::thread_scope_block;
 
-using storage_type = cuco::aow_storage<slot_type,
-window_size,
-cuco::extent<std::size_t>,
-cudf::detail::cuco_allocator<char>>;
+using storage_type = cuco::bucket_storage<slot_type,
+bucket_size,
+cuco::extent<std::size_t>,
+cudf::detail::cuco_allocator<char>>;
 using storage_ref_type = typename storage_type::ref_type;
-using window_type = typename storage_type::window_type;
+using bucket_type = typename storage_type::bucket_type;
 
 /**
 * @brief Return the byte length of parquet dtypes that are physically represented by INT32
@@ -100,7 +100,7 @@ inline size_type __device__ row_to_value_idx(size_type idx,
 * @param frags Column fragments
 * @param stream CUDA stream to use
 */
-void populate_chunk_hash_maps(device_span<window_type> const map_storage,
+void populate_chunk_hash_maps(device_span<bucket_type> const map_storage,
 cudf::detail::device_2dspan<PageFragment const> frags,
 rmm::cuda_stream_view stream);

@@ -111,7 +111,7 @@ void populate_chunk_hash_maps(device_span<window_type> const map_storage,
 * @param chunks Flat span of chunks to compact hash maps for
 * @param stream CUDA stream to use
 */
-void collect_map_entries(device_span<window_type> const map_storage,
+void collect_map_entries(device_span<bucket_type> const map_storage,
 device_span<EncColumnChunk> chunks,
 rmm::cuda_stream_view stream);

@@ -128,7 +128,7 @@ void collect_map_entries(device_span<window_type> const map_storage,
 * @param frags Column fragments
 * @param stream CUDA stream to use
 */
-void get_dictionary_indices(device_span<window_type> const map_storage,
+void get_dictionary_indices(device_span<bucket_type> const map_storage,
 cudf::detail::device_2dspan<PageFragment const> frags,
 rmm::cuda_stream_view stream);

4 changes: 2 additions & 2 deletions cpp/src/io/parquet/writer_impl.cu
@@ -1302,7 +1302,7 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
 } else {
 chunk.use_dictionary = true;
 chunk.dict_map_size =
-static_cast<cudf::size_type>(cuco::make_window_extent<map_cg_size, window_size>(
+static_cast<cudf::size_type>(cuco::make_bucket_extent<map_cg_size, bucket_size>(
 static_cast<cudf::size_type>(occupancy_factor * chunk.num_values)));
 chunk.dict_map_offset = total_map_storage_size;
 total_map_storage_size += chunk.dict_map_size;
@@ -1317,7 +1317,7 @@
 total_map_storage_size,
 cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream}};
 // Create a span of non-const map_storage as map_storage_ref takes in a non-const pointer.
-device_span<window_type> const map_storage_data{map_storage.data(), total_map_storage_size};
+device_span<bucket_type> const map_storage_data{map_storage.data(), total_map_storage_size};
 
 // Synchronize
 chunks.host_to_device_async(stream);
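One detail worth keeping in mind when reading the hunk above: all per-chunk dictionary maps share a single pooled storage_type allocation. Each chunk's capacity comes from make_bucket_extent (so each sub-range is a valid extent on its own), and dict_map_offset records where the chunk's buckets start inside the pool; the kernels in chunk_dict.cu then address a bucket as map_storage.data() + chunk.dict_map_offset + t. An illustrative layout, with hypothetical sizes:

// total_map_storage_size buckets, laid out back to back:
//
//   [ chunk 0: dict_map_size = 733 | chunk 1: dict_map_size = 1464 | ... ]
//     dict_map_offset = 0            dict_map_offset = 733
//
// Thread t of the block handling chunk c touches
//   map_storage.data() + c.dict_map_offset + t   (strided by block_size)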