Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parameters to control page size in Parquet writer #10882

Merged
merged 20 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
9fe14b5
add setting of target page size for parquet file writer
etseidl Apr 7, 2022
7dac75f
Merge branch 'branch-22.06' of https://github.com/rapidsai/cudf into …
etseidl May 4, 2022
28c5806
split target page size into bytes and rows to match row group size
etseidl May 13, 2022
d316c71
Merge branch 'feature/pagesize' of github.com:etseidl/cudf into featu…
etseidl May 13, 2022
a869c13
Merge branch 'rapidsai:branch-22.06' into feature/pagesize
etseidl May 13, 2022
1e730ff
remove row group size unit test for now
etseidl May 13, 2022
774e08c
Merge branch 'rapidsai:branch-22.06' into feature/pagesize
etseidl May 16, 2022
a206fb2
Merge branch 'rapidsai:branch-22.06' into feature/pagesize
etseidl May 17, 2022
8534232
clang-format changes
etseidl May 17, 2022
938c3b3
make argument names consistent per suggestions from code review
etseidl May 18, 2022
6c77bfc
change target_page_size to max_page_size to be more clear about the i…
etseidl May 18, 2022
cf02a0d
forgot to change "target" to "maximum" in commencts
etseidl May 18, 2022
cab058e
ensure page size bytes is less than or equal to row group size bytes
etseidl May 19, 2022
045e73b
ensure page size rows is less than or equal to row group size rows
etseidl May 19, 2022
42c955f
reimplement tests for consistent row group and page sizes
etseidl May 19, 2022
88d0c06
more clang-format fixes
etseidl May 19, 2022
63b8609
put back validation that row group/page sizes are at least the size
etseidl May 20, 2022
46317ee
change max_page_size to this_max_page_size to avoid confusion
etseidl May 20, 2022
1fe3fc7
set lower bound of 4KB on page size
etseidl May 20, 2022
39d027e
implement suggestion from review, but needed to change type of
etseidl May 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 148 additions & 8 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ namespace io {

constexpr size_t default_row_group_size_bytes = 128 * 1024 * 1024; // 128MB
constexpr size_type default_row_group_size_rows = 1000000;
constexpr size_t default_max_page_size_bytes = 512 * 1024;
constexpr size_type default_max_page_size_rows = 20000;

/**
* @brief Builds parquet_reader_options to use for `read_parquet()`.
Expand Down Expand Up @@ -382,6 +384,10 @@ class parquet_writer_options {
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;
// Maximum size of each page (uncompressed)
size_t _max_page_size_bytes = default_max_page_size_bytes;
// Maximum number of rows in a page
size_type _max_page_size_rows = default_max_page_size_rows;

/**
* @brief Constructor from sink and table.
Expand Down Expand Up @@ -482,6 +488,24 @@ class parquet_writer_options {
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
* @brief Returns the maximum uncompressed page size, in bytes. If set larger than the row group
* size, then this will return the row group size.
*/
auto get_max_page_size_bytes() const
{
return std::min(_max_page_size_bytes, get_row_group_size_bytes());
}

/**
* @brief Returns maximum page size, in rows. If set larger than the row group size, then this
* will return the row group size.
*/
auto get_max_page_size_rows() const
{
return std::min(_max_page_size_rows, get_row_group_size_rows());
}

/**
* @brief Sets partitions.
*
Expand Down Expand Up @@ -555,8 +579,8 @@ class parquet_writer_options {
void set_row_group_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(
size_bytes >= 512 * 1024,
"The maximum row group size cannot be smaller than the page size, which is 512KB.");
size_bytes >= 4 * 1024,
"The maximum row group size cannot be smaller than the minimum page size, which is 4KB.");
_row_group_size_bytes = size_bytes;
}

Expand All @@ -567,9 +591,29 @@ class parquet_writer_options {
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
"The maximum row group size cannot be smaller than the fragment size, which is 5000 rows.");
_row_group_size_rows = size_rows;
}

/**
* @brief Sets the maximum uncompressed page size, in bytes.
*/
void set_max_page_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(size_bytes >= 4 * 1024, "The maximum page size cannot be smaller than 4KB.");
_max_page_size_bytes = size_bytes;
}

/**
* @brief Sets the maximum page size, in rows.
*/
void set_max_page_size_rows(size_type size_rows)
{
CUDF_EXPECTS(
size_rows >= 5000,
vuule marked this conversation as resolved.
Show resolved Hide resolved
"The maximum page size cannot be smaller than the fragment size, which is 5000 rows.");
_max_page_size_rows = size_rows;
}
};

