From 2700111e6b300cfff41b4e9137093bd22a00d1d4 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Wed, 23 Aug 2023 10:52:35 -0700 Subject: [PATCH] Use `cudf::thread_index_type` in cuIO to prevent overflow in row indexing (#13910) Use wider type for indexing when rows are indexed in pattern ``` for (auto row = start_row; row < num_rows; row += block_size) { if (is_within_bounds) ... } ``` or ``` auto t = threadIdx.x; auto row = block_start + t; if (is_within_bounds) ... ``` Overflow can happen when the number of rows is so close to `max` that adding block size pushes the index over the max. Also sprinkled auto where increased size is not needed. Also removed a few redundant conditions. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Mark Harris (https://github.com/harrism) - Yunsong Wang (https://github.com/PointKernel) URL: https://github.com/rapidsai/cudf/pull/13910 --- cpp/include/cudf/detail/utilities/cuda.cuh | 27 ++++++++++++ cpp/src/io/csv/csv_gpu.cu | 13 +++--- cpp/src/io/json/legacy/json_gpu.cu | 8 ++-- cpp/src/io/orc/dict_enc.cu | 8 ++-- cpp/src/io/orc/stats_enc.cu | 16 +++---- cpp/src/io/orc/stripe_data.cu | 4 +- cpp/src/io/orc/writer_impl.cu | 2 +- cpp/src/io/parquet/chunk_dict.cu | 14 +++---- cpp/src/io/parquet/page_decode.cuh | 9 ++-- cpp/src/io/parquet/page_enc.cu | 14 +++---- cpp/src/io/parquet/page_string_decode.cu | 22 +++++----- cpp/src/io/text/multibyte_split.cu | 49 ++++++++++------------ cpp/src/io/utilities/parsing_utils.cu | 5 ++- 13 files changed, 108 insertions(+), 83 deletions(-) diff --git a/cpp/include/cudf/detail/utilities/cuda.cuh b/cpp/include/cudf/detail/utilities/cuda.cuh index 73c8969e207..c95189f1f94 100644 --- a/cpp/include/cudf/detail/utilities/cuda.cuh +++ b/cpp/include/cudf/detail/utilities/cuda.cuh @@ -65,6 +65,33 @@ class grid_1d { CUDF_EXPECTS(num_threads_per_block > 0, "num_threads_per_block must be > 0"); CUDF_EXPECTS(num_blocks > 0, "num_blocks must be > 0"); } + + /** + * @brief Returns the global thread index in a 1D grid. + * + * The returned index is unique across the entire grid. + * + * @param thread_id The thread index within the block + * @param block_id The block index within the grid + * @param num_threads_per_block The number of threads per block + * @return thread_index_type The global thread index + */ + static constexpr thread_index_type global_thread_id(thread_index_type thread_id, + thread_index_type block_id, + thread_index_type num_threads_per_block) + { + return thread_id + block_id * num_threads_per_block; + } + + /** + * @brief Returns the global thread index of the current thread in a 1D grid. + * + * @return thread_index_type The global thread index + */ + static __device__ thread_index_type global_thread_id() + { + return global_thread_id(threadIdx.x, blockIdx.x, blockDim.x); + } }; /** diff --git a/cpp/src/io/csv/csv_gpu.cu b/cpp/src/io/csv/csv_gpu.cu index ad8858c75d6..248e17669bc 100644 --- a/cpp/src/io/csv/csv_gpu.cu +++ b/cpp/src/io/csv/csv_gpu.cu @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -45,6 +46,7 @@ using namespace ::cudf::io; using cudf::device_span; +using cudf::detail::grid_1d; namespace cudf { namespace io { @@ -177,11 +179,10 @@ __global__ void __launch_bounds__(csvparse_block_dim) // ThreadIds range per block, so also need the blockId // This is entry into the fields; threadId is an element within `num_records` - long const rec_id = threadIdx.x + (blockDim.x * blockIdx.x); - long const rec_id_next = rec_id + 1; + auto const rec_id = grid_1d::global_thread_id(); + auto const rec_id_next = rec_id + 1; - // we can have more threads than data, make sure we are not past the end of - // the data + // we can have more threads than data, make sure we are not past the end of the data if (rec_id_next >= row_offsets.size()) { return; } auto field_start = raw_csv + row_offsets[rec_id]; @@ -317,8 +318,8 @@ __global__ void __launch_bounds__(csvparse_block_dim) auto const raw_csv = data.data(); // thread IDs range per block, so also need the block id. // this is entry into the field array - tid is an elements within the num_entries array - long const rec_id = threadIdx.x + (blockDim.x * blockIdx.x); - long const rec_id_next = rec_id + 1; + auto const rec_id = grid_1d::global_thread_id(); + auto const rec_id_next = rec_id + 1; // we can have more threads than data, make sure we are not past the end of the data if (rec_id_next >= row_offsets.size()) return; diff --git a/cpp/src/io/json/legacy/json_gpu.cu b/cpp/src/io/json/legacy/json_gpu.cu index d28d5614591..b358cc2071b 100644 --- a/cpp/src/io/json/legacy/json_gpu.cu +++ b/cpp/src/io/json/legacy/json_gpu.cu @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -44,6 +45,7 @@ #include using cudf::device_span; +using cudf::detail::grid_1d; namespace cudf::io::json::detail::legacy { @@ -252,7 +254,7 @@ __global__ void convert_data_to_columns_kernel(parse_options_view opts, device_span const valid_fields, device_span const num_valid_fields) { - auto const rec_id = threadIdx.x + (blockDim.x * blockIdx.x); + auto const rec_id = grid_1d::global_thread_id(); if (rec_id >= row_offsets.size()) return; auto const row_data_range = get_row_data_range(data, row_offsets, rec_id); @@ -327,7 +329,7 @@ __global__ void detect_data_types_kernel( int num_columns, device_span const column_infos) { - auto const rec_id = threadIdx.x + (blockDim.x * blockIdx.x); + auto const rec_id = grid_1d::global_thread_id(); if (rec_id >= row_offsets.size()) return; auto const are_rows_objects = col_map.capacity() != 0; @@ -485,7 +487,7 @@ __global__ void collect_keys_info_kernel(parse_options_view const options, unsigned long long int* keys_cnt, thrust::optional keys_info) { - auto const rec_id = threadIdx.x + (blockDim.x * blockIdx.x); + auto const rec_id = grid_1d::global_thread_id(); if (rec_id >= row_offsets.size()) return; auto const row_data_range = get_row_data_range(data, row_offsets, rec_id); diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index c069cb67cec..0007530a5af 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -130,7 +130,7 @@ __global__ void __launch_bounds__(block_size) size_type entry_count{0}; size_type char_count{0}; // all threads should loop the same number of times - for (auto cur_row = start_row + t; cur_row - t < end_row; cur_row += block_size) { + for (thread_index_type cur_row = start_row + t; cur_row - t < end_row; cur_row += block_size) { auto const is_valid = cur_row < end_row and col.is_valid(cur_row); if (is_valid) { @@ -215,11 +215,9 @@ __global__ void __launch_bounds__(block_size) cuco::empty_key{KEY_SENTINEL}, cuco::empty_value{VALUE_SENTINEL}); - auto cur_row = start_row + t; + thread_index_type cur_row = start_row + t; while (cur_row < end_row) { - auto const is_valid = cur_row < col.size() and col.is_valid(cur_row); - - if (is_valid) { + if (col.is_valid(cur_row)) { auto const hash_fn = hash_functor{col}; auto const equality_fn = equality_functor{col}; auto const found_slot = map.find(cur_row, hash_fn, equality_fn); diff --git a/cpp/src/io/orc/stats_enc.cu b/cpp/src/io/orc/stats_enc.cu index 8fada7d5d72..069841980c1 100644 --- a/cpp/src/io/orc/stats_enc.cu +++ b/cpp/src/io/orc/stats_enc.cu @@ -36,9 +36,9 @@ __global__ void __launch_bounds__(init_threads_per_block) device_2dspan rowgroup_bounds) { __shared__ __align__(4) statistics_group group_g[init_groups_per_block]; - uint32_t const col_id = blockIdx.y; - uint32_t const chunk_id = (blockIdx.x * init_groups_per_block) + threadIdx.y; - uint32_t const t = threadIdx.x; + auto const col_id = blockIdx.y; + auto const chunk_id = (blockIdx.x * init_groups_per_block) + threadIdx.y; + auto const t = threadIdx.x; auto const num_rowgroups = rowgroup_bounds.size().first; statistics_group* group = &group_g[threadIdx.y]; if (chunk_id < num_rowgroups and t == 0) { @@ -75,11 +75,11 @@ __global__ void __launch_bounds__(block_size, 1) using block_scan = cub::BlockScan; __shared__ typename block_scan::TempStorage temp_storage; volatile uint32_t stats_size = 0; - uint32_t t = threadIdx.x; + auto t = threadIdx.x; __syncthreads(); - for (uint32_t start = 0; start < statistics_count; start += block_size) { + for (thread_index_type start = 0; start < statistics_count; start += block_size) { uint32_t stats_len = 0, stats_pos; - uint32_t idx = start + t; + auto idx = start + t; if (idx < statistics_count) { statistics_dtype const dtype = groups[idx].stats_dtype; switch (dtype) { @@ -222,8 +222,8 @@ __global__ void __launch_bounds__(encode_threads_per_block) uint32_t statistics_count) { __shared__ __align__(8) stats_state_s state_g[encode_chunks_per_block]; - uint32_t t = threadIdx.x; - uint32_t idx = blockIdx.x * encode_chunks_per_block + threadIdx.y; + auto t = threadIdx.x; + auto idx = blockIdx.x * encode_chunks_per_block + threadIdx.y; stats_state_s* const s = &state_g[threadIdx.y]; // Encode and update actual bfr size diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 592233c2418..b66ca827119 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -1206,7 +1206,7 @@ __global__ void __launch_bounds__(block_size) if (row_in < first_row && t < 32) { uint32_t skippedrows = min(static_cast(first_row - row_in), nrows); uint32_t skip_count = 0; - for (uint32_t i = t * 32; i < skippedrows; i += 32 * 32) { + for (thread_index_type i = t * 32; i < skippedrows; i += 32 * 32) { // Need to arrange the bytes to apply mask properly. uint32_t bits = (i + 32 <= skippedrows) ? s->vals.u32[i >> 5] : (__byte_perm(s->vals.u32[i >> 5], 0, 0x0123) & @@ -1435,7 +1435,7 @@ __global__ void __launch_bounds__(block_size) s->top.data.end_row = s->chunk.start_row + s->chunk.num_rows; s->top.data.buffered_count = 0; if (s->top.data.end_row > first_row + max_num_rows) { - s->top.data.end_row = static_cast(first_row + max_num_rows); + s->top.data.end_row = first_row + max_num_rows; } if (num_rowgroups > 0) { s->top.data.end_row = diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 6a3c5f0134d..3d8bdb4ec97 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -372,7 +372,7 @@ __global__ void copy_string_data(char* string_pool, auto dst = &string_pool[offsets[blockIdx.x]]; auto src = str_val.ptr; - for (int i = threadIdx.x; i < str_val.length; i += blockDim.x) { + for (thread_index_type i = threadIdx.x; i < str_val.length; i += blockDim.x) { dst[i] = src[i]; } if (threadIdx.x == 0) { str_val.ptr = dst; } diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index 72e38fd2e1c..9ff1869edde 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -36,10 +36,10 @@ template __global__ void __launch_bounds__(block_size) initialize_chunk_hash_maps_kernel(device_span chunks) { - auto chunk = chunks[blockIdx.x]; - auto t = threadIdx.x; + auto const chunk = chunks[blockIdx.x]; + auto const t = threadIdx.x; // fut: Now that per-chunk dict is same size as ck.num_values, try to not use one block per chunk - for (size_type i = 0; i < chunk.dict_map_size; i += block_size) { + for (thread_index_type i = 0; i < chunk.dict_map_size; i += block_size) { if (t + i < chunk.dict_map_size) { new (&chunk.dict_map_slots[t + i].first) map_type::atomic_key_type{KEY_SENTINEL}; new (&chunk.dict_map_slots[t + i].second) map_type::atomic_mapped_type{VALUE_SENTINEL}; @@ -131,7 +131,7 @@ __global__ void __launch_bounds__(block_size) cuco::empty_value{VALUE_SENTINEL}); __shared__ size_type total_num_dict_entries; - size_type val_idx = s_start_value_idx + t; + thread_index_type val_idx = s_start_value_idx + t; while (val_idx - block_size < end_value_idx) { auto const is_valid = val_idx < end_value_idx and val_idx < data_col.size() and data_col.is_valid(val_idx); @@ -252,11 +252,9 @@ __global__ void __launch_bounds__(block_size) cuco::empty_key{KEY_SENTINEL}, cuco::empty_value{VALUE_SENTINEL}); - auto val_idx = s_start_value_idx + t; + thread_index_type val_idx = s_start_value_idx + t; while (val_idx < end_value_idx) { - auto const is_valid = val_idx < data_col.size() and data_col.is_valid(val_idx); - - if (is_valid) { + if (data_col.is_valid(val_idx)) { auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx); cudf_assert(found_slot != map.end() && "Unable to find value in map in dictionary index construction"); diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index b2c09980b6e..f649eb97680 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -29,9 +29,12 @@ constexpr int preprocess_block_size = num_rle_stream_decode_threads; // 512 constexpr int decode_block_size = 128; constexpr int non_zero_buffer_size = decode_block_size * 2; -constexpr int rolling_index(int index) { return index & (non_zero_buffer_size - 1); } +constexpr int rolling_index(cudf::thread_index_type index) +{ + return index & (non_zero_buffer_size - 1); +} template -constexpr int rolling_lvl_index(int index) +constexpr int rolling_lvl_index(cudf::thread_index_type index) { return index % lvl_buf_size; } @@ -339,7 +342,7 @@ inline __device__ int gpuDecodeRleBooleans(page_state_s volatile* s, int t) { uint8_t const* end = s->data_end; - int pos = s->dict_pos; + int64_t pos = s->dict_pos; while (pos < target_pos) { int is_literal, batch_len; diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 20993d12af8..d066b454840 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -242,9 +242,9 @@ __global__ void __launch_bounds__(block_size) { __shared__ __align__(16) frag_init_state_s state_g; - frag_init_state_s* const s = &state_g; - uint32_t const t = threadIdx.x; - uint32_t const num_fragments_per_column = frag.size().second; + frag_init_state_s* const s = &state_g; + auto const t = threadIdx.x; + auto const num_fragments_per_column = frag.size().second; if (t == 0) { s->col = col_desc[blockIdx.x]; } __syncthreads(); @@ -1003,7 +1003,7 @@ __global__ void __launch_bounds__(128, 8) } temp_storage; page_enc_state_s* const s = &state_g; - uint32_t t = threadIdx.x; + auto const t = threadIdx.x; if (t == 0) { state_g = page_enc_state_s{}; @@ -1042,7 +1042,7 @@ __global__ void __launch_bounds__(128, 8) while (s->rle_numvals < s->page.num_rows) { uint32_t rle_numvals = s->rle_numvals; uint32_t nrows = min(s->page.num_rows - rle_numvals, 128); - uint32_t row = s->page.start_row + rle_numvals + t; + auto row = s->page.start_row + rle_numvals + t; // Definition level encodes validity. Checks the valid map and if it is valid, then sets the // def_lvl accordingly and sets it in s->vals which is then given to RleEncode to encode uint32_t def_lvl = [&]() { @@ -1884,7 +1884,7 @@ __global__ void __launch_bounds__(128) __shared__ __align__(8) EncPage page_g; __shared__ __align__(8) unsigned char scratch[MIN_STATS_SCRATCH_SIZE]; - uint32_t t = threadIdx.x; + auto const t = threadIdx.x; if (t == 0) { uint8_t *hdr_start, *hdr_end; @@ -1972,7 +1972,7 @@ __global__ void __launch_bounds__(1024) __shared__ __align__(8) EncColumnChunk ck_g; __shared__ __align__(8) EncPage page_g; - uint32_t t = threadIdx.x; + auto const t = threadIdx.x; uint8_t *dst, *dst_base; EncPage const* first_page; uint32_t num_pages, uncompressed_size; diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 9173d408192..bcab14f76c5 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -100,12 +100,12 @@ __device__ void block_excl_sum(size_type* arr, size_type length, size_type initi { using block_scan = cub::BlockScan; __shared__ typename block_scan::TempStorage scan_storage; - int const t = threadIdx.x; + auto const t = threadIdx.x; // do a series of block sums, storing results in arr as we go - for (int pos = 0; pos < length; pos += block_size) { - int const tidx = pos + t; - size_type tval = tidx < length ? arr[tidx] : 0; + for (thread_index_type pos = 0; pos < length; pos += block_size) { + auto const tidx = pos + t; + size_type tval = tidx < length ? arr[tidx] : 0; size_type block_sum; block_scan(scan_storage).ExclusiveScan(tval, tval, initial_value, cub::Sum(), block_sum); if (tidx < length) { arr[tidx] = tval; } @@ -144,7 +144,7 @@ __device__ thrust::pair page_bounds(page_state_s* const s, typename block_scan::TempStorage scan_storage; } temp_storage; - int const t = threadIdx.x; + auto const t = threadIdx.x; // decode batches of level stream data using rle_stream objects and use the results to // calculate start and end value positions in the encoded string data. @@ -213,7 +213,7 @@ __device__ thrust::pair page_bounds(page_state_s* const s, bool end_value_set = false; while (processed < s->page.num_input_values) { - int start_val = processed; + thread_index_type start_val = processed; if (has_repetition) { decoders[level_type::REPETITION].decode_next(t); @@ -237,8 +237,8 @@ __device__ thrust::pair page_bounds(page_state_s* const s, // do something with the level data while (start_val < processed) { - int idx_t = start_val + t; - int idx = rolling_lvl_index(idx_t); + auto const idx_t = start_val + t; + auto const idx = rolling_lvl_index(idx_t); // get absolute thread row index int is_new_row = idx_t < processed && (!has_repetition || rep_decode[idx] == 0); @@ -329,14 +329,14 @@ __device__ thrust::pair page_bounds(page_state_s* const s, else { int num_nulls = 0; while (processed < s->page.num_input_values) { - int start_val = processed; + thread_index_type start_val = processed; processed += decoders[level_type::DEFINITION].decode_next(t); __syncthreads(); while (start_val < processed) { - int idx_t = start_val + t; + auto const idx_t = start_val + t; if (idx_t < processed) { - int idx = rolling_lvl_index(idx_t); + auto const idx = rolling_lvl_index(idx_t); if (def_decode[idx] < max_def) { num_nulls++; } } start_val += preprocess_block_size; diff --git a/cpp/src/io/text/multibyte_split.cu b/cpp/src/io/text/multibyte_split.cu index a04c7d84463..818bbc0a18a 100644 --- a/cpp/src/io/text/multibyte_split.cu +++ b/cpp/src/io/text/multibyte_split.cu @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -143,7 +144,7 @@ __global__ void multibyte_split_init_kernel( cudf::io::text::detail::scan_tile_status status = cudf::io::text::detail::scan_tile_status::invalid) { - auto const thread_idx = blockIdx.x * blockDim.x + threadIdx.x; + auto const thread_idx = cudf::detail::grid_1d::global_thread_id(); if (thread_idx < num_tiles) { auto const tile_idx = base_tile_idx + thread_idx; tile_multistates.set_status(tile_idx, status); @@ -151,19 +152,6 @@ __global__ void multibyte_split_init_kernel( } } -__global__ void multibyte_split_seed_kernel( - cudf::io::text::detail::scan_tile_state_view tile_multistates, - cudf::io::text::detail::scan_tile_state_view tile_output_offsets, - multistate tile_multistate_seed, - output_offset tile_output_offset) -{ - auto const thread_idx = blockIdx.x * blockDim.x + threadIdx.x; - if (thread_idx == 0) { - tile_multistates.set_inclusive_prefix(-1, tile_multistate_seed); - tile_output_offsets.set_inclusive_prefix(-1, tile_output_offset); - } -} - __global__ __launch_bounds__(THREADS_PER_TILE) void multibyte_split_kernel( cudf::size_type base_tile_idx, byte_offset base_input_offset, @@ -185,10 +173,12 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void multibyte_split_kernel( typename OffsetScan::TempStorage offset_scan; } temp_storage; - int32_t const tile_idx = base_tile_idx + blockIdx.x; - int32_t const tile_input_offset = blockIdx.x * ITEMS_PER_TILE; - int32_t const thread_input_offset = tile_input_offset + threadIdx.x * ITEMS_PER_THREAD; - int32_t const thread_input_size = chunk_input_chars.size() - thread_input_offset; + auto const tile_idx = base_tile_idx + blockIdx.x; + auto const tile_input_offset = blockIdx.x * ITEMS_PER_TILE; + auto const thread_input_offset = + tile_input_offset + cudf::thread_index_type{threadIdx.x} * ITEMS_PER_THREAD; + auto const thread_input_size = + std::max(chunk_input_chars.size() - thread_input_offset, 0); // STEP 1: Load inputs @@ -258,10 +248,12 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void byte_split_kernel( typename OffsetScan::TempStorage offset_scan; } temp_storage; - int32_t const tile_idx = base_tile_idx + blockIdx.x; - int32_t const tile_input_offset = blockIdx.x * ITEMS_PER_TILE; - int32_t const thread_input_offset = tile_input_offset + threadIdx.x * ITEMS_PER_THREAD; - int32_t const thread_input_size = chunk_input_chars.size() - thread_input_offset; + auto const tile_idx = base_tile_idx + blockIdx.x; + auto const tile_input_offset = blockIdx.x * ITEMS_PER_TILE; + auto const thread_input_offset = + tile_input_offset + cudf::thread_index_type{threadIdx.x} * ITEMS_PER_THREAD; + auto const thread_input_size = + std::max(chunk_input_chars.size() - thread_input_offset, 0); // STEP 1: Load inputs @@ -401,11 +393,14 @@ std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source // Seeding the tile state with an identity value allows the 0th tile to follow the same logic as // the Nth tile, assuming it can look up an inclusive prefix. Without this seed, the 0th block // would have to follow separate logic. - multibyte_split_seed_kernel<<<1, 1, 0, stream.value()>>>( // - tile_multistates, - tile_offsets, - multistate_seed, - 0); + cudf::detail::device_single_thread( + [tm = scan_tile_state_view(tile_multistates), + to = scan_tile_state_view(tile_offsets), + multistate_seed] __device__() mutable { + tm.set_inclusive_prefix(-1, multistate_seed); + to.set_inclusive_prefix(-1, 0); + }, + stream); auto reader = source.create_reader(); auto chunk_offset = std::max(0, byte_range.offset() - delimiter.size()); diff --git a/cpp/src/io/utilities/parsing_utils.cu b/cpp/src/io/utilities/parsing_utils.cu index 3d478471833..06b86f33c85 100644 --- a/cpp/src/io/utilities/parsing_utils.cu +++ b/cpp/src/io/utilities/parsing_utils.cu @@ -14,6 +14,7 @@ * limitations under the License. */ +#include #include #include #include @@ -94,8 +95,8 @@ __global__ void count_and_set_positions(char const* data, T* positions) { // thread IDs range per block, so also need the block id - uint64_t const tid = threadIdx.x + (blockDim.x * blockIdx.x); - uint64_t const did = tid * bytes_per_find_thread; + auto const tid = cudf::detail::grid_1d::global_thread_id(); + auto const did = tid * bytes_per_find_thread; char const* raw = (data + did);