Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parameters to control page size in Parquet writer #10882

Merged
merged 20 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
9fe14b5
add setting of target page size for parquet file writer
etseidl Apr 7, 2022
7dac75f
Merge branch 'branch-22.06' of https://github.com/rapidsai/cudf into …
etseidl May 4, 2022
28c5806
split target page size into bytes and rows to match row group size
etseidl May 13, 2022
d316c71
Merge branch 'feature/pagesize' of github.com:etseidl/cudf into featu…
etseidl May 13, 2022
a869c13
Merge branch 'rapidsai:branch-22.06' into feature/pagesize
etseidl May 13, 2022
1e730ff
remove row group size unit test for now
etseidl May 13, 2022
774e08c
Merge branch 'rapidsai:branch-22.06' into feature/pagesize
etseidl May 16, 2022
a206fb2
Merge branch 'rapidsai:branch-22.06' into feature/pagesize
etseidl May 17, 2022
8534232
clang-format changes
etseidl May 17, 2022
938c3b3
make argument names consistent per suggestions from code review
etseidl May 18, 2022
6c77bfc
change target_page_size to max_page_size to be more clear about the i…
etseidl May 18, 2022
cf02a0d
forgot to change "target" to "maximum" in commencts
etseidl May 18, 2022
cab058e
ensure page size bytes is less than or equal to row group size bytes
etseidl May 19, 2022
045e73b
ensure page size rows is less than or equal to row group size rows
etseidl May 19, 2022
42c955f
reimplement tests for consistent row group and page sizes
etseidl May 19, 2022
88d0c06
more clang-format fixes
etseidl May 19, 2022
63b8609
put back validation that row group/page sizes are at least the size
etseidl May 20, 2022
46317ee
change max_page_size to this_max_page_size to avoid confusion
etseidl May 20, 2022
1fe3fc7
set lower bound of 4KB on page size
etseidl May 20, 2022
39d027e
implement suggestion from review, but needed to change type of
etseidl May 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 126 additions & 30 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ namespace io {

constexpr size_t default_row_group_size_bytes = 128 * 1024 * 1024; // 128MB
constexpr size_type default_row_group_size_rows = 1000000;
constexpr size_t default_max_page_size_bytes = 512 * 1024;
constexpr size_type default_max_page_size_rows = 20000;

/**
* @brief Builds parquet_reader_options to use for `read_parquet()`.
Expand Down Expand Up @@ -382,6 +384,10 @@ class parquet_writer_options {
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;
// Maximum size of each page (uncompressed)
size_t _max_page_size_bytes = default_max_page_size_bytes;
// Maximum number of rows in a page
size_type _max_page_size_rows = default_max_page_size_rows;

/**
* @brief Constructor from sink and table.
Expand Down Expand Up @@ -482,6 +488,24 @@ class parquet_writer_options {
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
* @brief Returns the maximum uncompressed page size, in bytes. If set larger than the row group
* size, then this will return the row group size.
*/
auto get_max_page_size_bytes() const
{
return std::min(_max_page_size_bytes, get_row_group_size_bytes());
}

/**
* @brief Returns maximum page size, in rows. If set larger than the row group size, then this
* will return the row group size.
*/
auto get_max_page_size_rows() const
{
return std::min(_max_page_size_rows, get_row_group_size_rows());
}

/**
* @brief Sets partitions.
*
Expand Down Expand Up @@ -552,24 +576,22 @@ class parquet_writer_options {
/**
* @brief Sets the maximum row group size, in bytes.
*/
void set_row_group_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(
size_bytes >= 512 * 1024,
"The maximum row group size cannot be smaller than the page size, which is 512KB.");
_row_group_size_bytes = size_bytes;
}
void set_row_group_size_bytes(size_t size_bytes) { _row_group_size_bytes = size_bytes; }
Copy link
Contributor

@bdice bdice May 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this validate the maximum row group size against the maximum page size?

I see the comment in the tests below suggesting to test parameters on initialization, but that isn't safe if this can be altered after initialization. Perhaps the best approach is to add validation to this method, and make the "builder" interface always set the page size before the row group size. Then the error can be raised during the builder call OR after initialization. edit: sorry, that suggestion doesn't make sense. I'll think about this some more.

Suggested change
void set_row_group_size_bytes(size_t size_bytes) { _row_group_size_bytes = size_bytes; }
void set_row_group_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(
size_bytes >= _target_page_size_bytes,
"The maximum row group size in bytes cannot be smaller than the maximum page size in bytes.");
_row_group_size_bytes = size_bytes;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I was definitely wanting some help on this logic. my only concern is with confusion when modifying both row group and page sizes; the page size would have to be changed first.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me it makes the most sense to validate in build(). Allows any order and does not delay the validation too much (e.g. like validating in the reader would).

