Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle multibyte_split byte_range out-of-bounds offsets on host #11885

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 58 additions & 68 deletions cpp/src/io/text/multibyte_split.cu
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,24 @@
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/span.hpp>

#include <limits>
#include <rmm/cuda_stream_pool.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <thrust/copy.h>
#include <thrust/find.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/transform.h>

#include <cub/block/block_load.cuh>
#include <cub/block/block_scan.cuh>

#pragma GCC diagnostic pop

#include <cstdint>
#include <limits>
#include <memory>
#include <numeric>
#include <optional>
Expand Down Expand Up @@ -170,35 +173,11 @@ struct PatternScan {
// it begins in. From there, each thread can then take deterministic action. In this case, the
// deterministic action is counting and outputting delimiter offsets when a delimiter is found.

// This struct provides output offsets that are only incremented until a cutoff point.
// The offset count and the "past cutoff" flag are packed into a single signed 64-bit integer,
// and two values are combined with operator+ (see below for the exact combination rule).
struct cutoff_offset {
  // magnitude stores the offset, sign bit stores whether we are past the cutoff.
  // Because -0 == 0, a zero offset can never carry the "past cutoff" flag.
  int64_t value = 0;

  // Default state: offset 0, not past the cutoff.
  constexpr cutoff_offset() = default;

  // Stores `offset` and, if `is_past_cutoff` is set, negates it to mark the cutoff.
  // `offset` is expected to be non-negative; a negative input would be read back as flagged.
  constexpr cutoff_offset(int64_t offset, bool is_past_cutoff)
    : value{is_past_cutoff ? -offset : offset}
  {
  }

  // Returns the stored offset without the cutoff flag (the magnitude of `value`).
  [[nodiscard]] constexpr int64_t offset() const { return value < 0 ? -value : value; }

  // Returns true if the cutoff flag is set.
  // const-qualified so it can be queried on const objects and through const references,
  // consistent with offset().
  [[nodiscard]] constexpr bool is_past_end() const { return value < 0; }

  // Combines two offsets: the result is flagged if either operand is flagged.
  // Once the left operand is past the cutoff, the right operand's offset is ignored, so the
  // count stops growing after the first flagged contribution (which itself is still counted).
  friend constexpr cutoff_offset operator+(cutoff_offset lhs, cutoff_offset rhs)
  {
    auto const lhs_past_end = lhs.is_past_end();
    auto const past_end     = lhs_past_end or rhs.is_past_end();
    auto const offset       = lhs.offset() + (lhs_past_end ? 0 : rhs.offset());
    return cutoff_offset{offset, past_end};
  }
};

__global__ void multibyte_split_init_kernel(
cudf::size_type base_tile_idx,
cudf::size_type num_tiles,
cudf::io::text::detail::scan_tile_state_view<multistate> tile_multistates,
cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
cudf::io::text::detail::scan_tile_state_view<int64_t> tile_output_offsets,
cudf::io::text::detail::scan_tile_status status =
cudf::io::text::detail::scan_tile_status::invalid)
{
Expand All @@ -212,9 +191,9 @@ __global__ void multibyte_split_init_kernel(

__global__ void multibyte_split_seed_kernel(
cudf::io::text::detail::scan_tile_state_view<multistate> tile_multistates,
cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
cudf::io::text::detail::scan_tile_state_view<int64_t> tile_output_offsets,
multistate tile_multistate_seed,
cutoff_offset tile_output_offset)
int64_t tile_output_offset)
{
auto const thread_idx = blockIdx.x * blockDim.x + threadIdx.x;
if (thread_idx == 0) {
Expand All @@ -228,16 +207,15 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void multibyte_split_kernel(
int64_t base_input_offset,
int64_t base_offset_offset,
cudf::io::text::detail::scan_tile_state_view<multistate> tile_multistates,
cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
cudf::io::text::detail::scan_tile_state_view<int64_t> tile_output_offsets,
cudf::device_span<char const> delim,
cudf::device_span<char const> chunk_input_chars,
int64_t byte_range_end,
cudf::split_device_span<int64_t> output_offsets)
{
using InputLoad =
cub::BlockLoad<char, THREADS_PER_TILE, ITEMS_PER_THREAD, cub::BLOCK_LOAD_WARP_TRANSPOSE>;
using OffsetScan = cub::BlockScan<cutoff_offset, THREADS_PER_TILE>;
using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback<cutoff_offset>;
using OffsetScan = cub::BlockScan<int64_t, THREADS_PER_TILE>;
using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback<int64_t>;

__shared__ union {
typename InputLoad::TempStorage input_load;
Expand Down Expand Up @@ -269,17 +247,15 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void multibyte_split_kernel(

// STEP 3: Flag matches

cutoff_offset thread_offset;
int64_t thread_offset{};
uint32_t thread_match_mask[(ITEMS_PER_THREAD + 31) / 32]{};

for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) {
thread_multistate = transition(thread_chars[i], thread_multistate, delim);
auto const thread_state = thread_multistate.max_tail();
auto const is_match = i < thread_input_size and thread_state == delim.size();
auto const match_end = base_input_offset + thread_input_offset + i + 1;
auto const is_past_range = match_end >= byte_range_end;
thread_multistate = transition(thread_chars[i], thread_multistate, delim);
auto const thread_state = thread_multistate.max_tail();
auto const is_match = i < thread_input_size and thread_state == delim.size();
thread_match_mask[i / 32] |= uint32_t{is_match} << (i % 32);
thread_offset = thread_offset + cutoff_offset{is_match, is_past_range};
thread_offset = thread_offset + int{is_match};
upsj marked this conversation as resolved.
Show resolved Hide resolved
}

// STEP 4: Scan flags to determine absolute thread output offset
Expand All @@ -293,11 +269,10 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void multibyte_split_kernel(

for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) {
auto const is_match = (thread_match_mask[i / 32] >> (i % 32)) & 1u;
if (is_match && !thread_offset.is_past_end()) {
auto const match_end = base_input_offset + thread_input_offset + i + 1;
auto const is_past_range = match_end >= byte_range_end;
output_offsets[thread_offset.offset() - base_offset_offset] = match_end;
thread_offset = thread_offset + cutoff_offset{true, is_past_range};
if (is_match) {
auto const match_end = base_input_offset + thread_input_offset + i + 1;
output_offsets[thread_offset - base_offset_offset] = match_end;
thread_offset++;
}
}
}
Expand All @@ -306,16 +281,15 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void byte_split_kernel(
cudf::size_type base_tile_idx,
int64_t base_input_offset,
int64_t base_offset_offset,
cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
cudf::io::text::detail::scan_tile_state_view<int64_t> tile_output_offsets,
char delim,
cudf::device_span<char const> chunk_input_chars,
int64_t byte_range_end,
cudf::split_device_span<int64_t> output_offsets)
{
using InputLoad =
cub::BlockLoad<char, THREADS_PER_TILE, ITEMS_PER_THREAD, cub::BLOCK_LOAD_WARP_TRANSPOSE>;
using OffsetScan = cub::BlockScan<cutoff_offset, THREADS_PER_TILE>;
using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback<cutoff_offset>;
using OffsetScan = cub::BlockScan<int64_t, THREADS_PER_TILE>;
using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback<int64_t>;

__shared__ union {
typename InputLoad::TempStorage input_load;
Expand All @@ -338,15 +312,13 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void byte_split_kernel(

// STEP 2: Flag matches

cutoff_offset thread_offset;
int64_t thread_offset{};
uint32_t thread_match_mask[(ITEMS_PER_THREAD + 31) / 32]{};

for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) {
auto const is_match = i < thread_input_size and thread_chars[i] == delim;
auto const match_end = base_input_offset + thread_input_offset + i + 1;
auto const is_past_range = match_end >= byte_range_end;
auto const is_match = i < thread_input_size and thread_chars[i] == delim;
thread_match_mask[i / 32] |= uint32_t{is_match} << (i % 32);
thread_offset = thread_offset + cutoff_offset{is_match, is_past_range};
thread_offset = thread_offset + int{is_match};
upsj marked this conversation as resolved.
Show resolved Hide resolved
}

// STEP 3: Scan flags to determine absolute thread output offset
Expand All @@ -360,11 +332,10 @@ __global__ __launch_bounds__(THREADS_PER_TILE) void byte_split_kernel(

for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) {
auto const is_match = (thread_match_mask[i / 32] >> (i % 32)) & 1u;
if (is_match && !thread_offset.is_past_end()) {
auto const match_end = base_input_offset + thread_input_offset + i + 1;
auto const is_past_range = match_end >= byte_range_end;
output_offsets[thread_offset.offset() - base_offset_offset] = match_end;
thread_offset = thread_offset + cutoff_offset{true, is_past_range};
if (is_match) {
auto const match_end = base_input_offset + thread_input_offset + i + 1;
output_offsets[thread_offset - base_offset_offset] = match_end;
thread_offset++;
}
}
}
Expand Down Expand Up @@ -611,7 +582,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
// works best with at least 32 more tile states than the maximum possible number of concurrent
// tiles, due to rolling `invalid`s
auto num_tile_states = std::max(32, TILES_PER_CHUNK * concurrency + 32);
auto tile_multistates = scan_tile_state<multistate>(num_tile_states, stream);
auto tile_offsets = scan_tile_state<cutoff_offset>(num_tile_states, stream);
auto tile_offsets = scan_tile_state<int64_t>(num_tile_states, stream);

multibyte_split_init_kernel<<<TILES_PER_CHUNK,
THREADS_PER_TILE,
Expand All @@ -633,7 +604,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
tile_multistates,
tile_offsets,
multistate_seed,
{});
0);

auto reader = source.create_reader();
auto chunk_offset = std::max<int64_t>(0, byte_range.offset() - delimiter.size());
Expand All @@ -655,6 +626,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
int64_t base_tile_idx = 0;
std::optional<int64_t> first_offset;
std::optional<int64_t> last_offset;
bool found_last_offset = false;
if (byte_range.offset() == 0) { first_offset = 0; }
std::swap(read_stream, scan_stream);

Expand Down Expand Up @@ -694,7 +666,6 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
tile_offsets,
delimiter[0],
*chunk,
byte_range_end,
offset_output);
} else {
multibyte_split_kernel<<<tiles_in_launch,
Expand All @@ -708,21 +679,40 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
tile_offsets,
{device_delim.data(), static_cast<std::size_t>(device_delim.size())},
*chunk,
byte_range_end,
offset_output);
}

// load the next chunk
auto next_chunk = reader->get_next_chunk(ITEMS_PER_CHUNK, read_stream);
// while that is running, determine how many offsets we output (synchronizes)
auto next_tile_offset =
tile_offsets.get_inclusive_prefix(base_tile_idx + tiles_in_launch - 1, scan_stream);
offset_storage.advance_output(next_tile_offset.offset() - offset_storage.size());
auto new_offsets_unclamped =
tile_offsets.get_inclusive_prefix(base_tile_idx + tiles_in_launch - 1, scan_stream) -
static_cast<int64_t>(offset_storage.size());
auto new_offsets = [&] {
upsj marked this conversation as resolved.
Show resolved Hide resolved
// if we are not in the last chunk, we can use all offsets
if (chunk_offset + static_cast<int64_t>(chunk->size()) < byte_range_end) {
return new_offsets_unclamped;
upsj marked this conversation as resolved.
Show resolved Hide resolved
}
// if we are in the last chunk, we need to find the first out-of-bounds offset
auto const it = thrust::make_counting_iterator(int64_t{});
auto end_loc = *thrust::find_if(rmm::exec_policy_nosync(scan_stream),
upsj marked this conversation as resolved.
Show resolved Hide resolved
it,
it + new_offsets_unclamped,
[offset_output, byte_range_end] __device__(int64_t i) {
return offset_output[i] >= byte_range_end;
});
// if we had no out-of-bounds offset, we copy all offsets
if (end_loc == new_offsets_unclamped) { return end_loc; }
// otherwise we copy only up to (including) the first out-of-bounds delimiter
found_last_offset = true;
return end_loc + 1;
}();
offset_storage.advance_output(new_offsets);
// determine if we found the first or last field offset for the byte range
if (next_tile_offset.offset() > 0 and not first_offset) {
if (new_offsets > 0 and not first_offset) {
first_offset = offset_storage.front_element(scan_stream);
}
if (next_tile_offset.is_past_end()) { last_offset = offset_storage.back_element(scan_stream); }
if (found_last_offset) { last_offset = offset_storage.back_element(scan_stream); }
// copy over the characters we need, if we already encountered the first field delimiter
if (first_offset.has_value()) {
auto const begin = chunk->data() + std::max<int64_t>(0, *first_offset - chunk_offset);
Expand All @@ -739,7 +729,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
cudaEventRecord(last_launch_event, scan_stream.value());

std::swap(read_stream, scan_stream);
base_tile_idx += TILES_PER_CHUNK;
base_tile_idx += tiles_in_launch;
chunk_offset += chunk->size();
chunk = std::move(next_chunk);
}
Expand Down