Skip to content

Commit

Permalink
Add compute_shared_memory_aggs used by shared memory groupby (#17162)
Browse files Browse the repository at this point in the history
This work is part of splitting the original bulk shared memory groupby PR #16619.

This PR introduces the `compute_shared_memory_aggs` API, which is utilized by the shared memory groupby. The shared memory groupby process consists of two main steps. The first step was introduced in #17147, and this PR implements the second step, where the actual aggregations are performed based on the offsets from the first step. Each thread block is designed to handle up to 128 unique keys. If this limit is exceeded, there won't be enough space to store temporary aggregation results in shared memory, so a flag is set to indicate that follow-up global memory aggregations are needed to complete the remaining aggregation requests.

Authors:
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Nghia Truong (https://github.com/ttnghia)
  - Bradley Dice (https://github.com/bdice)

URL: #17162
  • Loading branch information
PointKernel authored Oct 31, 2024
1 parent 3cf186c commit 0e294b1
Show file tree
Hide file tree
Showing 5 changed files with 446 additions and 9 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ add_library(
src/groupby/hash/compute_groupby.cu
src/groupby/hash/compute_mapping_indices.cu
src/groupby/hash/compute_mapping_indices_null.cu
src/groupby/hash/compute_shared_memory_aggs.cu
src/groupby/hash/compute_single_pass_aggs.cu
src/groupby/hash/create_sparse_results_table.cu
src/groupby/hash/flatten_single_pass_aggs.cpp
Expand Down
323 changes: 323 additions & 0 deletions cpp/src/groupby/hash/compute_shared_memory_aggs.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,323 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "compute_shared_memory_aggs.hpp"
#include "global_memory_aggregator.cuh"
#include "helpers.cuh"
#include "shared_memory_aggregator.cuh"
#include "single_pass_functors.cuh"

#include <cudf/aggregation.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/cuda.hpp>
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/table/table_device_view.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/bit.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <cooperative_groups.h>
#include <cuda/std/cstddef>

namespace cudf::groupby::detail::hash {
namespace {
/// Functor used by type dispatcher returning the size of the underlying C++ type
struct size_of_functor {
template <typename T>
__device__ constexpr cudf::size_type operator()()
{
return sizeof(T);
}
};

/// Shared memory data alignment
CUDF_HOST_DEVICE cudf::size_type constexpr ALIGNMENT = 8;

// Prepares shared memory data required by each output column, exits if
// no enough memory space to perform the shared memory aggregation for the
// current output column
__device__ void calculate_columns_to_aggregate(cudf::size_type& col_start,
cudf::size_type& col_end,
cudf::mutable_table_device_view output_values,
cudf::size_type output_size,
cudf::size_type* shmem_agg_res_offsets,
cudf::size_type* shmem_agg_mask_offsets,
cudf::size_type cardinality,
cudf::size_type total_agg_size)
{
col_start = col_end;
cudf::size_type bytes_allocated = 0;

auto const valid_col_size =
cudf::util::round_up_safe(static_cast<cudf::size_type>(sizeof(bool) * cardinality), ALIGNMENT);

while (bytes_allocated < total_agg_size && col_end < output_size) {
auto const col_idx = col_end;
auto const next_col_size =
cudf::util::round_up_safe(cudf::type_dispatcher<cudf::dispatch_storage_type>(
output_values.column(col_idx).type(), size_of_functor{}) *
cardinality,
ALIGNMENT);
auto const next_col_total_size = next_col_size + valid_col_size;

if (bytes_allocated + next_col_total_size > total_agg_size) {
CUDF_UNREACHABLE("Not enough memory for shared memory aggregations");
}

shmem_agg_res_offsets[col_end] = bytes_allocated;
shmem_agg_mask_offsets[col_end] = bytes_allocated + next_col_size;

bytes_allocated += next_col_total_size;
++col_end;
}
}

// Each block initialize its own shared memory aggregation results
__device__ void initialize_shmem_aggregations(cooperative_groups::thread_block const& block,
cudf::size_type col_start,
cudf::size_type col_end,
cudf::mutable_table_device_view output_values,
cuda::std::byte* shmem_agg_storage,
cudf::size_type* shmem_agg_res_offsets,
cudf::size_type* shmem_agg_mask_offsets,
cudf::size_type cardinality,
cudf::aggregation::Kind const* d_agg_kinds)
{
for (auto col_idx = col_start; col_idx < col_end; col_idx++) {
for (auto idx = block.thread_rank(); idx < cardinality; idx += block.num_threads()) {
auto target =
reinterpret_cast<cuda::std::byte*>(shmem_agg_storage + shmem_agg_res_offsets[col_idx]);
auto target_mask =
reinterpret_cast<bool*>(shmem_agg_storage + shmem_agg_mask_offsets[col_idx]);
cudf::detail::dispatch_type_and_aggregation(output_values.column(col_idx).type(),
d_agg_kinds[col_idx],
initialize_shmem{},
target,
target_mask,
idx);
}
}
block.sync();
}

__device__ void compute_pre_aggregrations(cudf::size_type col_start,
cudf::size_type col_end,
bitmask_type const* row_bitmask,
bool skip_rows_with_nulls,
cudf::table_device_view source,
cudf::size_type num_input_rows,
cudf::size_type* local_mapping_index,
cuda::std::byte* shmem_agg_storage,
cudf::size_type* shmem_agg_res_offsets,
cudf::size_type* shmem_agg_mask_offsets,
cudf::aggregation::Kind const* d_agg_kinds)
{
// Aggregates global memory sources to shared memory targets
for (auto source_idx = cudf::detail::grid_1d::global_thread_id(); source_idx < num_input_rows;
source_idx += cudf::detail::grid_1d::grid_stride()) {
if (not skip_rows_with_nulls or cudf::bit_is_set(row_bitmask, source_idx)) {
auto const target_idx = local_mapping_index[source_idx];
for (auto col_idx = col_start; col_idx < col_end; col_idx++) {
auto const source_col = source.column(col_idx);

cuda::std::byte* target =
reinterpret_cast<cuda::std::byte*>(shmem_agg_storage + shmem_agg_res_offsets[col_idx]);
bool* target_mask =
reinterpret_cast<bool*>(shmem_agg_storage + shmem_agg_mask_offsets[col_idx]);

cudf::detail::dispatch_type_and_aggregation(source_col.type(),
d_agg_kinds[col_idx],
shmem_element_aggregator{},
target,
target_mask,
target_idx,
source_col,
source_idx);
}
}
}
}

__device__ void compute_final_aggregations(cooperative_groups::thread_block const& block,
cudf::size_type col_start,
cudf::size_type col_end,
cudf::table_device_view input_values,
cudf::mutable_table_device_view target,
cudf::size_type cardinality,
cudf::size_type* global_mapping_index,
cuda::std::byte* shmem_agg_storage,
cudf::size_type* agg_res_offsets,
cudf::size_type* agg_mask_offsets,
cudf::aggregation::Kind const* d_agg_kinds)
{
// Aggregates shared memory sources to global memory targets
for (auto idx = block.thread_rank(); idx < cardinality; idx += block.num_threads()) {
auto const target_idx =
global_mapping_index[block.group_index().x * GROUPBY_SHM_MAX_ELEMENTS + idx];
for (auto col_idx = col_start; col_idx < col_end; col_idx++) {
auto target_col = target.column(col_idx);

cuda::std::byte* source =
reinterpret_cast<cuda::std::byte*>(shmem_agg_storage + agg_res_offsets[col_idx]);
bool* source_mask = reinterpret_cast<bool*>(shmem_agg_storage + agg_mask_offsets[col_idx]);

cudf::detail::dispatch_type_and_aggregation(input_values.column(col_idx).type(),
d_agg_kinds[col_idx],
gmem_element_aggregator{},
target_col,
target_idx,
input_values.column(col_idx),
source,
source_mask,
idx);
}
}
block.sync();
}

/* Takes the local_mapping_index and global_mapping_index to compute
* pre (shared) and final (global) aggregates*/
CUDF_KERNEL void single_pass_shmem_aggs_kernel(cudf::size_type num_rows,
bitmask_type const* row_bitmask,
bool skip_rows_with_nulls,
cudf::size_type* local_mapping_index,
cudf::size_type* global_mapping_index,
cudf::size_type* block_cardinality,
cudf::table_device_view input_values,
cudf::mutable_table_device_view output_values,
cudf::aggregation::Kind const* d_agg_kinds,
cudf::size_type total_agg_size,
cudf::size_type offsets_size)
{
auto const block = cooperative_groups::this_thread_block();
auto const cardinality = block_cardinality[block.group_index().x];
if (cardinality >= GROUPBY_CARDINALITY_THRESHOLD) { return; }

auto const num_cols = output_values.num_columns();

__shared__ cudf::size_type col_start;
__shared__ cudf::size_type col_end;
extern __shared__ cuda::std::byte shmem_agg_storage[];

cudf::size_type* shmem_agg_res_offsets =
reinterpret_cast<cudf::size_type*>(shmem_agg_storage + total_agg_size);
cudf::size_type* shmem_agg_mask_offsets =
reinterpret_cast<cudf::size_type*>(shmem_agg_storage + total_agg_size + offsets_size);

if (block.thread_rank() == 0) {
col_start = 0;
col_end = 0;
}
block.sync();

while (col_end < num_cols) {
if (block.thread_rank() == 0) {
calculate_columns_to_aggregate(col_start,
col_end,
output_values,
num_cols,
shmem_agg_res_offsets,
shmem_agg_mask_offsets,
cardinality,
total_agg_size);
}
block.sync();

initialize_shmem_aggregations(block,
col_start,
col_end,
output_values,
shmem_agg_storage,
shmem_agg_res_offsets,
shmem_agg_mask_offsets,
cardinality,
d_agg_kinds);

compute_pre_aggregrations(col_start,
col_end,
row_bitmask,
skip_rows_with_nulls,
input_values,
num_rows,
local_mapping_index,
shmem_agg_storage,
shmem_agg_res_offsets,
shmem_agg_mask_offsets,
d_agg_kinds);
block.sync();

compute_final_aggregations(block,
col_start,
col_end,
input_values,
output_values,
cardinality,
global_mapping_index,
shmem_agg_storage,
shmem_agg_res_offsets,
shmem_agg_mask_offsets,
d_agg_kinds);
}
}
} // namespace

std::size_t available_shared_memory_size(cudf::size_type grid_size)
{
auto const active_blocks_per_sm =
cudf::util::div_rounding_up_safe(grid_size, cudf::detail::num_multiprocessors());

size_t dynamic_shmem_size = 0;
CUDF_CUDA_TRY(cudaOccupancyAvailableDynamicSMemPerBlock(
&dynamic_shmem_size, single_pass_shmem_aggs_kernel, active_blocks_per_sm, GROUPBY_BLOCK_SIZE));
return cudf::util::round_down_safe(static_cast<cudf::size_type>(0.5 * dynamic_shmem_size),
ALIGNMENT);
}

void compute_shared_memory_aggs(cudf::size_type grid_size,
std::size_t available_shmem_size,
cudf::size_type num_input_rows,
bitmask_type const* row_bitmask,
bool skip_rows_with_nulls,
cudf::size_type* local_mapping_index,
cudf::size_type* global_mapping_index,
cudf::size_type* block_cardinality,
cudf::table_device_view input_values,
cudf::mutable_table_device_view output_values,
cudf::aggregation::Kind const* d_agg_kinds,
rmm::cuda_stream_view stream)
{
// For each aggregation, need one offset determining where the aggregation is
// performed, another indicating the validity of the aggregation
auto const shmem_offsets_size = output_values.num_columns() * sizeof(cudf::size_type);
// The rest of shmem is utilized for the actual arrays in shmem
CUDF_EXPECTS(available_shmem_size > shmem_offsets_size * 2,
"No enough space for shared memory aggregations");
auto const shmem_agg_size = available_shmem_size - shmem_offsets_size * 2;
single_pass_shmem_aggs_kernel<<<grid_size, GROUPBY_BLOCK_SIZE, available_shmem_size, stream>>>(
num_input_rows,
row_bitmask,
skip_rows_with_nulls,
local_mapping_index,
global_mapping_index,
block_cardinality,
input_values,
output_values,
d_agg_kinds,
shmem_agg_size,
shmem_offsets_size);
}
} // namespace cudf::groupby::detail::hash
41 changes: 41 additions & 0 deletions cpp/src/groupby/hash/compute_shared_memory_aggs.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cudf/aggregation.hpp>
#include <cudf/table/table_device_view.cuh>
#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>

namespace cudf::groupby::detail::hash {

std::size_t available_shared_memory_size(cudf::size_type grid_size);

void compute_shared_memory_aggs(cudf::size_type grid_size,
std::size_t available_shmem_size,
cudf::size_type num_input_rows,
bitmask_type const* row_bitmask,
bool skip_rows_with_nulls,
cudf::size_type* local_mapping_index,
cudf::size_type* global_mapping_index,
cudf::size_type* block_cardinality,
cudf::table_device_view input_values,
cudf::mutable_table_device_view output_values,
cudf::aggregation::Kind const* d_agg_kinds,
rmm::cuda_stream_view stream);

} // namespace cudf::groupby::detail::hash
9 changes: 0 additions & 9 deletions cpp/src/groupby/hash/helpers.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,6 @@ using shmem_extent_t =
CUDF_HOST_DEVICE auto constexpr window_extent =
cuco::make_window_extent<GROUPBY_CG_SIZE, GROUPBY_WINDOW_SIZE>(shmem_extent_t{});

/**
* @brief Returns the smallest multiple of 8 that is greater than or equal to the given integer.
*/
CUDF_HOST_DEVICE constexpr std::size_t round_to_multiple_of_8(std::size_t num)
{
std::size_t constexpr base = 8;
return cudf::util::div_rounding_up_safe(num, base) * base;
}

using row_hash_t =
cudf::experimental::row::hash::device_row_hasher<cudf::hashing::detail::default_hash,
cudf::nullate::DYNAMIC>;
Expand Down
Loading

0 comments on commit 0e294b1

Please sign in to comment.