Copy link
Contributor

@bdice bdice May 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vuule Are you concerned about users calling set_target_page_size_bytes and creating an invalid state after the object is built? Is that allowed by the builder interface?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, that won't always work.
We do have an equivalent interface in ORC. There, this is solved by only having a hard coded minimum for these values instead of comparing the two in the setters. We ensure that the size of the "row group" (called stripe in ORC) is not smaller than the size of "page" (called row group in ORC) in the getters instead of setters! Setters allow stripes to be set to smaller size, but then getters for the row group size return the minimum of the two. Effectively, setting stripe size low also sets the row group size to the same value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bdice @vuule Should I take a stab at reproducing the ORC logic?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@etseidl Yes, that approach seems fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ORC approach is better than validating in build because we allow modifying the options struct after it's built.


/**
* @brief Sets the maximum row group size, in rows.
*/
void set_row_group_size_rows(size_type size_rows)
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
_row_group_size_rows = size_rows;
}
void set_row_group_size_rows(size_type size_rows) { _row_group_size_rows = size_rows; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this validate the maximum row group size against the maximum page size?

Suggested change
void set_row_group_size_rows(size_type size_rows) { _row_group_size_rows = size_rows; }
void set_row_group_size_rows(size_type size_rows)
{
CUDF_EXPECTS(
size_rows >= _target_page_size_rows,
"The maximum row group size in rows cannot be smaller than the maximum page size in rows.");
_row_group_size_rows = size_rows;
}


/**
* @brief Sets the maximum uncompressed page size, in bytes.
*/
void set_max_page_size_bytes(size_t size_bytes) { _max_page_size_bytes = size_bytes; }
vuule marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Sets the maximum page size, in rows.
*/
void set_max_page_size_rows(size_type size_rows) { _max_page_size_rows = size_rows; }
};

