From 9f8b93680ea81209ce34db6957cc0ef2791fa806 Mon Sep 17 00:00:00 2001 From: Tobias Ribizel Date: Sat, 15 Oct 2022 09:06:29 +0200 Subject: [PATCH] Handle `multibyte_split` byte_range out-of-bounds offsets on host (#11885) To unify the interface for a future combined handling of byte ranges between read_csv and read_text, this PR replaces `cutoff_offset` with a plain integer again, and finds the first out-of-bounds offset on the host side instead. Authors: - Tobias Ribizel (https://github.com/upsj) Approvers: - Mike Wilson (https://github.com/hyperbolic2346) - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/11885 --- cpp/src/io/text/multibyte_split.cu | 195 ++++++++++++++--------------- 1 file changed, 97 insertions(+), 98 deletions(-) diff --git a/cpp/src/io/text/multibyte_split.cu b/cpp/src/io/text/multibyte_split.cu index 133c5fe9826..136eb8d24c6 100644 --- a/cpp/src/io/text/multibyte_split.cu +++ b/cpp/src/io/text/multibyte_split.cu @@ -31,7 +31,6 @@ #include #include -#include #include #include #include @@ -39,6 +38,8 @@ #include #include +#include +#include #include #include @@ -46,6 +47,8 @@ #pragma GCC diagnostic pop +#include +#include #include #include #include @@ -160,6 +163,10 @@ struct PatternScan { } }; +// type aliases to distinguish between row offsets and character offsets +using output_offset = int64_t; +using byte_offset = int64_t; + // multibyte_split works by splitting up inputs in to 32 inputs (bytes) per thread, and transforming // them in to data structures called "multistates". these multistates are created by searching a // trie, but instead of a tradition trie where the search begins at a single node at the beginning, @@ -170,35 +177,11 @@ struct PatternScan { // it begins in. From there, each thread can then take deterministic action. In this case, the // deterministic action is counting and outputting delimiter offsets when a delimiter is found. 
-// This struct provides output offsets that are only incremented until a cutoff point. -struct cutoff_offset { - // magnitude stores the offset, sign bit stores whether we are past the cutoff - int64_t value = 0; - - constexpr cutoff_offset() = default; - - constexpr cutoff_offset(int64_t offset, bool is_past_cutoff) - : value{is_past_cutoff ? -offset : offset} - { - } - - [[nodiscard]] constexpr int64_t offset() const { return value < 0 ? -value : value; } - - [[nodiscard]] constexpr bool is_past_end() { return value < 0; } - - friend constexpr cutoff_offset operator+(cutoff_offset lhs, cutoff_offset rhs) - { - auto const past_end = lhs.is_past_end() or rhs.is_past_end(); - auto const offset = lhs.offset() + (lhs.is_past_end() ? 0 : rhs.offset()); - return cutoff_offset{offset, past_end}; - } -}; - __global__ void multibyte_split_init_kernel( cudf::size_type base_tile_idx, cudf::size_type num_tiles, cudf::io::text::detail::scan_tile_state_view tile_multistates, - cudf::io::text::detail::scan_tile_state_view tile_output_offsets, + cudf::io::text::detail::scan_tile_state_view tile_output_offsets, cudf::io::text::detail::scan_tile_status status = cudf::io::text::detail::scan_tile_status::invalid) { @@ -212,9 +195,9 @@ __global__ void multibyte_split_init_kernel( __global__ void multibyte_split_seed_kernel( cudf::io::text::detail::scan_tile_state_view tile_multistates, - cudf::io::text::detail::scan_tile_state_view tile_output_offsets, + cudf::io::text::detail::scan_tile_state_view tile_output_offsets, multistate tile_multistate_seed, - cutoff_offset tile_output_offset) + output_offset tile_output_offset) { auto const thread_idx = blockIdx.x * blockDim.x + threadIdx.x; if (thread_idx == 0) { @@ -225,19 +208,18 @@ __global__ void multibyte_split_seed_kernel( __global__ __launch_bounds__(THREADS_PER_TILE) void multibyte_split_kernel( cudf::size_type base_tile_idx, - int64_t base_input_offset, - int64_t base_offset_offset, + byte_offset base_input_offset, + 
output_offset base_output_offset, cudf::io::text::detail::scan_tile_state_view tile_multistates, - cudf::io::text::detail::scan_tile_state_view tile_output_offsets, + cudf::io::text::detail::scan_tile_state_view tile_output_offsets, cudf::device_span delim, cudf::device_span chunk_input_chars, - int64_t byte_range_end, - cudf::split_device_span output_offsets) + cudf::split_device_span row_offsets) { using InputLoad = cub::BlockLoad; - using OffsetScan = cub::BlockScan; - using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback; + using OffsetScan = cub::BlockScan; + using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback; __shared__ union { typename InputLoad::TempStorage input_load; @@ -269,17 +251,15 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void multibyte_split_kernel( // STEP 3: Flag matches - cutoff_offset thread_offset; + output_offset thread_offset{}; uint32_t thread_match_mask[(ITEMS_PER_THREAD + 31) / 32]{}; for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) { - thread_multistate = transition(thread_chars[i], thread_multistate, delim); - auto const thread_state = thread_multistate.max_tail(); - auto const is_match = i < thread_input_size and thread_state == delim.size(); - auto const match_end = base_input_offset + thread_input_offset + i + 1; - auto const is_past_range = match_end >= byte_range_end; + thread_multistate = transition(thread_chars[i], thread_multistate, delim); + auto const thread_state = thread_multistate.max_tail(); + auto const is_match = i < thread_input_size and thread_state == delim.size(); thread_match_mask[i / 32] |= uint32_t{is_match} << (i % 32); - thread_offset = thread_offset + cutoff_offset{is_match, is_past_range}; + thread_offset += output_offset{is_match}; } // STEP 4: Scan flags to determine absolute thread output offset @@ -293,29 +273,27 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void multibyte_split_kernel( for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) { auto const 
is_match = (thread_match_mask[i / 32] >> (i % 32)) & 1u; - if (is_match && !thread_offset.is_past_end()) { - auto const match_end = base_input_offset + thread_input_offset + i + 1; - auto const is_past_range = match_end >= byte_range_end; - output_offsets[thread_offset.offset() - base_offset_offset] = match_end; - thread_offset = thread_offset + cutoff_offset{true, is_past_range}; + if (is_match) { + auto const match_end = base_input_offset + thread_input_offset + i + 1; + row_offsets[thread_offset - base_output_offset] = match_end; + thread_offset++; } } } __global__ __launch_bounds__(THREADS_PER_TILE) void byte_split_kernel( cudf::size_type base_tile_idx, - int64_t base_input_offset, - int64_t base_offset_offset, - cudf::io::text::detail::scan_tile_state_view tile_output_offsets, + byte_offset base_input_offset, + output_offset base_output_offset, + cudf::io::text::detail::scan_tile_state_view tile_output_offsets, char delim, cudf::device_span chunk_input_chars, - int64_t byte_range_end, - cudf::split_device_span output_offsets) + cudf::split_device_span row_offsets) { using InputLoad = cub::BlockLoad; - using OffsetScan = cub::BlockScan; - using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback; + using OffsetScan = cub::BlockScan; + using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback; __shared__ union { typename InputLoad::TempStorage input_load; @@ -338,15 +316,13 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void byte_split_kernel( // STEP 2: Flag matches - cutoff_offset thread_offset; + output_offset thread_offset{}; uint32_t thread_match_mask[(ITEMS_PER_THREAD + 31) / 32]{}; for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) { - auto const is_match = i < thread_input_size and thread_chars[i] == delim; - auto const match_end = base_input_offset + thread_input_offset + i + 1; - auto const is_past_range = match_end >= byte_range_end; + auto const is_match = i < thread_input_size and thread_chars[i] == delim; 
thread_match_mask[i / 32] |= uint32_t{is_match} << (i % 32); - thread_offset = thread_offset + cutoff_offset{is_match, is_past_range}; + thread_offset += output_offset{is_match}; } // STEP 3: Scan flags to determine absolute thread output offset @@ -360,11 +336,10 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void byte_split_kernel( for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) { auto const is_match = (thread_match_mask[i / 32] >> (i % 32)) & 1u; - if (is_match && !thread_offset.is_past_end()) { - auto const match_end = base_input_offset + thread_input_offset + i + 1; - auto const is_past_range = match_end >= byte_range_end; - output_offsets[thread_offset.offset() - base_offset_offset] = match_end; - thread_offset = thread_offset + cutoff_offset{true, is_past_range}; + if (is_match) { + auto const match_end = base_input_offset + thread_input_offset + i + 1; + row_offsets[thread_offset - base_output_offset] = match_end; + thread_offset++; } } } @@ -611,7 +586,7 @@ std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source // best when at least 32 more than max possible concurrent tiles, due to rolling `invalid`s auto num_tile_states = std::max(32, TILES_PER_CHUNK * concurrency + 32); auto tile_multistates = scan_tile_state(num_tile_states, stream); - auto tile_offsets = scan_tile_state(num_tile_states, stream); + auto tile_offsets = scan_tile_state(num_tile_states, stream); multibyte_split_init_kernel<< multibyte_split(cudf::io::text::data_chunk_source tile_multistates, tile_offsets, multistate_seed, - {}); + 0); auto reader = source.create_reader(); - auto chunk_offset = std::max(0, byte_range.offset() - delimiter.size()); + auto chunk_offset = std::max(0, byte_range.offset() - delimiter.size()); auto const byte_range_end = byte_range.offset() + byte_range.size(); reader->skip_bytes(chunk_offset); // amortize output chunk allocations over 8 worst-case outputs. 
This limits the overallocation constexpr auto max_growth = 8; - output_builder offset_storage(ITEMS_PER_CHUNK, max_growth, stream); + output_builder row_offset_storage(ITEMS_PER_CHUNK, max_growth, stream); output_builder char_storage(ITEMS_PER_CHUNK, max_growth, stream); fork_stream(streams, stream); @@ -653,22 +628,23 @@ std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source auto& scan_stream = streams[1]; auto chunk = reader->get_next_chunk(ITEMS_PER_CHUNK, read_stream); int64_t base_tile_idx = 0; - std::optional first_offset; - std::optional last_offset; - if (byte_range.offset() == 0) { first_offset = 0; } + std::optional first_row_offset; + std::optional last_row_offset; + bool found_last_offset = false; + if (byte_range.offset() == 0) { first_row_offset = 0; } std::swap(read_stream, scan_stream); while (chunk->size() > 0) { // if we found the last delimiter, or didn't find delimiters inside the byte range at all: abort - if (last_offset.has_value() or - (not first_offset.has_value() and chunk_offset >= byte_range_end)) { + if (last_row_offset.has_value() or + (not first_row_offset.has_value() and chunk_offset >= byte_range_end)) { break; } auto tiles_in_launch = cudf::util::div_rounding_up_safe(chunk->size(), static_cast(ITEMS_PER_TILE)); - auto offset_output = offset_storage.next_output(scan_stream); + auto row_offsets = row_offset_storage.next_output(scan_stream); // reset the next chunk of tile state multibyte_split_init_kernel<< multibyte_split(cudf::io::text::data_chunk_source scan_stream.value()>>>( // base_tile_idx, chunk_offset, - offset_storage.size(), + row_offset_storage.size(), tile_offsets, delimiter[0], *chunk, - byte_range_end, - offset_output); + row_offsets); } else { multibyte_split_kernel<< multibyte_split(cudf::io::text::data_chunk_source scan_stream.value()>>>( // base_tile_idx, chunk_offset, - offset_storage.size(), + row_offset_storage.size(), tile_multistates, tile_offsets, {device_delim.data(), 
static_cast(device_delim.size())}, *chunk, - byte_range_end, - offset_output); + row_offsets); } // load the next chunk auto next_chunk = reader->get_next_chunk(ITEMS_PER_CHUNK, read_stream); // while that is running, determine how many offsets we output (synchronizes) - auto next_tile_offset = - tile_offsets.get_inclusive_prefix(base_tile_idx + tiles_in_launch - 1, scan_stream); - offset_storage.advance_output(next_tile_offset.offset() - offset_storage.size()); + auto const new_offsets = [&] { + auto const new_offsets_unclamped = + tile_offsets.get_inclusive_prefix(base_tile_idx + tiles_in_launch - 1, scan_stream) - + static_cast(row_offset_storage.size()); + // if we are not in the last chunk, we can use all offsets + if (chunk_offset + static_cast(chunk->size()) < byte_range_end) { + return new_offsets_unclamped; + } + // if we are in the last chunk, we need to find the first out-of-bounds offset + auto const it = thrust::make_counting_iterator(output_offset{}); + auto const end_loc = + *thrust::find_if(rmm::exec_policy_nosync(scan_stream), + it, + it + new_offsets_unclamped, + [row_offsets, byte_range_end] __device__(output_offset i) { + return row_offsets[i] >= byte_range_end; + }); + // if we had no out-of-bounds offset, we copy all offsets + if (end_loc == new_offsets_unclamped) { return end_loc; } + // otherwise we copy only up to (including) the first out-of-bounds delimiter + found_last_offset = true; + return end_loc + 1; + }(); + row_offset_storage.advance_output(new_offsets); // determine if we found the first or last field offset for the byte range - if (next_tile_offset.offset() > 0 and not first_offset) { - first_offset = offset_storage.front_element(scan_stream); + if (new_offsets > 0 and not first_row_offset) { + first_row_offset = row_offset_storage.front_element(scan_stream); } - if (next_tile_offset.is_past_end()) { last_offset = offset_storage.back_element(scan_stream); } + if (found_last_offset) { last_row_offset = 
row_offset_storage.back_element(scan_stream); } // copy over the characters we need, if we already encountered the first field delimiter - if (first_offset.has_value()) { - auto const begin = chunk->data() + std::max(0, *first_offset - chunk_offset); - auto const sentinel = last_offset.value_or(std::numeric_limits::max()); - auto const end = chunk->data() + std::min(sentinel - chunk_offset, chunk->size()); + if (first_row_offset.has_value()) { + auto const begin = chunk->data() + std::max(0, *first_row_offset - chunk_offset); + auto const sentinel = last_row_offset.value_or(std::numeric_limits::max()); + auto const end = + chunk->data() + std::min(sentinel - chunk_offset, chunk->size()); auto const output_size = end - begin; auto char_output = char_storage.next_output(scan_stream); - auto const split = begin + std::min(output_size, char_output.head().size()); + auto const split = begin + std::min(output_size, char_output.head().size()); thrust::copy(rmm::exec_policy_nosync(scan_stream), begin, split, char_output.head().begin()); thrust::copy(rmm::exec_policy_nosync(scan_stream), split, end, char_output.tail().begin()); char_storage.advance_output(output_size); @@ -739,7 +735,7 @@ std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source cudaEventRecord(last_launch_event, scan_stream.value()); std::swap(read_stream, scan_stream); - base_tile_idx += TILES_PER_CHUNK; + base_tile_idx += tiles_in_launch; chunk_offset += chunk->size(); chunk = std::move(next_chunk); } @@ -750,24 +746,27 @@ std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source // if the input was empty, we didn't find a delimiter at all, // or the first delimiter was also the last: empty output - if (chunk_offset == 0 or not first_offset.has_value() or first_offset == last_offset) { + if (chunk_offset == 0 or not first_row_offset.has_value() or + first_row_offset == last_row_offset) { return make_empty_column(type_id::STRING); } auto chars = char_storage.gather(stream, mr); - auto 
global_offsets = offset_storage.gather(stream, mr); + auto global_offsets = row_offset_storage.gather(stream, mr); - bool const insert_begin = *first_offset == 0; - bool const insert_end = not last_offset.has_value() or last_offset == chunk_offset; + bool const insert_begin = *first_row_offset == 0; + bool const insert_end = not last_row_offset.has_value() or last_row_offset == chunk_offset; rmm::device_uvector offsets{ global_offsets.size() + insert_begin + insert_end, stream, mr}; if (insert_begin) { offsets.set_element_to_zero_async(0, stream); } - if (insert_end) { offsets.set_element(offsets.size() - 1, chunk_offset - *first_offset, stream); } + if (insert_end) { + offsets.set_element(offsets.size() - 1, chunk_offset - *first_row_offset, stream); + } thrust::transform(rmm::exec_policy(stream), global_offsets.begin(), global_offsets.end(), offsets.begin() + insert_begin, - [baseline = *first_offset] __device__(int64_t global_offset) { + [baseline = *first_row_offset] __device__(byte_offset global_offset) { return static_cast(global_offset - baseline); });