Skip to content

Commit

Permalink
add V2 page header support to parquet reader (#11778)
Browse files Browse the repository at this point in the history
Adds support for reading parquet files with V2 page headers. Fixes #11686

~~Submitting as draft for now because I'm not sure how to do unit tests for this.  libcudf cannot produce files with V2 headers, so I would need to either add files to a data directory somewhere, or add raw binary of some parquet files to parquet_test.cpp. Given the comment on the `DecimalRead` test, neither seems attractive. Suggestions are welcome.  Perhaps use python to test?~~

Authors:
  - Ed Seidl (https://github.com/etseidl)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Matthew Roeschke (https://github.com/mroeschke)

URL: #11778
  • Loading branch information
etseidl authored Oct 21, 2022
1 parent 7940b5b commit f1ab5e9
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 18 deletions.
18 changes: 11 additions & 7 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,18 @@ __device__ uint32_t InitLevelSection(page_state_s* s,
s->initial_rle_value[lvl] = 0;
s->lvl_start[lvl] = cur;
} else if (encoding == Encoding::RLE) {
if (cur + 4 < end) {
uint32_t run;
// V2 only uses RLE encoding, so only perform check here
if (s->page.def_lvl_bytes || s->page.rep_lvl_bytes) {
len = lvl == level_type::DEFINITION ? s->page.def_lvl_bytes : s->page.rep_lvl_bytes;
} else if (cur + 4 < end) {
len = 4 + (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24);
cur += 4;
run = get_vlq32(cur, end);
} else {
len = 0;
s->error = 2;
}
if (!s->error) {
uint32_t run = get_vlq32(cur, end);
s->initial_rle_run[lvl] = run;
if (!(run & 1)) {
int v = (cur < end) ? cur[0] : 0;
Expand All @@ -163,9 +170,6 @@ __device__ uint32_t InitLevelSection(page_state_s* s,
}
s->lvl_start[lvl] = cur;
if (cur > end) { s->error = 2; }
} else {
len = 0;
s->error = 2;
}
} else if (encoding == Encoding::BIT_PACKED) {
len = (s->page.num_input_values * level_bits + 7) >> 3;
Expand All @@ -176,7 +180,7 @@ __device__ uint32_t InitLevelSection(page_state_s* s,
s->error = 3;
len = 0;
}
return (uint32_t)len;
return static_cast<uint32_t>(len);
}

/**
Expand Down
17 changes: 15 additions & 2 deletions cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,11 @@ struct gpuParseDataPageHeaderV2 {
__device__ bool operator()(byte_stream_s* bs)
{
auto op = thrust::make_tuple(ParquetFieldInt32(1, bs->page.num_input_values),
ParquetFieldInt32(2, bs->page.num_nulls),
ParquetFieldInt32(3, bs->page.num_rows),
ParquetFieldEnum<Encoding>(4, bs->page.encoding),
ParquetFieldEnum<Encoding>(5, bs->page.definition_level_encoding),
ParquetFieldEnum<Encoding>(6, bs->page.repetition_level_encoding));
ParquetFieldInt32(5, bs->page.def_lvl_bytes),
ParquetFieldInt32(6, bs->page.rep_lvl_bytes));
return parse_header(op, bs);
}
};
Expand Down Expand Up @@ -382,18 +383,30 @@ __global__ void __launch_bounds__(128)
// definition levels
bs->page.chunk_row += bs->page.num_rows;
bs->page.num_rows = 0;
// zero out V2 info
bs->page.num_nulls = 0;
bs->page.def_lvl_bytes = 0;
bs->page.rep_lvl_bytes = 0;
if (parse_page_header(bs) && bs->page.compressed_page_size >= 0) {
switch (bs->page_type) {
case PageType::DATA_PAGE:
index_out = num_dict_pages + data_page_count;
data_page_count++;
bs->page.flags = 0;
// this computation is only valid for flat schemas. for nested schemas,
// they will be recomputed in the preprocess step by examining repetition and
// definition levels
bs->page.num_rows = bs->page.num_input_values;
values_found += bs->page.num_input_values;
break;
case PageType::DATA_PAGE_V2:
index_out = num_dict_pages + data_page_count;
data_page_count++;
bs->page.flags = 0;
values_found += bs->page.num_input_values;
// V2 only uses RLE, so it was removed from the header
bs->page.definition_level_encoding = Encoding::RLE;
bs->page.repetition_level_encoding = Encoding::RLE;
break;
case PageType::DICTIONARY_PAGE:
index_out = dictionary_page_count;
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ struct PageInfo {
// decompression
int32_t compressed_page_size; // compressed data size in bytes
int32_t uncompressed_page_size; // uncompressed data size in bytes
// for V2 pages, the def and rep level data is not compressed, and lacks the 4-byte length
// indicator. instead the lengths for these are stored in the header.
int32_t def_lvl_bytes; // length of the definition levels (V2 header)
int32_t rep_lvl_bytes; // length of the repetition levels (V2 header)
// Number of values in this data page or dictionary.
// Important : the # of input values does not necessarily
// correspond to the number of rows in the output. It just reflects the number
Expand All @@ -138,6 +142,7 @@ struct PageInfo {
int32_t num_input_values;
int32_t chunk_row; // starting row of this page relative to the start of the chunk
int32_t num_rows; // number of rows in this page
int32_t num_nulls; // number of null values (V2 header)
int32_t chunk_idx; // column chunk this page belongs to
int32_t src_col_schema; // schema index of this column
uint8_t flags; // PAGEINFO_FLAGS_XXX
Expand Down
42 changes: 33 additions & 9 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,12 @@ rmm::device_buffer reader::impl::decompress_page_data(
std::vector<device_span<uint8_t>> comp_out;
comp_out.reserve(num_comp_pages);

// vectors to save v2 def and rep level data, if any
std::vector<device_span<uint8_t const>> copy_in;
copy_in.reserve(num_comp_pages);
std::vector<device_span<uint8_t>> copy_out;
copy_out.reserve(num_comp_pages);

rmm::device_uvector<compression_result> comp_res(num_comp_pages, _stream);
thrust::fill(rmm::exec_policy(_stream),
comp_res.begin(),
Expand All @@ -1162,15 +1168,24 @@ rmm::device_buffer reader::impl::decompress_page_data(
for (const auto& codec : codecs) {
if (codec.num_pages == 0) { continue; }

for_each_codec_page(codec.compression_type, [&](size_t page) {
auto dst_base = static_cast<uint8_t*>(decomp_pages.data());
comp_in.emplace_back(pages[page].page_data,
static_cast<size_t>(pages[page].compressed_page_size));
comp_out.emplace_back(dst_base + decomp_offset,
static_cast<size_t>(pages[page].uncompressed_page_size));

pages[page].page_data = static_cast<uint8_t*>(comp_out.back().data());
decomp_offset += comp_out.back().size();
for_each_codec_page(codec.compression_type, [&](size_t page_idx) {
auto const dst_base = static_cast<uint8_t*>(decomp_pages.data()) + decomp_offset;
auto& page = pages[page_idx];
// offset will only be non-zero for V2 pages
auto const offset = page.def_lvl_bytes + page.rep_lvl_bytes;
// for V2 need to copy def and rep level info into place, and then offset the
// input and output buffers. otherwise we'd have to keep both the compressed
// and decompressed data.
if (offset != 0) {
copy_in.emplace_back(page.page_data, offset);
copy_out.emplace_back(dst_base, offset);
}
comp_in.emplace_back(page.page_data + offset,
static_cast<size_t>(page.compressed_page_size - offset));
comp_out.emplace_back(dst_base + offset,
static_cast<size_t>(page.uncompressed_page_size - offset));
page.page_data = dst_base;
decomp_offset += page.uncompressed_page_size;
});

host_span<device_span<uint8_t const> const> comp_in_view{comp_in.data() + start_pos,
Expand Down Expand Up @@ -1222,6 +1237,15 @@ rmm::device_buffer reader::impl::decompress_page_data(

decompress_check(comp_res, _stream);

// now copy the uncompressed V2 def and rep level data
if (not copy_in.empty()) {
auto const d_copy_in = cudf::detail::make_device_uvector_async(copy_in, _stream);
auto const d_copy_out = cudf::detail::make_device_uvector_async(copy_out, _stream);

gpu_copy_uncompressed_blocks(d_copy_in, d_copy_out, _stream);
_stream.synchronize();
}

// Update the page information in device memory with the updated value of
// page_data; it now points to the uncompressed data buffer
pages.host_to_device(_stream);
Expand Down
74 changes: 74 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,80 @@ def string_list_gen_wrapped(x, y):
assert expect.equals(got.to_arrow())


def test_parquet_reader_v2(tmpdir, simple_pdf):
    # Round-trip check: write a simple pandas frame using V2 data page
    # headers, then confirm the cudf reader produces an equal frame.
    fname = tmpdir.join("pdfv2.parquet")
    simple_pdf.to_parquet(fname, data_page_version="2.0")
    got = cudf.read_parquet(fname)
    assert_eq(got, simple_pdf)


@pytest.mark.parametrize(
    "data",
    [
        # Structs
        {
            "being": [
                None,
                {"human?": True, "Deets": {"Name": "Carrot", "Age": 27}},
                {"human?": None, "Deets": {"Name": "Angua", "Age": 25}},
                {"human?": False, "Deets": {"Name": "Cheery", "Age": 31}},
                {"human?": False, "Deets": None},
                {"human?": None, "Deets": {"Name": "Mr", "Age": None}},
            ]
        },
        # List of Structs
        {
            "family": [
                [None, {"human?": True, "deets": {"weight": 2.4, "age": 27}}],
                [
                    {"human?": None, "deets": {"weight": 5.3, "age": 25}},
                    {"human?": False, "deets": {"weight": 8.0, "age": 31}},
                    {"human?": False, "deets": None},
                ],
                [],
                [{"human?": None, "deets": {"weight": 6.9, "age": None}}],
            ]
        },
        # Struct of Lists
        {
            "Real estate records": [
                None,
                {
                    "Status": "NRI",
                    "Ownerships": {
                        "land_unit": [None, 2, None],
                        "flats": [[1, 2, 3], [], [4, 5], [], [0, 6, 0]],
                    },
                },
                {
                    "Status": None,
                    "Ownerships": {
                        "land_unit": [4, 5],
                        "flats": [[7, 8], []],
                    },
                },
                {
                    "Status": "RI",
                    "Ownerships": {"land_unit": None, "flats": [[]]},
                },
                {"Status": "RI", "Ownerships": None},
                {
                    "Status": None,
                    "Ownerships": {
                        "land_unit": [7, 8, 9],
                        "flats": [[], [], []],
                    },
                },
            ]
        },
    ],
)
def test_parquet_reader_nested_v2(tmpdir, data):
    # Round-trip nested (struct / list-of-struct / struct-of-list) data
    # through a parquet file written with V2 data page headers, and check
    # that the cudf reader matches the original pandas frame. Nested
    # columns exercise the repetition/definition level decoding paths,
    # whose byte lengths come from the V2 page header.
    expect = pd.DataFrame(data)
    pdf_fname = tmpdir.join("pdfv2.parquet")
    expect.to_parquet(pdf_fname, data_page_version="2.0")
    assert_eq(cudf.read_parquet(pdf_fname), expect)


@pytest.mark.filterwarnings("ignore:Using CPU")
def test_parquet_writer_cpu_pyarrow(
tmpdir, pdf_day_timestamps, gdf_day_timestamps
Expand Down

0 comments on commit f1ab5e9

Please sign in to comment.