diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu
index 3cc4fda695f..2f351edd2b9 100644
--- a/cpp/src/io/parquet/page_enc.cu
+++ b/cpp/src/io/parquet/page_enc.cu
@@ -388,6 +388,27 @@ __device__ uint8_t const* delta_encode(page_enc_state_s<0>* s, uint64_t* buffer,
   return packer.flush();
 }
 
+/**
+ * @brief Sets `s->cur` to point to the start of encoded page data.
+ *
+ * For V1 headers, this will be immediately after the repetition and definition level data. For V2,
+ * it will be at the next properly aligned location after the level data. The padding in V2 is
+ * needed for compressors that require aligned input.
+ */
+template <typename state_type>
+inline void __device__ set_page_data_start(state_type* s)
+{
+  s->cur = s->page.page_data + s->page.max_hdr_size;
+  switch (s->page.page_type) {
+    case PageType::DATA_PAGE:
+      s->cur += s->page.level_bytes();
+      if (s->col.num_def_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
+      if (s->col.num_rep_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
+      break;
+    case PageType::DATA_PAGE_V2: s->cur += s->page.max_lvl_size; break;
+  }
+}
+
 }  // anonymous namespace
 
 // blockDim {512,1,1}
@@ -594,8 +615,13 @@ CUDF_KERNEL void __launch_bounds__(128)
       page_g.chunk           = &chunks[blockIdx.y][blockIdx.x];
       page_g.chunk_id        = blockIdx.y * num_columns + blockIdx.x;
       page_g.hdr_size        = 0;
+      page_g.def_lvl_bytes   = 0;
+      page_g.rep_lvl_bytes   = 0;
+      page_g.max_lvl_size    = 0;
+      page_g.comp_data_size  = 0;
       page_g.max_hdr_size    = MAX_V1_HDR_SIZE;
       page_g.max_data_size   = ck_g.uniq_data_size;
+      page_g.data_size       = ck_g.uniq_data_size;
       page_g.start_row       = cur_row;
       page_g.num_rows        = ck_g.num_dict_entries;
       page_g.num_leaf_values = ck_g.num_dict_entries;
@@ -689,12 +715,17 @@ CUDF_KERNEL void __launch_bounds__(128)
         page_size = 1 + max_RLE_page_size(ck_g.dict_rle_bits, values_in_page);
       }
       if (!t) {
-        page_g.num_fragments = fragments_in_chunk - page_start;
-        page_g.chunk         = &chunks[blockIdx.y][blockIdx.x];
-        page_g.chunk_id      = blockIdx.y * num_columns + blockIdx.x;
-        page_g.page_type     = data_page_type;
-        page_g.hdr_size      = 0;
-        page_g.max_hdr_size  = max_data_page_hdr_size;  // Max size excluding statistics
+        page_g.num_fragments  = fragments_in_chunk - page_start;
+        page_g.chunk          = &chunks[blockIdx.y][blockIdx.x];
+        page_g.chunk_id       = blockIdx.y * num_columns + blockIdx.x;
+        page_g.page_type      = data_page_type;
+        page_g.hdr_size       = 0;
+        page_g.def_lvl_bytes  = 0;
+        page_g.rep_lvl_bytes  = 0;
+        page_g.max_lvl_size   = 0;
+        page_g.data_size      = 0;
+        page_g.comp_data_size = 0;
+        page_g.max_hdr_size   = max_data_page_hdr_size;  // Max size excluding statistics
         if (ck_g.stats) {
           uint32_t stats_hdr_len = 16;
           if (col_g.stats_dtype == dtype_string || col_g.stats_dtype == dtype_byte_array) {
@@ -716,13 +747,19 @@ CUDF_KERNEL void __launch_bounds__(128)
         page_g.num_valid = num_valid;
         auto const def_level_size = max_RLE_page_size(col_g.num_def_level_bits(), values_in_page);
         auto const rep_level_size = max_RLE_page_size(col_g.num_rep_level_bits(), values_in_page);
+        if (write_v2_headers) {
+          page_g.max_lvl_size =
+            util::round_up_unsafe(def_level_size + rep_level_size, page_align);
+        }
         // get a different bound if using delta encoding
         if (is_use_delta) {
          auto const delta_len = delta_data_len(physical_type, type_id, page_g.num_leaf_values, page_size);
           page_size = max(page_size, delta_len);
         }
-        auto const max_data_size = page_size + def_level_size + rep_level_size + rle_pad;
+        auto const max_data_size =
+          page_size + rle_pad +
+          (write_v2_headers ? page_g.max_lvl_size : def_level_size + rep_level_size);
         // page size must fit in 32-bit signed integer
         if (max_data_size > std::numeric_limits<int32_t>::max()) {
           CUDF_UNREACHABLE("page size exceeds maximum for i32");
         }
@@ -739,7 +776,9 @@ CUDF_KERNEL void __launch_bounds__(128)
       page_offset += util::round_up_unsafe(page_g.max_hdr_size + page_g.max_data_size, page_align);
       if (not comp_page_sizes.empty()) {
-        comp_page_offset += page_g.max_hdr_size + comp_page_sizes[ck_g.first_page + num_pages];
+        // V2 does not include level data in compressed size estimate
+        comp_page_offset += page_g.max_hdr_size + page_g.max_lvl_size +
+                            comp_page_sizes[ck_g.first_page + num_pages];
       }
       page_headers_size += page_g.max_hdr_size;
       max_page_data_size = max(max_page_data_size, page_g.max_data_size);
     }
@@ -774,8 +813,10 @@ CUDF_KERNEL void __launch_bounds__(128)
         }
         pages[ck_g.first_page + num_pages] = page_g;
       }
