Selectively use dictionary encoding in Parquet writer (#12211)
With the addition of Zstandard compression, it is sometimes undesirable to perform dictionary encoding. This PR adds a `dictionary_policy` setting to the Parquet writer which allows completely disabling dictionary encoding, setting a `max_dictionary_size` (analogous to the [parquet-mr](https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/README.md) `parquet.dictionary.page.size` parameter), or keeping the default behavior from before this PR (i.e., always use dictionaries).

This PR also adds a `max_page_fragment_size` setting as a stopgap until an adaptive fragment size calculation can be implemented.

Continuation of #12141.
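As a minimal sketch (not part of this commit's diff), the new settings compose with the existing `parquet_writer_options` builder roughly as follows; the output path and the particular option values are illustrative assumptions:

    #include <cudf/io/parquet.hpp>
    #include <cudf/table/table_view.hpp>

    void write_with_zstd(cudf::table_view const& tbl)
    {
      auto opts =
        cudf::io::parquet_writer_options::builder(cudf::io::sink_info{"out.parquet"}, tbl)
          .compression(cudf::io::compression_type::ZSTD)
          // ADAPTIVE falls back to plain encoding for any column chunk whose
          // dictionary would grow beyond max_dictionary_size
          .dictionary_policy(cudf::io::dictionary_policy::ADAPTIVE)
          .max_dictionary_size(512 * 1024)  // tighter than the 1 MiB default
          .max_page_fragment_size(1000)     // e.g., for tables with long strings
          .build();
      cudf::io::write_parquet(opts);
    }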

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #12211
etseidl authored Jan 11, 2023
1 parent 6a59b7e commit 6dda9d8
Showing 8 changed files with 487 additions and 36 deletions.
4 changes: 3 additions & 1 deletion cpp/include/cudf/io/detail/parquet.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -186,6 +186,8 @@ class writer {
/**
* @brief Writes a single subtable as part of a larger parquet file/table write.
*
* @throws rmm::bad_alloc if there is insufficient space for temporary buffers
*
* @param[in] table The table information to be written
* @param[in] partitions Optional partitions to divide the table into. If specified, must be same
* size as number of sinks.
191 changes: 189 additions & 2 deletions cpp/include/cudf/io/parquet.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -41,6 +41,8 @@ constexpr size_type default_row_group_size_rows = 1000000; ///< 1 million rows
constexpr size_t default_max_page_size_bytes = 512 * 1024; ///< 512KB per page
constexpr size_type default_max_page_size_rows = 20000; ///< 20k rows per page
constexpr int32_t default_column_index_truncate_length = 64; ///< truncate to 64 bytes
constexpr size_t default_max_dictionary_size = 1024 * 1024; ///< 1MB dictionary size
constexpr size_type default_max_page_fragment_size = 5000; ///< 5000 rows per page fragment

class parquet_reader_options_builder;

@@ -487,6 +489,12 @@ class parquet_writer_options {
size_type _max_page_size_rows = default_max_page_size_rows;
// Maximum size of min or max values in column index
int32_t _column_index_truncate_length = default_column_index_truncate_length;
// When to use dictionary encoding for data
dictionary_policy _dictionary_policy = dictionary_policy::ALWAYS;
// Maximum size of column chunk dictionary (in bytes)
size_t _max_dictionary_size = default_max_dictionary_size;
// Maximum number of rows in a page fragment
size_type _max_page_fragment_size = default_max_page_fragment_size;

/**
* @brief Constructor from sink and table.
@@ -640,6 +648,27 @@ class parquet_writer_options {
*/
auto get_column_index_truncate_length() const { return _column_index_truncate_length; }

/**
* @brief Returns policy for dictionary use.
*
* @return policy for dictionary use
*/
[[nodiscard]] dictionary_policy get_dictionary_policy() const { return _dictionary_policy; }

/**
* @brief Returns maximum dictionary size, in bytes.
*
* @return Maximum dictionary size, in bytes.
*/
[[nodiscard]] auto get_max_dictionary_size() const { return _max_dictionary_size; }

/**
* @brief Returns maximum page fragment size, in rows.
*
* @return Maximum page fragment size, in rows.
*/
[[nodiscard]] auto get_max_page_fragment_size() const { return _max_page_fragment_size; }

/**
* @brief Sets partitions.
*
@@ -726,6 +755,27 @@ class parquet_writer_options {
* @param size_bytes length min/max will be truncated to
*/
void set_column_index_truncate_length(int32_t size_bytes);

/**
* @brief Sets the policy for dictionary use.
*
* @param policy Policy for dictionary use
*/
void set_dictionary_policy(dictionary_policy policy);

/**
* @brief Sets the maximum dictionary size, in bytes.
*
* @param size_bytes Maximum dictionary size, in bytes
*/
void set_max_dictionary_size(size_t size_bytes);

/**
* @brief Sets the maximum page fragment size, in rows.
*
* @param size_rows Maximum page fragment size, in rows.
*/
void set_max_page_fragment_size(size_type size_rows);
};

/**
@@ -843,7 +893,7 @@ class parquet_writer_options_builder {
/**
* @brief Sets the maximum uncompressed page size, in bytes.
*
* Serves as a hint to the writer, * and can be exceeded under certain circumstances.
* Serves as a hint to the writer, and can be exceeded under certain circumstances.
* Cannot be larger than the row group size in bytes, and will be adjusted to
* match if it is.
*
@@ -888,6 +938,50 @@ class parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets the policy for dictionary use.
*
* Certain compression algorithms (e.g., Zstandard) have limits on how large a buffer can
* be compressed. In some circumstances, the dictionary can grow beyond this limit, which
* will prevent the column from being compressed. This setting controls how the writer
* should act in these circumstances. A setting of dictionary_policy::ADAPTIVE will disable
* dictionary encoding for columns where the dictionary exceeds the limit. A setting of
* dictionary_policy::NEVER will disable the use of dictionary encoding globally. A setting of
* dictionary_policy::ALWAYS will allow the use of dictionary encoding even if it will result in
* the disabling of compression for columns that would otherwise be compressed.
*
* The default value is dictionary_policy::ALWAYS.
*
* @param val policy for dictionary use
* @return this for chaining
*/
parquet_writer_options_builder& dictionary_policy(enum dictionary_policy val);

/**
* @brief Sets the maximum dictionary size, in bytes.
*
* Disables dictionary encoding for any column chunk where the dictionary will
* exceed this limit. Only used when the dictionary_policy is set to 'ADAPTIVE'.
*
* Default value is 1048576 (1MiB).
*
* @param val maximum dictionary size
* @return this for chaining
*/
parquet_writer_options_builder& max_dictionary_size(size_t val);

/**
* @brief Sets the maximum page fragment size, in rows.
*
* Files with nested schemas or very long strings may need a page fragment size
* smaller than the default value of 5000 to ensure a single fragment will not
* exceed the desired maximum page size in bytes.
*
* @param val maximum page fragment size
* @return this for chaining
*/
parquet_writer_options_builder& max_page_fragment_size(size_type val);

/**
* @brief Sets whether int96 timestamps are written or not in parquet_writer_options.
*
@@ -977,6 +1071,12 @@ class chunked_parquet_writer_options {
size_type _max_page_size_rows = default_max_page_size_rows;
// Maximum size of min or max values in column index
int32_t _column_index_truncate_length = default_column_index_truncate_length;
// When to use dictionary encoding for data
dictionary_policy _dictionary_policy = dictionary_policy::ALWAYS;
// Maximum size of column chunk dictionary (in bytes)
size_t _max_dictionary_size = default_max_dictionary_size;
// Maximum number of rows in a page fragment
size_type _max_page_fragment_size = default_max_page_fragment_size;

/**
* @brief Constructor from sink.
@@ -1086,6 +1186,27 @@ class chunked_parquet_writer_options {
*/
auto get_column_index_truncate_length() const { return _column_index_truncate_length; }

/**
* @brief Returns policy for dictionary use.
*
* @return policy for dictionary use
*/
[[nodiscard]] dictionary_policy get_dictionary_policy() const { return _dictionary_policy; }

/**
* @brief Returns maximum dictionary size, in bytes.
*
* @return Maximum dictionary size, in bytes.
*/
[[nodiscard]] auto get_max_dictionary_size() const { return _max_dictionary_size; }

/**
* @brief Returns maximum page fragment size, in rows.
*
* @return Maximum page fragment size, in rows.
*/
[[nodiscard]] auto get_max_page_fragment_size() const { return _max_page_fragment_size; }

/**
* @brief Sets metadata.
*
@@ -1158,6 +1279,27 @@ class chunked_parquet_writer_options {
*/
void set_column_index_truncate_length(int32_t size_bytes);

/**
* @brief Sets the policy for dictionary use.
*
* @param policy Policy for dictionary use
*/
void set_dictionary_policy(dictionary_policy policy);

/**
* @brief Sets the maximum dictionary size, in bytes.
*
* @param size_bytes Maximum dictionary size, in bytes
*/
void set_max_dictionary_size(size_t size_bytes);

/**
* @brief Sets the maximum page fragment size, in rows.
*
* @param size_rows Maximum page fragment size, in rows.
*/
void set_max_page_fragment_size(size_type size_rows);

/**
* @brief creates builder to build chunked_parquet_writer_options.
*
@@ -1320,6 +1462,50 @@ class chunked_parquet_writer_options_builder {
return *this;
}

/**
* @brief Sets the policy for dictionary use.
*
* Certain compression algorithms (e.g., Zstandard) have limits on how large a buffer can
* be compressed. In some circumstances, the dictionary can grow beyond this limit, which
* will prevent the column from being compressed. This setting controls how the writer
* should act in these circumstances. A setting of dictionary_policy::ADAPTIVE will disable
* dictionary encoding for columns where the dictionary exceeds the limit. A setting of
* dictionary_policy::NEVER will disable the use of dictionary encoding globally. A setting of
* dictionary_policy::ALWAYS will allow the use of dictionary encoding even if it will result in
* the disabling of compression for columns that would otherwise be compressed.
*
* The default value is dictionary_policy::ALWAYS.
*
* @param val policy for dictionary use
* @return this for chaining
*/
chunked_parquet_writer_options_builder& dictionary_policy(enum dictionary_policy val);

/**
* @brief Sets the maximum dictionary size, in bytes.
*
* Disables dictionary encoding for any column chunk where the dictionary will
* exceed this limit. Only used when the dictionary_policy is set to 'ADAPTIVE'.
*
* Default value is 1048576 (1MiB).
*
* @param val maximum dictionary size
* @return this for chaining
*/
chunked_parquet_writer_options_builder& max_dictionary_size(size_t val);

/**
* @brief Sets the maximum page fragment size, in rows.
*
* Files with nested schemas or very long strings may need a page fragment size
* smaller than the default value of 5000 to ensure a single fragment will not
* exceed the desired maximum page size in bytes.
*
* @param val maximum page fragment size
* @return this for chaining
*/
chunked_parquet_writer_options_builder& max_page_fragment_size(size_type val);

/**
* @brief move chunked_parquet_writer_options member once it's built.
*/
@@ -1380,6 +1566,7 @@ class parquet_chunked_writer {
* size as number of sinks.
*
* @throws cudf::logic_error If the number of partitions is not the same as number of sinks
* @throws rmm::bad_alloc if there is insufficient space for temporary buffers
* @return returns reference of the class object
*/
parquet_chunked_writer& write(table_view const& table,
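The same policy settings apply to the chunked writer options added above. A hypothetical sketch, where `chunk1` and `chunk2` are assumed to be `table_view`s with a compatible schema:

    auto opts =
      cudf::io::chunked_parquet_writer_options::builder(cudf::io::sink_info{"out.parquet"})
        .compression(cudf::io::compression_type::ZSTD)
        // NEVER skips dictionary encoding entirely, so no column chunk can
        // hit the compressor's buffer limit via an oversized dictionary
        .dictionary_policy(cudf::io::dictionary_policy::NEVER)
        .build();
    cudf::io::parquet_chunked_writer writer(opts);
    writer.write(chunk1).write(chunk2);  // write() returns a reference, so calls chain
    writer.close();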
11 changes: 10 additions & 1 deletion cpp/include/cudf/io/types.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -99,6 +99,15 @@ enum statistics_freq {
STATISTICS_COLUMN = 3, ///< Full column and offset indices. Implies STATISTICS_ROWGROUP
};

/**
* @brief Control use of dictionary encoding for parquet writer
*/
enum dictionary_policy {
NEVER, ///< Never use dictionary encoding
ADAPTIVE, ///< Use dictionary when it will not impact compression
ALWAYS ///< Use dictionary regardless of impact on compression
};

/**
* @brief Detailed name information for output columns.
*
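Read as a sketch, the three policy values imply roughly the following per-column-chunk decision (illustrative logic inferred from the documentation above, not the writer's actual code):

    #include <cudf/io/types.hpp>
    #include <cstddef>

    bool use_dictionary(cudf::io::dictionary_policy policy,
                        std::size_t dict_size,      // estimated dictionary size for this chunk
                        std::size_t max_dict_size)  // the writer's max_dictionary_size setting
    {
      switch (policy) {
        case cudf::io::dictionary_policy::NEVER: return false;
        case cudf::io::dictionary_policy::ADAPTIVE: return dict_size <= max_dict_size;
        // ALWAYS keeps the dictionary even if the chunk then cannot be compressed
        case cudf::io::dictionary_policy::ALWAYS: return true;
      }
      return true;  // unreachable; silences compiler warnings
    }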
(Diffs for the remaining five changed files are not shown.)
