Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect and report errors in Parquet header parsing #14237

Merged
merged 33 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6457f03
remove unneeded cruft from thrift parser
etseidl Sep 29, 2023
854916b
report errors parsing page headers
etseidl Sep 29, 2023
34ca06a
Merge branch 'rapidsai:branch-23.12' into detect_header_overrun
etseidl Sep 29, 2023
e50515a
add test
etseidl Sep 29, 2023
7486d8b
Merge branch 'detect_header_overrun' of github.com:etseidl/cudf into …
etseidl Sep 29, 2023
74e0867
Merge branch 'branch-23.12' into detect_header_overrun
etseidl Oct 3, 2023
7a4d1cb
implement suggestion from review
etseidl Oct 3, 2023
2f299a0
add some braces
etseidl Oct 3, 2023
adc5f2c
check for errors before checking returned value
etseidl Oct 3, 2023
0f086a8
create a shared error scalar
etseidl Oct 4, 2023
5573c64
rework error stuff some to pass python tests
etseidl Oct 4, 2023
3ec7225
Merge branch 'branch-23.12' into detect_header_overrun
etseidl Oct 4, 2023
50e4efc
use set_error
etseidl Oct 4, 2023
62a1f85
switch to new error reporting
etseidl Oct 4, 2023
50314f3
detect unsupported page encodings in kernel now
etseidl Oct 4, 2023
f75884c
get rid of global error and instead wrap it
etseidl Oct 4, 2023
bfa71b0
test error is not 0 before setting it
etseidl Oct 4, 2023
2cb87d4
Merge branch 'branch-23.12' into detect_header_overrun
etseidl Oct 4, 2023
7b67a1d
error vector does not need to be shared
etseidl Oct 4, 2023
9b943be
add docstring
etseidl Oct 4, 2023
d7b387e
Apply suggestions from code review
etseidl Oct 4, 2023
69bf46b
Merge branch 'branch-23.12' into detect_header_overrun
vuule Oct 5, 2023
fb0e79b
Merge remote-tracking branch 'origin/branch-23.12' into detect_header…
etseidl Oct 9, 2023
3de8283
Merge branch 'branch-23.12' into detect_header_overrun
etseidl Oct 10, 2023
ae2d00c
Merge branch 'branch-23.12' into detect_header_overrun
etseidl Oct 11, 2023
92eb08d
implement change from review
etseidl Oct 13, 2023
e145ecd
a few more review suggestions
etseidl Oct 13, 2023
be49938
Merge branch 'detect_header_overrun' of github.com:etseidl/cudf into …
etseidl Oct 13, 2023
b141c6c
Merge remote-tracking branch 'origin/branch-23.12' into detect_header…
etseidl Oct 13, 2023
12c797c
Apply suggestions from code review
etseidl Oct 16, 2023
be05dbb
Merge remote-tracking branch 'origin/branch-23.12' into detect_header…
etseidl Oct 16, 2023
733efbd
Merge branch 'branch-23.12' into detect_header_overrun
vuule Oct 17, 2023
0a01cef
Merge branch 'branch-23.12' into detect_header_overrun
vuule Oct 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions cpp/src/io/parquet/error.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_scalar.hpp>

#include <cstdint>
#include <sstream>

namespace cudf::io::parquet {

/**
* @brief Wrapper around a `rmm::device_scalar` for use in reporting errors that occur in
* kernel calls.
*
* The `kernel_error` object is created with a `rmm::cuda_stream_view` which is used throughout
* the object's lifetime.
*/
class kernel_error {
private:
rmm::device_scalar<int32_t> _error_code;

public:
/**
* @brief Construct a new `kernel_error` with an initial value of 0.
*
* Note: setting the initial value is done asynchronously.
etseidl marked this conversation as resolved.
Show resolved Hide resolved
*
* @throws `rmm::bad_alloc` if allocating the device memory for `initial_value` fails.
* @throws `rmm::cuda_error` if copying `initial_value` to device memory fails.
*
* @param CUDA stream to use
*/
kernel_error(rmm::cuda_stream_view stream) : _error_code{0, stream} {}

/**
* @brief Return a pointer to the device memory for the error
*/
[[nodiscard]] auto data() { return _error_code.data(); }

/**
* @brief Return the current value of the error
*
* This uses the stream used to create this instance. This does a synchronize on the stream
* this object was instantiated with.
*/
[[nodiscard]] auto value() const { return _error_code.value(_error_code.stream()); }

/**
* @brief Return a hexadecimal string representation of the current error code
*
* Returned string will have "0x" prepended.
*/
[[nodiscard]] std::string str() const
{
std::stringstream sstream;
sstream << std::hex << value();
return "0x" + sstream.str();
}
};

} // namespace cudf::io::parquet
5 changes: 1 addition & 4 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -599,10 +599,7 @@ __global__ void __launch_bounds__(decode_block_size)
}
__syncthreads();
}
if (t == 0 and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
}
if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

