Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect and report errors in Parquet header parsing #14237

Merged
merged 33 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6457f03
remove unneeded cruft from thrift parser
etseidl Sep 29, 2023
854916b
report errors parsing page headers
etseidl Sep 29, 2023
34ca06a
Merge branch 'rapidsai:branch-23.12' into detect_header_overrun
etseidl Sep 29, 2023
e50515a
add test
etseidl Sep 29, 2023
7486d8b
Merge branch 'detect_header_overrun' of github.com:etseidl/cudf into …
etseidl Sep 29, 2023
74e0867
Merge branch 'branch-23.12' into detect_header_overrun
etseidl Oct 3, 2023
7a4d1cb
implement suggestion from review
etseidl Oct 3, 2023
2f299a0
add some braces
etseidl Oct 3, 2023
adc5f2c
check for errors before checking returned value
etseidl Oct 3, 2023
0f086a8
create a shared error scalar
etseidl Oct 4, 2023
5573c64
rework error handling somewhat to pass python tests
etseidl Oct 4, 2023
3ec7225
Merge branch 'branch-23.12' into detect_header_overrun
etseidl Oct 4, 2023
50e4efc
use set_error
etseidl Oct 4, 2023
62a1f85
switch to new error reporting
etseidl Oct 4, 2023
50314f3
detect unsupported page encodings in kernel now
etseidl Oct 4, 2023
f75884c
get rid of global error and instead wrap it
etseidl Oct 4, 2023
bfa71b0
test error is not 0 before setting it
etseidl Oct 4, 2023
2cb87d4
Merge branch 'branch-23.12' into detect_header_overrun
etseidl Oct 4, 2023
7b67a1d
error vector does not need to be shared
etseidl Oct 4, 2023
9b943be
add docstring
etseidl Oct 4, 2023
d7b387e
Apply suggestions from code review
etseidl Oct 4, 2023
69bf46b
Merge branch 'branch-23.12' into detect_header_overrun
vuule Oct 5, 2023
fb0e79b
Merge remote-tracking branch 'origin/branch-23.12' into detect_header…
etseidl Oct 9, 2023
3de8283
Merge branch 'branch-23.12' into detect_header_overrun
etseidl Oct 10, 2023
ae2d00c
Merge branch 'branch-23.12' into detect_header_overrun
etseidl Oct 11, 2023
92eb08d
implement change from review
etseidl Oct 13, 2023
e145ecd
a few more review suggestions
etseidl Oct 13, 2023
be49938
Merge branch 'detect_header_overrun' of github.com:etseidl/cudf into …
etseidl Oct 13, 2023
b141c6c
Merge remote-tracking branch 'origin/branch-23.12' into detect_header…
etseidl Oct 13, 2023
12c797c
Apply suggestions from code review
etseidl Oct 16, 2023
be05dbb
Merge remote-tracking branch 'origin/branch-23.12' into detect_header…
etseidl Oct 16, 2023
733efbd
Merge branch 'branch-23.12' into detect_header_overrun
vuule Oct 17, 2023
0a01cef
Merge branch 'branch-23.12' into detect_header_overrun
vuule Oct 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 21 additions & 24 deletions cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,15 @@

#include <rmm/cuda_stream_view.hpp>

#include <cuda/atomic>

