Add parameters to control page size in Parquet writer #10882

Merged May 24, 2022 (20 commits; changes shown from the first 9 commits)
9fe14b5
add setting of target page size for parquet file writer
etseidl Apr 7, 2022
7dac75f
Merge branch 'branch-22.06' of https://github.com/rapidsai/cudf into …
etseidl May 4, 2022
28c5806
split target page size into bytes and rows to match row group size
etseidl May 13, 2022
d316c71
Merge branch 'feature/pagesize' of github.com:etseidl/cudf into featu…
etseidl May 13, 2022
a869c13
Merge branch 'rapidsai:branch-22.06' into feature/pagesize
etseidl May 13, 2022
1e730ff
remove row group size unit test for now
etseidl May 13, 2022
774e08c
Merge branch 'rapidsai:branch-22.06' into feature/pagesize
etseidl May 16, 2022
a206fb2
Merge branch 'rapidsai:branch-22.06' into feature/pagesize
etseidl May 17, 2022
8534232
clang-format changes
etseidl May 17, 2022
938c3b3
make argument names consistent per suggestions from code review
etseidl May 18, 2022
6c77bfc
change target_page_size to max_page_size to be more clear about the i…
etseidl May 18, 2022
cf02a0d
forgot to change "target" to "maximum" in comments
etseidl May 18, 2022
cab058e
ensure page size bytes is less than or equal to row group size bytes
etseidl May 19, 2022
045e73b
ensure page size rows is less than or equal to row group size rows
etseidl May 19, 2022
42c955f
reimplement tests for consistent row group and page sizes
etseidl May 19, 2022
88d0c06
more clang-format fixes
etseidl May 19, 2022
63b8609
put back validation that row group/page sizes are at least the size
etseidl May 20, 2022
46317ee
change max_page_size to this_max_page_size to avoid confusion
etseidl May 20, 2022
1fe3fc7
set lower bound of 4KB on page size
etseidl May 20, 2022
39d027e
implement suggestion from review, but needed to change type of
etseidl May 20, 2022
134 changes: 104 additions & 30 deletions cpp/include/cudf/io/parquet.hpp
@@ -37,8 +37,10 @@ namespace io {
* @file
*/

constexpr size_t default_row_group_size_bytes = 128 * 1024 * 1024; // 128MB
constexpr size_type default_row_group_size_rows = 1000000;
constexpr size_t default_row_group_size_bytes = 128 * 1024 * 1024; // 128MB
constexpr size_type default_row_group_size_rows = 1000000;
constexpr size_t default_target_page_size_bytes = 512 * 1024;
constexpr size_type default_target_page_size_rows = 20000;
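With these defaults, a full 128 MiB / 1,000,000-row row group is split into pages capped at 512 KiB and 20,000 rows, whichever limit binds first. A rough, hypothetical helper (not part of the PR, assuming uniformly sized rows) for the resulting minimum page count:

```cpp
#include <cstddef>

// Hypothetical sketch: lower bound on the number of pages a full row group
// produces under the byte and row caps above. Each cap gives a ceiling
// division; the tighter (larger) count wins.
std::size_t min_page_count(std::size_t row_group_bytes,
                           std::size_t row_group_rows,
                           std::size_t page_bytes,
                           std::size_t page_rows)
{
  auto const by_bytes = (row_group_bytes + page_bytes - 1) / page_bytes;
  auto const by_rows  = (row_group_rows + page_rows - 1) / page_rows;
  return by_bytes > by_rows ? by_bytes : by_rows;
}
```

Under the defaults above, the byte cap dominates: 128 MiB / 512 KiB yields 256 pages, versus only 50 from the row cap.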

/**
* @brief Builds parquet_reader_options to use for `read_parquet()`.
@@ -382,6 +384,10 @@ class parquet_writer_options {
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;
// Maximum size of each page (uncompressed)
size_t _target_page_size_bytes = default_target_page_size_bytes;
// Maximum number of rows in a page
size_type _target_page_size_rows = default_target_page_size_rows;

/**
* @brief Constructor from sink and table.
@@ -482,6 +488,16 @@ class parquet_writer_options {
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
* @brief Returns target page size, in bytes.
*/
auto get_target_page_size_bytes() const { return _target_page_size_bytes; }

/**
* @brief Returns target page size, in rows.
*/
auto get_target_page_size_rows() const { return _target_page_size_rows; }

/**
* @brief Sets partitions.
*
@@ -552,24 +568,22 @@ class parquet_writer_options {
/**
* @brief Sets the maximum row group size, in bytes.
*/
void set_row_group_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(
size_bytes >= 512 * 1024,
"The maximum row group size cannot be smaller than the page size, which is 512KB.");
_row_group_size_bytes = size_bytes;
}
void set_row_group_size_bytes(size_t size_bytes) { _row_group_size_bytes = size_bytes; }
@bdice (Contributor, May 18, 2022):

Should this validate the maximum row group size against the maximum page size?

I see the comment in the tests below suggesting to test parameters on initialization, but that isn't safe if this value can be altered after initialization. Perhaps the best approach is to add validation to this method, and make the "builder" interface always set the page size before the row group size. Then the error can be raised during the builder call OR after initialization. (Edit: sorry, that suggestion doesn't make sense. I'll think about this some more.)

Suggested change:

    void set_row_group_size_bytes(size_t size_bytes)
    {
      CUDF_EXPECTS(
        size_bytes >= _target_page_size_bytes,
        "The maximum row group size in bytes cannot be smaller than the maximum page size in bytes.");
      _row_group_size_bytes = size_bytes;
    }

@etseidl (Contributor, Author):

Yes, I was definitely wanting some help on this logic. My only concern is the potential for confusion when modifying both the row group and page sizes; the page size would have to be changed first.

Contributor:

To me it makes the most sense to validate in build(). That allows any order and does not delay the validation too much (unlike, say, validating in the reader).

@bdice (May 18, 2022):

@vuule Are you concerned about users calling set_target_page_size_bytes and creating an invalid state after the object is built? Is that allowed by the builder interface?

Contributor:

You're right, that won't always work. We do have an equivalent interface in ORC. There, this is solved by having only a hard-coded minimum for these values instead of comparing the two in the setters. We ensure that the size of the "row group" (called a stripe in ORC) is not smaller than the size of a "page" (called a row group in ORC) in the getters instead of the setters. The setters allow stripes to be set to a smaller size, but the getter for the row group size then returns the minimum of the two. Effectively, setting the stripe size low also sets the row group size to the same value.

@etseidl (Author):

@bdice @vuule Should I take a stab at reproducing the ORC logic?

Contributor:

@etseidl Yes, that approach seems fine.

Contributor:

The ORC approach is better than validating in build() because we allow modifying the options struct after it's built.

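The ORC-style resolution described in the review discussion above (clamp in the getters rather than validate in the setters) can be sketched as a minimal, self-contained stand-in. Names here are hypothetical; `page_size_options` is a trimmed illustration, not the real cudf class:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>

// Sketch: setters accept any value, and the page-size getters clamp against
// the row group size, so the reported page size can never exceed the row
// group size. Setting a small row group implicitly shrinks the page size too.
struct page_size_options {
  std::size_t  row_group_size_bytes   = 128 * 1024 * 1024;
  std::int32_t row_group_size_rows    = 1000000;
  std::size_t  target_page_size_bytes = 512 * 1024;
  std::int32_t target_page_size_rows  = 20000;

  std::size_t get_target_page_size_bytes() const
  {
    return std::min(target_page_size_bytes, row_group_size_bytes);
  }
  std::int32_t get_target_page_size_rows() const
  {
    return std::min(target_page_size_rows, row_group_size_rows);
  }
};
```

This keeps the options mutable in any order after construction, which is why the reviewers preferred it over validating in build().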
/**
* @brief Sets the maximum row group size, in rows.
*/
void set_row_group_size_rows(size_type size_rows)
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
_row_group_size_rows = size_rows;
}
void set_row_group_size_rows(size_type size_rows) { _row_group_size_rows = size_rows; }
Contributor:

Should this validate the maximum row group size against the maximum page size?

Suggested change:

    void set_row_group_size_rows(size_type size_rows)
    {
      CUDF_EXPECTS(
        size_rows >= _target_page_size_rows,
        "The maximum row group size in rows cannot be smaller than the maximum page size in rows.");
      _row_group_size_rows = size_rows;
    }

/**
* @brief Sets the maximum page size, in bytes.
*/
void set_target_page_size_bytes(size_t pgsz_bytes) { _target_page_size_bytes = pgsz_bytes; }

/**
* @brief Sets the maximum page size, in rows.
*/
void set_target_page_size_rows(size_type pgsz_rows) { _target_page_size_rows = pgsz_rows; }
};

class parquet_writer_options_builder {
@@ -699,6 +713,30 @@ class parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets the maximum page size, in bytes.
*
* @param val The page size to use.
* @return this for chaining.
*/
parquet_writer_options_builder& target_page_size_bytes(size_t val)
{
options.set_target_page_size_bytes(val);
return *this;
}

/**
* @brief Sets the maximum page size, in rows.
*
* @param val The page size to use.
* @return this for chaining.
*/
parquet_writer_options_builder& target_page_size_rows(size_type val)
{
options.set_target_page_size_rows(val);
return *this;
}
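The two builder methods above follow the fluent-builder pattern: each setter forwards to the options object and returns `*this` so calls chain. A self-contained sketch of the pattern (hypothetical, trimmed stand-in for `parquet_writer_options_builder`):

```cpp
#include <cstddef>
#include <cstdint>

// Minimal illustration of the fluent builder pattern used by the writer
// options. Names are hypothetical; only the chaining mechanics are shown.
struct options {
  std::size_t  page_size_bytes = 512 * 1024;
  std::int32_t page_size_rows  = 20000;
};

class options_builder {
  options opts_;

 public:
  options_builder& target_page_size_bytes(std::size_t v)
  {
    opts_.page_size_bytes = v;
    return *this;  // returning *this enables chaining
  }
  options_builder& target_page_size_rows(std::int32_t v)
  {
    opts_.page_size_rows = v;
    return *this;
  }
  options build() { return opts_; }
};
```

Usage mirrors the real API: `options_builder{}.target_page_size_bytes(64 * 1024).target_page_size_rows(5000).build()`.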

/**
* @brief Sets whether int96 timestamps are written or not in parquet_writer_options.
*
@@ -783,6 +821,10 @@ class chunked_parquet_writer_options {
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;
// Maximum size of each page (uncompressed)
size_t _target_page_size_bytes = default_target_page_size_bytes;
// Maximum number of rows in a page
size_type _target_page_size_rows = default_target_page_size_rows;

/**
* @brief Constructor from sink.
@@ -844,6 +886,16 @@ class chunked_parquet_writer_options {
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
* @brief Returns target page size, in bytes.
*/
auto get_target_page_size_bytes() const { return _target_page_size_bytes; }

/**
* @brief Returns target page size, in rows.
*/
auto get_target_page_size_rows() const { return _target_page_size_rows; }

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -888,24 +940,22 @@ class chunked_parquet_writer_options {
/**
* @brief Sets the maximum row group size, in bytes.
*/
void set_row_group_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(
size_bytes >= 512 * 1024,
"The maximum row group size cannot be smaller than the page size, which is 512KB.");
_row_group_size_bytes = size_bytes;
}
void set_row_group_size_bytes(size_t size_bytes) { _row_group_size_bytes = size_bytes; }

/**
* @brief Sets the maximum row group size, in rows.
*/
void set_row_group_size_rows(size_type size_rows)
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
_row_group_size_rows = size_rows;
}
void set_row_group_size_rows(size_type size_rows) { _row_group_size_rows = size_rows; }

/**
* @brief Sets the maximum page size, in bytes.
*/
void set_target_page_size_bytes(size_t pgsz_bytes) { _target_page_size_bytes = pgsz_bytes; }

/**
* @brief Sets the maximum page size, in rows.
*/
void set_target_page_size_rows(size_type pgsz_rows) { _target_page_size_rows = pgsz_rows; }

/**
* @brief creates builder to build chunked_parquet_writer_options.
@@ -1025,6 +1075,30 @@ class chunked_parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets the maximum page size, in bytes.
*
* @param val maximum page size
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& target_page_size_bytes(size_t val)
{
options.set_target_page_size_bytes(val);
return *this;
}

/**
* @brief Sets the maximum page size, in rows.
*
* @param val maximum page size
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& target_page_size_rows(size_type val)
{
options.set_target_page_size_rows(val);
return *this;
}

/**
* @brief move chunked_parquet_writer_options member once it's built.
*/
24 changes: 20 additions & 4 deletions cpp/src/io/parquet/page_enc.cu
@@ -240,7 +240,9 @@
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
int32_t num_columns)
int32_t num_columns,
size_t target_page_size_bytes,
size_type target_page_size_rows)
{
// TODO: All writing seems to be done by thread 0. Could be replaced by thrust foreach
__shared__ __align__(8) parquet_column_device_view col_g;
@@ -337,8 +339,13 @@
uint32_t max_page_size = (values_in_page * 2 >= ck_g.num_values) ? 256 * 1024
: (values_in_page * 3 >= ck_g.num_values) ? 384 * 1024
: 512 * 1024;

// override max_page_size if target is smaller
if (max_page_size > target_page_size_bytes) max_page_size = target_page_size_bytes;

if (num_rows >= ck_g.num_rows ||
(values_in_page > 0 && (page_size + fragment_data_size > max_page_size))) {
(values_in_page > 0 && (page_size + fragment_data_size > max_page_size)) ||
rows_in_page > target_page_size_rows) {
if (ck_g.use_dictionary) {
page_size =
1 + 5 + ((values_in_page * ck_g.dict_rle_bits + 7) >> 3) + (values_in_page >> 8);
@@ -1927,15 +1934,24 @@ void InitEncoderPages(device_2dspan<EncColumnChunk> chunks,
device_span<gpu::EncPage> pages,
device_span<parquet_column_device_view const> col_desc,
int32_t num_columns,
size_t target_page_size_bytes,
size_type target_page_size_rows,
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
rmm::cuda_stream_view stream)
{
auto num_rowgroups = chunks.size().first;
dim3 dim_grid(num_columns, num_rowgroups); // 1 threadblock per rowgroup
gpuInitPages<<<dim_grid, 128, 0, stream.value()>>>(
chunks, pages, col_desc, page_grstats, chunk_grstats, max_page_comp_data_size, num_columns);
gpuInitPages<<<dim_grid, 128, 0, stream.value()>>>(chunks,
pages,
col_desc,
page_grstats,
chunk_grstats,
max_page_comp_data_size,
num_columns,
target_page_size_bytes,
target_page_size_rows);
}
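The size cap computed inside `gpuInitPages` (shown in the hunk above) can be read as a small pure function. A host-side, hypothetical restatement, with `ck_g.num_values` lifted into an argument:

```cpp
#include <cstddef>
#include <cstdint>

// Host-side sketch of the kernel's page-size cap: pick a heuristic tier
// based on how much of the chunk's values the page already holds, then
// clamp to the user-supplied target_page_size_bytes when that is smaller.
std::uint32_t effective_max_page_size(std::uint32_t values_in_page,
                                      std::uint32_t chunk_num_values,
                                      std::size_t target_page_size_bytes)
{
  std::uint32_t max_page_size = (values_in_page * 2 >= chunk_num_values)   ? 256 * 1024
                                : (values_in_page * 3 >= chunk_num_values) ? 384 * 1024
                                                                           : 512 * 1024;
  // override max_page_size if the target is smaller
  if (max_page_size > target_page_size_bytes) {
    max_page_size = static_cast<std::uint32_t>(target_page_size_bytes);
  }
  return max_page_size;
}
```

Note the page-split condition in the kernel also triggers on `rows_in_page > target_page_size_rows`, so a page closes when either the byte or the row limit is reached.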

void EncodePages(device_span<gpu::EncPage> pages,
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -575,6 +575,8 @@ void InitEncoderPages(cudf::detail::device_2dspan<EncColumnChunk> chunks,
device_span<gpu::EncPage> pages,
device_span<parquet_column_device_view const> col_desc,
int32_t num_columns,
size_t target_page_size_bytes,
size_type target_page_size_rows,
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
17 changes: 16 additions & 1 deletion cpp/src/io/parquet/writer_impl.cu
@@ -859,7 +859,16 @@ void writer::impl::init_page_sizes(hostdevice_2dvector<gpu::EncColumnChunk>& chu
uint32_t num_columns)
{
chunks.host_to_device(stream);
gpu::InitEncoderPages(chunks, {}, col_desc, num_columns, nullptr, nullptr, 0, stream);
gpu::InitEncoderPages(chunks,
{},
col_desc,
num_columns,
target_page_size_bytes,
target_page_size_rows,
nullptr,
nullptr,
0,
stream);
chunks.device_to_host(stream, true);
}

@@ -965,6 +974,8 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector<gpu::EncColumnChunk>&
pages,
col_desc,
num_columns,
target_page_size_bytes,
target_page_size_rows,
(num_stats_bfr) ? page_stats_mrg.data() : nullptr,
(num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr,
max_page_comp_data_size,
@@ -1122,6 +1133,8 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
stream(stream),
max_row_group_size{options.get_row_group_size_bytes()},
max_row_group_rows{options.get_row_group_size_rows()},
target_page_size_bytes(options.get_target_page_size_bytes()),
target_page_size_rows(options.get_target_page_size_rows()),
compression_(to_parquet_compression(options.get_compression())),
stats_granularity_(options.get_stats_level()),
int96_timestamps(options.is_enabled_int96_timestamps()),
@@ -1144,6 +1157,8 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
stream(stream),
max_row_group_size{options.get_row_group_size_bytes()},
max_row_group_rows{options.get_row_group_size_rows()},
target_page_size_bytes(options.get_target_page_size_bytes()),
target_page_size_rows(options.get_target_page_size_rows()),
compression_(to_parquet_compression(options.get_compression())),
stats_granularity_(options.get_stats_level()),
int96_timestamps(options.is_enabled_int96_timestamps()),
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/writer_impl.hpp
@@ -210,6 +210,8 @@ class writer::impl {

size_t max_row_group_size = default_row_group_size_bytes;
size_type max_row_group_rows = default_row_group_size_rows;
size_t target_page_size_bytes = default_target_page_size_bytes;
size_type target_page_size_rows = default_target_page_size_rows;
Compression compression_ = Compression::UNCOMPRESSED;
statistics_freq stats_granularity_ = statistics_freq::STATISTICS_NONE;
bool int96_timestamps = false;
5 changes: 5 additions & 0 deletions cpp/tests/io/parquet_test.cpp
@@ -3184,6 +3184,10 @@ TEST_F(ParquetReaderTest, EmptyOutput)
CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
}

/* Temporarily remove this test. With the page size being tunable, we can no longer just test
 * for row groups being smaller than 512 KiB. Ideally one would use the configured page size,
 * but that would require setting the page size parameter before the row group size, which
 * seems odd. We should probably test for consistent parameters when instantiating the writer.
TEST_F(ParquetWriterTest, RowGroupSizeInvalid)
{
const auto unused_table = std::make_unique<table>();
@@ -3205,5 +3209,6 @@ TEST_F(ParquetWriterTest, RowGroupSizeInvalid)
.row_group_size_bytes(511 << 10),
cudf::logic_error);
}
*/

CUDF_TEST_PROGRAM_MAIN()