class parquet_writer_options_builder {
Expand Down Expand Up @@ -690,7 +734,7 @@ class parquet_writer_options_builder {
/**
* @brief Sets the maximum number of rows in output row groups.
*
* @param val maximum number or rows
* @param val maximum number of rows
* @return this for chaining.
*/
parquet_writer_options_builder& row_group_size_rows(size_type val)
Expand All @@ -699,6 +743,33 @@ class parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets the maximum uncompressed page size, in bytes. Serves as a hint to the writer,
* and can be exceeded under certain circumstances. Cannot be larger than the row group size in
* bytes, and will be adjusted to match if it is.
*
* @param val maximum page size
* @return this for chaining.
*/
parquet_writer_options_builder& max_page_size_bytes(size_t val)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You had uncompressed below, it would be good to include it here in this too as users are more likely to see it here.

Also is this a hard limit or a soft limit. i.e. what happens if a single row cannot be split, i.e. string or binary, and is larger than this is set to.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. This is a soft limit, since the current implementation operates a fragment-at-a-time, allowing a single fragment to over-fill a page. I believe this is consistent with other parquet writers, such as parquet-mr.

{
options.set_max_page_size_bytes(val);
return *this;
}

/**
* @brief Sets the maximum page size, in rows. Counts only top-level rows, ignoring any nesting.
* Cannot be larger than the row group size in rows, and will be adjusted to match if it is.
*
* @param val maximum rows per page
* @return this for chaining.
*/
parquet_writer_options_builder& max_page_size_rows(size_type val)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this impact lists? I assume that the data column will be split on rows too, but it would be good to have that in the docs. For row groups it is the top level number of rows, it cannot be anything else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As with row groups, this is on row count, not value count. I originally wasn't thinking of adding the row limit, but changed my mind later to allow cudf to have something akin to the row count limit from parquet-mr. Will modify the documentation.

Question: Should I change the default to 0 and not limit on row count if that's the case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this impact lists? I assume that the data column will be split on rows too

cuIO Parquet writer guarantees that lists won't be split across pages. Upcoming DataPageHeaderV2 in parquet spec also guarantees this.

{
options.set_max_page_size_rows(val);
return *this;
}

/**
* @brief Sets whether int96 timestamps are written or not in parquet_writer_options.
*
Expand Down Expand Up @@ -783,6 +854,10 @@ class chunked_parquet_writer_options {
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;
// Maximum size of each page (uncompressed)
size_t _max_page_size_bytes = default_max_page_size_bytes;
// Maximum number of rows in a page
size_type _max_page_size_rows = default_max_page_size_rows;

/**
* @brief Constructor from sink.
Expand Down Expand Up @@ -844,6 +919,24 @@ class chunked_parquet_writer_options {
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
* @brief Returns maximum uncompressed page size, in bytes. If set larger than the row group size,
* then this will return the row group size.
*/
auto get_max_page_size_bytes() const
{
return std::min(_max_page_size_bytes, get_row_group_size_bytes());
}

/**
* @brief Returns maximum page size, in rows. If set larger than the row group size, then this
* will return the row group size.
*/
auto get_max_page_size_rows() const
{
return std::min(_max_page_size_rows, get_row_group_size_rows());
}

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -891,8 +984,8 @@ class chunked_parquet_writer_options {
void set_row_group_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(
size_bytes >= 512 * 1024,
"The maximum row group size cannot be smaller than the page size, which is 512KB.");
size_bytes >= 4 * 1024,
"The maximum row group size cannot be smaller than the minimum page size, which is 4KB.");
_row_group_size_bytes = size_bytes;
}

Expand All @@ -903,10 +996,30 @@ class chunked_parquet_writer_options {
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
"The maximum row group size cannot be smaller than the fragment size, which is 5000 rows.");
_row_group_size_rows = size_rows;
}

/**
* @brief Sets the maximum uncompressed page size, in bytes.
*/
void set_max_page_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(size_bytes >= 4 * 1024, "The maximum page size cannot be smaller than 4KB.");
_max_page_size_bytes = size_bytes;
}

/**
* @brief Sets the maximum page size, in rows.
*/
void set_max_page_size_rows(size_type size_rows)
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum page size cannot be smaller than the fragment size, which is 5000 rows.");
_max_page_size_rows = size_rows;
}

