From df7420147ec670e8ab77512a5438447aee600f0e Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 22 Sep 2022 17:45:37 -0700 Subject: [PATCH 01/20] initial --- cpp/src/io/parquet/page_hdr.cu | 19 ++++++++++-- cpp/src/io/parquet/parquet_gpu.hpp | 10 +++++++ cpp/src/io/parquet/reader_impl.cu | 46 ++++++++++++++++++++++++------ 3 files changed, 64 insertions(+), 11 deletions(-) diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index e7856a871c1..69c0fba4ef2 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -307,10 +307,11 @@ struct gpuParseDataPageHeaderV2 { __device__ bool operator()(byte_stream_s* bs) { auto op = thrust::make_tuple(ParquetFieldInt32(1, bs->page.num_input_values), + ParquetFieldInt32(2, bs->page.num_nulls), ParquetFieldInt32(3, bs->page.num_rows), ParquetFieldEnum(4, bs->page.encoding), - ParquetFieldEnum(5, bs->page.definition_level_encoding), - ParquetFieldEnum(6, bs->page.repetition_level_encoding)); + ParquetFieldEnum(5, bs->page.def_lvl_bytes), + ParquetFieldEnum(6, bs->page.rep_lvl_bytes)); return parse_header(op, bs); } }; @@ -385,15 +386,29 @@ __global__ void __launch_bounds__(128) if (parse_page_header(bs) && bs->page.compressed_page_size >= 0) { switch (bs->page_type) { case PageType::DATA_PAGE: + index_out = num_dict_pages + data_page_count; + data_page_count++; + bs->page.flags = 0; + bs->page.version = 1; // this computation is only valid for flat schemas. 
for nested schemas, // they will be recomputed in the preprocess step by examining repetition and // definition levels bs->page.num_rows = bs->page.num_input_values; + // zero out V2 info + bs->page.num_nulls = 0; + bs->page.def_lvl_bytes = 0; + bs->page.rep_lvl_bytes = 0; + values_found += bs->page.num_input_values; + break; case PageType::DATA_PAGE_V2: index_out = num_dict_pages + data_page_count; data_page_count++; bs->page.flags = 0; + bs->page.version = 2; values_found += bs->page.num_input_values; + bs->page.flags = 0; + bs->page.definition_level_encoding = Encoding::RLE; + bs->page.repetition_level_encoding = Encoding::RLE; break; case PageType::DICTIONARY_PAGE: index_out = dictionary_page_count; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index d0d367df962..f4e8cefe746 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -118,6 +118,15 @@ struct PageInfo { // decompression int32_t compressed_page_size; // compressed data size in bytes int32_t uncompressed_page_size; // uncompressed data size in bytes + // for V2 pages, the def and rep level data is not compressed, and lacks the 4-byte length + // indicator. instead the lengths for these are stored in the header. for V1 we'll parse + // out the lengths and set them here, and set the pointers after decompressing. for V2 + // we'll have to allocate extra space in the buffer for decompression and copy the data. + int32_t hdr_version; // 1 for v1, 2 for v2 + uint8_t* def_lvl_data; // uncompressed but packed definition level data pointer + uint8_t* rep_lvl_data; // uncompressed but packed repetition level data pointer + int32_t def_lvl_bytes; // length of the definition levels (V2 header) + int32_t rep_lvl_bytes; // length of the repetition levels (V2 header) // Number of values in this data page or dictionary. // Important : the # of input values does not necessarily // correspond to the number of rows in the output. 
It just reflects the number @@ -131,6 +140,7 @@ struct PageInfo { int32_t chunk_idx; // column chunk this page belongs to int32_t src_col_schema; // schema index of this column uint8_t flags; // PAGEINFO_FLAGS_XXX + int32_t num_nulls; // number of null values (V2 header) Encoding encoding; // Encoding for data or dictionary page Encoding definition_level_encoding; // Encoding used for definition levels (data page) Encoding repetition_level_encoding; // Encoding used for repetition levels (data page) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 59bef6f5600..8ff2b32c515 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1131,6 +1131,8 @@ rmm::device_buffer reader::impl::decompress_page_data( for (auto& codec : codecs) { for_each_codec_page(codec.compression_type, [&](size_t page) { + // for V2 page headers, uncompressed_page_size is uncompressed_data_size + + // def_lvl_size + rep_lvl_size auto page_uncomp_size = pages[page].uncompressed_page_size; total_decomp_size += page_uncomp_size; codec.total_decomp_size += page_uncomp_size; @@ -1162,15 +1164,38 @@ rmm::device_buffer reader::impl::decompress_page_data( for (const auto& codec : codecs) { if (codec.num_pages == 0) { continue; } - for_each_codec_page(codec.compression_type, [&](size_t page) { - auto dst_base = static_cast(decomp_pages.data()); - comp_in.emplace_back(pages[page].page_data, - static_cast(pages[page].compressed_page_size)); - comp_out.emplace_back(dst_base + decomp_offset, - static_cast(pages[page].uncompressed_page_size)); - - pages[page].page_data = static_cast(comp_out.back().data()); - decomp_offset += comp_out.back().size(); + for_each_codec_page(codec.compression_type, [&](size_t page_idx) { + auto dst_base = static_cast(decomp_pages.data()) + decomp_offset; + auto& page = pages[page_idx]; + page.rep_lvl_data = nullptr; + page.def_lvl_data = nullptr; + if (page.hdr_version == 2) { + // for V2 need copy def and 
rep level info into place, and then offset the + // input and output buffers. otherwise we'd have to keep both the compressed + // and decompressed data. uncompressed and V1 pages will be done on device. + auto offset = page.def_lvl_bytes + page.rep_lvl_bytes; + if (offset) { + thrust::copy(rmm::exec_policy(_stream), + page.page_data, + page.page_data + offset, + dst_base); + if (page.rep_lvl_bytes) { page.rep_lvl_data = dst_base; } + if (page.def_lvl_bytes) { page.def_lvl_data = dst_base + page.rep_lvl_bytes; }; + } + page.page_data = dst_base + offset; + comp_in.emplace_back(page.page_data + offset, + static_cast(page.compressed_page_size - offset)); + comp_out.emplace_back(dst_base + offset, + static_cast(page.uncompressed_page_size - offset)); + } else { + comp_in.emplace_back(page.page_data, + static_cast(page.compressed_page_size)); + comp_out.emplace_back(dst_base, + static_cast(page.uncompressed_page_size)); + page.page_data = dst_base; + } + + decomp_offset += page.uncompressed_page_size; }); host_span const> comp_in_view{comp_in.data() + start_pos, @@ -1746,6 +1771,9 @@ table_with_metadata reader::impl::read(size_type skip_rows, hostdevice_vector page_nesting_info; allocate_nesting_info(chunks, pages, page_nesting_info); + // TODO(ets): before preprocess, go through the pages and fix up the + // def and rep level pointers. + // - compute column sizes and allocate output buffers. 
// important: // for nested schemas, we have to do some further preprocessing to determine: From 8b57eb2dbcb35ad8796f0f1fb08cdf8a1bf0cc70 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 23 Sep 2022 11:09:30 -0700 Subject: [PATCH 02/20] compiles --- cpp/src/io/parquet/page_data.cu | 49 +++++++++++++++++++----------- cpp/src/io/parquet/page_hdr.cu | 9 +++--- cpp/src/io/parquet/reader_impl.cu | 42 +++++++++++++++++++++++-- cpp/src/io/parquet/reader_impl.hpp | 11 +++++++ 4 files changed, 86 insertions(+), 25 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 531733a7df7..5cbeaf70ffd 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -146,27 +146,38 @@ __device__ uint32_t InitLevelSection(page_state_s* s, s->initial_rle_value[lvl] = 0; s->lvl_start[lvl] = cur; } else if (encoding == Encoding::RLE) { - if (cur + 4 < end) { - uint32_t run; + // only need to check for V2 pages here, since V2 only uses RLE encoding + if (s->page.hdr_version == 2) { + len = 0; + cur = lvl == level_type::DEFINITION ? s->page.def_lvl_data + : s->page.rep_lvl_data; + auto lvl_end = cur + (lvl == level_type::DEFINITION ? s->page.def_lvl_bytes + : s->page.rep_lvl_bytes); + if (cur == nullptr || lvl_end > end) { s->error = 2; } + else { + end = lvl_end; + s->lvl_end = std::max(s->lvl_end, lvl_end); + } + } else if (cur + 4 < end) { len = 4 + (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24); cur += 4; - run = get_vlq32(cur, end); - s->initial_rle_run[lvl] = run; - if (!(run & 1)) { - int v = (cur < end) ? cur[0] : 0; - cur++; - if (level_bits > 8) { - v |= ((cur < end) ? cur[0] : 0) << 8; - cur++; - } - s->initial_rle_value[lvl] = v; - } - s->lvl_start[lvl] = cur; - if (cur > end) { s->error = 2; } } else { - len = 0; s->error = 2; + return 0; + } + uint32_t run = get_vlq32(cur, end); + s->initial_rle_run[lvl] = run; + if (!(run & 1)) { + int v = (cur < end) ? 
cur[0] : 0; + cur++; + if (level_bits > 8) { + v |= ((cur < end) ? cur[0] : 0) << 8; + cur++; + } + s->initial_rle_value[lvl] = v; } + s->lvl_start[lvl] = cur; + if (cur > end) { s->error = 2; } } else if (encoding == Encoding::BIT_PACKED) { len = (s->page.num_input_values * level_bits + 7) >> 3; s->initial_rle_run[lvl] = ((s->page.num_input_values + 7) >> 3) * 2 + 1; // literal run @@ -176,7 +187,7 @@ __device__ uint32_t InitLevelSection(page_state_s* s, s->error = 3; len = 0; } - return (uint32_t)len; + return static_cast(len); } /** @@ -963,6 +974,8 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, } s->first_output_value = 0; + // for V2 headers need to set lvl_end in InitLevelSection + s->lvl_end = nullptr; // Find the compressed size of repetition levels cur += InitLevelSection(s, cur, end, level_type::REPETITION); // Find the compressed size of definition levels @@ -1000,7 +1013,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, break; } if (cur > end) { s->error = 1; } - s->lvl_end = cur; + if (s->lvl_end == nullptr) { s->lvl_end = cur; } s->data_start = cur; s->data_end = end; } else { diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 69c0fba4ef2..6bf60935dfe 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -310,8 +310,8 @@ struct gpuParseDataPageHeaderV2 { ParquetFieldInt32(2, bs->page.num_nulls), ParquetFieldInt32(3, bs->page.num_rows), ParquetFieldEnum(4, bs->page.encoding), - ParquetFieldEnum(5, bs->page.def_lvl_bytes), - ParquetFieldEnum(6, bs->page.rep_lvl_bytes)); + ParquetFieldInt32(5, bs->page.def_lvl_bytes), + ParquetFieldInt32(6, bs->page.rep_lvl_bytes)); return parse_header(op, bs); } }; @@ -389,7 +389,7 @@ __global__ void __launch_bounds__(128) index_out = num_dict_pages + data_page_count; data_page_count++; bs->page.flags = 0; - bs->page.version = 1; + bs->page.hdr_version = 1; // this computation is only valid for flat schemas. 
for nested schemas, // they will be recomputed in the preprocess step by examining repetition and // definition levels @@ -404,9 +404,10 @@ __global__ void __launch_bounds__(128) index_out = num_dict_pages + data_page_count; data_page_count++; bs->page.flags = 0; - bs->page.version = 2; + bs->page.hdr_version = 2; values_found += bs->page.num_input_values; bs->page.flags = 0; + // V2 only uses RLE, so it was removed from the header bs->page.definition_level_encoding = Encoding::RLE; bs->page.repetition_level_encoding = Encoding::RLE; break; diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 8ff2b32c515..d846070519c 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1247,6 +1247,9 @@ rmm::device_buffer reader::impl::decompress_page_data( decompress_check(comp_res, _stream); + // clean up pointers for uncompressed V2 pages + fix_v2_page_data(chunks, pages, false); + // Update the page information in device memory with the updated value of // page_data; it now points to the uncompressed data buffer pages.host_to_device(_stream); @@ -1254,6 +1257,39 @@ rmm::device_buffer reader::impl::decompress_page_data( return decomp_pages; } +/** + * @copydoc cudf::io::detail::parquet::fix_v2_page_data + */ +void reader::impl::fix_v2_page_data(hostdevice_vector& chunks, + hostdevice_vector& pages, + bool sync_to_device) +{ + for (size_t c = 0, page_count = 0; c < chunks.size(); c++) { + const auto page_stride = chunks[c].max_num_pages; + if (chunks[c].codec == Compression::UNCOMPRESSED) { + for (int k = 0; k < page_stride; k++) { + auto& page = pages[page_count + k]; + auto data = page.page_data; + page.rep_lvl_data = nullptr; + page.def_lvl_data = nullptr; + if (page.hdr_version == 2) { + auto offset = page.def_lvl_bytes + page.rep_lvl_bytes; + if (offset) { + if (page.rep_lvl_bytes) { page.rep_lvl_data = data; } + if (page.def_lvl_bytes) { page.def_lvl_data = data + page.rep_lvl_bytes; }; + } + 
page.page_data = data + offset; + } + } + } + page_count += page_stride; + } + + // Update the page information in device memory with the updated value of + // page_data; it now points to the uncompressed data buffer + if (sync_to_device) { pages.host_to_device(_stream); } +} + /** * @copydoc cudf::io::detail::parquet::allocate_nesting_info */ @@ -1749,6 +1785,9 @@ table_with_metadata reader::impl::read(size_type skip_rows, for (size_t c = 0; c < chunks.size(); c++) { if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) { page_data[c].reset(); } } + } else { + // if not compressed, fix data pointers for V2 pages + fix_v2_page_data(chunks, pages, true); } // build output column info @@ -1771,9 +1810,6 @@ table_with_metadata reader::impl::read(size_type skip_rows, hostdevice_vector page_nesting_info; allocate_nesting_info(chunks, pages, page_nesting_info); - // TODO(ets): before preprocess, go through the pages and fix up the - // def and rep level pointers. - // - compute column sizes and allocate output buffers. // important: // for nested schemas, we have to do some further preprocessing to determine: diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index e1f275bb8e8..137165a7197 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -129,6 +129,17 @@ class reader::impl { rmm::device_buffer decompress_page_data(hostdevice_vector& chunks, hostdevice_vector& pages); + /** + * @brief Fix data pointers for V2 pages. + * + * @param chunks List of column chunk descriptors + * @param pages List of page information + * @param sync_to_device If true, copy pages to device + */ + void fix_v2_page_data(hostdevice_vector& chunks, + hostdevice_vector& pages, + bool sync_to_device); + /** * @brief Allocate nesting information storage for all pages and set pointers * to it. 
From af1b1d4ab5f2e26cfcb0310f899e12b13cd50f04 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 23 Sep 2022 12:52:27 -0700 Subject: [PATCH 03/20] formatting --- cpp/src/io/parquet/page_data.cu | 14 +++++++------- cpp/src/io/parquet/page_hdr.cu | 15 ++++++++------- cpp/src/io/parquet/parquet_gpu.hpp | 4 ++-- cpp/src/io/parquet/reader_impl.cu | 21 +++++++++------------ cpp/src/io/parquet/reader_impl.hpp | 4 ++-- 5 files changed, 28 insertions(+), 30 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 5cbeaf70ffd..bdebeaa7a92 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -149,13 +149,13 @@ __device__ uint32_t InitLevelSection(page_state_s* s, // only need to check for V2 pages here, since V2 only uses RLE encoding if (s->page.hdr_version == 2) { len = 0; - cur = lvl == level_type::DEFINITION ? s->page.def_lvl_data - : s->page.rep_lvl_data; - auto lvl_end = cur + (lvl == level_type::DEFINITION ? s->page.def_lvl_bytes - : s->page.rep_lvl_bytes); - if (cur == nullptr || lvl_end > end) { s->error = 2; } - else { - end = lvl_end; + cur = lvl == level_type::DEFINITION ? s->page.def_lvl_data : s->page.rep_lvl_data; + auto lvl_end = + cur + (lvl == level_type::DEFINITION ? 
s->page.def_lvl_bytes : s->page.rep_lvl_bytes); + if (cur == nullptr || lvl_end > end) { + s->error = 2; + } else { + end = lvl_end; s->lvl_end = std::max(s->lvl_end, lvl_end); } } else if (cur + 4 < end) { diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 6bf60935dfe..31f9f963047 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -383,30 +383,31 @@ __global__ void __launch_bounds__(128) // definition levels bs->page.chunk_row += bs->page.num_rows; bs->page.num_rows = 0; + // zero out V2 info + bs->page.num_nulls = 0; + bs->page.def_lvl_bytes = 0; + bs->page.rep_lvl_bytes = 0; + bs->page.hdr_version = 0; if (parse_page_header(bs) && bs->page.compressed_page_size >= 0) { switch (bs->page_type) { case PageType::DATA_PAGE: index_out = num_dict_pages + data_page_count; data_page_count++; - bs->page.flags = 0; + bs->page.flags = 0; bs->page.hdr_version = 1; // this computation is only valid for flat schemas. for nested schemas, // they will be recomputed in the preprocess step by examining repetition and // definition levels bs->page.num_rows = bs->page.num_input_values; - // zero out V2 info - bs->page.num_nulls = 0; - bs->page.def_lvl_bytes = 0; - bs->page.rep_lvl_bytes = 0; values_found += bs->page.num_input_values; break; case PageType::DATA_PAGE_V2: index_out = num_dict_pages + data_page_count; data_page_count++; - bs->page.flags = 0; + bs->page.flags = 0; bs->page.hdr_version = 2; values_found += bs->page.num_input_values; - bs->page.flags = 0; + bs->page.flags = 0; // V2 only uses RLE, so it was removed from the header bs->page.definition_level_encoding = Encoding::RLE; bs->page.repetition_level_encoding = Encoding::RLE; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index f4e8cefe746..a5621d61b8e 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -122,11 +122,11 @@ struct PageInfo { // indicator. 
instead the lengths for these are stored in the header. for V1 we'll parse // out the lengths and set them here, and set the pointers after decompressing. for V2 // we'll have to allocate extra space in the buffer for decompression and copy the data. - int32_t hdr_version; // 1 for v1, 2 for v2 uint8_t* def_lvl_data; // uncompressed but packed definition level data pointer uint8_t* rep_lvl_data; // uncompressed but packed repetition level data pointer int32_t def_lvl_bytes; // length of the definition levels (V2 header) int32_t rep_lvl_bytes; // length of the repetition levels (V2 header) + int32_t hdr_version; // 1 for v1, 2 for v2 // Number of values in this data page or dictionary. // Important : the # of input values does not necessarily // correspond to the number of rows in the output. It just reflects the number @@ -137,10 +137,10 @@ struct PageInfo { int32_t num_input_values; int32_t chunk_row; // starting row of this page relative to the start of the chunk int32_t num_rows; // number of rows in this page + int32_t num_nulls; // number of null values (V2 header) int32_t chunk_idx; // column chunk this page belongs to int32_t src_col_schema; // schema index of this column uint8_t flags; // PAGEINFO_FLAGS_XXX - int32_t num_nulls; // number of null values (V2 header) Encoding encoding; // Encoding for data or dictionary page Encoding definition_level_encoding; // Encoding used for definition levels (data page) Encoding repetition_level_encoding; // Encoding used for repetition levels (data page) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index d846070519c..d3bd6766828 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1165,20 +1165,19 @@ rmm::device_buffer reader::impl::decompress_page_data( if (codec.num_pages == 0) { continue; } for_each_codec_page(codec.compression_type, [&](size_t page_idx) { - auto dst_base = static_cast(decomp_pages.data()) + decomp_offset; - auto& page = 
pages[page_idx]; + auto dst_base = static_cast(decomp_pages.data()) + decomp_offset; + auto& page = pages[page_idx]; page.rep_lvl_data = nullptr; page.def_lvl_data = nullptr; - if (page.hdr_version == 2) { + // only need to modify data pages + if (page.hdr_version == 2 && page.flags == 0) { // for V2 need copy def and rep level info into place, and then offset the // input and output buffers. otherwise we'd have to keep both the compressed // and decompressed data. uncompressed and V1 pages will be done on device. auto offset = page.def_lvl_bytes + page.rep_lvl_bytes; if (offset) { - thrust::copy(rmm::exec_policy(_stream), - page.page_data, - page.page_data + offset, - dst_base); + thrust::copy( + rmm::exec_policy(_stream), page.page_data, page.page_data + offset, dst_base); if (page.rep_lvl_bytes) { page.rep_lvl_data = dst_base; } if (page.def_lvl_bytes) { page.def_lvl_data = dst_base + page.rep_lvl_bytes; }; } @@ -1188,10 +1187,8 @@ rmm::device_buffer reader::impl::decompress_page_data( comp_out.emplace_back(dst_base + offset, static_cast(page.uncompressed_page_size - offset)); } else { - comp_in.emplace_back(page.page_data, - static_cast(page.compressed_page_size)); - comp_out.emplace_back(dst_base, - static_cast(page.uncompressed_page_size)); + comp_in.emplace_back(page.page_data, static_cast(page.compressed_page_size)); + comp_out.emplace_back(dst_base, static_cast(page.uncompressed_page_size)); page.page_data = dst_base; } @@ -1272,7 +1269,7 @@ void reader::impl::fix_v2_page_data(hostdevice_vector& chu auto data = page.page_data; page.rep_lvl_data = nullptr; page.def_lvl_data = nullptr; - if (page.hdr_version == 2) { + if (page.hdr_version == 2 && page.flags == 0) { auto offset = page.def_lvl_bytes + page.rep_lvl_bytes; if (offset) { if (page.rep_lvl_bytes) { page.rep_lvl_data = data; } diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index 137165a7197..91cfeae6535 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ 
b/cpp/src/io/parquet/reader_impl.hpp @@ -137,8 +137,8 @@ class reader::impl { * @param sync_to_device If true, copy pages to device */ void fix_v2_page_data(hostdevice_vector& chunks, - hostdevice_vector& pages, - bool sync_to_device); + hostdevice_vector& pages, + bool sync_to_device); /** * @brief Allocate nesting information storage for all pages and set pointers From 28953150a71f98ffe5026d6b311cdca37569084d Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 23 Sep 2022 13:55:12 -0700 Subject: [PATCH 04/20] some cleanup --- cpp/src/io/parquet/page_data.cu | 23 ++++++++++++----------- cpp/src/io/parquet/page_hdr.cu | 1 - 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index bdebeaa7a92..36ccc6d0c34 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -163,21 +163,22 @@ __device__ uint32_t InitLevelSection(page_state_s* s, cur += 4; } else { s->error = 2; - return 0; } - uint32_t run = get_vlq32(cur, end); - s->initial_rle_run[lvl] = run; - if (!(run & 1)) { - int v = (cur < end) ? cur[0] : 0; - cur++; - if (level_bits > 8) { - v |= ((cur < end) ? cur[0] : 0) << 8; + if (s->error != 2) { + uint32_t run = get_vlq32(cur, end); + s->initial_rle_run[lvl] = run; + if (!(run & 1)) { + int v = (cur < end) ? cur[0] : 0; cur++; + if (level_bits > 8) { + v |= ((cur < end) ? 
cur[0] : 0) << 8; + cur++; + } + s->initial_rle_value[lvl] = v; } - s->initial_rle_value[lvl] = v; + s->lvl_start[lvl] = cur; + if (cur > end) { s->error = 2; } } - s->lvl_start[lvl] = cur; - if (cur > end) { s->error = 2; } } else if (encoding == Encoding::BIT_PACKED) { len = (s->page.num_input_values * level_bits + 7) >> 3; s->initial_rle_run[lvl] = ((s->page.num_input_values + 7) >> 3) * 2 + 1; // literal run diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 31f9f963047..2afeb04aed5 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -407,7 +407,6 @@ __global__ void __launch_bounds__(128) bs->page.flags = 0; bs->page.hdr_version = 2; values_found += bs->page.num_input_values; - bs->page.flags = 0; // V2 only uses RLE, so it was removed from the header bs->page.definition_level_encoding = Encoding::RLE; bs->page.repetition_level_encoding = Encoding::RLE; From ecad6a49d7933513d468590eee94f95d4ace5db8 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 23 Sep 2022 14:24:28 -0700 Subject: [PATCH 05/20] more cleanup --- cpp/src/io/parquet/reader_impl.cu | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index d3bd6766828..8aff29f01c4 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1173,7 +1173,7 @@ rmm::device_buffer reader::impl::decompress_page_data( if (page.hdr_version == 2 && page.flags == 0) { // for V2 need copy def and rep level info into place, and then offset the // input and output buffers. otherwise we'd have to keep both the compressed - // and decompressed data. uncompressed and V1 pages will be done on device. + // and decompressed data. 
auto offset = page.def_lvl_bytes + page.rep_lvl_bytes; if (offset) { thrust::copy( @@ -1266,16 +1266,16 @@ void reader::impl::fix_v2_page_data(hostdevice_vector& chu if (chunks[c].codec == Compression::UNCOMPRESSED) { for (int k = 0; k < page_stride; k++) { auto& page = pages[page_count + k]; - auto data = page.page_data; page.rep_lvl_data = nullptr; page.def_lvl_data = nullptr; if (page.hdr_version == 2 && page.flags == 0) { auto offset = page.def_lvl_bytes + page.rep_lvl_bytes; if (offset) { + auto data = page.page_data; if (page.rep_lvl_bytes) { page.rep_lvl_data = data; } if (page.def_lvl_bytes) { page.def_lvl_data = data + page.rep_lvl_bytes; }; + page.page_data = data + offset; } - page.page_data = data + offset; } } } From 667349edc9de544142531b5036ee9dc61a3a97f3 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 23 Sep 2022 16:55:00 -0700 Subject: [PATCH 06/20] fix stupid bug in decompression --- cpp/src/io/parquet/page_data.cu | 3 +++ cpp/src/io/parquet/reader_impl.cu | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 36ccc6d0c34..b43ccbabdc9 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -875,6 +875,9 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, if (s->page.num_input_values > 0) { uint8_t* cur = s->page.page_data; uint8_t* end = cur + s->page.uncompressed_page_size; + if (s->page.hdr_version == 2) { + end -= s->page.def_lvl_bytes + s->page.rep_lvl_bytes; + } uint32_t dtype_len_out = s->col.data_type >> 3; s->ts_scale = 0; diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 8aff29f01c4..3bd6bc306b6 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1181,11 +1181,11 @@ rmm::device_buffer reader::impl::decompress_page_data( if (page.rep_lvl_bytes) { page.rep_lvl_data = dst_base; } if (page.def_lvl_bytes) { page.def_lvl_data = dst_base + 
page.rep_lvl_bytes; }; } - page.page_data = dst_base + offset; comp_in.emplace_back(page.page_data + offset, static_cast(page.compressed_page_size - offset)); comp_out.emplace_back(dst_base + offset, static_cast(page.uncompressed_page_size - offset)); + page.page_data = dst_base + offset; } else { comp_in.emplace_back(page.page_data, static_cast(page.compressed_page_size)); comp_out.emplace_back(dst_base, static_cast(page.uncompressed_page_size)); From 10b2730145deb82e8083ea51bc70ed63cda835bb Mon Sep 17 00:00:00 2001 From: seidl Date: Mon, 26 Sep 2022 15:26:45 -0700 Subject: [PATCH 07/20] clean up some comments --- cpp/src/io/parquet/page_data.cu | 14 ++++++++------ cpp/src/io/parquet/parquet_gpu.hpp | 4 +--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index b43ccbabdc9..186370858ae 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -146,7 +146,7 @@ __device__ uint32_t InitLevelSection(page_state_s* s, s->initial_rle_value[lvl] = 0; s->lvl_start[lvl] = cur; } else if (encoding == Encoding::RLE) { - // only need to check for V2 pages here, since V2 only uses RLE encoding + // V2 only uses RLE encoding, so only perform check here if (s->page.hdr_version == 2) { len = 0; cur = lvl == level_type::DEFINITION ? 
s->page.def_lvl_data : s->page.rep_lvl_data; @@ -162,9 +162,10 @@ __device__ uint32_t InitLevelSection(page_state_s* s, len = 4 + (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24); cur += 4; } else { + len = 0; s->error = 2; } - if (s->error != 2) { + if (!s->error) { uint32_t run = get_vlq32(cur, end); s->initial_rle_run[lvl] = run; if (!(run & 1)) { @@ -874,10 +875,10 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, // if (s->page.num_input_values > 0 && s->page.num_rows > 0) { if (s->page.num_input_values > 0) { uint8_t* cur = s->page.page_data; - uint8_t* end = cur + s->page.uncompressed_page_size; - if (s->page.hdr_version == 2) { - end -= s->page.def_lvl_bytes + s->page.rep_lvl_bytes; - } + // uncompressed_page_size includes v2 level lengths, so subtract them. they + // will be 0 for v1 pages. + uint8_t* end = + cur + (s->page.uncompressed_page_size - s->page.def_lvl_bytes - s->page.rep_lvl_bytes); uint32_t dtype_len_out = s->col.data_type >> 3; s->ts_scale = 0; @@ -1017,6 +1018,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, break; } if (cur > end) { s->error = 1; } + // if V1 header, set lvl_end here if (s->lvl_end == nullptr) { s->lvl_end = cur; } s->data_start = cur; s->data_end = end; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 4863e267a67..16c6a971c2d 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -122,9 +122,7 @@ struct PageInfo { int32_t compressed_page_size; // compressed data size in bytes int32_t uncompressed_page_size; // uncompressed data size in bytes // for V2 pages, the def and rep level data is not compressed, and lacks the 4-byte length - // indicator. instead the lengths for these are stored in the header. for V1 we'll parse - // out the lengths and set them here, and set the pointers after decompressing. for V2 - // we'll have to allocate extra space in the buffer for decompression and copy the data. 
+ // indicator. instead the lengths for these are stored in the header. uint8_t* def_lvl_data; // uncompressed but packed definition level data pointer uint8_t* rep_lvl_data; // uncompressed but packed repetition level data pointer int32_t def_lvl_bytes; // length of the definition levels (V2 header) From 8ad19d1f9dcd1dd5a16f60ba438234b4172b4e04 Mon Sep 17 00:00:00 2001 From: seidl Date: Mon, 26 Sep 2022 16:06:14 -0700 Subject: [PATCH 08/20] simplify by getting rid of separate pointers for def/rep data --- cpp/src/io/parquet/page_data.cu | 21 ++------ cpp/src/io/parquet/parquet_gpu.hpp | 2 - cpp/src/io/parquet/reader_impl.cu | 81 ++++++------------------------ cpp/src/io/parquet/reader_impl.hpp | 11 ---- 4 files changed, 17 insertions(+), 98 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 186370858ae..d9ceef28dc5 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -148,16 +148,7 @@ __device__ uint32_t InitLevelSection(page_state_s* s, } else if (encoding == Encoding::RLE) { // V2 only uses RLE encoding, so only perform check here if (s->page.hdr_version == 2) { - len = 0; - cur = lvl == level_type::DEFINITION ? s->page.def_lvl_data : s->page.rep_lvl_data; - auto lvl_end = - cur + (lvl == level_type::DEFINITION ? s->page.def_lvl_bytes : s->page.rep_lvl_bytes); - if (cur == nullptr || lvl_end > end) { - s->error = 2; - } else { - end = lvl_end; - s->lvl_end = std::max(s->lvl_end, lvl_end); - } + len = lvl == level_type::DEFINITION ? s->page.def_lvl_bytes : s->page.rep_lvl_bytes; } else if (cur + 4 < end) { len = 4 + (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24); cur += 4; @@ -875,10 +866,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, // if (s->page.num_input_values > 0 && s->page.num_rows > 0) { if (s->page.num_input_values > 0) { uint8_t* cur = s->page.page_data; - // uncompressed_page_size includes v2 level lengths, so subtract them. 
they - // will be 0 for v1 pages. - uint8_t* end = - cur + (s->page.uncompressed_page_size - s->page.def_lvl_bytes - s->page.rep_lvl_bytes); + uint8_t* end = cur + s->page.uncompressed_page_size; uint32_t dtype_len_out = s->col.data_type >> 3; s->ts_scale = 0; @@ -979,8 +967,6 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, } s->first_output_value = 0; - // for V2 headers need to set lvl_end in InitLevelSection - s->lvl_end = nullptr; // Find the compressed size of repetition levels cur += InitLevelSection(s, cur, end, level_type::REPETITION); // Find the compressed size of definition levels @@ -1018,8 +1004,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, break; } if (cur > end) { s->error = 1; } - // if V1 header, set lvl_end here - if (s->lvl_end == nullptr) { s->lvl_end = cur; } + s->lvl_end = cur; s->data_start = cur; s->data_end = end; } else { diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 16c6a971c2d..9bfc81a2868 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -123,8 +123,6 @@ struct PageInfo { int32_t uncompressed_page_size; // uncompressed data size in bytes // for V2 pages, the def and rep level data is not compressed, and lacks the 4-byte length // indicator. instead the lengths for these are stored in the header. 
- uint8_t* def_lvl_data; // uncompressed but packed definition level data pointer - uint8_t* rep_lvl_data; // uncompressed but packed repetition level data pointer int32_t def_lvl_bytes; // length of the definition levels (V2 header) int32_t rep_lvl_bytes; // length of the repetition levels (V2 header) int32_t hdr_version; // 1 for v1, 2 for v2 diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 3bd6bc306b6..4f6caad09e5 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1131,8 +1131,6 @@ rmm::device_buffer reader::impl::decompress_page_data( for (auto& codec : codecs) { for_each_codec_page(codec.compression_type, [&](size_t page) { - // for V2 page headers, uncompressed_page_size is uncompressed_data_size + - // def_lvl_size + rep_lvl_size auto page_uncomp_size = pages[page].uncompressed_page_size; total_decomp_size += page_uncomp_size; codec.total_decomp_size += page_uncomp_size; @@ -1165,33 +1163,21 @@ rmm::device_buffer reader::impl::decompress_page_data( if (codec.num_pages == 0) { continue; } for_each_codec_page(codec.compression_type, [&](size_t page_idx) { - auto dst_base = static_cast(decomp_pages.data()) + decomp_offset; - auto& page = pages[page_idx]; - page.rep_lvl_data = nullptr; - page.def_lvl_data = nullptr; - // only need to modify data pages - if (page.hdr_version == 2 && page.flags == 0) { - // for V2 need copy def and rep level info into place, and then offset the - // input and output buffers. otherwise we'd have to keep both the compressed - // and decompressed data. 
- auto offset = page.def_lvl_bytes + page.rep_lvl_bytes; - if (offset) { - thrust::copy( - rmm::exec_policy(_stream), page.page_data, page.page_data + offset, dst_base); - if (page.rep_lvl_bytes) { page.rep_lvl_data = dst_base; } - if (page.def_lvl_bytes) { page.def_lvl_data = dst_base + page.rep_lvl_bytes; }; - } - comp_in.emplace_back(page.page_data + offset, - static_cast(page.compressed_page_size - offset)); - comp_out.emplace_back(dst_base + offset, - static_cast(page.uncompressed_page_size - offset)); - page.page_data = dst_base + offset; - } else { - comp_in.emplace_back(page.page_data, static_cast(page.compressed_page_size)); - comp_out.emplace_back(dst_base, static_cast(page.uncompressed_page_size)); - page.page_data = dst_base; + auto dst_base = static_cast(decomp_pages.data()) + decomp_offset; + auto& page = pages[page_idx]; + // offset will only be non-zero for V2 pages + auto offset = page.def_lvl_bytes + page.rep_lvl_bytes; + // for V2 need to copy def and rep level info into place, and then offset the + // input and output buffers. otherwise we'd have to keep both the compressed + // and decompressed data. 
+ if (offset) { + thrust::copy(rmm::exec_policy(_stream), page.page_data, page.page_data + offset, dst_base); } - + comp_in.emplace_back(page.page_data + offset, + static_cast(page.compressed_page_size - offset)); + comp_out.emplace_back(dst_base + offset, + static_cast(page.uncompressed_page_size - offset)); + page.page_data = dst_base; decomp_offset += page.uncompressed_page_size; }); @@ -1244,9 +1230,6 @@ rmm::device_buffer reader::impl::decompress_page_data( decompress_check(comp_res, _stream); - // clean up pointers for uncompressed V2 pages - fix_v2_page_data(chunks, pages, false); - // Update the page information in device memory with the updated value of // page_data; it now points to the uncompressed data buffer pages.host_to_device(_stream); @@ -1254,39 +1237,6 @@ rmm::device_buffer reader::impl::decompress_page_data( return decomp_pages; } -/** - * @copydoc cudf::io::detail::parquet::fix_v2_page_data - */ -void reader::impl::fix_v2_page_data(hostdevice_vector& chunks, - hostdevice_vector& pages, - bool sync_to_device) -{ - for (size_t c = 0, page_count = 0; c < chunks.size(); c++) { - const auto page_stride = chunks[c].max_num_pages; - if (chunks[c].codec == Compression::UNCOMPRESSED) { - for (int k = 0; k < page_stride; k++) { - auto& page = pages[page_count + k]; - page.rep_lvl_data = nullptr; - page.def_lvl_data = nullptr; - if (page.hdr_version == 2 && page.flags == 0) { - auto offset = page.def_lvl_bytes + page.rep_lvl_bytes; - if (offset) { - auto data = page.page_data; - if (page.rep_lvl_bytes) { page.rep_lvl_data = data; } - if (page.def_lvl_bytes) { page.def_lvl_data = data + page.rep_lvl_bytes; }; - page.page_data = data + offset; - } - } - } - } - page_count += page_stride; - } - - // Update the page information in device memory with the updated value of - // page_data; it now points to the uncompressed data buffer - if (sync_to_device) { pages.host_to_device(_stream); } -} - /** * @copydoc cudf::io::detail::parquet::allocate_nesting_info */ 
@@ -1782,9 +1732,6 @@ table_with_metadata reader::impl::read(size_type skip_rows, for (size_t c = 0; c < chunks.size(); c++) { if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) { page_data[c].reset(); } } - } else { - // if not compressed, fix data pointers for V2 pages - fix_v2_page_data(chunks, pages, true); } // build output column info diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index 91cfeae6535..e1f275bb8e8 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -129,17 +129,6 @@ class reader::impl { rmm::device_buffer decompress_page_data(hostdevice_vector& chunks, hostdevice_vector& pages); - /** - * @brief Fix data poitners for V2 pages. - * - * @param chunks List of column chunk descriptors - * @param pages List of page information - * @param sync_to_device If true, copy pages to device - */ - void fix_v2_page_data(hostdevice_vector& chunks, - hostdevice_vector& pages, - bool sync_to_device); - /** * @brief Allocate nesting information storage for all pages and set pointers * to it. From bcddaf9fede3d9942e6c29952f258bf1249b5ef0 Mon Sep 17 00:00:00 2001 From: seidl Date: Mon, 26 Sep 2022 16:13:14 -0700 Subject: [PATCH 09/20] get rid of hdr_version too --- cpp/src/io/parquet/page_data.cu | 2 +- cpp/src/io/parquet/page_hdr.cu | 3 --- cpp/src/io/parquet/parquet_gpu.hpp | 1 - 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index d9ceef28dc5..71304ff741b 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -147,7 +147,7 @@ __device__ uint32_t InitLevelSection(page_state_s* s, s->lvl_start[lvl] = cur; } else if (encoding == Encoding::RLE) { // V2 only uses RLE encoding, so only perform check here - if (s->page.hdr_version == 2) { + if (s->page.def_lvl_bytes || s->page.rep_lvl_bytes) { len = lvl == level_type::DEFINITION ? 
s->page.def_lvl_bytes : s->page.rep_lvl_bytes; } else if (cur + 4 < end) { len = 4 + (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24); diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 2afeb04aed5..cbe8e167de6 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -387,14 +387,12 @@ __global__ void __launch_bounds__(128) bs->page.num_nulls = 0; bs->page.def_lvl_bytes = 0; bs->page.rep_lvl_bytes = 0; - bs->page.hdr_version = 0; if (parse_page_header(bs) && bs->page.compressed_page_size >= 0) { switch (bs->page_type) { case PageType::DATA_PAGE: index_out = num_dict_pages + data_page_count; data_page_count++; bs->page.flags = 0; - bs->page.hdr_version = 1; // this computation is only valid for flat schemas. for nested schemas, // they will be recomputed in the preprocess step by examining repetition and // definition levels @@ -405,7 +403,6 @@ __global__ void __launch_bounds__(128) index_out = num_dict_pages + data_page_count; data_page_count++; bs->page.flags = 0; - bs->page.hdr_version = 2; values_found += bs->page.num_input_values; // V2 only uses RLE, so it was removed from the header bs->page.definition_level_encoding = Encoding::RLE; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 9bfc81a2868..c6c1ceb0ae6 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -125,7 +125,6 @@ struct PageInfo { // indicator. instead the lengths for these are stored in the header. int32_t def_lvl_bytes; // length of the definition levels (V2 header) int32_t rep_lvl_bytes; // length of the repetition levels (V2 header) - int32_t hdr_version; // 1 for v1, 2 for v2 // Number of values in this data page or dictionary. // Important : the # of input values does not necessarily // correspond to the number of rows in the output. 
It just reflects the number From d991ed2c945a4c599da13ea81687028d7e54d361 Mon Sep 17 00:00:00 2001 From: seidl Date: Mon, 26 Sep 2022 16:18:22 -0700 Subject: [PATCH 10/20] formatting --- cpp/src/io/parquet/page_hdr.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index cbe8e167de6..19922bf7022 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -392,7 +392,7 @@ __global__ void __launch_bounds__(128) case PageType::DATA_PAGE: index_out = num_dict_pages + data_page_count; data_page_count++; - bs->page.flags = 0; + bs->page.flags = 0; // this computation is only valid for flat schemas. for nested schemas, // they will be recomputed in the preprocess step by examining repetition and // definition levels @@ -402,7 +402,7 @@ __global__ void __launch_bounds__(128) case PageType::DATA_PAGE_V2: index_out = num_dict_pages + data_page_count; data_page_count++; - bs->page.flags = 0; + bs->page.flags = 0; values_found += bs->page.num_input_values; // V2 only uses RLE, so it was removed from the header bs->page.definition_level_encoding = Encoding::RLE; From 86343f309e6d6b392d93ad963fd0f56c06ad5791 Mon Sep 17 00:00:00 2001 From: seidl Date: Tue, 27 Sep 2022 10:24:14 -0700 Subject: [PATCH 11/20] add consts and use cudaMemcpyAsync --- cpp/src/io/parquet/reader_impl.cu | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 4f6caad09e5..d8ee130db30 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1163,15 +1163,16 @@ rmm::device_buffer reader::impl::decompress_page_data( if (codec.num_pages == 0) { continue; } for_each_codec_page(codec.compression_type, [&](size_t page_idx) { - auto dst_base = static_cast(decomp_pages.data()) + decomp_offset; - auto& page = pages[page_idx]; + auto const dst_base = static_cast(decomp_pages.data()) 
+ decomp_offset; + auto& page = pages[page_idx]; // offset will only be non-zero for V2 pages - auto offset = page.def_lvl_bytes + page.rep_lvl_bytes; + auto const offset = page.def_lvl_bytes + page.rep_lvl_bytes; // for V2 need to copy def and rep level info into place, and then offset the // input and output buffers. otherwise we'd have to keep both the compressed // and decompressed data. if (offset) { - thrust::copy(rmm::exec_policy(_stream), page.page_data, page.page_data + offset, dst_base); + CUDF_CUDA_TRY( + cudaMemcpyAsync(dst_base, page.page_data, offset, cudaMemcpyDeviceToDevice, _stream)); } comp_in.emplace_back(page.page_data + offset, static_cast(page.compressed_page_size - offset)); From 6e7eb8d85e31ab51ff1492b18f2ef9eb95360c3b Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 28 Sep 2022 07:41:38 -0700 Subject: [PATCH 12/20] experiment with using gpu_copy_uncompressed_blocks to copy rep and def level info --- cpp/src/io/parquet/reader_impl.cu | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index d8ee130db30..09b9757d754 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1151,6 +1151,15 @@ rmm::device_buffer reader::impl::decompress_page_data( std::vector> comp_out; comp_out.reserve(num_comp_pages); +#define NEWCOPY 1 +#if NEWCOPY + // vectors for v2 headers, if any + std::vector> copy_in; + copy_in.reserve(num_comp_pages); + std::vector> copy_out; + copy_out.reserve(num_comp_pages); +#endif + rmm::device_uvector comp_res(num_comp_pages, _stream); thrust::fill(rmm::exec_policy(_stream), comp_res.begin(), @@ -1171,8 +1180,14 @@ rmm::device_buffer reader::impl::decompress_page_data( // input and output buffers. otherwise we'd have to keep both the compressed // and decompressed data. 
if (offset) { - CUDF_CUDA_TRY( - cudaMemcpyAsync(dst_base, page.page_data, offset, cudaMemcpyDeviceToDevice, _stream)); +#if NEWCOPY + copy_in.emplace_back(page.page_data, offset); + copy_out.emplace_back(dst_base, offset); +#else + //CUDF_CUDA_TRY( + // cudaMemcpyAsync(dst_base, page.page_data, offset, cudaMemcpyDeviceToDevice, _stream)); + thrust::copy(rmm::exec_policy(_stream), page.page_data, page.page_data + offset, dst_base); +#endif } comp_in.emplace_back(page.page_data + offset, static_cast(page.compressed_page_size - offset)); @@ -1231,6 +1246,18 @@ rmm::device_buffer reader::impl::decompress_page_data( decompress_check(comp_res, _stream); +#if NEWCOPY + if (copy_in.size()) { + host_span const> copy_in_view{copy_in.data(), copy_in.size()}; + auto const d_copy_in = cudf::detail::make_device_uvector_async(copy_in_view, _stream); + + host_span const> copy_out_view(copy_out.data(), copy_out.size()); + auto const d_copy_out = cudf::detail::make_device_uvector_async(copy_out_view, _stream); + + gpu_copy_uncompressed_blocks(d_copy_in, d_copy_out, _stream); + } +#endif + // Update the page information in device memory with the updated value of // page_data; it now points to the uncompressed data buffer pages.host_to_device(_stream); From 26b919965c1687ead30690cecfc74fc39cbd8d3b Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 28 Sep 2022 13:39:41 -0700 Subject: [PATCH 13/20] test v2 header reader in python. 
use pandas to write v2 headers --- python/cudf/cudf/tests/test_parquet.py | 77 +++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 022f7cdd6f7..8935a817980 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1296,6 +1296,82 @@ def string_list_gen_wrapped(x, y): assert expect.equals(got.to_arrow()) +def test_parquet_reader_v2(tmpdir, simple_pdf): + pdf_fname = tmpdir.join("pdfv2.parquet") + simple_pdf.to_parquet(pdf_fname, data_page_version='2.0') + assert_eq(cudf.read_parquet(pdf_fname), simple_pdf) + + +@pytest.mark.parametrize( + "data", + [ + # Structs + { + "being": [ + None, + {"human?": True, "Deets": {"Name": "Carrot", "Age": 27}}, + {"human?": None, "Deets": {"Name": "Angua", "Age": 25}}, + {"human?": False, "Deets": {"Name": "Cheery", "Age": 31}}, + {"human?": False, "Deets": None}, + {"human?": None, "Deets": {"Name": "Mr", "Age": None}}, + ] + }, + # List of Structs + { + "family": [ + [None, {"human?": True, "deets": {"weight": 2.4, "age": 27}}], + [ + {"human?": None, "deets": {"weight": 5.3, "age": 25}}, + {"human?": False, "deets": {"weight": 8.0, "age": 31}}, + {"human?": False, "deets": None}, + ], + [], + [{"human?": None, "deets": {"weight": 6.9, "age": None}}], + ] + }, + # Struct of Lists + pytest.param( + { + "Real estate records": [ + None, + { + "Status": "NRI", + "Ownerships": { + "land_unit": [None, 2, None], + "flats": [[1, 2, 3], [], [4, 5], [], [0, 6, 0]], + }, + }, + { + "Status": None, + "Ownerships": { + "land_unit": [4, 5], + "flats": [[7, 8], []], + }, + }, + { + "Status": "RI", + "Ownerships": {"land_unit": None, "flats": [[]]}, + }, + {"Status": "RI", "Ownerships": None}, + { + "Status": None, + "Ownerships": { + "land_unit": [7, 8, 9], + "flats": [[], [], []], + }, + }, + ] + }, + ), + ], +) +def test_parquet_reader_nested_v2(tmpdir, data): + expect = 
pd.DataFrame(data) + pdf_fname = tmpdir.join("pdfv2.parquet") + expect.to_parquet(pdf_fname, data_page_version='2.0') + assert_eq(cudf.read_parquet(pdf_fname), expect) + + @pytest.mark.filterwarnings("ignore:Using CPU") def test_parquet_writer_cpu_pyarrow( tmpdir, pdf_day_timestamps, gdf_day_timestamps @@ -1513,7 +1589,6 @@ def test_parquet_writer_gpu_chunked_context(tmpdir, simple_pdf, simple_gdf): assert_eq(pd.read_parquet(gdf_fname), pd.concat([simple_pdf, simple_pdf])) - def test_parquet_write_bytes_io(simple_gdf): output = BytesIO() simple_gdf.to_parquet(output) From 6136a47e61177fb4ae6e08766c4cf2b943033930 Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 28 Sep 2022 14:13:33 -0700 Subject: [PATCH 14/20] formatting --- python/cudf/cudf/tests/test_parquet.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 8935a817980..ad889bb913d 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1298,7 +1298,7 @@ def string_list_gen_wrapped(x, y): def test_parquet_reader_v2(tmpdir, simple_pdf): pdf_fname = tmpdir.join("pdfv2.parquet") - simple_pdf.to_parquet(pdf_fname, data_page_version='2.0') + simple_pdf.to_parquet(pdf_fname, data_page_version="2.0") assert_eq(cudf.read_parquet(pdf_fname), simple_pdf) @@ -1368,7 +1368,7 @@ def test_parquet_reader_v2(tmpdir, simple_pdf): def test_parquet_reader_nested_v2(tmpdir, data): expect = pd.DataFrame(data) pdf_fname = tmpdir.join("pdfv2.parquet") - expect.to_parquet(pdf_fname, data_page_version='2.0') + expect.to_parquet(pdf_fname, data_page_version="2.0") assert_eq(cudf.read_parquet(pdf_fname), expect) @@ -1589,6 +1589,7 @@ def test_parquet_writer_gpu_chunked_context(tmpdir, simple_pdf, simple_gdf): assert_eq(pd.read_parquet(gdf_fname), pd.concat([simple_pdf, simple_pdf])) + def test_parquet_write_bytes_io(simple_gdf): output = BytesIO() simple_gdf.to_parquet(output) From 
077006a4cdcb90c870ab4798e745d03eb94f3894 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 30 Sep 2022 08:20:14 -0700 Subject: [PATCH 15/20] remove old copy code and only use gpu_copy_uncompressed_blocks --- cpp/src/io/parquet/reader_impl.cu | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 09b9757d754..3b9a129fde5 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1151,14 +1151,11 @@ rmm::device_buffer reader::impl::decompress_page_data( std::vector> comp_out; comp_out.reserve(num_comp_pages); -#define NEWCOPY 1 -#if NEWCOPY - // vectors for v2 headers, if any + // vectors to save v2 def and rep level data, if any std::vector> copy_in; copy_in.reserve(num_comp_pages); std::vector> copy_out; copy_out.reserve(num_comp_pages); -#endif rmm::device_uvector comp_res(num_comp_pages, _stream); thrust::fill(rmm::exec_policy(_stream), @@ -1180,14 +1177,8 @@ rmm::device_buffer reader::impl::decompress_page_data( // input and output buffers. otherwise we'd have to keep both the compressed // and decompressed data. 
if (offset) { -#if NEWCOPY copy_in.emplace_back(page.page_data, offset); copy_out.emplace_back(dst_base, offset); -#else - //CUDF_CUDA_TRY( - // cudaMemcpyAsync(dst_base, page.page_data, offset, cudaMemcpyDeviceToDevice, _stream)); - thrust::copy(rmm::exec_policy(_stream), page.page_data, page.page_data + offset, dst_base); -#endif } comp_in.emplace_back(page.page_data + offset, static_cast(page.compressed_page_size - offset)); @@ -1246,7 +1237,7 @@ rmm::device_buffer reader::impl::decompress_page_data( decompress_check(comp_res, _stream); -#if NEWCOPY + // now copy the uncompressed V2 def and rep level data if (copy_in.size()) { host_span const> copy_in_view{copy_in.data(), copy_in.size()}; auto const d_copy_in = cudf::detail::make_device_uvector_async(copy_in_view, _stream); @@ -1256,7 +1247,6 @@ rmm::device_buffer reader::impl::decompress_page_data( gpu_copy_uncompressed_blocks(d_copy_in, d_copy_out, _stream); } -#endif // Update the page information in device memory with the updated value of // page_data; it now points to the uncompressed data buffer From a4aa28bc733fab476d1758cad051e4a92928c951 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 7 Oct 2022 08:48:34 -0700 Subject: [PATCH 16/20] implement suggestion from review Co-authored-by: Vukasin Milovanovic --- cpp/src/io/parquet/reader_impl.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 3b9a129fde5..68dd52e8c34 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1176,7 +1176,7 @@ rmm::device_buffer reader::impl::decompress_page_data( // for V2 need to copy def and rep level info into place, and then offset the // input and output buffers. otherwise we'd have to keep both the compressed // and decompressed data. 
- if (offset) { + if (offset != 0) { copy_in.emplace_back(page.page_data, offset); copy_out.emplace_back(dst_base, offset); } From 5f9f3fd4f7a83424b2aa815d4c8d69564698c754 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 7 Oct 2022 10:02:05 -0700 Subject: [PATCH 17/20] clean up check for empty vector --- cpp/src/io/parquet/reader_impl.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 68dd52e8c34..5964ef8ce7b 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1238,7 +1238,7 @@ rmm::device_buffer reader::impl::decompress_page_data( decompress_check(comp_res, _stream); // now copy the uncompressed V2 def and rep level data - if (copy_in.size()) { + if (not copy_in.empty()) { host_span const> copy_in_view{copy_in.data(), copy_in.size()}; auto const d_copy_in = cudf::detail::make_device_uvector_async(copy_in_view, _stream); From 413fc4290c3d1799cf2728a38fd456969f8a8086 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 7 Oct 2022 17:33:35 -0700 Subject: [PATCH 18/20] more review changes --- cpp/src/io/parquet/reader_impl.cu | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 5964ef8ce7b..6471e045cbf 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1239,13 +1239,11 @@ rmm::device_buffer reader::impl::decompress_page_data( // now copy the uncompressed V2 def and rep level data if (not copy_in.empty()) { - host_span const> copy_in_view{copy_in.data(), copy_in.size()}; - auto const d_copy_in = cudf::detail::make_device_uvector_async(copy_in_view, _stream); - - host_span const> copy_out_view(copy_out.data(), copy_out.size()); - auto const d_copy_out = cudf::detail::make_device_uvector_async(copy_out_view, _stream); + auto const d_copy_in = cudf::detail::make_device_uvector_async(copy_in, _stream); + auto 
const d_copy_out = cudf::detail::make_device_uvector_async(copy_out, _stream); gpu_copy_uncompressed_blocks(d_copy_in, d_copy_out, _stream); + _stream.synchronize(); } // Update the page information in device memory with the updated value of From a66038bd05a34faae01d0c28b7adcf0103919936 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 7 Oct 2022 17:42:27 -0700 Subject: [PATCH 19/20] fix formatting --- cpp/src/io/parquet/reader_impl.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 6471e045cbf..7447bd30d00 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1239,7 +1239,7 @@ rmm::device_buffer reader::impl::decompress_page_data( // now copy the uncompressed V2 def and rep level data if (not copy_in.empty()) { - auto const d_copy_in = cudf::detail::make_device_uvector_async(copy_in, _stream); + auto const d_copy_in = cudf::detail::make_device_uvector_async(copy_in, _stream); auto const d_copy_out = cudf::detail::make_device_uvector_async(copy_out, _stream); gpu_copy_uncompressed_blocks(d_copy_in, d_copy_out, _stream); From d34d8b047077410fbee3fcc878566b44c5eed9e3 Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 20 Oct 2022 14:49:28 -0700 Subject: [PATCH 20/20] remove pytest.param as suggested in review --- python/cudf/cudf/tests/test_parquet.py | 58 +++++++++++++------------- 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index ad889bb913d..2ac1dfda344 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1330,39 +1330,37 @@ def test_parquet_reader_v2(tmpdir, simple_pdf): ] }, # Struct of Lists - pytest.param( - { - "Real estate records": [ - None, - { - "Status": "NRI", - "Ownerships": { - "land_unit": [None, 2, None], - "flats": [[1, 2, 3], [], [4, 5], [], [0, 6, 0]], - }, - }, - { - "Status": 
None, - "Ownerships": { - "land_unit": [4, 5], - "flats": [[7, 8], []], - }, + { + "Real estate records": [ + None, + { + "Status": "NRI", + "Ownerships": { + "land_unit": [None, 2, None], + "flats": [[1, 2, 3], [], [4, 5], [], [0, 6, 0]], }, - { - "Status": "RI", - "Ownerships": {"land_unit": None, "flats": [[]]}, + }, + { + "Status": None, + "Ownerships": { + "land_unit": [4, 5], + "flats": [[7, 8], []], }, - {"Status": "RI", "Ownerships": None}, - { - "Status": None, - "Ownerships": { - "land_unit": [7, 8, 9], - "flats": [[], [], []], - }, + }, + { + "Status": "RI", + "Ownerships": {"land_unit": None, "flats": [[]]}, + }, + {"Status": "RI", "Ownerships": None}, + { + "Status": None, + "Ownerships": { + "land_unit": [7, 8, 9], + "flats": [[], [], []], }, - ] - }, - ), + }, + ] + }, ], ) def test_parquet_reader_nested_v2(tmpdir, data):