+      // page_sizes should be the number of bytes to be compressed, so don't include level
+      // data for V2.
       if (not page_sizes.empty()) {
-        page_sizes[ck_g.first_page + num_pages] = page_g.max_data_size;
+        page_sizes[ck_g.first_page + num_pages] = page_g.max_data_size - page_g.max_lvl_size;
       }
       if (page_grstats) { page_grstats[ck_g.first_page + num_pages] = pagestats_g; }
     }
@@ -1429,10 +1470,6 @@ __device__ void finish_page_encode(state_buf* s,
     return thrust::reduce(thrust::seq, hist_start, hist_end, 0U);
   };
 
-  // V2 does not compress rep and def level data
-  size_t const skip_comp_size =
-    write_v2_headers ? s->page.def_lvl_bytes + s->page.rep_lvl_bytes : 0;
-
   // this will be true if max_rep > 0 (i.e. there are lists)
   if (s->page.rep_histogram != nullptr) {
     // for repetition we get hist[0] from num_rows, and can derive hist[max_rep_level]
@@ -1489,10 +1526,17 @@ __device__ void finish_page_encode(state_buf* s,
     // FIXME(ets): this needs to do error propagation back to the host
     CUDF_UNREACHABLE("detected possible page data corruption");
   }
-  s->page.max_data_size = actual_data_size;
+  if (s->page.is_v2()) {
+    auto const d_base = base + s->page.max_lvl_size;
+    s->page.data_size = static_cast<uint32_t>(end_ptr - d_base) + s->page.level_bytes();
+  } else {
+    s->page.data_size = actual_data_size;
+  }
   if (not comp_in.empty()) {
-    comp_in[blockIdx.x]  = {base + skip_comp_size, actual_data_size - skip_comp_size};
-    comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size + skip_comp_size,
+    auto const c_base            = base + s->page.max_lvl_size;
+    auto const bytes_to_compress = static_cast<uint32_t>(end_ptr - c_base);
+    comp_in[blockIdx.x]          = {c_base, bytes_to_compress};
+    comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size + s->page.max_lvl_size,
                             0};  // size is unused
   }
   pages[blockIdx.x] = s->page;