struct mask_tform {
Expand Down
5 changes: 1 addition & 4 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,7 @@ __global__ void __launch_bounds__(96)
__syncthreads();
}

if (t == 0 and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
}
if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

} // anonymous namespace
Expand Down
52 changes: 24 additions & 28 deletions cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,6 @@ namespace cudf::io::parquet::detail {
// Minimal thrift implementation for parsing page headers
// https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md

static const __device__ __constant__ uint8_t g_list2struct[16] = {0,
1,
2,
ST_FLD_BYTE,
ST_FLD_DOUBLE,
5,
ST_FLD_I16,
7,
ST_FLD_I32,
9,
ST_FLD_I64,
ST_FLD_BINARY,
ST_FLD_STRUCT,
ST_FLD_MAP,
ST_FLD_SET,
ST_FLD_LIST};

struct byte_stream_s {
uint8_t const* cur{};
uint8_t const* end{};
Expand Down Expand Up @@ -140,12 +123,13 @@ __device__ void skip_struct_field(byte_stream_s* bs, int field_type)
case ST_FLD_SET: { // NOTE: skipping a list of lists is not handled
auto const c = getb(bs);
int n = c >> 4;
if (n == 0xf) n = get_u32(bs);
field_type = g_list2struct[c & 0xf];
if (field_type == ST_FLD_STRUCT)
if (n == 0xf) { n = get_u32(bs); }
field_type = c & 0xf;
if (field_type == ST_FLD_STRUCT) {
struct_depth += n;
else
} else {
rep_cnt = n;
}
} break;
case ST_FLD_STRUCT: struct_depth++; break;
}
Expand Down Expand Up @@ -356,16 +340,19 @@ struct gpuParsePageHeader {
*/
// blockDim {128,1,1}
__global__ void __launch_bounds__(128)
gpuDecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks)
gpuDecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, int32_t* error_code)
{
gpuParsePageHeader parse_page_header;
__shared__ byte_stream_s bs_g[4];

int error[4] = {0};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to be explicit about int vs. int32_t, for the purpose of aligning our conventions? I'd like to use the same type for the error value as the type we use in the parameters of set_error.

int lane_id = threadIdx.x % 32;
int chunk = (blockIdx.x * 4) + (threadIdx.x / 32);
byte_stream_s* const bs = &bs_g[threadIdx.x / 32];
int warp_id = threadIdx.x / 32;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While refactoring this, let's move to use cudf::detail::warp_size instead of hardcoding 32.

int chunk = (blockIdx.x * 4) + warp_id;
byte_stream_s* const bs = &bs_g[warp_id];

if (chunk < num_chunks and lane_id == 0) bs->ck = chunks[chunk];
if (chunk < num_chunks and lane_id == 0) { bs->ck = chunks[chunk]; }
if (lane_id == 0) { error[warp_id] = 0; }
__syncthreads();

if (chunk < num_chunks) {
Expand All @@ -376,7 +363,7 @@ __global__ void __launch_bounds__(128)
int32_t num_dict_pages = bs->ck.num_dict_pages;
PageInfo* page_info;

if (!lane_id) {
if (lane_id == 0) {
bs->base = bs->cur = bs->ck.compressed_data;
bs->end = bs->base + bs->ck.compressed_size;
bs->page.chunk_idx = chunk;
Expand Down Expand Up @@ -412,6 +399,9 @@ __global__ void __launch_bounds__(128)
bs->page.lvl_bytes[level_type::DEFINITION] = 0;
bs->page.lvl_bytes[level_type::REPETITION] = 0;
if (parse_page_header(bs) && bs->page.compressed_page_size >= 0) {
if (not is_supported_encoding(bs->page.encoding)) {
error[warp_id] |= static_cast<int>(decode_error::UNSUPPORTED_ENCODING);
etseidl marked this conversation as resolved.
Show resolved Hide resolved
}
switch (bs->page_type) {
case PageType::DATA_PAGE:
index_out = num_dict_pages + data_page_count;
Expand Down Expand Up @@ -440,20 +430,25 @@ __global__ void __launch_bounds__(128)
}
bs->page.page_data = const_cast<uint8_t*>(bs->cur);
bs->cur += bs->page.compressed_page_size;
if (bs->cur > bs->end) {
error[warp_id] |= static_cast<int>(decode_error::DATA_STREAM_OVERRUN);
etseidl marked this conversation as resolved.
Show resolved Hide resolved
}
bs->page.kernel_mask = kernel_mask_for_page(bs->page, bs->ck);
} else {
bs->cur = bs->end;
}
}
index_out = shuffle(index_out);
if (index_out >= 0 && index_out < max_num_pages && lane_id == 0)
if (index_out >= 0 && index_out < max_num_pages && lane_id == 0) {
page_info[index_out] = bs->page;
}
num_values = shuffle(num_values);
__syncwarp();
}
if (lane_id == 0) {
chunks[chunk].num_data_pages = data_page_count;
chunks[chunk].num_dict_pages = dictionary_page_count;
if (error[warp_id] != 0) { set_error(error[warp_id], error_code); }
}
}
}
Expand Down Expand Up @@ -509,11 +504,12 @@ __global__ void __launch_bounds__(128)

