add V2 page header support to parquet reader #11778

Merged · 24 commits · Oct 21, 2022

Commits
df74201  initial (etseidl, Sep 23, 2022)
8b57eb2  compiles (etseidl, Sep 23, 2022)
af1b1d4  formatting (etseidl, Sep 23, 2022)
2895315  some cleanup (etseidl, Sep 23, 2022)
ecad6a4  more cleanup (etseidl, Sep 23, 2022)
667349e  fix stupid bug in decompression (etseidl, Sep 23, 2022)
84be649  Merge branch 'rapidsai:branch-22.10' into feature/parquetv2 (etseidl, Sep 26, 2022)
90b0012  Merge branch 'rapidsai:branch-22.10' into feature/parquetv2 (etseidl, Sep 26, 2022)
10b2730  clean up some comments (etseidl, Sep 26, 2022)
8ad19d1  simplify by getting rid of separate pointers for def/rep data (etseidl, Sep 26, 2022)
bcddaf9  get rid of hdr_version too (etseidl, Sep 26, 2022)
d991ed2  formatting (etseidl, Sep 26, 2022)
86343f3  add consts and use cudaMemcpyAsync (etseidl, Sep 27, 2022)
6e7eb8d  experiment with using gpu_copy_uncompressed_blocks to copy rep and de… (etseidl, Sep 28, 2022)
d3378fc  Merge pull request #5 from rapidsai/branch-22.12 (etseidl, Sep 28, 2022)
26b9199  test v2 header reader in python. use pandas to write v2 headers (etseidl, Sep 28, 2022)
a1b749d  Merge branch 'feature/parquetv2' of github.com:etseidl/cudf into feat… (etseidl, Sep 28, 2022)
6136a47  formatting (etseidl, Sep 28, 2022)
077006a  remove old copy code and only use gpu_copy_uncompressed_blocks (etseidl, Sep 30, 2022)
a4aa28b  implement suggestion from review (etseidl, Oct 7, 2022)
5f9f3fd  clean up check for empty vector (etseidl, Oct 7, 2022)
413fc42  more review changes (etseidl, Oct 8, 2022)
a66038b  fix formatting (etseidl, Oct 8, 2022)
d34d8b0  remove pytest.param as suggested in review (etseidl, Oct 20, 2022)
18 changes: 11 additions & 7 deletions cpp/src/io/parquet/page_data.cu
@@ -146,11 +146,18 @@ __device__ uint32_t InitLevelSection(page_state_s* s,
     s->initial_rle_value[lvl] = 0;
     s->lvl_start[lvl] = cur;
   } else if (encoding == Encoding::RLE) {
-    if (cur + 4 < end) {
-      uint32_t run;
+    // V2 only uses RLE encoding, so only perform check here
+    if (s->page.def_lvl_bytes || s->page.rep_lvl_bytes) {
+      len = lvl == level_type::DEFINITION ? s->page.def_lvl_bytes : s->page.rep_lvl_bytes;
+    } else if (cur + 4 < end) {
       len = 4 + (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24);
       cur += 4;
-      run = get_vlq32(cur, end);
+    } else {
+      len = 0;
+      s->error = 2;
+    }
+    if (!s->error) {
+      uint32_t run = get_vlq32(cur, end);
       s->initial_rle_run[lvl] = run;
       if (!(run & 1)) {
         int v = (cur < end) ? cur[0] : 0;
@@ -163,9 +170,6 @@ }
       }
       s->lvl_start[lvl] = cur;
       if (cur > end) { s->error = 2; }
-    } else {
-      len = 0;
-      s->error = 2;
     }
   } else if (encoding == Encoding::BIT_PACKED) {
     len = (s->page.num_input_values * level_bits + 7) >> 3;
@@ -176,7 +180,7 @@ __device__ uint32_t InitLevelSection(page_state_s* s,
     s->error = 3;
     len = 0;
   }
-  return (uint32_t)len;
+  return static_cast<uint32_t>(len);
 }
 
 /**
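
For context on the InitLevelSection change: V1 data pages prefix their RLE level data with a 4-byte little-endian length, while V2 pages drop that prefix and carry the level lengths in the page header instead. A minimal host-side sketch of the selection logic, with illustrative names rather than the actual cudf API:

```cpp
#include <cstdint>

// Sketch only: returns the byte length of an RLE level section, advancing
// `cur` past the V1 length prefix when present. `v2_lvl_bytes` stands in for
// def_lvl_bytes or rep_lvl_bytes from the V2 page header (zero for V1 pages).
inline int32_t rle_section_length(uint8_t const*& cur,
                                  uint8_t const* end,
                                  int32_t v2_lvl_bytes)
{
  if (v2_lvl_bytes != 0) {
    // V2: the length came from the header; the level bytes start immediately.
    return v2_lvl_bytes;
  }
  if (cur + 4 < end) {
    // V1: 4-byte little-endian length prefix, counted as part of the section.
    int32_t const len = 4 + cur[0] + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24);
    cur += 4;
    return len;
  }
  return -1;  // truncated page; the kernel flags s->error = 2 in this case
}
```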
17 changes: 15 additions & 2 deletions cpp/src/io/parquet/page_hdr.cu
@@ -307,10 +307,11 @@ struct gpuParseDataPageHeaderV2 {
   __device__ bool operator()(byte_stream_s* bs)
   {
     auto op = thrust::make_tuple(ParquetFieldInt32(1, bs->page.num_input_values),
+                                 ParquetFieldInt32(2, bs->page.num_nulls),
                                  ParquetFieldInt32(3, bs->page.num_rows),
                                  ParquetFieldEnum<Encoding>(4, bs->page.encoding),
-                                 ParquetFieldEnum<Encoding>(5, bs->page.definition_level_encoding),
-                                 ParquetFieldEnum<Encoding>(6, bs->page.repetition_level_encoding));
+                                 ParquetFieldInt32(5, bs->page.def_lvl_bytes),
+                                 ParquetFieldInt32(6, bs->page.rep_lvl_bytes));
     return parse_header(op, bs);
   }
 };
@@ -382,18 +383,30 @@ __global__ void __launch_bounds__(128)
       // definition levels
       bs->page.chunk_row += bs->page.num_rows;
       bs->page.num_rows = 0;
+      // zero out V2 info
+      bs->page.num_nulls = 0;
+      bs->page.def_lvl_bytes = 0;
+      bs->page.rep_lvl_bytes = 0;
       if (parse_page_header(bs) && bs->page.compressed_page_size >= 0) {
         switch (bs->page_type) {
           case PageType::DATA_PAGE:
             index_out = num_dict_pages + data_page_count;
             data_page_count++;
             bs->page.flags = 0;
+            // this computation is only valid for flat schemas. for nested schemas,
+            // they will be recomputed in the preprocess step by examining repetition and
+            // definition levels
             bs->page.num_rows = bs->page.num_input_values;
             values_found += bs->page.num_input_values;
             break;
           case PageType::DATA_PAGE_V2:
             index_out = num_dict_pages + data_page_count;
             data_page_count++;
             bs->page.flags = 0;
             values_found += bs->page.num_input_values;
+            // V2 only uses RLE, so it was removed from the header
+            bs->page.definition_level_encoding = Encoding::RLE;
+            bs->page.repetition_level_encoding = Encoding::RLE;
             break;
           case PageType::DICTIONARY_PAGE:
             index_out = dictionary_page_count;
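
The reworked gpuParseDataPageHeaderV2 follows the Thrift layout of DataPageHeaderV2 in the Parquet spec, where fields 5 and 6 are byte lengths rather than the per-level encodings V1 carries (V2 levels are always RLE). For reference, a sketch of the fields and where each one lands; this struct is illustrative, not a cudf type:

```cpp
#include <cstdint>

// Abbreviated stand-in for parquet::Encoding (RLE is 3 in the spec).
enum class Encoding : int32_t { PLAIN = 0, RLE = 3 };

// Thrift field ids per parquet.thrift, with the PageInfo member each one
// feeds in this PR. The real reader decodes these straight off the byte
// stream via the ParquetField* functors instead of building this struct.
struct DataPageHeaderV2 {
  int32_t num_values;                     // field 1 -> num_input_values
  int32_t num_nulls;                      // field 2 -> num_nulls (new)
  int32_t num_rows;                       // field 3 -> num_rows
  Encoding encoding;                      // field 4 -> encoding
  int32_t definition_levels_byte_length;  // field 5 -> def_lvl_bytes (new)
  int32_t repetition_levels_byte_length;  // field 6 -> rep_lvl_bytes (new)
  // fields 7 (is_compressed) and 8 (statistics) are not consumed here
};
```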
5 changes: 5 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -121,6 +121,10 @@ struct PageInfo {
   // decompression
   int32_t compressed_page_size;    // compressed data size in bytes
   int32_t uncompressed_page_size;  // uncompressed data size in bytes
+  // for V2 pages, the def and rep level data is not compressed, and lacks the 4-byte length
+  // indicator. instead the lengths for these are stored in the header.
+  int32_t def_lvl_bytes;  // length of the definition levels (V2 header)
+  int32_t rep_lvl_bytes;  // length of the repetition levels (V2 header)
   // Number of values in this data page or dictionary.
   // Important : the # of input values does not necessarily
   // correspond to the number of rows in the output. It just reflects the number
@@ -131,6 +135,7 @@ struct PageInfo {
   int32_t num_input_values;
   int32_t chunk_row;  // starting row of this page relative to the start of the chunk
   int32_t num_rows;   // number of rows in this page
+  int32_t num_nulls;  // number of null values (V2 header)
   int32_t chunk_idx;  // column chunk this page belongs to
   int32_t src_col_schema;  // schema index of this column
   uint8_t flags;  // PAGEINFO_FLAGS_XXX
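
Given the new PageInfo fields, the body of a V2 data page can be addressed as three regions: repetition levels, then definition levels, then the encoded values, with only the values subject to compression. A hypothetical helper to make the arithmetic concrete, assuming that spec ordering (not part of the cudf headers):

```cpp
#include <cstdint>

// Offsets of the three regions in a V2 page body, relative to page_data.
// Assumes the Parquet V2 ordering: rep levels, def levels, then values.
struct v2_page_layout {
  int32_t rep_begin;   // repetition levels start the page body
  int32_t def_begin;   // definition levels follow them
  int32_t data_begin;  // encoded (possibly compressed) values follow both
};

constexpr v2_page_layout make_v2_layout(int32_t rep_lvl_bytes, int32_t def_lvl_bytes)
{
  return {0, rep_lvl_bytes, rep_lvl_bytes + def_lvl_bytes};
}
```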
42 changes: 33 additions & 9 deletions cpp/src/io/parquet/reader_impl.cu
@@ -1151,6 +1151,12 @@ rmm::device_buffer reader::impl::decompress_page_data(
   std::vector<device_span<uint8_t>> comp_out;
   comp_out.reserve(num_comp_pages);
 
+  // vectors to save v2 def and rep level data, if any
+  std::vector<device_span<uint8_t const>> copy_in;
+  copy_in.reserve(num_comp_pages);
+  std::vector<device_span<uint8_t>> copy_out;
+  copy_out.reserve(num_comp_pages);
+
   rmm::device_uvector<compression_result> comp_res(num_comp_pages, _stream);
   thrust::fill(rmm::exec_policy(_stream),
                comp_res.begin(),
@@ -1162,15 +1168,24 @@ rmm::device_buffer reader::impl::decompress_page_data(
   for (const auto& codec : codecs) {
     if (codec.num_pages == 0) { continue; }
 
-    for_each_codec_page(codec.compression_type, [&](size_t page) {
-      auto dst_base = static_cast<uint8_t*>(decomp_pages.data());
-      comp_in.emplace_back(pages[page].page_data,
-                           static_cast<size_t>(pages[page].compressed_page_size));
-      comp_out.emplace_back(dst_base + decomp_offset,
-                            static_cast<size_t>(pages[page].uncompressed_page_size));
-
-      pages[page].page_data = static_cast<uint8_t*>(comp_out.back().data());
-      decomp_offset += comp_out.back().size();
+    for_each_codec_page(codec.compression_type, [&](size_t page_idx) {
+      auto const dst_base = static_cast<uint8_t*>(decomp_pages.data()) + decomp_offset;
+      auto& page = pages[page_idx];
+      // offset will only be non-zero for V2 pages
+      auto const offset = page.def_lvl_bytes + page.rep_lvl_bytes;
+      // for V2 need to copy def and rep level info into place, and then offset the
+      // input and output buffers. otherwise we'd have to keep both the compressed
+      // and decompressed data.
+      if (offset != 0) {
+        copy_in.emplace_back(page.page_data, offset);
+        copy_out.emplace_back(dst_base, offset);
+      }
+      comp_in.emplace_back(page.page_data + offset,
+                           static_cast<size_t>(page.compressed_page_size - offset));
+      comp_out.emplace_back(dst_base + offset,
+                            static_cast<size_t>(page.uncompressed_page_size - offset));
+      page.page_data = dst_base;
+      decomp_offset += page.uncompressed_page_size;
     });
 
     host_span<device_span<uint8_t const> const> comp_in_view{comp_in.data() + start_pos,
@@ -1222,6 +1237,15 @@ rmm::device_buffer reader::impl::decompress_page_data(
 
   decompress_check(comp_res, _stream);
 
+  // now copy the uncompressed V2 def and rep level data
+  if (not copy_in.empty()) {
+    auto const d_copy_in  = cudf::detail::make_device_uvector_async(copy_in, _stream);
+    auto const d_copy_out = cudf::detail::make_device_uvector_async(copy_out, _stream);
+
+    gpu_copy_uncompressed_blocks(d_copy_in, d_copy_out, _stream);
+    _stream.synchronize();
+  }
+
   // Update the page information in device memory with the updated value of
   // page_data; it now points to the uncompressed data buffer
   pages.host_to_device(_stream);
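
The decompression loop now carves each page into two ranges that share one destination buffer: for V2 pages the uncompressed level bytes at the front are block-copied verbatim (via gpu_copy_uncompressed_blocks), and only the remainder is handed to the decompressor. A simplified sketch of the split; the span type and function names are stand-ins for the device_span-based code above:

```cpp
#include <cstddef>
#include <cstdint>

struct byte_span {  // simplified stand-in for cudf::device_span
  uint8_t const* data;
  size_t size;
};

struct page_split {
  byte_span levels;  // uncompressed def/rep bytes: copied as-is to dst_base
  byte_span values;  // compressed values: decompressed to dst_base + offset
};

// For V1 pages both level lengths are zero, so `levels` is empty and the
// whole page goes through the decompressor exactly as before this PR.
page_split split_page(uint8_t const* page_data,
                      int32_t compressed_page_size,
                      int32_t def_lvl_bytes,
                      int32_t rep_lvl_bytes)
{
  auto const offset = static_cast<size_t>(def_lvl_bytes) + static_cast<size_t>(rep_lvl_bytes);
  return {{page_data, offset},
          {page_data + offset, static_cast<size_t>(compressed_page_size) - offset}};
}
```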
76 changes: 76 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
@@ -1296,6 +1296,82 @@ def string_list_gen_wrapped(x, y):
     assert expect.equals(got.to_arrow())
 
 
+def test_parquet_reader_v2(tmpdir, simple_pdf):
+    pdf_fname = tmpdir.join("pdfv2.parquet")
+    simple_pdf.to_parquet(pdf_fname, data_page_version="2.0")
+    assert_eq(cudf.read_parquet(pdf_fname), simple_pdf)
+
+
+@pytest.mark.parametrize(
+    "data",
+    [
+        # Structs
+        {
+            "being": [
+                None,
+                {"human?": True, "Deets": {"Name": "Carrot", "Age": 27}},
+                {"human?": None, "Deets": {"Name": "Angua", "Age": 25}},
+                {"human?": False, "Deets": {"Name": "Cheery", "Age": 31}},
+                {"human?": False, "Deets": None},
+                {"human?": None, "Deets": {"Name": "Mr", "Age": None}},
+            ]
+        },
+        # List of Structs
+        {
+            "family": [
+                [None, {"human?": True, "deets": {"weight": 2.4, "age": 27}}],
+                [
+                    {"human?": None, "deets": {"weight": 5.3, "age": 25}},
+                    {"human?": False, "deets": {"weight": 8.0, "age": 31}},
+                    {"human?": False, "deets": None},
+                ],
+                [],
+                [{"human?": None, "deets": {"weight": 6.9, "age": None}}],
+            ]
+        },
+        # Struct of Lists
+        pytest.param(
+            {
+                "Real estate records": [
+                    None,
+                    {
+                        "Status": "NRI",
+                        "Ownerships": {
+                            "land_unit": [None, 2, None],
+                            "flats": [[1, 2, 3], [], [4, 5], [], [0, 6, 0]],
+                        },
+                    },
+                    {
+                        "Status": None,
+                        "Ownerships": {
+                            "land_unit": [4, 5],
+                            "flats": [[7, 8], []],
+                        },
+                    },
+                    {
+                        "Status": "RI",
+                        "Ownerships": {"land_unit": None, "flats": [[]]},
+                    },
+                    {"Status": "RI", "Ownerships": None},
+                    {
+                        "Status": None,
+                        "Ownerships": {
+                            "land_unit": [7, 8, 9],
+                            "flats": [[], [], []],
+                        },
+                    },
+                ]
+            },
+        ),
+    ],
+)
+def test_parquet_reader_nested_v2(tmpdir, data):
+    expect = pd.DataFrame(data)
+    pdf_fname = tmpdir.join("pdfv2.parquet")
+    expect.to_parquet(pdf_fname, data_page_version="2.0")
+    assert_eq(cudf.read_parquet(pdf_fname), expect)
+
+
 @pytest.mark.filterwarnings("ignore:Using CPU")
 def test_parquet_writer_cpu_pyarrow(
     tmpdir, pdf_day_timestamps, gdf_day_timestamps