Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use cudf::thread_index_type in cuIO to prevent overflow in row indexing #13910

Merged
merged 18 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions cpp/src/io/csv/csv_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,10 @@ __global__ void __launch_bounds__(csvparse_block_dim)

// ThreadIds range per block, so also need the blockId
// This is entry into the fields; threadId is an element within `num_records`
long const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
long const rec_id_next = rec_id + 1;
auto const rec_id = cudf::thread_index_type{threadIdx.x} + (blockDim.x * blockIdx.x);
auto const rec_id_next = rec_id + 1;

// we can have more threads than data, make sure we are not past the end of
// the data
// we can have more threads than data, make sure we are not past the end of the data
if (rec_id_next >= row_offsets.size()) { return; }

auto field_start = raw_csv + row_offsets[rec_id];
Expand Down Expand Up @@ -317,8 +316,8 @@ __global__ void __launch_bounds__(csvparse_block_dim)
auto const raw_csv = data.data();
// thread IDs range per block, so also need the block id.
// this is entry into the field array - tid is an elements within the num_entries array
long const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
long const rec_id_next = rec_id + 1;
auto const rec_id = cudf::thread_index_type{threadIdx.x} + (blockDim.x * blockIdx.x);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Promoting just the threadId.x does not appear to be enough to promote the multiplication
https://godbolt.org/z/coqPKqfWe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved the promotion to the multiplication

auto const rec_id_next = rec_id + 1;