class parquet_writer_options_builder {
Expand Down Expand Up @@ -690,7 +712,7 @@ class parquet_writer_options_builder {
/**
* @brief Sets the maximum number of rows in output row groups.
*
* @param val maximum number or rows
* @param val maximum number of rows
* @return this for chaining.
*/
parquet_writer_options_builder& row_group_size_rows(size_type val)
Expand All @@ -699,6 +721,33 @@ class parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets the maximum uncompressed page size, in bytes. Serves as a hint to the writer,
* and can be exceeded under certain circumstances. Cannot be larger than the row group size in
* bytes, and will be adjusted to match if it is.
*
* @param val maximum page size
* @return this for chaining.
*/
parquet_writer_options_builder& max_page_size_bytes(size_t val)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You had uncompressed below, it would be good to include it here in this too as users are more likely to see it here.

Also is this a hard limit or a soft limit. i.e. what happens if a single row cannot be split, i.e. string or binary, and is larger than this is set to.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. This is a soft limit, since the current implementation operates a fragment-at-a-time, allowing a single fragment to over-fill a page. I believe this is consistent with other parquet writers, such as parquet-mr.

{
options.set_max_page_size_bytes(val);
return *this;
}

/**
* @brief Sets the maximum page size, in rows. Counts only top-level rows, ignoring any nesting.
* Cannot be larger than the row group size in rows, and will be adjusted to match if it is.
*
* @param val maximum rows per page
* @return this for chaining.
*/
parquet_writer_options_builder& max_page_size_rows(size_type val)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this impact lists? I assume that the data column will be split on rows too, but it would be good to have that in the docs. For row groups it is the top level number of rows, it cannot be anything else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As with row groups, this is on row count, not value count. I originally wasn't thinking of adding the row limit, but changed my mind later to allow cudf to have something akin to the row count limit from parquet-mr. Will modify the documentation.

Question: Should I change the default to 0 and not limit on row count if that's the case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this impact lists? I assume that the data column will be split on rows too

cuIO Parquet writer guarantees that lists won't be split across pages. Upcoming DataPageHeaderV2 in parquet spec also guarantees this.

{
options.set_max_page_size_rows(val);
return *this;
}

/**
* @brief Sets whether int96 timestamps are written or not in parquet_writer_options.
*
Expand Down Expand Up @@ -783,6 +832,10 @@ class chunked_parquet_writer_options {
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;
// Maximum size of each page (uncompressed)
size_t _max_page_size_bytes = default_max_page_size_bytes;
// Maximum number of rows in a page
size_type _max_page_size_rows = default_max_page_size_rows;

/**
* @brief Constructor from sink.
Expand Down Expand Up @@ -844,6 +897,24 @@ class chunked_parquet_writer_options {
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
* @brief Returns maximum uncompressed page size, in bytes. If set larger than the row group size,
* then this will return the row group size.
*/
auto get_max_page_size_bytes() const
{
return std::min(_max_page_size_bytes, get_row_group_size_bytes());
}

/**
* @brief Returns maximum page size, in rows. If set larger than the row group size, then this
* will return the row group size.
*/
auto get_max_page_size_rows() const
{
return std::min(_max_page_size_rows, get_row_group_size_rows());
}

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -888,24 +959,22 @@ class chunked_parquet_writer_options {
/**
* @brief Sets the maximum row group size, in bytes.
*/
void set_row_group_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(
size_bytes >= 512 * 1024,
"The maximum row group size cannot be smaller than the page size, which is 512KB.");
_row_group_size_bytes = size_bytes;
}
void set_row_group_size_bytes(size_t size_bytes) { _row_group_size_bytes = size_bytes; }

/**
* @brief Sets the maximum row group size, in rows.
*/
void set_row_group_size_rows(size_type size_rows)
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
_row_group_size_rows = size_rows;
}
void set_row_group_size_rows(size_type size_rows) { _row_group_size_rows = size_rows; }

/**
* @brief Sets the maximum uncompressed page size, in bytes.
*/
void set_max_page_size_bytes(size_t size_bytes) { _max_page_size_bytes = size_bytes; }

/**
* @brief Sets the maximum page size, in rows.
*/
void set_max_page_size_rows(size_type size_rows) { _max_page_size_rows = size_rows; }

/**
* @brief creates builder to build chunked_parquet_writer_options.
Expand Down Expand Up @@ -1016,7 +1085,7 @@ class chunked_parquet_writer_options_builder {
/**
* @brief Sets the maximum number of rows in output row groups.
*
* @param val maximum number or rows
* @param val maximum number of rows
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& row_group_size_rows(size_type val)
Expand All @@ -1025,6 +1094,33 @@ class chunked_parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets the maximum uncompressed page size, in bytes. Serves as a hint to the writer,
* and can be exceeded under certain circumstances. Cannot be larger than the row group size in
* bytes, and will be adjusted to match if it is.
*
* @param val maximum page size
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& max_page_size_bytes(size_t val)
{
options.set_max_page_size_bytes(val);
return *this;
}

/**
* @brief Sets the maximum page size, in rows. Counts only top-level rows, ignoring any nesting.
* Cannot be larger than the row group size in rows, and will be adjusted to match if it is.
*
* @param val maximum rows per page
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& max_page_size_rows(size_type val)
{
options.set_max_page_size_rows(val);
return *this;
}

/**
* @brief move chunked_parquet_writer_options member once it's built.
*/
Expand Down
24 changes: 20 additions & 4 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ __global__ void __launch_bounds__(128)
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
int32_t num_columns)
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows)
{
// TODO: All writing seems to be done by thread 0. Could be replaced by thrust foreach
__shared__ __align__(8) parquet_column_device_view col_g;
Expand Down Expand Up @@ -337,8 +339,13 @@ __global__ void __launch_bounds__(128)
uint32_t max_page_size = (values_in_page * 2 >= ck_g.num_values) ? 256 * 1024
: (values_in_page * 3 >= ck_g.num_values) ? 384 * 1024
: 512 * 1024;

// override max_page_size if target is smaller
if (max_page_size > max_page_size_bytes) max_page_size = max_page_size_bytes;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want a better name for max_page_size, because it isn't clear how this value's meaning is different from max_page_size_bytes.

// TODO (dm): this convoluted logic to limit page size needs refactoring

@devavret Do you have suggestions on what should be done here?

Copy link
Contributor Author

@etseidl etseidl May 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that intent here is to prevent wildly lopsided page sizes within a column chunk. As the target page size shrinks, this should be less necessary. Maybe threshold_page_size would be a better name? Would something like the following make sense?

auto threshold_page_size = max_page_size_bytes;
if (threshold_page_size >= SOME_CUTOFF_VALUE && (values_in_page * 3 >= ck_g.num_values)) {
  threshold_page_size = (values_in_page * 2 >= ck_g.num_values) ? max_page_size_bytes >>1
                                         : (max_page_size_bytes>>2)*3;

SOME_CUTOFF_VALUE would be 512*1024 to match the current code, but could be adjusted downward if need be.

Copy link
Contributor

@bdice bdice May 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that sounds reasonable but would like a second reviewer's opinion on this section. cc: @vuule

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking more on the lines of adding a condition to the if block below that says: cut off the page if the remaining size of chunk is less than max_page_size_bytes. But that would require calculating the per chunk byte size beforehand.

I think the current changes are fine for now. For naming confusion, you can rename max_page_size to this_page_max_size.

BTW I think this PR only allows the user to make the pages smaller than they were going to be. Is there any need to allow larger pages?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that intent here is to prevent wildly lopsided page sizes within a column chunk.

I think the real reason may be to avoid pages with very small number of values. I think we can remove this logic altogether because the final page is going to have at least 5000 values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so for now I'll just rename the variable, but leave the TODO in place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW I think this PR only allows the user to make the pages smaller than they were going to be. Is there any need to allow larger pages?

@devavret Yes, this PR will only make pages smaller. But many Parquet on Hadoop guides suggest setting page size to 1M. Should we allow that here, or is that out of scope?


if (num_rows >= ck_g.num_rows ||
(values_in_page > 0 && (page_size + fragment_data_size > max_page_size))) {
(values_in_page > 0 && (page_size + fragment_data_size > max_page_size)) ||
rows_in_page > max_page_size_rows) {
if (ck_g.use_dictionary) {
page_size =
1 + 5 + ((values_in_page * ck_g.dict_rle_bits + 7) >> 3) + (values_in_page >> 8);
Expand Down Expand Up @@ -1927,15 +1934,24 @@ void InitEncoderPages(device_2dspan<EncColumnChunk> chunks,
device_span<gpu::EncPage> pages,
device_span<parquet_column_device_view const> col_desc,
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows,
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
rmm::cuda_stream_view stream)
{
auto num_rowgroups = chunks.size().first;
dim3 dim_grid(num_columns, num_rowgroups); // 1 threadblock per rowgroup
gpuInitPages<<<dim_grid, 128, 0, stream.value()>>>(
chunks, pages, col_desc, page_grstats, chunk_grstats, max_page_comp_data_size, num_columns);
gpuInitPages<<<dim_grid, 128, 0, stream.value()>>>(chunks,
pages,
col_desc,
page_grstats,
chunk_grstats,
max_page_comp_data_size,
num_columns,
max_page_size_bytes,
max_page_size_rows);
}

void EncodePages(device_span<gpu::EncPage> pages,
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,8 @@ void InitEncoderPages(cudf::detail::device_2dspan<EncColumnChunk> chunks,
device_span<gpu::EncPage> pages,
device_span<parquet_column_device_view const> col_desc,
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows,
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
Expand Down
17 changes: 16 additions & 1 deletion cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,16 @@ void writer::impl::init_page_sizes(hostdevice_2dvector<gpu::EncColumnChunk>& chu
uint32_t num_columns)
{
chunks.host_to_device(stream);
gpu::InitEncoderPages(chunks, {}, col_desc, num_columns, nullptr, nullptr, 0, stream);
gpu::InitEncoderPages(chunks,
{},
col_desc,
num_columns,
max_page_size_bytes,
max_page_size_rows,
nullptr,
nullptr,
0,
stream);
chunks.device_to_host(stream, true);
}

Expand Down Expand Up @@ -965,6 +974,8 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector<gpu::EncColumnChunk>&
pages,
col_desc,
num_columns,
max_page_size_bytes,
max_page_size_rows,
(num_stats_bfr) ? page_stats_mrg.data() : nullptr,
(num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr,
max_page_comp_data_size,
Expand Down Expand Up @@ -1122,6 +1133,8 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
stream(stream),
max_row_group_size{options.get_row_group_size_bytes()},
max_row_group_rows{options.get_row_group_size_rows()},
max_page_size_bytes(options.get_max_page_size_bytes()),
max_page_size_rows(options.get_max_page_size_rows()),
compression_(to_parquet_compression(options.get_compression())),
stats_granularity_(options.get_stats_level()),
int96_timestamps(options.is_enabled_int96_timestamps()),
Expand All @@ -1144,6 +1157,8 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
stream(stream),
max_row_group_size{options.get_row_group_size_bytes()},
max_row_group_rows{options.get_row_group_size_rows()},
max_page_size_bytes(options.get_max_page_size_bytes()),
max_page_size_rows(options.get_max_page_size_rows()),
compression_(to_parquet_compression(options.get_compression())),
stats_granularity_(options.get_stats_level()),
int96_timestamps(options.is_enabled_int96_timestamps()),
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/writer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ class writer::impl {

size_t max_row_group_size = default_row_group_size_bytes;
size_type max_row_group_rows = default_row_group_size_rows;
size_t max_page_size_bytes = default_max_page_size_bytes;
size_type max_page_size_rows = default_max_page_size_rows;
Compression compression_ = Compression::UNCOMPRESSED;
statistics_freq stats_granularity_ = statistics_freq::STATISTICS_NONE;
bool int96_timestamps = false;
Expand Down
Loading