Skip to content

Commit

Permalink
Propagate errors from Parquet reader kernels back to host (#14167)
Browse files Browse the repository at this point in the history
Pass the error code to the host when a kernel detects invalid input.
If multiple errors types are detected, they are combined using a bitwise OR so that caller gets the aggregate error code that includes all types of errors that occurred.

Does not change the kernel side checks.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - https://github.com/nvdbaranec
  - Divye Gala (https://github.com/divyegala)
  - Yunsong Wang (https://github.com/PointKernel)
  - Bradley Dice (https://github.com/bdice)

URL: #14167
  • Loading branch information
vuule authored Sep 28, 2023
1 parent b789d4c commit 2c19bf3
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 42 deletions.
25 changes: 18 additions & 7 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,15 @@ static __device__ void gpuOutputGeneric(
* @param chunks List of column chunks
* @param min_row Row index to start reading at
* @param num_rows Maximum number of rows to read
* @param error_code Error code to set if an error is encountered
*/
template <int lvl_buf_size, typename level_t>
__global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
__global__ void __launch_bounds__(decode_block_size)
gpuDecodePageData(PageInfo* pages,
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
{
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(16)
Expand Down Expand Up @@ -472,7 +477,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(

// skipped_leaf_values will always be 0 for flat hierarchies.
uint32_t skipped_leaf_values = s->page.skipped_leaf_values;
while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
while (s->error == 0 &&
(s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
int target_pos;
int src_pos = s->src_pos;

Expand Down Expand Up @@ -596,6 +602,10 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
}
__syncthreads();
}
if (t == 0 and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
}
}

struct mask_tform {
Expand All @@ -621,6 +631,7 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand All @@ -629,11 +640,11 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page

if (level_type_size == 1) {
gpuDecodePageData<rolling_buf_size, uint8_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodePageData<rolling_buf_size, uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
} else {
gpuDecodePageData<rolling_buf_size, uint16_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodePageData<rolling_buf_size, uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}

Expand Down
57 changes: 40 additions & 17 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <io/utilities/block_utils.cuh>

#include <cuda/atomic>
#include <cuda/std/tuple>

namespace cudf::io::parquet::gpu {
Expand Down Expand Up @@ -69,6 +70,18 @@ struct page_state_s {
PageNestingDecodeInfo nesting_decode_cache[max_cacheable_nesting_decode_info]{};
// points to either nesting_decode_cache above when possible, or to the global source otherwise
PageNestingDecodeInfo* nesting_info{};

inline __device__ void set_error_code(decode_error err) volatile
{
cuda::atomic_ref<int32_t, cuda::thread_scope_block> ref{const_cast<int&>(error)};
ref.fetch_or(static_cast<int32_t>(err), cuda::std::memory_order_relaxed);
}

inline __device__ void reset_error_code() volatile
{
cuda::atomic_ref<int32_t, cuda::thread_scope_block> ref{const_cast<int&>(error)};
ref.store(0, cuda::std::memory_order_release);
}
};

// buffers only used in the decode kernel. separated from page_state_s to keep
Expand Down Expand Up @@ -471,7 +484,7 @@ __device__ void gpuDecodeStream(
int32_t value_count = s->lvl_count[lvl];
int32_t batch_coded_count = 0;

while (value_count < target_count && value_count < num_input_values) {
while (s->error == 0 && value_count < target_count && value_count < num_input_values) {
int batch_len;
if (level_run <= 1) {
// Get a new run symbol from the byte stream
Expand All @@ -487,7 +500,14 @@ __device__ void gpuDecodeStream(
cur++;
}
}
if (cur > end || level_run <= 1) { s->error = 0x10; }
if (cur > end) {
s->set_error_code(decode_error::LEVEL_STREAM_OVERRUN);
break;
}
if (level_run <= 1) {
s->set_error_code(decode_error::INVALID_LEVEL_RUN);
break;
}
sym_len = (int32_t)(cur - cur_def);
__threadfence_block();
}
Expand All @@ -496,7 +516,7 @@ __device__ void gpuDecodeStream(
level_run = shuffle(level_run);
cur_def += sym_len;
}
if (s->error) { break; }
if (s->error != 0) { break; }

batch_len = min(num_input_values - value_count, 32);
if (level_run & 1) {
Expand Down Expand Up @@ -852,7 +872,7 @@ __device__ void gpuDecodeLevels(page_state_s* s,

constexpr int batch_size = 32;
int cur_leaf_count = target_leaf_count;
while (!s->error && s->nz_count < target_leaf_count &&
while (s->error == 0 && s->nz_count < target_leaf_count &&
s->input_value_count < s->num_input_values) {
if (has_repetition) {
gpuDecodeStream<level_t, rolling_buf_size>(rep, s, cur_leaf_count, t, level_type::REPETITION);
Expand Down Expand Up @@ -916,7 +936,7 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s,
}
s->lvl_start[lvl] = cur;

if (cur > end) { s->error = 2; }
if (cur > end) { s->set_error_code(decode_error::LEVEL_STREAM_OVERRUN); }
};

// this is a little redundant. if level_bits == 0, then nothing should be encoded
Expand All @@ -941,8 +961,8 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s,
// add back the 4 bytes for the length
len += 4;
} else {
len = 0;
s->error = 2;
len = 0;
s->set_error_code(decode_error::LEVEL_STREAM_OVERRUN);
}
} else if (encoding == Encoding::BIT_PACKED) {
len = (s->page.num_input_values * level_bits + 7) >> 3;
Expand All @@ -951,8 +971,8 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s,
s->lvl_start[lvl] = cur;
s->abs_lvl_start[lvl] = cur;
} else {
s->error = 3;
len = 0;
len = 0;
s->set_error_code(decode_error::UNSUPPORTED_ENCODING);
}

s->abs_lvl_end[lvl] = start + len;
Expand Down Expand Up @@ -1094,7 +1114,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
}

if (!t) {
s->error = 0;
s->reset_error_code();

// IMPORTANT : nested schemas can have 0 rows in a page but still have
// values. The case is:
Expand Down Expand Up @@ -1152,7 +1172,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
break;
default: // FIXED_LEN_BYTE_ARRAY:
s->dtype_len = dtype_len_out;
s->error |= (s->dtype_len <= 0);
if (s->dtype_len <= 0) { s->set_error_code(decode_error::INVALID_DATA_TYPE); }
break;
}
// Special check for downconversions
Expand Down Expand Up @@ -1268,7 +1288,9 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
s->dict_run = 0;
s->dict_val = 0;
s->dict_bits = (cur < end) ? *cur++ : 0;
if (s->dict_bits > 32 || !s->dict_base) { s->error = (10 << 8) | s->dict_bits; }
if (s->dict_bits > 32 || !s->dict_base) {
s->set_error_code(decode_error::INVALID_DICT_WIDTH);
}
break;
case Encoding::PLAIN:
s->dict_size = static_cast<int32_t>(end - cur);
Expand All @@ -1279,22 +1301,23 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
// first 4 bytes are length of RLE data
int const len = (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24);
cur += 4;
if (cur + len > end) { s->error = 2; }
if (cur + len > end) { s->set_error_code(decode_error::DATA_STREAM_OVERRUN); }
s->dict_run = 0;
} break;
case Encoding::DELTA_BINARY_PACKED:
// nothing to do, just don't error
break;
default:
s->error = 1; // Unsupported encoding
default: {
s->set_error_code(decode_error::UNSUPPORTED_ENCODING);
break;
}
}
if (cur > end) { s->error = 1; }
if (cur > end) { s->set_error_code(decode_error::DATA_STREAM_OVERRUN); }
s->lvl_end = cur;
s->data_start = cur;
s->data_end = end;
} else {
s->error = 1;
s->set_error_code(decode_error::EMPTY_PAGE);
}

s->lvl_count[level_type::REPETITION] = 0;
Expand Down
25 changes: 18 additions & 7 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ namespace {
// with V2 page headers; see https://www.mail-archive.com/[email protected]/msg11826.html).
// this kernel only needs 96 threads (3 warps)(for now).
template <typename level_t>
__global__ void __launch_bounds__(96) gpuDecodeDeltaBinary(
PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
__global__ void __launch_bounds__(96)
gpuDecodeDeltaBinary(PageInfo* pages,
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
{
using cudf::detail::warp_size;
__shared__ __align__(16) delta_binary_decoder db_state;
Expand Down Expand Up @@ -79,7 +83,8 @@ __global__ void __launch_bounds__(96) gpuDecodeDeltaBinary(
// that has a value we need.
if (skipped_leaf_values > 0) { db->skip_values(skipped_leaf_values); }

while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
while (s->error == 0 &&
(s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
uint32_t target_pos;
uint32_t const src_pos = s->src_pos;

Expand Down Expand Up @@ -145,6 +150,11 @@ __global__ void __launch_bounds__(96) gpuDecodeDeltaBinary(
}
__syncthreads();
}

if (t == 0 and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
}
}

} // anonymous namespace
Expand All @@ -157,6 +167,7 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand All @@ -165,11 +176,11 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page

if (level_type_size == 1) {
gpuDecodeDeltaBinary<uint8_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeDeltaBinary<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
} else {
gpuDecodeDeltaBinary<uint16_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeDeltaBinary<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}

Expand Down
25 changes: 18 additions & 7 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -582,8 +582,12 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz
* @tparam level_t Type used to store decoded repetition and definition levels
*/
template <typename level_t>
__global__ void __launch_bounds__(decode_block_size) gpuDecodeStringPageData(
PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
__global__ void __launch_bounds__(decode_block_size)
gpuDecodeStringPageData(PageInfo* pages,
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
{
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(4) size_type last_offset;
Expand Down Expand Up @@ -617,7 +621,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodeStringPageData(

// skipped_leaf_values will always be 0 for flat hierarchies.
uint32_t skipped_leaf_values = s->page.skipped_leaf_values;
while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
while (s->error == 0 &&
(s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
int target_pos;
int src_pos = s->src_pos;

Expand Down Expand Up @@ -742,6 +747,11 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodeStringPageData(

auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);

if (t == 0 and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
}
}

} // anonymous namespace
Expand Down Expand Up @@ -775,6 +785,7 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pa
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand All @@ -783,11 +794,11 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pa
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page

if (level_type_size == 1) {
gpuDecodeStringPageData<uint8_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeStringPageData<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
} else {
gpuDecodeStringPageData<uint16_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeStringPageData<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}

Expand Down
Loading

0 comments on commit 2c19bf3

Please sign in to comment.