Use cudf::thread_index_type in cuIO to prevent overflow in row indexing (#13910)

Use a wider type for indexing when rows are indexed in the pattern
```
for (auto row = start_row; row < num_rows; row += block_size) {
if (is_within_bounds) ...
}
```
or
```
auto t = threadIdx.x;
auto row = block_start + t;
if (is_within_bounds) ...
```
Overflow can happen when the number of rows is so close to `max<size_type>` that adding the block size pushes the index over the maximum.
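
To make the failure concrete, here is a small host-side sketch (illustrative only, not part of this PR) of the arithmetic; it assumes `cudf::size_type` is a 32-bit index and `cudf::thread_index_type` is 64-bit:
```
#include <cstdint>
#include <cstdio>
#include <limits>

int main()
{
  using size_type = int32_t;  // stand-in for cudf::size_type (32-bit)

  int64_t const num_rows   = std::numeric_limits<size_type>::max() - 10;
  int64_t const block_size = 128;
  int64_t const row        = num_rows - 5;  // a thread's last in-bounds row

  // The next grid-stride increment lands past max<size_type>:
  int64_t const next = row + block_size;
  std::printf("next index %lld fits in size_type: %s\n",
              static_cast<long long>(next),
              next <= std::numeric_limits<size_type>::max() ? "yes" : "no");

  // With a 32-bit `row`, that increment is signed overflow (undefined
  // behavior); a wrapped negative value keeps `row < num_rows` true, so the
  // loop never exits and indexing runs out of bounds. With a 64-bit index
  // the comparison is well-defined and the loop terminates.
  return 0;
}
```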

Also sprinkled `auto` where the wider type is not needed.
Also removed a few redundant conditions.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Mark Harris (https://github.com/harrism)
  - Yunsong Wang (https://github.com/PointKernel)

URL: #13910
vuule authored Aug 23, 2023
1 parent e16ed81 commit 2700111
Showing 13 changed files with 108 additions and 83 deletions.
27 changes: 27 additions & 0 deletions cpp/include/cudf/detail/utilities/cuda.cuh
@@ -65,6 +65,33 @@ class grid_1d {
CUDF_EXPECTS(num_threads_per_block > 0, "num_threads_per_block must be > 0");
CUDF_EXPECTS(num_blocks > 0, "num_blocks must be > 0");
}

+/**
+ * @brief Returns the global thread index in a 1D grid.
+ *
+ * The returned index is unique across the entire grid.
+ *
+ * @param thread_id The thread index within the block
+ * @param block_id The block index within the grid
+ * @param num_threads_per_block The number of threads per block
+ * @return thread_index_type The global thread index
+ */
+static constexpr thread_index_type global_thread_id(thread_index_type thread_id,
+thread_index_type block_id,
+thread_index_type num_threads_per_block)
+{
+return thread_id + block_id * num_threads_per_block;
+}
+
+/**
+ * @brief Returns the global thread index of the current thread in a 1D grid.
+ *
+ * @return thread_index_type The global thread index
+ */
+static __device__ thread_index_type global_thread_id()
+{
+return global_thread_id(threadIdx.x, blockIdx.x, blockDim.x);
+}
};

/**
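For orientation, a minimal sketch of how a kernel would adopt the new helper (the kernel below is hypothetical, not from this diff; it assumes the cuDF headers shown here):
```
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/types.hpp>

// Hypothetical kernel: the global index is computed in thread_index_type,
// so the bounds check and any arithmetic on `tid` cannot wrap even when
// num_rows approaches max<cudf::size_type>.
__global__ void scale_rows_kernel(float* data, cudf::size_type num_rows)
{
  auto const tid = cudf::detail::grid_1d::global_thread_id();
  if (tid >= num_rows) { return; }  // comparison happens in the wide type
  data[tid] *= 2.0f;
}
```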
13 changes: 7 additions & 6 deletions cpp/src/io/csv/csv_gpu.cu
@@ -20,6 +20,7 @@
#include <io/utilities/block_utils.cuh>
#include <io/utilities/parsing_utils.cuh>

+#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/fixed_point/fixed_point.hpp>
#include <cudf/null_mask.hpp>
@@ -45,6 +46,7 @@
using namespace ::cudf::io;

using cudf::device_span;
+using cudf::detail::grid_1d;

namespace cudf {
namespace io {
@@ -177,11 +179,10 @@ __global__ void __launch_bounds__(csvparse_block_dim)

// ThreadIds range per block, so also need the blockId
// This is entry into the fields; threadId is an element within `num_records`
-long const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
-long const rec_id_next = rec_id + 1;
+auto const rec_id = grid_1d::global_thread_id();
+auto const rec_id_next = rec_id + 1;

-// we can have more threads than data, make sure we are not past the end of
-// the data
+// we can have more threads than data, make sure we are not past the end of the data
if (rec_id_next >= row_offsets.size()) { return; }

auto field_start = raw_csv + row_offsets[rec_id];
@@ -317,8 +318,8 @@ __global__ void __launch_bounds__(csvparse_block_dim)
auto const raw_csv = data.data();
// thread IDs range per block, so also need the block id.
// this is entry into the field array - tid is an elements within the num_entries array
-long const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
-long const rec_id_next = rec_id + 1;
+auto const rec_id = grid_1d::global_thread_id();
+auto const rec_id_next = rec_id + 1;

// we can have more threads than data, make sure we are not past the end of the data
if (rec_id_next >= row_offsets.size()) return;
8 changes: 5 additions & 3 deletions cpp/src/io/json/legacy/json_gpu.cu
@@ -19,6 +19,7 @@
#include <io/utilities/column_type_histogram.hpp>
#include <io/utilities/parsing_utils.cuh>

+#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/hashing/detail/murmurhash3_x86_32.cuh>
#include <cudf/types.hpp>
@@ -44,6 +45,7 @@
#include <thrust/pair.h>

using cudf::device_span;
+using cudf::detail::grid_1d;

namespace cudf::io::json::detail::legacy {

@@ -252,7 +254,7 @@ __global__ void convert_data_to_columns_kernel(parse_options_view opts,
device_span<bitmask_type* const> const valid_fields,
device_span<cudf::size_type> const num_valid_fields)
{
-auto const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
+auto const rec_id = grid_1d::global_thread_id();
if (rec_id >= row_offsets.size()) return;

auto const row_data_range = get_row_data_range(data, row_offsets, rec_id);
@@ -327,7 +329,7 @@ __global__ void detect_data_types_kernel(
int num_columns,
device_span<cudf::io::column_type_histogram> const column_infos)
{
-auto const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
+auto const rec_id = grid_1d::global_thread_id();
if (rec_id >= row_offsets.size()) return;

auto const are_rows_objects = col_map.capacity() != 0;
@@ -485,7 +487,7 @@ __global__ void collect_keys_info_kernel(parse_options_view const options,
unsigned long long int* keys_cnt,
thrust::optional<mutable_table_device_view> keys_info)
{
-auto const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
+auto const rec_id = grid_1d::global_thread_id();
if (rec_id >= row_offsets.size()) return;

auto const row_data_range = get_row_data_range(data, row_offsets, rec_id);
8 changes: 3 additions & 5 deletions cpp/src/io/orc/dict_enc.cu
@@ -130,7 +130,7 @@ __global__ void __launch_bounds__(block_size)
size_type entry_count{0};
size_type char_count{0};
// all threads should loop the same number of times
-for (auto cur_row = start_row + t; cur_row - t < end_row; cur_row += block_size) {
+for (thread_index_type cur_row = start_row + t; cur_row - t < end_row; cur_row += block_size) {
auto const is_valid = cur_row < end_row and col.is_valid(cur_row);

if (is_valid) {
@@ -215,11 +215,9 @@ __global__ void __launch_bounds__(block_size)
cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL});

-auto cur_row = start_row + t;
+thread_index_type cur_row = start_row + t;
while (cur_row < end_row) {
-auto const is_valid = cur_row < col.size() and col.is_valid(cur_row);
-
-if (is_valid) {
+if (col.is_valid(cur_row)) {
auto const hash_fn = hash_functor{col};
auto const equality_fn = equality_functor{col};
auto const found_slot = map.find(cur_row, hash_fn, equality_fn);
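A note on the `cur_row - t < end_row` guard in the first hunk: it keeps every thread in the block iterating the same number of times (the block base `cur_row - t` is identical across threads), which is what makes block-wide synchronization inside the loop safe. A standalone sketch of the pattern (hypothetical kernel, my illustration):
```
// All threads evaluate the loop condition on the shared block base, so they
// execute the same number of iterations and __syncthreads() in the body
// cannot deadlock; threads past the end simply see is_valid == false.
template <int block_size>
__global__ void uniform_trip_count_kernel(int64_t start_row, int64_t end_row)
{
  auto const t = threadIdx.x;
  for (int64_t row = start_row + t; row - t < end_row; row += block_size) {
    bool const is_valid = row < end_row;
    if (is_valid) { /* process row */ }
    __syncthreads();  // reached uniformly by the whole block
  }
}
```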
16 changes: 8 additions & 8 deletions cpp/src/io/orc/stats_enc.cu
@@ -36,9 +36,9 @@ __global__ void __launch_bounds__(init_threads_per_block)
device_2dspan<rowgroup_rows const> rowgroup_bounds)
{
__shared__ __align__(4) statistics_group group_g[init_groups_per_block];
-uint32_t const col_id = blockIdx.y;
-uint32_t const chunk_id = (blockIdx.x * init_groups_per_block) + threadIdx.y;
-uint32_t const t = threadIdx.x;
+auto const col_id = blockIdx.y;
+auto const chunk_id = (blockIdx.x * init_groups_per_block) + threadIdx.y;
+auto const t = threadIdx.x;
auto const num_rowgroups = rowgroup_bounds.size().first;
statistics_group* group = &group_g[threadIdx.y];
if (chunk_id < num_rowgroups and t == 0) {
@@ -75,11 +75,11 @@ __global__ void __launch_bounds__(block_size, 1)
using block_scan = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
__shared__ typename block_scan::TempStorage temp_storage;
volatile uint32_t stats_size = 0;
-uint32_t t = threadIdx.x;
+auto t = threadIdx.x;
__syncthreads();
-for (uint32_t start = 0; start < statistics_count; start += block_size) {
+for (thread_index_type start = 0; start < statistics_count; start += block_size) {
uint32_t stats_len = 0, stats_pos;
-uint32_t idx = start + t;
+auto idx = start + t;
if (idx < statistics_count) {
statistics_dtype const dtype = groups[idx].stats_dtype;
switch (dtype) {
@@ -222,8 +222,8 @@ __global__ void __launch_bounds__(encode_threads_per_block)
uint32_t statistics_count)
{
__shared__ __align__(8) stats_state_s state_g[encode_chunks_per_block];
-uint32_t t = threadIdx.x;
-uint32_t idx = blockIdx.x * encode_chunks_per_block + threadIdx.y;
+auto t = threadIdx.x;
+auto idx = blockIdx.x * encode_chunks_per_block + threadIdx.y;
stats_state_s* const s = &state_g[threadIdx.y];

// Encode and update actual bfr size
4 changes: 2 additions & 2 deletions cpp/src/io/orc/stripe_data.cu
@@ -1206,7 +1206,7 @@ __global__ void __launch_bounds__(block_size)
if (row_in < first_row && t < 32) {
uint32_t skippedrows = min(static_cast<uint32_t>(first_row - row_in), nrows);
uint32_t skip_count = 0;
-for (uint32_t i = t * 32; i < skippedrows; i += 32 * 32) {
+for (thread_index_type i = t * 32; i < skippedrows; i += 32 * 32) {
// Need to arrange the bytes to apply mask properly.
uint32_t bits = (i + 32 <= skippedrows) ? s->vals.u32[i >> 5]
: (__byte_perm(s->vals.u32[i >> 5], 0, 0x0123) &
@@ -1435,7 +1435,7 @@ __global__ void __launch_bounds__(block_size)
s->top.data.end_row = s->chunk.start_row + s->chunk.num_rows;
s->top.data.buffered_count = 0;
if (s->top.data.end_row > first_row + max_num_rows) {
-s->top.data.end_row = static_cast<uint32_t>(first_row + max_num_rows);
+s->top.data.end_row = first_row + max_num_rows;
}
if (num_rowgroups > 0) {
s->top.data.end_row =
2 changes: 1 addition & 1 deletion cpp/src/io/orc/writer_impl.cu
@@ -372,7 +372,7 @@ __global__ void copy_string_data(char* string_pool,
auto dst = &string_pool[offsets[blockIdx.x]];
auto src = str_val.ptr;

-for (int i = threadIdx.x; i < str_val.length; i += blockDim.x) {
+for (thread_index_type i = threadIdx.x; i < str_val.length; i += blockDim.x) {
dst[i] = src[i];
}
if (threadIdx.x == 0) { str_val.ptr = dst; }
14 changes: 6 additions & 8 deletions cpp/src/io/parquet/chunk_dict.cu
@@ -36,10 +36,10 @@ template <int block_size>
__global__ void __launch_bounds__(block_size)
initialize_chunk_hash_maps_kernel(device_span<EncColumnChunk> chunks)
{
-auto chunk = chunks[blockIdx.x];
-auto t = threadIdx.x;
+auto const chunk = chunks[blockIdx.x];
+auto const t = threadIdx.x;
// fut: Now that per-chunk dict is same size as ck.num_values, try to not use one block per chunk
-for (size_type i = 0; i < chunk.dict_map_size; i += block_size) {
+for (thread_index_type i = 0; i < chunk.dict_map_size; i += block_size) {
if (t + i < chunk.dict_map_size) {
new (&chunk.dict_map_slots[t + i].first) map_type::atomic_key_type{KEY_SENTINEL};
new (&chunk.dict_map_slots[t + i].second) map_type::atomic_mapped_type{VALUE_SENTINEL};
@@ -131,7 +131,7 @@ __global__ void __launch_bounds__(block_size)
cuco::empty_value{VALUE_SENTINEL});

__shared__ size_type total_num_dict_entries;
-size_type val_idx = s_start_value_idx + t;
+thread_index_type val_idx = s_start_value_idx + t;
while (val_idx - block_size < end_value_idx) {
auto const is_valid =
val_idx < end_value_idx and val_idx < data_col.size() and data_col.is_valid(val_idx);
@@ -252,11 +252,9 @@ __global__ void __launch_bounds__(block_size)
cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL});

-auto val_idx = s_start_value_idx + t;
+thread_index_type val_idx = s_start_value_idx + t;
while (val_idx < end_value_idx) {
-auto const is_valid = val_idx < data_col.size() and data_col.is_valid(val_idx);
-
-if (is_valid) {
+if (data_col.is_valid(val_idx)) {
auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx);
cudf_assert(found_slot != map.end() &&
"Unable to find value in map in dictionary index construction");
9 changes: 6 additions & 3 deletions cpp/src/io/parquet/page_decode.cuh
@@ -29,9 +29,12 @@ constexpr int preprocess_block_size = num_rle_stream_decode_threads; // 512
constexpr int decode_block_size = 128;
constexpr int non_zero_buffer_size = decode_block_size * 2;

-constexpr int rolling_index(int index) { return index & (non_zero_buffer_size - 1); }
+constexpr int rolling_index(cudf::thread_index_type index)
+{
+return index & (non_zero_buffer_size - 1);
+}
template <int lvl_buf_size>
-constexpr int rolling_lvl_index(int index)
+constexpr int rolling_lvl_index(cudf::thread_index_type index)
{
return index % lvl_buf_size;
}
@@ -339,7 +342,7 @@ inline __device__ int gpuDecodeRleBooleans(page_state_s volatile* s,
int t)
{
uint8_t const* end = s->data_end;
-int pos = s->dict_pos;
+int64_t pos = s->dict_pos;

while (pos < target_pos) {
int is_literal, batch_len;
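An aside on the widened `rolling_index` (my illustration, not from the diff): `non_zero_buffer_size` is a power of two, so masking with `size - 1` equals modulo for non-negative indices, and taking a wide index keeps that true past 2^31:
```
#include <cassert>
#include <cstdint>

constexpr int non_zero_buffer_size = 256;  // decode_block_size * 2, per the file

// Same shape as the updated rolling_index: wide argument, small result.
constexpr int rolling_index(int64_t index) { return index & (non_zero_buffer_size - 1); }

int main()
{
  // The mask agrees with % for non-negative indices, including values that
  // would overflow a 32-bit int.
  int64_t const big = (int64_t{1} << 33) + 5;
  assert(rolling_index(big) == big % non_zero_buffer_size);
  assert(rolling_index(big) == 5);
  return 0;
}
```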
14 changes: 7 additions & 7 deletions cpp/src/io/parquet/page_enc.cu
@@ -242,9 +242,9 @@ __global__ void __launch_bounds__(block_size)
{
__shared__ __align__(16) frag_init_state_s state_g;

-frag_init_state_s* const s = &state_g;
-uint32_t const t = threadIdx.x;
-uint32_t const num_fragments_per_column = frag.size().second;
+frag_init_state_s* const s = &state_g;
+auto const t = threadIdx.x;
+auto const num_fragments_per_column = frag.size().second;

if (t == 0) { s->col = col_desc[blockIdx.x]; }
__syncthreads();
@@ -1003,7 +1003,7 @@ __global__ void __launch_bounds__(128, 8)
} temp_storage;

page_enc_state_s* const s = &state_g;
-uint32_t t = threadIdx.x;
+auto const t = threadIdx.x;

if (t == 0) {
state_g = page_enc_state_s{};
@@ -1042,7 +1042,7 @@ __global__ void __launch_bounds__(128, 8)
while (s->rle_numvals < s->page.num_rows) {
uint32_t rle_numvals = s->rle_numvals;
uint32_t nrows = min(s->page.num_rows - rle_numvals, 128);
-uint32_t row = s->page.start_row + rle_numvals + t;
+auto row = s->page.start_row + rle_numvals + t;
// Definition level encodes validity. Checks the valid map and if it is valid, then sets the
// def_lvl accordingly and sets it in s->vals which is then given to RleEncode to encode
uint32_t def_lvl = [&]() {
@@ -1884,7 +1884,7 @@ __global__ void __launch_bounds__(128)
__shared__ __align__(8) EncPage page_g;
__shared__ __align__(8) unsigned char scratch[MIN_STATS_SCRATCH_SIZE];

-uint32_t t = threadIdx.x;
+auto const t = threadIdx.x;

if (t == 0) {
uint8_t *hdr_start, *hdr_end;
@@ -1972,7 +1972,7 @@ __global__ void __launch_bounds__(1024)
__shared__ __align__(8) EncColumnChunk ck_g;
__shared__ __align__(8) EncPage page_g;

-uint32_t t = threadIdx.x;
+auto const t = threadIdx.x;
uint8_t *dst, *dst_base;
EncPage const* first_page;
uint32_t num_pages, uncompressed_size;
22 changes: 11 additions & 11 deletions cpp/src/io/parquet/page_string_decode.cu
@@ -100,12 +100,12 @@ __device__ void block_excl_sum(size_type* arr, size_type length, size_type initi
{
using block_scan = cub::BlockScan<size_type, block_size>;
__shared__ typename block_scan::TempStorage scan_storage;
-int const t = threadIdx.x;
+auto const t = threadIdx.x;

// do a series of block sums, storing results in arr as we go
-for (int pos = 0; pos < length; pos += block_size) {
-int const tidx = pos + t;
-size_type tval = tidx < length ? arr[tidx] : 0;
+for (thread_index_type pos = 0; pos < length; pos += block_size) {
+auto const tidx = pos + t;
+size_type tval = tidx < length ? arr[tidx] : 0;
size_type block_sum;
block_scan(scan_storage).ExclusiveScan(tval, tval, initial_value, cub::Sum(), block_sum);
if (tidx < length) { arr[tidx] = tval; }
@@ -144,7 +144,7 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,
typename block_scan::TempStorage scan_storage;
} temp_storage;

-int const t = threadIdx.x;
+auto const t = threadIdx.x;

// decode batches of level stream data using rle_stream objects and use the results to
// calculate start and end value positions in the encoded string data.
@@ -213,7 +213,7 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,
bool end_value_set = false;

while (processed < s->page.num_input_values) {
-int start_val = processed;
+thread_index_type start_val = processed;

if (has_repetition) {
decoders[level_type::REPETITION].decode_next(t);
@@ -237,8 +237,8 @@

// do something with the level data
while (start_val < processed) {
-int idx_t = start_val + t;
-int idx = rolling_lvl_index<lvl_buf_size>(idx_t);
+auto const idx_t = start_val + t;
+auto const idx = rolling_lvl_index<lvl_buf_size>(idx_t);

// get absolute thread row index
int is_new_row = idx_t < processed && (!has_repetition || rep_decode[idx] == 0);
@@ -329,14 +329,14 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,
else {
int num_nulls = 0;
while (processed < s->page.num_input_values) {
-int start_val = processed;
+thread_index_type start_val = processed;
processed += decoders[level_type::DEFINITION].decode_next(t);
__syncthreads();

while (start_val < processed) {
-int idx_t = start_val + t;
+auto const idx_t = start_val + t;
if (idx_t < processed) {
-int idx = rolling_lvl_index<lvl_buf_size>(idx_t);
+auto const idx = rolling_lvl_index<lvl_buf_size>(idx_t);
if (def_decode[idx] < max_def) { num_nulls++; }
}
start_val += preprocess_block_size;
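For readers unfamiliar with CUB, a host-side analogue of the tiled exclusive scan that `block_excl_sum` performs (my sketch; the excerpt is truncated, so carrying each tile's total into the next tile is an assumption about the elided lines):
```
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

int main()
{
  constexpr std::size_t block_size = 4;  // stands in for the CUDA block width
  std::vector<int> arr{1, 2, 3, 4, 5, 6, 7, 8, 9};
  int carry = 10;  // stands in for initial_value

  // Scan the array in tiles of block_size, as the device code does with one
  // cub::BlockScan per iteration of its outer loop.
  for (std::size_t pos = 0; pos < arr.size(); pos += block_size) {
    int running = carry;
    for (std::size_t i = pos; i < std::min(pos + block_size, arr.size()); ++i) {
      int const v = arr[i];
      arr[i]      = running;  // exclusive: sum of predecessors only
      running += v;
    }
    carry = running;  // the tile total (block_sum) seeds the next tile
  }
  for (int v : arr) { std::printf("%d ", v); }  // 10 11 13 16 20 25 31 38 45
  std::printf("\n");
  return 0;
}
```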