Handle multibyte_split byte_range out-of-bounds offsets on host #11885

Merged

Changes from 2 commits
195 changes: 97 additions & 98 deletions cpp/src/io/text/multibyte_split.cu
@@ -31,21 +31,24 @@
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/span.hpp>

#include <limits>
#include <rmm/cuda_stream_pool.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <thrust/copy.h>
#include <thrust/find.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/transform.h>

#include <cub/block/block_load.cuh>
#include <cub/block/block_scan.cuh>

#pragma GCC diagnostic pop

#include <cstdint>
#include <limits>
#include <memory>
#include <numeric>
#include <optional>
@@ -160,6 +163,10 @@ struct PatternScan {
}
};

// type aliases to distinguish between row offsets and character offsets
using output_offset = int64_t;
using byte_offset = int64_t;
Comment on lines +167 to +168

Contributor:

Why is this? If you want to completely differentiate between these types, simple type aliasing like this is not enough. Instead, strong types should be preferred:

enum class output_offset : int64_t {};
enum class byte_offset : int64_t {};

Using strong types makes arithmetic operations more cumbersome, but you can always cast the values to int64_t when doing so.
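
For illustration, arithmetic on such an enum-class strong type could look like the following sketch; the operator overload is a hypothetical helper, not existing cuDF code:

// Hypothetical helper for the enum-class byte_offset above: unwrap to
// int64_t for the arithmetic, then re-wrap the result.
constexpr byte_offset operator+(byte_offset lhs, int64_t rhs)
{
  return static_cast<byte_offset>(static_cast<int64_t>(lhs) + rhs);
}

With a helper like this, call sites such as chunk_offset + chunk->size() stay readable, but every operation the kernels use would need its own overload.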

Contributor Author:

Yes, this approach has its shortcomings. I mainly wanted to make clear what the different offsets point to; it's more about avoiding raw integer types for readability than about true type safety. I need to do enough arithmetic on these types that using an enum class would be really inconvenient. Something like BOOST_STRONG_TYPEDEF would probably fit.

Contributor:

Both approaches have pros and cons. For this case, I think a simple using with a weakly aliased type is sufficient. It does offer some benefit to readability, and the need for arithmetic operators makes it difficult to use a strong type like an enum class. I'm happy with the tradeoffs that @upsj took in this case.

Contributor:

Also: @ttnghia, we need strong types for the left/right comparator indices (the last time I remember discussing this topic) to ensure that function signatures match the given types. That isn't as much of a concern here, because the weakly aliased types aren't being passed to functions with multiple overloads for plain integers and strong types.
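
As a point of reference, a BOOST_STRONG_TYPEDEF-style wrapper could keep the offsets as distinct types while still allowing the arithmetic the kernels need. A minimal, hypothetical sketch, not part of this PR:

// Hypothetical strong offset type supporting the operations the kernels use:
// addition of a raw count, differences between offsets, and comparison.
struct strong_byte_offset {
  int64_t value = 0;

  constexpr strong_byte_offset() = default;
  constexpr explicit strong_byte_offset(int64_t v) : value{v} {}

  friend constexpr strong_byte_offset operator+(strong_byte_offset lhs, int64_t rhs)
  {
    return strong_byte_offset{lhs.value + rhs};
  }
  friend constexpr int64_t operator-(strong_byte_offset lhs, strong_byte_offset rhs)
  {
    return lhs.value - rhs.value;
  }
  friend constexpr bool operator<(strong_byte_offset lhs, strong_byte_offset rhs)
  {
    return lhs.value < rhs.value;
  }
};

The per-operator boilerplate is the cost of this approach, which is the tradeoff the thread above settles in favor of plain aliases.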


// multibyte_split works by splitting up inputs into 32 inputs (bytes) per thread, and transforming
// them into data structures called "multistates". These multistates are created by searching a
// trie, but instead of a traditional trie where the search begins at a single node at the beginning,
@@ -170,35 +177,11 @@
// it begins in. From there, each thread can then take deterministic action. In this case, the
// deterministic action is counting and outputting delimiter offsets when a delimiter is found.

// This struct provides output offsets that are only incremented until a cutoff point.
struct cutoff_offset {
// magnitude stores the offset, sign bit stores whether we are past the cutoff
int64_t value = 0;

constexpr cutoff_offset() = default;

constexpr cutoff_offset(int64_t offset, bool is_past_cutoff)
: value{is_past_cutoff ? -offset : offset}
{
}

[[nodiscard]] constexpr int64_t offset() const { return value < 0 ? -value : value; }

[[nodiscard]] constexpr bool is_past_end() { return value < 0; }

friend constexpr cutoff_offset operator+(cutoff_offset lhs, cutoff_offset rhs)
{
auto const past_end = lhs.is_past_end() or rhs.is_past_end();
auto const offset = lhs.offset() + (lhs.is_past_end() ? 0 : rhs.offset());
return cutoff_offset{offset, past_end};
}
};

__global__ void multibyte_split_init_kernel(
cudf::size_type base_tile_idx,
cudf::size_type num_tiles,
cudf::io::text::detail::scan_tile_state_view<multistate> tile_multistates,
cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
cudf::io::text::detail::scan_tile_state_view<output_offset> tile_output_offsets,
cudf::io::text::detail::scan_tile_status status =
cudf::io::text::detail::scan_tile_status::invalid)
{
@@ -212,9 +195,9 @@

__global__ void multibyte_split_seed_kernel(
cudf::io::text::detail::scan_tile_state_view<multistate> tile_multistates,
cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
cudf::io::text::detail::scan_tile_state_view<output_offset> tile_output_offsets,
multistate tile_multistate_seed,
cutoff_offset tile_output_offset)
output_offset tile_output_offset)
{
auto const thread_idx = blockIdx.x * blockDim.x + threadIdx.x;
if (thread_idx == 0) {
@@ -225,19 +208,18 @@ __global__ void multibyte_split_seed_kernel(

__global__ __launch_bounds__(THREADS_PER_TILE) void multibyte_split_kernel(
cudf::size_type base_tile_idx,
int64_t base_input_offset,
int64_t base_offset_offset,
byte_offset base_input_offset,
output_offset base_output_offset,
cudf::io::text::detail::scan_tile_state_view<multistate> tile_multistates,
cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
cudf::io::text::detail::scan_tile_state_view<output_offset> tile_output_offsets,
cudf::device_span<char const> delim,
cudf::device_span<char const> chunk_input_chars,
int64_t byte_range_end,
cudf::split_device_span<int64_t> output_offsets)
cudf::split_device_span<byte_offset> row_offsets)
{
using InputLoad =
cub::BlockLoad<char, THREADS_PER_TILE, ITEMS_PER_THREAD, cub::BLOCK_LOAD_WARP_TRANSPOSE>;
using OffsetScan = cub::BlockScan<cutoff_offset, THREADS_PER_TILE>;
using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback<cutoff_offset>;
using OffsetScan = cub::BlockScan<output_offset, THREADS_PER_TILE>;
using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback<output_offset>;

__shared__ union {
typename InputLoad::TempStorage input_load;
@@ -269,17 +251,15 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void multibyte_split_kernel(

// STEP 3: Flag matches

cutoff_offset thread_offset;
output_offset thread_offset{};
uint32_t thread_match_mask[(ITEMS_PER_THREAD + 31) / 32]{};

for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) {
thread_multistate = transition(thread_chars[i], thread_multistate, delim);
auto const thread_state = thread_multistate.max_tail();
auto const is_match = i < thread_input_size and thread_state == delim.size();
auto const match_end = base_input_offset + thread_input_offset + i + 1;
auto const is_past_range = match_end >= byte_range_end;
thread_multistate = transition(thread_chars[i], thread_multistate, delim);
auto const thread_state = thread_multistate.max_tail();
auto const is_match = i < thread_input_size and thread_state == delim.size();
thread_match_mask[i / 32] |= uint32_t{is_match} << (i % 32);
thread_offset = thread_offset + cutoff_offset{is_match, is_past_range};
thread_offset = thread_offset + int{is_match};
}

// STEP 4: Scan flags to determine absolute thread output offset
@@ -293,29 +273,27 @@

for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) {
auto const is_match = (thread_match_mask[i / 32] >> (i % 32)) & 1u;
if (is_match && !thread_offset.is_past_end()) {
auto const match_end = base_input_offset + thread_input_offset + i + 1;
auto const is_past_range = match_end >= byte_range_end;
output_offsets[thread_offset.offset() - base_offset_offset] = match_end;
thread_offset = thread_offset + cutoff_offset{true, is_past_range};
if (is_match) {
auto const match_end = base_input_offset + thread_input_offset + i + 1;
row_offsets[thread_offset - base_output_offset] = match_end;
thread_offset++;
}
}
}

__global__ __launch_bounds__(THREADS_PER_TILE) void byte_split_kernel(
cudf::size_type base_tile_idx,
int64_t base_input_offset,
int64_t base_offset_offset,
cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
byte_offset base_input_offset,
output_offset base_output_offset,
cudf::io::text::detail::scan_tile_state_view<output_offset> tile_output_offsets,
char delim,
cudf::device_span<char const> chunk_input_chars,
int64_t byte_range_end,
cudf::split_device_span<int64_t> output_offsets)
cudf::split_device_span<byte_offset> row_offsets)
{
using InputLoad =
cub::BlockLoad<char, THREADS_PER_TILE, ITEMS_PER_THREAD, cub::BLOCK_LOAD_WARP_TRANSPOSE>;
using OffsetScan = cub::BlockScan<cutoff_offset, THREADS_PER_TILE>;
using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback<cutoff_offset>;
using OffsetScan = cub::BlockScan<output_offset, THREADS_PER_TILE>;
using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback<output_offset>;

__shared__ union {
typename InputLoad::TempStorage input_load;
@@ -338,15 +316,13 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void byte_split_kernel(

// STEP 2: Flag matches

cutoff_offset thread_offset;
output_offset thread_offset{};
uint32_t thread_match_mask[(ITEMS_PER_THREAD + 31) / 32]{};

for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) {
auto const is_match = i < thread_input_size and thread_chars[i] == delim;
auto const match_end = base_input_offset + thread_input_offset + i + 1;
auto const is_past_range = match_end >= byte_range_end;
auto const is_match = i < thread_input_size and thread_chars[i] == delim;
thread_match_mask[i / 32] |= uint32_t{is_match} << (i % 32);
thread_offset = thread_offset + cutoff_offset{is_match, is_past_range};
thread_offset = thread_offset + int{is_match};
}

// STEP 3: Scan flags to determine absolute thread output offset
@@ -360,11 +336,10 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void byte_split_kernel(

for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) {
auto const is_match = (thread_match_mask[i / 32] >> (i % 32)) & 1u;
if (is_match && !thread_offset.is_past_end()) {
auto const match_end = base_input_offset + thread_input_offset + i + 1;
auto const is_past_range = match_end >= byte_range_end;
output_offsets[thread_offset.offset() - base_offset_offset] = match_end;
thread_offset = thread_offset + cutoff_offset{true, is_past_range};
if (is_match) {
auto const match_end = base_input_offset + thread_input_offset + i + 1;
row_offsets[thread_offset - base_output_offset] = match_end;
thread_offset++;
}
}
}
@@ -611,7 +586,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
// best when at least 32 more than max possible concurrent tiles, due to rolling `invalid`s
auto num_tile_states = std::max(32, TILES_PER_CHUNK * concurrency + 32);
auto tile_multistates = scan_tile_state<multistate>(num_tile_states, stream);
auto tile_offsets = scan_tile_state<cutoff_offset>(num_tile_states, stream);
auto tile_offsets = scan_tile_state<output_offset>(num_tile_states, stream);

multibyte_split_init_kernel<<<TILES_PER_CHUNK,
THREADS_PER_TILE,
@@ -633,15 +608,15 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
tile_multistates,
tile_offsets,
multistate_seed,
{});
0);

auto reader = source.create_reader();
auto chunk_offset = std::max<int64_t>(0, byte_range.offset() - delimiter.size());
auto chunk_offset = std::max<byte_offset>(0, byte_range.offset() - delimiter.size());
auto const byte_range_end = byte_range.offset() + byte_range.size();
reader->skip_bytes(chunk_offset);
// amortize output chunk allocations over 8 worst-case outputs. This limits the overallocation
constexpr auto max_growth = 8;
output_builder<int64_t> offset_storage(ITEMS_PER_CHUNK, max_growth, stream);
output_builder<byte_offset> row_offset_storage(ITEMS_PER_CHUNK, max_growth, stream);
output_builder<char> char_storage(ITEMS_PER_CHUNK, max_growth, stream);

fork_stream(streams, stream);
@@ -653,22 +628,23 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
auto& scan_stream = streams[1];
auto chunk = reader->get_next_chunk(ITEMS_PER_CHUNK, read_stream);
int64_t base_tile_idx = 0;
std::optional<int64_t> first_offset;
std::optional<int64_t> last_offset;
if (byte_range.offset() == 0) { first_offset = 0; }
std::optional<byte_offset> first_row_offset;
std::optional<byte_offset> last_row_offset;
bool found_last_offset = false;
if (byte_range.offset() == 0) { first_row_offset = 0; }
std::swap(read_stream, scan_stream);

while (chunk->size() > 0) {
// if we found the last delimiter, or didn't find delimiters inside the byte range at all: abort
if (last_offset.has_value() or
(not first_offset.has_value() and chunk_offset >= byte_range_end)) {
if (last_row_offset.has_value() or
(not first_row_offset.has_value() and chunk_offset >= byte_range_end)) {
break;
}

auto tiles_in_launch =
cudf::util::div_rounding_up_safe(chunk->size(), static_cast<std::size_t>(ITEMS_PER_TILE));

auto offset_output = offset_storage.next_output(scan_stream);
auto row_offsets = row_offset_storage.next_output(scan_stream);

// reset the next chunk of tile state
multibyte_split_init_kernel<<<tiles_in_launch,
@@ -690,47 +666,67 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
scan_stream.value()>>>( //
base_tile_idx,
chunk_offset,
offset_storage.size(),
row_offset_storage.size(),
tile_offsets,
delimiter[0],
*chunk,
byte_range_end,
offset_output);
row_offsets);
} else {
multibyte_split_kernel<<<tiles_in_launch,
THREADS_PER_TILE,
0,
scan_stream.value()>>>( //
base_tile_idx,
chunk_offset,
offset_storage.size(),
row_offset_storage.size(),
tile_multistates,
tile_offsets,
{device_delim.data(), static_cast<std::size_t>(device_delim.size())},
*chunk,
byte_range_end,
offset_output);
row_offsets);
}

// load the next chunk
auto next_chunk = reader->get_next_chunk(ITEMS_PER_CHUNK, read_stream);
// while that is running, determine how many offsets we output (synchronizes)
auto next_tile_offset =
tile_offsets.get_inclusive_prefix(base_tile_idx + tiles_in_launch - 1, scan_stream);
offset_storage.advance_output(next_tile_offset.offset() - offset_storage.size());
auto const new_offsets = [&] {
auto const new_offsets_unclamped =
tile_offsets.get_inclusive_prefix(base_tile_idx + tiles_in_launch - 1, scan_stream) -
static_cast<output_offset>(row_offset_storage.size());
// if we are not in the last chunk, we can use all offsets
if (chunk_offset + static_cast<output_offset>(chunk->size()) < byte_range_end) {
return new_offsets_unclamped;
}
// if we are in the last chunk, we need to find the first out-of-bounds offset
auto const it = thrust::make_counting_iterator(output_offset{});
auto const end_loc =
*thrust::find_if(rmm::exec_policy_nosync(scan_stream),
it,
it + new_offsets_unclamped,
[row_offsets, byte_range_end] __device__(output_offset i) {
return row_offsets[i] >= byte_range_end;
});
// if we had no out-of-bounds offset, we copy all offsets
if (end_loc == new_offsets_unclamped) { return end_loc; }
// otherwise we copy only up to (including) the first out-of-bounds delimiter
found_last_offset = true;
return end_loc + 1;
}();
row_offset_storage.advance_output(new_offsets);
// determine if we found the first or last field offset for the byte range
if (next_tile_offset.offset() > 0 and not first_offset) {
first_offset = offset_storage.front_element(scan_stream);
if (new_offsets > 0 and not first_row_offset) {
first_row_offset = row_offset_storage.front_element(scan_stream);
}
if (next_tile_offset.is_past_end()) { last_offset = offset_storage.back_element(scan_stream); }
if (found_last_offset) { last_row_offset = row_offset_storage.back_element(scan_stream); }
// copy over the characters we need, if we already encountered the first field delimiter
if (first_offset.has_value()) {
auto const begin = chunk->data() + std::max<int64_t>(0, *first_offset - chunk_offset);
auto const sentinel = last_offset.value_or(std::numeric_limits<int64_t>::max());
auto const end = chunk->data() + std::min<int64_t>(sentinel - chunk_offset, chunk->size());
if (first_row_offset.has_value()) {
auto const begin = chunk->data() + std::max<byte_offset>(0, *first_row_offset - chunk_offset);
auto const sentinel = last_row_offset.value_or(std::numeric_limits<byte_offset>::max());
auto const end =
chunk->data() + std::min<byte_offset>(sentinel - chunk_offset, chunk->size());
auto const output_size = end - begin;
auto char_output = char_storage.next_output(scan_stream);
auto const split = begin + std::min<int64_t>(output_size, char_output.head().size());
auto const split = begin + std::min<byte_offset>(output_size, char_output.head().size());
thrust::copy(rmm::exec_policy_nosync(scan_stream), begin, split, char_output.head().begin());
thrust::copy(rmm::exec_policy_nosync(scan_stream), split, end, char_output.tail().begin());
char_storage.advance_output(output_size);
@@ -739,7 +735,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
cudaEventRecord(last_launch_event, scan_stream.value());

std::swap(read_stream, scan_stream);
base_tile_idx += TILES_PER_CHUNK;
base_tile_idx += tiles_in_launch;
chunk_offset += chunk->size();
chunk = std::move(next_chunk);
}
@@ -750,24 +746,27 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source

// if the input was empty, we didn't find a delimiter at all,
// or the first delimiter was also the last: empty output
if (chunk_offset == 0 or not first_offset.has_value() or first_offset == last_offset) {
if (chunk_offset == 0 or not first_row_offset.has_value() or
first_row_offset == last_row_offset) {
return make_empty_column(type_id::STRING);
}

auto chars = char_storage.gather(stream, mr);
auto global_offsets = offset_storage.gather(stream, mr);
auto global_offsets = row_offset_storage.gather(stream, mr);

bool const insert_begin = *first_offset == 0;
bool const insert_end = not last_offset.has_value() or last_offset == chunk_offset;
bool const insert_begin = *first_row_offset == 0;
bool const insert_end = not last_row_offset.has_value() or last_row_offset == chunk_offset;
rmm::device_uvector<int32_t> offsets{
global_offsets.size() + insert_begin + insert_end, stream, mr};
if (insert_begin) { offsets.set_element_to_zero_async(0, stream); }
if (insert_end) { offsets.set_element(offsets.size() - 1, chunk_offset - *first_offset, stream); }
if (insert_end) {
offsets.set_element(offsets.size() - 1, chunk_offset - *first_row_offset, stream);
}
thrust::transform(rmm::exec_policy(stream),
global_offsets.begin(),
global_offsets.end(),
offsets.begin() + insert_begin,
[baseline = *first_offset] __device__(int64_t global_offset) {
[baseline = *first_row_offset] __device__(byte_offset global_offset) {
return static_cast<int32_t>(global_offset - baseline);
});
