Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for Parquet writer when requested pages per row is smaller than fragment size #13806

Merged
merged 3 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,15 @@ __global__ void __launch_bounds__(128)
max_RLE_page_size(col_g.num_def_level_bits(), num_vals) +
max_RLE_page_size(col_g.num_rep_level_bits(), num_vals));

if (num_rows >= ck_g.num_rows ||
(values_in_page > 0 && (page_size + fragment_data_size > this_max_page_size)) ||
rows_in_page + frag_g.num_rows > max_page_size_rows) {
// checks to see when we need to close the current page and start a new one
auto const is_last_chunk = num_rows >= ck_g.num_rows;
auto const is_page_bytes_exceeded = page_size + fragment_data_size > this_max_page_size;
auto const is_page_rows_exceeded = rows_in_page + frag_g.num_rows > max_page_size_rows;
// only check for limit overflow if there's already at least one fragment for this page
auto const is_page_too_big =
values_in_page > 0 && (is_page_bytes_exceeded || is_page_rows_exceeded);

if (is_last_chunk || is_page_too_big) {
Comment on lines +435 to +443
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fantastic! This is much cleaner and better to understand what's going on.

if (ck_g.use_dictionary) {
// Additional byte to store entry bit width
page_size = 1 + max_RLE_page_size(ck_g.dict_rle_bits, values_in_page);
Expand Down
38 changes: 38 additions & 0 deletions cpp/tests/io/parquet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3709,6 +3709,44 @@ TEST_F(ParquetWriterTest, CheckPageRowsAdjusted)
EXPECT_LE(ph.data_page_header.num_values, rows_per_page);
}

TEST_F(ParquetWriterTest, CheckPageRowsTooSmall)
{
constexpr auto rows_per_page = 1'000;
constexpr auto fragment_size = 5'000;
constexpr auto num_rows = 3 * rows_per_page;
const std::string s1(32, 'a');
auto col0_elements =
cudf::detail::make_counting_transform_iterator(0, [&](auto i) { return s1; });
auto col0 = cudf::test::strings_column_wrapper(col0_elements, col0_elements + num_rows);

auto const expected = table_view{{col0}};

auto const filepath = temp_env->get_temp_filepath("CheckPageRowsTooSmall.parquet");
const cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
.max_page_fragment_size(fragment_size)
.max_page_size_rows(rows_per_page);
cudf::io::write_parquet(out_opts);

// check that file is written correctly when rows/page < fragment size
auto const source = cudf::io::datasource::create(filepath);
cudf::io::parquet::FileMetaData fmd;

read_footer(source, &fmd);
ASSERT_TRUE(fmd.row_groups.size() > 0);
ASSERT_TRUE(fmd.row_groups[0].columns.size() == 1);
auto const& first_chunk = fmd.row_groups[0].columns[0].meta_data;
ASSERT_TRUE(first_chunk.data_page_offset > 0);

// read first data page header. sizeof(PageHeader) is not exact, but the thrift encoded
// version should be smaller than size of the struct.
auto const ph = read_page_header(
source, {first_chunk.data_page_offset, sizeof(cudf::io::parquet::PageHeader), 0});

// there should be only one page since the fragment size is larger than rows_per_page
EXPECT_EQ(ph.data_page_header.num_values, num_rows);
}

TEST_F(ParquetWriterTest, Decimal128Stats)
{
// check that decimal128 min and max statistics are written in network byte order
Expand Down