@@ -1503,10 +1547,10 @@ __device__ void finish_page_encode(state_buf* s,
   }
 
   // copy uncompressed bytes over
-  if (skip_comp_size != 0 && not comp_in.empty()) {
+  if (s->page.is_v2() and not comp_in.empty()) {
     uint8_t* const src = s->page.page_data + s->page.max_hdr_size;
     uint8_t* const dst = s->page.compressed_data + s->page.max_hdr_size;
-    for (int i = t; i < skip_comp_size; i += block_size) {
+    for (int i = t; i < s->page.level_bytes(); i += block_size) {
       dst[i] = src[i];
     }
   }
@@ -1536,13 +1580,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
     s->col         = *s->ck.col_desc;
     s->rle_len_pos = nullptr;
     // get s->cur back to where it was at the end of encoding the rep and def level data
-    s->cur =
-      s->page.page_data + s->page.max_hdr_size + s->page.def_lvl_bytes + s->page.rep_lvl_bytes;
-    // if V1 data page, need space for the RLE length fields
-    if (s->page.page_type == PageType::DATA_PAGE) {
-      if (s->col.num_def_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
-      if (s->col.num_rep_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
-    }
+    set_page_data_start(s);
   }
 
   __syncthreads();
@@ -1771,13 +1809,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
     s->col         = *s->ck.col_desc;
     s->rle_len_pos = nullptr;
     // get s->cur back to where it was at the end of encoding the rep and def level data
-    s->cur =
-      s->page.page_data + s->page.max_hdr_size + s->page.def_lvl_bytes + s->page.rep_lvl_bytes;
-    // if V1 data page, need space for the RLE length fields
-    if (s->page.page_type == PageType::DATA_PAGE) {
-      if (s->col.num_def_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
-      if (s->col.num_rep_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
-    }
+    set_page_data_start(s);
   }
 
   __syncthreads();
@@ -1908,8 +1940,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
    s->col         = *s->ck.col_desc;
    s->rle_len_pos = nullptr;
    // get s->cur back to where it was at the end of encoding the rep and def level data
-    s->cur =
-      s->page.page_data + s->page.max_hdr_size + s->page.def_lvl_bytes + s->page.rep_lvl_bytes;
+    set_page_data_start(s);
   }
 
   __syncthreads();
@@ -2017,8 +2048,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
    s->col         = *s->ck.col_desc;
    s->rle_len_pos = nullptr;
    // get s->cur back to where it was at the end of encoding the rep and def level data
-    s->cur =
-      s->page.page_data + s->page.max_hdr_size + s->page.def_lvl_bytes + s->page.rep_lvl_bytes;
+    set_page_data_start(s);
   }
 
   __syncthreads();
@@ -2142,11 +2172,10 @@ CUDF_KERNEL void __launch_bounds__(decide_compression_block_size)
   auto const num_pages = ck_g[warp_id].num_pages;
   for (auto page_id = lane_id; page_id < num_pages; page_id += cudf::detail::warp_size) {
     auto const& curr_page     = ck_g[warp_id].pages[page_id];
-    auto const page_data_size = curr_page.max_data_size;
-    auto const is_v2          = curr_page.page_type == PageType::DATA_PAGE_V2;
-    auto const lvl_bytes      = is_v2 ? curr_page.def_lvl_bytes + curr_page.rep_lvl_bytes : 0;
+    auto const page_data_size = curr_page.data_size;
     uncompressed_data_size += page_data_size;
     if (auto comp_res = curr_page.comp_res; comp_res != nullptr) {
+      auto const lvl_bytes = curr_page.is_v2() ? curr_page.level_bytes() : 0;
      compressed_data_size += comp_res->bytes_written + lvl_bytes;
       if (comp_res->status != compression_status::SUCCESS) {
         atomicOr(&compression_error[warp_id], 1);
@@ -2614,14 +2643,13 @@ CUDF_KERNEL void __launch_bounds__(128)
      EncodeStatistics(hdr_start, &chunk_stats[page_g.chunk_id], col_g.stats_dtype, scratch);
      page_g.chunk->ck_stat_size = static_cast<uint32_t>(hdr_end - hdr_start);
    }
-    uncompressed_page_size = page_g.max_data_size;
+    uncompressed_page_size = page_g.data_size;
     if (ck_g.is_compressed) {
-      auto const is_v2     = page_g.page_type == PageType::DATA_PAGE_V2;
-      auto const lvl_bytes = is_v2 ? page_g.def_lvl_bytes + page_g.rep_lvl_bytes : 0;
+      auto const lvl_bytes = page_g.is_v2() ? page_g.level_bytes() : 0;
       hdr_start            = page_g.compressed_data;
       compressed_page_size =
         static_cast<uint32_t>(comp_results[blockIdx.x].bytes_written) + lvl_bytes;
-      page_g.max_data_size = compressed_page_size;
+      page_g.comp_data_size = compressed_page_size;
     } else {
       hdr_start            = page_g.page_data;
       compressed_page_size = uncompressed_page_size;
@@ -2708,19 +2736,26 @@ CUDF_KERNEL void __launch_bounds__(1024)
     if (t == 0) { page_g = first_page[page]; }
     __syncthreads();
 
-    src = (ck_g.is_compressed) ? page_g.compressed_data : page_g.page_data;
+    src = ck_g.is_compressed ? page_g.compressed_data : page_g.page_data;
     // Copy page header
     hdr_len = page_g.hdr_size;
     memcpy_block<1024, true>(dst, src, hdr_len, t);
     src += page_g.max_hdr_size;
     dst += hdr_len;
-    // Copy page data
     uncompressed_size += hdr_len;
-    data_len = page_g.max_data_size;
+    data_len = ck_g.is_compressed ? page_g.comp_data_size : page_g.data_size;
+    // Copy page data. For V2, the level data and page data are disjoint.
+    if (page_g.is_v2()) {
+      auto const lvl_len = page_g.level_bytes();
+      memcpy_block<1024, true>(dst, src, lvl_len, t);
+      src += page_g.max_lvl_size;
+      dst += lvl_len;
+      data_len -= lvl_len;
+    }
     memcpy_block<1024, true>(dst, src, data_len, t);
     dst += data_len;
     __syncthreads();
-    if (!t && page == 0 && ck_g.use_dictionary) { ck_g.dictionary_size = hdr_len + data_len; }
+    if (t == 0 && page == 0 && ck_g.use_dictionary) { ck_g.dictionary_size = hdr_len + data_len; }
   }
   if (t == 0) {
     chunks[blockIdx.x].bfr_size = uncompressed_size;
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index d58c7f95389..b215cd7a20b 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -560,30 +560,41 @@ struct EncColumnChunk {
  * @brief Struct describing an encoder data page
  */
 struct EncPage {
-  uint8_t* page_data;        //!< Ptr to uncompressed page
-  uint8_t* compressed_data;  //!< Ptr to compressed page
-  uint16_t num_fragments;    //!< Number of fragments in page
-  PageType page_type;        //!< Page type
-  Encoding encoding;         //!< Encoding used for page data
-  EncColumnChunk* chunk;     //!< Chunk that this page belongs to
+  // all pointers at the top to keep things properly aligned
+  uint8_t* page_data;            //!< Ptr to uncompressed page
+  uint8_t* compressed_data;      //!< Ptr to compressed page
+  EncColumnChunk* chunk;         //!< Chunk that this page belongs to
+  compression_result* comp_res;  //!< Ptr to compression result
+  uint32_t* def_histogram;       //!< Histogram of counts for each definition level
+  uint32_t* rep_histogram;       //!< Histogram of counts for each repetition level
+  // put this here in case it's ever made 64-bit
+  encode_kernel_mask kernel_mask;  //!< Mask used to control which encoding kernels to run
+  // the rest can be 4 byte aligned
   uint32_t chunk_id;         //!< Index in chunk array
-  uint32_t hdr_size;         //!< Size of page header
+  uint32_t hdr_size;         //!< Actual size of encoded page header
   uint32_t max_hdr_size;     //!< Maximum size of page header
-  uint32_t max_data_size;    //!< Maximum size of coded page data (excluding header)
+  uint32_t max_data_size;    //!< Maximum size of encoded page data (excluding header)
+  uint32_t data_size;        //!< Actual size of encoded page data (includes level data)
+  uint32_t comp_data_size;   //!< Actual size of compressed page data
   uint32_t start_row;        //!< First row of page
   uint32_t num_rows;         //!< Rows in page
   uint32_t num_leaf_values;  //!< Values in page. Different from num_rows in case of nested types
   uint32_t num_values;  //!< Number of def/rep level values in page. Includes null/empty elements in
                         //!< non-leaf levels
-  uint32_t def_lvl_bytes;          //!< Number of bytes of encoded definition level data (V2 only)
-  uint32_t rep_lvl_bytes;          //!< Number of bytes of encoded repetition level data (V2 only)
-  compression_result* comp_res;    //!< Ptr to compression result
-  uint32_t num_nulls;              //!< Number of null values (V2 only) (down here for alignment)
-  encode_kernel_mask kernel_mask;  //!< Mask used to control which encoding kernels to run
-  uint32_t* def_histogram;         //!< Histogram of counts for each definition level
-  uint32_t* rep_histogram;         //!< Histogram of counts for each repetition level
-  uint32_t var_bytes_size;  //!< Number of variable length bytes in the page (byte arrays only)
+  uint32_t def_lvl_bytes;   //!< Number of bytes of encoded definition level data
+  uint32_t rep_lvl_bytes;   //!< Number of bytes of encoded repetition level data
+  uint32_t max_lvl_size;    //!< Maximum size of level data (V2 only, 0 for V1)
+  uint32_t num_nulls;       //!< Number of null values
   uint32_t num_valid;       //!< Number of valid leaf values
+  uint32_t var_bytes_size;  //!< Number of variable length bytes in the page (byte arrays only)
+  // enums and smaller stuff down here
+  PageType page_type;      //!< Page type
+  Encoding encoding;       //!< Encoding used for page data
+  uint16_t num_fragments;  //!< Number of fragments in page
+
+  constexpr bool is_v2() const { return page_type == PageType::DATA_PAGE_V2; }
+
+  constexpr auto level_bytes() const { return def_lvl_bytes + rep_lvl_bytes; }
 };
 
 /**
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 93b225dca1b..0303439fb27 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -2220,10 +2220,6 @@ writer::impl::~impl() { close(); }
 
 void writer::impl::init_state()
 {
-  // See issue #14781. Can remove this check once that is fixed.
-  CUDF_EXPECTS(not(_write_v2_headers and _compression == Compression::ZSTD),
-               "V2 page headers cannot be used with ZSTD compression");
-
   _current_chunk_offset.resize(_out_sink.size());
   // Write file header
   file_header_s fhdr;
@@ -2405,7 +2401,8 @@ void writer::impl::write_parquet_data_to_sink(
 
     // skip dict pages
     if (enc_page.page_type == PageType::DICTIONARY_PAGE) { continue; }
-    int32_t this_page_size = enc_page.hdr_size + enc_page.max_data_size;
+    int32_t const this_page_size =
+      enc_page.hdr_size + (ck.is_compressed ? enc_page.comp_data_size : enc_page.data_size);
     // first_row_idx is relative to start of row group
     PageLocation loc{curr_pg_offset, this_page_size, enc_page.start_row - ck.start_row};
     if (is_byte_arr) { var_bytes.push_back(enc_page.var_bytes_size); }
diff --git a/cpp/tests/io/parquet_v2_test.cpp b/cpp/tests/io/parquet_v2_test.cpp
index 1a373ed92ae..25d58a96512 100644
--- a/cpp/tests/io/parquet_v2_test.cpp
+++ b/cpp/tests/io/parquet_v2_test.cpp
@@ -35,7 +35,7 @@ INSTANTIATE_TEST_SUITE_P(ParquetV2ReadWriteTest,
 
 TEST_P(ParquetV2Test, MultiColumn)
 {
-  constexpr auto num_rows = 50000;
+  constexpr auto num_rows = 50'000;
   auto const is_v2        = GetParam();
 
   // auto col0_data = random_values(num_rows);
@@ -84,6 +84,7 @@ TEST_P(ParquetV2Test, MultiColumn)
   cudf::io::parquet_writer_options out_opts =
     cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
       .write_v2_headers(is_v2)
+      .compression(cudf::io::compression_type::ZSTD)
       .metadata(expected_metadata);
 
   cudf::io::write_parquet(out_opts);
@@ -156,6 +157,7 @@ TEST_P(ParquetV2Test, MultiColumnWithNulls)
   cudf::io::parquet_writer_options out_opts =
     cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
       .write_v2_headers(is_v2)
+      .compression(cudf::io::compression_type::ZSTD)
       .metadata(expected_metadata);
 
   cudf::io::write_parquet(out_opts);
@@ -197,6 +199,7 @@ TEST_P(ParquetV2Test, Strings)
   cudf::io::parquet_writer_options out_opts =
     cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
       .write_v2_headers(is_v2)
+      .compression(cudf::io::compression_type::ZSTD)
       .metadata(expected_metadata);
 
   cudf::io::write_parquet(out_opts);
diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp
index 2df34c7928b..34061cb7bf8 100644
--- a/cpp/tests/io/parquet_writer_test.cpp
+++ b/cpp/tests/io/parquet_writer_test.cpp
@@ -1064,7 +1064,6 @@ TEST_F(ParquetWriterTest, DictionaryAdaptiveTest)
   auto const expected = table_view{{col0, col1}};
 
   auto const filepath = temp_env->get_temp_filepath("DictionaryAdaptiveTest.parquet");
-  // no compression so we can easily read page data
   cudf::io::parquet_writer_options out_opts =
     cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
       .compression(cudf::io::compression_type::ZSTD)
@@ -1116,7 +1115,6 @@ TEST_F(ParquetWriterTest, DictionaryAlwaysTest)
   auto const expected = table_view{{col0, col1}};
 
   auto const filepath = temp_env->get_temp_filepath("DictionaryAlwaysTest.parquet");
-  // no compression so we can easily read page data
   cudf::io::parquet_writer_options out_opts =
     cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
       .compression(cudf::io::compression_type::ZSTD)
@@ -1428,21 +1426,6 @@ TEST_F(ParquetWriterTest, RowGroupMetadata)
             static_cast(num_rows * sizeof(column_type)));
 }
 
-// See #14772.
-// zStandard compression cannot currently be used with V2 page headers due to buffer
-// alignment issues.
-// TODO: Remove this test when #14781 is closed.
-TEST_F(ParquetWriterTest, ZstdWithV2Header)
-{
-  auto const expected = table_view{};
-
-  cudf::io::parquet_writer_options const out_opts =
-    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{"14772.pq"}, expected)
-      .compression(cudf::io::compression_type::ZSTD)
-      .write_v2_headers(true);
-  EXPECT_THROW(cudf::io::write_parquet(out_opts), cudf::logic_error);
-}
-
 /////////////////////////////////////////////////////////////
 // custom mem mapped data sink that supports device writes
 template