namespace cudf {
namespace io {
namespace parquet {
namespace gpu {
// Minimal thrift implementation for parsing page headers
// https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md

static const __device__ __constant__ uint8_t g_list2struct[16] = {0,
1,
2,
ST_FLD_BYTE,
ST_FLD_DOUBLE,
5,
ST_FLD_I16,
7,
ST_FLD_I32,
9,
ST_FLD_I64,
ST_FLD_BINARY,
ST_FLD_STRUCT,
ST_FLD_MAP,
ST_FLD_SET,
ST_FLD_LIST};

struct byte_stream_s {
uint8_t const* cur{};
uint8_t const* end{};
Expand Down Expand Up @@ -142,12 +127,13 @@ __device__ void skip_struct_field(byte_stream_s* bs, int field_type)
case ST_FLD_SET: { // NOTE: skipping a list of lists is not handled
auto const c = getb(bs);
int n = c >> 4;
if (n == 0xf) n = get_u32(bs);
field_type = g_list2struct[c & 0xf];
if (field_type == ST_FLD_STRUCT)
if (n == 0xf) { n = get_u32(bs); }
field_type = c & 0xf;
if (field_type == ST_FLD_STRUCT) {
struct_depth += n;
else
} else {
rep_cnt = n;
}
} break;
case ST_FLD_STRUCT: struct_depth++; break;
}
Expand Down Expand Up @@ -359,16 +345,19 @@ struct gpuParsePageHeader {
*/
// blockDim {128,1,1}
__global__ void __launch_bounds__(128)
gpuDecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks)
gpuDecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, int32_t* error_code)
{
gpuParsePageHeader parse_page_header;
__shared__ byte_stream_s bs_g[4];
__shared__ int error[4];

int lane_id = threadIdx.x % 32;
int warp_id = threadIdx.x / 32;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While refactoring this, let's move to use cudf::detail::warp_size instead of hardcoding 32.

int chunk = (blockIdx.x * 4) + (threadIdx.x / 32);
byte_stream_s* const bs = &bs_g[threadIdx.x / 32];
etseidl marked this conversation as resolved.
Show resolved Hide resolved

if (chunk < num_chunks and lane_id == 0) bs->ck = chunks[chunk];
if (chunk < num_chunks and lane_id == 0) { bs->ck = chunks[chunk]; }
if (lane_id == 0) { error[warp_id] = 0; }
__syncthreads();

if (chunk < num_chunks) {
Expand Down Expand Up @@ -443,6 +432,9 @@ __global__ void __launch_bounds__(128)
}
bs->page.page_data = const_cast<uint8_t*>(bs->cur);
bs->cur += bs->page.compressed_page_size;
if (bs->cur > bs->end) {
error[warp_id] |= static_cast<int>(decode_error::DATA_STREAM_OVERRUN);
etseidl marked this conversation as resolved.
Show resolved Hide resolved
}
bs->page.kernel_mask = kernel_mask_for_page(bs->page, bs->ck);
} else {
bs->cur = bs->end;
Expand All @@ -457,6 +449,10 @@ __global__ void __launch_bounds__(128)
if (lane_id == 0) {
chunks[chunk].num_data_pages = data_page_count;
chunks[chunk].num_dict_pages = dictionary_page_count;
if (error[warp_id] != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(error[warp_id], cuda::std::memory_order_relaxed);
}
}
}
}
Expand Down Expand Up @@ -512,11 +508,12 @@ __global__ void __launch_bounds__(128)

/**
 * @brief Host-side launcher for the page-header decoding kernel.
 *
 * Launch geometry assigns one warp per column chunk, with four warps
 * (128 threads) per block, matching the kernel's __launch_bounds__(128).
 *
 * @param[in] chunks Device pointer to the column chunk descriptors
 * @param[in] num_chunks Number of column chunks to process
 * @param[out] error_code Device pointer into which the kernel OR-accumulates
 *                        decode error flags
 * @param[in] stream CUDA stream on which the kernel is launched
 */
void __host__ DecodePageHeaders(ColumnChunkDesc* chunks,
                                int32_t num_chunks,
                                int32_t* error_code,
                                rmm::cuda_stream_view stream)
{
  // Ceil-divide: one warp per chunk, four warps per 128-thread block.
  auto const num_blocks = (num_chunks + 3) / 4;
  dim3 const block(128, 1);
  dim3 const grid(num_blocks, 1);
  gpuDecodePageHeaders<<<grid, block, 0, stream.value()>>>(chunks, num_chunks, error_code);
}

void __host__ BuildStringDictionaryIndex(ColumnChunkDesc* chunks,
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -527,9 +527,13 @@ constexpr bool is_string_col(ColumnChunkDesc const& chunk)
*
* @param[in] chunks List of column chunks
* @param[in] num_chunks Number of column chunks
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use
*/
void DecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, rmm::cuda_stream_view stream);
void DecodePageHeaders(ColumnChunkDesc* chunks,
int32_t num_chunks,
int32_t* error_code,
rmm::cuda_stream_view stream);

/**
* @brief Launches kernel for building the dictionary index for the column
Expand Down
33 changes: 28 additions & 5 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -296,17 +296,20 @@ template <typename T = uint8_t>
* @brief Return the number of total pages from the given column chunks.
*
* @param chunks List of column chunk descriptors
* @param error_code Error code for kernel failures
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return The total number of pages
*/
[[nodiscard]] size_t count_page_headers(
cudf::detail::hostdevice_vector<gpu::ColumnChunkDesc>& chunks, rmm::cuda_stream_view stream)
cudf::detail::hostdevice_vector<gpu::ColumnChunkDesc>& chunks,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
size_t total_pages = 0;

chunks.host_to_device_async(stream);
gpu::DecodePageHeaders(chunks.device_ptr(), chunks.size(), stream);
gpu::DecodePageHeaders(chunks.device_ptr(), chunks.size(), error_code, stream);
chunks.device_to_host_sync(stream);

for (size_t c = 0; c < chunks.size(); c++) {
Expand Down Expand Up @@ -334,11 +337,13 @@ constexpr bool is_supported_encoding(Encoding enc)
*
* @param chunks List of column chunk descriptors
* @param pages List of page information
* @param error_code Error code for kernel failures
* @param stream CUDA stream used for device memory operations and kernel launches
* @returns The size in bytes of level type data required
*/
int decode_page_headers(cudf::detail::hostdevice_vector<gpu::ColumnChunkDesc>& chunks,
cudf::detail::hostdevice_vector<gpu::PageInfo>& pages,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
// IMPORTANT : if you change how pages are stored within a chunk (dist pages, then data pages),
Expand All @@ -350,7 +355,7 @@ int decode_page_headers(cudf::detail::hostdevice_vector<gpu::ColumnChunkDesc>& c
}

chunks.host_to_device_async(stream);
gpu::DecodePageHeaders(chunks.device_ptr(), chunks.size(), stream);
gpu::DecodePageHeaders(chunks.device_ptr(), chunks.size(), error_code, stream);

// compute max bytes needed for level data
auto level_bit_size =
Expand Down Expand Up @@ -967,13 +972,31 @@ void reader::impl::load_and_decompress_data()
task.wait();
}

rmm::device_scalar<int32_t> error_code(0, _stream);

// Process dataset chunk pages into output columns
auto const total_pages = count_page_headers(chunks, _stream);
auto const total_pages = count_page_headers(chunks, error_code.data(), _stream);
if (total_pages <= 0) { return; }

auto decode_error = error_code.value(_stream);
vuule marked this conversation as resolved.
Show resolved Hide resolved
if (decode_error != 0) {
std::stringstream stream;
stream << std::hex << decode_error;
CUDF_FAIL("Parquet header parsing failed with code(s) 0x" + stream.str());
}

pages = cudf::detail::hostdevice_vector<gpu::PageInfo>(total_pages, total_pages, _stream);

// decoding of column/page information
_pass_itm_data->level_type_size = decode_page_headers(chunks, pages, _stream);
_pass_itm_data->level_type_size = decode_page_headers(chunks, pages, error_code.data(), _stream);

decode_error = error_code.value(_stream);
if (decode_error != 0) {
std::stringstream stream;
stream << std::hex << decode_error;
CUDF_FAIL("Parquet header parsing failed with code(s) 0x" + stream.str());
}

if (has_compressed_data) {
decomp_page_data = decompress_page_data(chunks, pages, _stream);
// Free compressed data
Expand Down
Binary file not shown.
8 changes: 8 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2830,6 +2830,14 @@ def test_parquet_reader_unsupported_page_encoding(datadir):
cudf.read_parquet(fname)


def test_parquet_reader_detect_bad_dictionary(datadir):
    # Reading a file whose dictionary page is corrupt should surface the
    # kernel-side header-parsing error as a RuntimeError.
    with pytest.raises(RuntimeError):
        cudf.read_parquet(datadir / "bad_dict.parquet")


@pytest.mark.parametrize("data", [{"a": [1, 2, 3, 4]}, {"b": [1, None, 2, 3]}])
@pytest.mark.parametrize("force_nullable_schema", [True, False])
def test_parquet_writer_schema_nullability(data, force_nullable_schema):
Expand Down