Skip to content

Commit

Permalink
Add parameters to control row group size in Parquet writer (#9677)
Browse files Browse the repository at this point in the history
Closes #9615

Adds the following API to the Parquet writer:

- Set maximum row group size, in bytes (minimum of 512KB);
- Set maximum row group size, in rows (minimum of 5000).

The API is more limited than its ORC equivalent because of limitations in Parquet page size control/estimation.

Other changes: 

- Fix naming in some ORC APIs to be consistent. 
- Change `rowgroup` to `row_group` in APIs, since Parquet specs refer to this as "row group", not "rowgroup". 
- Replace some `uint32_t` use in Parquet writer.
- Remove unused `target_page_size`.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Yunsong Wang (https://github.com/PointKernel)
  - Ashwin Srinath (https://github.com/shwina)

URL: #9677
  • Loading branch information
vuule authored Nov 17, 2021
1 parent 3b38aa7 commit 9114104
Show file tree
Hide file tree
Showing 14 changed files with 291 additions and 90 deletions.
2 changes: 1 addition & 1 deletion cpp/include/cudf/io/detail/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class writer {
* @param[in] metadata_list List of input file metadata
* @return A parquet-compatible blob that contains the data for all row groups in the list
*/
static std::unique_ptr<std::vector<uint8_t>> merge_rowgroup_metadata(
static std::unique_ptr<std::vector<uint8_t>> merge_row_group_metadata(
const std::vector<std::unique_ptr<std::vector<uint8_t>>>& metadata_list);
};

Expand Down
20 changes: 10 additions & 10 deletions cpp/include/cudf/io/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,24 +475,24 @@ class orc_writer_options {
/**
* @brief Whether writing column statistics is enabled/disabled.
*/
bool enable_statistics() const { return _enable_statistics; }
bool is_enabled_statistics() const { return _enable_statistics; }

/**
* @brief Returns maximum stripe size, in bytes.
*/
auto stripe_size_bytes() const { return _stripe_size_bytes; }
auto get_stripe_size_bytes() const { return _stripe_size_bytes; }

/**
* @brief Returns maximum stripe size, in rows.
*/
auto stripe_size_rows() const { return _stripe_size_rows; }
auto get_stripe_size_rows() const { return _stripe_size_rows; }

/**
* @brief Returns the row index stride.
*/
auto row_index_stride() const
auto get_row_index_stride() const
{
auto const unaligned_stride = std::min(_row_index_stride, stripe_size_rows());
auto const unaligned_stride = std::min(_row_index_stride, get_stripe_size_rows());
return unaligned_stride - unaligned_stride % 8;
}

Expand Down Expand Up @@ -769,24 +769,24 @@ class chunked_orc_writer_options {
/**
* @brief Whether writing column statistics is enabled/disabled.
*/
bool enable_statistics() const { return _enable_statistics; }
bool is_enabled_statistics() const { return _enable_statistics; }

/**
* @brief Returns maximum stripe size, in bytes.
*/
auto stripe_size_bytes() const { return _stripe_size_bytes; }
auto get_stripe_size_bytes() const { return _stripe_size_bytes; }

/**
* @brief Returns maximum stripe size, in rows.
*/
auto stripe_size_rows() const { return _stripe_size_rows; }
auto get_stripe_size_rows() const { return _stripe_size_rows; }

/**
* @brief Returns the row index stride.
*/
auto row_index_stride() const
auto get_row_index_stride() const
{
auto const unaligned_stride = std::min(_row_index_stride, stripe_size_rows());
auto const unaligned_stride = std::min(_row_index_stride, get_stripe_size_rows());
return unaligned_stride - unaligned_stride % 8;
}

Expand Down
125 changes: 124 additions & 1 deletion cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ namespace io {
* @file
*/

// Default upper bound on the size of a row group, in bytes (128MB).
constexpr size_t default_row_group_size_bytes = size_t{128} * 1024 * 1024;
// Default upper bound on the number of rows in a row group.
constexpr size_type default_row_group_size_rows = 1'000'000;

/**
* @brief Builds parquet_reader_options to use for `read_parquet()`.
*/
Expand Down Expand Up @@ -398,6 +401,10 @@ class parquet_writer_options {
bool _write_timestamps_as_int96 = false;
// Column chunks file path to be set in the raw output metadata
std::string _column_chunks_file_path;
// Maximum size of each row group, in bytes (unless smaller than a single page)
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in each row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;

/**
* @brief Constructor from sink and table.
Expand Down Expand Up @@ -472,6 +479,16 @@ class parquet_writer_options {
*/
std::string get_column_chunks_file_path() const { return _column_chunks_file_path; }

/**
* @brief Returns maximum row group size, in bytes.
*/
auto get_row_group_size_bytes() const { return _row_group_size_bytes; }

/**
* @brief Returns maximum row group size, in rows.
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -510,6 +527,28 @@ class parquet_writer_options {
{
_column_chunks_file_path.assign(file_path);
}

/**
 * @brief Sets the maximum row group size, in bytes.
 *
 * Values below 512KB are rejected through `CUDF_EXPECTS`.
 *
 * @param size_bytes Requested maximum row group size, in bytes
 */
void set_row_group_size_bytes(size_t size_bytes)
{
  // Smallest limit accepted here; the check's message describes it as the page size.
  constexpr size_t min_size_bytes = 512 * 1024;
  CUDF_EXPECTS(
    min_size_bytes <= size_bytes,
    "The maximum row group size cannot be smaller than the page size, which is 512KB.");
  _row_group_size_bytes = size_bytes;
}

/**
 * @brief Sets the maximum row group size, in rows.
 *
 * Values below 5000 rows are rejected through `CUDF_EXPECTS`.
 *
 * @param size_rows Requested maximum number of rows per row group
 */
void set_row_group_size_rows(size_type size_rows)
{
  // Smallest limit accepted here; the check's message describes it as the page size.
  constexpr size_type min_size_rows = 5000;
  CUDF_EXPECTS(
    min_size_rows <= size_rows,
    "The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
  _row_group_size_rows = size_rows;
}
};

class parquet_writer_options_builder {
Expand Down Expand Up @@ -582,6 +621,30 @@ class parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets the maximum row group size, in bytes.
*
* Forwards the value to `options.set_row_group_size_bytes()`.
*
* @param val maximum row group size, in bytes
* @return this for chaining.
*/
parquet_writer_options_builder& row_group_size_bytes(size_t val)
{
options.set_row_group_size_bytes(val);
return *this;
}

/**
* @brief Sets the maximum number of rows in output row groups.
*
* Forwards the value to `options.set_row_group_size_rows()`.
*
* @param val maximum number of rows
* @return this for chaining.
*/
parquet_writer_options_builder& row_group_size_rows(size_type val)
{
options.set_row_group_size_rows(val);
return *this;
}

/**
* @brief Sets whether int96 timestamps are written or not in parquet_writer_options.
*
Expand Down Expand Up @@ -637,7 +700,7 @@ std::unique_ptr<std::vector<uint8_t>> write_parquet(
* @param[in] metadata_list List of input file metadata.
* @return A parquet-compatible blob that contains the data for all row groups in the list.
*/
std::unique_ptr<std::vector<uint8_t>> merge_rowgroup_metadata(
std::unique_ptr<std::vector<uint8_t>> merge_row_group_metadata(
const std::vector<std::unique_ptr<std::vector<uint8_t>>>& metadata_list);

/**
Expand All @@ -660,6 +723,10 @@ class chunked_parquet_writer_options {
// Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS.
// If true then overrides any per-column setting in _metadata.
bool _write_timestamps_as_int96 = false;
// Maximum size of each row group, in bytes (unless smaller than a single page)
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in each row group (unless smaller than a single page)
size_type _row_group_size_rows = default_row_group_size_rows;

/**
* @brief Constructor from sink.
Expand Down Expand Up @@ -703,6 +770,16 @@ class chunked_parquet_writer_options {
*/
bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; }

/**
* @brief Returns maximum row group size, in bytes.
*/
auto get_row_group_size_bytes() const { return _row_group_size_bytes; }

/**
* @brief Returns maximum row group size, in rows.
*/
auto get_row_group_size_rows() const { return _row_group_size_rows; }

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -732,6 +809,28 @@ class chunked_parquet_writer_options {
*/
void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; }

/**
 * @brief Sets the maximum row group size, in bytes.
 *
 * Values below 512KB are rejected through `CUDF_EXPECTS`.
 *
 * @param size_bytes Requested maximum row group size, in bytes
 */
void set_row_group_size_bytes(size_t size_bytes)
{
  // Smallest limit accepted here; the check's message describes it as the page size.
  constexpr size_t min_size_bytes = 512 * 1024;
  CUDF_EXPECTS(
    min_size_bytes <= size_bytes,
    "The maximum row group size cannot be smaller than the page size, which is 512KB.");
  _row_group_size_bytes = size_bytes;
}

/**
 * @brief Sets the maximum row group size, in rows.
 *
 * Values below 5000 rows are rejected through `CUDF_EXPECTS`.
 *
 * @param size_rows Requested maximum number of rows per row group
 */
void set_row_group_size_rows(size_type size_rows)
{
  // Smallest limit accepted here; the check's message describes it as the page size.
  constexpr size_type min_size_rows = 5000;
  CUDF_EXPECTS(
    min_size_rows <= size_rows,
    "The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
  _row_group_size_rows = size_rows;
}

/**
* @brief creates builder to build chunked_parquet_writer_options.
*
Expand Down Expand Up @@ -811,6 +910,30 @@ class chunked_parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets the maximum row group size, in bytes.
*
* Forwards the value to `options.set_row_group_size_bytes()`.
*
* @param val maximum row group size, in bytes
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& row_group_size_bytes(size_t val)
{
options.set_row_group_size_bytes(val);
return *this;
}

/**
* @brief Sets the maximum number of rows in output row groups.
*
* Forwards the value to `options.set_row_group_size_rows()`.
*
* @param val maximum number of rows
* @return this for chaining.
*/
chunked_parquet_writer_options_builder& row_group_size_rows(size_type val)
{
options.set_row_group_size_rows(val);
return *this;
}

/**
* @brief move chunked_parquet_writer_options member once it's built.
*/
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,13 +405,13 @@ table_with_metadata read_parquet(parquet_reader_options const& options,
}

/**
* @copydoc cudf::io::merge_rowgroup_metadata
* @copydoc cudf::io::merge_row_group_metadata
*/
std::unique_ptr<std::vector<uint8_t>> merge_rowgroup_metadata(
std::unique_ptr<std::vector<uint8_t>> merge_row_group_metadata(
const std::vector<std::unique_ptr<std::vector<uint8_t>>>& metadata_list)
{
CUDF_FUNC_RANGE();
return detail_parquet::writer::merge_rowgroup_metadata(metadata_list);
return detail_parquet::writer::merge_row_group_metadata(metadata_list);
}

table_input_metadata::table_input_metadata(table_view const& table,
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1301,10 +1301,10 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
rmm::mr::device_memory_resource* mr)
: _mr(mr),
stream(stream),
max_stripe_size{options.stripe_size_bytes(), options.stripe_size_rows()},
row_index_stride{options.row_index_stride()},
max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()},
row_index_stride{options.get_row_index_stride()},
compression_kind_(to_orc_compression(options.get_compression())),
enable_statistics_(options.enable_statistics()),
enable_statistics_(options.is_enabled_statistics()),
single_write_mode(mode == SingleWriteMode::YES),
out_sink_(std::move(sink))
{
Expand All @@ -1321,10 +1321,10 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
rmm::mr::device_memory_resource* mr)
: _mr(mr),
stream(stream),
max_stripe_size{options.stripe_size_bytes(), options.stripe_size_rows()},
row_index_stride{options.row_index_stride()},
max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()},
row_index_stride{options.get_row_index_stride()},
compression_kind_(to_orc_compression(options.get_compression())),
enable_statistics_(options.enable_statistics()),
enable_statistics_(options.is_enabled_statistics()),
single_write_mode(mode == SingleWriteMode::YES),
out_sink_(std::move(sink))
{
Expand Down
Loading

0 comments on commit 9114104

Please sign in to comment.