Handle multibyte_split byte_range out-of-bounds offsets on host (#11885)

To unify the interface for a future combined handling of byte ranges between read_csv and read_text, this PR replaces the `cutoff_offset` type with a plain integer offset again and finds the first out-of-bounds offset on the host side instead.
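
The device side now only counts matches during the scan; the byte-range cutoff is applied afterwards by searching the freshly produced row offsets for the first one at or past the end of the range. A minimal sketch of that host-side search, assuming a device buffer of row offsets and an illustrative helper name (`clamp_new_offsets` is not part of cuDF):

    #include <thrust/device_vector.h>
    #include <thrust/execution_policy.h>
    #include <thrust/find.h>

    #include <cstdint>

    // Count how many newly produced row offsets to keep: every offset inside the
    // byte range, plus the first offset at or past byte_range_end, which terminates
    // the last row belonging to the range. (Device lambda requires --extended-lambda.)
    std::int64_t clamp_new_offsets(thrust::device_vector<std::int64_t> const& row_offsets,
                                   std::int64_t byte_range_end,
                                   bool& found_last_offset)
    {
      auto const it = thrust::find_if(
        thrust::device, row_offsets.begin(), row_offsets.end(),
        [byte_range_end] __device__(std::int64_t offset) { return offset >= byte_range_end; });
      if (it == row_offsets.end()) { return static_cast<std::int64_t>(row_offsets.size()); }
      found_last_offset = true;
      return (it - row_offsets.begin()) + 1;  // include the first out-of-bounds delimiter
    }

This is what lets `cutoff_offset` collapse back to a plain integer `output_offset` in the scan below.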

Authors:
  - Tobias Ribizel (https://github.com/upsj)

Approvers:
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Bradley Dice (https://github.com/bdice)

URL: #11885
upsj authored Oct 15, 2022
1 parent c265c58 commit 9f8b936
Showing 1 changed file with 97 additions and 98 deletions.
195 changes: 97 additions & 98 deletions cpp/src/io/text/multibyte_split.cu
@@ -31,21 +31,24 @@
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/span.hpp>

#include <limits>
#include <rmm/cuda_stream_pool.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <thrust/copy.h>
#include <thrust/find.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/transform.h>

#include <cub/block/block_load.cuh>
#include <cub/block/block_scan.cuh>

#pragma GCC diagnostic pop

#include <cstdint>
#include <limits>
#include <memory>
#include <numeric>
#include <optional>
@@ -160,6 +163,10 @@ struct PatternScan {
}
};

// type aliases to distinguish between row offsets and character offsets
using output_offset = int64_t;
using byte_offset = int64_t;

// multibyte_split works by splitting up inputs into 32 inputs (bytes) per thread, and transforming
// them into data structures called "multistates". these multistates are created by searching a
// trie, but instead of a traditional trie where the search begins at a single node at the beginning,
@@ -170,35 +177,11 @@
// it begins in. From there, each thread can then take deterministic action. In this case, the
// deterministic action is counting and outputting delimiter offsets when a delimiter is found.
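//
// A hypothetical toy version of the idea (not the actual multistate type used in
// this file): each multistate records, for every trie state a segment could have
// started in, the state it ends in after consuming that segment's bytes. Because
// that mapping composes, two adjacent multistates merge associatively, which is
// what makes the tile-wide prefix scan possible:
//
//   struct toy_multistate {
//     uint8_t end_state[MAX_STATES];  // end_state[s]: state reached when starting from s
//   };
//
//   __host__ __device__ toy_multistate merge(toy_multistate lhs, toy_multistate rhs)
//   {
//     toy_multistate out;
//     for (int s = 0; s < MAX_STATES; ++s) { out.end_state[s] = rhs.end_state[lhs.end_state[s]]; }
//     return out;  // associative: merge(merge(a, b), c) == merge(a, merge(b, c))
//   }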

// This struct provides output offsets that are only incremented until a cutoff point.
struct cutoff_offset {
// magnitude stores the offset, sign bit stores whether we are past the cutoff
int64_t value = 0;

constexpr cutoff_offset() = default;

constexpr cutoff_offset(int64_t offset, bool is_past_cutoff)
: value{is_past_cutoff ? -offset : offset}
{
}

[[nodiscard]] constexpr int64_t offset() const { return value < 0 ? -value : value; }

[[nodiscard]] constexpr bool is_past_end() { return value < 0; }

friend constexpr cutoff_offset operator+(cutoff_offset lhs, cutoff_offset rhs)
{
auto const past_end = lhs.is_past_end() or rhs.is_past_end();
auto const offset = lhs.offset() + (lhs.is_past_end() ? 0 : rhs.offset());
return cutoff_offset{offset, past_end};
}
};
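
// Usage sketch of the encoding above (illustrative values only): once the
// left-hand operand is past the cutoff, further matches stop accumulating, so a
// prefix scan counts delimiters only up to and including the first one past the
// byte range.
//
//   constexpr auto a = cutoff_offset{3, false} + cutoff_offset{1, false};  // offset 4, in range
//   constexpr auto b = a + cutoff_offset{1, true};                         // offset 5, past end
//   constexpr auto c = b + cutoff_offset{1, false};                        // offset stays at 5
//
// The replacement below scans plain integer counts instead and applies the
// byte-range cutoff on the host afterwards.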

__global__ void multibyte_split_init_kernel(
cudf::size_type base_tile_idx,
cudf::size_type num_tiles,
cudf::io::text::detail::scan_tile_state_view<multistate> tile_multistates,
cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
cudf::io::text::detail::scan_tile_state_view<output_offset> tile_output_offsets,
cudf::io::text::detail::scan_tile_status status =
cudf::io::text::detail::scan_tile_status::invalid)
{
@@ -212,9 +195,9 @@ __global__ void multibyte_split_init_kernel(

__global__ void multibyte_split_seed_kernel(
cudf::io::text::detail::scan_tile_state_view<multistate> tile_multistates,
cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
cudf::io::text::detail::scan_tile_state_view<output_offset> tile_output_offsets,
multistate tile_multistate_seed,
cutoff_offset tile_output_offset)
output_offset tile_output_offset)
{
auto const thread_idx = blockIdx.x * blockDim.x + threadIdx.x;
if (thread_idx == 0) {
@@ -225,19 +208,18 @@ __global__ void multibyte_split_seed_kernel(

__global__ __launch_bounds__(THREADS_PER_TILE) void multibyte_split_kernel(
cudf::size_type base_tile_idx,
int64_t base_input_offset,
int64_t base_offset_offset,
byte_offset base_input_offset,
output_offset base_output_offset,
cudf::io::text::detail::scan_tile_state_view<multistate> tile_multistates,
cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
cudf::io::text::detail::scan_tile_state_view<output_offset> tile_output_offsets,
cudf::device_span<char const> delim,
cudf::device_span<char const> chunk_input_chars,
int64_t byte_range_end,
cudf::split_device_span<int64_t> output_offsets)
cudf::split_device_span<byte_offset> row_offsets)
{
using InputLoad =
cub::BlockLoad<char, THREADS_PER_TILE, ITEMS_PER_THREAD, cub::BLOCK_LOAD_WARP_TRANSPOSE>;
using OffsetScan = cub::BlockScan<cutoff_offset, THREADS_PER_TILE>;
using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback<cutoff_offset>;
using OffsetScan = cub::BlockScan<output_offset, THREADS_PER_TILE>;
using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback<output_offset>;

__shared__ union {
typename InputLoad::TempStorage input_load;
@@ -269,17 +251,15 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void multibyte_split_kernel(

// STEP 3: Flag matches

cutoff_offset thread_offset;
output_offset thread_offset{};
uint32_t thread_match_mask[(ITEMS_PER_THREAD + 31) / 32]{};

for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) {
thread_multistate = transition(thread_chars[i], thread_multistate, delim);
auto const thread_state = thread_multistate.max_tail();
auto const is_match = i < thread_input_size and thread_state == delim.size();
auto const match_end = base_input_offset + thread_input_offset + i + 1;
auto const is_past_range = match_end >= byte_range_end;
thread_multistate = transition(thread_chars[i], thread_multistate, delim);
auto const thread_state = thread_multistate.max_tail();
auto const is_match = i < thread_input_size and thread_state == delim.size();
thread_match_mask[i / 32] |= uint32_t{is_match} << (i % 32);
thread_offset = thread_offset + cutoff_offset{is_match, is_past_range};
thread_offset += output_offset{is_match};
}

// STEP 4: Scan flags to determine absolute thread output offset
@@ -293,29 +273,27 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void multibyte_split_kernel(

for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) {
auto const is_match = (thread_match_mask[i / 32] >> (i % 32)) & 1u;
if (is_match && !thread_offset.is_past_end()) {
auto const match_end = base_input_offset + thread_input_offset + i + 1;
auto const is_past_range = match_end >= byte_range_end;
output_offsets[thread_offset.offset() - base_offset_offset] = match_end;
thread_offset = thread_offset + cutoff_offset{true, is_past_range};
if (is_match) {
auto const match_end = base_input_offset + thread_input_offset + i + 1;
row_offsets[thread_offset - base_output_offset] = match_end;
thread_offset++;
}
}
}

__global__ __launch_bounds__(THREADS_PER_TILE) void byte_split_kernel(
cudf::size_type base_tile_idx,
int64_t base_input_offset,
int64_t base_offset_offset,
cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
byte_offset base_input_offset,
output_offset base_output_offset,
cudf::io::text::detail::scan_tile_state_view<output_offset> tile_output_offsets,
char delim,
cudf::device_span<char const> chunk_input_chars,
int64_t byte_range_end,
cudf::split_device_span<int64_t> output_offsets)
cudf::split_device_span<byte_offset> row_offsets)
{
using InputLoad =
cub::BlockLoad<char, THREADS_PER_TILE, ITEMS_PER_THREAD, cub::BLOCK_LOAD_WARP_TRANSPOSE>;
using OffsetScan = cub::BlockScan<cutoff_offset, THREADS_PER_TILE>;
using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback<cutoff_offset>;
using OffsetScan = cub::BlockScan<output_offset, THREADS_PER_TILE>;
using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback<output_offset>;

__shared__ union {
typename InputLoad::TempStorage input_load;
@@ -338,15 +316,13 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void byte_split_kernel(

// STEP 2: Flag matches

cutoff_offset thread_offset;
output_offset thread_offset{};
uint32_t thread_match_mask[(ITEMS_PER_THREAD + 31) / 32]{};

for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) {
auto const is_match = i < thread_input_size and thread_chars[i] == delim;
auto const match_end = base_input_offset + thread_input_offset + i + 1;
auto const is_past_range = match_end >= byte_range_end;
auto const is_match = i < thread_input_size and thread_chars[i] == delim;
thread_match_mask[i / 32] |= uint32_t{is_match} << (i % 32);
thread_offset = thread_offset + cutoff_offset{is_match, is_past_range};
thread_offset += output_offset{is_match};
}

// STEP 3: Scan flags to determine absolute thread output offset
@@ -360,11 +336,10 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void byte_split_kernel(

for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) {
auto const is_match = (thread_match_mask[i / 32] >> (i % 32)) & 1u;
if (is_match && !thread_offset.is_past_end()) {
auto const match_end = base_input_offset + thread_input_offset + i + 1;
auto const is_past_range = match_end >= byte_range_end;
output_offsets[thread_offset.offset() - base_offset_offset] = match_end;
thread_offset = thread_offset + cutoff_offset{true, is_past_range};
if (is_match) {
auto const match_end = base_input_offset + thread_input_offset + i + 1;
row_offsets[thread_offset - base_output_offset] = match_end;
thread_offset++;
}
}
}
@@ -611,7 +586,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
// best when at least 32 more than max possible concurrent tiles, due to rolling `invalid`s
auto num_tile_states = std::max(32, TILES_PER_CHUNK * concurrency + 32);
auto tile_multistates = scan_tile_state<multistate>(num_tile_states, stream);
auto tile_offsets = scan_tile_state<cutoff_offset>(num_tile_states, stream);
auto tile_offsets = scan_tile_state<output_offset>(num_tile_states, stream);

multibyte_split_init_kernel<<<TILES_PER_CHUNK,
THREADS_PER_TILE,
@@ -633,15 +608,15 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
tile_multistates,
tile_offsets,
multistate_seed,
{});
0);

auto reader = source.create_reader();
auto chunk_offset = std::max<int64_t>(0, byte_range.offset() - delimiter.size());
auto chunk_offset = std::max<byte_offset>(0, byte_range.offset() - delimiter.size());
auto const byte_range_end = byte_range.offset() + byte_range.size();
reader->skip_bytes(chunk_offset);
// amortize output chunk allocations over 8 worst-case outputs. This limits the overallocation
constexpr auto max_growth = 8;
output_builder<int64_t> offset_storage(ITEMS_PER_CHUNK, max_growth, stream);
output_builder<byte_offset> row_offset_storage(ITEMS_PER_CHUNK, max_growth, stream);
output_builder<char> char_storage(ITEMS_PER_CHUNK, max_growth, stream);

fork_stream(streams, stream);
@@ -653,22 +628,23 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
auto& scan_stream = streams[1];
auto chunk = reader->get_next_chunk(ITEMS_PER_CHUNK, read_stream);
int64_t base_tile_idx = 0;
std::optional<int64_t> first_offset;
std::optional<int64_t> last_offset;
if (byte_range.offset() == 0) { first_offset = 0; }
std::optional<byte_offset> first_row_offset;
std::optional<byte_offset> last_row_offset;
bool found_last_offset = false;
if (byte_range.offset() == 0) { first_row_offset = 0; }
std::swap(read_stream, scan_stream);

while (chunk->size() > 0) {
// if we found the last delimiter, or didn't find delimiters inside the byte range at all: abort
if (last_offset.has_value() or
(not first_offset.has_value() and chunk_offset >= byte_range_end)) {
if (last_row_offset.has_value() or
(not first_row_offset.has_value() and chunk_offset >= byte_range_end)) {
break;
}

auto tiles_in_launch =
cudf::util::div_rounding_up_safe(chunk->size(), static_cast<std::size_t>(ITEMS_PER_TILE));

auto offset_output = offset_storage.next_output(scan_stream);
auto row_offsets = row_offset_storage.next_output(scan_stream);

// reset the next chunk of tile state
multibyte_split_init_kernel<<<tiles_in_launch,
@@ -690,47 +666,67 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
scan_stream.value()>>>( //
base_tile_idx,
chunk_offset,
offset_storage.size(),
row_offset_storage.size(),
tile_offsets,
delimiter[0],
*chunk,
byte_range_end,
offset_output);
row_offsets);
} else {
multibyte_split_kernel<<<tiles_in_launch,
THREADS_PER_TILE,
0,
scan_stream.value()>>>( //
base_tile_idx,
chunk_offset,
offset_storage.size(),
row_offset_storage.size(),
tile_multistates,
tile_offsets,
{device_delim.data(), static_cast<std::size_t>(device_delim.size())},
*chunk,
byte_range_end,
offset_output);
row_offsets);
}

// load the next chunk
auto next_chunk = reader->get_next_chunk(ITEMS_PER_CHUNK, read_stream);
// while that is running, determine how many offsets we output (synchronizes)
auto next_tile_offset =
tile_offsets.get_inclusive_prefix(base_tile_idx + tiles_in_launch - 1, scan_stream);
offset_storage.advance_output(next_tile_offset.offset() - offset_storage.size());
auto const new_offsets = [&] {
auto const new_offsets_unclamped =
tile_offsets.get_inclusive_prefix(base_tile_idx + tiles_in_launch - 1, scan_stream) -
static_cast<output_offset>(row_offset_storage.size());
// if we are not in the last chunk, we can use all offsets
if (chunk_offset + static_cast<output_offset>(chunk->size()) < byte_range_end) {
return new_offsets_unclamped;
}
// if we are in the last chunk, we need to find the first out-of-bounds offset
auto const it = thrust::make_counting_iterator(output_offset{});
auto const end_loc =
*thrust::find_if(rmm::exec_policy_nosync(scan_stream),
it,
it + new_offsets_unclamped,
[row_offsets, byte_range_end] __device__(output_offset i) {
return row_offsets[i] >= byte_range_end;
});
// if we had no out-of-bounds offset, we copy all offsets
if (end_loc == new_offsets_unclamped) { return end_loc; }
// otherwise we copy only up to (including) the first out-of-bounds delimiter
found_last_offset = true;
return end_loc + 1;
}();
row_offset_storage.advance_output(new_offsets);
// determine if we found the first or last field offset for the byte range
if (next_tile_offset.offset() > 0 and not first_offset) {
first_offset = offset_storage.front_element(scan_stream);
if (new_offsets > 0 and not first_row_offset) {
first_row_offset = row_offset_storage.front_element(scan_stream);
}
if (next_tile_offset.is_past_end()) { last_offset = offset_storage.back_element(scan_stream); }
if (found_last_offset) { last_row_offset = row_offset_storage.back_element(scan_stream); }
// copy over the characters we need, if we already encountered the first field delimiter
if (first_offset.has_value()) {
auto const begin = chunk->data() + std::max<int64_t>(0, *first_offset - chunk_offset);
auto const sentinel = last_offset.value_or(std::numeric_limits<int64_t>::max());
auto const end = chunk->data() + std::min<int64_t>(sentinel - chunk_offset, chunk->size());
if (first_row_offset.has_value()) {
auto const begin = chunk->data() + std::max<byte_offset>(0, *first_row_offset - chunk_offset);
auto const sentinel = last_row_offset.value_or(std::numeric_limits<byte_offset>::max());
auto const end =
chunk->data() + std::min<byte_offset>(sentinel - chunk_offset, chunk->size());
auto const output_size = end - begin;
auto char_output = char_storage.next_output(scan_stream);
auto const split = begin + std::min<int64_t>(output_size, char_output.head().size());
auto const split = begin + std::min<byte_offset>(output_size, char_output.head().size());
thrust::copy(rmm::exec_policy_nosync(scan_stream), begin, split, char_output.head().begin());
thrust::copy(rmm::exec_policy_nosync(scan_stream), split, end, char_output.tail().begin());
char_storage.advance_output(output_size);
@@ -739,7 +735,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
cudaEventRecord(last_launch_event, scan_stream.value());

std::swap(read_stream, scan_stream);
base_tile_idx += TILES_PER_CHUNK;
base_tile_idx += tiles_in_launch;
chunk_offset += chunk->size();
chunk = std::move(next_chunk);
}
Expand All @@ -750,24 +746,27 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source

// if the input was empty, we didn't find a delimiter at all,
// or the first delimiter was also the last: empty output
if (chunk_offset == 0 or not first_offset.has_value() or first_offset == last_offset) {
if (chunk_offset == 0 or not first_row_offset.has_value() or
first_row_offset == last_row_offset) {
return make_empty_column(type_id::STRING);
}

auto chars = char_storage.gather(stream, mr);
auto global_offsets = offset_storage.gather(stream, mr);
auto global_offsets = row_offset_storage.gather(stream, mr);

bool const insert_begin = *first_offset == 0;
bool const insert_end = not last_offset.has_value() or last_offset == chunk_offset;
bool const insert_begin = *first_row_offset == 0;
bool const insert_end = not last_row_offset.has_value() or last_row_offset == chunk_offset;
rmm::device_uvector<int32_t> offsets{
global_offsets.size() + insert_begin + insert_end, stream, mr};
if (insert_begin) { offsets.set_element_to_zero_async(0, stream); }
if (insert_end) { offsets.set_element(offsets.size() - 1, chunk_offset - *first_offset, stream); }
if (insert_end) {
offsets.set_element(offsets.size() - 1, chunk_offset - *first_row_offset, stream);
}
thrust::transform(rmm::exec_policy(stream),
global_offsets.begin(),
global_offsets.end(),
offsets.begin() + insert_begin,
[baseline = *first_offset] __device__(int64_t global_offset) {
[baseline = *first_row_offset] __device__(byte_offset global_offset) {
return static_cast<int32_t>(global_offset - baseline);
});

