Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate errors from Parquet reader kernels back to host #14167

Merged
merged 18 commits into from
Sep 28, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
pass error code back to host
vuule committed Sep 20, 2023

Verified

This commit was signed with the committer’s verified signature.
vuule Vukasin Milovanovic
commit 49efa5105810b5496725b7875dad5b6c3f758e89
23 changes: 16 additions & 7 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
@@ -430,10 +430,15 @@ static __device__ void gpuOutputGeneric(
* @param chunks List of column chunks
* @param min_row Row index to start reading at
* @param num_rows Maximum number of rows to read
* @param error_code Error code to set if an error is encountered
*/
template <int lvl_buf_size, typename level_t>
__global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
__global__ void __launch_bounds__(decode_block_size)
gpuDecodePageData(PageInfo* pages,
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int* error_code)
{
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(16)
@@ -472,7 +477,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(

// skipped_leaf_values will always be 0 for flat hierarchies.
uint32_t skipped_leaf_values = s->page.skipped_leaf_values;
while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
while (s->error == 0 &&
       (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
int target_pos;
int src_pos = s->src_pos;

@@ -596,6 +602,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
}
__syncthreads();
}

if (!t and s->error != 0) { atomicOr(error_code, s->error); }
}

struct mask_tform {
@@ -621,6 +629,7 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
@@ -629,11 +638,11 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page

if (level_type_size == 1) {
gpuDecodePageData<rolling_buf_size, uint8_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodePageData<rolling_buf_size, uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
} else {
gpuDecodePageData<rolling_buf_size, uint16_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodePageData<rolling_buf_size, uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}

6 changes: 3 additions & 3 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
@@ -42,7 +42,7 @@ struct page_state_s {
int32_t dict_val;
uint32_t initial_rle_run[NUM_LEVEL_TYPES]; // [def,rep]
int32_t initial_rle_value[NUM_LEVEL_TYPES]; // [def,rep]
int32_t error;
int error;
PageInfo page;
ColumnChunkDesc col;

@@ -495,7 +495,7 @@ __device__ void gpuDecodeStream(
level_run = shuffle(level_run);
cur_def += sym_len;
}
if (s->error) { break; }
if (s->error != 0) { break; }

batch_len = min(num_input_values - value_count, 32);
if (level_run & 1) {
@@ -851,7 +851,7 @@ __device__ void gpuDecodeLevels(page_state_s* s,

constexpr int batch_size = 32;
int cur_leaf_count = target_leaf_count;
while (!s->error && s->nz_count < target_leaf_count &&
while (s->error == 0 && s->nz_count < target_leaf_count &&
s->input_value_count < s->num_input_values) {
if (has_repetition) {
gpuDecodeStream<level_t, rolling_buf_size>(rep, s, cur_leaf_count, t, level_type::REPETITION);
19 changes: 13 additions & 6 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
@@ -32,8 +32,12 @@ namespace {
// with V2 page headers; see https://www.mail-archive.com/[email protected]/msg11826.html).
// this kernel only needs 96 threads (3 warps)(for now).
template <typename level_t>
__global__ void __launch_bounds__(96) gpuDecodeDeltaBinary(
PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
__global__ void __launch_bounds__(96)
gpuDecodeDeltaBinary(PageInfo* pages,
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int* error_code)
{
using cudf::detail::warp_size;
__shared__ __align__(16) delta_binary_decoder db_state;
@@ -145,6 +149,8 @@ __global__ void __launch_bounds__(96) gpuDecodeDeltaBinary(
}
__syncthreads();
}

if (!t and s->error != 0) { atomicOr(error_code, s->error); }
}

} // anonymous namespace
@@ -157,6 +163,7 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages
size_t num_rows,
size_t min_row,
int level_type_size,
int* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
@@ -165,11 +172,11 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page

if (level_type_size == 1) {
gpuDecodeDeltaBinary<uint8_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeDeltaBinary<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
} else {
gpuDecodeDeltaBinary<uint16_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeDeltaBinary<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}

19 changes: 13 additions & 6 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
@@ -582,8 +582,12 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz
* @tparam level_t Type used to store decoded repetition and definition levels
*/
template <typename level_t>
__global__ void __launch_bounds__(decode_block_size) gpuDecodeStringPageData(
PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
__global__ void __launch_bounds__(decode_block_size)
gpuDecodeStringPageData(PageInfo* pages,
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int* error_code)
{
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(4) size_type last_offset;
@@ -742,6 +746,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodeStringPageData(

auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);

if (!t and s->error != 0) { atomicOr(error_code, s->error); }
}

} // anonymous namespace
@@ -775,6 +781,7 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pa
size_t num_rows,
size_t min_row,
int level_type_size,
int* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
@@ -783,11 +790,11 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pa
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page

if (level_type_size == 1) {
gpuDecodeStringPageData<uint8_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeStringPageData<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
} else {
gpuDecodeStringPageData<uint16_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeStringPageData<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}

6 changes: 6 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
@@ -566,13 +566,15 @@ void ComputePageStringSizes(cudf::detail::hostdevice_vector<PageInfo>& pages,
* @param[in] num_rows Total number of rows to read
* @param[in] min_row Minimum number of rows to read
* @param[in] level_type_size Size in bytes of the type for level decoding
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use
*/
void DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
size_t num_rows,
size_t min_row,
int level_type_size,
int* error_code,
rmm::cuda_stream_view stream);

/**
@@ -586,13 +588,15 @@ void DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
* @param[in] num_rows Total number of rows to read
* @param[in] min_row Minimum number of rows to read
* @param[in] level_type_size Size in bytes of the type for level decoding
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use
*/
void DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
size_t num_rows,
size_t min_row,
int level_type_size,
int* error_code,
rmm::cuda_stream_view stream);

/**
@@ -606,13 +610,15 @@ void DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
* @param[in] num_rows Total number of rows to read
* @param[in] min_row Minimum number of rows to read
* @param[in] level_type_size Size in bytes of the type for level decoding
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use, default 0
*/
void DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
size_t num_rows,
size_t min_row,
int level_type_size,
int* error_code,
rmm::cuda_stream_view stream);

/**
16 changes: 12 additions & 4 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
@@ -163,6 +163,8 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
chunk_nested_valids.host_to_device_async(_stream);
chunk_nested_data.host_to_device_async(_stream);

rmm::device_scalar<int> error_code(0, _stream);

// get the number of streams we need from the pool and tell them to wait on the H2D copies
int const nkernels = std::bitset<32>(kernel_mask).count();
auto streams = cudf::detail::fork_streams(_stream, nkernels);
@@ -174,17 +176,20 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
if (has_strings) {
auto& stream = streams[s_idx++];
chunk_nested_str_data.host_to_device_async(stream);
gpu::DecodeStringPageData(pages, chunks, num_rows, skip_rows, level_type_size, stream);
gpu::DecodeStringPageData(
pages, chunks, num_rows, skip_rows, level_type_size, error_code.data(), stream);
}

// launch delta binary decoder
if ((kernel_mask & gpu::KERNEL_MASK_DELTA_BINARY) != 0) {
gpu::DecodeDeltaBinary(pages, chunks, num_rows, skip_rows, level_type_size, streams[s_idx++]);
gpu::DecodeDeltaBinary(
pages, chunks, num_rows, skip_rows, level_type_size, error_code.data(), streams[s_idx++]);
}

// launch the catch-all page decoder
if ((kernel_mask & gpu::KERNEL_MASK_GENERAL) != 0) {
gpu::DecodePageData(pages, chunks, num_rows, skip_rows, level_type_size, streams[s_idx++]);
gpu::DecodePageData(
pages, chunks, num_rows, skip_rows, level_type_size, error_code.data(), streams[s_idx++]);
}

// synchronize the streams
@@ -193,7 +198,10 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
pages.device_to_host_async(_stream);
page_nesting.device_to_host_async(_stream);
page_nesting_decode.device_to_host_async(_stream);
_stream.synchronize();

auto const decode_error = error_code.value(_stream);
CUDF_EXPECTS(decode_error == 0,
"Parquet data decode failed with error code " + std::to_string(decode_error));

// for list columns, add the final offset to every offset buffer.
// TODO : make this happen in more efficiently. Maybe use thrust::for_each