// we can have more threads than data, make sure we are not past the end of the data
if (rec_id_next >= row_offsets.size()) return;
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/json/legacy/json_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ __global__ void convert_data_to_columns_kernel(parse_options_view opts,
device_span<bitmask_type* const> const valid_fields,
device_span<cudf::size_type> const num_valid_fields)
{
auto const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
auto const rec_id = cudf::thread_index_type{threadIdx.x} + (blockDim.x * blockIdx.x);
if (rec_id >= row_offsets.size()) return;

auto const row_data_range = get_row_data_range(data, row_offsets, rec_id);
Expand Down Expand Up @@ -327,7 +327,7 @@ __global__ void detect_data_types_kernel(
int num_columns,
device_span<cudf::io::column_type_histogram> const column_infos)
{
auto const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
auto const rec_id = cudf::thread_index_type{threadIdx.x} + (blockDim.x * blockIdx.x);
if (rec_id >= row_offsets.size()) return;

auto const are_rows_objects = col_map.capacity() != 0;
Expand Down Expand Up @@ -485,7 +485,7 @@ __global__ void collect_keys_info_kernel(parse_options_view const options,
unsigned long long int* keys_cnt,
thrust::optional<mutable_table_device_view> keys_info)
{
auto const rec_id = threadIdx.x + (blockDim.x * blockIdx.x);
auto const rec_id = cudf::thread_index_type{threadIdx.x} + (blockDim.x * blockIdx.x);
if (rec_id >= row_offsets.size()) return;

auto const row_data_range = get_row_data_range(data, row_offsets, rec_id);
Expand Down
20 changes: 9 additions & 11 deletions cpp/src/io/orc/dict_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ __global__ void __launch_bounds__(block_size)
populate_dictionary_hash_maps_kernel(device_2dspan<stripe_dictionary> dictionaries,
device_span<orc_column_device_view const> columns)
{
auto const col_idx = blockIdx.x;
auto const stripe_idx = blockIdx.y;
auto const t = threadIdx.x;
auto& dict = dictionaries[col_idx][stripe_idx];
auto const& col = columns[dict.column_idx];
auto const col_idx = blockIdx.x;
auto const stripe_idx = blockIdx.y;
cudf::thread_index_type const t = threadIdx.x;
auto& dict = dictionaries[col_idx][stripe_idx];
auto const& col = columns[dict.column_idx];

// Make a view of the hash map
auto hash_map_mutable = map_type::device_mutable_view(dict.map_slots.data(),
Expand Down Expand Up @@ -206,9 +206,9 @@ __global__ void __launch_bounds__(block_size)

if (not dict.is_enabled) { return; }

auto const t = threadIdx.x;
auto const start_row = dict.start_row;
auto const end_row = dict.start_row + dict.num_rows;
cudf::thread_index_type const t = threadIdx.x;
auto const start_row = dict.start_row;
auto const end_row = dict.start_row + dict.num_rows;

auto const map = map_type::device_view(dict.map_slots.data(),
dict.map_slots.size(),
Expand All @@ -217,9 +217,7 @@ __global__ void __launch_bounds__(block_size)

auto cur_row = start_row + t;
while (cur_row < end_row) {
auto const is_valid = cur_row < col.size() and col.is_valid(cur_row);
Comment on lines 219 to -220
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant check with cur_row < end_row


if (is_valid) {
if (col.is_valid(cur_row)) {
auto const hash_fn = hash_functor{col};
auto const equality_fn = equality_functor{col};
auto const found_slot = map.find(cur_row, hash_fn, equality_fn);
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/io/orc/stats_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ __global__ void __launch_bounds__(init_threads_per_block)
device_2dspan<rowgroup_rows const> rowgroup_bounds)
{
__shared__ __align__(4) statistics_group group_g[init_groups_per_block];
uint32_t const col_id = blockIdx.y;
uint32_t const chunk_id = (blockIdx.x * init_groups_per_block) + threadIdx.y;
uint32_t const t = threadIdx.x;
auto const col_id = blockIdx.y;
auto const chunk_id = (blockIdx.x * init_groups_per_block) + threadIdx.y;
auto const t = threadIdx.x;
auto const num_rowgroups = rowgroup_bounds.size().first;
statistics_group* group = &group_g[threadIdx.y];
if (chunk_id < num_rowgroups and t == 0) {
Expand Down Expand Up @@ -75,11 +75,11 @@ __global__ void __launch_bounds__(block_size, 1)
using block_scan = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
__shared__ typename block_scan::TempStorage temp_storage;
volatile uint32_t stats_size = 0;
uint32_t t = threadIdx.x;
auto t = threadIdx.x;
__syncthreads();
for (uint32_t start = 0; start < statistics_count; start += block_size) {
uint32_t stats_len = 0, stats_pos;
uint32_t idx = start + t;
auto idx = start + t;
if (idx < statistics_count) {
statistics_dtype const dtype = groups[idx].stats_dtype;
switch (dtype) {
Expand Down Expand Up @@ -222,8 +222,8 @@ __global__ void __launch_bounds__(encode_threads_per_block)
uint32_t statistics_count)
{
__shared__ __align__(8) stats_state_s state_g[encode_chunks_per_block];
uint32_t t = threadIdx.x;
uint32_t idx = blockIdx.x * encode_chunks_per_block + threadIdx.y;
auto t = threadIdx.x;
auto idx = blockIdx.x * encode_chunks_per_block + threadIdx.y;
stats_state_s* const s = &state_g[threadIdx.y];

// Encode and update actual bfr size
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,7 @@ __global__ void __launch_bounds__(block_size)
if (row_in < first_row && t < 32) {
uint32_t skippedrows = min(static_cast<uint32_t>(first_row - row_in), nrows);
uint32_t skip_count = 0;
for (uint32_t i = t * 32; i < skippedrows; i += 32 * 32) {
for (long i = t * 32; i < skippedrows; i += 32 * 32) {
// Need to arrange the bytes to apply mask properly.
uint32_t bits = (i + 32 <= skippedrows) ? s->vals.u32[i >> 5]
: (__byte_perm(s->vals.u32[i >> 5], 0, 0x0123) &
Expand Down Expand Up @@ -1435,7 +1435,7 @@ __global__ void __launch_bounds__(block_size)
s->top.data.end_row = s->chunk.start_row + s->chunk.num_rows;
s->top.data.buffered_count = 0;
if (s->top.data.end_row > first_row + max_num_rows) {
s->top.data.end_row = static_cast<uint32_t>(first_row + max_num_rows);
s->top.data.end_row = first_row + max_num_rows;
}
if (num_rowgroups > 0) {
s->top.data.end_row =
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ __global__ void copy_string_data(char* string_pool,
auto dst = &string_pool[offsets[blockIdx.x]];
auto src = str_val.ptr;

for (int i = threadIdx.x; i < str_val.length; i += blockDim.x) {
for (int64_t i = threadIdx.x; i < str_val.length; i += blockDim.x) {
dst[i] = src[i];
}
if (threadIdx.x == 0) { str_val.ptr = dst; }
Expand Down
38 changes: 18 additions & 20 deletions cpp/src/io/parquet/chunk_dict.cu
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ template <int block_size>
__global__ void __launch_bounds__(block_size)
initialize_chunk_hash_maps_kernel(device_span<EncColumnChunk> chunks)
{
auto chunk = chunks[blockIdx.x];
auto t = threadIdx.x;
auto chunk = chunks[blockIdx.x];
cudf::thread_index_type t = threadIdx.x;
// fut: Now that per-chunk dict is same size as ck.num_values, try to not use one block per chunk
for (size_type i = 0; i < chunk.dict_map_size; i += block_size) {
if (t + i < chunk.dict_map_size) {
Expand Down Expand Up @@ -103,12 +103,12 @@ template <int block_size>
__global__ void __launch_bounds__(block_size)
populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan<gpu::PageFragment const> frags)
{
auto col_idx = blockIdx.y;
auto block_x = blockIdx.x;
auto t = threadIdx.x;
auto frag = frags[col_idx][block_x];
auto chunk = frag.chunk;
auto col = chunk->col_desc;
auto col_idx = blockIdx.y;
auto block_x = blockIdx.x;
cudf::thread_index_type t = threadIdx.x;
auto frag = frags[col_idx][block_x];
auto chunk = frag.chunk;
auto col = chunk->col_desc;

if (not chunk->use_dictionary) { return; }

Expand All @@ -131,7 +131,7 @@ __global__ void __launch_bounds__(block_size)
cuco::empty_value{VALUE_SENTINEL});

__shared__ size_type total_num_dict_entries;
size_type val_idx = s_start_value_idx + t;
auto val_idx = s_start_value_idx + t;
while (val_idx - block_size < end_value_idx) {
auto const is_valid =
val_idx < end_value_idx and val_idx < data_col.size() and data_col.is_valid(val_idx);
Expand Down Expand Up @@ -197,8 +197,8 @@ __global__ void __launch_bounds__(block_size)
auto& chunk = chunks[blockIdx.x];
if (not chunk.use_dictionary) { return; }

auto t = threadIdx.x;
auto map = map_type::device_view(chunk.dict_map_slots,
auto const t = threadIdx.x;
auto map = map_type::device_view(chunk.dict_map_slots,
chunk.dict_map_size,
cuco::empty_key{KEY_SENTINEL},
cuco::empty_value{VALUE_SENTINEL});
Expand Down Expand Up @@ -228,12 +228,12 @@ template <int block_size>
__global__ void __launch_bounds__(block_size)
get_dictionary_indices_kernel(cudf::detail::device_2dspan<gpu::PageFragment const> frags)
{
auto col_idx = blockIdx.y;
auto block_x = blockIdx.x;
auto t = threadIdx.x;
auto frag = frags[col_idx][block_x];
auto chunk = frag.chunk;
auto col = chunk->col_desc;
auto col_idx = blockIdx.y;
auto block_x = blockIdx.x;
cudf::thread_index_type t = threadIdx.x;
auto frag = frags[col_idx][block_x];
auto chunk = frag.chunk;
auto col = chunk->col_desc;

if (not chunk->use_dictionary) { return; }

Expand All @@ -254,9 +254,7 @@ __global__ void __launch_bounds__(block_size)

auto val_idx = s_start_value_idx + t;
while (val_idx < end_value_idx) {
auto const is_valid = val_idx < data_col.size() and data_col.is_valid(val_idx);

if (is_valid) {
if (data_col.is_valid(val_idx)) {
auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx);
cudf_assert(found_slot != map.end() &&
"Unable to find value in map in dictionary index construction");
Expand Down
11 changes: 7 additions & 4 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ constexpr int preprocess_block_size = num_rle_stream_decode_threads; // 512
constexpr int decode_block_size = 128;
constexpr int non_zero_buffer_size = decode_block_size * 2;

constexpr int rolling_index(int index) { return index & (non_zero_buffer_size - 1); }
constexpr int rolling_index(cudf::thread_index_type index)
{
return index & (non_zero_buffer_size - 1);
}
template <int lvl_buf_size>
constexpr int rolling_lvl_index(int index)
constexpr int rolling_lvl_index(cudf::thread_index_type index)
{
return index % lvl_buf_size;
}
Expand Down Expand Up @@ -338,8 +341,8 @@ inline __device__ int gpuDecodeRleBooleans(page_state_s volatile* s,
int target_pos,
int t)
{
uint8_t const* end = s->data_end;
int pos = s->dict_pos;
uint8_t const* end = s->data_end;
cudf::thread_index_type pos = s->dict_pos;

while (pos < target_pos) {
int is_literal, batch_len;
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@ __global__ void __launch_bounds__(block_size)
{
__shared__ __align__(16) frag_init_state_s state_g;

frag_init_state_s* const s = &state_g;
uint32_t const t = threadIdx.x;
uint32_t const num_fragments_per_column = frag.size().second;
frag_init_state_s* const s = &state_g;
auto const t = threadIdx.x;
auto const num_fragments_per_column = frag.size().second;

if (t == 0) { s->col = col_desc[blockIdx.x]; }
__syncthreads();
Expand Down Expand Up @@ -1003,7 +1003,7 @@ __global__ void __launch_bounds__(128, 8)
} temp_storage;

page_enc_state_s* const s = &state_g;
uint32_t t = threadIdx.x;
auto const t = threadIdx.x;

if (t == 0) {
state_g = page_enc_state_s{};
Expand Down Expand Up @@ -1042,7 +1042,7 @@ __global__ void __launch_bounds__(128, 8)
while (s->rle_numvals < s->page.num_rows) {
uint32_t rle_numvals = s->rle_numvals;
uint32_t nrows = min(s->page.num_rows - rle_numvals, 128);
uint32_t row = s->page.start_row + rle_numvals + t;
auto row = s->page.start_row + rle_numvals + t;
// Definition level encodes validity. Checks the valid map and if it is valid, then sets the
// def_lvl accordingly and sets it in s->vals which is then given to RleEncode to encode
uint32_t def_lvl = [&]() {
Expand Down Expand Up @@ -1884,7 +1884,7 @@ __global__ void __launch_bounds__(128)
__shared__ __align__(8) EncPage page_g;
__shared__ __align__(8) unsigned char scratch[MIN_STATS_SCRATCH_SIZE];

uint32_t t = threadIdx.x;
auto const t = threadIdx.x;

if (t == 0) {
uint8_t *hdr_start, *hdr_end;
Expand Down Expand Up @@ -1972,7 +1972,7 @@ __global__ void __launch_bounds__(1024)
__shared__ __align__(8) EncColumnChunk ck_g;
__shared__ __align__(8) EncPage page_g;

uint32_t t = threadIdx.x;
auto const t = threadIdx.x;
uint8_t *dst, *dst_base;
EncPage const* first_page;
uint32_t num_pages, uncompressed_size;
Expand Down
22 changes: 11 additions & 11 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ __device__ void block_excl_sum(size_type* arr, size_type length, size_type initi
{
using block_scan = cub::BlockScan<size_type, block_size>;
__shared__ typename block_scan::TempStorage scan_storage;
int const t = threadIdx.x;
auto const t = threadIdx.x;

// do a series of block sums, storing results in arr as we go
for (int pos = 0; pos < length; pos += block_size) {
int const tidx = pos + t;
size_type tval = tidx < length ? arr[tidx] : 0;
for (long pos = 0; pos < length; pos += block_size) {
auto const tidx = pos + t;
size_type tval = tidx < length ? arr[tidx] : 0;
size_type block_sum;
block_scan(scan_storage).ExclusiveScan(tval, tval, initial_value, cub::Sum(), block_sum);
if (tidx < length) { arr[tidx] = tval; }
Expand Down Expand Up @@ -144,7 +144,7 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,
typename block_scan::TempStorage scan_storage;
} temp_storage;

int const t = threadIdx.x;
auto const t = threadIdx.x;

// decode batches of level stream data using rle_stream objects and use the results to
// calculate start and end value positions in the encoded string data.
Expand Down Expand Up @@ -213,7 +213,7 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,
bool end_value_set = false;

while (processed < s->page.num_input_values) {
int start_val = processed;
long start_val = processed;

if (has_repetition) {
decoders[level_type::REPETITION].decode_next(t);
Expand All @@ -237,8 +237,8 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,

// do something with the level data
while (start_val < processed) {
int idx_t = start_val + t;
int idx = rolling_lvl_index<lvl_buf_size>(idx_t);
cudf::thread_index_type idx_t = start_val + t;
auto idx = rolling_lvl_index<lvl_buf_size>(idx_t);

// get absolute thread row index
int is_new_row = idx_t < processed && (!has_repetition || rep_decode[idx] == 0);
Expand Down Expand Up @@ -329,14 +329,14 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,
else {
int num_nulls = 0;
while (processed < s->page.num_input_values) {
int start_val = processed;
long start_val = processed;
processed += decoders[level_type::DEFINITION].decode_next(t);
__syncthreads();

while (start_val < processed) {
int idx_t = start_val + t;
cudf::thread_index_type idx_t = start_val + t;
if (idx_t < processed) {
int idx = rolling_lvl_index<lvl_buf_size>(idx_t);
auto idx = rolling_lvl_index<lvl_buf_size>(idx_t);
if (def_decode[idx] < max_def) { num_nulls++; }
}
start_val += preprocess_block_size;
Expand Down
Loading