Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix contiguous_split not properly handling output partitions > 2 GB. #7515

Merged
merged 2 commits into from
Mar 10, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 61 additions & 59 deletions cpp/src/copying/contiguous_split.cu
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ namespace {

// align all column size allocations to this boundary so that all output column buffers
// start at that alignment.
static constexpr size_t split_align = 64;
inline __device__ size_t _round_up_safe(size_t number_to_round, size_t modulus)
static constexpr std::size_t split_align = 64;
inline __device__ std::size_t _round_up_safe(std::size_t number_to_round, std::size_t modulus)
{
auto remainder = number_to_round % modulus;
if (remainder == 0) { return number_to_round; }
Expand Down Expand Up @@ -88,15 +88,15 @@ struct src_buf_info {
* M partitions, then we have N*M destination buffers.
*/
struct dst_buf_info {
  // Note: buf_size and dst_offset are std::size_t (not int) so that a single
  // output partition may exceed 2 GB (see PR #7515).
  std::size_t buf_size;  // total size of buffer, including padding
  int num_elements;      // # of elements to be copied
  int element_size;      // size of each element in bytes
  int num_rows;  // # of rows (which may be different from num_elements in the case of validity or
                 // offset buffers)
  int src_row_index;       // row index to start reading from in my associated source buffer
  std::size_t dst_offset;  // my offset into the per-partition allocation
  int value_shift;         // amount to shift values down by (for offset buffers)
  int bit_shift;           // # of bits to shift right by (for validity buffers)
  size_type valid_count;   // # of valid (non-null) rows copied, for validity buffers
};

Expand Down Expand Up @@ -133,24 +133,24 @@ template <int block_size>
__device__ void copy_buffer(uint8_t* __restrict__ dst,
uint8_t* __restrict__ src,
int t,
int num_elements,
int element_size,
int src_row_index,
std::size_t num_elements,
std::size_t element_size,
std::size_t src_row_index,
uint32_t stride,
int value_shift,
int bit_shift,
int num_rows,
std::size_t num_rows,
size_type* valid_count)
{
src += (src_row_index * element_size);

size_type thread_valid_count = 0;

// handle misalignment. read 16 bytes in 4 byte reads. write in a single 16 byte store.
const size_t num_bytes = num_elements * element_size;
std::size_t const num_bytes = num_elements * element_size;
// how many bytes we're misaligned from 4-byte alignment
const uint32_t ofs = reinterpret_cast<uintptr_t>(src) % 4;
size_t pos = t * 16;
uint32_t const ofs = reinterpret_cast<uintptr_t>(src) % 4;
std::size_t pos = t * 16;
stride *= 16;
while (pos + 20 <= num_bytes) {
// read from the nearest aligned address.
Expand All @@ -175,12 +175,12 @@ __device__ void copy_buffer(uint8_t* __restrict__ dst,

// copy trailing bytes
if (t == 0) {
size_t remainder;
std::size_t remainder;
if (num_bytes < 16) {
remainder = num_bytes;
} else {
size_t last_bracket = (num_bytes / 16) * 16;
remainder = num_bytes - last_bracket;
std::size_t const last_bracket = (num_bytes / 16) * 16;
remainder = num_bytes - last_bracket;
if (remainder < 4) {
// we had less than 20 bytes for the last possible 16 byte copy, so copy 16 + the extra
remainder += 16;
Expand All @@ -191,12 +191,12 @@ __device__ void copy_buffer(uint8_t* __restrict__ dst,
// alignment must be a multiple of 4. value shifting and bit shifting are mututally exclusive
// and will never both be true at the same time.
if (value_shift || bit_shift) {
int idx = (num_bytes - remainder) / 4;
uint32_t v = remainder > 0 ? (reinterpret_cast<uint32_t*>(src)[idx] - value_shift) : 0;
std::size_t idx = (num_bytes - remainder) / 4;
uint32_t v = remainder > 0 ? (reinterpret_cast<uint32_t*>(src)[idx] - value_shift) : 0;
while (remainder) {
uint32_t next =
uint32_t const next =
remainder > 0 ? (reinterpret_cast<uint32_t*>(src)[idx + 1] - value_shift) : 0;
uint32_t val = (v >> bit_shift) | (next << (32 - bit_shift));
uint32_t const val = (v >> bit_shift) | (next << (32 - bit_shift));
if (valid_count) { thread_valid_count += __popc(val); }
reinterpret_cast<uint32_t*>(dst)[idx] = val;
v = next;
Expand All @@ -205,8 +205,8 @@ __device__ void copy_buffer(uint8_t* __restrict__ dst,
}
} else {
while (remainder) {
int idx = num_bytes - remainder--;
uint32_t val = reinterpret_cast<uint8_t*>(src)[idx];
std::size_t const idx = num_bytes - remainder--;
uint32_t const val = reinterpret_cast<uint8_t*>(src)[idx];
if (valid_count) { thread_valid_count += __popc(val); }
reinterpret_cast<uint8_t*>(dst)[idx] = val;
}
Expand All @@ -224,11 +224,11 @@ __device__ void copy_buffer(uint8_t* __restrict__ dst,
// we may have copied more bits than there are actual rows in the output.
// so we need to subtract off the count of any bits that shouldn't have been
// considered during the copy step.
int max_row = (num_bytes * 8);
int slack_bits = max_row > num_rows ? max_row - num_rows : 0;
auto slack_mask = set_most_significant_bits(slack_bits);
std::size_t const max_row = (num_bytes * 8);
std::size_t const slack_bits = max_row > num_rows ? max_row - num_rows : 0;
auto const slack_mask = set_most_significant_bits(slack_bits);
if (slack_mask > 0) {
uint32_t last_word = reinterpret_cast<uint32_t*>(dst + (num_bytes - 4))[0];
uint32_t const last_word = reinterpret_cast<uint32_t*>(dst + (num_bytes - 4))[0];
block_valid_count -= __popc(last_word & slack_mask);
}
*valid_count = block_valid_count;
Expand Down Expand Up @@ -260,9 +260,9 @@ __global__ void copy_partition(int num_src_bufs,
uint8_t** dst_bufs,
dst_buf_info* buf_info)
{
int const partition_index = blockIdx.x / num_src_bufs;
int const src_buf_index = blockIdx.x % num_src_bufs;
size_t const buf_index = (partition_index * num_src_bufs) + src_buf_index;
int const partition_index = blockIdx.x / num_src_bufs;
int const src_buf_index = blockIdx.x % num_src_bufs;
std::size_t const buf_index = (partition_index * num_src_bufs) + src_buf_index;

// copy, shifting offsets and validity bits as needed
copy_buffer<block_size>(
Expand Down Expand Up @@ -322,7 +322,7 @@ bool is_offset_type(type_id id) { return (id == type_id::STRING or id == type_id
* @returns Total offset stack size needed for this range of columns.
*/
template <typename InputIter>
size_t compute_offset_stack_size(InputIter begin, InputIter end, int offset_depth = 0)
std::size_t compute_offset_stack_size(InputIter begin, InputIter end, int offset_depth = 0)
{
return std::accumulate(begin, end, 0, [offset_depth](auto stack_size, column_view const& col) {
auto const num_buffers = 1 + (col.nullable() ? 1 : 0);
Expand Down Expand Up @@ -702,7 +702,7 @@ BufInfo build_output_columns(InputIter begin,
*/
// Functor returning the (padded) size in bytes of the dst_buf_info at a given
// index; used on-device to sum per-buffer sizes into per-partition totals.
struct buf_size_functor {
  dst_buf_info const* ci;  // non-owning pointer to the array of destination buffer infos
  std::size_t operator() __device__(int index) { return ci[index].buf_size; }
};

/**
Expand All @@ -722,10 +722,10 @@ struct split_key_functor {
*/
struct dst_offset_output_iterator {
dst_buf_info* c;
using value_type = int;
using difference_type = int;
using pointer = int*;
using reference = int&;
using value_type = std::size_t;
using difference_type = std::size_t;
using pointer = std::size_t*;
using reference = std::size_t&;
using iterator_category = thrust::output_device_iterator_tag;

dst_offset_output_iterator operator+ __host__ __device__(int i)
Expand Down Expand Up @@ -778,7 +778,7 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
}
{
size_type begin = 0;
for (size_t i = 0; i < splits.size(); i++) {
for (std::size_t i = 0; i < splits.size(); i++) {
size_type end = splits[i];
CUDF_EXPECTS(begin >= 0, "Starting index cannot be negative.");
CUDF_EXPECTS(end >= begin, "End index cannot be smaller than the starting index.");
Expand All @@ -787,8 +787,8 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
}
}

size_t const num_partitions = splits.size() + 1;
size_t const num_root_columns = input.num_columns();
std::size_t const num_partitions = splits.size() + 1;
std::size_t const num_root_columns = input.num_columns();

// if inputs are empty, just return num_partitions empty tables
if (input.column(0).size() == 0) {
Expand All @@ -810,12 +810,12 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
// compute # of source buffers (column data, validity, children), # of partitions
// and total # of buffers
size_type const num_src_bufs = count_src_bufs(input.begin(), input.end());
size_t const num_bufs = num_src_bufs * num_partitions;
std::size_t const num_bufs = num_src_bufs * num_partitions;

// packed block of memory 1. split indices and src_buf_info structs
size_t const indices_size =
std::size_t const indices_size =
cudf::util::round_up_safe((num_partitions + 1) * sizeof(size_type), split_align);
size_t const src_buf_info_size =
std::size_t const src_buf_info_size =
cudf::util::round_up_safe(num_src_bufs * sizeof(src_buf_info), split_align);
// host-side
std::vector<uint8_t> h_indices_and_source_info(indices_size + src_buf_info_size);
Expand All @@ -825,7 +825,8 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
// device-side
// gpu-only : stack space needed for nested list offset calculation
int const offset_stack_partition_size = compute_offset_stack_size(input.begin(), input.end());
size_t const offset_stack_size = offset_stack_partition_size * num_partitions * sizeof(size_type);
std::size_t const offset_stack_size =
offset_stack_partition_size * num_partitions * sizeof(size_type);
rmm::device_buffer d_indices_and_source_info(indices_size + src_buf_info_size + offset_stack_size,
stream,
rmm::mr::get_current_device_resource());
Expand All @@ -852,33 +853,33 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
stream.value()));

// packed block of memory 2. partition buffer sizes and dst_buf_info structs
size_t const buf_sizes_size =
cudf::util::round_up_safe(num_partitions * sizeof(size_t), split_align);
size_t const dst_buf_info_size =
std::size_t const buf_sizes_size =
cudf::util::round_up_safe(num_partitions * sizeof(std::size_t), split_align);
std::size_t const dst_buf_info_size =
cudf::util::round_up_safe(num_bufs * sizeof(dst_buf_info), split_align);
// host-side
std::vector<uint8_t> h_buf_sizes_and_dst_info(buf_sizes_size + dst_buf_info_size);
size_t* h_buf_sizes = reinterpret_cast<size_t*>(h_buf_sizes_and_dst_info.data());
std::size_t* h_buf_sizes = reinterpret_cast<std::size_t*>(h_buf_sizes_and_dst_info.data());
dst_buf_info* h_dst_buf_info =
reinterpret_cast<dst_buf_info*>(h_buf_sizes_and_dst_info.data() + buf_sizes_size);
// device-side
rmm::device_buffer d_buf_sizes_and_dst_info(
buf_sizes_size + dst_buf_info_size, stream, rmm::mr::get_current_device_resource());
size_t* d_buf_sizes = reinterpret_cast<size_t*>(d_buf_sizes_and_dst_info.data());
std::size_t* d_buf_sizes = reinterpret_cast<std::size_t*>(d_buf_sizes_and_dst_info.data());
dst_buf_info* d_dst_buf_info = reinterpret_cast<dst_buf_info*>(
static_cast<uint8_t*>(d_buf_sizes_and_dst_info.data()) + buf_sizes_size);

// compute sizes of each column in each partition, including alignment.
thrust::transform(
rmm::exec_policy(stream),
thrust::make_counting_iterator<size_t>(0),
thrust::make_counting_iterator<size_t>(num_bufs),
thrust::make_counting_iterator<std::size_t>(0),
thrust::make_counting_iterator<std::size_t>(num_bufs),
d_dst_buf_info,
[num_src_bufs,
d_indices,
d_src_buf_info,
d_offset_stack,
offset_stack_partition_size] __device__(size_t t) {
offset_stack_partition_size] __device__(std::size_t t) {
int const split_index = t / num_src_bufs;
int const src_buf_index = t % num_src_bufs;
auto const& src_info = d_src_buf_info[src_buf_index];
Expand Down Expand Up @@ -929,7 +930,8 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
return num_rows;
}();
int const element_size = cudf::type_dispatcher(data_type{src_info.type}, size_of_helper{});
size_t const bytes = num_elements * element_size;
std::size_t const bytes =
static_cast<std::size_t>(num_elements) * static_cast<std::size_t>(element_size);
return dst_buf_info{_round_up_safe(bytes, 64),
num_elements,
element_size,
Expand Down Expand Up @@ -969,7 +971,7 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
keys + num_bufs,
values,
dst_offset_output_iterator{d_dst_buf_info},
0);
std::size_t{0});
}

// DtoH buf sizes and col info back to the host
Expand All @@ -986,15 +988,15 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
std::transform(h_buf_sizes,
h_buf_sizes + num_partitions,
std::back_inserter(out_buffers),
[stream, mr](size_t bytes) {
[stream, mr](std::size_t bytes) {
return rmm::device_buffer{bytes, stream, mr};
});

// packed block of memory 3. pointers to source and destination buffers (and stack space on the
// gpu for offset computation)
size_t const src_bufs_size =
std::size_t const src_bufs_size =
cudf::util::round_up_safe(num_src_bufs * sizeof(uint8_t*), split_align);
size_t const dst_bufs_size =
std::size_t const dst_bufs_size =
cudf::util::round_up_safe(num_partitions * sizeof(uint8_t*), split_align);
// host-side
std::vector<uint8_t> h_src_and_dst_buffers(src_bufs_size + dst_bufs_size);
Expand Down Expand Up @@ -1039,7 +1041,7 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
std::vector<column_view> cols;
cols.reserve(num_root_columns);
auto cur_dst_buf_info = h_dst_buf_info;
for (size_t idx = 0; idx < num_partitions; idx++) {
for (std::size_t idx = 0; idx < num_partitions; idx++) {
// traverse the buffers and build the columns.
cur_dst_buf_info = build_output_columns(
input.begin(), input.end(), cur_dst_buf_info, std::back_inserter(cols), h_dst_bufs[idx]);
Expand Down