Use cudf::thread_index_type in cuIO to prevent overflow in row indexing #13910

Merged

merged 18 commits on Aug 23, 2023
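The motivation, in brief: CUDA's built-in thread and block indices are 32-bit, so the common idiom threadIdx.x + blockDim.x * blockIdx.x is computed modulo 2^32 and silently wraps on very large inputs. Below is a minimal sketch of the failure mode and the fix, assuming cudf::thread_index_type is the library's 64-bit kernel index alias from cudf/types.hpp; the kernel and names are illustrative, not taken from the diff.

#include <cudf/types.hpp>  // cudf::thread_index_type, a 64-bit kernel index alias

#include <cstddef>

__global__ void copy_rows(int const* in, int* out, std::size_t num_rows)
{
  // All three built-ins are 32-bit unsigned, so the arithmetic wraps modulo
  // 2^32 *before* the assignment widens it: indices alias past ~4B elements.
  long const bad_row = threadIdx.x + blockDim.x * blockIdx.x;
  (void)bad_row;  // shown only as the pattern this PR removes

  // Widening one operand first forces the multiply and add into 64 bits.
  auto const row = threadIdx.x +
                   static_cast<cudf::thread_index_type>(blockDim.x) * blockIdx.x;
  if (row >= static_cast<cudf::thread_index_type>(num_rows)) { return; }
  out[row] = in[row];
}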
27 changes: 27 additions & 0 deletions cpp/include/cudf/detail/utilities/cuda.cuh
@@ -65,6 +65,33 @@ class grid_1d {
CUDF_EXPECTS(num_threads_per_block > 0, "num_threads_per_block must be > 0");
CUDF_EXPECTS(num_blocks > 0, "num_blocks must be > 0");
}

/**
* @brief Returns the global thread index in a 1D grid.
*
* The returned index is unique across the entire grid.
*
* @param thread_id The thread index within the block
* @param block_id The block index within the grid
* @param num_threads_per_block The number of threads per block
* @return thread_index_type The global thread index
*/
static constexpr thread_index_type global_thread_id(thread_index_type thread_id,
thread_index_type block_id,
thread_index_type num_threads_per_block)
{
return thread_id + block_id * num_threads_per_block;
}

/**
* @brief Returns the global thread index of the current thread in a 1D grid.
*
* @return thread_index_type The global thread index
*/
static __device__ thread_index_type global_thread_id()
{
return global_thread_id(threadIdx.x, blockIdx.x, blockDim.x);
}
};

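For orientation, a hedged usage sketch of the new helper; the kernel below is illustrative, not part of this PR:

#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/types.hpp>

__global__ void gather_rows(int const* in, int* out, cudf::size_type num_rows)
{
  // The device overload reads threadIdx/blockIdx/blockDim itself and returns
  // a 64-bit cudf::thread_index_type, so the bounds check cannot wrap.
  auto const tid = cudf::detail::grid_1d::global_thread_id();
  if (tid >= num_rows) { return; }
  out[tid] = in[tid];
}

The three-argument constexpr overload computes the same mapping from explicit components, which presumably lets host-side or templated code share the formula.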
13 changes: 7 additions & 6 deletions cpp/src/io/csv/csv_gpu.cu
@@ -20,6 +20,7 @@
#include <io/utilities/block_utils.cuh>
#include <io/utilities/parsing_utils.cuh>

#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/fixed_point/fixed_point.hpp>
#include <cudf/null_mask.hpp>
@@ -45,6 +46,7 @@
using namespace ::cudf::io;

using cudf::device_span;
using cudf::detail::grid_1d;

namespace cudf {
namespace io {
@@ -177,11 +179,10 @@ __global__ void __launch_bounds__(csvparse_block_dim)

// ThreadIds range per block, so also need the blockId
// This is entry into the fields; threadId is an element within `num_records`
long const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
long const rec_id_next = rec_id + 1;
auto const rec_id = grid_1d::global_thread_id();
auto const rec_id_next = rec_id + 1;

// we can have more threads than data, make sure we are not past the end of
// the data
// we can have more threads than data, make sure we are not past the end of the data
if (rec_id_next >= row_offsets.size()) { return; }

auto field_start = raw_csv + row_offsets[rec_id];
@@ -317,8 +318,8 @@ __global__ void __launch_bounds__(csvparse_block_dim)
auto const raw_csv = data.data();
// thread IDs range per block, so also need the block id.
// this is entry into the field array - tid is an element within the num_entries array
long const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
long const rec_id_next = rec_id + 1;
auto const rec_id = grid_1d::global_thread_id();
auto const rec_id_next = rec_id + 1;

// we can have more threads than data, make sure we are not past the end of the data
if (rec_id_next >= row_offsets.size()) return;
8 changes: 5 additions & 3 deletions cpp/src/io/json/legacy/json_gpu.cu
@@ -19,6 +19,7 @@
#include <io/utilities/column_type_histogram.hpp>
#include <io/utilities/parsing_utils.cuh>

#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/hashing/detail/murmurhash3_x86_32.cuh>
#include <cudf/types.hpp>
@@ -44,6 +45,7 @@
#include <thrust/pair.h>

using cudf::device_span;
using cudf::detail::grid_1d;

namespace cudf::io::json::detail::legacy {

@@ -252,7 +254,7 @@ __global__ void convert_data_to_columns_kernel(parse_options_view opts,
device_span<bitmask_type* const> const valid_fields,
device_span<cudf::size_type> const num_valid_fields)
{
auto const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
auto const rec_id = grid_1d::global_thread_id();
if (rec_id >= row_offsets.size()) return;

auto const row_data_range = get_row_data_range(data, row_offsets, rec_id);
@@ -327,7 +329,7 @@ __global__ void detect_data_types_kernel(
int num_columns,
device_span<cudf::io::column_type_histogram> const column_infos)
{
auto const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
auto const rec_id = grid_1d::global_thread_id();
if (rec_id >= row_offsets.size()) return;

auto const are_rows_objects = col_map.capacity() != 0;
@@ -485,7 +487,7 @@ __global__ void collect_keys_info_kernel(parse_options_view const options,
unsigned long long int* keys_cnt,
thrust::optional<mutable_table_device_view> keys_info)
{
auto const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
auto const rec_id = grid_1d::global_thread_id();
if (rec_id >= row_offsets.size()) return;

auto const row_data_range = get_row_data_range(data, row_offsets, rec_id);
8 changes: 3 additions & 5 deletions cpp/src/io/orc/dict_enc.cu
@@ -130,7 +130,7 @@ __global__ void __launch_bounds__(block_size)
size_type entry_count{0};
size_type char_count{0};
// all threads should loop the same number of times
for (auto cur_row = start_row + t; cur_row - t < end_row; cur_row += block_size) {
for (thread_index_type cur_row = start_row + t; cur_row - t < end_row; cur_row += block_size) {
auto const is_valid = cur_row < end_row and col.is_valid(cur_row);

if (is_valid) {
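A note on the loop shape (an observation about the surrounding code, not text from the PR): the condition cur_row - t < end_row, rather than cur_row < end_row, keeps every thread of the block iterating the same number of times, matching the "all threads should loop the same number of times" comment above. This matters because the loop body performs block-wide cooperative work; threads past the end simply fail the is_valid test instead of exiting the loop early.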
@@ -215,11 +215,9 @@ __global__ void __launch_bounds__(block_size)
cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL});

auto cur_row = start_row + t;
thread_index_type cur_row = start_row + t;
while (cur_row < end_row) {
auto const is_valid = cur_row < col.size() and col.is_valid(cur_row);
Comment on lines 219 to -220 (Contributor Author):

redundant check with cur_row < end_row
if (is_valid) {
if (col.is_valid(cur_row)) {
auto const hash_fn = hash_functor{col};
auto const equality_fn = equality_functor{col};
auto const found_slot = map.find(cur_row, hash_fn, equality_fn);
16 changes: 8 additions & 8 deletions cpp/src/io/orc/stats_enc.cu
@@ -36,9 +36,9 @@ __global__ void __launch_bounds__(init_threads_per_block)
device_2dspan<rowgroup_rows const> rowgroup_bounds)
{
__shared__ __align__(4) statistics_group group_g[init_groups_per_block];
uint32_t const col_id = blockIdx.y;
uint32_t const chunk_id = (blockIdx.x * init_groups_per_block) + threadIdx.y;
uint32_t const t = threadIdx.x;
auto const col_id = blockIdx.y;
auto const chunk_id = (blockIdx.x * init_groups_per_block) + threadIdx.y;
auto const t = threadIdx.x;
auto const num_rowgroups = rowgroup_bounds.size().first;
statistics_group* group = &group_g[threadIdx.y];
if (chunk_id < num_rowgroups and t == 0) {
@@ -75,11 +75,11 @@ __global__ void __launch_bounds__(block_size, 1)
using block_scan = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
__shared__ typename block_scan::TempStorage temp_storage;
volatile uint32_t stats_size = 0;
uint32_t t = threadIdx.x;
auto t = threadIdx.x;
__syncthreads();
for (uint32_t start = 0; start < statistics_count; start += block_size) {
for (thread_index_type start = 0; start < statistics_count; start += block_size) {
uint32_t stats_len = 0, stats_pos;
uint32_t idx = start + t;
auto idx = start + t;
if (idx < statistics_count) {
statistics_dtype const dtype = groups[idx].stats_dtype;
switch (dtype) {
@@ -222,8 +222,8 @@ __global__ void __launch_bounds__(encode_threads_per_block)
uint32_t statistics_count)
{
__shared__ __align__(8) stats_state_s state_g[encode_chunks_per_block];
uint32_t t = threadIdx.x;
uint32_t idx = blockIdx.x * encode_chunks_per_block + threadIdx.y;
auto t = threadIdx.x;
auto idx = blockIdx.x * encode_chunks_per_block + threadIdx.y;
stats_state_s* const s = &state_g[threadIdx.y];

// Encode and update actual bfr size
4 changes: 2 additions & 2 deletions cpp/src/io/orc/stripe_data.cu
@@ -1206,7 +1206,7 @@ __global__ void __launch_bounds__(block_size)
if (row_in < first_row && t < 32) {
uint32_t skippedrows = min(static_cast<uint32_t>(first_row - row_in), nrows);
uint32_t skip_count = 0;
for (uint32_t i = t * 32; i < skippedrows; i += 32 * 32) {
for (thread_index_type i = t * 32; i < skippedrows; i += 32 * 32) {
// Need to arrange the bytes to apply mask properly.
uint32_t bits = (i + 32 <= skippedrows) ? s->vals.u32[i >> 5]
: (__byte_perm(s->vals.u32[i >> 5], 0, 0x0123) &
@@ -1435,7 +1435,7 @@ __global__ void __launch_bounds__(block_size)
s->top.data.end_row = s->chunk.start_row + s->chunk.num_rows;
s->top.data.buffered_count = 0;
if (s->top.data.end_row > first_row + max_num_rows) {
s->top.data.end_row = static_cast<uint32_t>(first_row + max_num_rows);
s->top.data.end_row = first_row + max_num_rows;
}
if (num_rowgroups > 0) {
s->top.data.end_row =
2 changes: 1 addition & 1 deletion cpp/src/io/orc/writer_impl.cu
@@ -372,7 +372,7 @@ __global__ void copy_string_data(char* string_pool,
auto dst = &string_pool[offsets[blockIdx.x]];
auto src = str_val.ptr;

for (int i = threadIdx.x; i < str_val.length; i += blockDim.x) {
for (thread_index_type i = threadIdx.x; i < str_val.length; i += blockDim.x) {
dst[i] = src[i];
}
if (threadIdx.x == 0) { str_val.ptr = dst; }
14 changes: 6 additions & 8 deletions cpp/src/io/parquet/chunk_dict.cu
@@ -36,10 +36,10 @@ template <int block_size>
__global__ void __launch_bounds__(block_size)
initialize_chunk_hash_maps_kernel(device_span<EncColumnChunk> chunks)
{
auto chunk = chunks[blockIdx.x];
auto t = threadIdx.x;
auto const chunk = chunks[blockIdx.x];
auto const t = threadIdx.x;
// fut: Now that per-chunk dict is same size as ck.num_values, try to not use one block per chunk
for (size_type i = 0; i < chunk.dict_map_size; i += block_size) {
for (thread_index_type i = 0; i < chunk.dict_map_size; i += block_size) {
if (t + i < chunk.dict_map_size) {
new (&chunk.dict_map_slots[t + i].first) map_type::atomic_key_type{KEY_SENTINEL};
new (&chunk.dict_map_slots[t + i].second) map_type::atomic_mapped_type{VALUE_SENTINEL};
@@ -131,7 +131,7 @@ __global__ void __launch_bounds__(block_size)
cuco::empty_value{VALUE_SENTINEL});

__shared__ size_type total_num_dict_entries;
size_type val_idx = s_start_value_idx + t;
thread_index_type val_idx = s_start_value_idx + t;
while (val_idx - block_size < end_value_idx) {
auto const is_valid =
val_idx < end_value_idx and val_idx < data_col.size() and data_col.is_valid(val_idx);
@@ -252,11 +252,9 @@ __global__ void __launch_bounds__(block_size)
cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL});

auto val_idx = s_start_value_idx + t;
thread_index_type val_idx = s_start_value_idx + t;
while (val_idx < end_value_idx) {
auto const is_valid = val_idx < data_col.size() and data_col.is_valid(val_idx);

if (is_valid) {
if (data_col.is_valid(val_idx)) {
auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx);
cudf_assert(found_slot != map.end() &&
"Unable to find value in map in dictionary index construction");
9 changes: 6 additions & 3 deletions cpp/src/io/parquet/page_decode.cuh
@@ -29,9 +29,12 @@ constexpr int preprocess_block_size = num_rle_stream_decode_threads; // 512
constexpr int decode_block_size = 128;
constexpr int non_zero_buffer_size = decode_block_size * 2;

constexpr int rolling_index(int index) { return index & (non_zero_buffer_size - 1); }
constexpr int rolling_index(cudf::thread_index_type index)
{
return index & (non_zero_buffer_size - 1);
}
template <int lvl_buf_size>
constexpr int rolling_lvl_index(int index)
constexpr int rolling_lvl_index(cudf::thread_index_type index)
{
return index % lvl_buf_size;
}
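An aside, not part of the diff: widening the index parameter to cudf::thread_index_type is safe with the unchanged int return type because both helpers fold the index back below a small buffer size before returning. The bitwise form additionally relies on non_zero_buffer_size being a power of two, which a sketch like the following could assert:

static_assert((non_zero_buffer_size & (non_zero_buffer_size - 1)) == 0,
              "rolling_index's AND-mask equals a modulo only for powers of two");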
@@ -339,7 +342,7 @@ inline __device__ int gpuDecodeRleBooleans(page_state_s volatile* s,
int t)
{
uint8_t const* end = s->data_end;
int pos = s->dict_pos;
int64_t pos = s->dict_pos;

while (pos < target_pos) {
int is_literal, batch_len;
14 changes: 7 additions & 7 deletions cpp/src/io/parquet/page_enc.cu
@@ -242,9 +242,9 @@ __global__ void __launch_bounds__(block_size)
{
__shared__ __align__(16) frag_init_state_s state_g;

frag_init_state_s* const s = &state_g;
uint32_t const t = threadIdx.x;
uint32_t const num_fragments_per_column = frag.size().second;
frag_init_state_s* const s = &state_g;
auto const t = threadIdx.x;
auto const num_fragments_per_column = frag.size().second;

if (t == 0) { s->col = col_desc[blockIdx.x]; }
__syncthreads();
@@ -1003,7 +1003,7 @@ __global__ void __launch_bounds__(128, 8)
} temp_storage;

page_enc_state_s* const s = &state_g;
uint32_t t = threadIdx.x;
auto const t = threadIdx.x;

if (t == 0) {
state_g = page_enc_state_s{};
@@ -1042,7 +1042,7 @@ __global__ void __launch_bounds__(128, 8)
while (s->rle_numvals < s->page.num_rows) {
uint32_t rle_numvals = s->rle_numvals;
uint32_t nrows = min(s->page.num_rows - rle_numvals, 128);
uint32_t row = s->page.start_row + rle_numvals + t;
auto row = s->page.start_row + rle_numvals + t;
// Definition level encodes validity. Checks the valid map and if it is valid, then sets the
// def_lvl accordingly and sets it in s->vals which is then given to RleEncode to encode
uint32_t def_lvl = [&]() {
@@ -1884,7 +1884,7 @@ __global__ void __launch_bounds__(128)
__shared__ __align__(8) EncPage page_g;
__shared__ __align__(8) unsigned char scratch[MIN_STATS_SCRATCH_SIZE];

uint32_t t = threadIdx.x;
auto const t = threadIdx.x;

if (t == 0) {
uint8_t *hdr_start, *hdr_end;
@@ -1972,7 +1972,7 @@ __global__ void __launch_bounds__(1024)
__shared__ __align__(8) EncColumnChunk ck_g;
__shared__ __align__(8) EncPage page_g;

uint32_t t = threadIdx.x;
auto const t = threadIdx.x;
uint8_t *dst, *dst_base;
EncPage const* first_page;
uint32_t num_pages, uncompressed_size;
22 changes: 11 additions & 11 deletions cpp/src/io/parquet/page_string_decode.cu
@@ -100,12 +100,12 @@ __device__ void block_excl_sum(size_type* arr, size_type length, size_type initial_value)
{
using block_scan = cub::BlockScan<size_type, block_size>;
__shared__ typename block_scan::TempStorage scan_storage;
int const t = threadIdx.x;
auto const t = threadIdx.x;

// do a series of block sums, storing results in arr as we go
for (int pos = 0; pos < length; pos += block_size) {
int const tidx = pos + t;
size_type tval = tidx < length ? arr[tidx] : 0;
for (thread_index_type pos = 0; pos < length; pos += block_size) {
auto const tidx = pos + t;
size_type tval = tidx < length ? arr[tidx] : 0;
size_type block_sum;
block_scan(scan_storage).ExclusiveScan(tval, tval, initial_value, cub::Sum(), block_sum);
if (tidx < length) { arr[tidx] = tval; }
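To make the tile-at-a-time pattern concrete, here is a hedged sketch of the shape block_excl_sum appears to follow. The carry of block_sum into the next tile's starting value is an assumption (the rest of the loop is collapsed in this view), and the names are illustrative:

#include <cub/cub.cuh>
#include <cudf/types.hpp>

template <int block_size>
__device__ void excl_sum_sketch(cudf::size_type* arr,
                                cudf::size_type length,
                                cudf::size_type running)
{
  using block_scan = cub::BlockScan<cudf::size_type, block_size>;
  __shared__ typename block_scan::TempStorage storage;
  // Scan one block-sized tile per iteration, carrying the running total.
  for (cudf::thread_index_type pos = 0; pos < length; pos += block_size) {
    auto const tidx          = pos + threadIdx.x;
    cudf::size_type val      = tidx < length ? arr[tidx] : 0;
    cudf::size_type tile_sum = 0;
    block_scan(storage).ExclusiveScan(val, val, running, cub::Sum(), tile_sum);
    if (tidx < length) { arr[tidx] = val; }
    running += tile_sum;  // assumed: offset the next tile by this tile's total
    __syncthreads();      // TempStorage is reused on the next iteration
  }
}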
@@ -144,7 +144,7 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,
typename block_scan::TempStorage scan_storage;
} temp_storage;

int const t = threadIdx.x;
auto const t = threadIdx.x;

// decode batches of level stream data using rle_stream objects and use the results to
// calculate start and end value positions in the encoded string data.
@@ -213,7 +213,7 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,
bool end_value_set = false;

while (processed < s->page.num_input_values) {
int start_val = processed;
thread_index_type start_val = processed;

if (has_repetition) {
decoders[level_type::REPETITION].decode_next(t);
@@ -237,8 +237,8 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,

// do something with the level data
while (start_val < processed) {
int idx_t = start_val + t;
int idx = rolling_lvl_index<lvl_buf_size>(idx_t);
auto const idx_t = start_val + t;
auto const idx = rolling_lvl_index<lvl_buf_size>(idx_t);

// get absolute thread row index
int is_new_row = idx_t < processed && (!has_repetition || rep_decode[idx] == 0);
@@ -329,14 +329,14 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,
else {
int num_nulls = 0;
while (processed < s->page.num_input_values) {
int start_val = processed;
thread_index_type start_val = processed;
processed += decoders[level_type::DEFINITION].decode_next(t);
__syncthreads();

while (start_val < processed) {
int idx_t = start_val + t;
auto const idx_t = start_val + t;
if (idx_t < processed) {
int idx = rolling_lvl_index<lvl_buf_size>(idx_t);
auto const idx = rolling_lvl_index<lvl_buf_size>(idx_t);
if (def_decode[idx] < max_def) { num_nulls++; }
}
start_val += preprocess_block_size;