void __host__ DecodePageHeaders(ColumnChunkDesc* chunks,
int32_t num_chunks,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
dim3 dim_block(128, 1);
dim3 dim_grid((num_chunks + 3) >> 2, 1); // 1 chunk per warp, 4 warps per block
gpuDecodePageHeaders<<<dim_grid, dim_block, 0, stream.value()>>>(chunks, num_chunks);
gpuDecodePageHeaders<<<dim_grid, dim_block, 0, stream.value()>>>(chunks, num_chunks, error_code);
}

void __host__ BuildStringDictionaryIndex(ColumnChunkDesc* chunks,
Expand Down
5 changes: 1 addition & 4 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -745,10 +745,7 @@ __global__ void __launch_bounds__(decode_block_size)
auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);

if (t == 0 and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
}
if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

} // anonymous namespace
Expand Down
32 changes: 31 additions & 1 deletion cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include <rmm/device_scalar.hpp>
#include <rmm/device_uvector.hpp>

#include <cuda/atomic>

#include <cuda_runtime.h>

#include <vector>
Expand All @@ -54,6 +56,30 @@ constexpr int rolling_index(int index)
return index % rolling_size;
}

// see setupLocalPageInfo() in page_decode.cuh for supported page encodings
constexpr bool is_supported_encoding(Encoding enc)
{
switch (enc) {
case Encoding::PLAIN:
case Encoding::PLAIN_DICTIONARY:
case Encoding::RLE:
case Encoding::RLE_DICTIONARY:
case Encoding::DELTA_BINARY_PACKED: return true;
default: return false;
}
}

/**
* @brief Atomically OR `error` into `error_code`.
*/
constexpr void set_error(int32_t error, int32_t* error_code)
{
if (error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(error, cuda::std::memory_order_relaxed);
}
}

/**
* @brief Enum for the different types of errors that can occur during decoding.
*
Expand Down Expand Up @@ -452,9 +478,13 @@ constexpr bool is_string_col(ColumnChunkDesc const& chunk)
*
* @param[in] chunks List of column chunks
* @param[in] num_chunks Number of column chunks
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use
*/
void DecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, rmm::cuda_stream_view stream);
void DecodePageHeaders(ColumnChunkDesc* chunks,
int32_t num_chunks,
int32_t* error_code,
rmm::cuda_stream_view stream);

/**
* @brief Launches kernel for building the dictionary index for the column
Expand Down
11 changes: 5 additions & 6 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "reader_impl.hpp"
#include "error.hpp"

#include <cudf/detail/stream_compaction.hpp>
#include <cudf/detail/transform.hpp>
Expand Down Expand Up @@ -163,7 +164,8 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
chunk_nested_valids.host_to_device_async(_stream);
chunk_nested_data.host_to_device_async(_stream);

rmm::device_scalar<int32_t> error_code(0, _stream);
// create this before we fork streams
kernel_error error_code(_stream);

// get the number of streams we need from the pool and tell them to wait on the H2D copies
int const nkernels = std::bitset<32>(kernel_mask).count();
Expand Down Expand Up @@ -199,11 +201,8 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
page_nesting.device_to_host_async(_stream);
page_nesting_decode.device_to_host_async(_stream);

auto const decode_error = error_code.value(_stream);
if (decode_error != 0) {
std::stringstream stream;
stream << std::hex << decode_error;
CUDF_FAIL("Parquet data decode failed with code(s) 0x" + stream.str());
if (error_code.value() != 0) {
CUDF_FAIL("Parquet data decode failed with code(s) " + error_code.str());
}

// for list columns, add the final offset to every offset buffer.
Expand Down
Loading