Fix contiguous_split not properly handling output partitions > 2 GB. (#7515)

Fixes:
#7514

Related:
NVIDIA/spark-rapids#1861

There were a couple of places where 32-bit values were being used for buffer sizes that needed to be 64-bit.
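For illustration (a minimal standalone sketch, not code from this commit; the values are hypothetical), this is the failure mode: once a partition exceeds 2 GB, a byte count computed in 32-bit arithmetic overflows before it ever reaches a 64-bit variable, so the widening has to happen on the operands, as the diff below does in the size calculation.

// overflow_sketch.cpp - compiles with any C++14 compiler
#include <cstddef>
#include <iostream>

int main()
{
  int num_elements = 600'000'000;  // ~600M rows in one output partition (hypothetical)
  int element_size = 4;            // e.g. a 4-byte INT32 column

  // Buggy pattern: the multiply runs in 32-bit int arithmetic and overflows
  // (undefined behavior for signed int) before being widened on assignment.
  std::size_t bad_bytes = static_cast<std::size_t>(num_elements * element_size);

  // Fixed pattern, mirroring the diff: widen each operand first so the
  // multiply itself happens in 64-bit.
  std::size_t good_bytes =
    static_cast<std::size_t>(num_elements) * static_cast<std::size_t>(element_size);

  std::cout << bad_bytes << " vs " << good_bytes << '\n';  // good_bytes == 2400000000
  return 0;
}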

Authors:
  - @nvdbaranec

Approvers:
  - Vukasin Milovanovic (@vuule)
  - Jake Hemstad (@jrhemstad)

URL: #7515
nvdbaranec authored Mar 10, 2021
1 parent 0155bb1 commit 42c6d15
Showing 1 changed file with 61 additions and 59 deletions.
cpp/src/copying/contiguous_split.cu
@@ -38,8 +38,8 @@ namespace {

// align all column size allocations to this boundary so that all output column buffers
// start at that alignment.
-static constexpr size_t split_align = 64;
-inline __device__ size_t _round_up_safe(size_t number_to_round, size_t modulus)
+static constexpr std::size_t split_align = 64;
+inline __device__ std::size_t _round_up_safe(std::size_t number_to_round, std::size_t modulus)
{
auto remainder = number_to_round % modulus;
if (remainder == 0) { return number_to_round; }
@@ -88,15 +88,15 @@ struct src_buf_info {
* M partitions, then we have N*M destination buffers.
*/
struct dst_buf_info {
-size_t buf_size; // total size of buffer, including padding
-int num_elements; // # of elements to be copied
-int element_size; // size of each element in bytes
+std::size_t buf_size; // total size of buffer, including padding
+int num_elements; // # of elements to be copied
+int element_size; // size of each element in bytes
int num_rows; // # of rows (which may be different from num_elements in the case of validity or
// offset buffers)
-int src_row_index; // row index to start reading from from my associated source buffer
-int dst_offset; // my offset into the per-partition allocation
-int value_shift; // amount to shift values down by (for offset buffers)
-int bit_shift; // # of bits to shift right by (for validity buffers)
+int src_row_index; // row index to start reading from from my associated source buffer
+std::size_t dst_offset; // my offset into the per-partition allocation
+int value_shift; // amount to shift values down by (for offset buffers)
+int bit_shift; // # of bits to shift right by (for validity buffers)
size_type valid_count;
};

@@ -133,24 +133,24 @@ template <int block_size>
__device__ void copy_buffer(uint8_t* __restrict__ dst,
uint8_t* __restrict__ src,
int t,
-int num_elements,
-int element_size,
-int src_row_index,
+std::size_t num_elements,
+std::size_t element_size,
+std::size_t src_row_index,
uint32_t stride,
int value_shift,
int bit_shift,
-int num_rows,
+std::size_t num_rows,
size_type* valid_count)
{
src += (src_row_index * element_size);

size_type thread_valid_count = 0;

// handle misalignment. read 16 bytes in 4 byte reads. write in a single 16 byte store.
-const size_t num_bytes = num_elements * element_size;
+std::size_t const num_bytes = num_elements * element_size;
// how many bytes we're misaligned from 4-byte alignment
-const uint32_t ofs = reinterpret_cast<uintptr_t>(src) % 4;
-size_t pos = t * 16;
+uint32_t const ofs = reinterpret_cast<uintptr_t>(src) % 4;
+std::size_t pos = t * 16;
stride *= 16;
while (pos + 20 <= num_bytes) {
// read from the nearest aligned address.
@@ -175,12 +175,12 @@ __device__ void copy_buffer(uint8_t* __restrict__ dst,

// copy trailing bytes
if (t == 0) {
-size_t remainder;
+std::size_t remainder;
if (num_bytes < 16) {
remainder = num_bytes;
} else {
-size_t last_bracket = (num_bytes / 16) * 16;
-remainder = num_bytes - last_bracket;
+std::size_t const last_bracket = (num_bytes / 16) * 16;
+remainder = num_bytes - last_bracket;
if (remainder < 4) {
// we had less than 20 bytes for the last possible 16 byte copy, so copy 16 + the extra
remainder += 16;
@@ -191,12 +191,12 @@ __device__ void copy_buffer(uint8_t* __restrict__ dst,
// alignment must be a multiple of 4. value shifting and bit shifting are mututally exclusive
// and will never both be true at the same time.
if (value_shift || bit_shift) {
-int idx = (num_bytes - remainder) / 4;
-uint32_t v = remainder > 0 ? (reinterpret_cast<uint32_t*>(src)[idx] - value_shift) : 0;
+std::size_t idx = (num_bytes - remainder) / 4;
+uint32_t v = remainder > 0 ? (reinterpret_cast<uint32_t*>(src)[idx] - value_shift) : 0;
while (remainder) {
-uint32_t next =
+uint32_t const next =
remainder > 0 ? (reinterpret_cast<uint32_t*>(src)[idx + 1] - value_shift) : 0;
-uint32_t val = (v >> bit_shift) | (next << (32 - bit_shift));
+uint32_t const val = (v >> bit_shift) | (next << (32 - bit_shift));
if (valid_count) { thread_valid_count += __popc(val); }
reinterpret_cast<uint32_t*>(dst)[idx] = val;
v = next;
@@ -205,8 +205,8 @@ __device__ void copy_buffer(uint8_t* __restrict__ dst,
}
} else {
while (remainder) {
-int idx = num_bytes - remainder--;
-uint32_t val = reinterpret_cast<uint8_t*>(src)[idx];
+std::size_t const idx = num_bytes - remainder--;
+uint32_t const val = reinterpret_cast<uint8_t*>(src)[idx];
if (valid_count) { thread_valid_count += __popc(val); }
reinterpret_cast<uint8_t*>(dst)[idx] = val;
}
@@ -224,11 +224,11 @@ __device__ void copy_buffer(uint8_t* __restrict__ dst,
// we may have copied more bits than there are actual rows in the output.
// so we need to subtract off the count of any bits that shouldn't have been
// considered during the copy step.
-int max_row = (num_bytes * 8);
-int slack_bits = max_row > num_rows ? max_row - num_rows : 0;
-auto slack_mask = set_most_significant_bits(slack_bits);
+std::size_t const max_row = (num_bytes * 8);
+std::size_t const slack_bits = max_row > num_rows ? max_row - num_rows : 0;
+auto const slack_mask = set_most_significant_bits(slack_bits);
if (slack_mask > 0) {
-uint32_t last_word = reinterpret_cast<uint32_t*>(dst + (num_bytes - 4))[0];
+uint32_t const last_word = reinterpret_cast<uint32_t*>(dst + (num_bytes - 4))[0];
block_valid_count -= __popc(last_word & slack_mask);
}
*valid_count = block_valid_count;
@@ -260,9 +260,9 @@ __global__ void copy_partition(int num_src_bufs,
uint8_t** dst_bufs,
dst_buf_info* buf_info)
{
-int const partition_index = blockIdx.x / num_src_bufs;
-int const src_buf_index = blockIdx.x % num_src_bufs;
-size_t const buf_index = (partition_index * num_src_bufs) + src_buf_index;
+int const partition_index = blockIdx.x / num_src_bufs;
+int const src_buf_index = blockIdx.x % num_src_bufs;
+std::size_t const buf_index = (partition_index * num_src_bufs) + src_buf_index;

// copy, shifting offsets and validity bits as needed
copy_buffer<block_size>(
@@ -322,7 +322,7 @@ bool is_offset_type(type_id id) { return (id == type_id::STRING or id == type_id
* @returns Total offset stack size needed for this range of columns.
*/
template <typename InputIter>
-size_t compute_offset_stack_size(InputIter begin, InputIter end, int offset_depth = 0)
+std::size_t compute_offset_stack_size(InputIter begin, InputIter end, int offset_depth = 0)
{
return std::accumulate(begin, end, 0, [offset_depth](auto stack_size, column_view const& col) {
auto const num_buffers = 1 + (col.nullable() ? 1 : 0);
@@ -702,7 +702,7 @@ BufInfo build_output_columns(InputIter begin,
*/
struct buf_size_functor {
dst_buf_info const* ci;
-size_t operator() __device__(int index) { return static_cast<size_t>(ci[index].buf_size); }
+std::size_t operator() __device__(int index) { return ci[index].buf_size; }
};

/**
Expand All @@ -722,10 +722,10 @@ struct split_key_functor {
*/
struct dst_offset_output_iterator {
dst_buf_info* c;
-using value_type = int;
-using difference_type = int;
-using pointer = int*;
-using reference = int&;
+using value_type = std::size_t;
+using difference_type = std::size_t;
+using pointer = std::size_t*;
+using reference = std::size_t&;
using iterator_category = thrust::output_device_iterator_tag;

dst_offset_output_iterator operator+ __host__ __device__(int i)
@@ -778,7 +778,7 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
}
{
size_type begin = 0;
-for (size_t i = 0; i < splits.size(); i++) {
+for (std::size_t i = 0; i < splits.size(); i++) {
size_type end = splits[i];
CUDF_EXPECTS(begin >= 0, "Starting index cannot be negative.");
CUDF_EXPECTS(end >= begin, "End index cannot be smaller than the starting index.");
@@ -787,8 +787,8 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
}
}

-size_t const num_partitions = splits.size() + 1;
-size_t const num_root_columns = input.num_columns();
+std::size_t const num_partitions = splits.size() + 1;
+std::size_t const num_root_columns = input.num_columns();

// if inputs are empty, just return num_partitions empty tables
if (input.column(0).size() == 0) {
@@ -810,12 +810,12 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
// compute # of source buffers (column data, validity, children), # of partitions
// and total # of buffers
size_type const num_src_bufs = count_src_bufs(input.begin(), input.end());
-size_t const num_bufs = num_src_bufs * num_partitions;
+std::size_t const num_bufs = num_src_bufs * num_partitions;

// packed block of memory 1. split indices and src_buf_info structs
-size_t const indices_size =
+std::size_t const indices_size =
cudf::util::round_up_safe((num_partitions + 1) * sizeof(size_type), split_align);
-size_t const src_buf_info_size =
+std::size_t const src_buf_info_size =
cudf::util::round_up_safe(num_src_bufs * sizeof(src_buf_info), split_align);
// host-side
std::vector<uint8_t> h_indices_and_source_info(indices_size + src_buf_info_size);
@@ -825,7 +825,8 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
// device-side
// gpu-only : stack space needed for nested list offset calculation
int const offset_stack_partition_size = compute_offset_stack_size(input.begin(), input.end());
-size_t const offset_stack_size = offset_stack_partition_size * num_partitions * sizeof(size_type);
+std::size_t const offset_stack_size =
+offset_stack_partition_size * num_partitions * sizeof(size_type);
rmm::device_buffer d_indices_and_source_info(indices_size + src_buf_info_size + offset_stack_size,
stream,
rmm::mr::get_current_device_resource());
@@ -852,33 +853,33 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
stream.value()));

// packed block of memory 2. partition buffer sizes and dst_buf_info structs
-size_t const buf_sizes_size =
-cudf::util::round_up_safe(num_partitions * sizeof(size_t), split_align);
-size_t const dst_buf_info_size =
+std::size_t const buf_sizes_size =
+cudf::util::round_up_safe(num_partitions * sizeof(std::size_t), split_align);
+std::size_t const dst_buf_info_size =
cudf::util::round_up_safe(num_bufs * sizeof(dst_buf_info), split_align);
// host-side
std::vector<uint8_t> h_buf_sizes_and_dst_info(buf_sizes_size + dst_buf_info_size);
-size_t* h_buf_sizes = reinterpret_cast<size_t*>(h_buf_sizes_and_dst_info.data());
+std::size_t* h_buf_sizes = reinterpret_cast<std::size_t*>(h_buf_sizes_and_dst_info.data());
dst_buf_info* h_dst_buf_info =
reinterpret_cast<dst_buf_info*>(h_buf_sizes_and_dst_info.data() + buf_sizes_size);
// device-side
rmm::device_buffer d_buf_sizes_and_dst_info(
buf_sizes_size + dst_buf_info_size, stream, rmm::mr::get_current_device_resource());
-size_t* d_buf_sizes = reinterpret_cast<size_t*>(d_buf_sizes_and_dst_info.data());
+std::size_t* d_buf_sizes = reinterpret_cast<std::size_t*>(d_buf_sizes_and_dst_info.data());
dst_buf_info* d_dst_buf_info = reinterpret_cast<dst_buf_info*>(
static_cast<uint8_t*>(d_buf_sizes_and_dst_info.data()) + buf_sizes_size);

// compute sizes of each column in each partition, including alignment.
thrust::transform(
rmm::exec_policy(stream),
-thrust::make_counting_iterator<size_t>(0),
-thrust::make_counting_iterator<size_t>(num_bufs),
+thrust::make_counting_iterator<std::size_t>(0),
+thrust::make_counting_iterator<std::size_t>(num_bufs),
d_dst_buf_info,
[num_src_bufs,
d_indices,
d_src_buf_info,
d_offset_stack,
-offset_stack_partition_size] __device__(size_t t) {
+offset_stack_partition_size] __device__(std::size_t t) {
int const split_index = t / num_src_bufs;
int const src_buf_index = t % num_src_bufs;
auto const& src_info = d_src_buf_info[src_buf_index];
@@ -929,7 +930,8 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
return num_rows;
}();
int const element_size = cudf::type_dispatcher(data_type{src_info.type}, size_of_helper{});
-size_t const bytes = num_elements * element_size;
+std::size_t const bytes =
+static_cast<std::size_t>(num_elements) * static_cast<std::size_t>(element_size);
return dst_buf_info{_round_up_safe(bytes, 64),
num_elements,
element_size,
@@ -969,7 +971,7 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
keys + num_bufs,
values,
dst_offset_output_iterator{d_dst_buf_info},
-0);
+std::size_t{0});
}

// DtoH buf sizes and col info back to the host
@@ -986,15 +988,15 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
std::transform(h_buf_sizes,
h_buf_sizes + num_partitions,
std::back_inserter(out_buffers),
-[stream, mr](size_t bytes) {
+[stream, mr](std::size_t bytes) {
return rmm::device_buffer{bytes, stream, mr};
});

// packed block of memory 3. pointers to source and destination buffers (and stack space on the
// gpu for offset computation)
-size_t const src_bufs_size =
+std::size_t const src_bufs_size =
cudf::util::round_up_safe(num_src_bufs * sizeof(uint8_t*), split_align);
-size_t const dst_bufs_size =
+std::size_t const dst_bufs_size =
cudf::util::round_up_safe(num_partitions * sizeof(uint8_t*), split_align);
// host-side
std::vector<uint8_t> h_src_and_dst_buffers(src_bufs_size + dst_bufs_size);
@@ -1039,7 +1041,7 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
std::vector<column_view> cols;
cols.reserve(num_root_columns);
auto cur_dst_buf_info = h_dst_buf_info;
-for (size_t idx = 0; idx < num_partitions; idx++) {
+for (std::size_t idx = 0; idx < num_partitions; idx++) {
// traverse the buffers and build the columns.
cur_dst_buf_info = build_output_columns(
input.begin(), input.end(), cur_dst_buf_info, std::back_inserter(cols), h_dst_bufs[idx]);
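The subtlest part of the fix is the offset computation: each buffer's destination offset comes from an exclusive scan over the per-buffer sizes, so both the output iterator's value_type and the scan's initial value are widened to std::size_t to keep the running sum 64-bit. A minimal host-side sketch of that idea (hypothetical sizes, not cudf code):

// offsets_sketch - prefix-summing buffer sizes into destination offsets;
// with int-typed output and a plain `0` init, sums past 2 GB would truncate.
#include <thrust/execution_policy.h>
#include <thrust/scan.h>
#include <cstddef>
#include <iostream>

int main()
{
  // four 1 GB buffers -> offsets 0, 1 GB, 2 GB, 3 GB; the last two exceed INT_MAX
  std::size_t sizes[]   = {1ull << 30, 1ull << 30, 1ull << 30, 1ull << 30};
  std::size_t offsets[] = {0, 0, 0, 0};

  // 64-bit init literal, matching the commit's change from `0` to std::size_t{0}
  thrust::exclusive_scan(thrust::host, sizes, sizes + 4, offsets, std::size_t{0});

  for (auto o : offsets) { std::cout << o << '\n'; }  // 0, 1073741824, 2147483648, 3221225472
  return 0;
}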
