From 399efb960f689085bf671f6fa62916b1020e3b30 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 2 Aug 2023 23:50:47 -0700 Subject: [PATCH] Fix for Parquet writer when requested pages per row is smaller than fragment size (#13806) #12685 introduced a bug in page calculation. If the `max_page_size_rows` parameter is set smaller than the page fragment size, the writer will produce a spurious empty page. This PR fixes this by only checking the fragment size if there are already rows in the page, and then returns the old check for number of rows exceeding the page limit. Interestingly, libcudf can read these files with empty pages just fine, but parquet-mr cannot. Authors: - Ed Seidl (https://github.com/etseidl) Approvers: - Nghia Truong (https://github.com/ttnghia) - Vukasin Milovanovic (https://github.com/vuule) URL: https://github.com/rapidsai/cudf/pull/13806 --- cpp/src/io/parquet/page_enc.cu | 12 ++++++++--- cpp/tests/io/parquet_test.cpp | 38 ++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 05f8bba7477..190f70d0747 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -432,9 +432,15 @@ __global__ void __launch_bounds__(128) max_RLE_page_size(col_g.num_def_level_bits(), num_vals) + max_RLE_page_size(col_g.num_rep_level_bits(), num_vals)); - if (num_rows >= ck_g.num_rows || - (values_in_page > 0 && (page_size + fragment_data_size > this_max_page_size)) || - rows_in_page + frag_g.num_rows > max_page_size_rows) { + // checks to see when we need to close the current page and start a new one + auto const is_last_chunk = num_rows >= ck_g.num_rows; + auto const is_page_bytes_exceeded = page_size + fragment_data_size > this_max_page_size; + auto const is_page_rows_exceeded = rows_in_page + frag_g.num_rows > max_page_size_rows; + // only check for limit overflow if there's already at least one fragment for this page + auto const is_page_too_big = + values_in_page > 0 && (is_page_bytes_exceeded || is_page_rows_exceeded); + + if (is_last_chunk || is_page_too_big) { if (ck_g.use_dictionary) { // Additional byte to store entry bit width page_size = 1 + max_RLE_page_size(ck_g.dict_rle_bits, values_in_page); diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 4e28f536728..a5054daed19 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -3709,6 +3709,44 @@ TEST_F(ParquetWriterTest, CheckPageRowsAdjusted) EXPECT_LE(ph.data_page_header.num_values, rows_per_page); } +TEST_F(ParquetWriterTest, CheckPageRowsTooSmall) +{ + constexpr auto rows_per_page = 1'000; + constexpr auto fragment_size = 5'000; + constexpr auto num_rows = 3 * rows_per_page; + const std::string s1(32, 'a'); + auto col0_elements = + cudf::detail::make_counting_transform_iterator(0, [&](auto i) { return s1; }); + auto col0 = cudf::test::strings_column_wrapper(col0_elements, col0_elements + num_rows); + + auto const expected = table_view{{col0}}; + + auto const filepath = temp_env->get_temp_filepath("CheckPageRowsTooSmall.parquet"); + const cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .max_page_fragment_size(fragment_size) + .max_page_size_rows(rows_per_page); + cudf::io::write_parquet(out_opts); + + // check that file is written correctly when rows/page < fragment size + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::FileMetaData fmd; + + read_footer(source, &fmd); + ASSERT_TRUE(fmd.row_groups.size() > 0); + ASSERT_TRUE(fmd.row_groups[0].columns.size() == 1); + auto const& first_chunk = fmd.row_groups[0].columns[0].meta_data; + ASSERT_TRUE(first_chunk.data_page_offset > 0); + + // read first data page header. sizeof(PageHeader) is not exact, but the thrift encoded + // version should be smaller than size of the struct. + auto const ph = read_page_header( + source, {first_chunk.data_page_offset, sizeof(cudf::io::parquet::PageHeader), 0}); + + // there should be only one page since the fragment size is larger than rows_per_page + EXPECT_EQ(ph.data_page_header.num_values, num_rows); +} + TEST_F(ParquetWriterTest, Decimal128Stats) { // check that decimal128 min and max statistics are written in network byte order