Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parameters to control row group size in Parquet writer #9677

Merged
merged 17 commits into from
Nov 17, 2021
2 changes: 1 addition & 1 deletion cpp/include/cudf/io/detail/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class writer {
* @param[in] metadata_list List of input file metadata
* @return A parquet-compatible blob that contains the data for all rowgroups in the list
*/
static std::unique_ptr<std::vector<uint8_t>> merge_rowgroup_metadata(
static std::unique_ptr<std::vector<uint8_t>> merge_row_group_metadata(
const std::vector<std::unique_ptr<std::vector<uint8_t>>>& metadata_list);
};

Expand Down
20 changes: 10 additions & 10 deletions cpp/include/cudf/io/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,24 +445,24 @@ class orc_writer_options {
/**
* @brief Whether writing column statistics is enabled/disabled.
*/
bool enable_statistics() const { return _enable_statistics; }
bool is_enabled_statistics() const { return _enable_statistics; }
PointKernel marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Returns maximum stripe size, in bytes.
*/
auto stripe_size_bytes() const { return _stripe_size_bytes; }
auto get_stripe_size_bytes() const { return _stripe_size_bytes; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember a while back, @codereport and @jrhemstad had a long discussion on whether get prefix should be used or not and the outcome is that no prefix is more readable IIRC.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true in general. However, we made a conscious decision to use get_ prefix for the _options classes. I'm unable to find the discussion that led to this (perhaps Jake had the discussion with Conor later on).
We can choose to remove the prefix, but it should be done for all options. Either way, I prefer to keep the API consistent, so I'm adding these options with the prefix.


/**
* @brief Returns maximum stripe size, in rows.
*/
auto stripe_size_rows() const { return _stripe_size_rows; }
auto get_stripe_size_rows() const { return _stripe_size_rows; }

/**
* @brief Returns the row index stride.
*/
auto row_index_stride() const
auto get_row_index_stride() const
{
auto const unaligned_stride = std::min(_row_index_stride, stripe_size_rows());
auto const unaligned_stride = std::min(_row_index_stride, get_stripe_size_rows());
return unaligned_stride - unaligned_stride % 8;
}

Expand Down Expand Up @@ -739,24 +739,24 @@ class chunked_orc_writer_options {
/**
* @brief Whether writing column statistics is enabled/disabled.
*/
bool enable_statistics() const { return _enable_statistics; }
bool is_enabled_statistics() const { return _enable_statistics; }

/**
* @brief Returns maximum stripe size, in bytes.
*/
auto stripe_size_bytes() const { return _stripe_size_bytes; }
auto get_stripe_size_bytes() const { return _stripe_size_bytes; }

/**
* @brief Returns maximum stripe size, in rows.
*/
auto stripe_size_rows() const { return _stripe_size_rows; }
auto get_stripe_size_rows() const { return _stripe_size_rows; }

/**
* @brief Returns the row index stride.
*/
auto row_index_stride() const
auto get_row_index_stride() const
{
auto const unaligned_stride = std::min(_row_index_stride, stripe_size_rows());
auto const unaligned_stride = std::min(_row_index_stride, get_stripe_size_rows());
return unaligned_stride - unaligned_stride % 8;
}

Expand Down
125 changes: 124 additions & 1 deletion cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ namespace io {
* @file
*/

constexpr size_t default_row_group_size_bytes = 128 * 1024 * 1024; // 128MB
constexpr size_type default_row_group_size_rows = 1000000;
bdice marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Builds parquet_reader_options to use for `read_parquet()`.
*/
Expand Down Expand Up @@ -398,6 +401,10 @@ class parquet_writer_options {
bool _write_timestamps_as_int96 = false;
// Column chunks file path to be set in the raw output metadata
std::string _column_chunks_file_path;
// Maximum size of each row group (unless smaller than a single page)
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;

/**
* @brief Constructor from sink and table.
Expand Down Expand Up @@ -472,6 +479,16 @@ class parquet_writer_options {
*/
std::string get_column_chunks_file_path() const { return _column_chunks_file_path; }

/**
* @brief Returns maximum row group size, in bytes.
*/
auto get_row_group_size_bytes() const { return _row_group_size_bytes; }

/**
* @brief Returns maximum row group size, in rows.
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -510,6 +527,28 @@ class parquet_writer_options {
{
_column_chunks_file_path.assign(file_path);
}

/**
* @brief Sets the maximum row group size, in bytes.
*/
void set_row_group_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(
size_bytes >= 512 * 1024,
"The maximum row group size cannot be smaller than the page size, which is 512KB.");
_row_group_size_bytes = size_bytes;
}

/**
* @brief Sets the maximum row group size, in rows.
vuule marked this conversation as resolved.
Show resolved Hide resolved
*/
void set_row_group_size_rows(size_type size_rows)
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
_row_group_size_rows = size_rows;
}
};

class parquet_writer_options_builder {
Expand Down Expand Up @@ -582,6 +621,30 @@ class parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets the maximum row group size, in bytes.
*
* @param val maximum row group size
* @return this for chaining.
*/
parquet_writer_options_builder& row_group_size_bytes(size_t val)
{
options.set_row_group_size_bytes(val);
return *this;
}

/**
* @brief Sets the maximum number of rows in output row groups.
*
* @param val maximum number or rows
* @return this for chaining.
*/
parquet_writer_options_builder& row_group_size_rows(size_type val)
{
options.set_row_group_size_rows(val);
return *this;
}

/**
* @brief Sets whether int96 timestamps are written or not in parquet_writer_options.
*
Expand Down Expand Up @@ -637,7 +700,7 @@ std::unique_ptr<std::vector<uint8_t>> write_parquet(
* @param[in] metadata_list List of input file metadata.
* @return A parquet-compatible blob that contains the data for all row groups in the list.
*/
std::unique_ptr<std::vector<uint8_t>> merge_rowgroup_metadata(
std::unique_ptr<std::vector<uint8_t>> merge_row_group_metadata(
const std::vector<std::unique_ptr<std::vector<uint8_t>>>& metadata_list);

/**
Expand All @@ -660,6 +723,10 @@ class chunked_parquet_writer_options {
// Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS.
// If true then overrides any per-column setting in _metadata.
bool _write_timestamps_as_int96 = false;
// Maximum size of each row group (unless smaller than a single page)
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;

/**
* @brief Constructor from sink.
Expand Down Expand Up @@ -703,6 +770,16 @@ class chunked_parquet_writer_options {
*/
bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; }

/**
* @brief Returns maximum row group size, in bytes.
*/
auto get_row_group_size_bytes() const { return _row_group_size_bytes; }

/**
* @brief Returns maximum row group size, in rows.
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -732,6 +809,28 @@ class chunked_parquet_writer_options {
*/
void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; }

/**
* @brief Sets the maximum row group size, in bytes.
*/
void set_row_group_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(
size_bytes >= 512 * 1024,
"The maximum row group size cannot be smaller than the page size, which is 512KB.");
_row_group_size_bytes = size_bytes;
}

/**
* @brief Sets the maximum row group size, in rows.
*/
void set_row_group_size_rows(size_type size_rows)
{
CUDF_EXPECTS(
size_rows >= 5000,
"The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
_row_group_size_rows = size_rows;
}

/**
* @brief creates builder to build chunked_parquet_writer_options.
*
Expand Down Expand Up @@ -811,6 +910,30 @@ class chunked_parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets the maximum row group size, in bytes.
*
* @param val maximum row group size
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& row_group_size_bytes(size_t val)
{
options.set_row_group_size_bytes(val);
return *this;
}

/**
* @brief Sets the maximum number of rows in output row groups.
*
* @param val maximum number or rows
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& row_group_size_rows(size_type val)
{
options.set_row_group_size_rows(val);
return *this;
}

/**
* @brief move chunked_parquet_writer_options member once it's built.
*/
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,13 +405,13 @@ table_with_metadata read_parquet(parquet_reader_options const& options,
}

/**
* @copydoc cudf::io::merge_rowgroup_metadata
* @copydoc cudf::io::merge_row_group_metadata
*/
std::unique_ptr<std::vector<uint8_t>> merge_rowgroup_metadata(
std::unique_ptr<std::vector<uint8_t>> merge_row_group_metadata(
const std::vector<std::unique_ptr<std::vector<uint8_t>>>& metadata_list)
{
CUDF_FUNC_RANGE();
return detail_parquet::writer::merge_rowgroup_metadata(metadata_list);
return detail_parquet::writer::merge_row_group_metadata(metadata_list);
}

table_input_metadata::table_input_metadata(table_view const& table,
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1296,10 +1296,10 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
rmm::mr::device_memory_resource* mr)
: _mr(mr),
stream(stream),
max_stripe_size{options.stripe_size_bytes(), options.stripe_size_rows()},
row_index_stride{options.row_index_stride()},
max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()},
row_index_stride{options.get_row_index_stride()},
compression_kind_(to_orc_compression(options.get_compression())),
enable_statistics_(options.enable_statistics()),
enable_statistics_(options.is_enabled_statistics()),
single_write_mode(mode == SingleWriteMode::YES),
out_sink_(std::move(sink))
{
Expand All @@ -1316,10 +1316,10 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
rmm::mr::device_memory_resource* mr)
: _mr(mr),
stream(stream),
max_stripe_size{options.stripe_size_bytes(), options.stripe_size_rows()},
row_index_stride{options.row_index_stride()},
max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()},
row_index_stride{options.get_row_index_stride()},
compression_kind_(to_orc_compression(options.get_compression())),
enable_statistics_(options.enable_statistics()),
enable_statistics_(options.is_enabled_statistics()),
single_write_mode(mode == SingleWriteMode::YES),
out_sink_(std::move(sink))
{
Expand Down
Loading