/**
* @brief creates builder to build chunked_parquet_writer_options.
*
Expand Down Expand Up @@ -1016,7 +1129,7 @@ class chunked_parquet_writer_options_builder {
/**
* @brief Sets the maximum number of rows in output row groups.
*
* @param val maximum number or rows
* @param val maximum number of rows
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& row_group_size_rows(size_type val)
Expand All @@ -1025,6 +1138,33 @@ class chunked_parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets the maximum uncompressed page size, in bytes. Serves as a hint to the writer,
* and can be exceeded under certain circumstances. Cannot be larger than the row group size in
* bytes, and will be adjusted to match if it is.
*
* @param val maximum page size
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& max_page_size_bytes(size_t val)
{
options.set_max_page_size_bytes(val);
return *this;
}

/**
* @brief Sets the maximum page size, in rows. Counts only top-level rows, ignoring any nesting.
* Cannot be larger than the row group size in rows, and will be adjusted to match if it is.
*
* @param val maximum rows per page
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& max_page_size_rows(size_type val)
{
options.set_max_page_size_rows(val);
return *this;
}

/**
* @brief move chunked_parquet_writer_options member once it's built.
*/
Expand Down
30 changes: 23 additions & 7 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ __global__ void __launch_bounds__(128)
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
int32_t num_columns)
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows)
{
// TODO: All writing seems to be done by thread 0. Could be replaced by thrust foreach
__shared__ __align__(8) parquet_column_device_view col_g;
Expand Down Expand Up @@ -334,11 +336,16 @@ __global__ void __launch_bounds__(128)
? frag_g.num_leaf_values * 2 // Assume worst-case of 2-bytes per dictionary index
: frag_g.fragment_data_size;
// TODO (dm): this convoluted logic to limit page size needs refactoring
uint32_t max_page_size = (values_in_page * 2 >= ck_g.num_values) ? 256 * 1024
: (values_in_page * 3 >= ck_g.num_values) ? 384 * 1024
: 512 * 1024;
size_t this_max_page_size = (values_in_page * 2 >= ck_g.num_values) ? 256 * 1024
: (values_in_page * 3 >= ck_g.num_values) ? 384 * 1024
: 512 * 1024;

// override this_max_page_size if the requested size is smaller
this_max_page_size = min(this_max_page_size, max_page_size_bytes);

if (num_rows >= ck_g.num_rows ||
(values_in_page > 0 && (page_size + fragment_data_size > max_page_size))) {
(values_in_page > 0 && (page_size + fragment_data_size > this_max_page_size)) ||
rows_in_page > max_page_size_rows) {
if (ck_g.use_dictionary) {
page_size =
1 + 5 + ((values_in_page * ck_g.dict_rle_bits + 7) >> 3) + (values_in_page >> 8);
Expand Down Expand Up @@ -1927,15 +1934,24 @@ void InitEncoderPages(device_2dspan<EncColumnChunk> chunks,
device_span<gpu::EncPage> pages,
device_span<parquet_column_device_view const> col_desc,
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows,
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
rmm::cuda_stream_view stream)
{
auto num_rowgroups = chunks.size().first;
dim3 dim_grid(num_columns, num_rowgroups); // 1 threadblock per rowgroup
gpuInitPages<<<dim_grid, 128, 0, stream.value()>>>(
chunks, pages, col_desc, page_grstats, chunk_grstats, max_page_comp_data_size, num_columns);
gpuInitPages<<<dim_grid, 128, 0, stream.value()>>>(chunks,
pages,
col_desc,
page_grstats,
chunk_grstats,
max_page_comp_data_size,
num_columns,
max_page_size_bytes,
max_page_size_rows);
}

void EncodePages(device_span<gpu::EncPage> pages,
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,8 @@ void InitEncoderPages(cudf::detail::device_2dspan<EncColumnChunk> chunks,
device_span<gpu::EncPage> pages,
device_span<parquet_column_device_view const> col_desc,
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows,
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
Expand Down
17 changes: 16 additions & 1 deletion cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,16 @@ void writer::impl::init_page_sizes(hostdevice_2dvector<gpu::EncColumnChunk>& chu
uint32_t num_columns)
{
chunks.host_to_device(stream);
gpu::InitEncoderPages(chunks, {}, col_desc, num_columns, nullptr, nullptr, 0, stream);
gpu::InitEncoderPages(chunks,
{},
col_desc,
num_columns,
max_page_size_bytes,
max_page_size_rows,
nullptr,
nullptr,
0,
stream);
chunks.device_to_host(stream, true);
}

Expand Down Expand Up @@ -965,6 +974,8 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector<gpu::EncColumnChunk>&
pages,
col_desc,
num_columns,
max_page_size_bytes,
max_page_size_rows,
(num_stats_bfr) ? page_stats_mrg.data() : nullptr,
(num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr,
max_page_comp_data_size,
Expand Down Expand Up @@ -1122,6 +1133,8 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
stream(stream),
max_row_group_size{options.get_row_group_size_bytes()},
max_row_group_rows{options.get_row_group_size_rows()},
max_page_size_bytes(options.get_max_page_size_bytes()),
max_page_size_rows(options.get_max_page_size_rows()),
compression_(to_parquet_compression(options.get_compression())),
stats_granularity_(options.get_stats_level()),
int96_timestamps(options.is_enabled_int96_timestamps()),
Expand All @@ -1144,6 +1157,8 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
stream(stream),
max_row_group_size{options.get_row_group_size_bytes()},
max_row_group_rows{options.get_row_group_size_rows()},
max_page_size_bytes(options.get_max_page_size_bytes()),
max_page_size_rows(options.get_max_page_size_rows()),
compression_(to_parquet_compression(options.get_compression())),
stats_granularity_(options.get_stats_level()),
int96_timestamps(options.is_enabled_int96_timestamps()),
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/writer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ class writer::impl {

size_t max_row_group_size = default_row_group_size_bytes;
size_type max_row_group_rows = default_row_group_size_rows;
size_t max_page_size_bytes = default_max_page_size_bytes;
size_type max_page_size_rows = default_max_page_size_rows;
Compression compression_ = Compression::UNCOMPRESSED;
statistics_freq stats_granularity_ = statistics_freq::STATISTICS_NONE;
bool int96_timestamps = false;
Expand Down
Loading